1. 首页 > IT综合教程 > 正文

it教程FG386-微服务架构

内容大纲

1. 微服务架构概述

微服务架构是一种将应用程序拆分为多个独立服务的软件架构风格。每个服务都运行在自己的进程中,通过轻量级的通信机制(通常是HTTP/REST或消息队列)进行通信。微服务架构的目标是提高系统的可扩展性、可靠性和可维护性。

微服务架构的核心特点包括:

  • 服务独立性:每个服务都是独立部署和扩展的
  • 服务松耦合:服务之间通过明确的接口进行通信
  • 服务自治:每个服务可以使用不同的技术栈和数据库
  • 去中心化:没有中央控制,服务之间平等通信
  • 容错性:单个服务故障不会影响整个系统

更多学习教程www.fgedu.net.cn

2. 微服务设计原则

2.1 单一职责原则

每个微服务应该只负责一个特定的业务功能,职责单一明确。这样可以提高服务的可维护性和可测试性。

2.2 服务边界清晰

微服务的边界应该基于业务领域进行划分,确保服务内部高内聚,服务之间低耦合。

2.3 服务自治

每个微服务应该有自己的数据库和业务逻辑,独立部署和扩展,不依赖于其他服务的内部实现。

2.4 容错设计

微服务架构应该考虑服务故障的情况,实现熔断、限流、重试等机制,确保系统的可靠性。

2.5 数据一致性

微服务架构中,数据分布在不同的服务中,需要考虑最终一致性的实现方式,如事件驱动架构和Saga模式。

3. 微服务架构模式

3.1 API网关模式

# API网关实现
$ cat api-gateway.js
const express = require(‘express’);
const axios = require(‘axios’);
const app = express();
const port = 3000;

