Zookeeper-client源码解析

Zookeeper-client源码解析

1. client内几个重要的类

1.1 Zookeeper

Zookeeper实例化出来的对象作为Java程序对接Zookeeper服务的客户端,Java程序通过他的public方法即上述API来使用Zookeeper。Zookeeper类作为client的最外层向用户程序提供API。

1.2 ClientCnxn

为Zookeeper类的成员变量,主要负责接收并转化外部调用Zookeeper对象的API,维护并调度发送线程池和事件线程池,并负责管理client与server间的socket I/O。

1.3 SendThread

SendThread为ClientCnxn类的内部类,在实例化Zookeeper时被实例化出,作为线程类,run方法负责向Zookeeper Server发送心跳维持该client与对应Server的session,调用ClientCnxnSocket向Zookeeper Server发送请求Packet,并对session异常的情况进行处理。

1.4 EventThread

EventThread为ClientCnxn类的内部类,内部维护了waitingEvents队列(Object类型),并提供了queueEvent(WatchedEvent event)queuePacket(Packet packet)两个方法向队列添加元素,当Client收到Server的事件时,如znode节点变更,Client解析后生成WatchedEvent调用queueEvent(WatchedEvent event)添加到队列中,如果Client发送是异步请求时(带有Callback参数),那么在Server返回成功会调用queuePacket(Packet packet)将对应的Packet添加到waitingEvents队列。其run方法消费waitingEvents队列进行处理,如元素为WatchedEvent则触发执行对应的watcher,如元素为Packet则执行对应的Callback。

1.5 ClientCnxnSocket

ClientCnxnSocket为抽象类,其子类为Client与Server的具体的网络通信实现,Zookeeper提供了ClientCnxnSocketNIO与ClientCnxnSocketNetty,分别使用NIO与Netty的实现方式。其对象为SendThread的成员变量,SendThread调用其API与Server通信。具体使用NIO或Netty的实现取决于Client的zookeeper.clientCnxnSocket配置项,默认为ClientCnxnSocketNIO。

1.6 ZKWatchManager

ZKWatchManager是Zookeeper Client用来管理用户通过API提交上来的watcher。提供了watcher的注册、删除、查询等方法。

2. Zookeeper-API调用的内部工作流程与机制

2.1 Client与Server的连接方式

Zookeeper Client与Server在Zookeeper类的对象实例化后即创建与Server的长连接,Server端维护该Client连接的Session。

如上图,除发送Client被调用的API请求之外,Client还会定时向Server发送心跳,保持Server端的Session不超时。同时该Client监控的Znode节点有变化时,Server会主动向Client发送Notification event使Client触发对应的watcher。这点与Hadoop的心跳区别就比较明显了,Hadoop只是slave向master定时发送心跳,master根据心跳间隔和心跳内容来进行处理,并不会主动向slave请求。

2.2 一次API调用后代码的执行流程

2.2.1 同步调用

2.2.2 异步调用

2.2.3 Server触发事件后client的执行流程

3. Watcher的注册、管理与调用

3.1 注册watcher

通过Zookeeper client提供的API作为参数将watcher对象的引用注册在client中,可以注册的API有Zookeeper类的构造方法、getData、exists、getChildren,此外client也提供了removeWatchesAPI将已注册在client中的对应path的watcher删除。

3.1.1 ZKWatchManager类

ZKWatchManager类是Zookeeper的内部类,该类的对象负责维护path与watcher对象引用的关系,提供查询path下所有watcher、删除watcher的功能,内部维护了4种watcher分别为defaultWatcher、dataWatches、existWatches、childWatches,除defaultWatcher外,存储watcher的数据结构均为Map,key为对应watcher的path,value为对应path的watcher的引用Set。

3.1.2 抽象类WatchRegistration

提到ZKWatchManager,不得不提的是抽象类WatchRegistration,改抽象类主要的作用是将实现该抽象类的实现类的对象中的watcher注册到ZKWatchManager的对象中。由于client向server发送的请求是异步的,且注册watcher的过程在server返回之后,所以在封装Packet对象时会将watcher封装成对应的DataWatchRegistration、ExistsWatchRegistration、ChildWatchRegistration对象装入不同API生成的Packet对象中,并加入到pendingQueue中,在收到Server请求的response后,会调用该对象的register方法将其watcher注册到ZKWatchManager的对象中。

3.2 Watcher种类

watcher的种类主要按照调用不同的Zookeeper API的决定的,由Zookeeper类的内部类ZKWatchManager提供watcher的管理功能,client接收到server的event及其他特定的场景会触发不同的watcher执行对应的process方法。

3.2.1 defaultWatcher

