RPC学习笔记与项目实战

Guide哥RPC学习笔记

RPC简介

什么是RPC?

RPC(Remote Procedure Call) 即远程过程调用,通过名字我们就能看出 RPC 关注的是远程调用而非本地调用。

为什么要 RPC ? 因为,两个不同的服务器上的服务提供的方法不在一个内存空间,所以,需要通过网络编程才能传递方法调用所需要的参数。并且,方法调用的结果也需要通过网络编程来接收。但是,如果我们自己手动网络编程来实现这个调用过程的话工作量是非常大的,因为,我们需要考虑底层传输方式(TCP 还是 UDP)、序列化方式等等方面。

RPC 能帮助我们做什么呢? 简单来说,通过 RPC 可以帮助我们调用远程计算机上某个服务的方法,这个过程就像调用本地方法一样简单。并且,我们不需要了解底层网络编程的具体细节。

举个例子:两个不同的服务 A、B 部署在两台不同的机器上,服务 A 如果想要调用服务 B 中的某个方法的话就可以通过 RPC 来做。

一言蔽之:RPC 的出现就是为了让你调用远程方法像调用本地方法一样简单。

RPC原理

RPC的核心功能由以下5个部分实现:

  1. 客户端(服务消费端):调用远程方法的一端。
  2. 客户端 Stub(桩): 这其实就是一代理类。代理类主要做的事情很简单,就是把你调用方法、类、方法参数等信息传递到服务端。
  3. 网络传输: 网络传输就是你要把你调用的方法的信息比如说参数啊这些东西传输到服务端,然后服务端执行完之后再把返回结果通过网络传输给你传输回来。网络传输的实现方式有很多种比如最基本的 Socket 或者性能以及封装更加优秀的 Netty(推荐)。
  4. 服务端 Stub(桩):这个桩就不是代理类了。我觉得理解为桩实际不太好,大家注意一下就好。这里的服务端 Stub 实际指的就是接收到客户端执行方法的请求后,去指定对应的方法然后返回结果给客户端的类。
  5. 服务端(服务提供端):提供远程方法的一端。

RPC原理图

执行流程:

  1. **服务消费端(client):**以本地调用的方式调用远程服务;
  2. 客户端 Stub(client stub): 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体(序列化):RpcRequest
  3. 客户端 Stub(client stub): 找到远程服务的地址,并将消息发送到服务提供端;
  4. **服务端 Stub(桩):**收到消息将消息反序列化为 Java 对象: RpcRequest
  5. **服务端 Stub(桩):**根据RpcRequest中的类、方法、方法参数等信息调用本地的方法;
  6. **服务端 Stub(桩):**得到方法执行结果并将组装成能够进行网络传输的消息体:RpcResponse(序列化)发送至消费方;
  7. **客户端 Stub(client stub):**接收到消息并将消息反序列化为 Java 对象:RpcResponse ,这样也就得到了最终结果。

常见RPC框架

这里说的 RPC 框架指的是可以让客户端直接调用服务端方法,就像调用本地方法一样简单的框架,比如我下面介绍的 DubboMotangRPC这些。 如果需要和 HTTP 协议打交道,解析和封装 HTTP 请求和响应。这类框架并不能算是“RPC 框架”,比如Feign。

Dubbo

Dubbo

Apache Dubbo 是一款微服务框架,为大规模微服务实践提供高性能 RPC 通信、流量治理、可观测性等解决方案, 涵盖 Java、Golang 等多种语言 SDK 实现。

Dubbo 提供了从服务定义、服务发现、服务通信到流量管控等几乎所有的服务治理能力,支持 Triple 协议(基于 HTTP/2 之上定义的下一代 RPC 通信协议)、应用级服务发现、Dubbo Mesh (Dubbo3 赋予了很多云原生友好的新特性)等特性。

7d56f751-63a9-4ee6-9834-a71121858d64

Dubbo 是由阿里开源,后来加入了 Apache 。正是由于 Dubbo 的出现,才使得越来越多的公司开始使用以及接受分布式架构。

Motan

Motan 是新浪微博开源的一款 RPC 框架,据说在新浪微博正支撑着千亿次调用。不过笔者倒是很少看到有公司使用,而且网上的资料也比较少。

很多人喜欢拿 Motan 和 Dubbo 作比较,毕竟都是国内大公司开源的。笔者在查阅了很多资料,以及简单查看了其源码之后发现:Motan 更像是一个精简版的 Dubbo,可能是借鉴了 Dubbo 的思想,Motan 的设计更加精简,功能更加纯粹。

不过,不推荐在实际项目中使用 Motan。如果要是公司实际使用的话,还是推荐 Dubbo ,其社区活跃度以及生态都要好很多。

gRPC

gRPC

gRPC 是 Google 开源的一个高性能、通用的开源 RPC 框架。其由主要面向移动应用开发并基于 HTTP/2 协议标准而设计(支持双向流、消息头压缩等功能,更加节省带宽),基于 ProtoBuf 序列化协议开发,并且支持众多开发语言。

何谓 ProtoBuf? ProtoBuf( Protocol Buffer) 是一种更加灵活、高效的数据格式,可用于通讯协议、数据存储等领域,基本支持所有主流编程语言且与平台无关。不过,通过 ProtoBuf 定义接口和数据类型还挺繁琐的,这是一个小问题。

2f96b290-3173-4807-a6b8-49ab1424a31e

不得不说,gRPC 的通信层的设计还是非常优秀的,Dubbo-go 3.0 的通信层改进主要借鉴了 gRPC。

不过,gRPC 的设计导致其几乎没有服务治理能力。如果你想要解决这个问题的话,就需要依赖其他组件比如腾讯的 PolarisMesh(北极星)了。

Thirft

Apache Thrift 是 Facebook 开源的跨语言的 RPC 通信框架,目前已经捐献给 Apache 基金会管理,由于其跨语言特性和出色的性能,在很多互联网公司得到应用,有能力的公司甚至会基于 thrift 研发一套分布式服务框架,增加诸如服务注册、服务发现等功能。

Thrift支持多种不同的编程语言,包括C++、Java、Python、PHP、Ruby等(相比于 gRPC 支持的语言更多 )。

总结

gRPC 和 Thrift 虽然支持跨语言的 RPC 调用,但是它们只提供了最基本的 RPC 框架功能,缺乏一系列配套的服务化组件和服务治理功能的支撑。

Dubbo 不论是从功能完善程度、生态系统还是社区活跃度来说都是最优秀的。而且,Dubbo在国内有很多成功的案例比如当当网、滴滴等等,是一款经得起生产考验的成熟稳定的 RPC 框架。最重要的是你还能找到非常多的 Dubbo 参考资料,学习成本相对也较低。

下图展示了 Dubbo 的生态系统。

911676cf-9f1e-4b79-9d23-832aeab3af6d

Dubbo 也是 Spring Cloud Alibaba 里面的一个组件。

86b79899-0d94-4277-88f2-1bc8199d2d82

但是,Dubbo 和 Motan 主要是给 Java 语言使用。虽然,Dubbo 和 Motan 目前也能兼容部分语言,但是不太推荐。如果需要跨多种语言调用的话,可以考虑使用 gRPC。

综上,如果是 Java 后端技术栈,并且你在纠结选择哪一种 RPC 框架的话,我推荐你考虑一下 Dubbo。

如何自己实现一个RPC框架

像设计一个 RPC 框架/消息队列这类问题在面试中还是非常常见的。这是一道你花点精力稍微准备一下就能回答上来的一个问题。如果你回答的比较好的话,那面试官肯定会对你印象非常不错!

消息队列的设计实际上和 RPC 框架/非常类似,我这里就先拿 RPC 框架开涮。

如何自己设计一个RPC框架

一般情况下, RPC 框架不仅要提供服务发现功能,还要提供负载均衡、容错等功能,这样的 RPC 框架才算真正合格的。

RPC架构图

从上图我们可以看出:服务提供端 Server 向注册中心注册服务,服务消费者 Client 通过注册中心拿到服务相关信息,然后再通过网络请求服务提供端 Server。

作为 RPC 框架领域的佼佼者Dubbo的架构如下图所示,和我们上面画的大体也是差不多的。

Dubbo架构图

下面我们再来看一个比较完整的 RPC 框架使用示意图如下:

06f58c75-1637-456b-bea9-a83060a62c01

参考上面这张图,我们简单说一下设计一个最基本的 RPC 框架的思路或者说实现一个最基本的 RPC 框架需要哪些东西:

注册中心

注册中心首先是要有的。比较推荐使用 Zookeeper 作为注册中心。当然了,你也可以使用 Nacos ,甚至是 Redis。

ZooKeeper 为我们提供了高可用、高性能、稳定的分布式数据一致性解决方案,通常被用于实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。并且,ZooKeeper 将数据保存在内存中,性能是非常棒的。 在“读”多于“写”的应用程序中尤其地高性能,因为“写”会导致所有的服务器间同步状态。(“读”多于“写”是协调服务的典型场景)。

关于 ZooKeeper 的更多介绍可以查看这篇文章:《ZooKeeper 相关概念总结》

当然了,如果你想通过文件来存储服务地址的话也是没问题的,不过性能会比较差。

注册中心负责服务地址的注册与查找,相当于目录服务。 服务端启动的时候将服务名称及其对应的地址(ip+port)注册到注册中心,服务消费端根据服务名称找到对应的服务地址。有了服务地址之后,服务消费端就可以通过网络请求服务端了。

我们再来结合 Dubbo 的架构图来理解一下

d0ba7446-455a-40e8-bba2-58f03634860d

上述节点简单说明:

  • Provider: 暴露服务的服务提供方
  • Consumer: 调用远程服务的服务消费方
  • Registry: 服务注册与发现的注册中心
  • Monitor: 统计服务的调用次数和调用时间的监控中心
  • Container: 服务运行容器

调用关系说明:

  1. 服务容器负责启动,加载,运行服务提供者。
  2. 服务提供者在启动时,向注册中心注册自己提供的服务。
  3. 服务消费者在启动时,向注册中心订阅自己所需的服务。
  4. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  5. 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
  6. 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。

网络传输

既然我们要调用远程的方法,就要发送网络请求来传递目标类和方法的信息以及方法的参数等数据到服务提供端。

网络传输具体实现你可以使用 Socket ( Java 中最原始、最基础的网络通信方式。但是,Socket 是阻塞 IO、性能低并且功能单一)。

你也可以使用同步非阻塞的 I/O 模型 NIO ,但是用它来进行网络编程真的太麻烦了。不过没关系,你可以使用基于 NIO 的网络编程框架 Netty ,它将是你最好的选择!

先简单介绍一下 Netty ,后面的文章中会详细介绍到

  1. Netty 是一个基于 NIO 的 client-server(客户端服务器)框架,使用它可以快速简单地开发网络应用程序。
  2. 它极大地简化了 TCP 和 UDP 套接字服务器等网络编程,并且性能以及安全性等很多方面甚至都要更好。
  3. 支持多种协议如 FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议。

