RPC

参考

1
2
https://www.nowcoder.com/discuss/588903?type=post&order=time&pos=&page=2&
channel=-1&source_id=search_post_nctrack

RPC

什么是RPC

RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程
计算机程序上请求服务,而不需要了解底层网络技术的协议。比如两个不同的
服务A、B 部署在两台不同的机器上,那么服务A 如果想要调用服务B 中的某
个方法该怎么办呢?使用 HTTP请求当然可以,但是可能会比较慢而且一些优
化做的并不好。RPC 的出现就是为了解决这个问题。
最终解决的问题:让分布式或者微服务系统中不同服务之间的调用像本地调用
一样简单

RPC简要原理

  1. 服务消费方(client)调用以本地调用方式调用服务
  2. client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的
    消息体
  3. client stub找到服务地址,并将消息发送到服务端
  4. server stub收到消息后进行解码;
  5. server stub根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给server stub
  7. server stub将返回结果打包成消息并发送至消费方
  8. client stub接收到消息,并进行解码
  9. 服务消费方得到最终结果

常见PRC框架

  1. Dubbo: Dubbo 是阿里巴巴公司开源的一个高性能优秀的服务框架,使得
    应用可通过高性能的RPC 实现服务的输出和输入功能,可以和Spring框架无
    缝集成。目前Dubbo 已经成为 Spring Cloud Alibaba 中的官方组件
  2. gRPC :gRPC 是可以在任何环境中运行的高性能RPC框架。它可以通过可
    插拔的支持来有效地连接数据中心内和跨数据中心的服务,以实现负载平衡
    ,跟踪,运行状况检查和身份验证。它也适用于分布式计算的最后一英里,
    以将设备,移动应用程序和浏览器连接到后端服务
  3. Hessian: Hessian是一个轻量级的 remoting-on-http 工具,使用简
    单的方法提供了RMI 的功能。 相比WebService,Hessian 更简单、快捷。采
    用的是二进制RPC协议,因为采用的是二进制协议,所以它很适合于发送二进
    制数据

为什么用RPC,不用HTTP

  1. 首先需要指正,这两个并不是并行概念。RPC 是一种设计,就是为了解决
    不同服务之间的调用问题,完整的RPC 实现一般会包含有传输协议和序列化
    协议这两个
  2. 而HTTP 是一种传输协议,RPC 框架完全可以使用HTTP 作为传输协议,
    也可以直接使用TCP,使用不同的协议一般也是为了适应不同的场景

使用TCP和使用HTTP的区别

  1. 传输效率
  • TCP 通常自定义上层协议,可以让请求报文体积更小
  • HTTP 如果是基于HTTP 1.1 的协议,请求中会包含很多无用的内容
  1. 性能消耗,主要在于序列化和反序列化的耗时
  • TCP 可以基于各种序列化框架进行,效率比较高
  • HTTP 大部分是通过json 来实现的,字节大小和序列化耗时都要更消
    耗性能
  1. 跨平台
  • TCP 通常要求客户端和服务器为统一平台
  • HTTP 可以在各种异构系统上运行

总结:RPC 的 TCP 方式主要用于公司内部的服务调用,性能消耗低,传输效
率高。HTTP主要用于对外的异构环境,浏览器接口调用,APP接口调用,第三
方接口调用等

Java

调用如何在客户端无感(动态代理)

基于动态代理生成代理对象,当调用代理对象的方法时,由代理进行相关信息
(方法、参数等)的组装并发送到服务器进行远程调用,并由代理接收调用结
果并返回

代理没有实现接口的对象

  1. CGLIB 框架实现了对无接口的对象进行代理的方式。JDK 动态代理是基于接
    口实现的,而 CGLIB 是基于继承实现的。它会对目标类产生一个代理子类,通
    过方法拦截技术对过滤父类的方法调用。代理子类需要实 MethodInterceptor
    接口
  2. CGLIB 底层是通过 asm 字节码框架实时生成类的字节码,达到动态创建
    类的目的,效率较 JDK 动态代理低。Spring 中的AOP 就是基于动态代理的
    ,如果被代理类实现了某个接口,Spring 会采用 JDK 动态代理,否则会采
    用CGLIB

