Kafka官网给我们提供了一些很好的最佳实践,但是内容还不够详实。
所以这篇文章整理了各大公司Kafka最佳实践的最新资料,为使用Kafka提供参考。
UPDATE:内容更新如下
- 在最新版本中Kafka的部分属性的默认值会发生变化,本文已经根据当前最新版本(2.11-0.11.0.0)进行重新整理。
- 本文已整合2017年Hortonworks公司在DataWorks Summit/Hadoop Summit上分享的干货内容。
硬件
硬件配置
集群 | 机器数量 | 内存 | CPU | 磁盘 |
---|---|---|---|---|
Kafka | 3台+ | 24G(小规模),64G(大规模) | 12核+,开启超程 | 6+(盘位)1TB的专属磁盘(JBOD或者RAID) |
ZK | 3台 (小规模),5台(大规模) | 8G(小规模),24G(大规模) | 2核+,开启超程 | SSD |
多大的规模算小规模?
如果24G内存不够,不建议堆机器解决,可以直接用64G的内存服务器代替。
OS
- 内存
24G以上
Kafka会将有读写操作的segment都存放在Page Cache,所以会占用大量内存。
- less swapping
尽可能使用物理内存。设置vm.swappiness=0 - 文件描述符
100k+
Kafka的log segments和connection都会使用到文件描述符。
- socket缓冲区
增大socket缓冲区大小可以提升数据传输性能
磁盘
通常情况下,Kafka的性能受限于磁盘的吞吐量。一般场景下使用JBOD(Just a Bunch of Disks)就可以了。
- 使用多块专属磁盘
Kafka在对消息分区时会以轮询的方式选择log.dirs中的目录路径,所以推荐在log.dirs中配置多目录,每个目录挂在不同的多块专属磁盘上,均衡磁盘的读写。
磁盘监控非常有必要,因为分区数据不均衡时会发生数据倾斜,会出现某个目录先被写满,从而导致整个Kafka不可用。
Kafka使用MemoryMapped文件存储offset索引,因此不能使用挂靠的共享磁盘或者网络磁盘。
- JBOD vs RAID
JBOD就是将多个物理磁盘串联起来成为一个巨大的逻辑磁盘,它的性能与单块磁盘一样。
JBOD比RAID0快30%(结论来源于JBOD vs RAID),但是JBOD数据安全性差,无数据冗余。
如果考虑到数据安全问题,RAID是首选,它能够给物理磁盘提供负载均衡和冗余,当然也牺牲了一些写入性能和磁盘空间(冗余)。 - 尽可能使用SSD
文件系统
- 推荐使用EXT4或者XFS
基本上不用考虑对文件系统做调优。
网络
- 网卡
万兆网络是标配;如果公司数据量不够大的话,千兆网络也行。 - 网络延时
集群同机房部署,避免因物理距离导致网络延时,影响消息同步复制;
如果有多个DC(数据中心),可以采用MirrorMaker在Kafka集群间同步数据。
同机房网络延时在1ms;北京到上海的网络延时在30ms左右
基础监控
监控范围:CPU Load,Network,File Handle Usage,Disk Space,Disk I/O,GC,ZK Monitoring
Broker
JVM
- 推荐:jdk1.8+G1
如果是使用jdk1.7,推荐使用u51或者更高版本+G1,因为LinkedIn尝试测试过1.7 u21,遇到了不少问题。
0.9.0.0 以后的版本已不支持 jdk1.6
- 堆内存:6-8G(可以设置更小一些)
可以参考LinkedIn经过调优的JVM配置:90% GC停顿时间在21ms左右,young GC频率低于每秒1次
|
|
如果服务器的内存比较小的话,比如8G,JVM的堆可以设置得更小一些,因为Kafka自身并不会使用大量的堆,它会大量使用pagecache
pagecache是linux内核的低优先级缓存,在内存空间富裕的情况下才能获得较大的空间
因此建议保留物理内存的1/2以上给OS,以便保证pagecache的分配
刷盘
刷盘就是将消息从pagecache内存中持久化到文件,为消息的可靠性提供保障。
能够触发刷盘操作的是:producer(调用flush(),不推荐)、OS和Broker。
- OS刷盘
官方推荐使用默认的刷盘配置,也就是依赖OS刷盘。
默认的刷盘配置已经提供了很好的吞吐量和延时以及恢复保障,所以不需要做任何调整。
-
OS何时刷盘
- 时间维度–30秒
默认情况下系统会5秒(/proc/sys/vm/dirty_writeback_centisecs=500)启动一次pdflush回写进程,把dirty时间超过30秒(/proc/sys/vm/dirty_expire_centisecs=3000)的页回写。 - 数量维度–10%、40%
当脏页占内存(MemFree + Cached - Mapped)的比例超过10%(/proc/sys/vm/dirty_background_ratio=10)时,write系统调用会唤醒pdflush回写dirty page,但write系统调用不会被阻塞,立即返回。
当脏页占内存(MemFree + Cached - Mapped)的比例超40%(/proc/sys/vm/dirty_ratio=40)时,write系统调用会被被阻塞,主动回写dirty page。
通过 /proc/meminfo 计算MemFree + Cached - Mapped
- 时间维度–30秒
-
由OS控制刷盘,可以极大地利用pagecache的优势
- I/O调度器可以将连续的small writes批处理成较大的物理写,从而提升吞吐量
- I/O调度器可以对写操作进行重排序,减少磁盘head的移动,从而提升吞吐量
- 自动使用机器的剩余内存
Note that durability in Kafka does not require syncing data to disk, as a failed node will always recover from its replicas.
虽然官方文档这么说,但是在对可靠性要求高的场景下,我们还是会默默的配置上log.flush.interval.messages=1
- Broker控制刷盘
官方不建议修改Broker的这些属性值,直接由OS刷盘- log.flush.interval.messages
默认值2的63次方-1 - log.flush.scheduler.interval. ms
默认值同上 - log.segment.bytes
默认值1073741824,1GB - log.roll.hours
默认值168小时,7天;切分日志的时间间隔
- log.flush.interval.messages
Kafka replica
-
Under Replicated Partitions
是指集群中没有全部复制leader消息的分区个数- ISR扩容/缩容率
- Mbean
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions - 导致的可能原因
- broker挂了
- controller问题
- zk问题
- 网络问题
- 解决办法
- 调整ISR参数
- 增加broker
-
Controller
复制管理分区的生命周期- 避免controller的ZK session超时
- ISR抖动
- ZK Server性能问题
- Broker长时间GC暂停
- 网络问题
- 监控
- Mbean:kafka.controller:type=KafkaController,name=ActiveControllerCount
- Mbean的值应该是1
- LeaderElectionRate
- 避免controller的ZK session超时
-
Unclean leader选举
- unclean.leader.election.enable默认值是true
允许不在ISR中的replica被选举为leader,存在数据不一致的风险。 - Availability 和 Correctness
- 默认值为true,说明Kafka默认选择了可用性
- 监控
- MBean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
- unclean.leader.election.enable默认值是true
Broker配置属性
- 通过KAFKA_HEAP_OPTS设置JVM
- log.dirs
不要使用默认的“/tmp/kafka-logs”。设置为多个不同目录(每个目录挂靠不同的磁盘)
如何合理设置?TODO
- log.retention.hours
根据磁盘剩余空间合理设置该值 - message.max.bytes
设置broker可以接收的最大消息,这个值必须小于replica.fetch.max.bytes,否则会出现消息过大无法复制导致丢消息。 - delete.topic.enable
- unclean.leader.election
默认值是true,存在丢消息的风险。在可靠性场景下,推荐设置为false.
集群大小评估
- Broker大小
- 每个Broker的Partition数不超过2k
- partition的大小不要超过25G
- 集群评估
- 预估数据保留的时间
- 预估集群的网络流量
- 集群扩容
- 磁盘使用率<60%
- 网络使用率<75%
- 集群监控
- 确保集群负载均衡
- 确保topic的partition均匀分布在Broker上
- 确保集群中的Broker不会出现耗尽磁盘或带宽的情况
Partition个数评估
- Partition:Consumers>=1:1 (同一个group)
同一个group中,一个 partition只能被一个consumer消费,所以适当增加partition,可以增加consumer的并发度,从而增加系统的整体吞吐量 - Partition的个数计算公式:Max (T/P, T/C)
P: producer对单个分区的吞吐量
C: consumer对单个分区的吞吐量 - 对于读写比较多的topic,应该划分更多的partitions
- 使用带Key的消息
- 更多的partitions,意味着:
- 更多的fd
- Partition的Leader选举时间变长
如果每个Partition的选举消耗是10ms,如果Broker上有200个Partition,那么在进行选举的2s里,这些Partition的读写操作都会触发LeaderNotAvailableException。 - Controller启动时间变长
新任命的Controller要从ZK上获取所有Partition的Meta信息,如果获取每个信息大概3-5ms,1000个Partition,那么耗时将达到3s-5s。这只是重新启动一个Controller的时间,还要再加上选举Leader的时间。 - 端对端的延时增加
- 客户端会消耗更多的内存
Broker的监控
Broker的几个核心监控,需要重点关注
- Partition数
kafka.server:type=ReplicaManager,name=PartitionCount - Leader副本数
kafka.server:type=ReplicaManager,name=LeaderCount - ISR扩容/缩容率
kafka.server:type=ReplicaManager,name=IsrExpandsPerSec - 读写速率
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower} - 网络请求的平均空闲率
kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
其值在0到1之间,理想情况是> 0.3 - 请求处理平均空闲率
kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent
其值在0到1之间,理想情况是> 0.3
Producer
合理使用producer
- 关键参数配置
- acks
默认值时1;对可靠性要求较高的场景下,推荐使用-1。 - batch.size
- 基于大小的批量发送
- 值越大,吞吐量越大,延时越高
- linger. ms
- 基于时间的批量发送
- 值越大,吞吐量越大,延时越高
- max.in.flight.requests.per.connection(默认值=5)
表示client在被blocking之前,每个于broker的连接最多发送的未确认请求的最大数
超过1时,如果重试次数>0,那么将会影响数据的有序性 - compression.type
和batch.size一起使用效果会更好。压缩会提升producer的吞吐量,但是会消耗producer的CPU。由于压缩是端对端的,所以Broker的CPU会降低
- acks
- 使用最新版本的java客户端
- 避免发送大消息
- 会导致需要更多的内存
- 会压垮broker
producer性能调优
- 如果吞吐量小于网络带宽
- 提高 batch.size
- 增加更多 producer 实例;
- 增加 partition 数;
- acks=-1时
如果延迟增大:可以增大 num.replica.fetchers - 跨数据中心传输
- 增加socket缓冲区设置
- 增加TCP缓冲区设置
Prodcuer的监控
- batch-size-avg
- compression-rate-avg
- waiting-threads
等待分配buffer memory的线程数 - buffer-available-bytes
可用的buffer memory大小 - record-queue-time-max
- record-send-rate
- records-per-request-avg
Conusmer
合理使用conumser
- 关键参数配置
- fetch.min.bytes、fetch.max.wait. ms
- max.poll.interval. ms
- max.poll.records
- session.timeout. ms
- Consumer Rebalance
- check timeouts 通常是session.timeout. ms超时导致,这个时间必须大于max.poll.records所有消息的处理时间
- check processing times/logic 如果业务逻辑处理时间过长,可以将max.poll.records调小
- GC
- 使用kafka-consumer-perf.test.sh测试
- 吞吐量问题
- partition数不够多
- 足够的page cache
比如说,至少能够让consumer缓存30秒以内的待处理消息 - 应用的处理逻辑过重
- offset topic
- __consumer_offsets
- offsets.topic.replication.factor
- offsets.retention.minutes
- – MonitorISR,topicsize
- Offsets提交较慢
- 异步提交+手动提交
Consumer的监控
- 确认consumer消费消息的速度是否更得上producer
- consumer lag
- 监控
- Mbean: kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id} Attribute: records-lag-max
- bin/kafka-consumer-groups.sh
- 如何减少lag
- 分析consumer,是GC问题还是consumer hang住了
- 增加consumer instances
- 增加分区数和consumer
Zookeeper
ZK负责维护consumer的offsets以及topic列表,leader选举和一些状态信息。
- 和Kafka集群单独部署
- 隔离ZK集群。Kafka使用单独的ZK集群,不和其他应用的ZK集群
- 给ZK的JVM至少配置4G内存
- 监控ZK实例