分布式

1

分布式架构

单机结构

一个系统业务量很小的时候所有的代码都放在一个项目中就好了,然后这个项
目部署在一台服务器上就好了。整个项目所有的服务都由这台服务器提供。这
就是单机结构

集群结构

单机处理到达瓶颈的时候,你就把单机复制几份,这样就构成了一个“集群”。集
群中每台服务器就叫做这个集群的一个“节点”,所有节点构成了一个集群。每个
节点都提供相同的服务,那么这样系统的处理能力就相当于提升了好几倍。
但问题是用户的请求究竟由哪个节点来处理呢?最好能够让此时此刻负载较小的
节点来处理,这样使得每个节点的压力都比较平均。要实现这个功能,就需要在
所有节点之前增加一个“调度者”的角色,用户的所有请求都先交给它,然后它
根据当前所有节点的负载情况,决定将这个请求交给哪个节点处理。这个“调度
者”有一个名字——负载均衡服务器。
集群结构的好处就是系统扩展非常容易。如果随着你们系统业务的发展,当前的系
统又支撑不住了,那么给这个集群再增加节点就行了。但是,当你的业务发展到一
定程度的时候,你会发现一个问题——无论怎么增加节点,貌似整个集群性能的提
升效果并不明显了。这时候,你就需要使用微服务结构了。

分布式结构

分布式结构就是将一个完整的系统,按照业务功能,拆分成一个个独立的子系统
,在分布式结构中,每个子系统就被称为“服务”。这些子系统能够独立运行在web
容器中,它们之间通过RPC方式通信。
举个例子,假设需要开发一个在线商城。按照微服务的思想,我们需要按照功能模
块拆分成多个独立的服务,如:用户服务、产品服务、订单服务、后台管理服务、数
据分析服务等等。这一个个服务都是一个个独立的项目,可以独立运行。如果服务
之间有依赖关系,那么通过RPC方式调用。好处如下

  • 系统之间的耦合度大大降低,可以独立开发、独立部署、独立测试,系统与系统
    之间的边界非常明确,排错也变得相当容易,开发效率大大提升
  • 系统之间的耦合度降低,从而系统更易于扩展。我们可以针对性地扩展某些服务
    。假设这个商城要搞一次大促,下单量可能会大大提升,因此我们可以针对性地提
    升订单系统、产品系统的节点数量,而对于后台管理系统、数据分析系统而言,节
    点数量维持原有水平即可
  • 服务的复用性更高。比如,当我们将用户系统作为单独的服务后,该公司所有
    的产品都可以使用该系统作为用户系统,无需重复开发

Raft算法

Raft是一种共识算法,它在容错性和性能上相当于Paxos。不同之处在于它被分
解成相对独立的子问题,并且清晰地处理了实际系统所需的所有主要部分

  1. Raft属于算法,而非协议
  2. 共识描述的是算法过程,而一致性描述的多副本的读取结果

Raft算法应用场景

CAP和BASE

CAP理论

CAP 也就是 Consistency(一致性)、Availability(可用性)、Partition
Tolerance(分区容错性) 这三个单词首字母组合。CAP 定理指出对于一个分
布式系统来说,当设计读写操作时,只能能同时满足以下三点中的两个:

  1. 一致性: 指数据在多个副本之间能够保持一致的特性,在分布式系统完成某
    写操作后任何读操作,都应该获取到该写操作写入的那个最新的值。相当于要求
    分布式系统中的各节点时时刻刻保持数据的一致性。
  2. 可用性: 指系统提供的服务必须一直处于可用的状态,每次请求都能获取到
    正确的响应,但是不保证获取的数据为最新数据
  3. 分区容错性: 分布式系统出现网络分区的时候,仍然能够对外提供服务。分
    布式系统中,多个节点之前的网络本来是连通的,但是因为某些故障(比如部
    分节点网络出了问题)某些节点之间不连通了,整个网络就分成了几块区域,
    这就叫网络分区,也就是说部分故障不影响整体使用。

不是所谓的“3 选 2”

当发生网络分区的时候,如果我们要继续服务,那么强一致性和可用性只能2选1

  1. 也就是说当网络分区之后P 是前提,决定了P 之后才有C 和A 的选择。也就
    是说分区容错性我们是必须要实现的
  2. 为了保证一致性(CP),不能访问未同步完成的节点,也就失去了部分可用性;
    为了保证可用性(AP),允许读取所有节点的数据,但是数据可能不一致
  3. 为啥不可能选择 CA 架构呢?举个例子:若系统出现“分区”,系统中的某个
    节点在进行写操作。为了保证 C,必须要禁止其他节点的读写操作,这就和A 发
    生冲突了。如果为了保证 A,其他节点的读写操作正常的话,那就和 C 发生冲
    突了。选择 CP 还是 AP 的关键在于当前的业务场景,没有定论,比如对于需
    要确保强一致性的场景如银行一般会选择保证 CP 。
  4. 在系统发生“分区”的情况下,CAP 理论只能满足 CP 或者 AP。要注意的
    是,这里的前提是系统发生了“分区”
    如果系统没有发生“分区”的话,节点间的网络连接通信正常的话,也就不存在
    P 了。这个时候,我们就可以同时保证 C 和 A 了。

CAP应用

注册中心负责服务地址的注册与查找,相当于目录服务,服务提供者和消费者只
在启动时与注册中心交互,注册中心不转发请求,压力较小。

  1. ZooKeeper 保证的是 CP。 任何时刻对 ZooKeeper 的读请求都能得到一致
    性的结果,但是ZooKeeper 不保证每次请求的可用性比如在Leader 选举过程
    中或者半数以上的机器不可用的时候服务就是不可用的
  2. Eureka 保证的则是 AP。 Eureka 在设计的时候就是优先保证 A(可用性)
    。在 Eureka 中不存在什么 Leader 节点,每个节点都是一样的、平等的。因此
    Eureka 不会像ZooKeeper 那样出现选举过程中或者半数以上的机器不可用的
    时候服务就是不可用的情况。Eureka 保证即使大部分节点挂掉也不会影响正常
    提供服务,只要有一个节点是可用的就行了。只不过这个节点上的数据可能并不
    是最新的
  3. Nacos 不仅支持 CP 也支持 AP

BASE理论

BASE 是 Basically Available(基本可用) 、Soft-state(软状态) 和
Eventually Consistent(最终一致性) 三个短语的缩写。BASE 理论是对
CAP 中一致性 C 和可用性 A 权衡的结果。
BASE 理论的核心思想是即使无法做到强一致性,但每个应用都可以根据自身业
务特点,采用适当的方式来使系统达到最终一致性。也就是牺牲数据的一致性来
满足系统的高可用性,系统中一部分数据不可用或者不一致时,仍需要保持系
统整体“主要可用”。
AP 方案只是在系统发生分区的时候放弃一致性,而不是永远放弃一致性。在分
区故障恢复后,系统应该达到最终一致性。这一点其实就是 BASE 理论延伸的
地方。

  1. 基本可用
    基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性。但是
    这绝不等价于系统不可用。
  • 响应时间上的损失: 正常情况下,处理用户请求需要 0.5s 返回结果,但是由
    于系统出现故障,处理用户请求的时间变为 3 s。
  • 系统功能上的损失:正常情况下,用户可以使用系统的全部功能,但是由于系
    统访问量突然剧增,系统的部分非核心功能无法使用。
  1. 软状态
    软状态指允许系统中的数据存在中间状态(CAP 理论中的数据不一致),并认为
    该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副
    本之间进行数据同步的过程存在延时。
  2. 最终一致性
    最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能
    够达到一个一致的状态。因此最终一致性的本质是需要系统保证最终数据能够达
    到一致,而不需要实时保证系统数据的强一致性。

