大数据全系列 教程
1869个小节阅读:468.1k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:创建一个独立消费者,消费topicA主题中的数据。
具体实现步骤:
启动node2、3、4上的zkServer:zkServer.sh start
启动kafka集群,在node2上执行:
xxxxxxxxxx
[root@node2 ~]# kafka.sh start
创建包:com.itbaizhan.kafka.consumer
创建类:CustomTopicConsumer
编写代码:
xxxxxxxxxx
package com.itbaizhan.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
//创建一个独立消费者,消费topicA主题下的数据
public class CustomTopicConsumer {
public static void main(String[] args){
//1.创建消费者属性文件对象
Properties prop = new Properties();
//2.为属性对象设置相关参数
//设置kafka服务器
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node2:9092");
//设置key和value的序列化类
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
//设置消费者的消费者组的名称
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"testCg");
//3.创建消费者对象
KafkaConsumer<String,String> kafkaConsumer =
new KafkaConsumer<String, String>(prop);
//4.注册要消费的主题
/*ArrayList<String> topics = new ArrayList<>();
topics.add("topicA");
kafkaConsumer.subscribe(topics);*/
kafkaConsumer.subscribe(Arrays.asList("topicA"));
//5.拉取数据并打印输出
while(true){
//6.设置1s消费一批数据
ConsumerRecords<String, String> consumerRecords =
kafkaConsumer.poll(Duration.ofSeconds(1));
//7.打印输出消费到的数据
for(ConsumerRecord consumerRecord:consumerRecords){
System.out.println(consumerRecord);
}
}
}
}
xxxxxxxxxx
kafka-console-producer.sh --bootstrap-server node2:9092 --topic topicA
xxxxxxxxxx
topic = topicA, partition = 2,offset = 9, value = hello
topic = topicA, partition = 0,offset = 42,value = andy
topic = topicA, partition = 1,offset = 31,value = lucy
topic = topicA, partition = 0,offset = 43,value = peter
注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组
id 会被自动填写随机的消费者组 id。