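1. gRPC Overview
gRPC is a high-performance, open-source RPC framework from Google. It runs over HTTP/2, uses Protocol Buffers as its interface definition language (IDL), and supports many programming languages.
1.1 Core Features
- High performance: built on HTTP/2, with multiplexing and header compression
- Strong typing: interfaces are defined in Protocol Buffers, so messages are type-checked
- Multi-language support: Java, Python, Go, C++, and others
- Four communication modes: unary RPC, server streaming, client streaming, and bidirectional streaming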
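The four modes differ only in whether each side sends a single message or a stream of them. As a rough illustration, the sketch below models the four call shapes in plain Java with no gRPC dependency; `CallShapes` and its method names are hypothetical, and a `List` stands in for a message stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the four gRPC call shapes.
// A List stands in for a stream; no gRPC classes are involved.
final class CallShapes {

    // Unary: one request, one response.
    static String unary(String request) {
        return "hello " + request;
    }

    // Server streaming: one request, many responses.
    static List<String> serverStreaming(String request, int count) {
        List<String> responses = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            responses.add(request + "#" + i);
        }
        return responses;
    }

    // Client streaming: many requests, one response (here, how many were received).
    static int clientStreaming(List<String> requests) {
        return requests.size();
    }

    // Bidirectional streaming: many requests, many responses.
    // This sketch echoes each request; real streams may interleave freely.
    static List<String> bidirectional(List<String> requests) {
        List<String> responses = new ArrayList<>();
        for (String request : requests) {
            responses.add("echo " + request);
        }
        return responses;
    }

    public static void main(String[] args) {
        System.out.println(unary("alice"));
        System.out.println(serverStreaming("user", 3));
        System.out.println(clientStreaming(Arrays.asList("a", "b")));
        System.out.println(bidirectional(Arrays.asList("hi", "bye")));
    }
}
```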
1.2 gRPC Architecture
gRPC architecture components:
┌─────────────────────────────────────────┐
│ Client Application │
├─────────────────────────────────────────┤
│ Client Stub (Generated) │
├─────────────────────────────────────────┤
│ gRPC Channel │
├─────────────────────────────────────────┤
│ HTTP/2 Transport │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ HTTP/2 Transport │
├─────────────────────────────────────────┤
│ gRPC Server │
├─────────────────────────────────────────┤
│ Service Implementation │
├─────────────────────────────────────────┤
│ Server Application │
└─────────────────────────────────────────┘
2. Protocol Buffers Definitions
2.1 Writing the Proto File
// user.proto
syntax = "proto3";

package com.example.grpc;

option java_multiple_files = true;
option java_package = "com.example.grpc";
option java_outer_classname = "UserServiceProto";

// User message definition
message User {
  int64 id = 1;
  string name = 2;
  string email = 3;
  int32 age = 4;
  UserStatus status = 5;
}

enum UserStatus {
  UNKNOWN = 0;
  ACTIVE = 1;
  INACTIVE = 2;
  SUSPENDED = 3;
}

// Request and response messages
message GetUserRequest {
  int64 id = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  int32 age = 3;
}

message ListUsersRequest {
  int32 page = 1;
  int32 size = 2;
}

message ListUsersResponse {
  repeated User users = 1;
  int32 total = 2;
}

message DeleteUserRequest {
  int64 id = 1;
}

message DeleteUserResponse {
  bool success = 1;
  string message = 2;
}

// Service definition
service UserService {
  // Unary RPC
  rpc GetUser(GetUserRequest) returns (User);
  // Server-streaming RPC
  rpc ListUsers(ListUsersRequest) returns (stream User);
  // Client-streaming RPC
  rpc CreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
  // Bidirectional-streaming RPC
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
  int64 user_id = 1;
  string message = 2;
  int64 timestamp = 3;
}
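To make the field numbers in the messages above concrete, here is a minimal sketch of how proto3 puts an integer field on the wire: a tag built from the field number and wire type, followed by the value as a base-128 varint. `VarintSketch` is a hypothetical helper for illustration only; generated code and the protobuf runtime do this for you.

```java
import java.io.ByteArrayOutputStream;

// Sketch of proto3 wire encoding for a single varint field.
final class VarintSketch {

    // Writes an unsigned value as a base-128 varint, lowest 7 bits first.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80)); // continuation bit set
            value >>>= 7;
        }
        out.write((int) value); // final byte, continuation bit clear
    }

    // Encodes a field with wire type 0 (varint), as used by int32, int64, bool, and enum.
    static byte[] encodeVarintField(int fieldNumber, long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, (long) fieldNumber << 3); // tag = (field number << 3) | wire type 0
        writeVarint(out, value);
        return out.toByteArray();
    }

    // Formats bytes as lowercase hex for display.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // GetUserRequest { id: 150 } becomes tag 0x08 followed by the varint for 150.
        System.out.println(hex(encodeVarintField(1, 150))); // prints 089601
    }
}
```

Only the field number, not the field name, appears in the encoded bytes, so renaming a field is wire-compatible while renumbering it is not.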
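2.2 Code Generation
# Install the protoc compiler
wget https://github.com/protocolbuffers/protobuf/releases/download/v25.1/protoc-25.1-linux-x86_64.zip
unzip protoc-25.1-linux-x86_64.zip -d /usr/local/
export PATH=$PATH:/usr/local/bin
# Configure the Java plugin in Maven (pom.xml)
# Note: ${os.detected.classifier} is supplied by the os-maven-plugin build extension (kr.motandco.maven:os-maven-plugin)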
<plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>0.6.1</version>
    <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.25.1:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.60.0:exe:${os.detected.classifier}</pluginArtifact>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
                <goal>compile-custom</goal>
            </goals>
        </execution>
    </executions>
</plugin>
# Generate the code manually
protoc --java_out=src/main/java \
    --grpc-java_out=src/main/java \
    --plugin=protoc-gen-grpc-java=/path/to/protoc-gen-grpc-java \
    src/main/proto/user.proto
# Example output (illustrative; the exact messages depend on the build tool)
Generating Java code from user.proto…
– UserServiceProto.java
– UserServiceGrpc.java
Done. Generated 2 files.
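3. Server Implementation
3.1 Maven Dependencies
<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty-shaded</artifactId>
        <version>1.60.0</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
        <version>1.60.0</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
        <version>1.60.0</version>
    </dependency>
    <!-- Provides ProtoReflectionService, which GrpcServer registers below -->
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-services</artifactId>
        <version>1.60.0</version>
    </dependency>
    <!-- Needed on Java 9+ for the @Generated annotation in the generated code -->
    <dependency>
        <groupId>javax.annotation</groupId>
        <artifactId>javax.annotation-api</artifactId>
        <version>1.3.2</version>
    </dependency>
</dependencies>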
3.2 Service Implementation
// UserServiceImpl.java
package com.example.grpc.server;

import com.example.grpc.*;
import io.grpc.stub.StreamObserver;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {
    // In-memory store for the example
    private final Map<Long, User> userStore = new ConcurrentHashMap<>();
    private final AtomicLong idGenerator = new AtomicLong(1);

    // Unary RPC: return the user, or fail with NOT_FOUND
    @Override
    public void getUser(GetUserRequest request, StreamObserver<User> responseObserver) {
        User user = userStore.get(request.getId());
        if (user != null) {
            responseObserver.onNext(user);
            responseObserver.onCompleted();
        } else {
            responseObserver.onError(
                    io.grpc.Status.NOT_FOUND
                            .withDescription("User not found: " + request.getId())
                            .asRuntimeException());
        }
    }

    // Server-streaming RPC: stream one page of users (page is zero-based)
    @Override
    public void listUsers(ListUsersRequest request, StreamObserver<User> responseObserver) {
        List<User> users = new ArrayList<>(userStore.values());
        int start = request.getPage() * request.getSize();
        int end = Math.min(start + request.getSize(), users.size());
        for (int i = start; i < end; i++) {
            responseObserver.onNext(users.get(i));
        }
        responseObserver.onCompleted();
    }

    // Client-streaming RPC: store each incoming user, then reply once when the client finishes
    @Override
    public StreamObserver<CreateUserRequest> createUsers(
            StreamObserver<ListUsersResponse> responseObserver) {
        return new StreamObserver<CreateUserRequest>() {
            private final List<User> createdUsers = new ArrayList<>();

            @Override
            public void onNext(CreateUserRequest request) {
                User user = User.newBuilder()
                        .setId(idGenerator.getAndIncrement())
                        .setName(request.getName())
                        .setEmail(request.getEmail())
                        .setAge(request.getAge())
                        .setStatus(UserStatus.ACTIVE)
                        .build();
                userStore.put(user.getId(), user);
                createdUsers.add(user);
            }

            @Override
            public void onError(Throwable t) {
                responseObserver.onError(t);
            }

            @Override
            public void onCompleted() {
                ListUsersResponse response = ListUsersResponse.newBuilder()
                        .addAllUsers(createdUsers)
                        .setTotal(createdUsers.size())
                        .build();
                responseObserver.onNext(response);
                responseObserver.onCompleted();
            }
        };
    }
}
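`listUsers` above computes its window as `page * size` with a zero-based page and trusts the caller to send sensible values. A minimal sketch of a more defensive window calculation follows; `PageWindow` is a hypothetical helper, not part of the generated code, and it assumes the caller has already put the items in a stable order (for example sorted by id, since `ConcurrentHashMap` does not guarantee iteration order).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of defensive zero-based paging; PageWindow is a hypothetical helper.
final class PageWindow {

    // Returns the items on the given zero-based page.
    // Negative pages, non-positive sizes, and out-of-range pages yield an empty list.
    static <T> List<T> slice(List<T> items, int page, int size) {
        if (page < 0 || size <= 0) {
            return Collections.emptyList();
        }
        long start = (long) page * size; // long avoids int overflow for large inputs
        if (start >= items.size()) {
            return Collections.emptyList();
        }
        int end = (int) Math.min(start + size, items.size());
        return new ArrayList<>(items.subList((int) start, end));
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("u1", "u2", "u3", "u4", "u5");
        System.out.println(slice(users, 0, 2));  // [u1, u2]
        System.out.println(slice(users, 2, 2));  // [u5]
        System.out.println(slice(users, 3, 2));  // []
        System.out.println(slice(users, -1, 2)); // []
    }
}
```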
3.3 Starting the Server
// GrpcServer.java
package com.example.grpc.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;

public class GrpcServer {
    private Server server;
    private final int port = 50051;

    public void start() throws Exception {
        server = ServerBuilder.forPort(port)
                .addService(new UserServiceImpl())
                // Server reflection (from grpc-services) lets tools such as grpcurl discover services
                .addService(ProtoReflectionService.newInstance())
                .intercept(new LoggingInterceptor())
                .build()
                .start();
        System.out.println("Server started, listening on " + port);
        // Stop the server when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println("Shutting down gRPC server...");
            GrpcServer.this.stop();
        }));
    }

    public void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    // Block the main thread until the server terminates
    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    public static void main(String[] args) throws Exception {
        GrpcServer server = new GrpcServer();
        server.start();
        server.blockUntilShutdown();
    }
}

// LoggingInterceptor.java (logging interceptor; a public class needs its own file)
package com.example.grpc.server;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

public class LoggingInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        System.out.println("Received call: " + call.getMethodDescriptor().getFullMethodName());
        return next.startCall(call, headers);
    }
}
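The `stop()` method above calls `shutdown()` and returns immediately. A common refinement is to wait for in-flight calls for a bounded time and then force termination. The sketch below shows that pattern with `java.util.concurrent.ExecutorService` standing in for `io.grpc.Server`, which provides `shutdown()`, `awaitTermination(long, TimeUnit)`, and `shutdownNow()` methods with the same names; `GracefulStop` and the grace period are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of a bounded graceful shutdown.
// ExecutorService is a stand-in for io.grpc.Server, not the gRPC class itself.
final class GracefulStop {

    // Returns true if all work finished within the grace period.
    static boolean stop(ExecutorService server, long graceMillis) {
        server.shutdown(); // refuse new work, let in-flight work finish
        try {
            if (server.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        server.shutdownNow(); // grace period expired: cancel whatever is still running
        return false;
    }

    public static void main(String[] args) {
        ExecutorService server = Executors.newSingleThreadExecutor();
        server.submit(() -> System.out.println("handling request"));
        System.out.println("clean stop: " + stop(server, 1000));
    }
}
```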
4. Client Implementation
4.1 Synchronous Client
// GrpcClient.java
package com.example.grpc.client;

import com.example.grpc.*;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

public class GrpcClient {
    private final UserServiceGrpc.UserServiceBlockingStub blockingStub;

    public GrpcClient(Channel channel) {
        blockingStub = UserServiceGrpc.newBlockingStub(channel);
    }

    // Unary call: returns null if the RPC fails
    public User getUser(long id) {
        GetUserRequest request = GetUserRequest.newBuilder()
                .setId(id)
                .build();
        try {
            return blockingStub.getUser(request);
        } catch (StatusRuntimeException e) {
            System.err.println("RPC failed: " + e.getStatus());
            return null;
        }
    }

    // Server-streaming call: the blocking stub exposes the stream as an Iterator
    public void listUsers(int page, int size) {
        ListUsersRequest request = ListUsersRequest.newBuilder()
                .setPage(page)
                .setSize(size)
                .build();
        try {
            Iterator<User> users = blockingStub.listUsers(request);
            while (users.hasNext()) {
                User user = users.next();
                System.out.println("User: " + user.getName());
            }
        } catch (StatusRuntimeException e) {
            System.err.println("RPC failed: " + e.getStatus());
        }
    }

    public static void main(String[] args) throws Exception {
        // Plaintext is for local testing only; use TLS in production
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("fgedudb", 50051)
                .usePlaintext()
                .build();
        try {
            GrpcClient client = new GrpcClient(channel);
            User user = client.getUser(1);
            if (user != null) {
                System.out.println("Found user: " + user.getName());
            }
        } finally {
            channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
4.2 Asynchronous Client
// AsyncGrpcClient.java
package com.example.grpc.client;

import com.example.grpc.*;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncGrpcClient {
    private final UserServiceGrpc.UserServiceStub asyncStub;

    public AsyncGrpcClient(Channel channel) {
        asyncStub = UserServiceGrpc.newStub(channel);
    }

    // Unary call on the async stub; the latch is released when the call ends
    public void getUserAsync(long id, CountDownLatch latch) {
        GetUserRequest request = GetUserRequest.newBuilder()
                .setId(id)
                .build();
        asyncStub.getUser(request, new StreamObserver<User>() {
            @Override
            public void onNext(User user) {
                System.out.println("Received user: " + user.getName());
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error: " + t.getMessage());
                latch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println("Request completed");
                latch.countDown();
            }
        });
    }

    // Client-streaming call: send every request, then signal completion
    public void createUsersAsync(List<CreateUserRequest> requests, CountDownLatch latch) {
        StreamObserver<ListUsersResponse> responseObserver = new StreamObserver<ListUsersResponse>() {
            @Override
            public void onNext(ListUsersResponse response) {
                System.out.println("Created " + response.getTotal() + " users");
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error: " + t.getMessage());
                latch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println("Create users completed");
                latch.countDown();
            }
        };
        StreamObserver<CreateUserRequest> requestObserver =
                asyncStub.createUsers(responseObserver);
        for (CreateUserRequest request : requests) {
            requestObserver.onNext(request);
        }
        requestObserver.onCompleted();
    }
}
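Both methods above signal completion through a `CountDownLatch` but leave the waiting to the caller. The sketch below shows the full round trip: start an asynchronous call, collect results in the callback, and block with a timeout until the call finishes. `Observer` is a hypothetical stand-in that mirrors the three callbacks of `io.grpc.stub.StreamObserver`, and `fakeAsyncCall` simulates the stub, so the example runs without gRPC on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of waiting for an asynchronous call with a CountDownLatch.
final class LatchAwait {

    // Hypothetical mirror of StreamObserver's three callbacks.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Simulated async stub: delivers the values on another thread, then completes.
    static void fakeAsyncCall(List<String> values, Observer<String> observer) {
        new Thread(() -> {
            for (String value : values) {
                observer.onNext(value);
            }
            observer.onCompleted();
        }).start();
    }

    // Starts the call and blocks until it ends or the timeout expires.
    // Returns the received values, or null on timeout or interruption.
    static List<String> callAndWait(List<String> values, long timeoutMillis) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        fakeAsyncCall(values, new Observer<String>() {
            @Override public void onNext(String value) { received.add(value); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onCompleted() { done.countDown(); }
        });
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS) ? received : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(callAndWait(Arrays.asList("alice", "bob"), 1000));
    }
}
```

Releasing the latch in both `onError` and `onCompleted` matters: exactly one of the two ends a call, and missing either would leave the caller waiting for the full timeout.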
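5. Load Balancing
5.1 Client-Side Load Balancing
// Select a load-balancing policy
ManagedChannel channel = ManagedChannelBuilder
        .forTarget("dns:///user-service")
        .defaultLoadBalancingPolicy("round_robin")
        .usePlaintext()
        .build();

// Custom load-balancing policy (simplified skeleton, not production-ready)
public class CustomLoadBalancer extends LoadBalancer {
    private final Helper helper;
    private List<EquivalentAddressGroup> servers;
    private int currentIndex = 0;

    public CustomLoadBalancer(Helper helper) {
        this.helper = helper;
    }

    @Override
    public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
        servers = resolvedAddresses.getAddresses();
    }

    @Override
    public void handleNameResolutionError(Status error) {
        // Skeleton: a real policy reports the failure through helper.updateBalancingState(...)
    }

    @Override
    public void requestConnection() {
        if (servers != null && !servers.isEmpty()) {
            EquivalentAddressGroup server = servers.get(currentIndex);
            currentIndex = (currentIndex + 1) % servers.size();
            // A complete policy must also start the subchannel with a state listener
            // and publish a SubchannelPicker via helper.updateBalancingState(...)
            helper.createSubchannel(
                    CreateSubchannelArgs.newBuilder().setAddresses(server).build());
        }
    }

    @Override
    public void shutdown() {
        // Skeleton: a real policy shuts down every subchannel it created
    }
}

// Register the custom policy; select it with defaultLoadBalancingPolicy("custom")
LoadBalancerRegistry.getDefaultRegistry().register(
        new LoadBalancerProvider() {
            @Override
            public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
                return new CustomLoadBalancer(helper);
            }

            @Override
            public boolean isAvailable() {
                return true;
            }

            @Override
            public int getPriority() {
                return 5;
            }

            @Override
            public String getPolicyName() {
                return "custom";
            }
        }
);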
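Independently of the gRPC `LoadBalancer` API, the selection rule behind a round-robin policy can be sketched in a few lines. `RoundRobinPicker` is a hypothetical class; it uses an atomic counter so that concurrent callers still cycle through the backends evenly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of thread-safe round-robin selection.
// RoundRobinPicker is hypothetical and independent of the gRPC LoadBalancer API.
final class RoundRobinPicker {
    private final List<String> backends;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinPicker(List<String> backends) {
        if (backends.isEmpty()) {
            throw new IllegalArgumentException("at least one backend is required");
        }
        this.backends = new ArrayList<>(backends); // defensive copy
    }

    // Returns the next backend in rotation.
    String pick() {
        // floorMod keeps the index non-negative after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), backends.size());
        return backends.get(index);
    }

    public static void main(String[] args) {
        RoundRobinPicker picker =
                new RoundRobinPicker(Arrays.asList("10.0.0.1:50051", "10.0.0.2:50051"));
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.pick());
        }
    }
}
```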
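6. Security and Authentication
6.1 TLS/SSL Configuration
// Server-side TLS
Server server = ServerBuilder.forPort(8443)
        .useTransportSecurity(
                new File("server.crt"),  // certificate chain (PEM)
                new File("server.key"))  // private key (PEM)
        .addService(new UserServiceImpl())
        .build();

// Client-side TLS
ManagedChannel channel = ManagedChannelBuilder
        .forAddress("fgedudb", 8443)
        .useTransportSecurity()
        .build();

// Mutual TLS: the client also presents a certificate.
// sslContext(...) is defined on NettyChannelBuilder, not on ManagedChannelBuilder.
// With grpc-netty-shaded, GrpcSslContexts and NettyChannelBuilder live in
// io.grpc.netty.shaded.io.grpc.netty, and SslContext in io.grpc.netty.shaded.io.netty.handler.ssl.
SslContext sslContext = GrpcSslContexts.forClient()
        .trustManager(new File("ca.crt"))
        .keyManager(new File("client.crt"), new File("client.key"))
        .build();
ManagedChannel mtlsChannel = NettyChannelBuilder
        .forAddress("fgedudb", 8443)
        .sslContext(sslContext)
        .build();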
6.2 Token Authentication
// Client-side token interceptor: adds an authorization header to every call
public class AuthClientInterceptor implements ClientInterceptor {
    private final String token;

    public AuthClientInterceptor(String token) {
        this.token = token;
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions,
            Channel next) {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
                next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER),
                        "Bearer " + token);
                super.start(responseListener, headers);
            }
        };
    }
}

// Server-side token validation
public class AuthServerInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        String token = headers.get(
                Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER));
        if (token == null || !validateToken(token)) {
            // Close with fresh trailers; do not echo the request headers back to the client
            call.close(Status.UNAUTHENTICATED.withDescription("Invalid token"), new Metadata());
            return new ServerCall.Listener<ReqT>() {};
        }
        return next.startCall(call, headers);
    }

    // Placeholder: checks the header format only; production code must verify the token itself
    private boolean validateToken(String token) {
        return token.startsWith("Bearer ") && token.length() > 7;
    }
}
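The `validateToken` method above only checks the shape of the header. As a sketch of the next step, the helper below extracts the token and compares it with an expected value in constant time. `BearerTokens` and the single shared token are assumptions for illustration; real deployments usually verify a signed token such as a JWT instead.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Sketch of parsing and checking a Bearer token; BearerTokens is hypothetical.
final class BearerTokens {
    private static final String PREFIX = "Bearer ";

    // Returns the token part of an "authorization" header value, or null if malformed.
    static String extract(String headerValue) {
        if (headerValue == null || !headerValue.startsWith(PREFIX)) {
            return null;
        }
        String token = headerValue.substring(PREFIX.length()).trim();
        return token.isEmpty() ? null : token;
    }

    // Compares the presented token with the expected one using MessageDigest.isEqual,
    // which avoids leaking how many leading bytes matched.
    static boolean isValid(String headerValue, String expectedToken) {
        String token = extract(headerValue);
        if (token == null) {
            return false;
        }
        return MessageDigest.isEqual(
                token.getBytes(StandardCharsets.UTF_8),
                expectedToken.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(isValid("Bearer s3cret", "s3cret")); // true
        System.out.println(isValid("Bearer wrong", "s3cret"));  // false
        System.out.println(isValid("Bearer ", "s3cret"));       // false
    }
}
```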
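7. Monitoring and Health Checks
7.1 Health Checks
// Register the standard gRPC health service
// (io.grpc.protobuf.services.HealthStatusManager, from grpc-services)
HealthStatusManager health = new HealthStatusManager();
Server server = ServerBuilder.forPort(50051)
        .addService(new UserServiceImpl())
        .addService(health.getHealthService())
        .build();
// Use grpc-health-probe
# Download the health check tool
wget -O grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.4.22/grpc_health_probe-linux-amd64
chmod +x grpc_health_probe
# Run a health check
./grpc_health_probe -addr=fgedudb:50051
# Example output
status: SERVING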
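7.2 Prometheus Monitoring
<!-- Maven dependency -->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-census</artifactId>
    <version>1.60.0</version>
</dependency>
// Configure the Prometheus exporter
// Note: GrpcPrometheusExporter and MonitoringServerInterceptor are not part of grpc-census;
// they have to come from a separate gRPC-Prometheus integration library, so check that
// library's documentation for the exact class and method names.
GrpcPrometheusExporter exporter = new GrpcPrometheusExporter();
exporter.init();
Server server = ServerBuilder.forPort(50051)
        .addService(new UserServiceImpl())
        .intercept(MonitoringServerInterceptor.create(
                MonitoringServerInterceptor.allInstrumentations()))
        .build();
# prometheus.yml configuration
scrape_configs:
  - job_name: 'grpc'
    static_configs:
      - targets: ['fgedudb:9090']
# Key metrics (exact names depend on the metrics library in use)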
grpc_server_started_total
grpc_server_handled_total
grpc_server_msg_received_total
grpc_server_msg_sent_total
grpc_server_handling_seconds
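To show what sits behind a metric such as `grpc_server_handled_total`, here is a minimal sketch of a per-method, per-status counter rendered in the Prometheus text format. `HandledCounter` is hypothetical and the label names `grpc_method` and `grpc_code` are assumptions; a real deployment would rely on a metrics library rather than hand-written rendering.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Sketch of a labelled counter in Prometheus text format; HandledCounter is hypothetical.
final class HandledCounter {
    // Key: the rendered label set; value: the number of finished calls.
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Records one finished call for the given method and status code.
    void record(String method, String code) {
        String labels = "grpc_method=\"" + method + "\",grpc_code=\"" + code + "\"";
        counts.computeIfAbsent(labels, key -> new LongAdder()).increment();
    }

    // Renders all series, sorted by label set so the output is stable.
    String render() {
        StringBuilder sb = new StringBuilder("# TYPE grpc_server_handled_total counter\n");
        for (Map.Entry<String, LongAdder> entry : new TreeMap<>(counts).entrySet()) {
            sb.append("grpc_server_handled_total{")
              .append(entry.getKey())
              .append("} ")
              .append(entry.getValue().sum())
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        HandledCounter counter = new HandledCounter();
        counter.record("GetUser", "OK");
        counter.record("GetUser", "OK");
        counter.record("GetUser", "NOT_FOUND");
        System.out.print(counter.render());
    }
}
```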
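8. Best Practices
8.1 Recommended Settings
| Setting | Recommended value | Notes |
|---|---|---|
| maxInboundMessageSize | 16 MB | Adjust to the workload |
| keepAliveTime | 30s | Keeps idle connections alive |
| keepAliveTimeout | 10s | How long to wait for a keepalive response |
| deadline | 5s | Per-request timeout |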
8.2 Troubleshooting Common Problems
# Check that the gRPC port is listening
netstat -tlnp | grep 50051
# Debug with grpcurl
grpcurl -plaintext fgedudb:50051 list
grpcurl -plaintext fgedudb:50051 describe com.example.grpc.UserService
grpcurl -plaintext -d '{"id": 1}' fgedudb:50051 com.example.grpc.UserService/GetUser
# Follow the server log
tail -f /var/log/grpc/server.log
# Test connectivity
telnet fgedudb 50051
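Notes:
- Always enable TLS encryption in production
- Set sensible message size limits
- Configure health checks and monitoring
- Set a deadline on every call so requests cannot block indefinitely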
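In grpc-java a deadline is usually set per call with `withDeadlineAfter(5, TimeUnit.SECONDS)` on the stub, and an expired call fails with `DEADLINE_EXCEEDED`. The sketch below illustrates the underlying idea without gRPC: bound the wait, cancel the abandoned work, and fall back. `DeadlineCall` and the fallback value are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of bounding a blocking call with a deadline; DeadlineCall is hypothetical.
final class DeadlineCall {

    // Runs the task on the pool and waits at most timeoutMillis for its result.
    // Returns the fallback if the deadline passes, the task fails, or the wait is interrupted.
    static <T> T callWithDeadline(ExecutorService pool, Callable<T> task,
                                  long timeoutMillis, T fallback) {
        Future<T> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the abandoned work
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the task itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return fallback;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        System.out.println(callWithDeadline(pool, () -> "fast reply", 1000, "timed out"));
        System.out.println(callWithDeadline(pool, () -> {
            Thread.sleep(5000); // simulates a slow backend
            return "slow reply";
        }, 50, "timed out"));
        pool.shutdownNow();
    }
}
```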
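9. Summary
gRPC is a key framework for high-performance microservice communication. After working through this training document, you should be familiar with: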
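- Defining interfaces in Protocol Buffers and generating code
- Implementing gRPC servers and clients
- Using the four communication modes
- Configuring load balancing and authentication
- Monitoring and health checks
- Best practices and troubleshooting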
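IT Operations Training Document Series | No. 251 | gRPC Service Communication Training
Compiled and published by 风哥教程; for learning and testing only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html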
