一、Producer整体架构

  1. Producer线程调用send时,只是将数据序列化后放入对应TopicPartition的Deque尾部的ProducerBatch数据结构中
  2. Sender线程每次扫描所有Deque的尾部,得到需要发送的readyNodes,并确认所有的readyNodes都已建立好连接。
  3. 遍历readyNodes,再遍历每个Node上所有partition的Deque的队头,直到凑齐max.request.size或遍历完,并使用NIO进行发送
  4. BufferPool会进行整体内存管理与ProducerBatch内存复用,减少GC

二、标准Producer代码

public class KafkaProducerTest {
    public static void main(String[] args) throws FileNotFoundException {
        Properties props = new Properties();
        // 初始嗅探的服务器
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.40.4:6667,10.0.40.5:6667");
        // ALL,代表写入leader和副本都成功才返回,1代表写入leader成功就返回,0代表直接返回,不保证server是否写入
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // ProducerBatch的大小,这里是100KB
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 102400);
        // BuffPool的大小。当空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        // KafkaProducer初始化
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 异步发送,返回结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳
        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<String, String>("topic-name", "this is a meesage"), (metadata, excp) -> {
                if (excp != null) {
                    // 消息发送失败的处理逻辑
                    excp.printStackTrace();
                }
            });
        }
        producer.flush();
        producer.close();
    }
}

三、Producer线程逻辑

3.1 Producer核心发送逻辑解读

  1. 序列化key,value,计算出要发往哪个parition
  2. 将record根据TopicPartition append到RecordAccmulator内部Map对应的Deque尾部
  3. RecordAccmulator生成新batch时,会从内部BufferPool中申请内存,Batch被Sender线程使用完后,会将内存归还给BufferPool。
  4. Sender线程会遍历RecordAccmulator中全部队列,判断队首的batch是否发送
  5. RecordAccmulator原理参考第四章,BufferPool原理第五章
值得学习的点:
2.2 new KafkaProducer()初始化逻辑
  1. metric、intercepters、partitioner、key、value序列化器初始化、配置初始化
  2. 初始化accumulator
  3. 初始化sender线程并启动
······
// 分区器,决定了每条消息发往哪个分区
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
······
// 初始化accumulator,内部batches是一个Map封装了发往不同TopicPartition的不同队列
this.accumulator = new RecordAccumulator(···,batch_size,totalMemory,compressionType,···);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
// 获取集群元数据
if (metadata != null) {
    this.metadata = metadata;
} else {
    this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
        true, true, clusterResourceListeners);
    this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
}
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
// NIO的包装类,sender线程内部使用此类进行网络IO
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(···
        new Selector(···),
        this.metadata,···);
// sender线程,封装了从队头中读取ProducerBatch并发送的逻辑
this.sender = new Sender(···,
        client,
        this.metadata,
        this.accumulator,···);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// KafkaThread只是一个包装类,为了启动Sender线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
// 启动Sender线程
this.ioThread.start();
······
2.3 doSend逻辑
  1. 序列化key,value
  2. 调用partition()获取写入的partition id
  3. 调用ensureValidRecordSize()判断此条消息是否超过max.request.size与buffer.memrory
  4. 调用accumulator.append,将Record加入内存队列
  5. 根据返回结果,判断当前batch满了或新创建了一个batch则唤醒sender线程
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    ······
    // key序列化
    byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
    // value序列化
    byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
    // 计算出record发往的partition
    int partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);

    setReadOnly(record.headers());
    Header[] headers = record.headers().toArray();

    int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
            compressionType, serializedKey, serializedValue, headers);
    // 校验序列化后大小是否超过max.request.size, buffer.memory。注意这里是压缩前大小
    ensureValidRecordSize(serializedSize);
    ······
    // 将record append到队列中
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs);
    // 新产生一个batch或者batch满了都唤醒sender线程
    if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
        this.sender.wakeup();
    }
    // 返回异步future结果
    return result.future;
    ······
}
2.4 partition逻辑
  1. 指明 partition的情况下,直接将指明的值直接作为 partiton 值;
  2. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
  3. 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
·····doSend调用parition的逻辑·····
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    // 1、若record本身设置了partition,则取此parition为目的partition
    Integer partition = record.partition();
    // 2、若未设置partition,调用分区器parititioner得到发往的分区id
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

······DefaultPartitioner的逻辑······
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // 1、从元数据中获取topic对应的全部partition
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        // 2、若没有设置key,nextValue会对每个topic产生一个随机数,以此为起点做round-robin轮询
        int nextValue = nextValue(topic);
        // 注意不设置key,只轮询可用的partition
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            // 求模映射
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            // 如果没有可用的partition,还是根据总parition数返回一个值
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        // 如果设置了key,严格按照key进行hash映射到对应的partition上
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

三、 Sender线程

https://blog.csdn.net/bohu83/article/details/88853553