在调用Zookeeper类的构造方法中会传一个watcher参数,该watcher即为该client的defaultWatcher。
defaultWatcher的触发条件为EventType为None且session状态有变化时,一般为client与server的连接异常活重新连接、鉴权失败等情况。
此外,Zookeeper API中部分API提供的watcher参数为boolean类型,如果为ture即会将defaultWatcher作为对应watcher进行API调用,会以defaultWatcher生成不同的WatchRegistration,在注册时将defaultWatcher注册到ZKWatchManager不同的map中(dataWatches、existWatches、childWatches)。
我个人理解defaultWatcher主要用来进行clent与server的failover情况,做一些失败重试。

3.2.2 dataWatcher/existWatcher/childrenWatcher

通过不同的API注册的watcher的种类不同,通过getData方法注册的watcher为dataWatcher;exist方法注册的watcher为existWatcher;getChildren方法注册的watcher为childrenWatcher。不同种类的watcher收到对应的节点变更event时会被调用(后面会详细描述事件种类与watcher触发的规则)。三种不同的watcher类型为别按照key为path,value为watcher引用Set的Map中,分别对应dataWatches、existWatches、childWatches。
需要注意的是,session异常时也会触发该种类watcher。
如果session异常是需要清除已注册的watcher,将zookeeper.disableAutoWatchReset该配置项配置成true。

3.3 Watcher属性

3.3.1 Watcher

用户实现抽象方法process来处理接收到WatchedEvent时的处理逻辑。

3.3.2 WatchedEvent

3.4 触发Watcher

3.4.1 notification response

client处理server的消息的主要的代码逻辑为SendThread类中的readResponse方法,该方法反序列化出对应的应答对象并进行对应的处理。client收到server的消息分为几种:

其中主要与业务相关的只有notification-response与API-response。notification-response为节点变更等事物操作后,Zookeeper会主动向与该Server建立连接的client发送消息event,client解析该event后会触发对应path的所有watcher,而API-response则为调用API返回的数据,如调用getData方法后server返回给client对应节点的内容,不会触发watcher。

3.4.2 eventType与watcher的触发关系

针对不同event类型来触发何种watcher的代码在ZKWatchManager类中的materialize方法,可以看到对应的触发逻辑。

3.4.2.1 API与EventType的对应关系
API event For “/path” event For “/path/child”
create(“/path”) EventType.NodeCreated NA
delete(“/path”) EventType.NodeDeleted NA
setData(“/path”) EventType.NodeDataChanged NA
create(“/path/child”) EventType.NodeChildrenChanged EventType.NodeCreated
delete(“/path/child”) EventType.NodeChildrenChanged EventType.NodeDeleted
setData(“/path/child”) NA EventType.NodeDataChanged
3.4.2.2 操作与watcher关系
API “/path” “/path/child”
exists (existWatcher) getData (dataWatcher) getChildren (childrenWatcher) exists (existWatcher) getData (dataWatcher) getChildren (childrenWatcher)
create(“/path”)
delete(“/path”)
setData(“/path”)
create(“/path/child”)
delete(“/path/child”)
setData(“/path/child”)

需要注意的是,网上大多数博客上这个对应关系是错误的,在调用create("/path")创建节点时是无法触发dataWatcher的。
虽然ZKWatchManager中的materialize方法是如下的实现:

但是仔细看下注册watcher的代码实现:

可以发现,除了ExistsWatchRegistration类有重写shouldAddWatch方法外,DataWatchRegistration、ChildWatchRegistration都没有重写该方法,WatchRegistration的shouldAddWatch实现如下:

也就是说,当Server的response返回的是znode节点不存在时,除ExistsWatchRegistration可以正常将existWatcher注册上,其他dataWatcher和childrenWatcher均无法被注册。所以在节点不存在时注册dataWatcher和childrenWatcher会失败。

3.4.3 client调用Watcher的特点

1.在上述代码中可以看到,对应path的watch取出后即被删除,所以存在注册在client里的watcher只被对应事件触发一次,每次收到该节点的时间都会触发,只能自己实现对应的逻辑。
2.由于ZKWatchManager存储watch的结构为Map<String, Set<Watcher>>,所以同一个path对应的存储结构为Set,如果多次调用API将相同对象的引用注册在同一路径下时,Set内也只会存一个引用,也只会被调用一次,所以如果希望多次执行相同watcher时需要注意这一点。

4. 异步API与AsyncCallback

4.1 Zookeeper的异步API

Zookeeper类的每个API都有对应的异步模式,void类型。在调用时只需要实现对应的回调接口,传入业务相关的上下文ctx即可。Zookeeper client会在Server返回该调用后调用传入API时的回调方法,用户在回调方法中实现自己的业务逻辑。

4.2 成功后回调AsyncCallback的流程