// 路由转发
app.get(‘/api/users’, async (req, res) => {
try {
const response = await axios.get(‘http://user-service:3001/users’);
res.json(response.data);
} catch (error) {
res.status(500).json({ error: ‘Internal server error’ });
}
});

app.get(‘/api/products’, async (req, res) => {
try {
const response = await axios.get(‘http://product-service:3002/products’);
res.json(response.data);
} catch (error) {
res.status(500).json({ error: ‘Internal server error’ });
}
});

app.listen(port, () => {
console.log(`API Gateway listening at http://fgedudb:${port}`);
});

# 启动API网关
$ node api-gateway.js

# 测试API网关
$ curl http://fgedudb:3000/api/users
$ curl http://fgedudb:3000/api/products

3.2 服务注册表模式

# 服务注册表实现
$ cat service-registry.js
const express = require(‘express’);
const app = express();
const port = 3000;

let services = [];

// 注册服务
app.post(‘/register’, (req, res) => {
const service = req.body;
services.push(service);
console.log(`Service registered: ${service.name} at ${service.url}`);
res.json({ status: ‘success’ });
});

// 注销服务
app.post(‘/unregister’, (req, res) => {
const serviceName = req.body.name;
services = services.filter(service => service.name !== serviceName);
console.log(`Service unregistered: ${serviceName}`);
res.json({ status: ‘success’ });
});

// 发现服务
app.get(‘/discover/:serviceName’, (req, res) => {
const serviceName = req.params.serviceName;
const service = services.find(s => s.name === serviceName);
if (service) {
res.json(service);
} else {
res.status(404).json({ error: ‘Service not found’ });
}
});

// 列出所有服务
app.get(‘/services’, (req, res) => {
res.json(services);
});

app.listen(port, () => {
console.log(`Service Registry listening at http://fgedudb:${port}`);
});

# 启动服务注册表
$ node service-registry.js

# 注册服务
$ curl -X POST http://fgedudb:3000/register \
-H “Content-Type: application/json” \
-d ‘{“name”: “user-service”, “url”: “http://user-service:3001”}’

# 发现服务
$ curl http://fgedudb:3000/discover/user-service

3.3 事件驱动架构

# 事件生产者
$ cat event-producer.js
const kafka = require(‘kafka-node’);
const Producer = kafka.Producer;
const client = new kafka.KafkaClient({ kafkaHost: ‘kafka:9092’ });
const producer = new Producer(client);

producer.on(‘ready’, () => {
console.log(‘Producer ready’);

// 发送事件
const payloads = [
{
topic: ‘user-events’,
messages: JSON.stringify({ event: ‘user.created’, data: { id: 1, name: ‘John Doe’ } })
}
];

producer.send(payloads, (err, data) => {
if (err) {
console.error(‘Error sending event:’, err);
} else {
console.log(‘Event sent:’, data);
}
});
});

producer.on(‘error’, (err) => {
console.error(‘Producer error:’, err);
});

# 事件消费者
$ cat event-consumer.js
const kafka = require(‘kafka-node’);
const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({ kafkaHost: ‘kafka:9092’ });
const consumer = new Consumer(
client,
[{ topic: ‘user-events’ }],
{ autoCommit: true }
);

consumer.on(‘message’, (message) => {
console.log(‘Received event:’, message.value);
const event = JSON.parse(message.value);

// 处理事件
if (event.event === ‘user.created’) {
console.log(‘Processing user created event:’, event.data);
// 执行相应的业务逻辑
}
});

consumer.on(‘error’, (err) => {
console.error(‘Consumer error:’, err);
});

# 启动Kafka
$ docker run -d –name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:latest

# 启动事件生产者和消费者
$ node event-producer.js
$ node event-consumer.js

风哥风哥提示:微服务架构模式是实现微服务的重要指导,不同的模式适用于不同的场景,需要根据具体需求选择合适的模式。

4. 微服务技术栈

4.1 服务框架

  • Spring Boot/Spring Cloud:Java生态系统的主流选择
  • Node.js + Express/Koa:轻量级服务开发
  • Python + Flask/Django:数据处理和AI服务
  • Golang + Gin/Echo:高性能服务
  • Ruby on Rails:快速开发Web服务

4.2 容器化技术

# Dockerfile示例
$ cat Dockerfile
FROM node:14-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
EXPOSE 3000
CMD [“node”, “app.js”]

# 构建镜像
$ docker build -t user-service .

# 运行容器
$ docker run -d -p 3001:3000 –name user-service user-service

# Docker Compose配置
$ cat docker-compose.yml
version: ‘3’
services:
user-service:
build: ./user-service
ports:
– “3001:3000”
depends_on:
– db
product-service:
build: ./product-service
ports:
– “3002:3000”
depends_on:
– db
api-gateway:
build: ./api-gateway
ports:
– “3000:3000”
depends_on:
– user-service
– product-service
db:
image: postgres:13
environment:
POSTGRES_USER: admin
POSTGRES_PASSWORD: password
POSTGRES_DB: mydb
volumes:
– postgres-data:/var/lib/postgresql/data

volumes:
postgres-data:

# 启动服务
$ docker-compose up -d

# 查看服务状态
$ docker-compose ps

4.3 编排工具

  • Kubernetes:容器编排的事实标准
  • Docker Swarm:Docker原生编排工具
  • Nomad:HashiCorp的轻量级编排工具
  • OpenShift:企业级Kubernetes发行版

5. 服务通信

5.1 RESTful API

# RESTful API实现
$ cat user-service.js
const express = require(‘express’);
const app = express();
const port = 3001;

app.use(express.json());

let users = [
{ id: 1, name: ‘John Doe’, email: ‘john.doe@fgedu.net.cn’ },
{ id: 2, name: ‘Jane Smith’, email: ‘jane.smith@fgedu.net.cn’ }
];

// 获取所有用户
app.get(‘/users’, (req, res) => {
res.json(users);
});

// 获取单个用户
app.get(‘/users/:id’, (req, res) => {
const id = parseInt(req.params.id);
const user = users.find(u => u.id === id);
if (user) {
res.json(user);
} else {
res.status(404).json({ error: ‘User not found’ });
}
});

// 创建用户
app.post(‘/users’, (req, res) => {
const newUser = {
id: users.length + 1,
name: req.body.name,
email: req.body.email
};
users.push(newUser);
res.status(201).json(newUser);
});

// 更新用户
app.put(‘/users/:id’, (req, res) => {
const id = parseInt(req.params.id);
const userIndex = users.findIndex(u => u.id === id);
if (userIndex !== -1) {
users[userIndex] = {
…users[userIndex],
…req.body
};
res.json(users[userIndex]);
} else {
res.status(404).json({ error: ‘User not found’ });
}
});

// 删除用户
app.delete(‘/users/:id’, (req, res) => {
const id = parseInt(req.params.id);
const userIndex = users.findIndex(u => u.id === id);
if (userIndex !== -1) {
users.splice(userIndex, 1);
res.json({ status: ‘success’ });
} else {
res.status(404).json({ error: ‘User not found’ });
}
});

app.listen(port, () => {
console.log(`User service listening at http://fgedudb:${port}`);
});

# 测试RESTful API
$ curl http://fgedudb:3001/users
$ curl -X POST http://fgedudb:3001/users \
-H “Content-Type: application/json” \
-d ‘{“name”: “Bob Johnson”, “email”: “bob.johnson@fgedu.net.cn”}’
$ curl http://fgedudb:3001/users/3

5.2 gRPC

# 定义gRPC服务
$ cat user.proto
syntax = “proto3”;

package user;

service UserService {
rpc GetUser (GetUserRequest) returns (User) {}
rpc ListUsers (ListUsersRequest) returns (ListUsersResponse) {}
rpc CreateUser (CreateUserRequest) returns (User) {}
rpc UpdateUser (UpdateUserRequest) returns (User) {}
rpc DeleteUser (DeleteUserRequest) returns (DeleteUserResponse) {}
}

message GetUserRequest {
int32 id = 1;
}

message ListUsersRequest {
}

message CreateUserRequest {
string name = 1;
string email = 2;
}

message UpdateUserRequest {
int32 id = 1;
string name = 2;
string email = 3;
}

message DeleteUserRequest {
int32 id = 1;
}

message User {
int32 id = 1;
string name = 2;
string email = 3;
}

message ListUsersResponse {
repeated User users = 1;
}

message DeleteUserResponse {
bool success = 1;
}

# 生成gRPC代码
$ protoc –go_out=plugins=grpc:. user.proto

# 实现gRPC服务
$ cat server.go
package main

import (
“context”
“fmt”
“log”
“net”

“google.golang.org/grpc”
pb “github.com/fgedu/microservices/user-service/proto”
)

type server struct {
users []*pb.User
}

func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
for _, user := range s.users {
if user.Id == req.Id {
return user, nil
}
}
return nil, fmt.Errorf(“user not found”)
}

func (s *server) ListUsers(ctx context.Context, req *pb.ListUsersRequest) (*pb.ListUsersResponse, error) {
return &pb.ListUsersResponse{Users: s.users}, nil
}

func (s *server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
newUser := &pb.User{
Id: int32(len(s.users) + 1),
Name: req.Name,
Email: req.Email,
}
s.users = append(s.users, newUser)
return newUser, nil
}

func (s *server) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.User, error) {
for i, user := range s.users {
if user.Id == req.Id {
s.users[i].Name = req.Name
s.users[i].Email = req.Email
return s.users[i], nil
}
}
return nil, fmt.Errorf(“user not found”)
}

func (s *server) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*pb.DeleteUserResponse, error) {
for i, user := range s.users {
if user.Id == req.Id {
s.users = append(s.users[:i], s.users[i+1:]…)
return &pb.DeleteUserResponse{Success: true}, nil
}
}
return &pb.DeleteUserResponse{Success: false}, fmt.Errorf(“user not found”)
}

