分布式消息系统kafka的提供了一个生产者、缓冲区、消费者的模型
broker:中间的kafka cluster,存储消息,是由多个server组成的集群
topic:kafka给消息提供的分类方式。broker用来存储不同topic的消息数据
producer:往broker中某个topic里面生产数据
consumer:往broker中某个topic获取数据
topic与消息
kafka将所有消息组织成多个topic的形式存储,而每个topic又可以拆分成多个partition,每个partition又由一个一个消息组成。每个消息都被标识了一个递增序列号代表其进来的先后顺序,并按顺序存储在partition中。
这样,消息就以一个个id的方式,组织起来。
这个id,在kafka中被称为offset
这种组织和处理策略提供了如下好处:
消费者可以根据需求,灵活指定offset消费
保证了消息不变性,为并发消费提供了线程安全的保证。每个consumer都保留自己的offset,互相之间不干扰,不存在线程安全问题
消息访问的并行高效性。每个topic中的消息被组织成多个partition,partition均匀分配到集群server中。生产、消费消息的时候,会被路由到指定partition,减少竞争,增加了程序的并行能力
增加消息系统的可伸缩性。每个topic中保留的消息可能非常庞大,通过partition将消息切分成多个子消息,并通过负责均衡策略将partition分配到不同server。这样当机器负载满的时候,通过扩容可以将消息重新均匀分配
保证消息可靠性。消息消费完成之后不会删除,可以通过重置offset重新消费,保证了消息不会丢失
灵活的持久化策略。可以通过指定时间段(如最近一天)来保存消息,节省broker存储空间
备份
消息以partition为单位分配到多个server,并以partition为单位进行备份。备份策略为:1个leader和N个followers,leader接受读写请求,followers被动复制leader。leader和followers会在集群中打散,保证partition高可用
producer
producer生产消息需要如下参数:
根据kafka源码,可以根据不同参数灵活调整生产、分区策略
if topic is None
throw Errorp=Noneif partition Not None
if partition 0 Or partition >= numPartitions throw Error
p=partitionelif key Not None
p=hash(key) % numPartitionselse
p=round-robin() % numPartitions
send message to the partition p
上面是我翻译的伪代码,其中round-robin就是简单轮询,hash采用的是murmurhash
consumer
传统消息系统有两种模式:
kafka通过consumer group将两种模式统一处理
每个consumer将自己标记consumer group名称,之后系统会将consumer group按名称分组,将消息复制并分发给所有分组,每个分组只有一个consumer能消费这条消息。
于是推理出两个极端情况:
多consumer并发消费消息时,容易导致消息乱序
通过限制消费者为同步,可以保证消息有序,但是这大大降低了程序的并发性。
kafka通过partition的概念,保证了partition内消息有序吗,缓解了上面的问题。partition内消息会复制分发给所有分组,每个分组只有一个consumer能消费这条消息。这个语义保证了某个分组消费某个分区的消息,是同步而非并发的。如果一个topic只有一个partition,那么这个topic并发消费有序,否则只是单个partition有序。
一般消息消息系统,consumer存在两种消费模型:
kafka采用pull,并采用可配置化参数保证当存在数据并且数据量达到一定量的时候,consumer端才进行pull操作,否则一直处于block状态
kakfa采用整数值consumer position来记录单个分区的消费状态,并且单个分区单个消息只能被consumer group内的一个consumer消费,维护简单开销小。消费完成,broker收到确认,position指向下次消费的offset。由于消息不会删除,在完成消费,position更新之后,consumer依然可以重置offset重新消费历史消息
消息发送语义
producer视角
consumer视角
如果消息处理后的输出端(如db)能保证消息更新幂等性,则多次消费也能保证exactly once语义
如果输出端能支持两阶段提交协议,则能保证确认position和处理输出消息同时成功或者同时失败
在消息处理的输出端存储更新后的position,保证了确认position和处理输出消息的原子性(简单、通用)
可用性
在kafka中,正常情况下所有node处于同步中状态,当某个node处于非同步中状态,也就意味着整个系统出问题,需要做容错处理
同步中代表了:
某个分区内同步中的node组成一个集合,即该分区的ISR
kafka通过两个手段容错:
当leader处于非同步中时,系统从followers中选举新leader
当某个follower状态变为非同步中时,leader会将此follower剔除ISR,当此follower恢复并完成数据同步之后再次进入ISR
另外,kafka有个保障:当producer生产消息时,只有当消息被所有ISR确认时,才表示该消息提交成功。只有提交成功的消息,才能被consumer消费
综上所述:当有N个副本时,N个副本都在ISR中,N-1个副本都出现异常时,系统依然能提供服务
假设N副本全挂了,node恢复后会面临同步数据的过程,这期间ISR中没有node,会导致该分区服务不可用。kafka采用一种降级措施来处理:选举第一个恢复的node作为leader提供服务,以它的数据为基准,这个措施被称为脏leader选举。
由于leader是主要提供服务的,kafka broker将多个partition的leader均分在不同的server上以均摊风险
每个parition都有leader,如果在每个partition内运行选主进程,那么会导致产生非常多选主进程。kakfa采用一种轻量级的方式:从broker集群中选出一个作为controller,这个controller监控挂掉的broker,为上面的分区批量选主
一致性
上面的方案保证了数据高可用,有时高可用是体现在对一致性的牺牲上。如果希望达到强一致性,可以采取如下措施:
持久化
基于以下几点事实,kafka重度依赖磁盘而非内存来存储消息
在持久化数据结构的选择上,kafka采用了queue而不是Btree
kafka只有简单的根据offset读和append操作,所以基于queue操作的时间复杂度为O(1),而基于Btree操作的时间复杂度为O(logN)
在大量文件读写的时候,基于queue的read和append只需要一次磁盘寻址,而Btree则会涉及多次。磁盘寻址过程极大降低了读写性能
性能
kafka在以下四点做了优化:
将大量小io改造成少量大io
利用sendfile减少数据拷贝
支持snappy,gzip,lz4三种算法批量压缩消息,减少网络传输消耗
采用nio网络模型,与1 acceptor thread + N processor threads的reactor线程模型
大量读写少量消息会导致性能较差,通过将消息聚合,可以减少读写次数(减少随机IO),增加单次读写数据量(增加顺序IO)
普通情况下,数据从磁盘传输到网络需要经历以下步骤:
磁盘->内核page cache
内核page cache->用户buffer
用户buffer->socket buffer
socket buffer->NIC buffer(NIC:网卡接口)
利用sendfile系统调用,可以简化至:
磁盘->内核page cache
内核page cache->NIC buffer
减少了两次拷贝步骤。在存在大量数据传输的操作时,会显著提升性能
在大量文件读写的时候,基于queue的read和append只需要一次磁盘寻址,而Btree则会涉及多次。磁盘寻址过程极大降低了读写性能
kafka server端采用与Mina一样的网络、线程模型。server端基于nio,采用1个acceptor线程接受tcp连接,并将连接分配给N个proccessor线程,proccessor线程执行具体的IO读写、逻辑处理操作。(注:相比较于这种模型,netty的N boss + N worker的模型更加灵活)
zookeeper
broker node在zookeeper中采用唯一id(整数)标识
/brokers/ids/[N] --> host:port 瞬时节点
此znode存储了broker node的ip端口
/brokers/topics/[topic]/partitions/[N]/state --> leader,isr 瞬时节点
[topic] 代表某个topic名称
[N] 代表分区数
此znode存储了该分区的leader id和isr列表(由id组成)
/consumers/[group_id]/ids/[customer_id] --> {"topic1": #streams, ..., "topicN": #streams} 瞬时节点
此znode存储了指定consumer消费topic所使用的线程数
/consumers/[group_id]/offsets/[topic]/[N] --> offset 永久节点
[group_id] 消费者所属groupid
[topic] 订阅topic name
[N] 分区数
consumer可以通过三种方式管理offset:
手动管理。使用低层次consumer api,灵活,较麻烦
交给zookeeper管理。使用高层次consumer api,设置offsets.storage=zookeeper,方便,性能稍差。0.8.2默认配置
交给kafka管理。使用高层次consumer api,设置offsets.storage=kafka,方便,原生态性能优。实现原理是kafka选出一个broker作为offset manager,创建一个名为__consumer_offsets的topic,将offset存储在该topic下,推荐采用
此znode存储了指定consumer在topic中最新consumer offset
/consumers/[group_id]/owners/[topic]/[N] --> consumer_id 瞬时节点
指定分区在某一时刻只能被所有consumer group中的某一个consumer消费,通过将consumer_id存在指定分区下,就能保证这时该分区只能被这个consumer消费
上面只是列出的最典型的znode,通过研究znode,可以开发出一个kafka monitor,用来监控kafka数据消费状况,比如KafkaOffsetMonitor
作者:jonathan_loda
链接:https://my.oschina.net/u/1378920/blog/904669