分布式一致性的 3 种级别

  1. 强一致性 :系统写入了什么,读出来的就是什么。
  2. 弱一致性 :不一定可以读取到最新写入的值,也不保证多少时间之后读取到
    的数据是最新的,只是会尽量保证某个时刻达到数据一致的状态。
  3. 最终一致性 :弱一致性的升级版,系统会保证在一定时间内达到数据一致的
    状态。

实现最终一致性的具体方式

  1. 读时修复 : 在读取数据时,检测数据的不一致,进行修复。比如 Cassandra
    的 Read Repair 实现,具体来说,在向 Cassandra 系统查询数据的时候,如
    果检测到不同节点 的副本数据不一致,系统就自动修复数据。
  2. 写时修复 : 在写入数据,检测数据的不一致时,进行修复。比如 Cassandra
    的 Hinted Handoff 实现。具体来说,Cassandra 集群的节点之间远程写数据
    的时候,如果写失败 就将数据缓存下来,然后定时重传,修复数据的不一致性。
  3. 异步修复 : 这个是最常用的方式,通过定时对账检测副本数据的一致性,并
    修复

Zoopkeeper

什么是Zoopkeeper

ZooKeeper 是一个开源的分布式协调服务,它的设计目标是将那些复杂且容易出错的
分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接
口提供给用户使用

ZooKeeper 特点

  1. 顺序一致性: 从同一客户端发起的事务请求,最终将会严格地按照顺序被应用到
    ZooKeeper 中去。
  2. 原子性: 所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的
    ,也就是说,要么整个集群中所有的机器都成功应用了某一个事务,要么都没有应用
  3. 单一系统映像 : 无论客户端连到哪一个ZooKeeper 服务器上,其看到的服务端
    数据模型都是一致的
  4. 可靠性: 一旦一次更改请求被应用,更改的结果就会被持久化,直到被下一次更
    改覆盖

ZooKeeper 典型应用场景

  1. 分布式锁 : 通过创建唯一节点获得分布式锁,当获得锁的一方执行完相关代码
    或者是挂掉之后就释放锁
  2. 命名服务 :可以通过ZooKeeper 的顺序节点生成全局唯一 ID
  3. 数据发布/订阅 :通过 Watcher 机制 可以很方便地实现数据发布/订阅。当你将
    数据发布到ZooKeeper 被监听的节点上,其他机器可通过监听ZooKeeper 上节点的
    变化来实现配置的动态更新

有哪些开源项目用到了ZooKeeper

  1. Kafka : ZooKeeper主要为Kafka提供Broker和Topic的注册以及多个Partition
    的负载均衡等功能
  2. Hbase : ZooKeeper 为 Hbase 提供确保整个集群只有一个 Master 以及保存和
    提供 regionserver 状态信息(是否在线)等功能
  3. Hadoop : ZooKeeper 为 Namenode 提供高可用支持

ZooKeeper 重要概念

Data model(数据模型)

ZooKeeper 数据模型采用层次化的多叉树形结构,每个节点上都可以存储数据,这些数
据可以是数字、字符串或者是二级制序列。并且。每个节点还可以拥有N 个子节点,最上
层是根节点以“/”来代表。每个数据节点在ZooKeeper中被称为znode,它是ZooKeeper
中数据的最小单元。并且每个znode 都一个唯一的路径标识。
ZooKeeper 主要是用来协调服务的,而不是用来存储业务数据的,所以不要放比较大的
数据在znode 上,ZooKeeper 给出的上限是每个结点的数据大小最大是1M

znode(数据节点)

介绍了ZooKeeper 树形数据模型之后,我们知道每个数据节点在ZooKeeper 中被称
为znode,它是ZooKeeper 中数据的最小单元。你要存放的数据就放在上面,是你使
用ZooKeeper 过程中经常需要接触到的一个概念。将znode 分为4 大类:

  1. 持久(PERSISTENT)节点 :一旦创建就一直存在即使ZooKeeper 集群宕机,直
    到将其删除
  2. 临时(EPHEMERAL)节点 :临时节点的生命周期是与客户端会话(session)
    绑定的,会话消失则节点消失 。并且临时节点只能做叶子节点,不能创建子节点
  3. 持久顺序(PERSISTENT_SEQUENTIAL)节点 :除了具有持久(PERSISTENT)
    节点的特性之外,子节点的名称还具有顺序性。比如 /node1/app0000000001、
    /node1/app0000000002
  4. 临时顺序(EPHEMERAL_SEQUENTIAL)节点 :除了具备临时(EPHEMERAL)节
    点的特性之外,子节点的名称还具有顺序性

每个znode 由2 部分组成:

  • stat :状态信息
  • data : 节点存放的数据的具体内容

如下所示,我通过get 命令来获取根目录下的dubbo 节点的内容,Stat 类中包
含了一个数据节点的所有状态信息的字段,包括事务 ID-cZxid、节点创建时间
-ctime 和子节点个数-numChildren 等等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[zk: 127.0.0.1:2181(CONNECTED) 6] get /dubbo
# 该数据节点关联的数据内容为空
null
# 下面是该数据节点的一些状态信息,其实就是 Stat 对象的格式化输出
cZxid = 0x2
ctime = Tue Nov 27 11:05:34 CST 2018
mZxid = 0x2
mtime = Tue Nov 27 11:05:34 CST 2018
pZxid = 0x3
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

版本(version)

stat 中记录了这个znode 的三个相关的版本

  • dataVersion :当前 znode 节点的版本号
  • cversion : 当前 znode 子节点的版本
  • aclVersion : 当前 znode 的 ACL 的版本

ACL(权限控制)

ZooKeeper 采用 ACL(AccessControlLists)策略来进行权限控制,类似于
UNIX 文件系统的权限控制。对于znode 操作的权限,ZooKeeper 提供了以下
5种:

  1. CREATE : 能创建子节点
  2. READ :能获取节点数据和列出其子节点
  3. WRITE : 能设置/更新节点数据
  4. DELETE : 能删除子节点
  5. ADMIN : 能设置节点 ACL 的权限

CREATE 和 DELETE 这两种权限都是针对子节点的权限控制。对于身份认证,提
供了以下几种方式:

  1. world : 默认方式,所有用户都可无条件访问
  2. auth :不使用任何 id,代表任何已认证的用户
  3. digest :用户名:密码认证方式: username:password
  4. ip : 对指定 ip 进行限制

Watcher(事件监听器)

Watcher(事件监听器),是ZooKeeper 中的一个很重要的特性。ZooKeeper
允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,
ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是ZooKeeper
实现分布式协调服务的重要特性

会话(Session)

