(点击
上方公众号
,可快速关注)
来
源:holynull,
blog.leanote.com/post/holynull/Zookeeper
如有好文章投稿,请点击 → 这里了解详情
实现 Implementation
ZooKeeper服务可以在两种模式下运行。在standalone模式下,我们可以运行一个单独的ZooKeeper服务器,我们可以在这种模式下进行基本功能的简单测试,但是这种模式没有办法体现ZooKeeper的高可用特性和快速恢复特性。在生产环境中,我们一般采用replicated(复制)模式安装在多台服务器上,组建一个叫做ensemble的集群。ZooKeeper在他的副本之间实现高可用性,并且只要ensemble集群中能够推举出主服务器,ZooKeeper的服务就可以一直不终断。例如,在一个5个节点的ensemble中,容忍有2个节点脱离集群,服务还是可用的。因为剩下的3个节点投票,可以产生超过集群半数的投票,来推选一台主服务器。而6个节点的ensemble中,也只能容忍2个节点的服务器死机。因为如果3个节点脱离集群,那么剩下的3个节点无论如何不能产生超过集群半数的投票来推选一个主服务器。所以,一般情况下ensemble中的服务器数量都是奇数。
从概念上来看,ZooKeeper其实是很简单的。他所做的一切就是保证每一次对znode树的修改,都能够复制到ensemble的大多数服务器上。如果非主服务器脱离集群,那么至少有一台服务器上的副本保存了最新状态。剩下的其他的服务器上的副本,会很快更新这个最新的状态。
为了实现这个简单而不平凡的设计思路,ZooKeeper使用了一个叫做Zab的协议。这个协议分为两阶段,并且不断的运行在ZooKeeper上:
-
阶段 1:领导选举(Leader election)Ensemble中的成员通过一个程序来选举出一个首领成员,我们叫做leader。其他的成员就叫做follower。在大多数(quorum)follower完成与leader状态同步时,这个阶段才结束。
-
阶段 2: 原子广播(Atomic broadcast)所有的写入请求都会发送给leader,leader在广播给follower。当大多数的follower已经完成了数据改变,leader才会将更新提交,客户端就会随之得到leader更新成功的消息。协议中的设计也是具有原子性的,所以写入操作只有成功和失败两个结果。
如果leader脱离了集群,剩下的节点将选举一个新的leader。如果之前的leader回到了集群中,那么将被视作一个follower。leader的选举很快,大概200ms就能够产生结果,所以不会影响执行效率。
Ensemble中的所有节点都会在更新内存中的znode树的副本之前,先将更新数据写入到硬盘上。读操作可以请求任何一台ZooKeeper服务器,而且读取速度很快,因为读取是内存中的数据副本。
数据一致性 Consistency
理解了ZooKeeper的实现原理,有助于理解ZooKeeper如何保证数据的一致性。就像字面上理解的“leader”和“follower”的意思一样,在ensemble中follower的update操作会滞后于leader的update完成。事实的结果使我们在提交更新数据之前,不必在每一台ZooKeeper服务器上执行持久化变更数据,而是仅需在主服务器上执行持久化变更数据。ZooKeeper客户端的最佳实践是全部链接到follower上。然而客户端是有可能连接到leader上的,并且客户端控制不了这个选择,甚至客户端并不知道连接到了follower还是leader。下图所示,读操作向follower请求即可,而写操作由leader来提交。
每一个对znode树的更新操作,都会被赋予一个全局唯一的ID,我们称之为zxid(ZooKeeper Transaction ID)。更新操作的ID按照发生的时间顺序升序排序。例如,z1大于z2,那么z1的操作就早于z2操作。
ZooKeeper在数据一致性上实现了如下几个方面:
-
顺序一直性从客户端提交的更新操作是按照先后循序排序的。例如,如果一个客户端将一个znode z赋值为a,然后又将z的值改变成b,那么在这个过程中不会有客户端在z的值变为b后,取到的值是a。
-
原子性更新操作的结果不是失败就是成功。即,如果更新操作失败,其他的客户端是不会知道的。
-
系统视图唯一性无论客户端连接到哪个服务器,都将看见唯一的系统视图。如果客户端在同一个会话中去连接一个新的服务器,那么他所看见的视图的状态不会比之前服务器上看见的更旧。当ensemble中的一个服务器宕机,客户端去尝试连接另外一台服务器时,如果这台服务器的状态旧于之前宕机的服务器,那么服务器将不会接受客户端的连接请求,直到服务器的状态赶上之前宕机的服务器为止。
-
持久性一旦更新操作成功,数据将被持久化到服务器上,并且不能撤销。所以服务器宕机重启,也不会影响数据。
-
时效性系统视图的状态更新的延迟时间是有一个上限的,最多不过几十秒。如果服务器的状态落后于其他服务器太多,ZooKeeper会宁可关闭这个服务器上的服务,强制客户端去连接一个状态更新的服务器。
从执行效率上考虑,读操作的目标是内存中的缓存数据,并且读操作不会参与到写操作的全局排序中。这就会引起客户端在读取ZooKeeper的状态时产生不一致。例如,A客户端将znode z的值由a改变成a′,然后通知客户端B去读取z的值,但是B读取到的值是a,而不是修改后的a′。为了阻止这种情况出现,B在读取z的值之前,需要调用sync方法。sync方法会强制B连接的服务器状态与leader的状态同步,这样B在读取z的值就是A重新更改过的值了。
sync操作只在异步调用时才可用,原因是你不需要等待操作结束再去执行其他的操作。因此,ZooKeeper保证所有的子操作都会在sync结束后再执行,甚至在sync操作之前发出的操作请求也不例外。
会话 Sessions
ZooKeeper的客户端中,配置了一个ensemble服务器列表。当启动时,首先去尝试连接其中一个服务器。如果尝试连接失败,那么会继续尝试连接下一个服务器,直到连接成功或者全部尝试连接失败。
一旦连接成功,服务器就会为客户端创建一个会话(session)。session的过期时间由创建会话的客户端应用来设定,如果在这个时间期间,服务器没有收到客户端的任何请求,那么session将被视为过期,并且这个session不能被重新创建,而创建的ephemeral znode将随着session过期被删除掉。在会话长期存在的情况下,session的过期事件是比较少见的,但是应用程序如何处理好这个事件是很重要的。(我们将在《The Resilient ZooKeeper Application》中详细介绍)
在长时间的空闲情况下,客户端会不断的发送ping请求来保持session。(ZooKeeper的客户端开发工具的liberay实现了自动发送ping请求,所以我们不必去考虑如何维持session)ping请求的间隔被设置成足够短,以便能够及时发现服务器失败(由读操作的超时时长来设置),并且能够及时的在session过期前连接到其他服务器上。
容错连接到其他服务器上,是由ZooKeeper客户端自动完成的。重要的是在连接到其他服务器上后,之前的session以及epemeral节点还保持可用状态。
在容错的过程中,应用将收到与服务断开连接和连接的通知。Watch模式的通知在断开链接时,是不会发送断开连接事件给客户端的,断开连接事件是在重新连接成功后发送给客户端的。如果在重新连接到其他节点时,应用尝试一个操作,这个操作是一定会失败的。对于这一点的处理,是一个ZooKeeper应用的重点。(我们将在《The Resilient ZooKeeper Application》中讲述)
时间 Time
在ZooKeeper中有一些时间的参数。tick是ZooKeeper的基础时间单位,用来定义ensemble中服务器上运行的程序的时间表。其他时间相关的配置都是以tick为单位的,或者以tick的值为最大值或者最小值。例如,session的过期时间在2 ticks到20 ticks之间,那么你再设置时选择的session过期时间必须在2和20之间的一个数。
通常情况1 tick等于2秒。那么就是说session的过期时间的设置范围在4秒到40秒之间。在session过期时间的设置上有一些考虑。过期时间太短会造成加快物理失败的监测频率。在组成员关系的例子中,session的过期时间与从组中移除失败的成员花费的时间相等。如果设置过低的session过期时间,那么网络延迟就有可能造成非预期的session过期。这种情况下,就会出现在短时间内一台机器不断的离开组,然后又从新加入组中。
如果应用需要创建比较复杂的临时状态,那么就需要较长的session过期时间,因为重构花费的时间比较长。有一些情况下,需要在session的生命周期内重启,而且要保证重启完后session不过期(例如,应用维护和升级的情况)。服务器会给每一个session一个ID和密码,如果在连接创建时,ZooKeeper验证通过,那么session将被恢复使用(只要session没过期就行)。所以应用程序可以实现一个优雅的关机动作,在重启之前,将session的ID和密码存储在一个稳定的地方。重启之后,通过ID和密码恢复session。
这仅仅是在一些特殊的情况下,我们需要使用这个特性来使用比较长的session过期时间。大多数情况下,我们还是要考虑当出现非预期的异常失败时,如何处理session过期,或者仅需要优雅的关闭应用,在session过期前不用重启应用。
通常情况也越大规模的ensemble,就需要越长的session过期时间。Connetction Timeout、Read Timeout和Ping Periods都由一个以服务器数量为参数的函数计算得到,当ensemble的规模扩大,这些值需要逐渐减小。如果为了解决经常失去连接而需要增加timeout的时长,建议你先监控一下ZooKeeper的metrics,再去调整。
状态 States
ZooKeeper对象在他的生命周期内会有不同的状态,我们通过getState()来获得当前的状态。
public States getState()
状态是一个枚举类型的数据。新构建的ZooKeeper对象在尝试连接ZooKeeper服务时的状态是CONNECTING,一旦与服务建立了连接那么状态就变成了CONNECTED。
客户端可以通过注册一个观察者对象来接收ZooKeeper对象状态的迁移。当通过CONNECTED状态后,观察者将接收到一个WatchedEvent事件,他的属性KeeperState的值是SyncConnected。
ZooKeeper实例会与服务连接断开或者重新连接,状态会在CONNECTING和CONNECTED之间转换。如果连接断开,watcher会收到一个断开连接事件。请注意,这两个状态都是ZooKeeper实例自己初始化的,并且在断开连接后会自动进行重连接。
如果调用了close()或者session过期,ZooKeeper实例会转换为第三个状态CLOSED,此时在接受事件的KeeperState属性值为Expired。一旦ZooKeeper的状态变为CLOSED,说明实例已经不可用(可以通过isAlive()来判断),并且不能再被使用。如果要重新建立连接,就需要重新构建一个ZooKeeper实例。
ZooKeeper应用程序 Building Applications with ZooKeeper
在对ZooKeeper有了一个深入的了解以后,我们来看一下用ZooKeeper可以实现哪些应用。
配置服务 Configuration Service
一个基本的ZooKeeper实现的服务就是“配置服务”,集群中的服务器可以通过ZooKeeper共享一个通用的配置数据。从表面上,ZooKeeper可以理解为一个配置数据的高可用存储服务,为应用提供检索和更新配置数据服务。我们可以使用ZooKeeper的观察模式实现一个活动的配置服务,当配置数据发生变化时,可以通知与配置相关客户端。
接下来,我们来实现一个这样的活动配置服务。首先,我们设计用znode来存储key-value对,我们在znode中存储一个String类型的数据作为value,用znode的path来表示key。然后,我们实现一个client,这个client可以在任何时候对数据进行跟新操作。那么这个设计的ZooKeeper数据模型应该是:master来更新数据,其他的worker也随之将数据更新,就像HDFS的namenode那样。
我们在一个叫做ActiveKeyValueStore的类中编写代码如下:
public class ActiveKeyValueStore extends ConnectionWatcher {
private static final Charset CHARSET = Charset.forName("UTF-8");
public void write(String path, String value) throws InterruptedException,
KeeperException {
Stat stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} else {
zk.setData(path, value.getBytes(CHARSET), -1);
}
}
}
write()方法主要实现将给定的key-value对写入到ZooKeeper中。这其中隐含了创建一个新的znode和更新一个已存在的znode的实现方法的不同。那么操作之前,我们需要根据exists()来判断znode是否存在,然后再根据情况进行相关的操作。其他值得一提的就是String类型的数据在转换成byte[]时,使用的字符集是UTF-8。
我们为了说明ActiveKeyValueStore怎么使用,我们考虑实现一个ConfigUpdater类来实现更新配置。下面代码实现了一个在一些随机时刻更新配置数据的应用。
public class ConfigUpdater {
public static final String PATH = "/config";
private ActiveKeyValueStore store;
private Random random = new Random();
public ConfigUpdater(String hosts) throws IOException, InterruptedException {
store = new ActiveKeyValueStore();
store.connect(hosts);
}
public void run() throws InterruptedException, KeeperException {
while (true) {
String value = random.nextInt(100) + "";
store.write(PATH, value);
System.out.printf("Set %s to %s\n", PATH, value);
TimeUnit.SECONDS.sleep(random.nextInt(10));
}
}
public static void main(String[] args) throws Exception {
ConfigUpdater configUpdater = new ConfigUpdater(args[0]);
configUpdater.run();
}
}
上面的代码很简单。在ConfigUpdater的构造函数中,ActiveKeyValueStore对象连接到ZooKeeper服务。然后run()不断的循环运行,使用一个随机数不断的随机更新/configznode上的值。
下面我们来看一下,如何读取/config上的值。首先,我们在ActiveKeyValueStore中实现一个读方法。
public String read(String path, Watcher watcher) throws InterruptedException,
KeeperException {
byte[] data = zk.getData(path, watcher, null/*stat*/);
return new String(data, CHARSET);
}
ZooKeeper的getData()方法的参数包含:path,一个Watcher对象和一个Stat对象。Stat对象中含有从getData()返回的值,并且负责接收回调信息。这种方式下,调用者不仅可以获得数据,还能够获得znode的metadata。
做为服务的consumer,ConfigWatcher以观察者身份,创建一个ActiveKeyValueStore对象,并且在启动以后调用read()函数(在dispalayConfig()函数中)获得相关数据。
下面的代码实现了一个以观察模式获得ZooKeeper中的数据更新的应用,并将值到后台中。
public class ConfigWatcher implements Watcher {
private ActiveKeyValueStore store;
public ConfigWatcher(String hosts) throws IOException, InterruptedException {
store = new ActiveKeyValueStore();
store.connect(hosts);
}
public void displayConfig() throws InterruptedException, KeeperException {
String value = store.read(ConfigUpdater.PATH, this);
System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDataChanged) {
try {
displayConfig();
} catch (InterruptedException e) {
System.err.println("Interrupted. Exiting.");
Thread.currentThread().interrupt();
} catch (KeeperException e) {
System.err.printf("KeeperException: %s. Exiting.\n", e);
}
}
}
public static void main(String[] args) throws Exception {
ConfigWatcher configWatcher = new ConfigWatcher(args[0]);
configWatcher.displayConfig();
// stay alive until process is killed or thread is interrupted
Thread.sleep(Long.MAX_VALUE);
}
}
当ConfigUpadater更新znode时,ZooKeeper将触发一个EventType.NodeDataChanged的事件给观察者。ConfigWatcher将在他的process()函数中获得这个时间,并将显示读取到的最新的版本的配置数据。
由于观察模式的触发是一次性的,所以每次都要调用ActiveKeyValueStore的read()方法,这样才能获得未来的更新数据。我们不能确保一定能够接受到更新通知事件,因为在接受观察事件和下一次读取之间的窗口期内,znode可能被改变了(有可能很多次),但是client可能没有注册观察模式,所以client不会接到znode改变的通知。在配置服务中这不是一个什么问题,因为client只关心配置数据的最新版本。然而,建议读者关注一下这个潜在的问题。
让我们来看一下控制台打印的ConfigUpdater运行结果:
% java ConfigUpdater localhost
Set /config to 79
Set /config to 14
Set /config to 78
然后立即在另外的控制台终端窗口中运行ConfigWatcher:
% java ConfigWatcher localhost
Read /config as 79
Read /config as 14
Read /config as 78
坚韧的ZooKeeper应用 The Resilient ZooKeeper Application
分布式计算设计的第一谬误就是认为“网络是稳定的”。我们所实现的程序目前都是假设网络稳定的情况下实现的,所以当我们在一个真实的网络环境下,会有很多原因可以使程序执行失败。下面我们将阐述一些可能造成失败的场景,并且讲述如何正确的处理这些失败,让我们的程序在面对这些异常时更具韧性。
在ZooKeeper的API中,每一个ZooKeeper的操作都会声明抛出连个异常:InterruptedException和KeeperException。
InterrupedException
当一个操作被中断时,会抛出一个InterruptedException。在JAVA中有一个标准的阻塞机制用来取消程序的执行,就是在需要阻塞的的地方调用interrupt()。如果取消执行成功,会以抛出一个InterruptedException作为结果。ZooKeeper坚持了这个标准,所以我们可以用这种方式来取消client的对ZooKeeper的操作。用到ZooKeeper的类和库需要向上抛出InterruptedException,才能使我们的client实现取消操作。
InterruptedException并不意味着程序执行失败,可能是人为设计中断的,所以在上面配置应用的例子中,当向上抛出InterruptedException时,会引起应用终止。
KeeperException
当ZooKeeper服务器出现错误信号,或者出现了通信方面的问题,就会抛出一个KeeperException。由于错误的不同原因,所以KeeperException有很多子类。例如,KeeperException.NoNodeException当操作一个znode时,而这个znode并不存在,就会抛出这个异常。
每一个之类都有一个异常码作为异常的类型。例如,KeeperException.NoNodeException的异常码就是KeeperException.Code.NONODE(一个枚举值)。
有两种方法来处理KeeperException。一种是直接捕获KeeperException,然后根据异常码进行不同类型异常处理。另一种是捕获具体的子类,然后根据不同类型的异常进行处理。
KeeperException包含了3大类异常。
状态异常 State Exception
当无法操作znode树造成操作失败时,会产生状态异常。通常引起状态异常的原因是有另外的程序在同时改变znode。例如,一个setData()操作时,会抛出KeeperException.BadVersionException。因为另外的一个程序已经在setData()操作之前修改了znode,造成setData()操作时版本号不匹配了。程序员必须了解,这种情况是很有可能发生的,我们必须靠编写处理这种异常的代码来解决他。
有的一些异常是编写代码时的疏忽造成的,例如KeeperException.NoChildrenForEphemeralsException。这个异常是当我们给一个enphemeral类型的znode添加子节点时抛出的。
重新获取异常 Recoverable Exception
重新获取异常来至于那些能够获得同一个ZooKeeper session的应用。伴随的表现是抛出KeeperException.ConnectionLossException,表示与ZooKeeper的连接丢失。ZooKeeper将会尝试重新连接,大多数情况下重新连接都会成功并且能够保证session的完整性。
然而,ZooKeeper无法通知客户端操作由于KeeperException.ConnectionLossException而失败。这就是一个部分失败的例子。只能依靠程序员编写代码来处理这个不确定性。
在这点上,幂等操作和非幂等操作的差别就会变得非常有用了。一个幂等操作是指无论运行一次还是多次结果都是一样的,例如一个读请求,或者一个不设置任何值得setData操作。这些操作可以不断的重试。
一个非幂等操作不能被不分青红皂白的不停尝试执行,就像一些操作执行一次的效率和执行多次的效率是不同。我们将在之后会讨论如何利用非幂等操作来处理Recovreable Exception。
不能重新获取异常 Unrecoverable exceptions
在一些情况下,ZooKeeper的session可能会变成不可用的——比如session过期,或者因为某些原因session被close掉(都会抛出KeeperException.SessionExpiredException),或者鉴权失败(KeeperException.AuthFailedException)。无论何种情况,ephemeral类型的znode上关联的session都会丢失,所以应用在重新连接到ZooKeeper之前都需要重新构建他的状态。
一个稳定的配置服务 A reliable configuration service
回过头来看一下ActiveKeyValueStore中的write()方法,其中调用了exists()方法来判断znode是否存在,然后决定是创建一个znode还是调用setData来更新数据。
public void write(String path, String value) throws InterruptedException,
KeeperException {
Stat stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} else {
zk.setData(path, value.getBytes(CHARSET), -1);
}
}
从整体上来看,write()方法是一个幂等方法,所以我们可以不断的尝试执行它。我们来修改一个新版本的write()方法,实现在循环中不断的尝试write操作。我们为尝试操作设置了一个最大尝试次数参数(MAX_RETRIES)和每次尝试间隔的休眠(RETRY_PERIOD_SECONDS)时长:
public void write(String path, String value) throws InterruptedException,
KeeperException {
int retries = 0;
while (true) {
try {
Stat stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} else {
zk.setData(path, value.getBytes(CHARSET), stat.getVersion());
}
return;
} catch (KeeperException.SessionExpiredException e) {
throw e;
} catch (KeeperException e) {
if (retries++ == MAX_RETRIES) {
throw e;
}
// sleep then retry
TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS);
}
}
}
细心的读者可能会发现我们并没有在捕获KeeperException.SessionExpiredException时继续重新尝试操作,这是因为当session过期后,ZooKeeper会变为CLOSED状态,就不能再重新连接了。我们只是简单的抛出一个异常,通知调用者去创建一个新的ZooKeeper实例,所以write()方法可以不断的尝试执行。一个简单的方式来创建一个ZooKeeper实例就是重新new一个ConfigUpdater实例。
public static void main(String[] args) throws Exception {
while (true) {
try {
ResilientConfigUpdater configUpdater =
new ResilientConfigUpdater(args[0]);
configUpdater.run();
} catch (KeeperException.SessionExpiredException e) {
// start a new session
} catch (KeeperException e) {
// already retried, so exit
e.printStackTrace();
break;
}
}
}
另一个可以替代处理session过期的方法就是使用watcher来监控Expired的KeeperState,然后重新建立一个连接。这种方法下,我们只需要不断的尝试执行write(),如果我们得到了KeeperException.SessionExpiredException`异常,连接最终也会被重新建立起来。那么我们抛开如何从一个过期的session中恢复问题,我们的重点是连接丢失的问题也可以这样解决,只是处理方法不同而已。
这只是一个重复尝试的策略。还有很多的策略,比如指数补偿策略,每次尝试之间的间隔时间会被乘以一个常数,间隔时间会逐渐变长,直到与集群建立连接为止间隔时间才会恢复到一个正常值,来预备一下次连接异常使用。