雨翔河
首页
列表
关于
kafka 消息丢失
2019-10-15 13:37
在处理生产环境问题的过程中发现12号那天某kafka集群有少量的数据丢失,概率大致在千万分之三。 数据写入kafka之后,就完全消失了,消费者完全没有消费到这个数据。 通过找到那天的数据,查看有问题的数据在写入kafka的时候上下文应用日志发现有少量以下报错: ``` [2019-10-12 11:03:43,xxx] This is not the correct coordinator. ``` 理论上正常情况下kafka是不太可能丢数据的,如果出现这种情况,必然是开发人员或者硬件引发了什么问题,因为写入日志是有的,看了下应用配置 ``` acks=1 ``` 马上意识到,问题突破口应该在这里。 ``` acks=0 生产者能够通过网络吧消息发送出去,那么就认为消息已成功写入Kafka,一定会丢失一些数据 acks=1 master在疏导消息并把它写到分区数据问津是会返回确认或者错误响应,还是可能会丢数据 acks=all master在返回确认或错误响应之前,会等待所有同步副本都收到消息 ``` 可能以前是为了保证性能够快,选择了折中的应用配置 `acks=1` 。 马上想到去看下kafka的日志,猜测这个时间段必然出现出现了 master 不可用的情况才会导致数据丢失。 在kafka集群的57号节点的机器上看到这样一段日志: ``` [2019-10-12 11:03:39,427] WARN Client session timed out, have not heard from server in 4034ms for sessionid 0x396aaaadbbxx00 (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:40,908] INFO Client session timed out, have not heard from server in 4034ms for sessionid 0x396aaaabbxx00, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:41,253] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient) [2019-10-12 11:03:41,962] INFO Opening socket connection to server xx.xx.xx.59/xx.xx.xx.59:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:41,962] INFO Socket connection established to xx.xx.xx.59/10.xx.xx.xx:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:42,293] WARN Unable to reconnect to ZooKeeper service, session 0x396cf664cdbb0000 has expired (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:42,293] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient) [2019-10-12 11:03:42,294] INFO Initiating client connection, connectString=xx.xx.xx.55:2181,xx.xx.xx.56:2181,xx.xx.xx.57:2181,xx.xx.xx.58:2181,xx.xx.xx.59:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@342xxx2d (org.apache.zookeeper.ZooKeeper) [2019-10-12 11:03:42,313] INFO Unable to reconnect to ZooKeeper service, session 0x396cxxxxxb0000 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:42,323] INFO EventThread shut down for session: 0x396cxxxx000 (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:42,342] INFO Opening socket connection to server xx.xx.xx.58/xx.xx.xx.58:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:42,343] INFO Socket connection established to xx.xx.xx.58/xx.xx.xx.58:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:43,516] INFO Session establishment complete on server xx.xx.xx.58/xx.xx.xx.58:2181, sessionid = 0x3ax4xxxxx01, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:43,517] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) ``` 看来和猜测的情况是一样的,这个kafka的节点机器出现了连接不到zk,所以这台机器处于‘丢失’状态。又去看了下kafka的controller日志: ``` [2019-10-12 11:03:42,671] INFO [Controller id=56] Newly added brokers: , deleted brokers: 57, all live brokers: ...,55,56,58,59,xx.... (kafka.controller.KafkaController) [2019-10-12 11:03:42,678] INFO [Controller-56-to-broker-57-send-thread]: Shutting down (kafka.controller.RequestSendThread) [2019-10-12 11:03:42,749] INFO [Controller-56-to-broker-57-send-thread]: Stopped (kafka.controller.RequestSendThread) [2019-10-12 11:03:42,749] INFO [Controller-56-to-broker-57-send-thread]: Shutdown completed (kafka.controller.RequestSendThread) [2019-10-12 11:03:42,828] INFO [Controller id=56] Broker failure callback for 57 (kafka.controller.KafkaController) [2019-10-12 11:03:42,836] INFO [Controller id=56] Removed ArrayBuffer() from list of shutting down brokers. (kafka.controller.KafkaController) ``` 再次看了下这个节点系统的网络层面监控: ![](https://cdn.yuxianghe.net/image/blog/61-1.jpg) 真相浮出水面,和猜测吻合,这个问题和之前遇到的一个问题有点类似,不一样的情况是这次只丢失了一个节点,上次的问题是所有的网络节点都短暂的丢失,详情可以看我之前写的博客: [网络故障引起的 kafka 自身的 BUG](https://yuxianghe.net/blog/58) #### 这次的原因分析: 57号节点丢失之后,立马进行了选举,这个时候数据在生产的时候到达了master,恰好选举的时候变更了master,而 `acks=1` ,数据已经进入master,但是还没有来得及同步到slave,这样就导致了数据的丢失。 #### 总结: 如果有对数据有强一致性的要求,一定要选择 `acks=all` 否则指不定哪天硬件的轻微系统抖动就会导致 `kafka` 集群重新选举,丢失数据。
类型:工作
标签:kafka,zookeeper
Copyright © 雨翔河
我与我周旋久
独孤影
开源实验室