Session 可以看作是ZooKeeper 服务器与客户端的之间的一个TCP 长连接,
通过这个连接,客户端能够通过心跳检测与服务器保持有效的会话,也能够向
ZooKeeper 服务器发送请求并接受响应,同时还能够通过该连接接收来自服
务器的Watcher 事件通知。
Session 有一个属性叫做:sessionTimeout ,sessionTimeout 代表会话的
超时时间。当由于服务器压力太大、网络故障或是客户端主动断开连接等各种原
因导致客户端连接断开时,只要在sessionTimeout 规定的时间内能够重新连
接上集群中任意一台服务器,那么之前创建的会话仍然有效。
另外在为客户端创建会话之前,服务端首先会为每个客户端都分配一个sessionID。
由于sessionID是ZooKeeper 会话的一个重要标识,许多与会话相关的运行机制都
是基于这个sessionID 的,因此无论是哪台服务器为客户端分配的 sessionID,
都务必保证全局唯一

ZooKeeper 集群

为了保证高可用,最好是以集群形态来部署ZooKeeper,这样只要集群中大部分
机器是可用的(能够容忍一定的机器故障),那么ZooKeeper 本身仍然是可用
的。通常3 台服务器就可以构成一个 ZooKeeper 集群了。ZooKeeper 官方提
供的架构图就是一个ZooKeeper 集群整体对外提供服务。集群间通 ZAB 协议
(ZooKeeper Atomic Broadcast)来保持数据的一致性。
最典型集群模式: Master/Slave 模式(主备模式)。在这种模式中,通常
Master 服务器作为主服务器提供写服务,其他的Slave 服务器从服务器通
过异步复制的方式获取Master 服务器最新的数据提供读服务。

ZooKeeper 集群角色

但是在ZooKeeper 中没有选择传统的Master/Slave 概念,而是引入了Leader
、Follower 和Observer 三种角色。
ZooKeeper集群中的所有机器通过一个Leader选举过程来选定一台称为“Leader”
的机器,Leader 既可以为客户端提供写服务又能提供读服务。除了Leader 外,
Follower 和Observer 都只能提供读服务。Follower 和 Observer 唯一的区
别在于Observer 机器不参与Leader 的选举过程,也不参与写操作的“过半写
成功”策略,因此Observer 机器可以在不影响写性能的情况下提升集群的读
性能。

  1. Leader 为客户端提供读和写的服务,负责投票的发起和决议,更新系统状态
  2. Follower 为客户端提供读服务,如果是写服务则转发给 Leader。在选举过
    程中参与投票
  3. Observer 为客户端提供读服务器,如果是写服务则转发给 Leader。不参与
    选举过程中的投票,也不参与“过半写成功”策略。在不影响写性能的情况下提升
    集群的读性能。此角色于ZooKeeper3.3 系列新增的角色

当Leader 服务器出现网络中断、崩溃退出与重启等异常情况时,就会进入Leader
选举过程,这个过程会选举产生新的Leader 服务器。

  1. Leader election(选举阶段):节点在一开始都处于选举阶段,只要有一个
    节点得到超半数节点的票数,它就可以当选准leader
  2. Discovery(发现阶段) :在这个阶段,followers 跟准 leader 进行通信
    ,同步 followers 最近接收的事务提议
  3. Synchronization(同步阶段) :同步阶段主要是利用leader 前一阶段获得
    的最新提议历史,同步集群中所有的副本。同步完成之后准leader 才会成为真正
    的leader
  4. Broadcast(广播阶段) :到了这个阶段,ZooKeeper 集群才能正式对外提
    供事务服务,并且 leader 可以进行消息广播。同时如果有新的节点加入,还需
    要对新节点进行同步

ZooKeeper 集群中的服务器状态

  1. LOOKING :寻找 Leader
  2. LEADING :Leader 状态,对应的节点为 Leader
  3. FOLLOWING :Follower 状态,对应的节点为 Follower
  4. OBSERVING :Observer 状态,对应节点为Observer,该节点不参与Leader
    选举

ZooKeeper 集群为啥最好奇数台

ZooKeeper 集群在宕掉几个ZooKeeper 服务器之后,如果剩下的ZooKeeper 服务
器个数大于宕掉的个数的话整个ZooKeeper 才依然可用。假如我们的集群中有n 台
ZooKeeper 服务器,那么也就是剩下的服务数必须大于n/2。先说一下结论,2n和
2n-1 的容忍度是一样的,都是n-1,大家可以先自己仔细想一想,这应该是一个
很简单的数学问题了。 比如假如我们有3 台,那么最大允许宕掉1台ZooKeeper
服务器,如果我们有4 台的的时候也同样只允许宕掉1 台。 假如我们有 5 台,
那么最大允许宕掉2 台 ZooKeeper 服务器,如果我们有6 台的的时候也同样只
允许宕掉2 台

ZAB 协议和Paxos 算法

Paxos 算法应该可以说是ZooKeeper 的灵魂了。但是ZooKeeper 并没有完全采
用 Paxos算法 ,而是使用ZAB 协议作为其保证数据一致性的核心算法。另外在
ZooKeeper的官方文档中也指出,ZAB协议并不像 Paxos 算法那样,是一种通
用的分布式一致性算法,它是一种特别为Zookeeper设计的崩溃可恢复的原子
消息广播算法

ZAB 协议介绍

ZAB(ZooKeeper Atomic Broadcast 原子广播) 协议是为分布式协调服务
ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。 在ZooKeeper中
,主要依赖ZAB 协议来实现分布式数据一致性,基于该协议,ZooKeeper 实
现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性

ZAB 协议两种基本的模式:崩溃恢复和消息广播

ZAB 协议包括两种基本的模式,分别是

  1. 崩溃恢复 :当整个服务框架在启动过程中,或是当Leader服务器出现网络中
    断、崩溃退出与重启等异常情况时,ZAB 协议就会进入恢复模式并选举产生新的
    Leader 服务器。当选举产生了新的 Leader 服务器,同时集群中已经有过半的
    机器与该Leader服务器完成了状态同步之后,ZAB 协议就会退出恢复模式。其中
    ,所谓的状态同步是指数据同步,用来保证集群中存在过半的机器能够和Leader
    服务器的数据状态保持一致。
  2. 消息广播 :当集群中已经有过半的Follower服务器完成了和Leader服务器
    的状态同步,那么整个服务框架就可以进入消息广播模式了。 当一台同样遵守
    ZAB协议的服务器启动后加入到集群中时,如果此时集群中已经存在一个Leader
    服务器在负责进行消息广播,那么新加入的服务器就会自觉地进入数据恢复模式
    :找到Leader所在的服务器,并与其进行数据同步,然后一起参与到消息广播
    流程中去

总结

  1. ZooKeeper 本身就是一个分布式程序(只要半数以上节点存活,ZooKeeper
    就能正常服务)
  2. 为了保证高可用,最好是以集群形态来部署ZooKeeper,这样只要集群中大部
    分机器是可用的(能够容忍一定的机器故障),那么 ZooKeeper 本身仍然是可
    用的
  3. ZooKeeper 将数据保存在内存中,这也就保证了高吞吐量和低延迟(但是内
    存限制了能够存储的容量不太大,此限制也是保持 znode 中存储的数据量较小
    的进一步原因)
  4. ZooKeeper 是高性能的。 在“读”多于“写”的应用程序中尤其地明显,因为“
    写”会导致所有的服务器间同步状态。(“读”多于“写”是协调服务的典型场景。)
  5. ZooKeeper 有临时节点的概念。 当创建临时节点的客户端会话一直保持活动
    ,瞬时节点就一直存在。而当会话终结时,瞬时节点被删除。持久节点是指一旦这
    个znode 被创建了,除非主动进行 znode 的移除操作,否则这个 znode 将一
    直保存在 ZooKeeper 上
  6. ZooKeeper 底层其实只提供了两个功能:① 管理(存储、读取)用户程序提
    交的数据;② 为用户程序提供数据节点监听服务