func main() {
s := grpc.NewServer()
pb.RegisterUserServiceServer(s, &server{
users: []*pb.User{
{Id: 1, Name: “John Doe”, Email: “john.doe@fgedu.net.cn”},
{Id: 2, Name: “Jane Smith”, Email: “jane.smith@fgedu.net.cn”},
},
})

listener, err := net.Listen(“tcp”, “:50051”)
if err != nil {
log.Fatalf(“Failed to listen: %v”, err)
}

log.Println(“Server listening on :50051”)
if err := s.Serve(listener); err != nil {
log.Fatalf(“Failed to serve: %v”, err)
}
}

# 启动gRPC服务
$ go run server.go

# 测试gRPC服务
$ grpcurl -plaintext -d ‘{“id”: 1}’ fgedudb:50051 user.UserService/GetUser
$ grpcurl -plaintext -d ‘{}’ fgedudb:50051 user.UserService/ListUsers

5.3 消息队列

# 使用RabbitMQ实现消息队列
$ cat producer.js
const amqp = require(‘amqplib/callback_api’);

amqp.connect(‘amqp://fgedudb’, (err, conn) => {
if (err) {
throw err;
}

conn.createChannel((err, ch) => {
if (err) {
throw err;
}

const queue = ‘user-events’;
const msg = JSON.stringify({ event: ‘user.created’, data: { id: 1, name: ‘John Doe’ } });

ch.assertQueue(queue, { durable: false });
ch.sendToQueue(queue, Buffer.from(msg));
console.log(” [x] Sent %s”, msg);
});

setTimeout(() => {
conn.close();
}, 500);
});

