1. 首页 > IT综合教程 > 正文

251. gRPC服务通信培训

一、gRPC概述

gRPC是Google开源的高性能RPC框架,基于HTTP/2协议传输,使用Protocol Buffers作为接口定义语言,支持多种编程语言。

1.1 gRPC核心特性

  • 高性能:基于HTTP/2,支持多路复用、头部压缩
  • 强类型:使用Protocol Buffers定义接口,类型安全
  • 多语言支持:支持Java、Python、Go、C++等多种语言
  • 四种通信模式:简单RPC、服务端流、客户端流、双向流

1.2 gRPC架构

gRPC架构组件:
┌─────────────────────────────────────────┐
│           Client Application            │
├─────────────────────────────────────────┤
│           Client Stub (Generated)       │
├─────────────────────────────────────────┤
│           gRPC Channel                  │
├─────────────────────────────────────────┤
│           HTTP/2 Transport              │
└─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────┐
│           HTTP/2 Transport              │
├─────────────────────────────────────────┤
│           gRPC Server                   │
├─────────────────────────────────────────┤
│           Service Implementation        │
├─────────────────────────────────────────┤
│           Server Application            │
└─────────────────────────────────────────┘

二、Protocol Buffers定义

2.1 Proto文件编写

// user.proto
syntax = "proto3";

package com.example.grpc;

option java_multiple_files = true;
option java_package = "com.example.grpc";
option java_outer_classname = "UserServiceProto";

// 用户消息定义
message User {
    int64 id = 1;
    string name = 2;
    string email = 3;
    int32 age = 4;
    UserStatus status = 5;
}

enum UserStatus {
    UNKNOWN = 0;
    ACTIVE = 1;
    INACTIVE = 2;
    SUSPENDED = 3;
}

// 请求消息
message GetUserRequest {
    int64 id = 1;
}

message CreateUserRequest {
    string name = 1;
    string email = 2;
    int32 age = 3;
}

message ListUsersRequest {
    int32 page = 1;
    int32 size = 2;
}

message ListUsersResponse {
    repeated User users = 1;
    int32 total = 2;
}

message DeleteUserRequest {
    int64 id = 1;
}

message DeleteUserResponse {
    bool success = 1;
    string message = 2;
}

// 服务定义
service UserService {
    // 简单RPC
    rpc GetUser(GetUserRequest) returns (User);
    
    // 服务端流式RPC
    rpc ListUsers(ListUsersRequest) returns (stream User);
    
    // 客户端流式RPC
    rpc CreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
    
    // 双向流式RPC
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
    int64 user_id = 1;
    string message = 2;
    int64 timestamp = 3;
}

2.2 代码生成

# 安装protoc编译器
wget https://github.com/protocolbuffers/protobuf/releases/download/v25.1/protoc-25.1-linux-x86_64.zip
unzip protoc-25.1-linux-x86_64.zip -d /usr/local/
export PATH=$PATH:/usr/local/bin

# 安装Java插件
# Maven pom.xml配置
<plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>0.6.1</version>
    <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.25.1:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.60.0:exe:${os.detected.classifier}</pluginArtifact>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
                <goal>compile-custom</goal>
            </goals>
        </execution>
    </executions>
</plugin>

# 手动生成代码
protoc --java_out=src/main/java \
       --grpc-java_out=src/main/java \
       --plugin=protoc-gen-grpc-java=/path/to/protoc-gen-grpc-java \
       src/main/proto/user.proto
# 输出示例
Generating Java code from user.proto…
– UserServiceProto.java
– UserServiceGrpc.java
Done. Generated 2 files.

三、服务端实现

3.1 Maven依赖

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty-shaded</artifactId>
        <version>1.60.0</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
        <version>1.60.0</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
        <version>1.60.0</version>
    </dependency>
    <dependency>
        <groupId>javax.annotation</groupId>
        <artifactId>javax.annotation-api</artifactId>
        <version>1.3.2</version>
    </dependency>
</dependencies>

3.2 服务实现