对象是怎么在网络中传输的(序列化)

通过将对象序列化成字节数组,即可将对象发送到网络中。
序列化在计算机科学的资料处理中,是指将数据结构或对象状态转换成可取用
格式(例如存成文件,存于缓冲,或经由网络中发送),以留待后续在相同或
另一台计算机环境中,能恢复原先状态的过程。依照序列化格式重新获取字节
的结果时,可以利用它来产生与原始对象相同语义的副本。对于许多对象,像
是使用大量引用的复杂对象,这种序列化重建的过程并不容易。这种过程也称
为对象编组(marshalling)。从一系列字节提取数据结构的反向操作,是反
序列化(也称为解编组)。
在Java中想要序列化一个对象,这个对象所属的类必须实现了Serializable
接口,并且其内部属性必须都是可序列化的。如果有一个属性不是可序列化的
,则该属性必须被声明为transient。
JDK 中提供了ObjectOutStream 类来对对象进行序列化。

你的框架实现了哪几种序列化方式

实现了JSON、Kryo、Hessian 和 Protobuf 的序列化。除了JSON剩下的都是基
于字节的序列化。

  1. JSON 是一种轻量级的数据交换语言,该语言以易于让人阅读的文字为基础
    ,用来传输由属性值或者序列性的值组成的数据对象,类似xml,Json 比xml
    更小、更快更容易解析。JSON 由于采用字符方式存储,占用相对于字节方式较
    大,并且序列化后类的信息会丢失,可能导致反序列化失败
  2. Kryo 是一个快速高效的Java 序列化框架,旨在提供快速、高效和易用的
    API。无论文件、数据库或网络数据Kryo 都可以随时完成序列化。Kryo 还可以
    执行自动深拷贝、浅拷贝。这是对象到对象的直接拷贝,而不是对象->字节->
    对象的拷贝。kryo 速度较快,序列化后体积较小,但是跨语言支持较复杂
  3. Hessian 是一个基于二进制的协议,Hessian 支持很多种语言,例如 Java
    、python、c++,、net/c#、D、Erlang、PHP、Ruby、object-c等,它的序列化和反
    序列化也是非常高效。速度较慢,序列化后的体积较大
  4. protobuf 是由 Google 发布的数据交换格式,提供跨语言、跨平台的序列
    化和反序列化实现,底层由 C++ 实现,其他平台使用时必须使用 protocol
    compiler 进行预编译生成 protoc 二进制文件。性能主要消耗在文件的预
    编译上。序列化反序列化性能较高,平台无关

RPC简单实现

项目结构

  1. docs文件夹:这个文档的源文件
  2. images文件夹:Readme所用到的图片(其实只有一张)
  3. rpc-api文件夹:服务端与客户端的公共调用接口
  4. rpc-common文件夹:项目中的一些通用的枚举类和工具类
  5. rpc-core文件夹:框架的核心实现
  6. test-client文件夹:测试用的客户端项目
  7. test-server文件夹:测试用的服务端项目
  8. .gitignore:就是.gitignore
  9. .travis.yml:持续集成的脚本(其实什么也没干)
  10. LICENSE:基于MIT开源协议哦
  11. README.md:就是Readme
  12. pom.xml:项目的总的pom

modules的作用

在pom.xml文件中构建modules模块

  1. 用来管理同个项目中的各个模块,每个模块的配置都在各自的pom.xml里
  2. 在构建这个项目的时候,不需要深入每个module去单独构建,而只是在项
    目A下的pom.xml构建,就会完成对两个module的构建

通用接口

创建HelloService接口

1
2
3
4
5
package gaoming.rpc.api;

public interface HelloService {
String hello(HelloObject object);
}

hello方法需要传递一个对象,HelloObject对象,定义如下:

1
2
3
4
5
6
7
8
9
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor

public class HelloObject implements Serializable {
private Integer id;
private String message;
}

注意这个对象需要实现Serializable接口,因为它需要在调用过程中从客
户端传递给服务端。接着我们在服务端对这个接口进行实现,实现的方式也
很简单,返回一个字符串就行

