Kafka Best Practices

Kafka官网给我们提供了一些很好的最佳实践,但是内容还不够详实。
所以这篇文章整理了各大公司Kafka最佳实践的最新资料,为使用Kafka提供参考。

UPDATE:内容更新如下

  • 在最新版本中Kafka的部分属性的默认值会发生变化,本文已经根据当前最新版本(2.11-0.11.0.0)进行重新整理。
  • 本文已整合2017年Hortonworks公司在DataWorks Summit/Hadoop Summit上分享的干货内容

硬件

硬件配置

集群 机器数量 内存 CPU 磁盘
Kafka 3台+ 24G(小规模),64G(大规模) 12核+,开启超程 6+(盘位)1TB的专属磁盘(JBOD或者RAID)
ZK 3台 (小规模),5台(大规模) 8G(小规模),24G(大规模) 2核+,开启超程 SSD

多大的规模算小规模?
如果24G内存不够,不建议堆机器解决,可以直接用64G的内存服务器代替。

OS

  • 内存
    24G以上
    Kafka会将有读写操作的segment都存放在Page Cache,所以会占用大量内存。
  • less swapping
    尽可能使用物理内存。设置vm.swappiness=0
  • 文件描述符
    100k+
    Kafka的log segments和connection都会使用到文件描述符。

如何修改文件描述符

  • socket缓冲区
    增大socket缓冲区大小可以提升数据传输性能

磁盘

通常情况下,Kafka的性能受限于磁盘的吞吐量。一般场景下使用JBOD(Just a Bunch of Disks)就可以了。

  • 使用多块专属磁盘
    Kafka在对消息分区时会以轮询的方式选择log.dirs中的目录路径,所以推荐在log.dirs中配置多目录,每个目录挂在不同的多块专属磁盘上,均衡磁盘的读写。

磁盘监控非常有必要,因为分区数据不均衡时会发生数据倾斜,会出现某个目录先被写满,从而导致整个Kafka不可用。
Kafka使用MemoryMapped文件存储offset索引,因此不能使用挂靠的共享磁盘或者网络磁盘。

  • JBOD vs RAID
    JBOD就是将多个物理磁盘串联起来成为一个巨大的逻辑磁盘,它的性能与单块磁盘一样。
    JBOD比RAID0快30%(结论来源于JBOD vs RAID),但是JBOD数据安全性差,无数据冗余。
    如果考虑到数据安全问题,RAID是首选,它能够给物理磁盘提供负载均衡和冗余,当然也牺牲了一些写入性能和磁盘空间(冗余)。
  • 尽可能使用SSD

文件系统

  • 推荐使用EXT4或者XFS
    基本上不用考虑对文件系统做调优。

网络

  • 网卡
    万兆网络是标配;如果公司数据量不够大的话,千兆网络也行。
  • 网络延时
    集群同机房部署,避免因物理距离导致网络延时,影响消息同步复制;
    如果有多个DC(数据中心),可以采用MirrorMaker在Kafka集群间同步数据。

同机房网络延时在1ms;北京到上海的网络延时在30ms左右

基础监控

监控范围:CPU Load,Network,File Handle Usage,Disk Space,Disk I/O,GC,ZK Monitoring

Broker

JVM

  • 推荐:jdk1.8+G1
    如果是使用jdk1.7,推荐使用u51或者更高版本+G1,因为LinkedIn尝试测试过1.7 u21,遇到了不少问题。

0.9.0.0 以后的版本已不支持 jdk1.6

  • 堆内存:6-8G(可以设置更小一些)
    可以参考LinkedIn经过调优的JVM配置:90% GC停顿时间在21ms左右,young GC频率低于每秒1次
1
2
3
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

如果服务器的内存比较小的话,比如8G,JVM的堆可以设置得更小一些,因为Kafka自身并不会使用大量的堆,它会大量使用pagecache
pagecache是linux内核的低优先级缓存,在内存空间富裕的情况下才能获得较大的空间
因此建议保留物理内存的1/2以上给OS,以便保证pagecache的分配

刷盘

刷盘就是将消息从pagecache内存中持久化到文件,为消息的可靠性提供保障。
能够触发刷盘操作的是:producer(调用flush(),不推荐)、OS和Broker。

  • OS刷盘
    官方推荐使用默认的刷盘配置,也就是依赖OS刷盘。
    默认的刷盘配置已经提供了很好的吞吐量和延时以及恢复保障,所以不需要做任何调整。