序列化和反序列化

要在网络传输数据就要涉及到序列化为什么需要序列化和反序列化呢?

因为网络传输的数据必须是二进制的。因此,我们的 Java 对象没办法直接在网络中传输。为了能够让 Java 对象在网络中传输我们需要将其序列化为二进制的数据。我们最终需要的还是目标 Java 对象,因此我们还要将二进制的数据“解析”为目标 Java 对象,也就是对二进制数据再进行一次反序列化

另外,不仅网络传输的时候需要用到序列化和反序列化,将对象存储到文件、数据库等场景都需要用到序列化和反序列化。

序列化和反序列化

JDK 自带的序列化,只需实现 java.io.Serializable接口即可,不过这种方式不推荐,因为不支持跨语言调用并且性能比较差。

现在比较常用序列化的有 hessiankryoprotostuff ……。我会在下一篇文章中简单对比一下这些序列化方式。

动态代理

动态代理也是需要的。很多人可能不清楚为啥需要动态代理?我来简单解释一下吧!

我们知道代理模式就是: 我们给某一个对象提供一个代理对象,并由代理对象来代替真实对象做一些事情。你可以把代理对象理解为一个幕后的工具人。 举个例子:我们真实对象调用方法的时候,我们可以通过代理对象去做一些事情比如安全校验、日志打印等等。但是,这个过程是完全对真实对象屏蔽的。

讲完了代理模式,再来说动态代理在 RPC 框架中的作用。

前面第一节的时候,我们就已经提到 :RPC 的主要目的就是让我们调用远程方法像调用本地方法一样简单,我们不需要关心远程方法调用的细节比如网络传输。

怎样才能屏蔽远程方法调用的底层细节呢?

答案就是动态代理。简单来说,当你调用远程方法的时候,实际会通过代理对象来传输网络请求,不然的话,怎么可能直接就调用到远程方法。

相关文章: 代理模式详解:静态代理+JDK/CGLIB 动态代理实战

负载均衡

为什么需要负载均衡?

举个例子:我们的系统中的某个服务的访问量特别大,我们将这个服务部署在了多台服务器上,当客户端发起请求的时候,多台服务器都可以处理这个请求。那么,如何正确选择处理该请求的服务器就很关键。假如,你就要一台服务器来处理该服务的请求,那该服务部署在多台服务器的意义就不复存在了。负载均衡就是为了避免单个服务器响应同一请求,容易造成服务器宕机、崩溃等问题,我们从负载均衡的这四个字就能明显感受到它的意义。

传输协议

我们还需要设计一个私有的 RPC 协议,这个协议是客户端(服务消费方)和服务端(服务提供方)交流的基础。

简单来说:通过设计协议,我们定义需要传输哪些类型的数据, 并且还会规定每一种类型的数据应该占多少字节。这样我们在接收到二进制数据之后,就可以正确的解析出我们需要的数据。这有一点像密文传输的感觉。

通常一些标准的 RPC 协议包含下面这些内容:

  • 魔数: 通常是 4 个字节。这个魔数主要是为了筛选来到服务端的数据包,有了这个魔数之后,服务端首先取出前面四个字节进行比对,能够在第一时间识别出这个数据包并非是遵循自定义协议的,也就是无效数据包,为了安全考虑可以直接关闭连接以节省资源。
  • 序列化器编号:标识序列化的方式,比如是使用 Java 自带的序列化,还是 json,kryo 等序列化方式。
  • 消息体长度: 运行时计算出来。
  • ……

实现一个最基本的 RPC 框架需要哪些技术

按照我实现的这一款基于 Netty+kryo+Zookeeper 实现的 RPC 框架来说的话,你需要下面这些技术支撑:

Java

  1. 动态代理机制;
  2. 序列化机制以及各种序列化框架的对比,比如 hession2、kryo、protostuff;
  3. 线程池的使用;
  4. CompletableFuture 的使用;
  5. ……

Netty

  1. 使用 Netty 进行网络传输;
  2. ByteBuf 介绍;
  3. Netty 粘包拆包;
  4. Netty 长连接和心跳机制;
  5. ……

Zookeeper

  1. 基本概念;
  2. 数据结构;
  3. 如何使用 Netflix 公司开源的 Zookeeper 客户端框架 Curator 进行增删改查;
  4. ……

总结

实现一个最基本的 RPC 框架应该至少包括下面几部分:

  1. 注册中心:注册中心负责服务地址的注册与查找,相当于目录服务。
  2. 网络传输:既然我们要调用远程的方法,就要发送网络请求来传递目标类和方法的信息以及方法的参数等数据到服务提供端。
  3. 序列化和反序列化:要在网络传输数据就要涉及到序列化。
  4. 动态代理:屏蔽远程方法调用的底层细节。
  5. 负载均衡: 避免单个服务器响应同一请求,容易造成服务器宕机、崩溃等问题。
  6. 传输协议:这个协议是客户端(服务消费方)和服务端(服务提供方)交流的基础。

更完善的一点的 RPC 框架可能还有监控模块(拓展:你可以研究一下 Dubbo 的监控模块的设计)。

序列化介绍以及序列化协议选择

什么是序列化和反序列化

如果我们需要持久化 Java 对象比如将 Java 对象保存在文件中,或者在网络传输 Java 对象,这些场景都需要用到序列化。

简单来说:

  • 序列化:将数据结构或对象转换成二进制字节流的过程
  • 反序列化:将在序列化过程中所生成的二进制字节流转换成数据结构或者对象的过程

对于 Java 这种面向对象编程语言来说,我们序列化的都是对象(Object)也就是实例化后的类(Class),但是在 C++这种半面向对象的语言中,struct(结构体)定义的是数据结构类型,而 class 对应的是对象类型。

下面是序列化和反序列化常见应用场景:

  • 对象在进行网络传输(比如远程方法调用 RPC 的时候)之前需要先被序列化,接收到序列化的对象之后需要再进行反序列化;
  • 将对象存储到文件之前需要进行序列化,将对象从文件中读取出来需要进行反序列化;
  • 将对象存储到数据库(如 Redis)之前需要用到序列化,将对象从缓存数据库中读取出来需要反序列化;
  • 将对象存储到内存之前需要进行序列化,从内存中读取出来之后需要进行反序列化。

维基百科是这样介绍序列化的:

序列化(serialization)在计算机科学的数据处理中,是指将数据结构或对象状态转换成可取用格式(例如存成文件,存于缓冲,或经由网络中发送),以留待后续在相同或另一台计算机环境中,能恢复原先状态的过程。依照序列化格式重新获取字节的结果时,可以利用它来产生与原始对象相同语义的副本。对于许多对象,像是使用大量引用的复杂对象,这种序列化重建的过程并不容易。面向对象中的对象序列化,并不概括之前原始对象所关系的函数。这种过程也称为对象编组(marshalling)。从一系列字节提取数据结构的反向操作,是反序列化(也称为解编组、deserialization、unmarshalling)。

综上:序列化的主要目的是通过网络传输对象或者说是将对象存储到文件系统、数据库、内存中。

序列化和反序列化

序列化协议对应于 TCP/IP 4 层模型的哪一层?

我们知道网络通信的双方必须要采用和遵守相同的协议。TCP/IP 四层模型是下面这样的,序列化协议属于哪一层呢?

  1. 应用层
  2. 传输层
  3. 网络层
  4. 网络接口层

64adec3d-78ee-488f-92ad-ad3013006985

如上图所示,OSI 七层协议模型中,表示层做的事情主要就是对应用层的用户数据进行处理转换为二进制流。反过来的话,就是将二进制流转换成应用层的用户数据。这不就对应的是序列化和反序列化么?

因为,OSI 七层协议模型中的应用层、表示层和会话层对应的都是 TCP/IP 四层模型中的应用层,所以序列化协议属于 TCP/IP 协议应用层的一部分。

常见的序列化协议有哪些

JDK 自带的序列化方式一般不会用 ,因为序列化效率低并且存在安全问题。比较常用的序列化协议有 Hessian、Kryo、Protobuf、ProtoStuff,这些都是基于二进制的序列化协议。

像 JSON 和 XML 这种属于文本类序列化方式。虽然可读性比较好,但是性能较差,一般不会选择。

JDK自带的序列化方式

JDK 自带的序列化,只需实现 java.io.Serializable接口即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Builder
@ToString
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 1905122041950251207L;
private String requestId;
private String interfaceName;
private String methodName;
private Object[] parameters;
private Class<?>[] paramTypes;
private RpcMessageTypeEnum rpcMessageTypeEnum;
}

serialVersionUID 有什么作用?

序列化号 serialVersionUID 属于版本控制的作用。反序列化时,会检查 serialVersionUID 是否和当前类的 serialVersionUID 一致。如果 serialVersionUID 不一致则会抛出 InvalidClassException 异常。强烈推荐每个序列化类都手动指定其 serialVersionUID,如果不手动指定,那么编译器会动态生成默认的 serialVersionUID

serialVersionUID 不是被 static 变量修饰了吗?为什么还会被“序列化”?

static 修饰的变量是静态变量,位于方法区,本身是不会被序列化的。 static 变量是属于类的而不是对象。你反序列之后,static 变量的值就像是默认赋予给了对象一样,看着就像是 static 变量被序列化,实际只是假象罢了。

如果有些字段不想进行序列化怎么办?

对于不想进行序列化的变量,可以使用 transient 关键字修饰。

transient 关键字的作用是:阻止实例中那些用此关键字修饰的的变量序列化;当对象被反序列化时,被 transient 修饰的变量值不会被持久化和恢复。

关于 transient 还有几点注意:

  • transient只能修饰变量,不能修饰类和方法。
  • transient修饰的变量,在反序列化后变量值将会被置成类型的默认值。例如,如果是修饰 int 类型,那么反序列后结果就是 0
  • static 变量因为不属于任何对象(Object),所以无论有没有 transient 关键字修饰,均不会被序列化。

为什么不推荐使用 JDK 自带的序列化?

我们很少或者说几乎不会直接使用 JDK 自带的序列化方式,主要原因有下面这些原因:

  • 不支持跨语言调用: 如果调用的是其他语言开发的服务的时候就不支持了。
  • 性能差:相比于其他序列化框架性能更低,主要原因是序列化之后的字节数组体积较大,导致传输成本加大。
  • 存在安全问题:序列化和反序列化本身并不存在问题。但当输入的反序列化的数据可被用户控制,那么攻击者即可通过构造恶意输入,让反序列化产生非预期的对象,在此过程中执行构造的任意代码。相关阅读:应用安全:JAVA反序列化漏洞之殇 - CryinJava反序列化安全漏洞怎么回事? - Monica

Kyro

Kryo 是一个高性能的序列化/反序列化工具,由于其变长存储特性并使用了字节码生成机制,拥有较高的运行速度和较小的字节码体积。