如2.2.2图所示,SendThread类中的readResponse方法中解析Server的返回内容并从pendingQueue中取出对应请求的Packet,在调用ClientCnxn类的finishPacket方法,该方法会将Packet中封装的AsyncCallback添加到waitingEvents队列中,EventThread消费waitingEvents队列元素,执行用户提交的回调。

5. Faliover-Event及常规处理逻辑

5.1 与Server连接异常时 client都做了什么

Zookeeper Client与Server大部分出现的异常都为网络连接异常,出现网络连接异常后,SendThread中的run方法有对应的代码不断尝试重新向Server发起连接。

在连接异常后,同样的在run方法中有catch异常的操作,将连接异常catch住后会向waitingQueue中添加EventType为None KeeperState为Disconnected的event,进而触发所有watcher。

5.2 Faliover-Event的种类

5.2.1 Disconnected

连接断开,我在开发机使用iptables禁用端口操作触发了该事件,重新开启端口后client自动重连。具体参见SendThread的run方法会不断重连并且切换server进行重试,这种重试会重置server的session,所以不会触发到Expired事件。

5.2.2 SyncConnected

连接建立,与Server成功建立连接时会触发该事件,参见ClientCnxn类中的onConnected方法。

5.2.3 Expired

session异常,触发条件为connectResponse中的TimeOut小于等于0,Server端的SessionTracker中维护的Session不合法,收到该事件必须重新初始化Zookeeper-Client。可能的场景如下:

Expired触发事件的场景我没有模拟出来,发生的概率也比较低。

5.3 如何处理事件

SyncConnected

针对特有业务来看,主要做一些watcher重新注册的操作。

Expired

这种session异常的情况是要强制重新实例化出一个新的Zookeeper对象,需要在defaultWatcher或其他的watcher中实现对应的逻辑。

6. API

6.1 Zookeeper构造方法

6.1.1 功能

Zookeeper的客户端为Zookeeper类,调用构造方法实例化出来的对象可以提供Zookeeper服务。

6.1.2 API

6.1.3 参数说明

参数名 类型 描述
connectString String 必填项,zookeeper server的地址加端口号
sessionTimeout String session的超时时间,与server建立session时会传给server,当网络异常导致超过这一时长没有收到数据时,恢复链接时client会收到Expire回复,此时需要重新创建zookeeper client。
watcher Watcher zookeeper client的defaultWatcher,通常在client与server端的session状态改变时,也就是对应EventType.None的event,会调用该watcher。此外,调用其他API注册watcher时也可以注册defaultWatcher。
conf ZKClientConfig 自定义的zookeeper client config
canBeReadOnly boolean 是否允许只读模式,当zk server的多数派无法链接,但其中少数派可以连上,那么该client会连上对应少数派的zk server但只会发送读请求,并会不断尝试链接多数派server。
aHostProvider HostProvider 自定义HostProvider,在client与server的链接异常时,client会调用其next方法获得其他server的地址。
sessionId int zookeeper复用其他client session时的账号
sessionPasswd byte[] zookeeper复用其他client session时的密码

6.2 create

6.2.1 功能

创建zookeeper对应路径的znode节点。

6.2.2 API

6.2.3 参数说明

参数名 类型 描述
path String 创建对应节点的路径。
data byte[] znode节点的值。
acl List\<ACL> 该znode节点的权限信息,可以配置对应的账号密码,一般通过zkCli.sh创建时都默认可见,代码中传入 Ids.OPEN_ACL_UNSAFE
createMode CreateMode 改znode节点的生命周期,可以看CreateMode类的注释,如CreateMode.EPHEMERAL表示创建该节点的client与server断开后该节点自动被删除,一般通过zkCli.sh创建的znode为CreateMode.PERSISTENT即创建该节点的client断开也不会被删除。
stat Stat 传入该对象,会获得创建节点成功后对应的Stat信息。
ttl long CreateMode为PERSISTENT_WITH_TTLPERSISTENT_SEQUENTIAL_WITH_TTL需要设置该值,含义为在创建后的ttl时间内没有被更改且没有子节点,则该节点被删除。两者区别为创建该节点的client与server断开后该节点是否被自动被删除。
cb StringCallback 异步接口时使用,实现该接口的实例对象会在server返回znode节点创建的response时被调用,调用方法为public void processResult(int rc, String path, Object ctx, String name); 参数含义分别为请求的结果码,对应创建的节点路径,调用create时传入的业务相关的向下文,节点名称(通常与path一致,除非为sequential node)
cb Create2Callback 异步接口时使用,实现该接口的实例对象会在server返回znode节点创建的response时被调用,调用方法为public void processResult(int rc, String path, Object ctx, String name, Stat stat);前四个参数与StringCallback一致,只是多了一个创建成功后的描述节点状态的对象Stat。
ctx Object 调用异步create时的上下文,任意类型,client在收到server的response后进行调用回调对象时,会把调用时的ctx对象透传到回调方法中,方便用户在回调方法中进行业务处理。参见StringCallback或Create2Callback。