一致性问题

设计一个分布式系统必定会遇到一个问题—— 因为分区容忍性的存在,就必定要求
我们需要在系统可用性和数据一致性中做出权衡。这就是著名的CAP 定理。

一致性协议和算法

而为了解决数据一致性问题,在科学家和程序员的不断探索中,就出现了很多的一
致性协议和算法。比如2PC(两阶段提交),3PC(三阶段提交),Paxos算法等等。

2PC(两阶段提交)

两阶段提交是一种保证分布式系统数据一致性的协议,现在很多数据库都是采用的
两阶段提交协议来完成分布式事务的处理。我们所需要解决的是在分布式系统中,
整个调用链中,我们所有服务的数据处理要么都成功要么都失败,即所有服务的
原子性问题。
在两阶段提交中,主要涉及到两个角色,分别是协调者和参与者。

  1. 第一阶段:当要执行一个分布式事务的时候,事务发起者首先向协调者发起事
    务请求,然后协调者会给所有参与者发送prepare 请求(其中包括事务内容)告
    诉参与者你们需要执行事务了,如果能执行我发的事务内容那么就先执行但不提
    交,执行后请给我回复。然后参与者收到prepare 消息后,他们会开始执行事务
    (但不提交),并将 Undo 和 Redo 信息记入事务日志中,之后参与者就向协
    调者反馈是否准备好了
  2. 第二阶段:第二阶段主要是协调者根据参与者反馈的情况来决定接下来是否可以
    进行事务的提交操作,即提交事务或者回滚事务。比如这个时候所有的参与者都返回
    了准备好了的消息,这个时候就进行事务的提交,协调者此时会给所有的参与者发
    送Commit 请求,当参与者收到Commit 请求的时候会执行前面执行的事务的提交
    操作,提交完毕之后将给协调者发送提交成功的响应。
    而如果在第一阶段并不是所有参与者都返回了准备好了的消息,那么此时协调者将
    会给所有参与者发送回滚事务的rollback 请求,参与者收到之后将会回滚它在第
    一阶段所做的事务处理,然后再将处理情况返回给协调者,最终协调者收到响应后
    便给事务发起者返回处理失败的结果

事实上它只解决了各个事务的原子性问题,随之也带来了很多的问题。

  1. 单点故障问题,如果协调者挂了那么整个系统都处于不可用的状态了
  2. 阻塞问题,即当协调者发送 prepare 请求,参与者收到之后如果能处理那么它
    将会进行事务的处理但并不提交,这个时候会一直占用着资源不释放,如果此时协
    调者挂了,那么这些资源都不会再释放了,这会极大影响性能
  3. 数据不一致问题,比如当第二阶段,协调者只发送了一部分的 commit 请求就
    挂了,那么也就意味着,收到消息的参与者会进行事务的提交,而后面没收到的则
    不会进行事务提交,那么这时候就会产生数据不一致性问题

3PC(三阶段提交)

因为2PC存在的一系列问题,比如单点,容错机制缺陷等等,从而产生了 3PC(三
阶段提交)

  1. CanCommit阶段:协调者向所有参与者发送CanCommit 请求,参与者收到请求后
    会根据自身情况查看是否能执行事务,如果可以则返回YES响应并进入预备状态,否
    则返回NO
  2. PreCommit阶段:协调者根据参与者返回的响应来决定是否可以进行下面的 PreCommit 操作。如果上面参与者返回的都是 YES,那么协调者将向所有参与者发送 PreCommit 预提交请求,参与者收到预提交请求后,会进行事务的执行操作,并将 Undo 和 Redo 信息写入事务日志中 ,最后如果参与者顺利执行了事务则给协调者返回成功的响应。如果在第一阶段协调者收到了 任何一个 NO 的信息,或者 在一定时间内 并没有收到全部的参与者的响应,那么就会中断事务,它会向所有参与者发送中断请求(abort),参与者收到中断请求之后会立即中断事务,或者在一定时间内没有收到协调者的请求,它也会中断事务。
  3. DoCommit阶段:这个阶段其实和 2PC 的第二阶段差不多,如果协调者收到了所有参与者在 PreCommit 阶段的 YES 响应,那么协调者将会给所有参与者发送 DoCommit 请求,参与者收到 DoCommit 请求后则会进行事务的提交工作,完成后则会给协调者返回响应,协调者收到所有参与者返回的事务提交成功的响应之后则完成事务。若协调者在 PreCommit 阶段 收到了任何一个 NO 或者在一定时间内没有收到所有参与者的响应 ,那么就会进行中断请求的发送,参与者收到中断请求后则会 通过上面记录的回滚日志 来进行事务的回滚操作,并向协调者反馈回滚状况,协调者收到参与者返回的消息后,中断事务。

Paxos 算法

Raft协议

像 2PC 和 3PC 都需要引入一个协调者的角色,当协调者 down 掉之后,整个
事务都无法提交,参与者的资源都出于锁定的状态,对于系统的影响是灾难性的
,而且出现网络分区的情况,很有可能会出现数据不一致的情况。有没有不需要
协调者角色,每个参与者来协调事务呢,在网络分区的情况下,又能最大程度
保证一致性的解决方案呢。此时 Paxos 出现了。
Paxos 算法是 Lamport 于 1990 年提出的一种基于消息传递的一致性算法。
Paxos 协议是一个解决分布式系统中,多个节点之间就某个值(提案)达成一
致(决议)的通信协议。它能够处理在少数节点离线的情况下,剩余的多数节
点仍然能够达成一致。即每个节点,既是参与者,也是决策者。
分布式系统中的节点通信存在两种模型:共享内存(Shared memory)和消息
传递(Messages passing),基于消息传递通信模型的分布式系统,不可避
免的会发生以下错误:进程可能会慢、被杀死或者重启,消息可能会延迟、丢
失、重复,在基础Paxos场景中,先不考虑可能出现消息篡改,即拜占庭错误
的情况。(网络环境一般为自建内网,消息安全相对高)。
Paxos算法解决的问题是在一个可能发生上述异常的分布式系统中如何就某个
值达成一致,保证不论发生以上任何异常,都不会破坏决议的一致性。
Paxos 协议的角色 主要有三类节点:

  1. 提议者(Proposer):提议一个值;
  2. 接受者(Acceptor):对每个提议进行投票;
  3. 告知者(Learner):被告知投票的结果,不参与投票过程。

过程

规定一个提议包含两个字段:[n, v],其中 n 为序号(具有唯一性),v 为
提议值。

分布式锁

我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时
候,可以使用Java多线程解决。注意这是单机应用,也就是所有的请求都会分
配到当前服务器的JVM内部,然后映射为操作系统的线程进行处理,而这个共
享变量只是在这个JVM内部的一块内存空间。
后来业务发展需要做集群,一个应用需要部署到几台机器上然后做负载均衡。
多个请求分别操作不同JVM内存区域的数据,变量之间不存在共享,也不具有
可见性,处理的结果也是不对的。
为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,
在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API进行互
斥控制。由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单
机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁
的能力。使用分布式锁它是控制分布式系统之间互斥访问共享资源的一种方式

