kafka源码分析之Producer

最近在kafka的代码,着实有点吃力,因为这是我第一次看java类代码,没有当时看redis(c语言)和NSQ(golang语言)那种享受,第一是kafka代码量非常大,第二是kafka代码封装较多。我们知道数据库或者文件戏系统的客户端一般都是当需要获取数据时,发送请求和等待回复。然而消息队列的客户端比上述复杂,例如,kafka一般是用在分布式架构下,因此kafka server都不止一台,所以kafka消息队列的Producer就需要一个IO多路复用进行每个连接是否可写以及可读;而且kafka的Consumer是基于pull策略,因此需要每隔一段时间向kafka server发送fetch request等等。

kafka server的整体架构比较容易看懂,因为服务器的架构大体是IO多路复用,事件分发,事件处理等等,但是kafka server的其他细节,包括日志持久化,日志复制以及选主等等则不是一天两天能看懂的,因此我看代码的策略就是在把kafka server的整体大体架构看懂之后,先看Producer和Consumer,从简单的开始,可以先把一些基础的知识点过一遍,例如Java NIO以及Java Concurrent包。而Producer比Consumer稍微更简单些,我就从Producer开始看了。

我看的客户端是Java(Kafka源码自带),也是Kafka最新版本推荐的客户端。本文大概按如下组织:

  1. Kafka Producer使用介绍;
  2. Kafka Producer运行原理;
  3. Kafka Producer运行过程;
  4. 总结;

【版权声明】博客内容由罗道文的私房菜拥有版权,允许转载,但请标明原文链接http://luodw.cc/2017/05/02/kafka02/#more

Kafka Producer使用介绍