$ cat consumer.js
const amqp = require(‘amqplib/callback_api’);

amqp.connect(‘amqp://fgedudb’, (err, conn) => {
if (err) {
throw err;
}

conn.createChannel((err, ch) => {
if (err) {
throw err;
}

const queue = ‘user-events’;

ch.assertQueue(queue, { durable: false });
console.log(” [*] Waiting for messages in %s. To exit press CTRL+C”, queue);

ch.consume(queue, (msg) => {
console.log(” [x] Received %s”, msg.content.toString());
const event = JSON.parse(msg.content.toString());

// 处理事件
if (event.event === ‘user.created’) {
console.log(‘Processing user created event:’, event.data);
// 执行相应的业务逻辑
}
}, { noAck: true });
});
});

# 启动RabbitMQ
$ docker run -d –name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

# 启动生产者和消费者
$ node producer.js
$ node consumer.js

学习交流加群风哥微信: itpux-com

6. 服务发现

6.1 客户端服务发现

# 客户端服务发现实现
$ cat service-discovery.js
const axios = require(‘axios’);

class ServiceDiscovery {
constructor(registryUrl) {
this.registryUrl = registryUrl;
this.services = {};
this.refreshInterval = setInterval(() => this.refreshServices(), 30000);
}

async refreshServices() {
try {
const response = await axios.get(`${this.registryUrl}/services`);
this.services = {};
response.data.forEach(service => {
this.services[service.name] = service.url;
});
console.log(‘Services refreshed:’, this.services);
} catch (error) {
console.error(‘Error refreshing services:’, error);
}
}

async getServiceUrl(serviceName) {
if (!this.services[serviceName]) {
await this.refreshServices();
}
return this.services[serviceName];
}

async callService(serviceName, path, method = ‘GET’, data = {}) {
const url = await this.getServiceUrl(serviceName);
if (!url) {
throw new Error(`Service ${serviceName} not found`);
}

const options = {
method,
url: `${url}${path}`,
data,
headers: {
‘Content-Type’: ‘application/json’
}
};

try {
const response = await axios(options);
return response.data;
} catch (error) {
console.error(`Error calling service ${serviceName}:`, error);
throw error;
}
}
}