另外,Kryo 已经是一种非常成熟的序列化实现了,已经在 Twitter、Groupon、Yahoo 以及多个著名开源项目(如 Hive、Storm)中广泛的使用。

序列化和反序列化相关的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* Kryo serialization class, Kryo serialization efficiency is very high, but only compatible with Java language
*/
@Slf4j
public class KryoSerializer implements Serializer {

/**
* Because Kryo is not thread safe. So, use ThreadLocal to store Kryo objects
*/
private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(RpcResponse.class);
kryo.register(RpcRequest.class);
return kryo;
});

@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
Kryo kryo = kryoThreadLocal.get();
// Object->byte:将对象序列化为byte数组
kryo.writeObject(output, obj);
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
throw new SerializeException("Serialization failed");
}
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
// byte->Object:从byte数组中反序列化出对象
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return clazz.cast(o);
} catch (Exception e) {
throw new SerializeException("Deserialization failed");
}
}

}

Github 地址:https://github.com/EsotericSoftware/kryo

Protobuf

Protobuf 出自于 Google,性能还比较优秀,也支持多种语言,同时还是跨平台的。就是在使用中过于繁琐,因为你需要自己定义 IDL 文件和生成对应的序列化代码。这样虽然不灵活,但是,另一方面导致 protobuf 没有序列化漏洞的风险。

Protobuf 包含序列化格式的定义、各种语言的库以及一个 IDL 编译器。正常情况下你需要定义 proto 文件,然后使用 IDL 编译器编译成你需要的语言

一个简单的 proto 文件如下:

1
2
3
4
5
6
7
8
9
// protobuf的版本
syntax = "proto3";
// SearchRequest会被编译成不同的编程语言的相应对象,比如Java中的class、Go中的struct
message Person {
//string类型字段
string name = 1;
// int 类型字段
int32 age = 2;
}

Github 地址:https://github.com/protocolbuffers/protobuf

ProtoStuff

由于 Protobuf 的易用性,它的哥哥 Protostuff 诞生了。

protostuff 基于 Google protobuf,但是提供了更多的功能和更简易的用法。虽然更加易用,但是不代表 ProtoStuff 性能更差。

Github 地址:https://github.com/protostuff/protostuff

Hessian

Hessian 是一个轻量级的,自定义描述的二进制 RPC 协议。Hessian 是一个比较老的序列化实现了,并且同样也是跨语言的。

Hessian

Dubbo2.x 默认启用的序列化方式是 Hessian2 ,但是,Dubbo 对 Hessian2 进行了修改,不过大体结构还是差不多。

总结

Kryo 是专门针对 Java 语言序列化方式并且性能非常好,如果你的应用是专门针对 Java 语言的话可以考虑使用,并且 Dubbo 官网的一篇文章中提到说推荐使用 Kryo 作为生产环境的序列化方式。(文章地址:https://dubbo.apache.org/zh/docs/v2.7/user/references/protocol/rest/)

ae56c10e-532e-49b2-8874-c316ad1b6880

像 Protobuf、 ProtoStuff、hessian 这类都是跨语言的序列化方式,如果有跨语言需求的话可以考虑使用。

除了上面介绍到的序列化方式的话,还有像 Thrift,Avro 这些。

推荐阅读

Socket网络通信实战

什么是Socket(套接字)

Socket 是一个抽象概念,应用程序可以通过它发送或接收数据。在使用 Socket 进行网络通信的时候,通过 Socket 就可以让我们的数据在网络中传输。操作套接字的时候,和我们读写文件很像。套接字是 IP 地址与端口的组合,套接字 Socket=(IP 地址:端口号)

要通过互联网进行通信,至少需要一对套接字:

  1. 运行于服务器端的 Server Socket。
  2. 运行于客户机端的 Client Socket

在 Java 开发中使用 Socket 时会常用到两个类,都在 java.net 包中:

  1. Socket: 一般用于客户端
  2. ServerSocket :用于服务端

Socket网路通信过程

Socket 网络通信过程如下图所示:

aacf635e-4ed6-4ba5-9346-d45500b48098

Socket 网络通信过程简单来说分为下面 4 步:

  1. 建立服务端并且监听客户端请求
  2. 客户端请求,服务端和客户端建立连接
  3. 两端之间可以传递数据
  4. 关闭资源

对应到服务端和客户端的话,是下面这样的。

服务器端:

  1. 创建 ServerSocket 对象并且绑定地址(ip)和端口号(port):server.bind(new InetSocketAddress(host, port))
  2. 通过 accept()方法监听客户端请求
  3. 连接建立后,通过输入流读取客户端发送的请求信息
  4. 通过输出流向客户端发送响应信息
  5. 关闭相关资源

客户端:

  1. 创建Socket 对象并且连接指定的服务器的地址(ip)和端口号(port):socket.connect(inetSocketAddress)
  2. 连接建立后,通过输出流向服务器端发送请求信息
  3. 通过输入流获取服务器响应的信息
  4. 关闭相关资源

Socket网络通信实战

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class HelloServer {
private static final Logger logger = LoggerFactory.getLogger(HelloServer.class);

public void start(int port) {
//1.创建 ServerSocket 对象并且绑定一个端口
try (ServerSocket server = new ServerSocket(port);) {
Socket socket;
//2.通过 accept()方法监听客户端请求
while ((socket = server.accept()) != null) {
logger.info("client connected");
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
//3.通过输入流读取客户端发送的请求信息
Message message = (Message) objectInputStream.readObject();
logger.info("server receive message:" + message.getContent());
message.setContent("new content");
//4.通过输出流向客户端发送响应信息
objectOutputStream.writeObject(message);
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException e) {
logger.error("occur exception:", e);
}
}
} catch (IOException e) {
logger.error("occur IOException:", e);
}
}

public static void main(String[] args) {
HelloServer helloServer = new HelloServer();
helloServer.start(6666);
}
}

ServerSocketaccept() 方法是阻塞方法,也就是说 ServerSocket 在调用 accept()等待客户端的连接请求时会阻塞,直到收到客户端发送的连接请求才会继续往下执行代码。

很明显,上面演示的代码片段有一个很严重的问题:只能同时处理一个客户端的连接,如果需要管理多个客户端的话,就需要为我们请求的客户端单独创建一个线程。 如下图所示:

e82fa468-e17d-445a-8668-01b11e573a8a

对应的 Java 代码可能是下面这样的:

1
2
3
new Thread(() -> {
// 创建 socket 连接
}).start();

但是,这样会导致一个很严重的问题:资源浪费

我们知道线程是很宝贵的资源,如果我们为每一次连接都用一个线程处理的话,就会导致线程越来越多,最后达到了极限之后,就无法再创建线程处理请求了。处理的不好的话,甚至可能直接就宕机掉了。

很多人就会问了:那有没有改进的方法呢?

当然有! 比较简单并且实际的改进方法就是使用线程池。线程池还可以让线程的创建和回收成本相对较低,并且我们可以指定线程池的可创建线程的最大数量,这样就不会导致线程创建过多,机器资源被不合理消耗。

1
2
3
4
5
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ExecutorService threadPool = new ThreadPoolExecutor(10, 100, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100), threadFactory);
threadPool.execute(() -> {
// 创建 socket 连接
});

但是,即使你再怎么优化和改变。也改变不了它的底层仍然是同步阻塞的 BIO 模型的事实,因此无法从根本上解决问题。

为了解决上述的问题,Java 1.4 中引入了 NIO ,一种同步非阻塞的 I/O 模型。 由于使用同步非阻塞的 I/O 模型 NIO 来进行网络编程真的太麻烦了。你可以使用基于 NIO 的网络编程框架 Netty ,它将是你最好的选择(前面的章节提到过,后面的章节会详细讲解如何使用 Netty 进行网络编程)!

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class HelloClient {

private static final Logger logger = LoggerFactory.getLogger(HelloClient.class);

public Object send(Message message, String host, int port) {
//1. 创建Socket对象并且指定服务器的地址和端口号
try (Socket socket = new Socket(host, port)) {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
//2.通过输出流向服务器端发送请求信息
objectOutputStream.writeObject(message);
//3.通过输入流获取服务器响应的信息
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
return objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
logger.error("occur exception:", e);
}
return null;
}

public static void main(String[] args) {
HelloClient helloClient = new HelloClient();
Message message = (Message) helloClient.send(new Message("content from client"), "127.0.0.1", 6666);
System.out.println("client receive message:" + message.getContent());
}
}

发送的消息实体类:

1
2
3
4
5
@Data
@AllArgsConstructor
public class Message implements Serializable {
private String content;
}

首先运行服务端,然后再运行客户端,控制台输出如下:

服务端:

1
2
[main] INFO github.javaguide.socket.HelloServer - client connected
[main] INFO github.javaguide.socket.HelloServer - server receive message:content from client

客户端:

1
client receive message:new content

Netty从入门到网络通信实战

Netty介绍

  1. Netty 是一个基于 NIO 的 client-server(客户端服务器)框架,使用它可以快速简单地开发网络应用程序。
  2. 它极大地简化了 TCP 和 UDP 套接字服务器等网络编程,并且性能以及安全性等很多方面甚至都要更好。
  3. 支持多种协议如 FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议。

用官方的总结就是:Netty 成功地找到了一种在不妥协可维护性和性能的情况下实现易于开发,性能,稳定性和灵活性的方法。

Netty特点

根据官网的描述,我们可以总结出下面一些特点:

  • 统一的 API,支持多种传输类型,阻塞和非阻塞的。
  • 简单而强大的线程模型。
  • 自带编解码器解决 TCP 粘包/拆包问题。
  • 自带各种协议栈。
  • 真正的无连接数据包套接字支持。
  • 比直接使用 Java 核心 API 有更高的吞吐量、更低的延迟、更低的资源消耗和更少的内存复制。
  • 安全性不错,有完整的 SSL/TLS 以及 StartTLS 支持。
  • 社区活跃
  • 成熟稳定,经历了大型项目的使用和考验,而且很多开源项目都使用到了 Netty 比如我们经常接触的 Dubbo、RocketMQ 等等。
  • ……

Netty能做什么

简单说一下,理论上 NIO 可以做的事情 ,使用 Netty 都可以做并且更好。Netty 主要用来做网络通信 :

  • 作为 RPC 框架的网络通信工具 : 我们在分布式系统中,不同服务节点之间经常需要相互调用,这个时候就需要 RPC 框架了。不同服务节点的通信是如何做的呢?可以使用 Netty 来做。比如我调用另外一个节点的方法的话,至少是要让对方知道我调用的是哪个类中的哪个方法以及相关参数吧!
  • 实现一个自己的 HTTP 服务器 :通过 Netty 我们可以自己实现一个简单的 HTTP 服务器,这个大家应该不陌生。说到 HTTP 服务器的话,作为 Java 后端开发,我们一般使用 Tomcat 比较多。一个最基本的 HTTP 服务器可要以处理常见的 HTTP Method 的请求,比如 POST 请求、GET 请求等等。
  • 实现一个即时通讯系统 : 使用 Netty 我们可以实现一个可以聊天类似微信的即时通讯系统,这方面的开源项目还蛮多的,可以自行去 Github 找一找。
  • 消息推送系统 :市面上有很多消息推送系统都是基于 Netty 来做的。
  • ……

