gRPC简介#
- gRPC 是由google开源的一个高性能的RPC框架。Stubby Google内部的RPC,演化而来的,2015正式开源。云原生时代是一个RPC标准。
- gRPC 核心的设计思路
- 网络通信 —> gRPC自己封装网络通信的部分 提供多种语言的 网络通信的封装 (C Java[Netty] GO)
- 协议 —> HTTP2 传输数据的时候 二进制数据内容。 支持双向流(双工)连接的多路复用。
- 序列化 —> 基本文本 JSON 基于二进制 Java原生序列化方式 Thrift二进制的序列化 压缩二级制序列化。
protobuf (Protocol Buffers) google开源一种序列化方式 时间效率和空间效率是JSON的3—5倍。
IDL语言
- 代理的创建 —>让调用者像调用本地方法那样 去调用远端的服务方法。
stub
- gRPC的好处
- 高效的进行进程间通信。
- 支持多种语言 原生支持 C Go Java实现。C语言版本上扩展 C++ C# NodeJS Python Ruby PHP..
- 支持多平台运行 Linux Android IOS MacOS Windows。
- gPRC序列化方式采用protobuf,效率高。
- 使用Http2协议
- 大厂的背书
Http2.0协议#
- 回顾 Http1.x协议
Http1.0协议 请求响应的模式 短连接协议(无状态协议) 传输数据文本结构 单工 无法实现服务端推送 变相实现推动(客户端轮训的方式)
Http1.1协议 请求响应的模式 有限的长连接 升级的方式WebSocket 双工 实现服务器向客户端推送。
总结Http1.x协议 共性
- 传输数据文本格式 可读性好的但是效率差。
- 本质上Http1.x协议无法实现双工通信。
- 资源的请求。需要发送多次请求,建立多个连接才可以完成。
- HTTP2.0协议
- Http2.0协议是一个二进制协议,效率高于Http1.x协议,可读性差。
- 可以实现双工通信。
- 一个请求 一个连接 可以请求多个数据。【多路复用】
- Http2.0协议的三个概念
- 数据流 stream
- 消息 message
- 帧 frame
- 其他的相关概念
- 数据流的优先级,可以通过为不同的stream设置权重,来限制不同流的传输顺序。
- 流控 client发送的数据太快了,server处理不过来,通知client暂停数据的发送。
Protocol Buffers#
- protobuf 是一种与编程语言无关【IDL】,与具体的平台无关【OS】。他定义的中间语言,可以方便的在client 于 server中进行RPC的数据传输。
- protobuf 两种版本 proto2 proto3,但是目前主流应用的都是proto3。
- protobuf主要安装protobuf的编译器,编译器目的,可以把protobuf的IDL语言,转换成具体某一种开发语言。
protobuf编译器的安装#
https://github.com/protocolbuffers/protobuf/releases
- 直接解压缩 方式在一个特定的目录下面
- 直接配置环境变量 path
protoc –version
protobuf的语法详解#
文件格式#
.proto
版本设定#
syntax = “proto3”;
- 单行注释 //
- 多行注释 /* */
与Java语言相关的语法#
1
2
3
4
5
6
7
8
| //后续protobuf生成的java代码 一个源文件还是多个源文件 xx.java
option java_multiple_files = false;
//指定protobuf生成的类 放置在哪个包中
option java_package = "com.nebula";
//指定的protobuf生成的外部类的名字(管理内部类【内部类才是真正开发使用】)
option java_outer_classname = "NebulaService";
|
逻辑包#
1
2
| // 对于protobuf对于文件内容的管理
package xxx;
|
1
2
| //Service.proto
import "xxx/NebulaService.proto";
|
基本类型#
.proto Type | Java/Kotlin Type |
---|
double | double |
float | float |
int32 | int |
int64 | long |
bool | boolean |
string | String |
bytes | ByteString |
uint32 | int[2] |
uint64 | long[2] |
sint32 | int |
sint64 | long |
fixed32 | int[2] |
fixed64 | long[2] |
sfixed32 | int |
sfixed64 | long |
特殊类型#
enum#
1
2
3
4
5
| enum EnumMessage{
//枚举的编号必须是0开始
SPRING = 0;
SUMMER = 1;
}
|
map#
1
| map<string, string> map = 1;
|
oneof#
定义可选的互斥字段集合
1
2
3
4
| oneof OneOfMessage {
string name = 1;
int32 id = 2;
}
|
消息 Message#
消息可以定义多个
消息可以嵌套
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| message LoginRequest {
message data{
string code = 1;
string url = 2;
}//定义内部消息
string username = 1;
string password = 2;
int32 age = 3;
repeated code = 4;
data data = 5;//引用内部消息
}
message LoginResponse {
LoginRequest.data = 1;//引用其他消息或子消息
}
|
从1开始 到2^29-1 注意:19000 - 19999 不能用这个区间内的编号,因为他是protobuf自己保留的。
修饰符#
singular:这个字段的值只能是0个或1个(默认关键字)
repeated:返回值是多个,等价于List,通过getXxxList()方法获取,通过addXxx()方法添加
reserved:用于保留字段标签号,以防止将来的扩展导致冲突。
extensions:用于定义扩展字段,允许在不破坏现有消息格式的情况下添加新的字段。
服务 Service#
里面是可以定义多个服务方法。
定义多个服务接口
gPRC服务4个服务方式
1
2
3
| service HelloService{
rpc hello(HelloRequest) returns(HelloResponse){}
}
|
gPRC开发#
项目结构#
- xxxx-api 模块
定义 protobuf idl语言
并且通过命令创建对应的代码
- message
- service
- xxxx-server模块
- 实现api模块中定义的服务接口
- 发布gRPC服务 (创建服务端程序)
- xxxx-clien模块
- 创建服务端stub(代理)
- 基于代理(stub) RPC调用。
api模块#
- .proto文件 书写protobuf的IDL
- protoc命令 把proto文件中的IDL 转换成编程语言
protoc –java_out(目标语言)=/xxx/xxx(目标位置) /xxx/xxx/xx.proto(目标文件)
- maven插件 进行protobuf IDL文件的编译,并把他放置IDEA具体位置。
插件地址:https://github.com/grpc/grpc-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<!--也可以统一引入-->
<!--<dependency>-->
<!-- <groupId>io.grpc</groupId>-->
<!-- <artifactId>grpc-all</artifactId>-->
<!-- <version>1.61.1</version>-->
<!--</dependency>-->
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact>
<outputDirectory>${basedir}/src/main/java</outputDirectory>
<!--指定代码生成的目录-->
<clearOutputDirectory>false</clearOutputDirectory>
<!--追加式代码生成-->
</configuration>
<executions>
<execution>
<goals>
<!--通过这两个命令编译java文件-->
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
|
Server服务端模块#
定义实现类#
继承ServiceImplBase添加具体功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
/*
1. 接受client提交的参数
2. 调用对应的业务功能
3. 提供返回值
*/
@Override
public void test(TestRequest request, StreamObserver<TestResponse> responseObserver) {
String name = request.getName();//1.接受client的请求参数,不同的修饰符用不同的方法获取,如repeated使用getXxxList()方法获取
System.out.println("name" + name); //2.业务处理
TestResponse.Builder builder = TestResponse.newBuilder(); //3.封装响应,创建相应对象的构造者
builder.setResult("hello " + name); //4.填充数据
TestResponse response = builder.build(); //5.封装响应
responseObserver.onNext(response);//6.响应client
responseObserver.onCompleted();//7.响应完成
}
}
|
配置服务端启动器#
1
2
3
4
5
6
7
8
9
10
| public class GrpcServer {
public static void main(String[] args) throws IOException, InterruptedException {
ServerBuilder serverBuilder = ServerBuilder.forPort(9090);//1. 绑定端口
serverBuilder.addService(new TestServiceImpl());//2. 发布服务
// serverBuilder.addService(new ServiceImpl());//多个服务直接罗列即可
Server server = serverBuilder.build();//3. 创建服务对象
server.start();
server.awaitTermination();
}
}
|
Client客户端模块#
配置客户端启动器#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| public class GrpcClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090).usePlaintext().build();//1.创建通信的管道
try {
TestServiceGrpc.TestServiceBlockingStub testService = TestServiceGrpc.newBlockingStub(channel);//2.获得代理对象 stub
TestRequest.Builder builder = TestRequest.newBuilder();//3.准备参数
builder.setName("nebula");
// builder.addName("nebula");list类型用setXxx方法
TestRequest testRequest = builder.build();
TestResponse testResponse = testService.test(testRequest);//4.进行功能rpc调用,获取响应的内容
String result = testResponse.getResult();
System.out.println("result:" + result);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
channel.shutdown();//5.释放资源
}
}
}
|
注意事项#
1
2
3
4
5
6
7
| 服务端 处理返回值
responseObserver.onNext(helloResponse1); //通过这个方法 把响应的消息 回传client
responseObserver.onCompleted(); //通知client 整个服务结束。底层返回标记
服务端 处理返回值
requestObserver.onNext(helloRequest1);
requestObserver.onCompleted();
|
gRpc的四种通信方式#
- 简单rpc 一元rpc (Unary RPC)
- 服务端流式RPC (Server Streaming RPC)
- 客户端流式RPC (Client Streaming RPC)
- 双向流RPC (Bi-directional Stream RPC)
简单RPC(一元RPC)#
当client发起调用后,提交数据,并且等待 服务端响应。
开发过程中,主要采用就是一元RPC的这种通信方式
1
2
3
4
| service HelloService{
rpc hello(HelloRequest) returns (HelloResponse){}
rpc hello1(HelloRequest1) returns (HelloResponse1){}
}
|
服务端流式RPC#
一个请求对象,服务端可以回传多个结果对象。
1
2
3
4
| service HelloService{
rpc hello(HelloRequest) returns (stream HelloResponse){}
rpc hello1(HelloRequest1) returns (stream HelloResponse1){}
}
|
服务端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| public void c2ss(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {
//1 接受client的请求参数
String name = request.getName();
//2 做业务处理
System.out.println("name = " + name);
//3 根据业务处理的结果,提供响应
for (int i = 0; i < 9; i++) {
HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();
builder.setResult("处理的结果 " + i);
HelloProto.HelloResponse helloResponse = builder.build();
responseObserver.onNext(helloResponse);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
responseObserver.onCompleted();
}
|
阻塞方式客户端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| public class GprcClient3 {
public static void main(String[] args) {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);
HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
builder.setName("sunshuai");
HelloProto.HelloRequest helloRequest = builder.build();
Iterator<HelloProto.HelloResponse> helloResponseIterator = helloService.c2ss(helloRequest);
while (helloResponseIterator.hasNext()) {
HelloProto.HelloResponse helloResponse = helloResponseIterator.next();
System.out.println("helloResponse.getResult() = " + helloResponse.getResult());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdown();
}
}
}
|
异步方式客户端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| public class GrpcClient4 {
public static void main(String[] args) {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);
HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
builder.setName("xiaohei");
HelloProto.HelloRequest helloRequest = builder.build();
helloService.c2ss(helloRequest, new StreamObserver<HelloProto.HelloResponse>() {
@Override
public void onNext(HelloProto.HelloResponse value) {
//服务端 响应了 一个消息后,需要立即处理的话。把代码写在这个方法中。
System.out.println("服务端每一次响应的信息 " + value.getResult());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
//需要把服务端 响应的所有数据 拿到后,在进行业务处理。
System.out.println("服务端响应结束 后续可以根据需要 在这里统一处理服务端响应的所有内容");
}
});
managedChannel.awaitTermination(12, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdown();
}
}
}
|
客户端流式RPC#
客户端发送多个请求对象,服务端只返回一个结果
1
| rpc cs2s(stream HelloRequest) returns (HelloResponse){}
|
服务端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| public StreamObserver<HelloProto.HelloRequest> cs2s(StreamObserver<HelloProto.HelloResponse> responseObserver) {
return new StreamObserver<HelloProto.HelloRequest>() {
@Override
public void onNext(HelloProto.HelloRequest value) {
System.out.println("接受到了client发送一条消息 " + value.getName());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
System.out.println("client的所有消息 都发送到了 服务端 ....");
//提供响应:响应的目的:当接受了全部client提交的信息,并处理后,提供相应
HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();
builder.setResult("this is result");
HelloProto.HelloResponse helloResponse = builder.build();
responseObserver.onNext(helloResponse);
responseObserver.onCompleted();
}
};
}
|
客户端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
| public class GrpcClient5 {
public static void main(String[] args) {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);
StreamObserver<HelloProto.HelloRequest> helloRequestStreamObserver = helloService.cs2s(new StreamObserver<HelloProto.HelloResponse>() {
@Override
public void onNext(HelloProto.HelloResponse value) {
// 监控响应
System.out.println("服务端 响应 数据内容为 " + value.getResult());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
System.out.println("服务端响应结束 ... ");
}
});
//客户端 发送数据 到服务端 多条数据 ,不定时...
for (int i = 0; i < 10; i++) {
HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
builder.setName("sunshuai " + i);
HelloProto.HelloRequest helloRequest = builder.build();
helloRequestStreamObserver.onNext(helloRequest);
Thread.sleep(1000);
}
helloRequestStreamObserver.onCompleted();
managedChannel.awaitTermination(12, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdown();
}
}
}
|
双向流式RPC#
客户端可以发送多个请求消息,服务端响应多个响应消息
1
| rpc cs2ss(stream HelloRequest) returns (stream HelloResponse){}
|
服务端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| public StreamObserver<HelloProto.HelloRequest> cs2ss(StreamObserver<HelloProto.HelloResponse> responseObserver) {
return new StreamObserver<HelloProto.HelloRequest>() {
@Override
public void onNext(HelloProto.HelloRequest value) {
System.out.println("接受到client 提交的消息 " + value.getName());
responseObserver.onNext(HelloProto.HelloResponse.newBuilder().setResult("response " + value.getName() + " result ").build());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
System.out.println("接受到了所有的请求消息 ... ");
responseObserver.onCompleted();
}
};
}
|
客户端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| public class GrpcClient6 {
public static void main(String[] args) {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);
StreamObserver<HelloProto.HelloRequest> helloRequestStreamObserver = helloService.cs2ss(new StreamObserver<HelloProto.HelloResponse>() {
@Override
public void onNext(HelloProto.HelloResponse value) {
System.out.println("响应的结果 " + value.getResult());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
System.out.println("响应全部结束...");
}
});
for (int i = 0; i < 10; i++) {
helloRequestStreamObserver.onNext(HelloProto.HelloRequest.newBuilder().setName("sunshuai " + i).build());
}
helloRequestStreamObserver.onCompleted();
managedChannel.awaitTermination(12, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdown();
}
}
}
|
gPRC代理方式#
- BlockingStub
阻塞 通信方式
- Stub
异步 通过监听处理的
- FutureStub
同步 异步 NettyFuture
FutureStub只能应用 一元RPC
FutureStub基本使用#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| public class GrpcClient7 {
public static void main(String[] args) {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
TestServiceGrpc.TestServiceFutureStub testServiceFutureStub = TestServiceGrpc.newFutureStub(managedChannel);
ListenableFuture<TestProto.TestResponse> responseListenableFuture = testServiceFutureStub.testSuns(TestProto.TestRequest.newBuilder().setName("xiaojren").build());
/* 同步操作
TestProto.TestResponse testResponse = responseListenableFuture.get();
System.out.println(testResponse.getResult());*/
/* responseListenableFuture.addListener(() -> {
System.out.println("异步的rpc响应 回来了....");
}, Executors.newCachedThreadPool());*/
Futures.addCallback(responseListenableFuture, new FutureCallback<TestProto.TestResponse>() {
@Override
public void onSuccess(TestProto.TestResponse result) {
System.out.println("result.getResult() = " + result.getResult());
}
@Override
public void onFailure(Throwable t) {
}
}, Executors.newCachedThreadPool());
System.out.println("后续的操作....");
managedChannel.awaitTermination(12, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdown();
}
}
}
|
gPRC与SpringBoot整合#
gRPC和SpringBoot整合的思想#
- grpc-server
- grpc-client
服务端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| <!--引入自定义API模块-->
<dependencys>
<dependency>
<groupId>com.nebula</groupId>
<artifactId>rpc-grpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
<version>2.14.0.RELEASE</version>
</dependency>
</dependencys>
|
开发服务#
1
2
3
4
5
6
7
8
9
10
11
12
|
@GrpcService
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
@Override
public void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {
String name = request.getName();
System.out.println("name is " + name);
responseObserver.onNext(HelloProto.HelloResponse.newBuilder().setResult("this is result").build());
responseObserver.onCompleted();
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
| # 核心配置的 就是gRPC服务的端口号
spring:
application:
name: boot-server
main:
web-application-type: none
# 不启动容器
grpc:
server:
port: 9000
|
客户端#
环境搭建#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
<dependencys>
<dependency>
<groupId>com.nebula</groupId>
<artifactId>rpc-grpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-client-spring-boot-starter</artifactId>
<version>2.14.0.RELEASE</version>
</dependency>
</dependencys>
|
开发服务#
1
2
3
4
5
| grpc:
client:
grpc-server:
address: 'static://127.0.0.1:9000'
negotiation-type: plaintext
|
1
2
| @GrpcClient("grpc-server")
private HelloServiceGrpc.HelloServiceBlockingStub stub;
|
注意设置入口扫描(ComponentScan)