大数据全系列 教程
1869个小节阅读:467.3k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
同步提交:
xxxxxxxxxx
package com.itbaizhan.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerHandSyncCommit {
public static void main(String[] args) {
//1.创建属性对象
Properties prop = new Properties();
//2.设置相关参数
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"node2:9092,node3:9092,node4:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
//配置消费者组
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"cghandSyncCommit");
//设置为非自动提交
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
//3.创建消费者对象
KafkaConsumer<String,String> consumer=
new KafkaConsumer<String, String>(prop);
//4.注册消费主题
consumer.subscribe(Arrays.asList("topicA"));
//5.消费数据
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord record:records){
System.out.println(record.value());
}
//6.同步提交offset
consumer.commitSync();
}
}
}
异步提交:
xxxxxxxxxx
package com.itbaizhan.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerHandASyncCommit {
public static void main(String[] args) {
//1.创建属性对象
Properties prop = new Properties();
//2.设置相关参数
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"node2:9092,node3:9092,node4:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
//配置消费者组
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"cghandAsyncCommit");
//设置为非自动提交
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
//3.创建消费者对象
KafkaConsumer<String,String> consumer=
new KafkaConsumer<String, String>(prop);
//4.注册消费主题
consumer.subscribe(Arrays.asList("topicA"));
//5.消费数据
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord record:records){
System.out.println(record.value());
}
//6.同步提交offset
consumer.commitAsync();
}
}
}