1
2
3
4
5
6
7
8
9
10
public class HelloServiceImpl implements HelloService {
//声明一个Logger
private static final Logger logger = LoggerFactory.getLogger(
HelloServiceImpl.class);
public String hello(HelloObject object) {
//输出的log信息
logger.info("接收到消息:{}", object.getMessage());
return "这是掉用的返回值,id=" + object.getId();
}
}

Logger

打印日志

传输协议

首先需要考虑服务端需要哪些信息,才能唯一确定服务端需要调用的接口的方
法。首先就是接口的名字和方法的名字,但是由于方法重载的缘故,我们还需
要这个方法的所有参数的类型,最后客户端调用时还需要传递参数的实际值
,那么服务端知道以上四个条件,就可以找到这个方法并且调用了。我们把
这四个条件写到一个对象里,到时候传输时传输这个对象就行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class RpcRequest {
/**
* 请求号
*/
private String requestId;
/**
* 待调用接口名称
*/
private String interfaceName;
/**
* 待调用方法名称
*/
private String methodName;
/**
* 调用方法的参数
*/
private Object[] parameters;
/**
* 调用方法的参数类型,也可以用字符串
*/
private Class<?>[] paramTypes;
}

服务器调用完这个方法后,需要给客户端返回哪些信息呢?如果调用成功的
话,显然需要返回值,如果调用失败了,就需要失败的信息,这里封装成一
个RpcResponse对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Data
public class RpcResponse<T> implements Serializable {
/**
* 响应状态码
*/
private Integer statusCode;
/**
* 响应状态补充信息
*/
private String message;
/**
* 响应数据
*/
private T data;
public static <T> RpcResponse<T> success(T data) {
RpcResponse<T> response = new RpcResponse<>();
response.setStatusCode(ResponseCode.SUCCESS.getCode());
response.setData(data);
return response;
}
public static <T> RpcResponse<T> fail(ResponseCode code) {
RpcResponse<T> response = new RpcResponse<>();
response.setStatusCode(code.getCode());
response.setMessage(code.getMessage());
return response;
}
}

这里还多写了两个静态方法,用于快速生成成功与失败的响应对象。其中
statusCode属性可以自行定义,客户端服务端一致即可

客户端的实现——动态代理

客户端方面,由于在客户端这一侧我们并没有接口的具体实现类,就没有办
法直接生成实例对象。这时,我们可以通过动态代理的方式生成实例,并且
调用方法时生成需要的RpcRequest对象并且发送给服务端。这里我们采用
JDK动态代理,代理类是需要实现InvocationHandler接口的

1
2
3
4
5
6
7
8
9
10
11
12
13
public class RpcClientProxy implements InvocationHandler {
private String host;
private int port;
public RpcClientProxy(String host, int port) {
this.host = host;
this.port = port;
}
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(),
new Class<?>[]{clazz}, this);
}
}

我们需要传递host和port来指明服务端的位置。并且使用getProxy()方法来
生成代理对象。InvocationHandler接口需要实现invoke()方法,来指明代
理对象的方法被调用时的动作。在这里我们显然就需要生成一个RpcRequest
对象,发送出去,然后返回从服务端接收到的结果即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
RpcRequest rpcRequest = RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameters(args)
.paramTypes(method.getParameterTypes())
.build();
RpcClient rpcClient = new RpcClient();
return ((RpcResponse) rpcClient.sendRequest(rpcRequest, host,
port)).getData();
}

生成RpcRequest很简单,我使用Builder模式来生成这个对象。发送的逻辑我
使用了一个RpcClient对象来实现,这个对象的作用,就是将一个对象发过去
,并且接受返回的对象。Socket是应用层与TCP/IP协议通信的中间软件抽象
层,它是一组接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class RpcClient {
private static final Logger logger = LoggerFactory.getLogger(
RpcClient.class);
public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
try (Socket socket = new Socket(host, port)) {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(
socket.getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(
socket.getInputStream());
objectOutputStream.writeObject(rpcRequest);
objectOutputStream.flush();
return objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
logger.error("调用时有错误发生:", e);
return null;
}
}
}

我的实现很简单,直接使用Java的序列化方式,通过Socket传输。创建一个
Socket,获取ObjectOutputStream对象,然后把需要发送的对象传进去即
可,接收时获取ObjectInputStream对象,readObject()方法就可以获得
一个返回的对象