关于pachage刷盘的解读资料1资料2

  • OS何时刷盘

    • 时间维度–30秒
      默认情况下系统会5秒(/proc/sys/vm/dirty_writeback_centisecs=500)启动一次pdflush回写进程,把dirty时间超过30秒(/proc/sys/vm/dirty_expire_centisecs=3000)的页回写。
    • 数量维度–10%、40%
      当脏页占内存(MemFree + Cached - Mapped)的比例超过10%(/proc/sys/vm/dirty_background_ratio=10)时,write系统调用会唤醒pdflush回写dirty page,但write系统调用不会被阻塞,立即返回。
      当脏页占内存(MemFree + Cached - Mapped)的比例超40%(/proc/sys/vm/dirty_ratio=40)时,write系统调用会被被阻塞,主动回写dirty page。

    通过 /proc/meminfo 计算MemFree + Cached - Mapped

  • 由OS控制刷盘,可以极大地利用pagecache的优势

    • I/O调度器可以将连续的small writes批处理成较大的物理写,从而提升吞吐量
    • I/O调度器可以对写操作进行重排序,减少磁盘head的移动,从而提升吞吐量
    • 自动使用机器的剩余内存

Note that durability in Kafka does not require syncing data to disk, as a failed node will always recover from its replicas.
虽然官方文档这么说,但是在对可靠性要求高的场景下,我们还是会默默的配置上log.flush.interval.messages=1

  • Broker控制刷盘
    官方不建议修改Broker的这些属性值,直接由OS刷盘
    • log.flush.interval.messages
      默认值2的63次方-1
    • log.flush.scheduler.interval. ms
      默认值同上
    • log.segment.bytes
      默认值1073741824,1GB
    • log.roll.hours
      默认值168小时,7天;切分日志的时间间隔

Kafka replica

  • Under Replicated Partitions
    是指集群中没有全部复制leader消息的分区个数

    • ISR扩容/缩容率
    • Mbean
      kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
    • 导致的可能原因
      • broker挂了
      • controller问题
      • zk问题
      • 网络问题
    • 解决办法
      • 调整ISR参数
    • 增加broker
  • Controller
    复制管理分区的生命周期

    • 避免controller的ZK session超时
      • ISR抖动
      • ZK Server性能问题
      • Broker长时间GC暂停
      • 网络问题
    • 监控
      • Mbean:kafka.controller:type=KafkaController,name=ActiveControllerCount
      • Mbean的值应该是1
      • LeaderElectionRate
  • Unclean leader选举

    • unclean.leader.election.enable默认值是true
      允许不在ISR中的replica被选举为leader,存在数据不一致的风险。
    • Availability 和 Correctness
      • 默认值为true,说明Kafka默认选择了可用性
    • 监控
      • MBean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec

Broker配置属性

  • 通过KAFKA_HEAP_OPTS设置JVM
  • log.dirs
    不要使用默认的“/tmp/kafka-logs”。设置为多个不同目录(每个目录挂靠不同的磁盘)

如何合理设置?TODO

  • log.retention.hours
    根据磁盘剩余空间合理设置该值
  • message.max.bytes
    设置broker可以接收的最大消息,这个值必须小于replica.fetch.max.bytes,否则会出现消息过大无法复制导致丢消息。
  • delete.topic.enable
  • unclean.leader.election
    默认值是true,存在丢消息的风险。在可靠性场景下,推荐设置为false.

集群大小评估

  • Broker大小
    • 每个Broker的Partition数不超过2k
    • partition的大小不要超过25G
  • 集群评估
    • 预估数据保留的时间
    • 预估集群的网络流量
  • 集群扩容
    • 磁盘使用率<60%
    • 网络使用率<75%
  • 集群监控
    • 确保集群负载均衡
    • 确保topic的partition均匀分布在Broker上
    • 确保集群中的Broker不会出现耗尽磁盘或带宽的情况