哪些开源项目用到了Netty

我们平常经常接触的 Dubbo、RocketMQ、Elasticsearch、gRPC 等等都用到了 Netty。

可以说大量的开源项目都用到了 Netty,所以掌握 Netty 有助于你更好的使用这些开源项目并且让你有能力对其进行二次开发。

实际上还有很多很多优秀的项目用到了 Netty,Netty 官方也做了统计,统计结果在这里:https://netty.io/wiki/related-projects.html

8308a46f-642e-477d-a289-791ffe08b6fd

Netty 使用 Kryo 序列化传输对象实战

注意:Kryo不支持没有无参构造函数的对象进行反序列化,因此如果某个对象希望使用Kryo来进行序列化操作的话,需要有相应的无参构造函数才可以。

传输实体类

我们首先定义两个对象,这两个对象是客户端与服务端进行交互的实体类。 客户端将 RpcRequest类型的对象发送到服务端,服务端进行相应的处理之后将得到结果 RpcResponse 对象返回给客户端。

客户端请求

RpcRequest.java :客户端请求实体类

1
2
3
4
5
6
7
8
9
@AllArgsConstructor
@Getter
@NoArgsConstructor
@Builder
@ToString
public class RpcRequest {
private String interfaceName;
private String methodName;
}
服务器端响应

RpcResponse.java :服务端响应实体类

1
2
3
4
5
6
7
8
@AllArgsConstructor
@Getter
@NoArgsConstructor
@Builder
@ToString
public class RpcResponse {
private String message;
}

客户端

初始化客户端

客户端中主要有一个用于向服务端发送消息的 sendMessage()方法,通过这个方法你可以将消息也就是RpcRequest 对象发送到服务端,并且你可以同步获取到服务端返回的结果也就是RpcResponse 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class NettyClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private final String host;
private final int port;
private static final Bootstrap b;

public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}

// 初始化相关资源比如 EventLoopGroup, Bootstrap
static {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
b = new Bootstrap();
KryoSerializer kryoSerializer = new KryoSerializer();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
// 连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
// 如果 15 秒之内没有发送数据给服务端的话,就发送一次心跳请求
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
/*
自定义序列化编解码器
*/
// RpcResponse -> ByteBuf
ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
// ByteBuf -> RpcRequest
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new NettyClientHandler());
}
});
}

/**
* 发送消息到服务端
*
* @param rpcRequest 消息体
* @return 服务端返回的数据
*/
public RpcResponse sendMessage(RpcRequest rpcRequest) {
try {
ChannelFuture f = b.connect(host, port).sync();
logger.info("client connect {}", host + ":" + port);
Channel futureChannel = f.channel();
logger.info("send message");
if (futureChannel != null) {
futureChannel.writeAndFlush(rpcRequest).addListener(future -> {
if (future.isSuccess()) {
logger.info("client send message: [{}]", rpcRequest.toString());
} else {
logger.error("Send failed:", future.cause());
}
});
//阻塞等待 ,直到Channel关闭
futureChannel.closeFuture().sync();
// 将服务端返回的数据也就是RpcResponse对象取出
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
return futureChannel.attr(key).get();
}
} catch (InterruptedException e) {
logger.error("occur exception when connect server:", e);
}
return null;
}
}

sendMessage()方法分析:

  1. 首先初始化了一个 Bootstrap
  2. 通过 Bootstrap 对象连接服务端
  3. 通过 Channel 向服务端发送消息RpcRequest
  4. 发送成功后,阻塞等待 ,直到Channel关闭
  5. 拿到服务端返回的结果 RpcResponse
自定义 ChannelHandler 处理服务端消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
RpcResponse rpcResponse = (RpcResponse) msg;
logger.info("client receive msg: [{}]", rpcResponse.toString());
// 声明一个 AttributeKey 对象
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
// 将服务端的返回结果保存到 AttributeMap 上,AttributeMap 可以看作是一个Channel的共享数据源
// AttributeMap的key是AttributeKey,value是Attribute
ctx.channel().attr(key).set(rpcResponse);
ctx.channel().close();
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("client caught exception", cause);
ctx.close();
}
}

NettyClientHandler用于读取服务端发送过来的 RpcResponse 消息对象,并将 RpcResponse 消息对象保存到 AttributeMap 上,AttributeMap 可以看作是一个Channel的共享数据源。

这样的话,我们就能通过 channel 和 key 将数据读取出来。

1
2
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
return futureChannel.attr(key).get();

额外提一下 AttributeMap ,AttributeMap 是一个接口,但是类似于 Map 数据结构 。

1
2
3
4
5
6
public interface AttributeMap {

<T> Attribute<T> attr(AttributeKey<T> key);

<T> boolean hasAttr(AttributeKey<T> key);
}

Channel 实现了 AttributeMap 接口,这样也就表明它存在了AttributeMap 相关的属性。 每个 Channel上的AttributeMap属于共享数据。AttributeMap 的结构,和Map很像,我们可以把 key 看作是AttributeKeyvalue 看作是Attribute,我们可以根据 AttributeKey找到对应的Attribute

1
2
3
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
......
}

服务端

初始化服务端

NettyServer 主要作用就是开启了一个服务端用于接受客户端的请求并处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private final int port;

private NettyServer(int port) {
this.port = port;
}

private void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
KryoSerializer kryoSerializer = new KryoSerializer();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
ch.pipeline().addLast(new NettyServerHandler());
}
});

// 绑定端口,同步等待绑定成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("occur exception when start server:", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

}
自定义 ChannelHandler 处理客户端消息

NettyServerHandler 用于接收客户端发送过来的消息并返回结果给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private static final AtomicInteger atomicInteger = new AtomicInteger(1);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
RpcRequest rpcRequest = (RpcRequest) msg;
logger.info("server receive msg: [{}] ,times:[{}]", rpcRequest, atomicInteger.getAndIncrement());
RpcResponse messageFromServer = RpcResponse.builder().message("message from server").build();
ChannelFuture f = ctx.writeAndFlush(messageFromServer);
f.addListener(ChannelFutureListener.CLOSE);
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("server catch exception",cause);
ctx.close();
}
}

编码器

自定义编码器

NettyKryoEncoder 是我们自定义的编码器。它负责处理”出站”消息,将消息格式转换为字节数组然后写入到字节数据的容器 ByteBuf 对象中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 自定义编码器。
* <p>
* 网络传输需要通过字节流来实现,ByteBuf 可以看作是 Netty 提供的字节数据的容器,使用它会让我们更加方便地处理字节数据。
*/
@AllArgsConstructor
public class NettyKryoEncoder extends MessageToByteEncoder<Object> {
private final Serializer serializer;
private final Class<?> genericClass;

/**
* 将对象转换为字节码然后写入到 ByteBuf 对象中
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) {
if (genericClass.isInstance(o)) {
// 1. 将对象转换为byte
byte[] body = serializer.serialize(o);
// 2. 读取消息的长度
int dataLength = body.length;
// 3.写入消息对应的字节数组长度,writerIndex 加 4
byteBuf.writeInt(dataLength);
//4.将字节数组写入 ByteBuf 对象中
byteBuf.writeBytes(body);
}
}
}

自定义解码器

NettyKryoDecoder是我们自定义的解码器。它负责处理”入站”消息,它会从 ByteBuf中读取到业务对象对应的字节序列,然后再将字节序列转换为我们的业务对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 自定义解码器
*/
@AllArgsConstructor
@Slf4j
public class NettyKryoDecoder extends ByteToMessageDecoder {

private final Serializer serializer;
private final Class<?> genericClass;

/**
* Netty传输的消息长度也就是对象序列化后对应的字节数组的大小,存储在 ByteBuf 头部
*/
private static final int BODY_LENGTH = 4;

/**
* 解码 ByteBuf 对象
*
* @param ctx 解码器关联的 ChannelHandlerContext 对象
* @param in "入站"数据,也就是 ByteBuf 对象
* @param out 解码之后的数据对象需要添加到 out 对象里面
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {

//1.byteBuf中写入的消息长度所占的字节数已经是4了,所以 byteBuf 的可读字节必须大于 4,
if (in.readableBytes() >= BODY_LENGTH) {
//2.标记当前readIndex的位置,以便后面重置readIndex 的时候使用
in.markReaderIndex();
//3.读取消息的长度
//注意:消息长度是encode的时候我们自己写入的,参见 NettyKryoEncoder 的encode方法
int dataLength = in.readInt();
//4.遇到不合理的情况直接 return
if (dataLength < 0 || in.readableBytes() < 0) {
log.error("data length or byteBuf readableBytes is not valid");
return;
}
//5.如果可读字节数小于消息长度的话,说明是不完整的消息,重置readIndex
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
// 6.走到这里说明没什么问题了,可以序列化了
byte[] body = new byte[dataLength];
in.readBytes(body);
// 将bytes数组转换为我们需要的对象
Object obj = serializer.deserialize(body, genericClass);
out.add(obj);
log.info("successful decode ByteBuf to Object");
}
}
}

自定义序列化接口

Serializer 接口主要有两个方法一个用于序列化,一个用户反序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface Serializer {
/**
* 序列化
*
* @param obj 要序列化的对象
* @return 字节数组
*/
byte[] serialize(Object obj);

/**
* 反序列化
*
* @param bytes 序列化后的字节数组
* @param clazz 类
* @param <T>
* @return 反序列化的对象
*/
<T> T deserialize(byte[] bytes, Class<T> clazz);
}

实现序列化接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class KryoSerializer implements Serializer {
/**
* 由于 Kryo 不是线程安全的。每个线程都应该有自己的 Kryo,Input 和 Output 实例。
* 所以,使用 ThreadLocal 存放 Kryo 对象
*/
private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(RpcResponse.class);
kryo.register(RpcRequest.class);
kryo.setReferences(true);//默认值为true,是否关闭注册行为,关闭之后可能存在序列化问题,一般推荐设置为 true
kryo.setRegistrationRequired(false);//默认值为false,是否关闭循环引用,可以提高性能,但是一般不推荐设置为 true
return kryo;
});

@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
Kryo kryo = kryoThreadLocal.get();
// Object->byte:将对象序列化为byte数组
kryo.writeObject(output, obj);
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
throw new SerializeException("序列化失败");
}
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
// byte->Object:从byte数组中反序列化出对对象
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return clazz.cast(o);
} catch (Exception e) {
throw new SerializeException("反序列化失败");
}
}

}

自定义序列化异常类 SerializeException 如下:

1
2
3
4
5
public class SerializeException extends RuntimeException {
public SerializeException(String message) {
super(message);
}
}

测试效果

启动服务端:

1
new NettyServer(8889).run();

