大数据全系列 教程
1869个小节阅读:467.5k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
xxxxxxxxxx
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.2.0</version>
</dependency>
xxxxxxxxxx
public class HaiKouOrderInfoConsumer implements Runnable {
}
xxxxxxxxxx
public HaiKouOrderInfoConsumer(String topic, String groupId) {
Properties props = new Properties();
// kafka地址
props.put("bootstrap.servers", "node2:9092,node3:9092,node4:9092");
// 组名,不同组名可以重复消费。例如你先使用组名A消费了1000条数据,但是你还想在消费这1000条数据,并且不想重复去产生
// 那么这里你只需要更换组名就可以消费了
props.put("group.id", groupId);
// 是否自动提交,默认为true。
props.put("enable.auto.commit", "true");
//earliest 如果各个分区下有提交得 offset 从头消费
//latest 当各个分区有提交得offset 从提交offset 开始消费,无提交得offset时 消费新的分区下的数据
// non topic 分区都存在提交offset 从offset 后开始消费。 如果只有一个分区不存在就报错
props.put("auto.offset.reset", "earliest");
// 超时时间
props.put("session.timeout.ms", "30000");
// 键序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 值序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
this.topic = topic;
}
xxxxxxxxxx
@Override
public void run() {
log.info("---------开始消费---------");
for(;;){
try {
word();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void word() throws Exception {
// 订阅 topic,可以一次订阅多个topic
consumer.subscribe(Collections.singletonList(this.topic));
// 从服务端拉取信息,每次poll()可以拉取多个信息
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
System.out.println("消费到消息数:" + records.count());
}