专栏名称: 数据分析与开发
伯乐在线旗下账号,分享数据库相关技术文章、教程和工具,另外还包括数据库相关的工作。偶尔也谈谈程序员人生 :)
目录
相关文章推荐
51好读  ›  专栏  ›  数据分析与开发

使用 Python 进行分布式系统协调

数据分析与开发  · 公众号  · 数据库  · 2016-11-08 20:23

正文

(点击 上方蓝字 ,快速关注我们)


来源:naughty

链接:my.oschina.net/taogang/blog/410864


笔者之前的博文提到过,随着大数据时代的到来,分布式是解决大数据问题的一个主要手段,随着越来越多的分布式的服务,如何在分布式的系统中对这些服务做协调变成了一个很棘手的问题。今天我们就来看看如何使用Python,利用开源对分布式服务做协调。


在对分布式的应用做协调的时候,主要会碰到以下的应用场景:


  • 业务发现(service discovery)找到分布式系统中存在那些可用的服务和节点


  • 名字服务 (name service)通过给定的名字知道到对应的资源


  • 配置管理 (configuration management)如何在分布式的节点中共享配置文件,保证一致性。


  • 故障发现和故障转移 (failure detection and failover)当某一个节点出故障的时候,如何检测到并通知其它节点, 或者把想用的服务转移到其它的可用节点


  • 领导选举(leader election)如何在众多的节点中选举一个领导者,来协调所有的节点


  • 分布式的锁 (distributed exclusive lock)如何通过锁在分布式的服务中进行同步


  • 消息和通知服务 (message queue and notification)如何在分布式的服务中传递消息,以通知的形式对事件作出主动的响应


有许多的开源软件试图解决以上的全部或者部分问题,例如ZooKeeper,consul,doozerd等等,我们现在就看看它们是如何做的。


ZooKeeper


ZooKeeper是使用最广泛,也是最有名的解决分布式服务的协调问题的开源软件了,它最早和Hadoop一起开发,后来成为了Apache的顶级项目,很多开源的项目都在使用ZooKeeper,例如大名鼎鼎的Kafka。


Zookeeper本身是一个分布式的应用,通过对共享的数据的管理来实现对分布式应用的协调。


ZooKeeper使用一个树形目录作为数据模型,这个目录和文件目录类似,目录上的每一个节点被称作ZNodes。



ZooKeeper提供基本的API来操纵和控制Znodes,包括对节点的创建,删除,设置和获取数据,获得子节点等。


除了这些基本的操作,ZooKeeper还提供了一些配方(Recipe),其实就是一些常见的用例,例如锁,两阶段提交,领导选举等等。


ZooKeeper本身是用Java开发的,所以对Java的支持是最自然的。它同时还提供了C语言的绑定。


Kazoo是一个非常成熟的Zookeeper Python客户端,我们这就看看如果使用Python来调用ZooKeeper。(注意,运行以下的例子,需要在本地启动ZooKeeper的服务)


基本操作


以下的例子现实了对Znode的基本操作,首先要创建一个客户端的连接,并启动客户端。然后我们可以利用该客户端对Znode做增删改,取内容的操作。最后推出客户端。


from kazoo . client import KazooClient

import logging

logging . basicConfig ()

zk = KazooClient ( hosts = '127.0.0.1:2181' )

zk . start ()

# Ensure a path, create if necessary

zk . ensure_path ( "/test/zk1" )

# Create a node with data

zk . create ( "/test/zk1/node" , b "a test value" )

# Determine if a node exists

if zk . exists ( "/test/zk1" ) :

print "the node exist"

# Print the version of a node and its data

data , stat = zk . get ( "/test/zk1" )

print ( "Version: %s, data: %s" % ( stat . version , data . decode ( "utf-8" )))

# List the children

children = zk . get_children ( "/test/zk1" )

print ( "There are %s children with names %s" % ( len ( children ), children ))

zk . stop ()


通过对ZNode的操作,我们可以完成一些分布式服务协调的基本需求,包括名字服务,配置服务,分组等等。


故障检测(Failure Detection)


在分布式系统中,一个最基本的需求就是当某一个服务出问题的时候,能够通知其它的节点或者某个管理节点。


ZooKeeper提供ephemeral Node的概念,当创建该Node的服务退出或者异常中止的时候,该Node会被删除,所以我们就可以利用这种行为来监控服务运行状态。