启动客户端并发送 4 次消息给服务端:

1
2
3
4
5
6
7
8
9
RpcRequest rpcRequest = RpcRequest.builder()
.interfaceName("interface")
.methodName("hello").build();
NettyClient nettyClient = new NettyClient("127.0.0.1", 8889);
for (int i = 0; i < 3; i++) {
nettyClient.sendMessage(rpcRequest);
}
RpcResponse rpcResponse = nettyClient.sendMessage(rpcRequest);
System.out.println(rpcResponse.toString());

5823b0ff-e6f2-402d-83e7-f7d8904809f3

服务端控制台输出:

e2d76c20-4f39-4aff-8723-f0f3fcdca27d

静态代理+JDK/CGLIB 动态代理实战

代理模式

代理模式是一种比较好的理解的设计模式。简单来说就是 我们使用代理对象来代替对真实对象(real object)的访问,这样就可以在不修改原目标对象的前提下,提供额外的功能操作,扩展目标对象的功能。

代理模式的主要作用是扩展目标对象的功能,比如说在目标对象的某个方法执行前后你可以增加一些自定义的操作。

举个例子:你找了小红来帮你问话,小红就可以看作是代理你的代理对象,代理的行为(方法)是问话。

1597030048717-a9d59ae7-2ae3-4537-956b-704129a4826e

代理模式有静态代理和动态代理两种实现方式,我们先来看一下静态代理模式的实现。

静态代理

静态代理中,我们对目标对象的每个方法的增强都是手动完成的(后面会具体演示代码),非常不灵活(比如接口一旦新增加方法,目标对象和代理对象都要进行修改)且麻烦(需要对每个目标类都单独写一个代理类)。 实际应用场景非常非常少,日常开发几乎看不到使用静态代理的场景。

上面我们是从实现和应用角度来说的静态代理,从 JVM 层面来说, 静态代理在编译时就将接口、实现类、代理类这些都变成了一个个实际的 class 文件。

静态代理实现步骤:

  1. 定义一个接口及其实现类;
  2. 创建一个代理类同样实现这个接口
  3. 将目标对象注入进代理类,然后在代理类的对应方法调用目标类中的对应方法。这样的话,我们就可以通过代理类屏蔽对目标对象的访问,并且可以在目标方法执行前后做一些自己想做的事情。

下面通过代码展示

1.定义发送短信的接口

1
2
3
public interface SmsService {
String send(String message);
}

2.实现发送短信的接口

1
2
3
4
5
6
public class SmsServiceImpl implements SmsService {
public String send(String message) {
System.out.println("send message:" + message);
return message;
}
}

3.创建代理类并同样实现发送短信的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class SmsProxy implements SmsService {

private final SmsService smsService;

public SmsProxy(SmsService smsService) {
this.smsService = smsService;
}

@Override
public String send(String message) {
//调用方法之前,我们可以添加自己的操作
System.out.println("before method send()");
smsService.send(message);
//调用方法之后,我们同样可以添加自己的操作
System.out.println("after method send()");
return null;
}
}

4.实际使用

1
2
3
4
5
6
7
public class Main {
public static void main(String[] args) {
SmsService smsService = new SmsServiceImpl();
SmsProxy smsProxy = new SmsProxy(smsService);
smsProxy.send("java");
}
}

运行上述代码之后,控制台打印出:

1
2
3
before method send()
send message:java
after method send()

可以输出结果看出,我们已经增强了 SmsServiceImplsend()方法。

动态代理

相比于静态代理来说,动态代理更加灵活。我们不需要针对每个目标类都单独创建一个代理类,并且也不需要我们必须实现接口,我们可以直接代理实现类(CGLIB 动态代理机制)。

从 JVM 角度来说,动态代理是在运行时动态生成类字节码,并加载到 JVM 中的。

说到动态代理,Spring AOP、RPC 框架应该是两个不得不的提的,它们的实现都依赖了动态代理。

动态代理在我们日常开发中使用的相对较小,但是在框架中的几乎是必用的一门技术。学会了动态代理之后,对于我们理解和学习各种框架的原理也非常有帮助。

就 Java 来说,动态代理的实现方式有很多种,比如 JDK 动态代理CGLIB 动态代理等等。

JDK 动态代理机制

介绍

在 Java 动态代理机制中 InvocationHandler 接口和 Proxy 类是核心。

Proxy 类中使用频率最高的方法是:newProxyInstance() ,这个方法主要用来生成一个代理对象。

1
2
3
4
5
6
7
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h)
throws IllegalArgumentException
{
......
}

这个方法一共有 3 个参数:

  1. **loader :**类加载器,用于加载代理对象。
  2. interfaces : 被代理类实现的一些接口;
  3. h : 实现了 InvocationHandler 接口的对象;

要实现动态代理的话,还必须需要实现InvocationHandler 来自定义处理逻辑。 当我们的动态代理对象调用一个方法时候,这个方法的调用就会被转发到实现InvocationHandler 接口类的 invoke 方法来调用。

1
2
3
4
5
6
7
8
public interface InvocationHandler {

/**
* 当你使用代理对象调用方法的时候实际会调用到这个方法
*/
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable;
}

invoke() 方法有下面三个参数:

  1. **proxy :**动态生成的代理类
  2. method : 与代理类对象调用的方法相对应
  3. args : 当前 method 方法的参数

也就是说:你通过Proxy 类的 newProxyInstance() 创建的代理对象在调用方法的时候,实际会调用到实现InvocationHandler 接口的类的 invoke()方法。 你可以在 invoke() 方法中自定义处理逻辑,比如在方法执行前后做什么事情。

JDK 动态代理类使用步骤
  1. 定义一个接口及其实现类;
  2. 自定义 InvocationHandler 并重写invoke方法,在 invoke 方法中我们会调用原生方法(被代理类的方法)并自定义一些处理逻辑;
  3. 通过 Proxy.newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h) 方法创建代理对象;
代码示例

1.定义发送短信的接口

1
2
3
public interface SmsService {
String send(String message);
}

2.实现发送短信的接口

1
2
3
4
5
6
public class SmsServiceImpl implements SmsService {
public String send(String message) {
System.out.println("send message:" + message);
return message;
}
}

3.定义一个 JDK 动态代理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class DebugInvocationHandler implements InvocationHandler {
/**
* 代理类中的真实对象
*/
private final Object target;

public DebugInvocationHandler(Object target) {
this.target = target;
}

public Object invoke(Object proxy, Method method, Object[] args) throws InvocationTargetException, IllegalAccessException {
//调用方法之前,我们可以添加自己的操作
System.out.println("before method " + method.getName());
Object result = method.invoke(target, args);
//调用方法之后,我们同样可以添加自己的操作
System.out.println("after method " + method.getName());
return result;
}
}

invoke() 方法: 当我们的动态代理对象调用原生方法的时候,最终实际上调用到的是 invoke() 方法,然后 invoke() 方法代替我们去调用了被代理对象的原生方法。

4.获取代理对象的工厂类

1
2
3
4
5
6
7
8
9
public class JdkProxyFactory {
public static Object getProxy(Object target) {
return Proxy.newProxyInstance(
target.getClass().getClassLoader(), // 目标类的类加载
target.getClass().getInterfaces(), // 代理需要实现的接口,可指定多个
new DebugInvocationHandler(target) // 代理对象对应的自定义 InvocationHandler
);
}
}

getProxy() :主要通过Proxy.newProxyInstance()方法获取某个类的代理对象

5.实际使用

1
2
SmsService smsService = (SmsService) JdkProxyFactory.getProxy(new SmsServiceImpl());
smsService.send("java");

运行上述代码之后,控制台打印出:

1
2
3
before method send
send message:java
after method send

CGLIB动态代理机制

介绍

JDK 动态代理有一个最致命的问题是其只能代理实现了接口的类。

为了解决这个问题,我们可以用 CGLIB 动态代理机制来避免。

CGLIB(Code Generation Library)是一个基于ASM的字节码生成库,它允许我们在运行时对字节码进行修改和动态生成。CGLIB 通过继承方式实现代理。很多知名的开源框架都使用到了CGLIB, 例如 Spring 中的 AOP 模块中:如果目标对象实现了接口,则默认采用 JDK 动态代理,否则采用 CGLIB 动态代理。

在 CGLIB 动态代理机制中 MethodInterceptor 接口和 Enhancer 类是核心。

你需要自定义 MethodInterceptor 并重写 intercept 方法,intercept 用于拦截增强被代理类的方法。

1
2
3
4
5
public interface MethodInterceptor extends Callback{
// 拦截被代理类中的方法
public Object intercept(Object obj, java.lang.reflect.Method method, Object[] args,
MethodProxy proxy) throws Throwable;
}
  1. **obj :**被代理的对象(需要增强的对象)
  2. **method :**被拦截的方法(需要增强的方法)
  3. **args :**方法入参
  4. **methodProxy :**用于调用原始方法

你可以通过 Enhancer类来动态获取被代理类,当代理类调用方法的时候,实际调用的是 MethodInterceptor 中的 intercept 方法。

CGLIB 动态代理类使用步骤
  1. 定义一个类;
  2. 自定义 MethodInterceptor 并重写 intercept 方法,intercept 用于拦截增强被代理类的方法,和 JDK 动态代理中的 invoke 方法类似;
  3. 通过 Enhancer 类的 create()创建代理类;
代码示例

不同于 JDK 动态代理不需要额外的依赖。CGLIB(Code Generation Library) 实际是属于一个开源项目,如果你要使用它的话,需要手动添加相关依赖。

1
2
3
4
5
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib</artifactId>
<version>3.3.0</version>
</dependency>

1.实现一个使用阿里云发送短信的类

1
2
3
4
5
6
7
8
package github.javaguide.dynamicProxy.cglibDynamicProxy;

public class AliSmsService {
public String send(String message) {
System.out.println("send message:" + message);
return message;
}
}

2.自定义 MethodInterceptor(方法拦截器)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;

import java.lang.reflect.Method;

/**
* 自定义MethodInterceptor
*/
public class DebugMethodInterceptor implements MethodInterceptor {

/**
* @param o 代理对象(增强的对象)
* @param method 被拦截的方法(需要增强的方法)
* @param args 方法入参
* @param methodProxy 用于调用原始方法
*/
@Override
public Object intercept(Object o, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
//调用方法之前,我们可以添加自己的操作
System.out.println("before method " + method.getName());
Object object = methodProxy.invokeSuper(o, args);
//调用方法之后,我们同样可以添加自己的操作
System.out.println("after method " + method.getName());
return object;
}
}

3.获取代理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import net.sf.cglib.proxy.Enhancer;

public class CglibProxyFactory {

public static Object getProxy(Class<?> clazz) {
// 创建动态代理增强类
Enhancer enhancer = new Enhancer();
// 设置类加载器
enhancer.setClassLoader(clazz.getClassLoader());
// 设置被代理类
enhancer.setSuperclass(clazz);
// 设置方法拦截器
enhancer.setCallback(new DebugMethodInterceptor());
// 创建代理类
return enhancer.create();
}
}

