大数据全系列 教程
1869个小节阅读:467.5k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:创建一个独立消费者,消费topicA主题0号分区中的数据。
具体实现步骤:
xxxxxxxxxx
package com.itbaizhan.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
//创建一个独立消费者,消费topicA主题0号分区中的数据
public class ConsumTopicPartitionConsumer {
public static void main(String[] args) {
//1.创建属性对象
Properties prop = new Properties();
//2.设置相关参数
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"node2:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"testCg2");
//3.创建消费者对象
KafkaConsumer<String,String> kafkaConsumer =
new KafkaConsumer<String, String>(prop);
//4.为消费者注册主题和分区号
List<TopicPartition> topicPartitions =
new ArrayList<>();
topicPartitions.add(new TopicPartition("topicA",0));
kafkaConsumer.assign(topicPartitions);
//5.消费数据
while(true){
ConsumerRecords<String, String> consumerRecords =
kafkaConsumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord consumerRecord:consumerRecords){
System.out.println(consumerRecord);
}
}
}
}
运行CustomTopicPartitionConsumer
修改ProducerToPartition类的发送的分区号1->0
xxxxxxxxxx
producer.send(new ProducerRecord<>(
"topicA", 0, null, "unsync_msg" + i),
运行ProducerToPartition类向topicA的0号分区生成数据。
在 IDEA 控制台观察接收到的数据。