● 3.1 producer按照消息的顺序进行发送
很多时候为了发送效率,采用的办法是多线程、异步、批量发送。
如果为了保证顺序,则不能使用多线程来执行发送任务。
异步:一般是把消息先发到一个队列中,由后台线程不断的执行发送任务。这种方式对消息的顺序也是有影响的:
如先发送消息1,后发送消息2,此时服务器端在处理消息1的时候返回了异常,可能在处理消息2的时候成功了,此时若再重试消息1就会造成消息乱序的问题。所以producer端需要先确认消息1发送成功了才能执行消息2的发送。
对于kafka来说,目前是异步、批量发送。解决异步的上述问题就是配置如下属性:
max.in.flight.requests.per.connection=1
即producer发现一旦还有未确认发送成功的消息,则后面的消息不允许发送。
● 3.2 相同key的消息能够hash到相同的分区
正常情况下是没问题的,但是一旦某个分区挂了,如原本总共4个分区,此时只有3个分区存活,在此分区恢复的这段时间内,是否会存在hash错乱到别的分区?
那就要看producer端获取的metadata信息是否会立马更新成3个分区。目前看来应该是不会的
producer见到的metadata数据是各个broker上的缓存数据,这些缓存数据是由KafkaController来统一进行更新的。一旦leader副本挂了,KafkaController并不会去立马更新成3个分区,而是去执行leader选举,选举完成后才会去更新metadata数据,此时选举完成后仍然是保证4个分区的,也就是说producer是不可能获取到只有3个分区的metadata数据的,所以producer端还是能正常hash的,不会错乱分区的。
在整个leader选举恢复过程,producer最多是无法写入数据(后期可以重试)。
● 3.3 系统对顺序消息的支持
leader副本按照消息到来的先后顺序写入本地日志的,一旦写入日志后,该顺序就确定了,follower副本也是按照该顺序进行复制的。对于消息的提交也是按照消息的offset从低到高来确认提交的,所以说kafka对于消息的处理是顺序的。
● 3.4 consumer能够按照消息的顺序进行消费
为了接收的效率,可能会使用多线程进行消费。这里为了保证顺序就只能使用单线程来进行消费了。
目前kafka的Consumer有scala版本的和java版本的(这一块之后再详细探讨),最新的java版本,对用户提供一个poll方法,用户自己去决定是使用多线程还是单线程。