分布式锁应该具备的条件

  1. 获取锁和释放锁的性能要好
  2. 判断是否获得锁必须是原子性的,否则可能导致多个请求都获取到锁
  3. 网络中断或宕机无法释放锁时,锁必须被清除,不然会发生死锁
  4. 可重入一个线程中可以多次获取同一把锁,比如一个线程在执行一个带锁
    的方法,该方法中又调用了另一个需要相同锁的方法,则该线程可以直接执
    行调用的方法,而无需重新获得锁;
  5. 阻塞锁和非阻塞锁,阻塞锁即没有获取到锁,则继续等待获取锁;非阻塞
    锁即没有获取到锁后,不继续等待,直接返回锁失败。
  6. 支持锁超时,防止死锁

分布式锁的三种实现方式

  1. 基于数据库实现分布式锁(悲观锁、乐观锁)
  2. 基于缓存(Redis等)实现分布式锁
  3. 基于Zookeeper实现分布式锁

数据库锁

数据库悲观锁

悲观锁的实现依靠数据库提供的锁机制,数据库的行锁、表锁、排他锁等都是悲观
锁。场景比如在秒杀案例中,生成订单和扣减库存的操作,可以通过商品记录的
行锁,进行保护。在查询商品表库存时将该条记录加锁,待下单减库存完成后,
再释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
begin; 

//1.查询出商品信息

select stockCount from seckill_good where id=1 for update;

//2.根据商品信息生成订单

insert into seckill_order (id,good_id) values (null,1);

//3.修改商品stockCount减一

update seckill_good set stockCount=stockCount-1 where id=1;

//4.提交事务

commit;

如果以上修改库存的代码发生并发,同一时间只有一个线程可以开启事务并获得
id=1的锁,其它的事务必须等本次事务提交之后才能执行。这样我们可以保证当
前的数据不会被其它事务修改。悲观锁的流程如下:

  1. 在对记录进行修改前,先尝试为该记录加上排他锁
  2. 如果加锁失败,说明该记录正在被修改,那么当前查询可能要等待或者抛出
    异常。具体响应方式由开发者根据实际需要决定。
  3. 如果成功加锁,那么就可以对记录做修改,事务完成后就会解锁了。
  4. 其间如果有其他事务对该记录做加锁的操作,都要等待当前事务解锁或直接
    抛出异常。

该实现方式完全依靠数据库唯一索引来实现,当想要获得锁时,即向数据库中
插入一条记录,释放锁时就删除这条记录。这种方式存在以下几个问题:

  • 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导
    致业务系统不可用
  • 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中
    ,其他线程无法再获得到锁
  • 这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报
    错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得
    锁操作
  • 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为
    数据中数据已经存在了

数据库乐观锁

主要就是两个步骤:冲突检测和数据更新。其实现方式有一种比较典型的就是CAS
技术。CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只
有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起
,而是被告知这次竞争中失败,并可以再次尝试。
CAS的实现中,在表中增加一个version字段,操作前先查询version信息,在数
据提交时检查version字段是否被修改,如果没有被修改则进行提交,否则认为
是过期数据。

1
2
3
4
5
6
7
8
9
//1.查询出商品信息
select stockCount, version from seckill_good where id=1;

//2.根据商品信息生成订单
insert into seckill_order (id,good_id) values (null,1);

//3.修改商品库存
update seckill_good set stockCount=stockCount-1, version = version+1
where id=1, version=version;

乐观锁的方式,在高并发时,只有一个线程能执行成功,会造成大量的失败,这给
用户的体验显然是很不好的

基于redis实现

  1. 基于Jedis 手工造轮子分布式锁
  2. 介绍Redission 分布式锁的使用和原理

基于Jedis

Redis分布式锁机制,主要借助setnx和expire两个命令完成。

  1. SETNX 是SET if Not eXists的简写。将 key 的值设为 value,当且仅当
    key 不存在; 若给定的 key 已经存在,则 SETNX 不做任何动作
  2. expire命令为 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会
    被自动删除

通过Redis的setnx、expire命令可以实现简单的锁机制:

  1. key不存在时创建,并设置value和过期时间,返回值为1;成功获取到锁;
  2. 如key存在时直接返回0,抢锁失败;
  3. 持有锁的线程释放锁时,手动删除key; 或者过期时间到,key自动删除,锁
    释放。

以上简单redis分布式锁的问题:如果出现了这么一个问题:如果setnx是成功的
,但是expire设置失败,一旦出现了释放锁失败,或者没有手工释放,那么这个
锁永远被占用,其他线程永远也抢不到锁。所以需要保障setnx和expire两个操
作的原子性,要么全部执行,要么全部不执行,二者不能分开。
解决的办法有两种:

  1. 使用set的命令时,同时设置过期时间,不再单独使用 expire命令
  2. 使用lua脚本,将加锁的命令放在lua脚本中原子性的执行
1
2
3
4
5
6
7
set unlock "234" EX 100 NX
set test "111" EX 100 NX
set key value [EX seconds] [PX milliseconds] [NX|XX]
EX seconds:设置失效时长,单位秒
PX milliseconds:设置失效时长,单位毫秒
NX:key不存在时设置value,成功返回OK,失败返回(nil)
XX:key存在时设置value,成功返回OK,失败返回(nil)

加锁的简单代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class JedisCommandLock {
private RedisTemplate redisTemplate;

private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String
lockKey, String requestId, int expireTime) {
//jedis.set(String key, String value, String nxxx, String expx, int time)
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST,
SET_WITH_EXPIRE_TIME, expireTime);

if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
  1. 第一个为key,我们使用key来当锁,因为key是唯一的。
  2. 第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作
    为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时
    ,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId
    ,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。
  3. requestId可以使用UUID.randomUUID().toString()方法生成。
  4. 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,
    即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  5. 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过
    期的设置,具体时间由第五个参数决定。
  6. 第五个为time,与第四个参数相呼应,代表key的过期时间。