Builder模式

将一个复杂对象的构建与其表示分离,使得同样的构建过程可以创建不同的表示
,建造者模式

服务端的实现——反射调用

服务端的实现就简单多了,使用一个ServerSocket监听某个端口,循环接收
连接请求,如果发来了请求就创建一个线程,在新线程中处理调用。这里创建
线程采用线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RpcServer {
private final ExecutorService threadPool;
private static final Logger logger = LoggerFactory.getLogger(
RpcServer.class);
public RpcServer() {
int corePoolSize = 5;
int maximumPoolSize = 50;
long keepAliveTime = 60;
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
}
}

这里简化了一下,RpcServer暂时只能注册一个接口,即对外提供一个接口的
调用服务,添加register方法,在注册完一个服务后立刻开始监听:

1
2
3
4
5
6
7
8
9
10
11
12
public void register(Object service, int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
logger.info("服务器正在启动...");
Socket socket;
while((socket = serverSocket.accept()) != null) {
logger.info("客户端连接!Ip为:" + socket.getInetAddress());
threadPool.execute(new WorkerThread(socket, service));
}
} catch (IOException e) {
logger.error("连接时有错误发生:", e);
}
}

这里向工作线程WorkerThread传入了socket和用于服务端实例service。
WorkerThread实现了Runnable接口,用于接收RpcRequest对象,解析并且调
用,生成RpcResponse对象并传输回去。run方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(
socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(
socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Method method = service.getClass().getMethod(rpcRequest.
getMethodName(), rpcRequest.getParamTypes());
Object returnObject = method.invoke(service, rpcRequest.
getParameters());
objectOutputStream.writeObject(RpcResponse.success(returnObject));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException |
NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
logger.error("调用或发送时有错误发生:", e);
}
}

其中通过class.getMethod方法,传入方法名和方法参数类型即可获得Method
对象。如果你上面RpcRequest中使用String数组来存储方法参数类型的话,这
里你就需要通过反射生成对应的Class数组了。通过method.invoke方法,传入
对象实例和参数,即可调用并且获得返回值

测试

服务端侧,我们已经在上面实现了一个HelloService的实现类HelloServiceImpl
的实现类了,我们只需要创建一个RpcServer并且把这个实现类注册进去就行了:

1
2
3
4
5
6
7
public class TestServer {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
RpcServer rpcServer = new RpcServer();
rpcServer.register(helloService, 9000);
}
}

服务端开放在9000端口。客户端方面,我们需要通过动态代理,生成代理对象
,并且调用,动态代理会自动帮我们向服务端发送请求的:

1
2
3
4
5
6
7
8
9
public class TestClient {
public static void main(String[] args) {
RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9000);
HelloService helloService = proxy.getProxy(HelloService.class);
HelloObject object = new HelloObject(12, "This is a message");
String res = helloService.hello(object);
System.out.println(res);
}
}

注册多个服务

之前一个服务器只能注册一个服务,将服务的注册和服务器启动分离,使得服务
端可以提供多个服务

服务注册表

我们需要一个容器,这个容器很简单,就是保存一些本地服务的信息,并且在获
得一个服务名字的时候能够返回这个服务的信息。创建一个ServiceRegistry接

1
2
3
4
public interface ServiceRegistry {
<T> void register(T service);
Object getService(String serviceName);
}

一个register注册服务信息,一个getService获取服务信息。
我们新建一个默认的注册表类DefaultServiceRegistry 来实现这个接口,提供
服务注册服务,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DefaultServiceRegistry implements ServiceRegistry {

private static final Logger logger = LoggerFactory.
getLogger(DefaultServiceRegistry.class);

private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
private final Set<String> registeredService = ConcurrentHashMap.newKeySet();

@Override
public synchronized <T> void register(T service) {
String serviceName = service.getClass().getCanonicalName();
if(registeredService.contains(serviceName)) return;
registeredService.add(serviceName);
Class<?>[] interfaces = service.getClass().getInterfaces();
if(interfaces.length == 0) {
throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
}
for(Class<?> i : interfaces) {
serviceMap.put(i.getCanonicalName(), service);
}
logger.info("向接口: {} 注册服务: {}", interfaces, serviceName);
}

@Override
public synchronized Object getService(String serviceName) {
Object service = serviceMap.get(serviceName);
if(service == null) {
throw new RpcException(RpcError.SERVICE_NOT_FOUND);
}
return service;
}
}