在上篇文章中,给出了一个Java客户端简单的使用示例,可以参考kafka源码分析之概述。Java客户端默认使用的异步发送,即Producer不断将消息发送到一个队列中,然后一个后台线程不断从队列中获取消息,最后封装成请求发送给kafka server。对于异步模式,Producer.send函数参数提供一个参数callback,即当请求完成时执行的回调函数;一般调用情况如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
}
`

这段代码实例化一个ProducerRecord,然后通过producer.send函数发送,当发送成功并且得到kafka server的ack时,执行回调函数Callback。这个回调函数接收这条记录的元数据,包括偏移量,事件戳,checksum,序列后key和value的长度以及分区。

当然,Java客户端也可以模拟同步阻塞调用,可以采用如下方式:

1
2
3
4
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();

因为Producer.send方法返回值是java.util.concurrent.Future,所以Future.get()方法会阻塞直到kafka server的响应返回。

Kafka Producer运行原理


之前有提到过,Kafka Producer采取异步的方式发送消息。当实例化一个Kafka Producer,同时会开启一个IO线程sender,该线程不断读取队列中的消息,然后通过Selector将数据发送给Kafka Server。用户调用Producer.send方法发送消息时,所执行的步骤如下:

  1. 与topic相关的broker建立连接,更新元数据(包括当前producer相关的topic,partition,broker等等信息),后续分析分析元数据时再细说。
  2. 根据key决定当前消息将发送给当前topic的哪个分区。如果用户给出key,则对该key进行哈希求余,得到分区号;如果用户没有提供key,则采用round-robin方式决定分区。
  3. 将该消息添加到RecordAccumulator中对应的队列中,而RecordAccumulator内包含若干双端队列列表,每个topic的每个partition都对应着一个双端队列列表,每个列表含有若干RecordBatch,而每个RecordBatch可以存储若干条消息。

上述就是producer.send方法所执行的过程,当将数据放到队列后,即可返回执行下一次的producer.send; 因为异步,执行速度非常快; 接下来,当队列中有了数据之后,即可把视角切换到IO线程sender,sender线程处于一个while循环中,不断获取消息发送,执行过程如下:

  1. 将RecordAccumulator中消息按broker进行重新分类,因为kafka把发往同一个broker的消息整合在一个request中,因此,如果有四个broker,则会产生size=4的Map<Integer, List<RecordBatch>>。
  2. 将每个broker对应的List<RecordBatch>序列化后存储再ByteBuffer,并将这个ByteBuffer与broker对应的socketChannel绑定,最后注册到selector等待发送。
  3. 在下一次selector超时时,即可读取Kafka Server返回的response,并执行相应的回调函数;

当然,上述步骤看起来非常的简单,但是体现在代码上却是非常的复杂。下面跟着代码来看看。

Kafka Producer运行过程


这里不可能把所有代码都粘贴进来,因为代码量太大了,所以把Producer执行的大致流程以及不容易看不懂的地方记录下。Producer除了发送消息,还时不时的需要更新metadata,例如一开始时需要连接到broker上,如果Producer要把消息发送到新的topic上,则这时也需要更新metadata,因为metadata没有关于这个topic的partition和broker信息。这里屏蔽更新metadata的细节,专注与消息的发送和接受,之后再讲metadata的更新。

Producer发送消息

由测试例子可知,要发送消息,首先要实例化一个Producer实例,代码如下,只保留最重要的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
log.trace("Starting the Kafka producer");
// 获取用户传入的参数
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = Time.SYSTEM;
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
/*.......*/
// 设置分区类,支持自定义分区策略,默认使用的DefaultPartitioner
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
// 与该Producer相关的元数据
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
// 每个请求的最大字节数
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
// Producer异步队列内存的最大值
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
// 压缩类型
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
// 缓存每个topic-partition对应消息队列集合
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time);
// 用户提供的broker地址
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
// kafka封装selector,而NetworkClient则是处理selector返回的结果
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs,
time,
true);
// 异步线程类
this.sender = new Sender(client,
this.metadata,
this.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
Time.SYSTEM,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();//开启异步线程类
log.debug("Kafka producer started");
} catch (Throwable t) {
/*.....*/
}
}

由代码可知,在实例化KafkaProducer过程中,首先将用户输入的属性转化为ProducerConfig对象,利用该配置对象实例化整个KafkaProducer所需要的资源,例如分区类,消息缓存队列以及异步线程类等等。由代码还可以知:

  1. 分区类可以自己定义,即自己定义每条消息发送至哪个分区,如果用户没有自定义,则使用默认分区类DefaultPartitioner;
  2. 缓存类accumulator,用户可以定义缓存类可以使用的内存大小(ProducerConfig.BUFFER_MEMORY_CONFIG),每个RecordBatch大小(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), producer汇集时长为linger_ms之间的消息发送,消息重传的时间间隔(retryBackoffMs);
  3. 不需要给出所有的broker地址,因为Producer会定期更新metadata,因此会自动获取所有broker地址;
  4. 负责网络发送的客户端NetworkClient,用户可以配置的信息为连接最长闲置时间,每个连接正在发送的最大消息数,重连接间隔时间,发送和接收缓存大小以及请求超时时长;
  5. 对于异步线程类,用户可以配置是否保证消息的顺序性,单次请求最大值,ack配置,请求重传配置等等;

在KafkaProducer实例化成功之后,接下来即可通过send方法发送消息,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// waitOnMetadata方法用于保证在发送消息之前,已经连上topic相关的broker以及更新metadata
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
/*.....*/
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
/*....*/
}
// 设置该消息的分区号
int partition = partition(record, serializedKey, serializedValue, cluster);
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
// 将消息添加到accumulator缓存中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
// 如果RecordBatch满了,则唤醒IO线程发送一次消息; 
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
/*.....*/
}
}

在这个函数中先是根据消息计算出该消息所对应的分区,然后调用accumulator.append追加对应的消息列表中。消息队列时一个map,键为TopicPartition,值为一个RecordBatch列表,即一个topic分区对应着消息列表中。

再accumulator.append方法中先是根据TopicPartition找到对应的列表(如果没有,则新建一个),然后取出列表最后一个RecordBatch,最后调用RecordBatch.tryAppend方法,下面看下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
if (!recordsBuilder.hasRoomFor(key, value)) {
return null;
} else {
long checksum = this.recordsBuilder.append(timestamp, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length);
if (callback != null)
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}

由代码可知,该函数最后调用的时this.recordsBuilder.append方法添加,那么这个recordBuilder是什么了?append是把消息追加到哪了?我第一次看代码在这懵了。

同过查看RecoredBatch类定义可知这个recordsBuilder为MemoryRecordsBuilder,那我们接下来就可以来看下MemoryRecordsBuilder.append方法将消息添加到哪了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public long append(long timestamp, byte[] key, byte[] value) {
return appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value);
}
/*----------------------------------------------------------------------------------------------*/
//appendStream为封装ByteBuffer的数据流
bufferStream = new ByteBufferOutputStream(buffer);
appendStream = wrapForOutput(bufferStream, compressionType, magic, COMPRESSION_DEFAULT_BUFFER_SIZE);
/*--------------------------------------------------------------------------------------------------*/
public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
try {
if (lastOffset >= 0 && offset <= lastOffset)
throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
int size = Record.recordSize(magic, key, value);
LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);
if (timestampType == TimestampType.LOG_APPEND_TIME)
timestamp = logAppendTime;
long crc = Record.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
return crc;
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
}
}

首先我们需要知道每个RecordBatch都对应一个ByteBuffer和MemoryRecordsBuilder,而MemoryRecordsBuilder的作用就是将每条消息追加到ByteBuffer中。上述代码可知,ByteBuffer被封装成DataOutputStream,LogEntry.writeHeaderRecord.write方法即执行把消息头和消息体追加到ByteBuffer当中。

这里先做个小结,每个KafkaProducer都有一个消息缓存类accumulator,这个缓存类有一个map存储TopicPartiton-->List<RecordBatch>键值对。用户每次发送消息都是先添加到该消息对应TopicPartition的RecordBatch中。而RecordBatch中有一个大小限制的ByteBuffer,存储用户发送的消息。

sender线程发送请求

等消息添加到RecordBatch中之后,接下来可以把视角切换到sender线程。因为sender是一个线程类,我们直接可以定位到该类的run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void run() {
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
}
/*............................................*/
void run(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
//如果还不确定某些topic的leader,则需要更新metadata
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// create produce requests
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
sendProduceRequests(batches, now);
// 调用selector发送消息
this.client.poll(pollTimeout, now);
}

sender线程处在一个while循环中,不停的从accumulator获取准备好的消息,然后发送。上述run方法最重要步骤有

首先,accumulator.drain函数将TopicPartition-->list<RecordBatch>转换为broker-->list<RecordBatch>,即把消息按broker分类,发往每个broker的消息封装成一次请求。例如如果有两个topic(t1和t2),每个topic都有三个分区(t1p1 t1p2 t1p3 t2p1 t2p2 t2p3),以及三个broker(b1 b2 b3),分别对应一个分区。那么t1p1和t2p1的数据都将发送到b1中,t1p2和t2p2的数据都将发送到b2中,t1p3和t2p3的数据都将发送到b3中。

接着调用sendProduceRequests函数,这个函数本质上做的事就是把socketChannel和ByteBuffer绑定起来。但是Kafka做了封装,因为这个函数最主要就是把kafkaChannel和每个broker上的list<RecordBatch>绑定起来。这样当selector返回,kafkaChannel为可写,就可以把list<RecordBatch>发送到对应的broker上。

最后this.client.poll(pollTimeout, now)执行selector.poll函数执行读写操作以及调用response的回调函数。我们先来看下sendProduceRequests函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
}//分别处理发往每个broker上的消息
/*..................................................................................................*/
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
//batch.records()返回MemoryRecords
produceRecordsByPartition.put(tp, batch.records());
recordsByPartition.put(tp, batch);
}//将发往某个broker上消息按TopicPartition进行归类
ProduceRequest.Builder requestBuilder =
new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
String nodeId = Integer.toString(destination);
//实例化一个ClientRequest,这个Request时发往某个broker,包含大于等于1个TopicPartition
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
// 调用NetworkClient做进一步处理
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

由上可知传递给requtestBuilder的参数produceRecordsByPartition已经时Map<TopicPartition, MemoryRecords>。MemoryRecords对象最重要的成员变量为ByteBuffer,保存该RecordBatch内的所有消息。

下面我们来看下client.send方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void send(ClientRequest request, long now) {
doSend(request, false, now);
}
/*.................................................................*/
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
String nodeId = clientRequest.destination();
if (!isInternalRequest) {
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
AbstractRequest request = null;
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
request = builder.build();// builder为ProduceRequest.Builder,request为ProduceRequest
RequestHeader header = clientRequest.makeHeader();
/*..................................................*/
// Send是一个接口,这里返回的是NetworkSend,而NetworkSend继承ByteBufferSend
Send send = request.toSend(nodeId, header);
// 表示正在发送的请求
InFlightRequest inFlightRequest = new InFlightRequest(
header,
clientRequest.createdTimeMs(),
clientRequest.destination(),
clientRequest.callback(),
clientRequest.expectResponse(),
isInternalRequest,
send,
now);
this.inFlightRequests.add(inFlightRequest);
// 将send和对应kafkaChannel绑定起来,并开启该kafkaChannel底层socket的写事件
selector.send(inFlightRequest.send);
}

dosend方法内部先是将clientRequest转换为ProduceRequest,然后调用ProduceRequest.toSend方法将消息封装成NetworkSend类型,该类型继承ByteBufferSend,最主要成员有一个size=2的ByteBuffer数组,第一个ByteBuffer存储所有消息的size,另一个ByteBuffer存储所有消息。

在发送请求之前需要把该请求添加到inFlightRequests队列中,表示正在发送还没收到ack的请求,当收到kafka server的ack之后,kafka producer将该消息从inFlightRequest中删除。

最后调用selector.send(inFlightRequest.send)将该NetworkSend与KafkaChannel绑定起来,并为KafkaChannel底层的socket开启可写事件。下面来看下selector.send方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void send(Send send) {
String connectionId = send.destination();
if (closingChannels.containsKey(connectionId))
this.failedSends.add(connectionId);
else {
KafkaChannel channel = channelOrFail(connectionId, false);
try {
channel.setSend(send);
} catch (CancelledKeyException e) {
this.failedSends.add(connectionId);
close(channel, false);
}
}
}
/*................................................................*/
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
// 开启可写事件
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

ok,当以上步骤的执行结束时,接下来即可返回到sender线程类的run()方法的sendProduceRequests(batches, now)方法,该方法执行结束,此时为每个broker的kafkaChannel都绑定了各自的NetworkSend。最后调用this.client.poll(pollTimeout, now);方法将消息发送出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
// 当收到kafka server的ack时,调用每个请求当时设置的毁掉函数
for (ClientResponse response : responses) {
try {
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
return responses;
}

这个函数先是调用selector.poll方法,这个selector是kafka封装java nio的selector,即IO多路复用。selector.poll方法监听到某个broker的KafkaChannel可写时,即可将调用KafkaChannel.write方法,把数据发送给kafka server。如果kafka server立即返回数据,那么再selector下一次轮询时即可收到kafka server返回的response。因此再selector.poll函数返回时,此时已经完成了socket的读写操作,最后再NetworkClient.poll方法中做最后处理工作。例如handleCompletedSends函数需要把已经发送成功的请求从inFlightRequests列表中删除。

这里需要说明的是,那个for循环执行的回调函数,是从大到小的顺序执行。即先是执行ClientResponse.onComplete方法,然后onComplete函数内部执行sender.handleProduceResponse方法,接着在上述方法内部执行completeBatch方法,最后再completeBatch方法内部执行每条消息的的回调函数。

ok,到这里就把消息的发送过程讲完了,下面看看元数据的更新。

更新metadata

KafkaProducer除了发送消息外,还需要不断更新元数据。例如第一次发送消息时,需要先获取TopicPartition对应broker上的连接,然后初始化Cluster信息; 当KafkaProducer发送一个还没识别的Topic时,也需要进行一次metadata更新等等。

下面从第一次更新metadata开始,再KafkaProducer.dosend()方法中的waitOnMetadata函数,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// 把topic添加metadata里面的topic列表中
metadata.add(topic);
Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// 如果之前已经获取了该topic的元数据,并缓存再cluster,则直接返回ClusterAndWaitTime实例
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
// Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
// In case we already have cached metadata for the topic, but the requested partition is greater
// than expected, issue an update request only once. This is necessary in case the metadata
// is stale and the number of partitions for this topic has increased in the meantime.
do {
log.trace("Requesting metadata update for topic {}.", topic);
int version = metadata.requestUpdate();
sender.wakeup();
try {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (cluster.unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);
if (partition != null && partition >= partitionsCount) {
throw new KafkaException(
String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
}
return new ClusterAndWaitTime(cluster, elapsed);
}

该函数时在KafkaProducer线程中,因此如果metadata没有被更新的话,则一直处于while循环中。再do...wwhile循环内部,

  1. 一开始调用metadata.requestUpdata(),设置this.needUpdate = true,并返回当前版本号。
  2. 然后调用sender.wakeup唤醒sender线程。
  3. metadata.awaitUpdate(version, remainingWaitMs)等待更新。

我们来看下metadata.awaitUpdate方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
}
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
while (this.version <= lastVersion) {
if (remainingWaitMs != 0)
wait(remainingWaitMs);
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}

该函数会阻塞再while循环中,即当前版本小于等于最后一次更新的版本。因此此时KafkaProducer会阻塞再两个while循环中。

  • waitOnMetadata方法中的do...while循环,等待获取topic的partition数不为0
  • awaitUpdate方法中的while循环,等待版本更新。

那么在哪更新这个版本了,当然不用想,肯定是在sender函数,因为此时KafkaProducer已经阻塞了。我们可以发现在NetworkClient的poll函数中有如下调用

1
2
3
4
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
//.....................
}

因此我们可以知道在任何一次发送数据之前都会进行一次判断元数据是否需要更新, 接下来我们看下metadataUpdater.maybeUpdate(now),

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public long maybeUpdate(long now) {
// 判断metadata是否需要更新,主要是通过metadataTimeout和requestTimeoutMs进行判断
// 即metadata超时时间和请求超时时间
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
if (metadataTimeout > 0) {
return metadataTimeout;
}
// 选择一个负载最小的KafkaServer更新metadata
Node node = leastLoadedNode(now);
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
return reconnectBackoffMs;
}
return maybeUpdate(now, node);
}

这个函数,timeToNextMetadataUpdate变量为metadata需要更新的下一个时间;如果已经发送metadataFetchRequest,则waitForMetadataFetch为请求的超时时间,否则为0。如果上述两个值最大值大于0,则还没到更新metadata的时间,直接返回更新时间。

如果需要更新,则先是获取一个负载最小的kafkaServer,然后执行metadata更新,下面这个时metadata更新最终的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private long maybeUpdate(long now, Node node) {
String nodeConnectionId = node.idString();
// 如果该节点可以发送请求,则发送metadataRequest
if (canSendRequest(nodeConnectionId)) {
this.metadataFetchInProgress = true;
MetadataRequest.Builder metadataRequest;
if (metadata.needMetadataForAllTopics())
metadataRequest = MetadataRequest.Builder.allTopics();
else
metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()));
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
return requestTimeoutMs;
}
// 如果正在与broker建立连接,则返回重连接延迟时间.
if (isAnyNodeConnecting()) {
return reconnectBackoffMs;
}
if (connectionStates.canConnect(nodeConnectionId, now)) {
// 如果node还没建立连接,则建立连接
log.debug("Initialize connection to node {} for sending metadata request", node.id());
initiateConnect(node, now);
return reconnectBackoffMs;
}
// connected, but can't send more OR connecting
// In either case, we just need to wait for a network event to let us know the selected
// connection might be usable again.
return Long.MAX_VALUE;
}

因此每次metadata更新时有三种情况

  1. 如果节点可以发送请求,则直接发送请求。判断是否可以发送请求包括是否已经建立连接,发往该broker的请求是否太多,即负载过大。
  2. 如果正在建立连接,则返回。
  3. 如果还没建立连接,则向broker发起连接。

而我们知道KafkaProducer之前阻塞在metadata版本更新上,因此sender线程

  1. 第一次轮询返回时,建立到broker的连接。
  2. 第二次轮询返回时,发送metadataFetch请求。
  3. 第三次轮询返回时,获取metadataResponse,并更新metadata。 经过上述sender线程三次轮询,metadata得到更新,此时KafkaProducer不再阻塞,开始发送消息数据。

metadata再两种情况下会进行更新,一种是像KafkaProducer第一次发送消息时强制更新,另一次是周期进行更新。当然代码中不止KafkaProducer第一次发送消息时强制更新数据,例如当发现有broker宕机时也会更新一次metadata等等。

总结


大概把KafkaProducer大体介绍完毕,KafkaProducer主要分为三个部分,一个数Producer主线程发送数据,一个是sender线程真正发送数据以及更新metadata。下篇文章说说KafkaConsumer。