大家好,欢迎来到IT知识分享网。
前面对kafka的学习中已经了解到KafkaProducer通过设定参数retries
,如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数。
本片文章主要分析几个问题:
– 哪些异常可以重试
– 如何实现重试
接下来通过分析一一解开这些问题的答案。
1.哪些异常可以重试
org.apache.kafka.clients.producer.internals.Sender类中有如下方法:
private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
return batch.attempts() < this.retries &&
((response.error.exception() instanceof RetriableException) ||
(transactionManager != null && transactionManager.canRetry(response, batch)));
}
通过方法名可知,其作用是判断是否能重试,由方法体内的实现可知,允许重试需要满足两个条件:
1. 重试次数少于参数retries
指定的值;
2. 异常是RetriableException类型或者TransactionManager允许重试;
transactionManager.canRetry()后面会分析;先看看哪些异常是RetriableException类型异常。
- RetriableException类型异常
kafka对RetriableException异常注释是:短暂性的通过重试可以成功的异常;通过RetriableException类关系图可知,可重试异常有图中RetriableException的子类那些异常(可以通过异常是否继承自RetriableException判断是否可重试异常):
- TransactionManager允许重试
如果异常不属于RetriableException类型,但是只要满足(transactionManager != null && transactionManager.canRetry(response, batch))
就允许重试,所以,首先需要满足transactionManager不为null。transactionManager是在KafkaProducer中构造Sender传入的。构造TransactionManager的核心源码如下:
private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
TransactionManager transactionManager = null;
boolean userConfiguredIdempotence = false;
// 用户设置的Properties参数中是否有'enable.idempotence',如果有的话, 就用用户配置的
if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
userConfiguredIdempotence = true;
}
// 用户设置的Properties参数中是否有'transactional.id',如果有的话, 就用用户配置的
boolean userConfiguredTransactions = false;
if (config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
userConfiguredTransactions = true;
}
// 得到参数'enable.idempotence'的值
boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
// 如果用户显示配置enable.idempotence为false,并且又配置了transactional.id,就会抛出这个异常
if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions) {
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
}
// 如果用户配置了transactional.id,那么idempotenceEnabled就认为是true(与)
if (userConfiguredTransactions) {
idempotenceEnabled = true;
}
// 只有用户配置了transactional.id,且enable.idempotence没有设置为false,这里才为true,就会构造一个有效的TransactionManager;从这里可知,如果用户没有配置transactional.id,那么TransactionManager为null
if (idempotenceEnabled) {
// 构造TransactionManager的几个重要参数
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs);
... ...
}
return transactionManager;
}
根据上面源码分析可知,只要用户配置了transactional.id,且没有显示配置enable.idempotence为false,那么TransactionManager就不会为null;
接下来还要满足transactionManager.canRetry(response, batch)
才允许重试,主要包括下面几种情况:
– 碰到OutOfOrderSequenceException异常
– broker的响应报文中没有logStartOffset(正常的响应信息:”T0-0” -> “{error: NONE,offset: 0,logAppendTime: -1, logStartOffset: 0}”)
2.如何实现重试
上面说明了什么情况下允许重试,接下来分析kafka是如何实现重试的。
2.1原理图
本打算把原理图放在最后,但是最后还是决定放在前面。对重试机制有一定的了解后,再看后面的分析就容易很多。kafka发送&重试机制如下图所示:
说明:
1. new KafkaProducer()后创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消息;
2. 调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;
3. 后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
4. 如果发送成功,那么返回成功;
5. 如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;
初步了解整个发送&重试过程后,再根据源码进行更深入的分析。
2.2后台线程
分析kafka如何实现重试之前,先看一下发送消息到broker前做的主要事情:
- 构造KafkaProducer时,构造Send并启动一个异步线程:
this.sender = new Sender(... ...);
String ioThreadName = "kafka-producer-network-thread" + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
且从这段代码可知,每个KafkaProducer会启动一个线程处理消息,这个线程命名为:kafka-producer-network-thread | ${clientId}。
笔者某个实例查看KafkaProducer启动的线程结果如下:
[afei@kafka ~]$ jstack -l 23715 | grep "kafka-producer-network-thread"
"kafka-producer-network-thread | producer-2" #109 daemon prio=5 os_prio=0 tid=0x00007fe081921000 nid=0x5dcb runnable [0x00007fdfeb92b000]
"kafka-producer-network-thread | producer-1" #46 daemon prio=5 os_prio=0 tid=0x00007fe081f5a800 nid=0x5d66 runnable [0x00007fe024d20000]
- 调用KafkaProducer的send()方法时,先把发送的消息存储在accumulator中:
RecordAccumulator.RecordAppendResult result = accumulator.append(
tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
2.3**RecordAccumulator**
RecordAccumulator是保存需要发送的消息或者重试消息的核心。发送消息之前先把消息存放在这里,异步线程KafkaThread启动后从这里取消息然后发送到broker。当发送出错且允许重试时,又会把这些需要重试的消息保存到这里再进行重试。
当调用KafkaProducer的send()方法发送消息时,会调用append()方法将消息暂时存放,核心源码如下:
// 获得deque或者创建deque。因为核心数据结构是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,所以生产者批次消息是按照分区区分的。如果根据分区拿不到deque的话,就创建一个deque。
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// 把需要发送的消息放入队列中,
dq.addLast(batch);
当发送出错且允许重试时,会调用reenqueue()方法将消息暂时存放,核心源码如下:
public void reenqueue(ProducerBatch batch, long now) {
Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
synchronized (deque) {
// 把需要重试的消息放入队列中,等到重试
deque.addFirst(batch);
}
}
RecordAccumulator简单总结:通过这两段代码的分析可知,保存需要发送的(重试)消息的核心数据结构是Deque。且创建队列时是new ArrayDeque()
,没有指定初始容量。这里不打算深入分析Deque,只是简单介绍一下,Deque是Double ended queue (双端队列) 的缩写。首尾都可写入可读取。
2.3发送&重试
下面分析kafka是如何发送并如何重试的。(TransactionManager相关代码被省略,其的作用后面有机会单独一篇文章分析);发送消息核心代码在Sender.java中, Sender.java实现了Runnable接口, 所以是后台线程异步发送消息到kafka集群:
public class Sender implements Runnable {
public void run() {
// KafkaProducer发送消息的线程启动后,一直运行,直到KafkaProducer.close()将running置为false
while (running) {
run(time.milliseconds());
}
// 根据日志可知,接下来是KafkaProducer关闭后的逻辑
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// 当非强制关闭时,可能依然有请求堆积在accumulator中, 我们需要将这些剩余的请求处理完成
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
run(time.milliseconds());
}
if (forceClose) {
// 如果强制关闭,且有未处理完的消息,那么让这些消息的发送失败,并抛出异常new IllegalStateException("Producer is closed forcefully.").
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
... ...
}
}
KafkaProducer关闭有方式有两种:
close();
和close(long timeout, TimeUnit timeUnit)
,第一种是友好的关闭且设置timeout为Long.MAX_VALUE
,第二种如果设置timeout为0,就是强制关闭,即forceClose=true。
备注:drained: 流干,耗尽,undrained则表示未耗尽。
准备发送消息前需要尝试去accumulator中获取消息:
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
accumulator.drain()本质就是:
Deque<ProducerBatch> deque = getDeque(tp);ProducerBatch batch = deque.pollFirst();
,即根据分区信息得到Deque,然后不断获取ProducerBatch,即封装后的要发送的消息。
run(long)方法中往broker发送消息的部分核心代码(位于Sender.java中)如下:
private void sendProduceRequest(... ...){
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
// 这里是处理响应消息的地方
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
// 省略发送消息到broker的代码
... ...
}
handleProduceResponse()中收到的响应,如何是网络断开,那么构造响应:new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION)
。如果有版本不匹配问题,那么构造响应:new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION)
。还有一种特殊情况,如果指定了acks=0
,那么构造响应new ProduceResponse.PartitionResponse(Errors.NONE)
,因为这种情况下只需要发送即可,不需要响应结果。接下来调用下面的方法–完成或者重试请求:
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now) {
Errors error = response.error;
if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
(batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}", ...);
// 如果是'MESSAGE_TOO_LARGE'的错误,且是批量消息(recordCount>1),那么切割消息后再发送
this.accumulator.splitAndReenqueue(batch);
this.accumulator.deallocate(batch);
this.sensors.recordBatchSplit();
} else if (error != Errors.NONE) {
// 如果响应有错误,判断是否允许重试
if (canRetry(batch, response)) {
// 如果允许重试,会输出warn日志
log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}"... ...);
if (transactionManager == null) {
// 重新把消息放到队列中
reenqueueBatch(batch, now);
}
... ...
} else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
// 接收到这种错误,就认为返回成功。
completeBatch(batch, response);
} else {
... ...
}
// 到这里如果是UnknownTopicOrPartitionException异常,说明producer缓存的元数据信息可能已经过期,所以需要请求更新,代码省略
} else {
completeBatch(batch, response);
}
... ...
}
如果需要重试,重新入队列的源码如下:
// ProducerBatch就是发送的消息
private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {
// accumulator的reenqueue前面已经分析了,本质就是调用Deque的addFirst()
this.accumulator.reenqueue(batch, currentTimeMs);
this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
}
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/24658.html