我们将服务名与提供服务的对象的对应关系保存在一个ConcurrentHashMap 中
,并且使用一个 Set 来保存当前有哪些对象已经被注册。在注册服务时,默认
采用这个对象实现的接口的完整类名作为服务名,例如某个对象A 实现了接口
X 和Y,那么将A 注册进去后,会有两个服务名X 和Y 对应于A 对象。这种处
理方式也就说明了某个接口只能有一个对象提供服务。

其他处理

为了降低耦合度,我们不会把ServiceRegistry 和某一个 RpcServer 绑定
在一起,而是在创建RpcServer 对象时,传入一个ServiceRegistry作为这
个服务的注册表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RpcServer {
private static final Logger logger = LoggerFactory.getLogger(
RpcServer.class);
private static final int CORE_POOL_SIZE = 5;
private static final int MAXIMUM_POOL_SIZE = 50;
private static final int KEEP_ALIVE_TIME = 60;
private static final int BLOCKING_QUEUE_CAPACITY = 100;
private final ExecutorService threadPool;
private RequestHandler requestHandler = new RequestHandler();
private final ServiceRegistry serviceRegistry;
public RpcServer(ServiceRegistry serviceRegistry) {
this.serviceRegistry = serviceRegistry;
BlockingQueue<Runnable> workingQueue = new
ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
workingQueue, threadFactory);
}

public void start(int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
logger.info("服务器启动……");
Socket socket;
while((socket = serverSocket.accept()) != null) {
logger.info("消费者连接: {}:{}", socket.getInetAddress(),
socket.getPort());
threadPool.execute(new RequestHandlerThread(
socket, requestHandler, serviceRegistry));
}
threadPool.shutdown();
} catch (IOException e) {
logger.error("服务器启动时有错误发生:", e);
}
}
}

在创建 RpcServer 时需要传入一个已经注册好服务的ServiceRegistry,
而原来的register 方法也被改成了start 方法,因为服务的注册已经不由
RpcServer 处理了,它只需要启动就行了。
而在每一个请求处理线程中也就需要传入ServiceRegistry 了,这里把处理
线程和处理逻辑分成了两个类:RequestHandlerThread 只是一个线程,从
ServiceRegistry 获取到提供服务的对象后,就会把 RpcRequest 和服务
对象直接交给RequestHandler去处理,反射等过程被放到RequestHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class RequestHandlerThread implements Runnable {

private static final Logger logger = LoggerFactory.
getLogger(RequestHandlerThread.class);
private Socket socket;
private RequestHandler requestHandler;
private ServiceRegistry serviceRegistry;

public RequestHandlerThread(Socket socket, RequestHandler
requestHandler, ServiceRegistry serviceRegistry) {
this.socket = socket;
this.requestHandler = requestHandler;
this.serviceRegistry = serviceRegistry;
}

@Override
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(
socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(
socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
String interfaceName = rpcRequest.getInterfaceName();
Object service = serviceRegistry.getService(interfaceName);
Object result = requestHandler.handle(rpcRequest, service);
objectOutputStream.writeObject(RpcResponse.success(result));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException e) {
logger.error("调用或发送时有错误发生:", e);
}
}
}

RequestHandler.java:通过反射进行方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RequestHandler {
private static final Logger logger = LoggerFactory.getLogger(
RequestHandler.class);

public Object handle(RpcRequest rpcRequest, Object service) {
Object result = null;
try {
result = invokeTargetMethod(rpcRequest, service);
logger.info("服务:{} 成功调用方法:{}", rpcRequest.
getInterfaceName(), rpcRequest.getMethodName());
} catch (IllegalAccessException | InvocationTargetException e) {
logger.error("调用或发送时有错误发生:", e);
} return result;
}
private Object invokeTargetMethod(RpcRequest rpcRequest, Object
service) throws IllegalAccessException, InvocationTargetException {
Method method;
try {
method = service.getClass().getMethod(rpcRequest.
getMethodName(), rpcRequest.getParamTypes());
} catch (NoSuchMethodException e) {
return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND);
}
return method.invoke(service, rpcRequest.getParameters());
}
}