解锁的简单代码实现:
解锁的过程就是将Key键删除。但也不能乱删,不能说客户端1的请求将客户端2
的锁给删除掉。这时候random_value的作用就体现出来。为了保证解锁操作的
原子性,我们用LUA脚本完成这一操作。先判断当前锁的字符串是否与传入的
值相等,是的话就删除Key,解锁成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class JedisCommandLock {
private static final Long RELEASE_SUCCESS = 1L;
/**
* 释放分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String
lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(requestId));

if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
  1. 首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁
    (解锁)
  2. 第一行代码,我们写了一个简单的Lua脚本代码。
  3. 第二行代码,我们将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋
    值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis
    服务端执行。

那么为什么要使用Lua语言来实现呢?因为要确保上述操作是原子性的。那么为什么
执行eval()方法可以确保原子性,源于Redis的特性。简单来说,就是在eval命令
执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行
完成,Redis才会执行其他命令。

最常见的解锁代码就是直接使用 jedis.del() 方法删除锁,这种不先判断锁的拥
有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是
它的。

1
2
3
public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
jedis.del(lockKey);
}

这种解锁代码乍一看也是没问题,唯一区别的是分成两条命令去执行,代码如下:

1
2
3
4
5
6
7
8
public static void wrongReleaseLock2(Jedis jedis, String lockKey, 
String requestId) {
// 判断加锁与解锁是不是同一个客户端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此时,这把锁突然不是这个客户端的,则会误解锁
jedis.del(lockKey);
}
}

基于Lua脚本实现分布式锁

lua脚本是高并发、高性能的必备脚本语言。加锁和删除锁的操作,使用纯lua进行
封装,保障其执行时候的原子性。
加锁的Lua脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
--- -1 failed
--- 1 success
---
local key = KEYS[1]
local requestId = KEYS[2]
local ttl = tonumber(KEYS[3])
local result = redis.call('setnx', key, requestId)
if result == 1 then
--PEXPIRE:以毫秒的形式指定过期时间
redis.call('pexpire', key, ttl)
else
result = -1;
-- 如果value相同,则认为是同一个线程的请求,则认为重入锁
local value = redis.call('get', key)
if (value == requestId) then
result = 1;
redis.call('pexpire', key, ttl)
end
end
-- 如果获取锁成功,则返回 1
return result

解锁的Lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
--- -1 failed
--- 1 success

-- unlock key
local key = KEYS[1]
local requestId = KEYS[2]
local value = redis.call('get', key)
if value == requestId then
redis.call('del', key);
return 1;
end
return -1

在Java中调用lua脚本,完成加锁操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {

private RedisTemplate redisTemplate;
RedisScript<Long> lockScript = null;
RedisScript<Long> unLockScript = null;

public static final int DEFAULT_TIMEOUT = 2000;
public static final Long LOCKED = Long.valueOf(1);
public static final Long UNLOCKED = Long.valueOf(1);
public static final Long WAIT_GAT = Long.valueOf(200);
public static final int EXPIRE = 2000;

String key;
String lockValue; // lockValue 锁的value ,代表线程的uuid

/**
* 默认为2000ms
*/
long expire = 2000L;
public JedisLock(String lockKey, String lockValue) {
this.key = lockKey;
this.lockValue = lockValue;
}

private volatile boolean isLocked = false;
private Thread thread;

/**
* 获取一个分布式锁 , 超时则返回失败
*
* @return 获锁成功 - true | 获锁失败 - false
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws
InterruptedException {

//本地可重入
if (isLocked && thread == Thread.currentThread()) {
return true;
}
expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;
long startMillis = System.currentTimeMillis();
Long millisToWait = expire;

boolean localLocked = false;

int turn = 1;
while (!localLocked) {

localLocked = this.lockInner(expire);
if (!localLocked) {
millisToWait = millisToWait - (System.currentTimeMillis()
- startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait > 0L) {
/**
* 还没有超时
*/
ThreadUtil.sleepMilliSeconds(WAIT_GAT);
log.info("睡眠一下,重新开始,turn:{},剩余时间:{}",
turn++, millisToWait);
} else {
log.info("抢锁超时");
return false;
}
} else {
isLocked = true;
localLocked = true;
}
}
return isLocked;
}

/**
* 有返回值的抢夺锁
*
* @param millisToWait
*/
public boolean lockInner(Long millisToWait) {
if (null == key) {
return false;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(lockValue);
redisKeys.add(String.valueOf(millisToWait));
Long res = (Long) redisTemplate.execute(lockScript, redisKeys);

return res != null && res.equals(LOCKED);
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("抢锁失败").build();
}

}
}

在Java中调用lua脚本,完成解锁操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {

private RedisTemplate redisTemplate;

RedisScript<Long> lockScript = null;
RedisScript<Long> unLockScript = null;

//释放锁
@Override
public void unlock() {
if (key == null || requestId == null) {
return;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(requestId);
Long res = (Long) redisTemplate.execute(unLockScript, redisKeys);

} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("释放锁失败").build();

}
}
}

zookeeper分布式锁

zookeeper是一个为分布式应用提供一致性服务的软件,它内部是一个分层的
文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。
数据模型:

  1. 永久节点:节点创建后,不会因为会话失效而消失
  2. 临时节点:与永久节点相反,如果客户端连接失效,则立即删除节点
  3. 顺序节点:与上述两个节点特性类似,如果指定创建这类节点时,zk会自动
    在节点名后加一个数字后缀,并且是有序的

监视器(watcher):

  1. 当创建一个节点时,可以注册一个该节点的监视器,当节点状态发生改变时
    ,watch被触发时ZooKeeper将会向客户端发送且仅发送一条通知,因为watch
    只能被触发一次

最经典的分布式锁是可重入的公平锁。在重入锁模型中,一把独占锁,可以被多
次锁定,这就叫做可重入锁。

  1. ZooKeeper每一个节点,都是一个天然顺序发号器。在每一个节点下面创建
    临时顺序节点类型,新的子节点后面,会加上一个次序编号,而这个生成的次
    序编号,是上一个生成的次序编号加一。例如有一个用于发号的节点 “/test/
    lock”为父亲节点,可以在这个父节点下面创建相同前缀的临时顺序子节点,
    假定相同的前缀为 “/test/lock/seq-”。第一个创建的子节点基本上应该为
    /test/lock/seq-0000000000,下一个节点/test/lock/seq-0000000001
    ,依次类推。
  2. ZooKeeper节点的递增有序性,可以确保锁的公平
    一个ZooKeeper分布式锁,首先需要创建一个父节点,尽量是持久节点,然后
    每个要获得锁的线程,都在这个节点下创建个临时顺序节点。由于ZK节点,是
    按照创建的次序,依次递增的。为了确保公平,可以简单的规定:编号最小的
    那个节点,表示获得了锁。所以,每个线程在尝试占用锁之前,首先判断自己
    是排号是不是当前最小,如果是则获取锁。
  3. ZooKeeper的节点监听机制,可以保障占有锁的传递有序而且高效
    每个线程抢占锁之前,先尝试创建自己的ZNode。同样释放锁的时候,就需要删
    除创建的Znode。创建成功后,如果不是排号最小的节点,就处于等待通知的状
    态。等谁的通知呢?不需要其他人,只需要等前一个Znode的通知就可以了。前
    一个Znode删除的时候,会触发Znode事件,当前节点能监听到删除事件,就
    是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传
    花似的依次向后。
    ZooKeeper的节点监听机制,能够非常完美地实现这种击鼓传花似的信息传递。
    具体的方法是,每一个等通知的Znode节点,只需要监听(linsten)或者监视
    (watch)排号在自己前面那个,而且紧挨在自己前面的那个节点,就能收到其
    删除事件了。只要上一个节点被删除了,就进行再一次判断,看看自己是不是序
    号最小的那个节点,如果是,自己就获得锁。
    另外ZooKeeper的内部优越的机制,能保证由于网络异常或者其他原因,集群
    中占用锁的客户端失联时,锁能够被有效释放。一旦占用Znode 锁的客户端与
    ZooKeeper 集群服务器失去联系,这个临时Znode也将自动删除。排在它后面
    的那个节点,也能收到删除事件,从而获得锁。正是由于这个原因,在创建取
    号节点的时候,尽量创建临时znode节点
  4. ZooKeeper的节点监听机制,能避免羊群效应
    ZooKeeper这种首尾相接,后面监听前面的方式,可以避免羊群效应。所谓羊群
    效应就是一个节点挂掉,所有节点都去监听,然后做出反应,这样会给服务器
    带来巨大压力,所以有了临时顺序节点,当一个节点挂掉,只有它后面的那一
    个节点才做出反应。

