这周我们学习下消费者,还是先从一个消费者的Hello World学起:
public class Consumer {
public static void main(String [] args) {
Properties properties = new Properties();
properties.put("key.deserializer" ,
"org.apache.kafka.common.serialization.StringDeserializer" );
properties.put("value.deserializer" ,
"org.apache.kafka.common.serialization.StringDeserializer" );
properties.put("bootstrap.servers" , "localhost:9092" );
properties.put("group.id" , "group.demo" );
KafkaConsumer<String , String > consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("topic-demo" ));
try {
while (true ) {
ConsumerRecords<String , String > records = consumer.poll(Duration.ofMillis(1000 ));
for (ConsumerRecord<String , String > record : records) {
System.out.println(record.value());
}
}
} finally {
consumer.close();
}
}
}
前两步和生产者类似,配置参数然后根据参数创建实例,区别在于消费者使用的是反序列化器,以及多了一个必填参数
group.id
,用于指定消费者所属的消费组。关于消费组的概念在
《
图解Kafka中的基本概念》
中介绍过了,消费组使得消费者的消费能力可横向扩展,这次再介绍一个新的概念 “
再均衡
” ,其意思是将分区的所属权进行重新分配,发生于消费者中有新的消费者加入或者有消费者宕机的时候。我们先了解再均衡的概念,至于如何再均衡不在此深究。
我们继续看上面的代码,第3步,
subscribe
订阅期望消费的主题,然后进入第4步,轮循调用
poll
方法从
Kafka
服务器拉取消息。给poll方法中传递了一个
Duration
对象,指定
poll
方法的超时时长,即当缓存区中没有可消费数据时的阻塞时长,避免轮循过于频繁。
poll
方法返回的是一个
ConsumerRecords
对象,其内部对多个分区的
ConsumerRecored
进行了封装,其结构如下:
public class ConsumerRecords V > implements Iterable V >> {
private final Map>>> records;
// ...
}
而
ConsumerRecord
则类似
ProducerRecord
,封装了消息的相关属性:
public class ConsumerRecord <K , V > {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private final Optional leaderEpoch;
偏移量
相比
ProdercerRecord
的属性更多,其中重点讲下
偏移量
,偏移量是分区中一条消息的唯一标识。消费者在每次调用poll方法时,则是根据偏移量去分区拉取相应的消息。而当一台消费者宕机时,会发生再均衡,将其负责的分区交给其他消费者处理,这时可以根据偏移量去继续从宕机前消费的位置开始。
而为了应对消费者宕机情况,偏移量被设计成不存储在消费者的内存中,而是被持久化到一个Kafka的内部主题
__consumer_offsets
中,在 Kafka 中,将偏移量存储的操作称作提交。而消息者在每次消费消息时都将会将偏移量进行
提交
,提交的偏移量为下次消费的位置,例如本次消费的偏移量为x,则提交的是x+1。
在代码中我们并没有看到显示的提交代码,那么Kafka的默认提交方式是什么?默认情况下,消费者会定期以
auto_commit_interval_ms
(5秒)的频率进行一次自动提交,而提交的动作发生于
poll
方法里,在进行拉取操作前会先检查是否可以进行偏移量提交,如果可以,则会提交即将拉取的偏移量。
下面我们看下这样一个场景,上次提交的偏移量为2,而当前消费者已经处理了2、3、4号消息,正准备提交5,但却宕机了。当发生再均衡时,其他消费者将继续从已提交的2开始消费,于是发生了
重复消费
的现象。
我们可以通过减小自动提交的时间间隔来减小重复消费的窗口大小,但这样仍然无法避免重复消费的发生。
按照线性程序的思维,由于自动提交是延迟提交,即在处理完消息之后进行提交,所以应该不会出现
消息丢失
的现象,也就是已提交的偏移量会大于正在处理的偏移量。但放在多线程环境中,消息丢失的现象是可能发生的。例如线程 A 负责调用 poll 方法拉取消息并放入一个队列中,由线程 B 负责处理消息。如果线程 A 已经提交了偏移量5,而线程B还未处理完2、3、4号消息,这时候发生宕机,则将丢失消息。
从上述场景的描述,我们可以知道自动提交是存在风险的。所以Kafka除了自动提交,还提供了
手动提交
的方式,可以细分为
同步提交
和
异步提交
,分别对应了
KafkaConsumer
中的
commitSync
和
commitAsync
方法。我们先尝试使用同步提交修改程序:
while (true ) {
ConsumerRecords<String , String > records = consumer.poll(Duration.ofMillis(1000 ));
for (ConsumerRecord<String , String > record : records) {
System.out.println(record.value());
}
consumer.commitSync();;
}
在处理完一批消息后,都会提交偏移量,这样能减小重复消费的窗口大小,但是由于是
同步提交
,所以程序会阻塞等待提交成功后再继续处理下一条消息,这样会限制程序的吞吐量。那我们改为使用异步提交:
while (true ) {
ConsumerRecords<String , String > records = consumer.poll(Duration.ofMillis(1000 ));
for (ConsumerRecord<String , String > record : records) {
System.out.println(record.value());
}
consumer.commitAsync();;
}
异步提交
时,程序将不会阻塞,但异步提交在提交失败时也不会进行重试,所以提交是否成功是无法保证的。因此我们可以组合使用两种提交方式。在轮循中使用异步提交,而当关闭消费者时,再通过同步提交来保证提交成功。
try {
while (true ) {
ConsumerRecords<String , String > records = consumer.poll(Duration.ofMillis(1000 ));
for (ConsumerRecord<String , String > record : records) {
System.out.println(record.value());
}
consumer.commitAsync();
}
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
上述介绍的两种无参的提交方式都是提交的
poll
返回的一个批次的数据。若未来得及提交,也会造成重复消费,如果还想更进一步减少重复消费,可以在 for 循环中为
commitAsync
和
commitSync
传入分区和偏移量,进行更细粒度的提交,例如每1000条消息我们提交一次:
Map currentOffsets = new HashMap<>();
int count = 0 ;
while (true ) {
ConsumerRecords<String , String > records = consumer.poll(Duration.ofMillis(1000 ));
for (ConsumerRecord<String , String > record : records) {
System.out.println(record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1 ));
if (count % 1000 == 0 ) {
consumer.commitAsync(currentOffsets, null );
}
count++;
}
}
关于提交就介绍到这里。在使用消费者的代理中,我们可以看到
poll 方法
是其中最为核心的方法,能够拉取到我们需要消费的消息。所以接下来,我们一起深入到消费者 API 的幕后,看看在
poll
方法中,都发生了什么,其实现如下:
public ConsumerRecords poll (final Duration timeout) {
return poll(time.timer(timeout), true );
}
在我们使用设置超时时间的 poll 方法中,会调用重载方法,第二个参数
includeMetadataInTimeout
用于标识是否把元数据的获取算在超时时间内,这里传值为
true
,也就是算入超时时间内。下面再看重载的
poll
方法的实现:
private ConsumerRecords poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
this .kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
if (this .subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions" );
}
do {
client.maybeTriggerWakeup();
if (includeMetadataInTimeout) {
updateAssignmentMetadataIfNeeded(timer, false );
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long .MAX_VALUE), true )) {
log.warn("Still waiting for metadata" );
}
}
final Map>> records = pollForFetches(timer);
if (!records.isEmpty()) {
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}
return this .interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
this .kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
}
我们对上面的代码逐步分析,首先是第1步
acquireAndEnsureOpen
方法,获取锁并确保消费者没有关闭,其实现如下:
private void acquireAndEnsureOpen ( ) {
acquire();
if (this .closed) {
release();
throw new IllegalStateException("This consumer has already been closed." );
}
}
其中
acquire
方法用于获取锁,为什么这里会要上锁。这是因为
KafkaConsumer
是线程不安全的,所以需要上锁,确保只有一个线程使用
KafkaConsumer
拉取消息,其实现如下:
private static final long NO_CURRENT_THREAD = -1L ;
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
private final AtomicInteger refcount = new AtomicInteger(0 );
private void acquire () {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access" );
refcount.incrementAndGet();
}
用一个原子变量
currentThread
作为锁,通过 cas 操作获取锁,如果 cas 失败,即获取锁失败,表示发生了竞争,有多个线程在使用
KafkaConsumer
,则会抛出
ConcurrentModificationException
异常,如果 cas 成功,还会将
refcount
加一,用于重入。
再看第2、3步,记录 poll 的开始以及检查是否有订阅主题。然后进入 do-while 循环,如果没有拉取到消息,将在不超时的情况下一直轮循。
第4步,安全的唤醒消费者,并不是唤醒,而是检查是否有唤醒的风险,如果程序在执行不可中断的方法或是收到中断请求,会抛出异常,这里我还不是很明白,先放一下。
第5步,更新偏移量,就是我们在前文说的在进行拉取操作前会先检查是否可以进行偏移量提交。
第6步,
pollForFetches
方法拉取消息,其实现如下:
private Map>> pollForFetches(Timer timer) {
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
final Map>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
fetcher.sendFetches();
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {
return !fetcher.hasAvailableFetches();
});
timer.update(pollTimer.currentTimeMs());
return fetcher.fetchedRecords();
}
如果 fetcher 已经有消息了则立即返回,这里和下面将要讲的第7步对应。如果没有消息则使用
Fetcher
准备拉取请求然后再通过
ConsumerNetworkClient
发送请求,最后返回消息。
为啥消息会已经有了呢,我们回到
poll
的第7步,如果拉取到了消息或者有未处理的请求,由于用户还需要处理未处理的消息,这时候可以使用异步的方式发起下一次的拉取消息的请求,将数据提前拉取,减少网络IO的等待时间,提高程序的效率。
第8步,调用消费者拦截器处理,就像
KafkaProducer
中有
ProducerInterceptor
,在
KafkaConsumer
中也有
ConsumerInterceptor
,用于处理返回的消息,处理完后,再返回给用户。
第9、10步,释放锁和记录 poll 结束,对应了第1、2步。
对
KafkaConsumer
的
poll
方法就分析到这里。最后用一个思维导图回顾下文中较为重要的知识点:
参考
:
你绝对能看懂的Kafka源代码分析-KafkaConsumer类代码分析: https://blog.csdn.net/liyiming2017/article/details/89187474
Kafka消费者源码解析之一KafkaConsumer:https://blog.csdn.net/lt793843439/article/details/89511405
更多精彩推荐
☞ 开启人才进阶之旅,鲲鹏开发者技术沙龙点燃计算行业激情
☞ 全面拥抱云原生应用研发的拐点已经到来
☞ 微软全球 AKS 女掌门人,这样击破云原生“怪圈”!
☞ 阿里动物园再添新丁,小蛮驴搞定物流最后三公里
☞ 中国移动云智融合峰会 与您相约揽胜九天