大数据全系列 教程
1869个小节阅读:467.8k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:将数据发往指定的partition下,例如,将所有数据发往分区 0 中。
创建类ProducerToPartition
编写代码
xxxxxxxxxx
package com.itbaizhan.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerToPartition {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//TODO 1.声明并实例化Kafka Producer的配置文件对象
Properties prop = new Properties();
//TODO 2.为配置文件对象设置参数
// 2.1 配置bootstrap_servers
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node2:9092,node3:9092,node4:9092");
// 2.2 配置key和value的序列化类
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//TODO 3.声明并实例化生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
//TODO 4.发送消息
for(int i = 0;i<5;i++){
////指定数据发送到0号分区,key为null
producer.send(new ProducerRecord<>("topicA",0,null, "unsync_msg" + i),
new Callback() {
//如下方法在生产者收到acks确认时异步调用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
//无异常信息,输出主题和分区信息到控制台
System.out.println("topic:"+recordMetadata.topic()
+",partition:"+recordMetadata.partition());
}else{//打印异常信息
System.out.println(e.getMessage());
}
}
});
Thread.sleep(5);
}
//TODO 5.关闭生产者
producer.close();
}
}
在node2上开启Kafka消费者进行消费
xxxxxxxxxx
[root@node2 ~]# kafka-console-consumer.sh --bootstrap-server node2:9092 --topic topicA
运行ProducerToPartition类
观察node2上Kafka消费者消费消息的情况
xxxxxxxxxx
[root@node2 ~]# kafka-console-consumer.sh --bootstrap-server node2:9092 --topic topicA
unsync_msg0
unsync_msg1
unsync_msg2
unsync_msg3
unsync_msg4
控制台输出信息
xxxxxxxxxx
topic:topicA,partition:0
topic:topicA,partition:0
topic:topicA,partition:0
topic:topicA,partition:0
topic:topicA,partition:0