分布式锁的抢占过程

zk里有一把锁,这个锁就是zk上的一个节点。然后呢两个客户端都要来获取这
个锁,假设客户端A抢先一步,对zk发起了加分布式锁的请求,这个加锁请求
是用到了zk 中的一个特殊的概念,叫做“临时顺序节点”。简单来说,就是直
接在”my_lock”这个锁节点下,创建一个顺序节点,这个顺序节点有zk内部
自行维护的一个节点序号。
第一个客户端来搞一个顺序节点,zk内部会给起个名字叫做:xxx-000001。然
后第二个客户端来搞一个顺序节点,zk可能会起个名字叫做:xxx-000002。最
后一个数字都是依次递增的,从1开始逐次递增。zk会维护这个顺序。
客户端A创建完一个顺序节点。还没完,他会查一下”my_lock”这个锁节点下的
所有子节点,并且这些子节点是按照序号排序的,这个时候他大概会拿到这么
一个集合:我就是第一个来创建顺序节点的人,所以我就是第一个尝试加分布
式锁的人。
客户端A都加完锁了,客户端B过来想要加锁了,这个时候他会干一样的事儿:
先是在”my_lock”这个锁节点下创建一个临时顺序节点,客户端B因为是第二
个来创建顺序节点的,所以zk内部会维护序号为”2”。此时第一个是客户端A
创建的那个顺序节点,序号为”01”的那个。所以加锁失败!
加锁失败了以后,客户端B就会通过ZK的API对他的顺序节点的上一个顺序节点
加一个监听器。zk天然就可以实现对某个节点的监听。监听这个节点是否被删除
等变化。
客户端A加锁之后,可能处理了一些代码逻辑,然后就会释放锁,释放锁的过程
其实很简单,就是把自己在zk里创建的那个顺序节点给删除。删除了那个节点之
后,zk会负责通知监听这个节点的监听器,也就是客户端B之前加的那个监听器
,你监听的那个节点被删除了,有人释放了锁。
此时就会通知客户端B重新尝试去获取锁,客户端B判断自己居然是集合中的第一
个顺序节点。

分布式锁的基本实现

实现分布式锁

  1. 创建一个锁目录lock
  2. 希望获得锁的线程A就在lock目录下,创建临时顺序节点
  3. 获取锁目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,
    则说明当前线程顺序号最小,获得锁
  4. 线程B获取所有节点,判断自己不是最小节点,设置监听(watcher)比自
    己次小的节点(只关注比自己次小的节点是为了防止发生“羊群效应”)
  5. 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是最小
    的节点,获得锁。

首先定义一个锁的接口Lock,仅仅两个抽象方法:一个加锁方法,一个解锁方法

1
2
3
4
5
6
public interface Lock {

boolean lock() throws Exception;

boolean unlock();
}
  1. 一把分布式锁通常使用一个Znode节点表示;如果锁对应的Znode节点不存在
    ,首先创建Znode节点。这里假设为“/test/lock”,代表了一把需要创建的分布
    式锁
  2. 抢占锁的所有客户端,使用锁的Znode节点的子节点列表来表示;如果某个客
    户端需要占用锁,则在“/test/lock”下创建一个临时有序的子节点.这里所有临
    时有序子节点,尽量共用一个有意义的子节点前缀
  3. 客户端创建子节点后,需要进行判断:自己创建的子节点,是否为当前子节点
    列表中序号最小的子节点。如果是,则认为加锁成功;如果不是,则监听前一个
    Znode子节点变更消息,等待前一个节点释放锁。
  4. 一旦队列中的后面的节点,获得前一个子节点变更通知,则开始进行判断,判
    断自己是否为当前子节点列表中序号最小的子节点,如果是,则认为加锁成功;
    如果不是,则持续监听,一直到获得锁。
  5. 获取锁后,开始处理业务流程。完成业务流程后,删除自己的对应的子节点,
    完成释放锁的工作,以方面后继节点能捕获到节点变更通知,获得分布式锁。

lock()方法的实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Slf4j
public class ZkLock implements Lock {
//ZkLock的节点链接
private static final String ZK_PATH = "/test/lock";
private static final String LOCK_PREFIX = ZK_PATH + "/";
private static final long WAIT_TIME = 1000;
//Zk客户端
CuratorFramework client = null;

private String locked_short_path = null;
private String locked_path = null;
private String prior_path = null;
final AtomicInteger lockCount = new AtomicInteger(0);
private Thread thread;

public ZkLock() {
ZKclient.instance.init();
synchronized (ZKclient.instance) {
if (!ZKclient.instance.isNodeExist(ZK_PATH)) {
ZKclient.instance.createNode(ZK_PATH, null);
}
}
client = ZKclient.instance.getClient();
}

@Override
public boolean lock() {
//可重入,确保同一线程,可以重复加锁
synchronized (this) {
if (lockCount.get() == 0) {
thread = Thread.currentThread();
lockCount.incrementAndGet();
} else {
if (!thread.equals(Thread.currentThread())) {
return false;
}
lockCount.incrementAndGet();
return true;
}
}
try {
boolean locked = false;
//首先尝试着去加锁
locked = tryLock();
if (locked) {
return true;
}
//如果加锁失败就去等待
while (!locked) {
await();

//获取等待的子节点列表
List<String> waiters = getWaiters();
//判断,是否加锁成功
if (checkLocked(waiters)) {
locked = true;
}
}
return true;
} catch (Exception e) {
e.printStackTrace();
unlock();
}

return false;
}
}

尝试加锁的tryLock方法是关键,做了两件重要的事情:

  1. 创建临时顺序节点,并且保存自己的节点路径
  2. 判断是否是第一个,如果是第一个,则加锁成功。如果不是,就找到前一
    个Znode节点,并且保存其路径到prior_path
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private boolean tryLock() throws Exception {
//创建临时Znode
locked_path = ZKclient.instance
.createEphemeralSeqNode(LOCK_PREFIX);
//然后获取所有节点
List<String> waiters = getWaiters();

if (null == locked_path) {
throw new Exception("zk error");
}
//取得加锁的排队编号
locked_short_path = getShortPath(locked_path);

//获取等待的子节点列表,判断自己是否第一个
if (checkLocked(waiters)) {
return true;
}

// 判断自己排第几个
int index = Collections.binarySearch(waiters, locked_short_path);
if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了
throw new Exception("节点没有找到: " + locked_short_path);
}
//如果自己没有获得锁,则要监听前一个节点
prior_path = ZK_PATH + "/" + waiters.get(index - 1);

return false;
}

private String getShortPath(String locked_path) {

int index = locked_path.lastIndexOf(ZK_PATH + "/");
if (index >= 0) {
index += ZK_PATH.length() + 1;
return index <= locked_path.length() ?
locked_path.substring(index) : "";
}
return null;
}

ZooKeeper分布式锁的优缺点

  1. 优点:ZooKeeper分布式锁(如InterProcessMutex),能有效的解决分布
    式问题,不可重入问题,使用起来也较为简单。
  2. 缺点:ZooKeeper实现的分布式锁,性能并不太高。为啥呢?因为每次在创建
    锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。大家知道,ZK
    中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数
    据同不到所有的Follower机器上,这样频繁的网络通信性能的短板是非常突出的。