// UserServiceImpl.java
package com.example.grpc.server;

import com.example.grpc.*;
import io.grpc.stub.StreamObserver;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {
    
    private Map<Long, User> userStore = new ConcurrentHashMap<>();
    private AtomicLong idGenerator = new AtomicLong(1);
    
    @Override
    public void getUser(GetUserRequest request, StreamObserver<User> responseObserver) {
        User user = userStore.get(request.getId());
        if (user != null) {
            responseObserver.onNext(user);
            responseObserver.onCompleted();
        } else {
            responseObserver.onError(
                io.grpc.Status.NOT_FOUND
                    .withDescription("User not found: " + request.getId())
                    .asRuntimeException()
            );
        }
    }
    
    @Override
    public void listUsers(ListUsersRequest request, StreamObserver<User> responseObserver) {
        List<User> users = new ArrayList<>(userStore.values());
        int start = request.getPage() * request.getSize();
        int end = Math.min(start + request.getSize(), users.size());
        
        for (int i = start; i < end; i++) {
            responseObserver.onNext(users.get(i));
        }
        responseObserver.onCompleted();
    }
    
    @Override
    public StreamObserver<CreateUserRequest> createUsers(
            StreamObserver<ListUsersResponse> responseObserver) {
        return new StreamObserver<CreateUserRequest>() {
            List<User> createdUsers = new ArrayList<>();
            
            @Override
            public void onNext(CreateUserRequest request) {
                User user = User.newBuilder()
                    .setId(idGenerator.getAndIncrement())
                    .setName(request.getName())
                    .setEmail(request.getEmail())
                    .setAge(request.getAge())
                    .setStatus(UserStatus.ACTIVE)
                    .build();
                userStore.put(user.getId(), user);
                createdUsers.add(user);
            }
            
            @Override
            public void onError(Throwable t) {
                responseObserver.onError(t);
            }
            
            @Override
            public void onCompleted() {
                ListUsersResponse response = ListUsersResponse.newBuilder()
                    .addAllUsers(createdUsers)
                    .setTotal(createdUsers.size())
                    .build();
                responseObserver.onNext(response);
                responseObserver.onCompleted();
            }
        };
    }
}

3.3 服务端启动

// GrpcServer.java
package com.example.grpc.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;

public class GrpcServer {
    private Server server;
    private int port = 50051;
    
    public void start() throws Exception {
        server = ServerBuilder.forPort(port)
            .addService(new UserServiceImpl())
            .addService(ProtoReflectionService.newInstance())
            .intercept(new LoggingInterceptor())
            .build()
            .start();
        
        System.out.println("Server started, listening on " + port);
        
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println("Shutting down gRPC server...");
            GrpcServer.this.stop();
        }));
    }
    
    public void stop() {
        if (server != null) {
            server.shutdown();
        }
    }
    
    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }
    
    public static void main(String[] args) throws Exception {
        GrpcServer server = new GrpcServer();
        server.start();
        server.blockUntilShutdown();
    }
}

// 日志拦截器
public class LoggingInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        System.out.println("Received call: " + call.getMethodDescriptor().getFullMethodName());
        return next.startCall(call, headers);
    }
}

四、客户端实现

4.1 同步客户端

// GrpcClient.java
package com.example.grpc.client;

import com.example.grpc.*;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;

public class GrpcClient {
    private final UserServiceGrpc.UserServiceBlockingStub blockingStub;
    
    public GrpcClient(Channel channel) {
        blockingStub = UserServiceGrpc.newBlockingStub(channel);
    }
    
    public User getUser(long id) {
        GetUserRequest request = GetUserRequest.newBuilder()
            .setId(id)
            .build();
        try {
            return blockingStub.getUser(request);
        } catch (StatusRuntimeException e) {
            System.err.println("RPC failed: " + e.getStatus());
            return null;
        }
    }
    