Partition个数评估

  • Partition:Consumers>=1:1 (同一个group)
    同一个group中,一个 partition只能被一个consumer消费,所以适当增加partition,可以增加consumer的并发度,从而增加系统的整体吞吐量
  • Partition的个数计算公式:Max (T/P, T/C)

    P: producer对单个分区的吞吐量
    C: consumer对单个分区的吞吐量

  • 对于读写比较多的topic,应该划分更多的partitions
  • 使用带Key的消息
  • 更多的partitions,意味着:
    • 更多的fd
    • Partition的Leader选举时间变长
      如果每个Partition的选举消耗是10ms,如果Broker上有200个Partition,那么在进行选举的2s里,这些Partition的读写操作都会触发LeaderNotAvailableException。
    • Controller启动时间变长
      新任命的Controller要从ZK上获取所有Partition的Meta信息,如果获取每个信息大概3-5ms,1000个Partition,那么耗时将达到3s-5s。这只是重新启动一个Controller的时间,还要再加上选举Leader的时间。
    • 端对端的延时增加
    • 客户端会消耗更多的内存

Broker的监控

Broker的几个核心监控,需要重点关注

  • Partition数
    kafka.server:type=ReplicaManager,name=PartitionCount
  • Leader副本数
    kafka.server:type=ReplicaManager,name=LeaderCount
  • ISR扩容/缩容率
    kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
  • 读写速率
    kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
    kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
    kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
    kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower}
  • 网络请求的平均空闲率
    kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
    其值在0到1之间,理想情况是> 0.3
  • 请求处理平均空闲率
    kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent
    其值在0到1之间,理想情况是> 0.3

Producer

合理使用producer

  • 关键参数配置
    • acks
      默认值时1;对可靠性要求较高的场景下,推荐使用-1。
    • batch.size
      • 基于大小的批量发送
      • 值越大,吞吐量越大,延时越高
    • linger. ms
      • 基于时间的批量发送
      • 值越大,吞吐量越大,延时越高
    • max.in.flight.requests.per.connection(默认值=5)
      表示client在被blocking之前,每个于broker的连接最多发送的未确认请求的最大数
      超过1时,如果重试次数>0,那么将会影响数据的有序性
    • compression.type
      和batch.size一起使用效果会更好。压缩会提升producer的吞吐量,但是会消耗producer的CPU。由于压缩是端对端的,所以Broker的CPU会降低
  • 使用最新版本的java客户端
  • 避免发送大消息
    • 会导致需要更多的内存
    • 会压垮broker

producer性能调优

  • 如果吞吐量小于网络带宽
    • 提高 batch.size
    • 增加更多 producer 实例;
    • 增加 partition 数;
  • acks=-1时
    如果延迟增大:可以增大 num.replica.fetchers
  • 跨数据中心传输
    • 增加socket缓冲区设置
    • 增加TCP缓冲区设置

Prodcuer的监控

  • batch-size-avg
  • compression-rate-avg
  • waiting-threads
    等待分配buffer memory的线程数
  • buffer-available-bytes
    可用的buffer memory大小
  • record-queue-time-max
  • record-send-rate
  • records-per-request-avg

Conusmer

合理使用conumser

  • 关键参数配置
    • fetch.min.bytes、fetch.max.wait. ms
    • max.poll.interval. ms
    • max.poll.records
    • session.timeout. ms
    • Consumer Rebalance
      • check timeouts 通常是session.timeout. ms超时导致,这个时间必须大于max.poll.records所有消息的处理时间
      • check processing times/logic 如果业务逻辑处理时间过长,可以将max.poll.records调小
      • GC
  • 使用kafka-consumer-perf.test.sh测试
  • 吞吐量问题
    • partition数不够多
    • 足够的page cache
      比如说,至少能够让consumer缓存30秒以内的待处理消息
    • 应用的处理逻辑过重
  • offset topic
    • __consumer_offsets
    • offsets.topic.replication.factor
    • offsets.retention.minutes
    • – MonitorISR,topicsize
  • Offsets提交较慢
    • 异步提交+手动提交

Consumer的监控

  • 确认consumer消费消息的速度是否更得上producer
  • consumer lag
  • 监控
    • Mbean: kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id} Attribute: records-lag-max
    • bin/kafka-consumer-groups.sh
  • 如何减少lag
    • 分析consumer,是GC问题还是consumer hang住了
    • 增加consumer instances
    • 增加分区数和consumer

Zookeeper

ZK负责维护consumer的offsets以及topic列表,leader选举和一些状态信息。

  • 和Kafka集群单独部署
  • 隔离ZK集群。Kafka使用单独的ZK集群,不和其他应用的ZK集群
  • 给ZK的JVM至少配置4G内存
  • 监控ZK实例