4.实际使用

1
2
AliSmsService aliSmsService = AliSmsService)CglibProxyFactory.getProxy(AliSmsService.class);
aliSmsService.send("java");

运行上述代码之后,控制台打印出:

1
2
3
before method send
send message:java
after method send

JDK 动态代理和 CGLIB 动态代理对比

  1. JDK 动态代理只能只能代理实现了接口的类,而 CGLIB 可以代理未实现任何接口的类。 另外, CGLIB 动态代理是通过生成一个被代理类的子类来拦截被代理类的方法调用,因此不能代理声明为 final 类型的类和方法。
  2. 就二者的效率来说,大部分情况都是 JDK 动态代理更优秀,随着 JDK 版本的升级,这个优势更加明显。

静态代理和动态代理的对比

  1. 灵活性 :动态代理更加灵活,不需要必须实现接口,可以直接代理实现类,并且可以不需要针对每个目标类都创建一个代理类。另外,静态代理中,接口一旦新增加方法,目标对象和代理对象都要进行修改,这是非常麻烦的!
  2. JVM 层面 :静态代理在编译时就将接口、实现类、代理类这些都变成了一个个实际的 class 文件。而动态代理是在运行时动态生成类字节码,并加载到 JVM 中的。

总结

这篇文章中主要介绍了代理模式的两种实现:静态代理以及动态代理。涵盖了静态代理和动态代理实战、静态代理和动态代理的区别、JDK 动态代理和 Cglib 动态代理区别等内容。

ZooKeeper常用命令+ Curator使用详解

ZooKeeper安装和使用

使用Docker 安装 ZooKeeper

  1. 下载
1
docker pull zookeeper:3.5.8
  1. 运行
1
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.5.8

连接ZooKeeper服务

  1. 进入Zookeeper容器中

先使用 docker ps 查看 ZooKeeper 的 ContainerID,然后使用 docker exec -it ContainerID /bin/bash 命令进入容器中。

  1. 先进入 bin 目录,然后通过 ./zkCli.sh -server 127.0.0.1:2181命令连接ZooKeeper 服务
1
root@eaf70fc620cb:/apache-zookeeper-3.5.8-bin# cd bin

1596940060539-7d46c5f0-97cc-4db4-92f4-ed535f224072

常用命令

查看常用命令(help 命令)

通过 help 命令查看 ZooKeeper 常用命令

创建节点(create 命令)

通过 create 命令在根目录创建了 node1 节点,与它关联的字符串是”node1”

1
[zk: 127.0.0.1:2181(CONNECTED) 34] create /node1 “node1”

通过 create 命令在根目录创建了 node1 节点,与它关联的内容是数字 123

1
2
[zk: 127.0.0.1:2181(CONNECTED) 1] create /node1/node1.1 123
Created /node1/node1.1

更新节点数据内容(set 命令)

1
[zk: 127.0.0.1:2181(CONNECTED) 11] set /node1 "set node1"

获取节点的数据(get 命令)

get 命令可以获取指定节点的数据内容和节点的状态,可以看出我们通过 set 命令已经将节点数据内容改为 “set node1”。

1
2
3
4
5
6
7
8
9
10
11
12
set node1
cZxid = 0x47
ctime = Sun Jan 20 10:22:59 CST 2019
mZxid = 0x4b
mtime = Sun Jan 20 10:41:10 CST 2019
pZxid = 0x4a
cversion = 1
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 1

查看某个目录下的子节点(ls 命令)

通过 ls 命令查看根目录下的节点

1
2
[zk: 127.0.0.1:2181(CONNECTED) 37] ls /
[dubbo, ZooKeeper, node1]

通过 ls 命令查看 node1 目录下的节点

1
2
[zk: 127.0.0.1:2181(CONNECTED) 5] ls /node1
[node1.1]

ZooKeeper 中的 ls 命令和 linux 命令中的 ls 类似, 这个命令将列出绝对路径 path 下的所有子节点信息(列出 1 级,并不递归)

查看节点状态(stat 命令)

1
2
3
4
5
6
7
8
9
10
11
12
[zk: 127.0.0.1:2181(CONNECTED) 10] stat /node1
cZxid = 0x47
ctime = Sun Jan 20 10:22:59 CST 2019
mZxid = 0x47
mtime = Sun Jan 20 10:22:59 CST 2019
pZxid = 0x4a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 1

ZooKeeper 相关概念总结(入门)

查看节点信息和状态(ls2 命令)

ls2 命令更像是 ls 命令和 stat 命令的结合。 ls2 命令返回的信息包括两部分:

  1. 子节点列表
  2. 当前节点的 stat 信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: 127.0.0.1:2181(CONNECTED) 7] ls2 /node1
[node1.1]
cZxid = 0x47
ctime = Sun Jan 20 10:22:59 CST 2019
mZxid = 0x47
mtime = Sun Jan 20 10:22:59 CST 2019
pZxid = 0x4a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 1

删除节点(delete 命令)

这个命令很简单,但是需要注意的一点是如果你要删除某一个节点,那么这个节点必须无子节点才行

1
[zk: 127.0.0.1:2181(CONNECTED) 3] delete /node1/node1.1

ZooKeeper Java客户端 Curator 简单使用

Curator 是Netflix公司开源的一套 ZooKeeper Java客户端框架,相比于 Zookeeper 自带的客户端 zookeeper 来说,Curator 的封装更加完善,各种 API 都可以比较方便地使用。

1596940060354-4e3298eb-c56e-427a-9366-f1ecfac4793e

Curator4.0+版本对ZooKeeper 3.5.x支持比较好。开始之前,请先将下面的依赖添加进你的项目。

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>

连接 ZooKeeper 客户端

通过 CuratorFrameworkFactory 创建 CuratorFramework 对象,然后再调用 CuratorFramework 对象的 start() 方法即可!

1
2
3
4
5
6
7
8
9
10
11
private static final int BASE_SLEEP_TIME = 1000;
private static final int MAX_RETRIES = 3;

// Retry strategy. Retry 3 times, and will increase the sleep time between retries.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
// the server to connect to (can be a server list)
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.build();
zkClient.start();

对于一些基本参数的说明:

  • baseSleepTimeMs:重试之间等待的初始时间
  • maxRetries:最大重试次数
  • connectString:要连接的服务器列表
  • retryPolicy:重试策略

数据节点的增删改查

创建节点

我们在 ZooKeeper常见概念解读 中介绍到,我们通常是将 znode 分为 4 大类:

  • 持久(PERSISTENT)节点 :一旦创建就一直存在即使 ZooKeeper 集群宕机,直到将其删除。
  • 临时(EPHEMERAL)节点 :临时节点的生命周期是与 客户端会话(session) 绑定的,会话消失则节点消失 。并且,临时节点 只能做叶子节点 ,不能创建子节点。
  • 持久顺序(PERSISTENT_SEQUENTIAL)节点 :除了具有持久(PERSISTENT)节点的特性之外, 子节点的名称还具有顺序性。比如 /node1/app0000000001 、/node1/app0000000002 。
  • 临时顺序(EPHEMERAL_SEQUENTIAL)节点 :除了具备临时(EPHEMERAL)节点的特性之外,子节点的名称还具有顺序性。

你在使用的 ZooKeeper 的时候,会发现 CreateMode 类中实际有 7种 znode 类型 ,但是用的最多的还是上面介绍的 4 种。

  1. 创建持久化节点
1
2
3
//注意:下面的代码会报错,下文说了具体原因
zkClient.create().forPath("/node1/00001");
zkClient.create().withMode(CreateMode.PERSISTENT).forPath("/node1/00002");

但是,你运行上面的代码会报错,这是因为的父节点node1还未创建。

你可以先创建父节点 node1 ,然后再执行上面的代码就不会报错了。

1
zkClient.create().forPath("/node1");

更推荐的方式是通过下面这行代码, creatingParentsIfNeeded() 可以保证父节点不存在的时候自动创建父节点,这是非常有用的。

1
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/node1/00001");
  1. 创建临时节点
1
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/node1/00001");
  1. 创建节点并指定数据内容
1
2
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/node1/00001","java".getBytes());
zkClient.getData().forPath("/node1/00001"); //获取节点的数据内容,获取到的是 byte数组
  1. 检测节点是否创建成功
1
zkClient.checkExists().forPath("/node1/00001"); //不为null的话,说明节点创建成功
删除节点
  1. 删除一个子节点
1
zkClient.delete().forPath("/node1/00001");
  1. 删除一个节点以及其下的所有子节点
1
zkClient.delete().deletingChildrenIfNeeded().forPath("/node1");
获取/更新节点数据内容
1
2
3
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/node1/00001","java".getBytes());
zkClient.getData().forPath("/node1/00001"); //获取节点的数据内容
zkClient.setData().forPath("/node1/00001","c++".getBytes()); //更新节点数据内容
获取某个节点的所有子节点路径
1
List<String> childrenPaths = zkClient.getChildren().forPath("/node1");

监听器

下面简单演示一下如何给某个节点注册子节点监听器 。注册了监听器之后,这个节点的子节点发生变化比如增加、减少或者更新的时候,你可以自定义回调操作。

1
2
3
4
5
6
7
String path = "/node1";
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, path, true);
PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
// do something
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start();

如果你要获取节点事件类型的话,可以通过:

1
pathChildrenCacheEvent.getType()

一共有下面几种类型:

1
2
3
4
5
6
7
8
9
10
11
12
public static enum Type {
CHILD_ADDED,//子节点增加
CHILD_UPDATED,//子节点更新
CHILD_REMOVED,//子节点被删除
CONNECTION_SUSPENDED,
CONNECTION_RECONNECTED,
CONNECTION_LOST,
INITIALIZED;

private Type() {
}
}

RPC框架代码分析之网络传输模块

以下提到的 服务端 指的是提供服务/方法的一端,客户端 指的是调用远程(服务端)服务/方法的一端。

我们之前在“如何自己实现一个 RPC 框架?”这篇文章中介绍到说:既然我们要调用远程的方法,就要发送网络请求来传递目标类和方法的信息以及方法的参数等数据到服务端。 这就涉及到了网络传输!网络传输具体实现你可以使用 Socket ( Java 中最原始、最基础的网络通信方式。但是,Socket 是阻塞 IO、性能低并且功能单一)。你也可以使用同步非阻塞的 I/O 模型 NIO ,但是用它来进行网络编程真的太麻烦了。不过没关系,你可以使用基于 NIO 的网络编程框架 Netty ,它将是你最好的选择!

网络传输模块整体结构如下:

27ac1c21-ad7d-4fe3-aa7f-ae880b7eeef9

一共被分为了 4 个包

  1. constants: 存放一些网络传输模块共用的常量
  2. dto: 用于网络传输的类。
  3. handler: 里面只有一个用于处理 rpc 请求的类RpcRequestHandler(根据 rpc 请求调用目标类的目标方法)。
  4. transport: 用户网络传输相关类(真正传输网络请求的地方。提供了 Socket 和 Netty 两种网络传输方式)。