public void run() {
    // 核心循环,定时轮询accumulator判断是否有record发送
    while (running) {
        try {
            // 核心运行逻辑
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    // 正常关闭,将内存队列中剩余项先发送完
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // 强行关闭,也需要等待Producer线程退出
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on the futures.
        this.accumulator.abortIncompleteBatches();
    }
    // 网络层client关闭
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

void run(long now) {
    ······
    // 从accumulator的Deque中取数据,通过client异步发送
    long pollTimeout = sendProducerData(now);
    // NIO Reactor事件循环,网络事件发生时,处理网络事件
    client.poll(pollTimeout, now);
}

private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();

    // get the list of partitions with data ready to send
    // 遍历所有队列队首,得到可发送的batch所在的节点集合(并且是leader可知的)返回。
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    // 结果中leader不可知的TopicPartition,再次发送元数据更新请求
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    // 需要
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        // 返回true,如果已经和此节点建立好连接或建立连接成功
        if (!this.client.ready(node, now)) {
            // 从readNodes中移除此Node
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    // 遍历每个已定的需要发送数据的Broker,遍历它上面所有TopicPartition队列的队首,组装直到满足max.request.size。
    // 返回节点id与发往此节点的ProducerBatch集合
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    // maxInflightRequests == 1代表需要保证顺序
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        // 发送前,将要发送的TopicPartition放到mute列表中,禁止后续再发送mute列表中的TopicPartition的Batch,避免网络乱序
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                // 将TopicPartition加入accumulator中mute集合中
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // 得到所有队列中过期的batch集合
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    // 超时的直接走Batch失败流程,Producer线程会拿到异常报错信息
    for (ProducerBatch expiredBatch : expiredBatches) {
        failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
        }
    }

    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    // 封装请求,通过NetworkClient发送
    sendProduceRequests(batches, now);

    return pollTimeout;
}

四、accumulator结构分析

// 内存池,ProducerBatch实际使用的内存来自于此
private final BufferPool free;
// CopyOnWriteMap,每个TopicPartition会维护一个Deque
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// 若maxInflightRequests=1,将有正在发送数据的TopicPartition放入此集合,发送时会跳过
private final Set<TopicPartition> muted;

append()
ready()
drain()
expiredBatches()
mutePartition()
unmutePartition()

五、BufferPool实现内存复用

// 总内存大小=buffer.memory
private final long totalMemory;
// BufferPool中能重用的ByteBuffer大小=batch.size
private final int poolableSize;
// 可复用的ByteBuffer
private final Deque<ByteBuffer> free;
// 内存不够时,Producer线程会在这里等待内存释放,等待被唤醒
private final Deque<Condition> waiters;

public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
    // batch_size,只有batch_size
    this.poolableSize = poolableSize;
    this.lock = new ReentrantLock();
    // ByteBuffer,用于缓存使用过的HeapByteBuffer,注意只有大小为poolableSize的HeapByteBuffer才会缓存
    this.free = new ArrayDeque<>();
    // 内存不够时,条件等待,这里是等待队列
    this.waiters = new ArrayDeque<>();
    // 总内存大小,就是buffer.memory,这个配置基本代表Producer的堆大小
    this.totalMemory = memory;
    // 剩余内存大小
    this.availableMemory = memory;
    this.metrics = metrics;
    this.time = time;
    this.waitTime = this.metrics.sensor(WAIT_TIME_SENSOR_NAME);
    MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                               metricGrpName,
                                               "The fraction of time an appender waits for space allocation.");
    this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
}

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        // 请求的size为batch_size,若free列表不为空,说明有可复用的HeapByteBuffer,直接复用
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = freeSize() * this.poolableSize;
        // 若可用内存足够(可用内存=totalmeory - 有部分被申请了,没有归还的=this.nonPooledAvailableMemory + freeListSize)
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request, but need to allocate the buffer
            // 释放free list里的内存(说明申请的是非常规的Size,超过了Batch_size大小)
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        } else {
            // we are out of memory and will have to block
            // 可用内存不够,阻塞,等待batch发送(因为外部做了校验,理论上等待足够长,总内存是肯定够的)
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
                // loop over and over until we have a buffer or have reserved
                // enough memory to allocate one
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        // 阻塞,有内存归还时会被唤醒
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        this.waitTime.record(timeNs, time.milliseconds());
                    }

                    // 如果是因为超时退出阻塞,报错,分配内存失败
                    if (waitingTimeElapsed) {
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    // 记录超时时间
                    remainingTimeToBlockNs -= timeNs;

                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    // 如果分配的batch_size大小的内存,又有batch归还到了free list,直接复用
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        // we'll need to allocate memory, but we may only get
                        // part of what we need on this iteration
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                        this.nonPooledAvailableMemory -= got;
                        accumulated += got;
                    }
                }
                // Don't reclaim memory on throwable since nothing was thrown
                accumulated = 0;
            } finally {
                // When this loop was not able to successfully terminate don't loose available memory
                // try块在最后一句执行accumulated = 0;若执行成功,下面语句无作用
                // 若失败,说明发生异常,将已分配的内存“还”给BufferPool
                this.nonPooledAvailableMemory += accumulated;
                this.waiters.remove(moreMemory);
            }
        }
    } finally {
        // signal any additional waiters if there is more memory left
        // over for them
        try {
            // 判断还有多余内存,唤醒下一个等待内存的线程
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            // Another finally... otherwise find bugs complains
            lock.unlock();
        }
    }

    if (buffer == null)
        return safeAllocateByteBuffer(size);
    else
        return buffer;
}

public void deallocate(ByteBuffer buffer, int size) {
    // 因为producer设计可以多线程访问,因此可能多线程访问,需要加锁
    lock.lock();
    try {
        // 只有poolableSize大小的ByteBuffer才缓存
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            // 否则只是将内存放回,增加availableMemory
            this.nonPooledAvailableMemory += size;
        }

        // memory增长了,因此通知因为内存不够等待的调用者,目前有新的可用内存
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}

private void freeUp(int size) {
    // 当不能复用free中的HeapByteBuffer时
    // 释放free的内存,以增加availableMemory中的内存,提供服务
    // totalMemory=buffer.memory=free.size()*batch_size + availableMemory
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
        this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

六、NIO网络层

七、其它

压缩
  1. Producer线程会压缩,再tryAppend调用时,写入HeapByteBuffer时会进行流式压缩(如lz4为64kb一个block进行压缩)
08-22 17:44