    public void listUsers(int page, int size) {
        ListUsersRequest request = ListUsersRequest.newBuilder()
            .setPage(page)
            .setSize(size)
            .build();
        try {
            Iterator<User> users = blockingStub.listUsers(request);
            while (users.hasNext()) {
                User user = users.next();
                System.out.println("User: " + user.getName());
            }
        } catch (StatusRuntimeException e) {
            System.err.println("RPC failed: " + e.getStatus());
        }
    }
    
    public static void main(String[] args) throws Exception {
        ManagedChannel channel = ManagedChannelBuilder
            .forAddress("fgedudb", 50051)
            .usePlaintext()
            .build();
        
        try {
            GrpcClient client = new GrpcClient(channel);
            User user = client.getUser(1);
            if (user != null) {
                System.out.println("Found user: " + user.getName());
            }
        } finally {
            channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}

4.2 异步客户端

// AsyncGrpcClient.java
package com.example.grpc.client;

import com.example.grpc.*;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncGrpcClient {
    private final UserServiceGrpc.UserServiceStub asyncStub;
    
    public AsyncGrpcClient(Channel channel) {
        asyncStub = UserServiceGrpc.newStub(channel);
    }
    
    public void getUserAsync(long id, CountDownLatch latch) {
        GetUserRequest request = GetUserRequest.newBuilder()
            .setId(id)
            .build();
        
        asyncStub.getUser(request, new StreamObserver<User>() {
            @Override
            public void onNext(User user) {
                System.out.println("Received user: " + user.getName());
            }
            
            @Override
            public void onError(Throwable t) {
                System.err.println("Error: " + t.getMessage());
                latch.countDown();
            }
            
            @Override
            public void onCompleted() {
                System.out.println("Request completed");
                latch.countDown();
            }
        });
    }
    
    public void createUsersAsync(List<CreateUserRequest> requests, CountDownLatch latch) {
        StreamObserver<ListUsersResponse> responseObserver = new StreamObserver<ListUsersResponse>() {
            @Override
            public void onNext(ListUsersResponse response) {
                System.out.println("Created " + response.getTotal() + " users");
            }
            
            @Override
            public void onError(Throwable t) {
                System.err.println("Error: " + t.getMessage());
                latch.countDown();
            }
            
            @Override
            public void onCompleted() {
                System.out.println("Create users completed");
                latch.countDown();
            }
        };
        
        StreamObserver<CreateUserRequest> requestObserver = 
            asyncStub.createUsers(responseObserver);
        
        for (CreateUserRequest request : requests) {
            requestObserver.onNext(request);
        }
        requestObserver.onCompleted();
    }
}

五、负载均衡

5.1 客户端负载均衡

// 配置负载均衡策略
ManagedChannel channel = ManagedChannelBuilder
    .forTarget("dns:///user-service")
    .defaultLoadBalancingPolicy("round_robin")
    .usePlaintext()
    .build();

// 自定义负载均衡策略
public class CustomLoadBalancer extends LoadBalancer {
    private List<EquivalentAddressGroup> servers;
    private int currentIndex = 0;
    
    @Override
    public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
        servers = resolvedAddresses.getAddresses();
    }
    
    @Override
    public void requestConnection() {
        if (servers != null && !servers.isEmpty()) {
            EquivalentAddressGroup server = servers.get(currentIndex);
            currentIndex = (currentIndex + 1) % servers.size();
            helper.createSubchannel(server, Attributes.EMPTY);
        }
    }
}

// 注册自定义负载均衡
LoadBalancerRegistry.getDefaultRegistry().register(
    new LoadBalancerProvider() {
        @Override
        public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
            return new CustomLoadBalancer(helper);
        }
        
        @Override
        public boolean isAvailable() {
            return true;
        }
        
        @Override
        public int getPriority() {
            return 5;
        }
        
        @Override
        public String getPolicyName() {
            return "custom";
        }
    }
);

六、安全认证

6.1 TLS/SSL配置

// 服务端TLS配置
Server server = ServerBuilder.forPort(8443)
    .useTransportSecurity(
        new File("server.crt"),
        new File("server.key")
    )
    .addService(new UserServiceImpl())
    .build();