网络传输实体类

网络传输实体类在 dto 包下,主要有两个类。

RpcRequest.java

rpc 请求实体类。当你要调用远程方法的时候,你需要先传输一个 RpcRequest 给对方,RpcRequest 里面包含了要调用的目标方法和类的名称、参数等数据。

另外,version 字段(服务版本)主要是为后续不兼容升级提供可能。group 字段主要用于处理一个接口有多个类实现的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Builder
@ToString
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 1905122041950251207L;
private String requestId;
private String interfaceName;
private String methodName;
private Object[] parameters;
private Class<?>[] paramTypes;
private RpcMessageType rpcMessageType;
private String version;
private String group;

public RpcServiceProperties toRpcProperties() {
return RpcServiceProperties.builder().serviceName(this.getInterfaceName())
.version(this.getVersion())
.group(this.getGroup()).build();
}
}

RpcResponse.java

既然有了 rpc 请求实体类,那肯定就要有 rpc 响应实体类了。

当服务端通过 RpcRequest 中的相关数据调用到目标服务的目标方法之后,调用结果就通过 RpcResponse 返回给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Builder
@ToString
public class RpcResponse<T> implements Serializable {

private static final long serialVersionUID = 715745410605631233L;
private String requestId;
/**
* response code
*/
private Integer code;
/**
* response message
*/
private String message;
/**
* response body
*/
private T data;

public static <T> RpcResponse<T> success(T data, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setCode(RpcResponseCode.SUCCESS.getCode());
response.setMessage(RpcResponseCode.SUCCESS.getMessage());
response.setRequestId(requestId);
if (null != data) {
response.setData(data);
}
return response;
}

public static <T> RpcResponse<T> fail(RpcResponseCode rpcResponseCode) {
RpcResponse<T> response = new RpcResponse<>();
response.setCode(rpcResponseCode.getCode());
response.setMessage(rpcResponseCode.getMessage());
return response;
}

}

网络传输

由于,这部分我提供了一种基于 Socket,一种基于 Netty 的网络传输方式(循序渐进)。

因此,我先定义了一个发送 RPC 请求的顶层接口,然后我们分别使用 SocketNetty 两种方式对这个接口进行实现即可!

RpcRequestTransport.java 传输请求的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* send RpcRequest
*/
@SPI
public interface RpcRequestTransport {
/**
* send rpc request to server and get result
*
* @param rpcRequest message body
* @return data from server
*/
Object sendRpcRequest(RpcRequest rpcRequest);
}

下面,我们先来看一下比较简单点的使用 Socket 进行网络传输的方式。

Socket

客户端

这里的客户端实际就是发送 RPC 请求的一端,可以对照我们之间画的 RPC 调用的原理图来理解。

客户端主要用于发送网络请求到服务端(目标方法所在的服务器)。当我们知道了服务端的地址之后,我们就可以通过 SocketRpcClient 发送 rpc 请求(RpcRequest) 到服务端了(如果我们要找到服务端的地址,涉及到了注册中心相关的知识。下一节会提到。)。

我们直接实现上面定义的 RpcRequestTransport.java 即可。这样的话,通过 Socket 来传输消息的模块就写好了!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 基于 Socket 传输 RpcRequest
*/
@AllArgsConstructor
@Slf4j
public class SocketRpcClient implements RpcRequestTransport {
private final ServiceDiscovery serviceDiscovery;

public SocketRpcClient() {
this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension("zk");
}

@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
// build rpc service name by rpcRequest
String rpcServiceName = RpcServiceProperties.builder().serviceName(rpcRequest.getInterfaceName())
.group(rpcRequest.getGroup()).version(rpcRequest.getVersion()).build().toRpcServiceName();
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
try (Socket socket = new Socket()) {
socket.connect(inetSocketAddress);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
// Send data to the server through the output stream
objectOutputStream.writeObject(rpcRequest);
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
// Read RpcResponse from the input stream
return objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RpcException("调用服务失败:", e);
}
}
}
服务端

SocketRpcServer.java

Socket 服务端。用于等待客户端连接。当客户端成功连接之后,就可以发送 rpc 请求(RpcRequest) 到服务端了。然后,服务端拿到 RpcRequest就会去执行对应的方法。执行完对应的方法之后,就把执行得到的结果放在 RpcResponse 中返回给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class SocketRpcServer {

private final ExecutorService threadPool;
private final ServiceProvider serviceProvider;

public SocketRpcServer() {
threadPool = ThreadPoolFactoryUtils.createCustomThreadPoolIfAbsent("socket-server-rpc-pool");
serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class);
}

public void registerService(Object service) {
serviceProvider.publishService(service);
}

public void registerService(Object service, RpcServiceProperties rpcServiceProperties) {
serviceProvider.publishService(service, rpcServiceProperties);
}

public void start() {
try (ServerSocket server = new ServerSocket()) {
String host = InetAddress.getLocalHost().getHostAddress();
server.bind(new InetSocketAddress(host, PORT));
CustomShutdownHook.getCustomShutdownHook().clearAll();
Socket socket;
while ((socket = server.accept()) != null) {
log.info("client connected [{}]", socket.getInetAddress());
threadPool.execute(new SocketRpcRequestHandlerRunnable(socket));
}
threadPool.shutdown();
} catch (IOException e) {
log.error("occur IOException:", e);
}
}

}

Netty

Netty 这部分的原理也差不多,不过实现代码差别很大。

客户端

NettyClient.java

Netty 客户端主要提供了:

  • doConnect() :用于连接服务端(目标方法所在的服务器)并返回对应的 Channel。当我们知道了服务端的地址之后,我们就可以通过 NettyClient 成功连接服务端了。(有了 Channel之后就能发送数据到服务端了)
  • sendRpcRequest() : 用于传输 rpc 请求(RpcRequest) 到服务端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Slf4j
public final class NettyRpcClient implements RpcRequestTransport {
private final ServiceDiscovery serviceDiscovery;
private final UnprocessedRequests unprocessedRequests;
private final ChannelProvider channelProvider;
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup;

@SneakyThrows
public Channel doConnect(InetSocketAddress inetSocketAddress) {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
completableFuture.complete(future.channel());
} else {
throw new IllegalStateException();
}
});
return completableFuture.get();
}

@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
// build return value
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
// build rpc service name by rpcRequest
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
// get server address
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
// get server address related channel
Channel channel = getChannel(inetSocketAddress);
if (channel.isActive()) {
// put unprocessed request
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setData(rpcRequest);
rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.REQUEST_TYPE);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("client send message: [{}]", rpcMessage);
} else {
future.channel().close();
resultFuture.completeExceptionally(future.cause());
log.error("Send failed:", future.cause());
}
});
} else {
throw new IllegalStateException();
}
return resultFuture;
}
}

UnprocessedRequests.java

用于存放未被服务端处理的请求(建议限制 map 容器大小,避免未处理请求过多 OOM)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class UnprocessedRequests {
private static final Map<String, CompletableFuture<RpcResponse<Object>>> UNPROCESSED_RESPONSE_FUTURES = new ConcurrentHashMap<>();

public void put(String requestId, CompletableFuture<RpcResponse<Object>> future) {
UNPROCESSED_RESPONSE_FUTURES.put(requestId, future);
}

public void complete(RpcResponse<Object> rpcResponse) {
CompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());
if (null != future) {
future.complete(rpcResponse);
} else {
throw new IllegalStateException();
}
}
}

NettyClientHandler

自定义客户端 ChannelHandler 用于处理服务器发送的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final UnprocessedRequests unprocessedRequests;
private final ChannelProvider channelProvider;

public NettyClientHandler() {
this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
}

