深入理解Kafka的核心调优参数

UPDATE:配置属性已更新到2.11-0.11.0.0

kafka的配置属性多达几百个😓,在生产环境中对kafka进行调优时,该如何设置这些属性值呢?😤

调优之前,首先需要对业务场景进行分析,确定业务是吞吐量优先,还是对延时优先,是对可靠性要求比较高,还是对可用性要求比较高,然后再根据分析的结果,在吞吐量、延时、可靠性和可用性4个方面做权衡。

下面分别从吞吐量优先、延时优先、可靠性优先以及可用性优先4个方面,逐一分析kafka应该设置哪些核心属性以及提供建议值。

kafka的配置属性到底有多少,看看主要配置属性就知道😰💔

吞吐量优先

吞吐量优先意味着需要尽可能提升每秒发送消息的吞吐量

常见场景:日志收集

吞吐量优先 配置属性 默认值 建议值 说明
broker
分区个数 num.partitions 1 [确定分区个数][0] 分区是并行处理的最小单元。消息的并行发送和接收都是基于分区的。topic的分区越多,并行处理能力越强,吞吐量越大
producer 消息是异步发送的,先保存至内存,然后由后台线程(kafka-producer-network-thread$client-id)发送到broker集群
批量策略 batch.size 16384(16K) 100000 - 200000 设置每次批量提交消息的最大字节数,一旦待批量发送的消息的大小超过这个字节数,这些消息将被批量发送出去
linger.ms 0 10 - 100 该参数会影响延时,因为消息不是立即发送的,而是需要等待发送的消息大小超过batch.size或者收集消息的等待时间超过linger.ms才会发送消息
压缩 compression.type none lz4 将待发送的多个消息压缩成一个消息,因此和batch.size一起使用,batch.size越大,压缩率越高,吞吐量越大。支持3种压缩类型:lz4、snappy和gzip。[lz4能使吞吐量最优][1]
确认方式 acks 1 1 acks=1表示leader broker收到消息后就返回ack,而无需等所有的followers都确认收到消息。存在丢失消息的风险
队列限制 buffer.memory 33554432(32G) 分配给producer的内存,用于存储还未发送到broker的消息。如果分区比较多,那么设置的值也需要更大
max.block.ms 60000(10s) 定义被阻塞的时间;当buffer.memory达到上限时,再次发送消息时会被阻塞。当阻塞时间超过max.block.ms时,producer将会抛出异常
消息大小 max.request.size 1048576(1M) 定义每条消息的最大字节数。
请求超时时间 request.timeout.ms 30000(30s) 客户端等待请求的最大响应时间;如果设置了重试次数,超过这个时间,客户端将会重试。
consumer
获取策略 fetch.min.bytes 1 100000 定义从broker获取消息的最小字节数。只有大于这个值时,consumer才会拉取到消息,否则会等待到超时。这个值越大,从broker获取消息的次数越少,会减轻broker的CPU压力;但会影响延时
fetch.max.wait.ms 达到fetch.min.bytes或者超过fetch.max.wait.ms时,才会消费消息
GC
JVM [JVM调优][2] 长时间的GC时间会严重影响吞吐量;严重的情况下会导致broker失败,比如导致ZooKeeper session超时,ZooKeeper会认为该broker不可用

压缩类型为什么选择lz4?
因为这种类型的压缩方式下,吞吐量最大。
具体可以参考[压缩的性能测试][1]以及日志压缩机制

吞吐量优先时,会占用大量的网络带宽,如果不希望影响整个网络,可以设置配额。

如何设置配额

低延时优先

低延是指producer开始发送消息到consumer接收到消息的时间差。低延时优先意味着每条消息需要尽可能快地完成端对端(从producer到consumer)的传递

常见场景:近实时数据的传输、聊天、视频弹幕等应用

低延时优先 分类 配置属性 默认值 建议值 说明
broker
分区个数 num.partitions [如何确定分区个数][0] 控制每个broker的分区数量。(1)限制整个集群的分区数量;(2)增加集群的broker数量,从而减少每个broker上的分区数量
消息复制线程数 num.replica.fetchers 1 复制消息的线程复制将消息从leader上复制到每个followers上。当followers无法跟上leader时,可以将该值调大,加快复制消息的速率
producer
发送时间 linger.ms 0 0 即时发送,有消息就发送
压缩 compression.type none none 压缩会使用CPU、降低网络带宽使用率
确认方式 acks 1 0 异步发送,发送者无需等待broker的ack
consumer
获取策略 fetch.min.bytes 1 1 即时消费,有消息就消费

