UPDATE:配置属性已更新到2.11-0.11.0.0
kafka的配置属性多达几百个😓,在生产环境中对kafka进行调优时,该如何设置这些属性值呢?😤
调优之前,首先需要对业务场景进行分析,确定业务是吞吐量优先,还是对延时优先,是对可靠性要求比较高,还是对可用性要求比较高,然后再根据分析的结果,在吞吐量、延时、可靠性和可用性4个方面做权衡。
下面分别从吞吐量优先、延时优先、可靠性优先以及可用性优先4个方面,逐一分析kafka应该设置哪些核心属性以及提供建议值。
kafka的配置属性到底有多少,看看主要配置属性就知道😰💔
吞吐量优先
吞吐量优先意味着需要尽可能提升每秒发送消息的吞吐量
常见场景:日志收集
吞吐量优先 | 配置属性 | 默认值 | 建议值 | 说明 | |
---|---|---|---|---|---|
broker | |||||
分区个数 | num.partitions | 1 | [确定分区个数][0] | 分区是并行处理的最小单元。消息的并行发送和接收都是基于分区的。topic的分区越多,并行处理能力越强,吞吐量越大 | |
producer | 消息是异步发送的,先保存至内存,然后由后台线程(kafka-producer-network-thread$client-id)发送到broker集群 | ||||
批量策略 | batch.size | 16384(16K) | 100000 - 200000 | 设置每次批量提交消息的最大字节数,一旦待批量发送的消息的大小超过这个字节数,这些消息将被批量发送出去 | |
linger.ms | 0 | 10 - 100 | 该参数会影响延时,因为消息不是立即发送的,而是需要等待发送的消息大小超过batch.size或者收集消息的等待时间超过linger.ms才会发送消息 | ||
压缩 | compression.type | none | lz4 | 将待发送的多个消息压缩成一个消息,因此和batch.size一起使用,batch.size越大,压缩率越高,吞吐量越大。支持3种压缩类型:lz4、snappy和gzip。[lz4能使吞吐量最优][1] | |
确认方式 | acks | 1 | 1 | acks=1表示leader broker收到消息后就返回ack,而无需等所有的followers都确认收到消息。存在丢失消息的风险 | |
队列限制 | buffer.memory | 33554432(32G) | 分配给producer的内存,用于存储还未发送到broker的消息。如果分区比较多,那么设置的值也需要更大 | ||
max.block.ms | 60000(10s) | 定义被阻塞的时间;当buffer.memory达到上限时,再次发送消息时会被阻塞。当阻塞时间超过max.block.ms时,producer将会抛出异常 | |||
消息大小 | max.request.size | 1048576(1M) | 定义每条消息的最大字节数。 | ||
请求超时时间 | request.timeout.ms | 30000(30s) | 客户端等待请求的最大响应时间;如果设置了重试次数,超过这个时间,客户端将会重试。 | ||
consumer | |||||
获取策略 | fetch.min.bytes | 1 | 100000 | 定义从broker获取消息的最小字节数。只有大于这个值时,consumer才会拉取到消息,否则会等待到超时。这个值越大,从broker获取消息的次数越少,会减轻broker的CPU压力;但会影响延时 | |
fetch.max.wait.ms | 达到fetch.min.bytes或者超过fetch.max.wait.ms时,才会消费消息 | ||||
GC | |||||
JVM | [JVM调优][2] | 长时间的GC时间会严重影响吞吐量;严重的情况下会导致broker失败,比如导致ZooKeeper session超时,ZooKeeper会认为该broker不可用 |
压缩类型为什么选择lz4?
因为这种类型的压缩方式下,吞吐量最大。
具体可以参考[压缩的性能测试][1]以及日志压缩机制
吞吐量优先时,会占用大量的网络带宽,如果不希望影响整个网络,可以设置配额。
低延时优先
低延是指producer开始发送消息到consumer接收到消息的时间差。低延时优先意味着每条消息需要尽可能快地完成端对端(从producer到consumer)的传递
常见场景:近实时数据的传输、聊天、视频弹幕等应用
低延时优先 | 分类 | 配置属性 | 默认值 | 建议值 | 说明 |
---|---|---|---|---|---|
broker | |||||
分区个数 | num.partitions | [如何确定分区个数][0] | 控制每个broker的分区数量。(1)限制整个集群的分区数量;(2)增加集群的broker数量,从而减少每个broker上的分区数量 | ||
消息复制线程数 | num.replica.fetchers | 1 | 复制消息的线程复制将消息从leader上复制到每个followers上。当followers无法跟上leader时,可以将该值调大,加快复制消息的速率 | ||
producer | |||||
发送时间 | linger.ms | 0 | 0 | 即时发送,有消息就发送 | |
压缩 | compression.type | none | none | 压缩会使用CPU、降低网络带宽使用率 | |
确认方式 | acks | 1 | 0 | 异步发送,发送者无需等待broker的ack | |
consumer | |||||
获取策略 | fetch.min.bytes | 1 | 1 | 即时消费,有消息就消费 | |
优化durability
可靠性就是要降低丢失消息的概率。最常见的做法就是通过消息复制实现高可靠。
可靠性优先 | 分类 | 配置属性 | 默认值 | 建议值 | 说明 |
---|---|---|---|---|---|
broker | |||||
复制 | default.replication.factor | 1 | 3 | 指定分区副本的个数。设置为3,可以保证集群中的3个borker即使出现2个不可用时,消息依旧不会丢失 | |
auto.create.topics.enable | true | false | 当该值设置为true时,必须将default.replication.factor设置为3,这样topic将会随着复制一同被创建 | ||
min.insync.replicas | 1 | 2 | 指定ISR列表的最小个数(包含leader);topic上的属性值可以覆盖该值 | ||
unclean.leader.election.enable | true | false | 只从ISR中选举leader,不允许不在ISR列表中的broker参加leader的选举,否则会导致已经提交但是还未复制的消息的丢失。topic上的属性值可以覆盖该值 | ||
log.flush.interval.messages | 1 | 同步刷磁盘。topic上的属性值可以覆盖该值 | |||
producer | 复制个数 | replication.factor | 1 | 3 | 可以在每个topic上配置 |
确认方式 | acks | 1 | -1 | leader等待ISR中的所有follower确认接收到消息后再返回ack | |
重试 | retries | 0 | 3 | 重试后依据失败的,需要在producer根据业务场景做对应的处理,比如,在callback里面记录发生失败 | |
max.in.flight.requests.per.connection | 5 | 1 | 保证消息的有序性 | ||
consumer | 消息确认 | auto.commit.enable | true | false | 通过显式调用commitSync()或者commitAsync()手动确认消息被消费。消费者是通过更新offsets来表明offsets之前的消息已经被消费了;当消费者的poll()执行完时,会自动commit offsets。但是如果poll作为业务事务中的一部分的时候,为了保证可靠性,必须在事务提交之后才能提交offsets,所以需要将auto.commit.enable=false,并且显示地调用commitSync() or commitAsync() |
必须调用producer的close()方法,该方法会一直block,直到之前发送的消息全部发送成功
retries>0 & max.in.flight.requests.per.connection>1 会产生消息reordering
- default.replication.factor和min.insync.replicas的区别
default.replication.factor是指分区的总的副本个数,min.insync.replicas是指ISR列表中最少的在线副本的个数(含leader),当在线的副本个数小于min.insync.replicas时,生产者发送消息会失败。
default.replication.factor=3,min.insync.replicas=2表示消息总共有3个副本,当在线的副本大于或者等于2时,生产者可以继续发送消息,能够容忍1个备份不可用,否则不能发送消息。
可靠性的解决方案
复制机制
平衡可靠性和可用性
min.insync.replicas
推荐配置:每个分区一个物理存储磁盘,每个分区一个consumer
- retries 自动重试
重试的副作用
- 重复消息;集群出现短暂失败时可能会导致重复消息,需要消费者端自己处理重复消息;另外exactly once semantics (EOS) 的功能还在开发中,这种机制可以保证无重复消息。
- 导致消息重排;为了保证消息有序并且允许重新发生失败的消息,需要将max.in.flight.requests.per.connection设置为1,从而保证任何时候只有一个消息发送到broker。
优化availability
提高可用性,就需要在kafka出现故障时,能够尽快地恢复。
可用性优先 | 分类 | 配置属性 | 默认值 | 建议值 | 说明 |
---|---|---|---|---|---|
broker | |||||
分区个数 | num.partitions | 如何确定分区个数 | |||
unclean.leader.election.enable | true | true | |||
min.insync.replicas | 1 | 1 | |||
num.recovery.threads.per.data.dir | 1 | broker启动的时候会加载日志文件;通过设置该值可以缩短价值日志文件的时间 | |||
consumer | |||||
session.timeout.ms | 10000 | broker检测consumer是否可用的时间,消费者发生异常时broker会检测到失败并且重新rebalance。GC时间或者poll()处理时间超过该值时,会被认为consumer不可用 |
- acks对吞吐量、延时和可靠性的影响
acks | 吞吐量 | 延时 | 可靠性 |
---|---|---|---|
0 | high | low | no guarantee |
1 | medium | medium | leader |
-1 | low | high | ISR |
消息大小TIPs
fetch.message.max.bytes(consumer default:1MB)>message.max.bytes(broker,default:1000000 0.96M),否则消息过大不能被消费,导致consumer被hang住。
replica.fetch.max.bytes(broker)>message.max.bytes(broker),否则消息过大导致broker复制消息失败,存在消息丢失的风险
参考资料
1.kafka configs
2.Kafka 0.9 Configuration Best Practices
3.Kafka 0.10 Compression Benchmark
4.tips-for-improving-performance-of-kafka-producer
5.handling-large-messages-kafka