以下是worker的代码


from kazoo . client import KazooClient

import time

import logging

logging . basicConfig ()

zk = KazooClient ( hosts = '127.0.0.1:2181' )

zk . start ()

# Ensure a path, create if necessary

zk . ensure_path ( "/test/failure_detection" )

# Create a node with data

zk . create ( "/test/failure_detection/worker" ,

value = b "a test value" , ephemeral = True )

while True :

print "I am alive!"

time . sleep ( 3 )

zk . stop ()


以下的monitor 代码,监控worker服务是否运行。


from kazoo . client import KazooClient

import time

import logging

logging . basicConfig ()

zk = KazooClient ( hosts = '127.0.0.1:2181' )

zk . start ()

# Determine if a node exists

while True :

if zk . exists ( "/test/failure_detection/worker" ) :

print "the worker is alive!"

else :

print "the worker is dead!"

break

time . sleep ( 3 )

zk . stop ()


领导选举


Kazoo直接提供了领导选举的API,使用起来非常方便。


from kazoo . client import KazooClient

import time

import uuid

import logging

logging . basicConfig ()

my _ id = uuid . uuid4 ()

def leader_func () :

print "I am the leader {}" . format ( str ( my_id ))

while True :

print "{} is working! " . format ( str ( my_id ))

time . sleep ( 3 )

zk = KazooClient ( hosts = '127.0.0.1:2181' )

zk . start ()

election = zk . Election ( "/electionpath" )

# blocks until the election is won, then calls

# leader_func()

election . run ( leader_func )

zk . stop ()


你可以同时运行多个worker,其中一个会获得Leader,当你杀死当前的leader后,会有一个新的leader被选出。


分布式锁


锁的概念大家都熟悉,当我们希望某一件事在同一时间只有一个服务在做,或者某一个资源在同一时间只有一个服务能访问,这个时候,我们就需要用到锁。


from kazoo . client import KazooClient

import time

import uuid

import logging

logging . basicConfig ()

my _ id = uuid . uuid4 ()

def work () :

print "{} is working! " . format ( str ( my_id ))

zk = KazooClient ( hosts = '127.0.0.1:2181' )

zk . start ()

lock = zk . Lock ( "/lockpath" , str ( my_id ))

print "I am {}" . format ( str ( my_id ))

while True :

with lock :

work ()

time . sleep ( 3 )

zk . stop ()


当你运行多个worker的时候,不同的worker会试图获取同一个锁,然而只有一个worker会工作,其它的worker必须等待获得锁后才能执行。


监视


ZooKeeper提供了监视(Watch)的功能,当节点的数据被修改的时候,监控的function会被调用。我们可以利用这一点进行配置文件的同步,发消息,或其他需要通知的功能。


from kazoo . client import KazooClient

import time

import logging

logging . basicConfig ()

zk = KazooClient ( hosts = '127.0.0.1:2181' )

zk . start ()

@ zk . DataWatch ( '/path/to/watch' )

def my_func ( data , stat ) :

if data :

print "Data is %s" % data

print "Version is %s" % stat . version

else :

print "data is not available"

while True :

time . sleep ( 10 )

zk . stop ()


除了我们上面列举的内容外,Kazoo还提供了许多其他的功能,例如:计数,租约,队列等等,大家有兴趣可以参考它的文档


Consul


Consul是用Go开发的分布式服务协调管理的工具,它提供了服务发现,健康检查,Key/Value存储等功能,并且支持跨数据中心的功能。


Consul提供ZooKeeper类似的功能,它的基于HTTP的API可以方便的和各种语言进行绑定。自然Python也在列。


与Zookeeper有所差异的是Consul通过基于Client/Server架构的Agent部署来支持跨Data Center的功能。



Consul在Cluster伤的每一个节点都运行一个Agent,这个Agent可以使Server或者Client模式。Client负责到Server的高效通信,相对为无状态的。 Server负责包括选举领导节点,维护cluster的状态,对所有的查询做响应,跨数据中心的通信等等。


KV基本操作


类似于Zookeeper,Consul支持对KV的增删查改的操作。


import consul

c = consul . Consul ()

# set data for key foo

c . kv . put ( 'foo' , 'bar' )

# poll a key for updates

index = None

while True :

index ,







请到「今天看啥」查看全文