优化durability

可靠性就是要降低丢失消息的概率。最常见的做法就是通过消息复制实现高可靠。

可靠性优先 分类 配置属性 默认值 建议值 说明
broker
复制 default.replication.factor 1 3 指定分区副本的个数。设置为3,可以保证集群中的3个borker即使出现2个不可用时,消息依旧不会丢失
auto.create.topics.enable true false 当该值设置为true时,必须将default.replication.factor设置为3,这样topic将会随着复制一同被创建
min.insync.replicas 1 2 指定ISR列表的最小个数(包含leader);topic上的属性值可以覆盖该值
unclean.leader.election.enable true false 只从ISR中选举leader,不允许不在ISR列表中的broker参加leader的选举,否则会导致已经提交但是还未复制的消息的丢失。topic上的属性值可以覆盖该值
log.flush.interval.messages 1 同步刷磁盘。topic上的属性值可以覆盖该值
producer 复制个数 replication.factor 1 3 可以在每个topic上配置
确认方式 acks 1 -1 leader等待ISR中的所有follower确认接收到消息后再返回ack
重试 retries 0 3 重试后依据失败的,需要在producer根据业务场景做对应的处理,比如,在callback里面记录发生失败
max.in.flight.requests.per.connection 5 1 保证消息的有序性
consumer 消息确认 auto.commit.enable true false 通过显式调用commitSync()或者commitAsync()手动确认消息被消费。消费者是通过更新offsets来表明offsets之前的消息已经被消费了;当消费者的poll()执行完时,会自动commit offsets。但是如果poll作为业务事务中的一部分的时候,为了保证可靠性,必须在事务提交之后才能提交offsets,所以需要将auto.commit.enable=false,并且显示地调用commitSync() or commitAsync()

必须调用producer的close()方法,该方法会一直block,直到之前发送的消息全部发送成功
retries>0 & max.in.flight.requests.per.connection>1 会产生消息reordering

  • default.replication.factor和min.insync.replicas的区别
    default.replication.factor是指分区的总的副本个数,min.insync.replicas是指ISR列表中最少的在线副本的个数(含leader),当在线的副本个数小于min.insync.replicas时,生产者发送消息会失败。
    default.replication.factor=3,min.insync.replicas=2表示消息总共有3个副本,当在线的副本大于或者等于2时,生产者可以继续发送消息,能够容忍1个备份不可用,否则不能发送消息。

可靠性的解决方案
复制机制

平衡可靠性和可用性
min.insync.replicas

推荐配置:每个分区一个物理存储磁盘,每个分区一个consumer

  • retries 自动重试

重试的副作用

  1. 重复消息;集群出现短暂失败时可能会导致重复消息,需要消费者端自己处理重复消息;另外exactly once semantics (EOS) 的功能还在开发中,这种机制可以保证无重复消息。
  2. 导致消息重排;为了保证消息有序并且允许重新发生失败的消息,需要将max.in.flight.requests.per.connection设置为1,从而保证任何时候只有一个消息发送到broker。

优化availability

提高可用性,就需要在kafka出现故障时,能够尽快地恢复。

可用性优先 分类 配置属性 默认值 建议值 说明
broker
分区个数 num.partitions 如何确定分区个数
unclean.leader.election.enable true true
min.insync.replicas 1 1
num.recovery.threads.per.data.dir 1 broker启动的时候会加载日志文件;通过设置该值可以缩短价值日志文件的时间
consumer
session.timeout.ms 10000 broker检测consumer是否可用的时间,消费者发生异常时broker会检测到失败并且重新rebalance。GC时间或者poll()处理时间超过该值时,会被认为consumer不可用
  • acks对吞吐量、延时和可靠性的影响
acks 吞吐量 延时 可靠性
0 high low no guarantee
1 medium medium leader
-1 low high ISR

消息大小TIPs

fetch.message.max.bytes(consumer default:1MB)>message.max.bytes(broker,default:1000000 0.96M),否则消息过大不能被消费,导致consumer被hang住。
replica.fetch.max.bytes(broker)>message.max.bytes(broker),否则消息过大导致broker复制消息失败,存在消息丢失的风险

参考资料

1.kafka configs
2.Kafka 0.9 Configuration Best Practices
3.Kafka 0.10 Compression Benchmark
4.tips-for-improving-performance-of-kafka-producer
5.handling-large-messages-kafka