大数据全系列 教程
1869个小节阅读:467.7k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
从0.9版本开始,每个消费者的offset由消费者提交到kafka系统主题__consumer_offsets
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic名称+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic名称+分区号就保留最新数据。
解决不能消费系统主题的问题
既然__consumer_offsets
为 Kafka 中的Topic,那就可以通过消费者进行消费。但是默认情况下Kafka系统级别的主题不能进行消费,所以需要修改系统的配置文件:
xxxxxxxxxx
[root@node4 ~]# cd /opt/kafka/config/
[root@node4 config]# vim consumer.properties
exclude.internal.topics=false
并将该文件分发到node2和node3上:
xxxxxxxxxx
[root@node4 config]# scp consumer.properties node2:`pwd`
[root@node4 config]# scp consumer.properties node3:`pwd`
然后在node2上重启kafka集群
xxxxxxxxxx
[root@node2 ~]# kafka.sh stop
--------stop node2's kafka----------
--------stop node3's kafka----------
--------stop node4's kafka----------
[root@node2 ~]# kafka.sh start
--------start node2's kafka----------
--------start node3's kafka----------
--------start node4's kafka----------
演示消费是offset的记录:
新建一个主题
xxxxxxxxxx
kafka-topics.sh --bootstrap-server node2:9092 --create --topic baizhan --partitions 2 --replication-factor 2
在另外一个窗口启动一个消费baizhan
的消费者
xxxxxxxxxx
kafka-console-consumer.sh --bootstrap-server node2:9092 --topic baizhan --group cgoffset
记得指定消费组的名称,方便后续结果的查看。
启动一个生产者向主题baizhan
中生成数据
xxxxxxxxxx
[root@node2 ~]# kafka-console-producer.sh --topic baizhan --bootstrap-server node2:9092
>A
>B
>
查看消费者消费主题__consumer_offsets
xxxxxxxxxx
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node2:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
输入:
xxxxxxxxxx
[cgoffset,baizhan,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1653285735599, expireTimestamp=None)
[cgoffset,baizhan,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1653285735599, expireTimestamp=None)
实时效果反馈
1. 关于Kafka消费者offset的描述,错误的是:
A 从0.9版本开始,每个消费者的offset由消费者提交到kafka系统主题__consumer_offsets
。
B __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。
C key 是 group.id+topic+分区号。
D value 就是当前 offset 的值,仅有offset的值。
答案:
1=>D 还有提交时间等信息。