// 使用服务发现
const discovery = new ServiceDiscovery(‘http://service-registry:3000’);

async function main() {
try {
// 调用用户服务
const users = await discovery.callService(‘user-service’, ‘/users’);
console.log(‘Users:’, users);

// 调用产品服务
const products = await discovery.callService(‘product-service’, ‘/products’);
console.log(‘Products:’, products);
} catch (error) {
console.error(‘Error:’, error);
}
}

main();

6.2 服务端服务发现

# 服务端服务发现实现(Nginx)
$ cat nginx.conf
http {
upstream user-service {
server user-service-1:3001;
server user-service-2:3001;
server user-service-3:3001;
}

upstream product-service {
server product-service-1:3002;
server product-service-2:3002;
server product-service-3:3002;
}

server {
listen 80;

location /api/users/ {
proxy_pass http://user-service/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}

location /api/products/ {
proxy_pass http://product-service/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
}

# 启动Nginx
$ docker run -d –name nginx -p 80:80 -v ./nginx.conf:/etc/nginx/nginx.conf nginx:latest

# 测试服务发现
$ curl http://fgedudb/api/users
$ curl http://fgedudb/api/products

6.3 服务注册表

  • Consul:HashiCorp的服务发现和配置工具
  • Eureka:Netflix的服务发现框架
  • etcd:分布式键值存储,可用于服务发现
  • ZooKeeper:分布式协调服务,可用于服务发现

学习交流加群风哥QQ113257174

7. 负载均衡

7.1 客户端负载均衡

# 客户端负载均衡实现
class LoadBalancer {
constructor() {
this.services = [];
this.index = 0;
}

addService(serviceUrl) {
this.services.push(serviceUrl);
}

removeService(serviceUrl) {
this.services = this.services.filter(url => url !== serviceUrl);
}

getNextService() {
if (this.services.length === 0) {
throw new Error(‘No services available’);
}

// 轮询算法
const service = this.services[this.index];
this.index = (this.index + 1) % this.services.length;
return service;
}

// 随机算法
getRandomService() {
if (this.services.length === 0) {
throw new Error(‘No services available’);
}

const randomIndex = Math.floor(Math.random() * this.services.length);
return this.services[randomIndex];
}

// 加权轮询算法
getWeightedService(weights) {
if (this.services.length === 0) {
throw new Error(‘No services available’);
}

// 实现加权轮询逻辑
// …
}
}

// 使用负载均衡
const lb = new LoadBalancer();
lb.addService(‘http://user-service-1:3001’);
lb.addService(‘http://user-service-2:3001’);
lb.addService(‘http://user-service-3:3001’);

// 获取服务
const serviceUrl = lb.getNextService();
console.log(‘Selected service:’, serviceUrl);

7.2 服务端负载均衡

# Nginx负载均衡配置
$ cat nginx.conf
http {
upstream backend {
# 轮询
server backend1:8080;
server backend2:8080;
server backend3:8080;

# 权重
# server backend1:8080 weight=3;
# server backend2:8080 weight=2;
# server backend3:8080 weight=1;

# IP哈希
# ip_hash;

# 最少连接
# least_conn;
}

server {
listen 80;

location / {
proxy_pass http://backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
}

# HAProxy负载均衡配置
$ cat haproxy.cfg
global
log 127.0.0.1 local0
maxconn 4096
daemon

defaults
log global
mode http
option httplog
option dontlognull
retries 3
option redispatch
timeout connect 5000
timeout client 50000
timeout server 50000

frontend http-in
bind *:80
default_backend backend

backend backend
balance roundrobin
server backend1 backend1:8080 check
server backend2 backend2:8080 check
server backend3 backend3:8080 check

7.3 Kubernetes负载均衡

# Kubernetes Service配置
$ cat service.yaml
apiVersion: v1
kind: Service
metadata:
name: user-service
spec:
selector:
app: user-service
ports:
– port: 80
targetPort: 3001
type: ClusterIP

# Kubernetes Ingress配置
$ cat ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: app-ingress
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /
spec:
rules:
– host: app.fgedu.net.cn
http:
paths:
– path: /api/users
pathType: Prefix
backend:
service:
name: user-service
port:
number: 80
– path: /api/products
pathType: Prefix
backend:
service:
name: product-service
port:
number: 80

# 应用配置
$ kubectl apply -f service.yaml
$ kubectl apply -f ingress.yaml

# 查看服务
$ kubectl get services
$ kubectl get ingress

更多学习教程公众号风哥教程itpux_com

8. 微服务安全

8.1 认证与授权

# JWT认证实现
$ cat auth-service.js
const express = require(‘express’);
const jwt = require(‘jsonwebtoken’);
const bcrypt = require(‘bcrypt’);
const app = express();
const port = 3003;

app.use(express.json());

// 模拟用户数据库
const users = [
{ id: 1, username: ‘admin’, password: ‘$2b$10$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW’ } // password: admin123
];

// 登录接口
app.post(‘/login’, async (req, res) => {
const { username, password } = req.body;

// 查找用户
const user = users.find(u => u.username === username);
if (!user) {
return res.status(401).json({ error: ‘Invalid credentials’ });
}

// 验证密码
const validPassword = await bcrypt.compare(password, user.password);
if (!validPassword) {
return res.status(401).json({ error: ‘Invalid credentials’ });
}

// 生成JWT
const token = jwt.sign({ id: user.id, username: user.username }, ‘secret_key’, { expiresIn: ‘1h’ });
res.json({ token });
});

// 验证中间件
function authenticateToken(req, res, next) {
const authHeader = req.headers[‘authorization’];
const token = authHeader && authHeader.split(‘ ‘)[1];

if (!token) {
return res.status(401).json({ error: ‘Access token required’ });
}

jwt.verify(token, ‘secret_key’, (err, user) => {
if (err) {
return res.status(403).json({ error: ‘Invalid or expired token’ });
}
req.user = user;
next();
});
}

// 受保护的接口
app.get(‘/protected’, authenticateToken, (req, res) => {
res.json({ message: ‘Protected route’, user: req.user });
});

app.listen(port, () => {
console.log(`Auth service listening at http://fgedudb:${port}`);
});

# 测试认证服务
$ curl -X POST http://fgedudb:3003/login \
-H “Content-Type: application/json” \
-d ‘{“username”: “admin”, “password”: “admin123”}’

$ curl http://fgedudb:3003/protected \
-H “Authorization: Bearer

8.2 API网关安全

# API网关安全实现
$ cat api-gateway.js
const express = require(‘express’);
const axios = require(‘axios’);
const jwt = require(‘jsonwebtoken’);
const app = express();
const port = 3000;

app.use(express.json());

// 验证中间件
function authenticateToken(req, res, next) {
const authHeader = req.headers[‘authorization’];
const token = authHeader && authHeader.split(‘ ‘)[1];

if (!token) {
return res.status(401).json({ error: ‘Access token required’ });
}

jwt.verify(token, ‘secret_key’, (err, user) => {
if (err) {
return res.status(403).json({ error: ‘Invalid or expired token’ });
}
req.user = user;
next();
});
}

// 路由转发
app.get(‘/api/users’, authenticateToken, async (req, res) => {
try {
const response = await axios.get(‘http://user-service:3001/users’);
res.json(response.data);
} catch (error) {
res.status(500).json({ error: ‘Internal server error’ });
}
});

app.get(‘/api/products’, authenticateToken, async (req, res) => {
try {
const response = await axios.get(‘http://product-service:3002/products’);
res.json(response.data);
} catch (error) {
res.status(500).json({ error: ‘Internal server error’ });
}
});

// 健康检查
app.get(‘/health’, (req, res) => {
res.json({ status: ‘ok’ });
});

app.listen(port, () => {
console.log(`API Gateway listening at http://fgedudb:${port}`);
});

8.3 服务间通信安全

# 服务间通信安全实现
const axios = require(‘axios’);

class SecureServiceClient {
constructor(serviceUrl, apiKey) {
this.serviceUrl = serviceUrl;
this.apiKey = apiKey;
}

async call(path, method = ‘GET’, data = {}) {
const options = {
method,
url: `${this.serviceUrl}${path}`,
data,
headers: {
‘Content-Type’: ‘application/json’,
‘X-API-Key’: this.apiKey
}
};

try {
const response = await axios(options);
return response.data;
} catch (error) {
console.error(‘Error calling service:’, error);
throw error;
}
}
}

// 使用安全客户端
const userService = new SecureServiceClient(‘http://user-service:3001’, ‘api_key_123’);

async function main() {
try {
const users = await userService.call(‘/users’);
console.log(‘Users:’, users);
} catch (error) {
console.error(‘Error:’, error);
}
}

main();

# 服务端验证
$ cat user-service.js
const express = require(‘express’);
const app = express();
const port = 3001;

app.use(express.json());

// API密钥验证中间件
function validateApiKey(req, res, next) {
const apiKey = req.headers[‘x-api-key’];
if (!apiKey || apiKey !== ‘api_key_123’) {
return res.status(401).json({ error: ‘Invalid API key’ });
}
next();
}

// 应用中间件
app.use(validateApiKey);

// 路由
app.get(‘/users’, (req, res) => {
res.json([{ id: 1, name: ‘John Doe’ }]);
});

app.listen(port, () => {
console.log(`User service listening at http://fgedudb:${port}`);
});

author:www.itpux.com

9. 微服务监控

9.1 日志管理

# 日志管理实现
const winston = require(‘winston’);

// 配置日志
const logger = winston.createLogger({
level: ‘info’,
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.File({ filename: ‘error.log’, level: ‘error’ }),
new winston.transports.File({ filename: ‘combined.log’ }),
new winston.transports.Console()
]
});

// 使用日志
logger.info(‘Service started’);
logger.warn(‘Warning message’);
logger.error(‘Error message’);

# 集中式日志管理(ELK Stack)
$ cat docker-compose.yml
version: ‘3’
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.14.0
environment:
– discovery.type=single-node
ports:
– “9200:9200”
logstash:
image: docker.elastic.co/logstash/logstash:7.14.0
volumes:
– ./logstash.conf:/etc/logstash/conf.d/logstash.conf
ports:
– “5044:5044”
kibana:
image: docker.elastic.co/kibana/kibana:7.14.0
ports:
– “5601:5601”

$ cat logstash.conf
input {
beats {
port => 5044
}
}

filter {
if [service] {
grok {
match => {
“message” => “%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:message}”
}
}
}
}

output {
elasticsearch {
hosts => [“elasticsearch:9200”]
index => “%{service}-%{+YYYY.MM.dd}”
}
}

# 启动ELK Stack
$ docker-compose up -d

9.2 指标监控

# 指标监控实现
const prometheus = require(‘prom-client’);

// 创建指标
const httpRequestsTotal = new prometheus.Counter({
name: ‘http_requests_total’,
help: ‘Total number of HTTP requests’,
labelNames: [‘method’, ‘route’, ‘status’]
});

const httpRequestDuration = new prometheus.Histogram({
name: ‘http_request_duration_seconds’,
help: ‘HTTP request duration in seconds’,
labelNames: [‘method’, ‘route’],
buckets: [0.1, 0.5, 1, 2, 5]
});

// 中间件
function metricsMiddleware(req, res, next) {
const start = Date.now();

res.on(‘finish’, () => {
const duration = (Date.now() – start) / 1000;
httpRequestsTotal.inc({ method: req.method, route: req.path, status: res.statusCode });
httpRequestDuration.observe({ method: req.method, route: req.path }, duration);
});

next();
}

// 指标端点
app.get(‘/metrics’, (req, res) => {
res.set(‘Content-Type’, prometheus.register.contentType);
res.end(prometheus.register.metrics());
});

# Prometheus配置
$ cat prometheus.yml
global:
scrape_interval: 15s

scrape_configs:
– job_name: ‘user-service’
static_configs:
– targets: [‘user-service:3001’]
– job_name: ‘product-service’
static_configs:
– targets: [‘product-service:3002’]
– job_name: ‘api-gateway’
static_configs:
– targets: [‘api-gateway:3000’]

# 启动Prometheus和Grafana
$ docker-compose up -d

$ cat docker-compose.yml
version: ‘3’
services:
prometheus:
image: prom/prometheus:latest
volumes:
– ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
– “9090:9090”
grafana:
image: grafana/grafana:latest
ports:
– “3000:3000”
environment:
– GF_SECURITY_ADMIN_PASSWORD=admin

9.3 分布式追踪

# 分布式追踪实现(Jaeger)
const opentracing = require(‘opentracing’);
const jaeger = require(‘jaeger-client’);

// 配置Jaeger
const config = {
serviceName: ‘user-service’,
sampler: {
type: ‘const’,
param: 1
},
reporter: {
logSpans: true
}
};

const options = {
logger: {
info: function logInfo(msg) {
console.log(‘INFO ‘, msg);
},
error: function logError(msg) {
console.error(‘ERROR’, msg);
}
}
};

const tracer = jaeger.initTracer(config, options);
opentracing.initGlobalTracer(tracer);

// 使用追踪
function getUser(req, res) {
const span = tracer.startSpan(‘getUser’);

// 模拟数据库查询
setTimeout(() => {
span.finish();
res.json({ id: 1, name: ‘John Doe’ });
}, 100);
}

# 启动Jaeger
$ docker run -d –name jaeger -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest

# 查看追踪
# 打开浏览器访问 http://fgedudb:16686

10. 最佳实践

10.1 微服务设计最佳实践

  • 服务边界清晰:基于业务领域划分服务
  • 服务自治:每个服务独立部署和扩展
  • API设计:使用RESTful API或gRPC
  • 数据管理:每个服务有自己的数据库
  • 容错设计:实现熔断、限流、重试机制
  • 监控和观测:实现日志、指标、追踪
  • 安全设计:实现认证、授权、加密
  • 部署自动化:使用CI/CD管道
  • 版本管理:实现API版本控制
  • 文档:维护API文档

10.2 微服务部署最佳实践

# CI/CD配置
$ cat .github/workflows/ci.yml
name: CI

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
build:
runs-on: ubuntu-latest
steps:
– uses: actions/checkout@v2
– name: Set up Node.js
uses: actions/setup-node@v2
with:
node-version: ’14’
– name: Install dependencies
run: npm install
– name: Run tests
run: npm test
– name: Build
run: npm run build
– name: Docker build and push
run: |
docker build -t user-service .
docker tag user-service fgedu/user-service:latest
docker push fgedu/user-service:latest

# Kubernetes部署配置
$ cat deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
spec:
replicas: 3
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
– name: user-service
image: fgedu/user-service:latest
ports:
– containerPort: 3001
resources:
requests:
cpu: “100m”
memory: “256Mi”
limits:
cpu: “500m”
memory: “512Mi”
readinessProbe:
httpGet:
path: /health
port: 3001
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
path: /health
port: 3001
initialDelaySeconds: 30
periodSeconds: 30

# 应用部署
$ kubectl apply -f deployment.yaml
$ kubectl apply -f service.yaml
$ kubectl apply -f ingress.yaml

10.3 微服务监控最佳实践

  • 集中式日志管理:使用ELK Stack或Graylog
  • 指标监控:使用Prometheus和Grafana
  • 分布式追踪:使用Jaeger或Zipkin
  • 健康检查:实现 readiness 和 liveness 探针
  • 告警:配置合理的告警规则
  • 仪表盘:创建服务监控仪表盘
  • 日志聚合:聚合服务日志
  • 性能监控:监控服务响应时间和吞吐量
  • 错误率监控:监控服务错误率
  • 资源使用监控:监控CPU、内存、磁盘使用情况

生产环境建议

  • 选择合适的微服务技术栈
  • 实施服务网格(如Istio)
  • 使用容器编排平台(如Kubernetes)
  • 建立CI/CD管道
  • 实现自动化测试
  • 建立监控和告警系统
  • 实施安全措施
  • 定期进行性能测试
  • 培训团队掌握微服务技能
  • 持续优化微服务架构

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息