正文
Kafka消息重新发送
小程序:扫一扫查出行
【扫一扫了解最新限行尾号】
复制小程序
【扫一扫了解最新限行尾号】
复制小程序
K afka 消息重新发送
1、 使用 kafka 消息队列做消息的发布、订阅,如果 consumer 端消费出问题,导致数据并没有消费,此时不需要担心,数据并不会立刻丢失, kafka 会把数据在服务器的磁盘上默认存储 7 天,或者自己指定有两种方式: 1 )指定时间, log.retention.hours=168 ; 2 )指定大小, log.segment.bytes=1073741824 。此时就可以通过重置某个 topic 的 offset 来是消息重新发送,进行消费
2、 查看topic的offset的范围
1) 使用下面的命令可以查看 topic 为 userlog , broker 为 spark:9092 的 offset 的最小值:
#./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark:9092 -topic userlog --time -2
2) offset 的最大值:
#./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark:9092 -topic userlog --time -1
3、 设置consumer group的offset
1) 启动zookeeper,如果使用的是kafka内置的zookeeper,直接启动bin目录下的zookeeper-shell.sh,进行登录:
#./zookeeper-shell.sh localhost:2181
通过如下命令,来重置offset值,比如topic:userlog,group:userlogs,partition:0 ,offset:2181
set /consumers/userlogs/offsets/userlog/0 1288
如果有多个partition都执行上输的命令,并将0换为相对应的分区号就可以了。
### 如果创建分区的时候设置了zk的根目录,如localhost:2181/kafka
则重置的命令为:
set /kafka/consumers/userlogs/offsets/userlog/0 1288
2) 如果使用单独安装的zookeeper,直接使用bin目录下的ZKCli.sh登录后,执行上述命令即可。
4、 设置完成后需要重启相关的服务,就可以从设置offset的地方开始消费。
重启服务:consumer服务。