Kafka

生产者发送消息的整体流程

img

消息追加器 RecordAccumulator

前面几个组件,在 3.1 的文章中,已经说清楚。现在来看 RecordAccumulator 组件

RecordAccumulator 主要用于缓存消息,以便 Sender 线程能够批量发送消息。RecordAccumulator 会将消息放入缓存 BufferPool(实际上就是 ByteBuffer) 中。BufferPool 默认最大为 33554432B,即 32MB, 可通过 buffer.memory 进行配置。
当生产者生产消息的速度大于 sender 线程的发送速度,那么 send 方法就会阻塞。默认阻塞 60000ms,可通过 max.block.ms 配置。

RecordAccumulator 类的几个重要属性

1
2
3
4
5
public final class RecordAccumulator {
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// 缓存空间,默认 32MB,可通过上面说的 buffer.memory 参数进行配置
private final BufferPool free;
}

TopicPartition 为分区的抽象。定义如下所示

1
2
3
4
5
public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
private final String topic;
}

主线程发送的消息,都会被放入batcher 中, batches 将发往不同 TopicPartition 的消息,存放到各自的 ArrayDeque<ProducerBatch> 中。
主线程 append 时,往队尾插入,sender 线程取出时,则往队头取出。

ProducerBatch 批量消息

ProducerBatch 为批量消息的抽象。
在编写客户端发送消息时,客户端面向的类则是 ProducerRecordkafka 客户端,在发送消息时,会将 ProducerRecord 放入 ProducerBatch,使消息更加紧凑。
如果为每个消息都独自创建内存空间,那么内存空间的开辟和释放,则将会比较耗时。因此 ProducerBatch 内部有一个 ByteBufferOutputStream bufferStream(实则为 ByteBuffer), 使用 ByteBuffer 重复利用内存空间。

bufferStream 值的大小为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class RecordAccumulator {

// 该值大小,可通过 buffer.memory 配置
private final BufferPool free;

public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {

int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));

}
}

其中,batchSize 默认 16384B,即 16kb,可通过 batch.size 配置。第2个入参的值则为消息的大小。

需要注意的是,bufferStream 的内存空间是从 free 内存空间中划出的。

上面有说到,ProducerBatch 会使用 ByteBuffer 追加消息。但是,如果你看代码,你会发现 ProducerBatch 在做消息的追加时,会将消息放入 DataOutputStream appendStream。好像跟我们说的 不一样! 但是实际上,就是利用 ByteBuffer,这里还需要看 appendStream 是如何初始化的!

注:MemoryRecordsBuilder 为 ProducerBatch 中的一个属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MemoryRecordsBuilder {
private final ByteBufferOutputStream bufferStream;
private DataOutputStream appendStream;

private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - firstTimestamp;
// 往 appendStream 中追加消息
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}
}

MemoryRecordsBuilder 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MemoryRecordsBuilder {
private final ByteBufferOutputStream bufferStream;
private DataOutputStream appendStream;

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit) {

// ..省略部分代码
bufferStream.position(initialPosition + batchHeaderSizeInBytes);
this.bufferStream = bufferStream;

// 使用 bufferStream 包装
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}
}

可以看到实际上使用的还是 ByteBufferOutputStream bufferStream

Sender 线程

Sender 线程在发送消息时,会从 RecordAccumulator 中取出消息,并将放在 RecordAccumulator 中的 Deque<ProducerBatch> 转换成 Map<nodeId, List<ProducerBatch>>,这里的 nodeIdkafka 节点的 id。再发送给 kafka 之前,又会将消息封装成 Map<nodeId, ClientRequest>

请求在从 Sender 发往 kafka 时,还会被存入 InFlightRequests

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class NetworkClient implements KafkaClient {
/* the set of requests currently being sent or awaiting a response */
private final InFlightRequests inFlightRequests;

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
int latestClientVersion = clientRequest.apiKey().latestVersion();
if (header.apiVersion() == latestClientVersion) {
log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
clientRequest.correlationId(), destination);
} else {
log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
}
}
Send send = request.toSend(destination, header);
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);

// 将请求放入
this.inFlightRequests.add(inFlightRequest);
selector.send(send);
}
}

InFlightRequests