6.3 exists

6.3.1 功能

判断对应路径的znode节点是否存在,并向对应的“路径”注册watcher(可选)。

6.3.2 API

6.3.3 参数说明

参数名 类型 描述
path String 目标znode路径。
watcher Watcher 注册到该path上的watcher。
watch boolean 为true时会将defaultWatcher注册到对应的path上,为false时不注册watcher。
cb StatCallback 异步调用时使用,实现StatCallback接口的对象,在收到server的response后调用该cb对象的方法 public void processResult(int rc, String path, Object ctx, Stat stat); 参数含义分别为server返回码,对应路径,业务应用上下文,节点状态Stat信息。
ctx Object 调用异步exist时的上下文,任意类型,client在收到server的response后进行调用回调对象时,会把调用时的ctx对象透传到回调方法中,方便用户在回调方法中进行业务处理。

6.4 getData

6.4.1 功能

获得对应路径的znode节点的值,并向对应的“节点”注册watcher(可选)。

6.4.2 API

6.4.3 参数说明

参数名 类型 描述
path String 目标znode路径。
watcher Watcher 注册到该znode上的watcher。
watch boolean 为true时会将defaultWatcher注册到对应的path上,为false时不注册watcher。
stat Stat 传参非null时该对象会被填入当前节点信息。
cb DataCallback 异步回调对象,在收到server的response后调用该cb对象的方法,实现接口方法public void processResult(int rc, String path, Object ctx, byte data[], Stat stat);,参数依次为server返回码,对应路径,业务应用上下文,节点状态Stat信息。
ctx Object 调用异步getData时的上下文,任意类型,client在收到server的response后进行调用回调对象时,会把调用时的ctx对象透传到回调方法中,方便用户在回调方法中进行业务处理。

6.4 getChildren

6.4.1 功能

或者path对应zonde的所有子节点的path(list),同时向参数path节点注册watcher(可选)。

6.4.2 API

6.4.3 参数说明

参数名 类型 描述
path String 目标znode路径。
watcher Watcher 注册到该znode上的watcher。
watch boolean 为true时会将defaultWatcher注册到对应的path上,为false时不注册watcher。
stat Stat 传参非null时该对象会被填入当前节点信息。
cb ChildrenCallback 异步回调对象,在收到server的response后调用该cb对象的方法,实现接口方法public void processResult(int rc, String path, Object ctx, List<String> children);,参数依次为server返回码,对应路径,业务应用上下文,所有子节点path list。
cb Children2Callback 异步回调对象,在收到server的response后调用该cb对象的方法,实现接口方法public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat);,参数依次为server返回码,对应路径,业务应用上下文,所有子节点path list,参数path对应节点的znode状态信息Stat对象,Children2Callback与ChildrenCallback的区别是接口方法参数的不同。
ctx Object 调用异步getChildren时的上下文,任意类型,client在收到server的response后进行调用回调对象时,会把调用时的ctx对象透传到回调方法中,方便用户在回调方法中进行业务处理。

6.5 delete

6.5.1 功能

删除zookeeper对应路径的znode节点。

6.5.2 API

6.5.3 参数说明

参数名 类型 描述
path String 目标znode路径。
version int 节点版本,可从Stat对象获取。
cb VoidCallback 异步回调对象,在收到server的response后调用该cb对象的方法,实现接口方法,public void processResult(int rc, String path, Object ctx);参数依次为server返回码,对应路径,业务应用上下文。
ctx Object 调用异步delete时的上下文,任意类型,client在收到server的response后进行调用回调对象时,会把调用时的ctx对象透传到回调方法中,方便用户在回调方法中进行业务处理。

6.6 sync

6.6.1 功能

client发送同步请求给其连接的server,server会向leader同步所有未同步的事物,当请求znode的时效性比较高时,需要先向对应server发sync请求,再对znode访问,所以只提供异步回调的方式。

6.6.2 API

6.6.3 参数说明

参数名 类型 描述
path String 目标znode路径。
cb VoidCallback 异步回调对象,在收到server的response后调用该cb对象的方法,实现接口方法,public void processResult(int rc, String path, Object ctx);参数依次为server返回码,对应路径,业务应用上下文。
ctx Object 调用异步sync时的上下文,任意类型,client在收到server的response后进行调用回调对象时,会把调用时的ctx对象透传到回调方法中,方便用户在回调方法中进行业务处理。