// 客户端TLS配置
ManagedChannel channel = ManagedChannelBuilder
    .forAddress("fgedudb", 8443)
    .useTransportSecurity()
    .build();

// 双向TLS认证
SslContext sslContext = GrpcSslContexts.forClient()
    .trustManager(new File("ca.crt"))
    .keyManager(new File("client.crt"), new File("client.key"))
    .build();

ManagedChannel channel = ManagedChannelBuilder
    .forAddress("fgedudb", 8443)
    .sslContext(sslContext)
    .build();

6.2 Token认证

// 客户端Token拦截器
public class AuthClientInterceptor implements ClientInterceptor {
    private final String token;
    
    public AuthClientInterceptor(String token) {
        this.token = token;
    }
    
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions,
            Channel next) {
        return new ForwardingClientCall.SimpleForwardingClientCall<>(
                next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER),
                    "Bearer " + token);
                super.start(responseListener, headers);
            }
        };
    }
}

// 服务端Token验证
public class AuthServerInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        String token = headers.get(
            Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER));
        
        if (token == null || !validateToken(token)) {
            call.close(Status.UNAUTHENTICATED.withDescription("Invalid token"), headers);
            return new ServerCall.Listener<ReqT>() {};
        }
        
        return next.startCall(call, headers);
    }
    
    private boolean validateToken(String token) {
        return token.startsWith("Bearer ") && token.length() > 7;
    }
}

七、监控与健康检查

7.1 健康检查

// 添加健康检查服务
Server server = ServerBuilder.forPort(50051)
    .addService(new UserServiceImpl())
    .addService(ProtoReflectionService.newInstance())
    .build();

// 使用grpc-health-probe
# 下载健康检查工具
wget https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.4.22/grpc_health_probe-linux-amd64
chmod +x grpc_health_probe

# 执行健康检查
./grpc_health_probe -addr=fgedudb:50051

# 输出示例
status: SERVING

7.2 Prometheus监控

<!-- Maven依赖 -->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-census</artifactId>
    <version>1.60.0</version>
</dependency>

// 配置Prometheus导出
GrpcPrometheusExporter exporter = new GrpcPrometheusExporter();
exporter.init();

Server server = ServerBuilder.forPort(50051)
    .addService(new UserServiceImpl())
    .intercept(MonitoringServerInterceptor.create(
        MonitoringServerInterceptor.allInstrumentations()))
    .build();

# prometheus.yml配置
scrape_configs:
  - job_name: 'grpc'
    static_configs:
      - targets: ['fgedudb:9090']

# 关键监控指标
grpc_server_started_total
grpc_server_handled_total
grpc_server_msg_received_total
grpc_server_msg_sent_total
grpc_server_handling_seconds

八、最佳实践

8.1 配置建议

配置项 建议值 说明
maxMessageSize 16MB 根据业务调整
keepAliveTime 30s 保持连接活跃
keepAliveTimeout 10s 超时检测
deadline 5s 请求超时

8.2 常见问题排查

# 查看gRPC端口
netstat -tlnp | grep 50051

# 使用grpcurl调试
grpcurl -plaintext fgedudb:50051 list
grpcurl -plaintext fgedudb:50051 describe com.example.grpc.UserService
grpcurl -plaintext -d '{"id": 1}' fgedudb:50051 com.example.grpc.UserService/GetUser

# 查看日志
tail -f /var/log/grpc/server.log

# 测试连接
telnet fgedudb 50051
注意事项:

  • 生产环境必须启用TLS加密
  • 合理设置消息大小限制
  • 配置健康检查和监控
  • 使用Deadline避免请求阻塞

九、总结

gRPC是高性能微服务通信的重要框架。通过本培训文档,您应该掌握了:

风哥风哥提示:

  • Protocol Buffers接口定义和代码生成
  • gRPC服务端和客户端实现
  • 四种通信模式的使用
  • 负载均衡和安全认证配置
  • 监控与健康检查
  • 最佳实践和问题排查
IT运维培训文档系列 | 第251篇 | gRPC服务通信培训

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息