1
2
3
4
5
6
7
/**
* The set of requests which have been sent or are being sent but haven't yet received a response
*/
final class InFlightRequests {
private final int maxInFlightRequestsPerConnection;
private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
}

InFlightRequests 的作用是存储已经发送的,或者发送了,但是未收到响应的请求。
InFlightRequests 类中有一个属性 maxInFlightRequestsPerConnection, 标识一个节点最多可以缓存多少个请求。该默认值为 5, 可通过 max.in.flight.requests.per.connection 进行配置, 需要注意的是 InFlightRequests 对象是在创建 KafkaProducer 时就会被创建。

requests 参数的 keynodeIdvalue 则为缓存的请求。

sender 线程 在发送消息时,会先判断 InFlightRequests 对应的请求缓存中是否超过了 maxInFlightRequestsPerConnection 的大小

代码入口:Sender.sendProducerData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Sender implements Runnable {
private long sendProducerData(long now) {
// ... 省略部分代码
while (iter.hasNext()) {
Node node = iter.next();

// todo 这里为代码入口
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// ... 省略部分代码
}
}

public class NetworkClient implements KafkaClient {
private boolean canSendRequest(String node, long now) {
return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
inFlightRequests.canSendMore(node);
}
}

final class InFlightRequests {
public boolean canSendMore(String node) {
Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
}

InFlightRequests 的设计中,可以看到,我们可以很轻松的就知道,哪个 kafka 节点的负载是最低。因为只需要判断 requests 中对应 node 集合的大小即可。

重要参数

  1. acks
    用于指定分区中需要有多少个副本收到消息,生产者才会认为消息是被写入的
    acks = 1。默认为1, 只要 leader 副本写入,则被认为已经写入。如果消息已经被写入 leader 副本,且已经返回给生产者 ok,但是在 follower 拉取 leader 消息之前, leader 副本突然挂掉,那么此时消息也会丢失
    acks = 0。发送消息后,不需要等待服务端的响应,此配置,吞吐量最高。
    acks = -1 或者 all。需要等待所有 ISR 中的所有副本都成功写入消息之后,才会收到服务端的成功响应。
    需要注意的一点是 acks 入参是 String,而不是 int
  2. max.request.size
    客户端允许发送的消息最大长度,默认为 1MB.
  3. retriesretry.backoff.ms
    retries 配置生产者的重试次数,默认为 0. retry.backoff.ms 配置两次重试的间隔时间
  4. compression.type
    指定消息的压缩方式,默认为 **none**。可选配置gzip,snappy,lz4
  5. connection.max.idle.ms
    指定在多久之后关闭闲置的连接,默认 540000(ms),即 9分钟
  6. linger.ms
    指定发送 ProducerBatch 之前等待更多的消息(ProducerRecord) 加入 ProducerBatch 的时间,默认为 0。生产者在 ProducerBatch 填充满时,或者等待时间超过 linger.ms 发送消息出去。
  7. receive.buffer.bytes
    设置 Socket 接收消息缓存区的大小,默认 32678B, 32KB。如果设置为 -1, 则表示使用 操作系统的默认值。如果 Procuerkafka 处于不同的机房,可以调大此参数。
  8. send.buffer.bytes
    设置 Socket 发送消息缓冲区大小。默认 131072B, 即128KB。如果设置为 -1,则使用操作系统的默认值
  9. request.timeout.ms
    Producer 等待响应的最长时间,默认 30000ms。需要注意的是,该参数需要比 replica.lag.time.max.ms 值更大。可以减少因客户端重试,而造成的消息重复
  10. buffer.memory
    配置消息追加器,内存大小。默认最大为 33554432B,即 32MB
  11. batch.size
    ProducerBatch ByteBuffer 。默认 16384B,即 16kb
  12. max.block.ms
    生产者生成消息过快时,客户端最多阻塞多少时间。

总结

  1. kafka 将生产者生产消息,消息发送给服务端,拆成了 2 个过程。生产消息交由 主线程, 消息发送给服务端的任务交由 sender 线程。
  2. 通过 RecordAccumulator 的设计,将生产消息,与发送消息解耦。
  3. RecordAccumulator 内部存储数据的数据结构是 ArrayDeque. 队尾追加消息,队头取出消息
  4. 开发人员编写的 ProducerRecord,在消息发送之前会被转为 ProducetBatch。为的是批量发送消息,提高网络 IO 效率
  5. 为了避免,每个节点负载过高,kafka 设计了 InFlightRequests, 将为响应的消息放入其中
  6. 从源码角度,batch.size 最好是 buffer.memory 整数倍大小。因为 ProducerBatchByteBuffer 是从 RecordAccumulatorByteBuffer 中划出的

RocketMQ 区别

  1. RocketMQ 没有将生产消息与发送消息解耦。
  2. RocketMQ 的消息发送,分为 同步,异步、单向。其中单向发送与 kafkaacks = 0 的配置效果一样。但是实际上,还得看 RocketMQ broker刷盘配置
  3. kafka 发送失败,默认不重试,RocketMQ 默认重试 2 次。不过 RocketMQ 无法配置 2 次重试的间隔时间. kafka 可以配置重试的间隔时间。
  4. RocketMQ 默认消息最大为 4MB, kafka 默认 1MB
  5. RocketMQ 在消息的发送上,是直接使用 Nettykafka 则是使用 NIO 自己实现通信。(虽说,Netty 也是基于 NIO
  6. 当然还有很多咯….., 因为设计完全不一样!,实际解决场景也不一样

知识补充

ByteBuffer

ByteBuffer 一般用于网络传输的缓冲区。

先来看下 ByteBuffer 的类继承体系
img

ByteBuffer 主要的 2 个父类。 DirectByteBufferHeapByteBuffer。一般而言,我们主要的是使用 HeapByteBuffer

ByteBuffer 重要属性
  1. position
    当前读取的位置
  2. mark
    为某一读过的位置做标记,便于某些时候回退到该位置
  3. limit
    读取的结束位置
  4. capacity
    buffer 大小
ByteBuffer 基本方法
  1. put()
    buffer 中写数据,并将 position 往前移动
  2. flip()
    position 设置为0,limit 设置为当前位置
  3. rewind()
    position 设置为0, limit 不变
  4. mark()
    mark 设置为当前 position 值,调用 reset(), 会将 mark 赋值给 position
  5. clear()
    position 设置为0,limit 设置为 capacity
ByteBuffer 食用DEMO
1
2
3
4
5
6
7
8
9
10
FileInputStream fis = new FileInputStream("/Users/chenshaoping/text.txt");
FileChannel channel = fis.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);

int read = channel.read(buffer);
while (read != -1) {
System.out.println(new String(buffer.array(), Charset.defaultCharset()));
buffer.clear();
read = channel.read(buffer);
}

ArrayDeque

ArrayDeque,是一个双端队列。即可以从队头插入元素,也可以从队尾插入元素
对于双端队列,既可以使用 链表的方式实现,也可以使用数组的方式实现。
JDKLinkedList 使用链表实现,ArrayDeque 则使用数组的方式实现

来看 ArrayDeque 的实现。
ArrayDeque 中,有 head, tail 分别指向 头指针,和尾指针。可以把 ArrayDeque 想象成循环数组

插入
  1. 当往队尾插入元素时,tail 指针会往前走
    img
  2. 当往队前插入元素时, head 指针会向后走
    img
删除
  1. 从队头删除元素 4 , head 会往前走
    img
  2. 从队尾删除元素 3, tail 会往后走
    img

可以看到,这里通过移动 head, tail 指针就可以删除元素了。

扩容

tailhead 都指向都一个位置时,则需要扩容
img

扩容会将数组的大小扩充为原来的 2 倍,然后重新将 head 指向数组 0 下标, tail 指向数组的最后一个元素位置。

上面的数组,在重新扩容后,会变成下面这个样子
img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ArrayDeque<E> extends AbstractCollection<E>
implements Deque<E>, Cloneable, Serializable
{
private void doubleCapacity() {
assert head == tail;
int p = head;
int n = elements.length;
int r = n - p; // number of elements to the right of p
int newCapacity = n << 1;
if (newCapacity < 0)
throw new IllegalStateException("Sorry, deque too big");
Object[] a = new Object[newCapacity];
System.arraycopy(elements, p, a, 0, r);
System.arraycopy(elements, 0, a, r, p);
elements = a;
head = 0;
tail = n;
}
}