/**
* 读取从服务端返回的消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
log.info("client receive msg: [{}]", msg);
if (msg instanceof RpcResponse) {
RpcResponse<Object> rpcResponse = (RpcResponse<Object>) msg;
unprocessedRequests.complete(rpcResponse);
}
} finally {
ReferenceCountUtil.release(msg);
}
}

// Netty 心跳机制相关。保证客户端和服务端的连接不被断掉,避免重连。
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//省略部分代码
}
}

从代码中,可以看出当 rpc 请求被成功处理(客户端收到服务端的执行结果)之后,我们调用了 unprocessedRequests.complete(rpcResponse) 方法,这样的话,你只需要通过下面的方式就能成功接收到服务端返回的结果。

1
2
CompletableFuture<RpcResponse> completableFuture = (CompletableFuture<RpcResponse>) clientTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();

ChannelProvider.java

用于存放 ChannelChannel用于在服务端和客户端之间传输数据)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class ChannelProvider {

private final Map<String, Channel> channelMap;

public ChannelProvider() {
channelMap = new ConcurrentHashMap<>();
}

public Channel get(InetSocketAddress inetSocketAddress) {
String key = inetSocketAddress.toString();
// determine if there is a connection for the corresponding address
if (channelMap.containsKey(key)) {
Channel channel = channelMap.get(key);
// if so, determine if the connection is available, and if so, get it directly
if (channel != null && channel.isActive()) {
return channel;
} else {
channelMap.remove(key);
}
}
return null;
}

public void set(InetSocketAddress inetSocketAddress, Channel channel) {
String key = inetSocketAddress.toString();
channelMap.put(key, channel);
}

public void remove(InetSocketAddress inetSocketAddress) {
String key = inetSocketAddress.toString();
channelMap.remove(key);
log.info("Channel map size :[{}]", channelMap.size());
}
}
服务端

NettyRpcServer.java

Netty 服务端。并监听客户端的连接。另外,还提供了两个用户手动注册服务的方法(还可以通过注解RpcService注册服务,这个后面也会介绍到)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Slf4j
@Component
public class NettyRpcServer {

public static final int PORT = 9998;

private final ServiceProvider serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class);

public void registerService(Object service, RpcServiceProperties rpcServiceProperties) {
serviceProvider.publishService(service, rpcServiceProperties);
}

@SneakyThrows
public void start() {
CustomShutdownHook.getCustomShutdownHook().clearAll();
String host = InetAddress.getLocalHost().getHostAddress();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
RuntimeUtil.cpus() * 2,
ThreadPoolFactoryUtils.createThreadFactory("service-handler-group", false)
);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
// 当客户端第一次进行请求的时候才会进行初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 30 秒之内没有收到客户端请求的话就关闭连接
ChannelPipeline p = ch.pipeline();
p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
p.addLast(serviceHandlerGroup, new NettyRpcServerHandler());
}
});
// 绑定端口,同步等待绑定成功
ChannelFuture f = b.bind(host, PORT).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("occur exception when start server:", e);
} finally {
log.error("shutdown bossGroup and workerGroup");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
serviceHandlerGroup.shutdownGracefully();
}
}
}

NettyServerHandler.java

自定义服务端 ChannelHandler 用于处理客户端发送的数据。

当客户端发的 rpc 请求(RpcRequest) 来了之后,服务端就会处理 rpc 请求(RpcRequest) ,处理完之后就把得到 rpc 相应(RpcResponse)传输给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

private final RpcRequestHandler rpcRequestHandler;

public NettyServerHandler() {
this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
}

/**
* 读取从客户端消息,然后调用目标服务的目标方法并返回给客户端。
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 省略部分代码
}


// Netty 心跳机制相关。保证客户端和服务端的连接不被断掉,避免重连。
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 省略部分代码
}

}

传输协议

简单来说:通过设计协议,我们定义需要传输哪些类型的数据, 并且还会规定每一种类型的数据应该占多少字节。这样我们在接收到二级制数据之后,就可以正确的解析出我们需要的数据。这有一点像密文传输的感觉。

以下便是我们设计的传输协议(编解码器这里会用到!!!):

1
2
3
4
5
6
7
8
9
10
11
*   0     1     2     3     4        5     6     7     8         9          10      11     12  13  14   15 16
* +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+----- --+-----+-----+-------+
* | magic code |version | full length | messageType| codec|compress| RequestId |
* +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
* | |
* | body |
* | |
* | ... ... |
* +-------------------------------------------------------------------------------------------------------+
* 4B magic code(魔法数) 1B version(版本) 4B full length(消息长度) 1B messageType(消息类型)
* 1B compress(压缩类型) 1B codec(序列化类型) 4B requestId(请求的Id)
  • 魔法数 : 通常是 4 个字节。这个魔数主要是为了筛选来到服务端的数据包,有了这个魔数之后,服务端首先取出前面四个字节进行比对,能够在第一时间识别出这个数据包并非是遵循自定义协议的,也就是无效数据包,为了安全考虑可以直接关闭连接以节省资源。
  • 序列化器类型 :标识序列化的方式,比如是使用 Java 自带的序列化,还是 json,kyro 等序列化方式。
  • 消息长度 : 运行时计算出来。
  • ……

编解码器

编解码器这里主要用到了 Kryo 序列化和反序列化以及 Netty 网络传输字节容器 ByteBuf 相关的知识。

编解码器的作用主要是让我们在 Netty 进行网络传输所用的对象类型 ByteBuf 与 我们代码层面需要的业务对象之间转换。这部分的代码还是比较多的,小伙伴们可以自己阅读以下,整体逻辑还是比较简单的。

一定要先搞懂传输协议之后再去看这部分代码。

RpcMessageDecoder.java

自定义解码器。负责处理”入站”消息,将 ByteBuf 消息格式的对象转换为我们需要的业务对象。

网络传输需要通过字节流来实现,ByteBuf 可以看作是 Netty 提供的字节数据的容器,使用它会让我们更加方便地处理字节数据。

RpcMessageEncoder.java

自定义编码器。负责处理”出站”消息,将消息格式转换字节数组然后写入到字节数据的容器 ByteBuf 对象中。

RPC框架代码分析之注册中心模块

我们之前在“如何自己实现一个 RPC 框架?”这篇文章中介绍到说:注册中心负责服务地址的注册与查找,相当于目录服务。 服务端启动的时候将服务名称及其对应的地址(ip+port)注册到注册中心,服务消费端根据服务名称找到对应的服务地址。有了服务地址之后,服务消费端就可以通过网络请求服务端了。

简单来说注册中心就像是一个中转站,提供的作用就是根据调用的服务名称找到远程服务的地址(数据保存服务)。

注册中心模块整体结构如下:

1597046857437-3a577559-1fb8-4c6d-ba0b-b0b244dcee6a

我们定义了两个接口 ServiceDiscovery.javaServiceRegistry.java,这两个接口分别定义了服务发现和服务注册行为。

ServiceRegistry.java

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 服务注册
*/
public interface ServiceRegistry {
/**
* 注册服务到注册中心
*
* @param rpcServiceName 完整的服务名称(class name+group+version)
* @param inetSocketAddress 远程服务地址
*/
void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress);

}

ServiceDiscovery.java

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 服务发现
*/
public interface ServiceDiscovery {
/**
* 根据 rpcServiceName 获取远程服务地址
*
* @param rpcServiceName 完整的服务名称(class name+group+version)
* @return 远程服务地址
*/
InetSocketAddress lookupService(String rpcServiceName);
}

接下来,我们使用 zookeeper 作为注册中心的实现方式,并实现了这两个接口。

ZkServiceRegistry.java

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 服务注册(基于zookeeper实现)
*/
@Slf4j
public class ZkServiceRegistry implements ServiceRegistry {

@Override
public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
CuratorFramework zkClient = CuratorUtils.getZkClient();
CuratorUtils.createPersistentNode(zkClient, servicePath);
}
}

当我们的服务被注册进 zookeeper 的时候,我们将完整的服务名称 rpcServiceName (class name+group+version)作为根节点 ,子节点是对应的服务地址(ip+端口号)。

  • class name : 服务接口名也就是类名比如:github.javaguide.HelloService。
  • version : 服务版本。主要是为后续不兼容升级提供可能
  • group :服务所在的组。主要用于处理一个接口有多个类实现的情况。

一个根节点(rpcServiceName)可能会对应多个服务地址(相同服务被部署多份的情况)。

1597046857469-f43f5087-e095-4ee6-a665-3be3e6a38904

如果我们要获得某个服务对应的地址的话,就直接根据完整的服务名称来获取到其下的所有子节点,然后通过具体的负载均衡策略取出一个就可以了。相关代码如下在 ZkServiceDiscovery.java中已经给出。

ZkServiceDiscovery.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 服务发现(基于zookeeper实现)
*/
@Slf4j
public class ZkServiceDiscovery implements ServiceDiscovery {
private final LoadBalance loadBalance;

public ZkServiceDiscovery() {
this.loadBalance = new RandomLoadBalance();
}

@Override
public InetSocketAddress lookupService(String rpcServiceName) {
CuratorFramework zkClient = CuratorUtils.getZkClient();
List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
if (serviceUrlList.size() == 0) {
throw new RpcException(RpcErrorMessage.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
}
// load balancing
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList);
log.info("Successfully found the service address:[{}]", targetServiceUrl);
String[] socketAddressArray = targetServiceUrl.split(":");
String host = socketAddressArray[0];
int port = Integer.parseInt(socketAddressArray[1]);
return new InetSocketAddress(host, port);
}
}

我们根据完整的服务名称便可以将对应的服务地址查出来, 查出来的服务地址可能并不止一个。

所以,我们可以通过对应的负载均衡策略来选择出一个服务地址。

CuratorUtils.java

另外,我们还自定义了一个 ZooKeeper Java 客户端 Curtor 的工具类 CuratorUtils.java 。关于这个工具类,这里就不再提了。

在《08 Zookeeper 常用命令+ Curtor 使用详解》中已经介绍的非常详细了。

RPC 框架代码分析之其他模块

动态代理屏蔽网络传输细节

我们在前面的章节讲到过我们需要用到动态代理来屏蔽复杂的网络传输细节。对应的代码: RpcClientProxy.java

1
2
3
4
5
6
7
8
9
@Slf4j
public class RpcClientProxy implements InvocationHandler {
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
}
}

当我们去调用一个远程的方法的时候,实际上是通过代理对象调用的。

获取代理对象的方法如下:

1
2
3
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}

网络传输细节都被封装在了 invoke() 方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public Object invoke(Object proxy, Method method, Object[] args) {
log.info("invoked method: [{}]", method.getName());
RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
.parameters(args)
.interfaceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.requestId(UUID.randomUUID().toString())
.group(rpcServiceProperties.getGroup())
.version(rpcServiceProperties.getVersion())
.build();
RpcResponse<Object> rpcResponse = null;
if (rpcRequestTransport instanceof NettyRpcClient) {
CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();
}
if (rpcRequestTransport instanceof SocketRpcClient) {
rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
}
this.check(rpcResponse, rpcRequest);
return rpcResponse.getData();
}

通过注解注册/消费服务

我们这里借用了 Spring 容器相关的功能。核心代码都放在了 : src/main/java/github/javaguide/spring 包下面。

我们定义两个注解:

  • RcpService :注册服务
  • RpcReference :消费服务

RcpService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface RpcService {

/**
* Service version, default value is empty string
*/
String version() default "";

/**
* Service group, default value is empty string
*/
String group() default "";
}

RpcReference.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface RpcReference {

/**
* Service version, default value is empty string
*/
String version() default "";

/**
* Service group, default value is empty string
*/
String group() default "";
}

简单说一下原理。

我们实现需要 BeanPostProcessor 接口并重写 postProcessBeforeInitialization()方法和 postProcessAfterInitialization() 方法。

Spring bean 在实例化之前会调用 postProcessBeforeInitialization()方法,在 Spring bean 实例化之后会调用 postProcessAfterInitialization() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@Component
public class SpringBeanPostProcessor implements BeanPostProcessor {

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {

}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

}
}

被我们使用 RpcServiceRpcReference 注解的类都算是 Spring Bean。

  • 我们可以在postProcessBeforeInitialization()方法中去判断类上是否有RpcService 注解。如果有的话,就取出 groupversion 的值。然后,再调用 ServiceProviderpublishService() 方法发布服务即可!
  • 我们可以在 postProcessAfterInitialization() 方法中遍历类的属性上是否有 RpcReference 注解。如果有的话,我们就通过反射将这个属性赋值即可!

(优化)使用CompletableFuture优化接受服务提供端返回结果

使用 AttributeMap 接受服务端返回结果

最开始的时候是通过 AttributeMap 绑定到Channel上实现的,相关代码如下:

NettyClientTransport.java(用来发送 RpcRequest 请求)

1597058614646-de6c89f2-60f4-4e26-82f6-efaf9c01f581

NettyClientHandler.java (自定义客户端 ChannelHandler 来处理服务端发过来的数据)

1597058614610-286cc9cd-1c6e-4a3d-95c0-8a4c02ffc5ac

这种是实现的缺点是不清晰,而且你每次都要调用 channel.closeFuture().sync(); 阻塞来手动等待请求返回。

使用 CompletableFuture 进行优化

通过CompletableFuture包装返回结果,对代码进行了重构,重要部分的代码如下:

NettyClientTransport.java(用来发送 RpcRequest 请求)

1597058614624-83b8b500-4b86-4438-b458-8abd47b18c58

NettyClientHandler.java (自定义客户端 ChannelHandler 来处理服务端发过来的数据)

1597058614600-6c2038dd-8b65-480e-9d83-4d5d7dd156e3

UnprocessedRequests.java 存放了未处理的请求(建议限制 map 容器大小,避免未处理请求过多 OOM

1597058614601-72be49db-42b4-46b6-9dfc-c6420b9d5a3e

现在只需要通过下面的方式就能成功接收到服务端返回的结果:

1
2
CompletableFuture<RpcResponse> completableFuture = (CompletableFuture<RpcResponse>) clientTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();