Netty

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能
协议服务器和客户端。Netty 基于NIO 的,封装了JDK 的NIO,让我们使用起来
更加方法灵活。特点和优势:

  1. 使用简单:封装了 NIO 的很多细节,使用更简单
  2. 功能强大:预置了多种编解码功能,支持多种主流协议
  3. 定制能力强:可以通过 ChannelHandler 对通信框架进行灵活地扩展
  4. 性能高:通过与其他业界主流的 NIO 框架对比,Netty 的综合性能最优

为什么 Netty 性能高

  1. IO 线程模型:同步非阻塞,用最少的资源做更多的事
  2. 内存零拷贝:尽量减少不必要的内存拷贝,实现了更高效率的传输。
  3. 内存池设计:申请的内存可以重用,主要指直接内存。内部实现是用一颗二
    叉查找树管理内存分配情况
  4. 串行化处理读写:避免使用锁带来的性能开销
  5. 高性能序列化协议:支持 protobuf 等高性能序列化协议

BIO、NIO 和 AIO

  1. BIO:一个连接一个线程,客户端有连接请求时服务器端就需要启动一个
    线程进行处理。线程开销大
  2. 伪异步IO:将请求连接放入线程池,一对多,但线程还是很宝贵的资源
  3. NIO:一个请求一个线程,但客户端发送的连接请求都会注册到多路复用器
    上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理
  4. AIO:一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知
    服务器应用去启动线程进行处理

Netty核心组件

Bytebuf

网络通信最终都是通过字节流进行传输的。 ByteBuf 就是 Netty 提供的一个
字节容器,其内部是一个字节数组。 当我们通过 Netty 传输数据的时候,就
是通过 ByteBuf 进行的。可以将 ByteBuf 看作是 Netty 对 Java NIO 提
供了 ByteBuffer 字节容器的封装和抽象

Bootstrap 和 ServerBootstrap

Bootstrap 是客户端的启动引导类/辅助类。
Bootstrap 通常使用 connet() 方法连接到远程的主机和端口,作为一个
Netty TCP 协议通信中的客户端。另外,Bootstrap 也可以通过 bind()
方法绑定本地的一个端口,作为 UDP 协议通信中的一端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动引导/辅助类:Bootstrap
Bootstrap b = new Bootstrap();
//指定线程模型
b.group(group).
......
// 尝试建立连接
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
// 优雅关闭相关线程组资源
group.shutdownGracefully();
}

ServerBootstrap 客户端的启动引导类/辅助类。
ServerBootstrap通常使用 bind() 方法绑定本地的端口上,然后等待客户
端的连接。
Bootstrap 只需要配置一个线程组— EventLoopGroup ,而 ServerBootstrap
需要配置两个线程组— EventLoopGroup ,一个用于接收连接,一个用于具体的
IO 处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1.bossGroup 用于接收连接,workerGroup 用于具体的处理
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//2.创建服务端启动引导/辅助类:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.给引导类配置两大线程组,确定了线程模型
b.group(bossGroup, workerGroup).
......
// 6.绑定端口
ChannelFuture f = b.bind(port).sync();
// 等待连接关闭
f.channel().closeFuture().sync();
} finally {
//7.优雅关闭相关线程组资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

Channel(网络操作抽象类)

Channel 接口是 Netty 对网络操作抽象类。通过Channel 我们可以进行I/O
操作。一旦客户端成功连接服务端,就会新建一个 Channel 同该用户端进行
绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
//  通过 Bootstrap 的 connect 方法连接到服务端
public Channel doConnect(InetSocketAddress inetSocketAddress) {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener)
future -> {
if (future.isSuccess()) {
completableFuture.complete(future.channel());
} else {
throw new IllegalStateException();
}
});
return completableFuture.get();
}

比较常用的Channel接口实现类是 :

  1. NioServerSocketChannel(服务端)
  2. NioSocketChannel(客户端)

这两个Channel 可以和 BIO 编程模型中的ServerSocket以及Socket两个
概念对应上

EventLoop(事件循环)

EventLoop 的主要作用实际就是责监听网络事件并调用事件处理器进行相关
I/O 操作(读写)的处理。Channel 为 Netty 网络操作(读写等操作)抽象
类,EventLoop 负责处理注册到其上的Channel 的 I/O 操作,两者配合
进行 I/O 操作。
EventLoopGroup 包含多个 EventLoop(每一个 EventLoop 通常内部包含
一个线程),它管理着所有的 EventLoop 的生命周期。并且,EventLoop
处理的I/O事件都将在它专有的Thread上被处理,即Thread 和EventLoop
属于 1 : 1 的关系,从而保证线程安全。

ChannelHandler 和 ChannelPipeline

ChannelHandler 是消息的具体处理器,主要负责处理客户端/服务端接收和
发送的数据。

1
2
3
4
5
6
7
8
9
10
11
b.group(eventLoopGroup)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyKryoDecoder(
kryoSerializer, RpcResponse.class));
ch.pipeline().addLast(new NettyKryoEncoder(
kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new KryoClientHandler());
}
});

当Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline。 一
个Channel包含一个ChannelPipeline。ChannelPipeline为ChannelHandler
的链,一个 pipeline 上可以有多个 ChannelHandler。
我们可以在 ChannelPipeline 上通过 addLast() 方法添加一个或者多个
ChannelHandler (一个数据或者事件可能会被多个 Handler 处理) 。
当一个ChannelHandler处理完之后就将数据交给下一个ChannelHandler。
当ChannelHandler被添加到的ChannelPipeline它得到一个ChannelHandlerContext
,它代表一个 ChannelHandler 和 ChannelPipeline 之间的“绑定”。
ChannelPipeline 通过 ChannelHandlerContext来间接管理 ChannelHandler 。

ChannelFuture(操作执行结果)

Netty 是异步非阻塞的,所有的 I/O 操作都为异步的。

1
2
3
4
5
6
7
8
9
public interface ChannelFuture extends Future<Void> {
Channel channel();

ChannelFuture addListener(GenericFutureListener<? extends
Future<? super Void>> var1);
......

ChannelFuture sync() throws InterruptedException;
}

因此我们不能立刻得到操作是否执行成功,但是可以通过ChannelFuture
接口的 addListener() 方法注册一个 ChannelFutureListener,当操
作执行成功或者失败时,监听就会自动触发返回结果。

1
2
3
4
5
6
7
ChannelFuture f = b.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else {
System.err.println("连接失败!");
}
}).sync();

并且你还可以通过ChannelFuture 的 channel() 方法获取连接相关联的
Channel

1
Channel channel = f.channel();

另外我们还可以通过ChannelFuture 接口的sync()方法让异步的操作编程
同步的

1
2
//bind()是异步的,但是,你可以通过 `sync()`方法将其变为同步。
ChannelFuture f = b.bind(port).sync();

NioEventLoopGroup 默认的构造函数会起多少线程

NioEventLoopGroup 默认的构造函数实际会起的线程数为 CPU核心数*2

Author: 高明
Link: https://skysea-gaoming.github.io/2021/06/19/%E5%88%86%E5%B8%83%E5%BC%8F/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.