测试

1
2
3
4
5
6
7
8
9
public class TestServer {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
ServiceRegistry serviceRegistry = new DefaultServiceRegistry();
serviceRegistry.register(helloService);
RpcServer rpcServer = new RpcServer(serviceRegistry);
rpcServer.start(9000);
}
}

Netty传输和通用序列化接口

父子 channel

在 Netty 中,Channel 是一个Socket 连接的抽象, 它为用户提供了关于底
层Socket 状态(是否是连接还是断开) 以及对 Socket 的读写等操作。
每当 Netty 建立了一个连接后,都会有一个对应的 Channel 实例。并且,有
父子channel 的概念。服务器连接监听的channel ,也叫parent channel。
对应于每一个 Socket 连接的channel,也叫 child channel。

Netty 服务端与客户端

BossEventLoop 只负责处理连接,故开销非常小,连接到来,马上按照策略将
SocketChannel 转发给 WorkerEventLoopGroup,WorkerEventLoopGroup
会由 next 选择其中一个 EventLoop 来将这 个SocketChannel 注册到其
维护的 Selector 并对其后续的 IO 事件进行处理。
ChannelPipeline 中的每一个 ChannelHandler 都是通过它的 EventLoop
(I/O 线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程
,因为这会对整体的 I/O 处理产生严重的负面影响。但有时可能需要与那些
使用阻塞 API 的遗留代码进行交互。
Netty所有的I/O操作都是异步的。因为一个操作可能不会立即返回,所以我们
需要一种用于在之后得某个时间点确定其结果的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class NettyServer implements RpcServer {
private static final Logger logger = LoggerFactory.getLogger(
NettyServer.class);

@Override
public void start(int port) {
//bossGroup线程的机制是多路复用,虽然是一个线程但是可以监听多个新连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {

ServerBootstrap serverBootstrap = new ServerBootstrap();
//是初始化两个线程,一个线程负责接受新的连接,一个负责处理读写
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 256)
.option(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CommonEncoder(new
JsonSerializer()));
pipeline.addLast(new CommonDecoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();

} catch (InterruptedException e) {
logger.error("启动服务器时有错误发生: ", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

Netty 中有一个很重要的设计模式——责任链模式,责任链上有多个处理器,每
个处理器都会对数据进行加工,并将处理后的数据传给下一个处理器。代码中的 CommonEncoder、CommonDecoder和NettyServerHandler 分别就是编码器,
解码器和数据处理器。因为数据从外部传入时需要解码,而传出时需要编码,
类似计算机网络的分层模型,每一层向下层传递数据时都要加上该层的信息
,而向上层传递时则需要对本层信息进行解码。

自定义协议与编解码器

在传输过程中,我们可以在发送的数据上加上各种必要的数据,形成自定义的协
议,而自动加上这个数据就是编码器的工作,解析数据获得原始数据就是解码器
的工作。

序列化接口

基于Nacos 的服务器注册与发现

我们目前实现的框架看起来工作的还不错,但是有一个问题:我们的服务端地址
是固化在代码中的,也就是说,对于一个客户端,它只会去寻找那么一个服务提
供者,如果这个提供者挂了或者换了地址,那就没有办法了。
在分布式架构中,有一个重要的组件,就是服务注册中心,它用于保存多个服务
提供者的信息,每个服务提供者在启动时都需要向注册中心注册自己所拥有的服
务。这样客户端在发起RPC 时,就可以直接去向注册中心请求服务提供者的信息
,如果拿来的这个挂了,还可以重新请求,并且在这种情况下可以很方便地实现
负载均衡。常见的注册中心有 Eureka、Zookeeper 和 Nacos。

获得 Nacos

Nacos 是阿里开发的一款服务注册中心,在SpringCloud Alibaba 逐步替代原
始的SpringCloud 的过程中,Nacos 逐步走红,所以我们就是用 Nacos 作为
我们的注册中心

Author: 高明
Link: https://skysea-gaoming.github.io/2021/04/15/RPC/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.