大数据全系列 教程
1869个小节阅读:467.7k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:创建 Kafka 生产者,采用同步的方式发送到 Kafka Broker。
代码实现
创建maven项目kafkademo
导入kafka依赖
xxxxxxxxxx
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.1</version>
</dependency>
</dependencies>
创建包com.itbaizhan.kafka.producer
创建类
编写代码
xpackage com.itbaizhan.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SyncCustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//TODO 1.声明并实例化Kafka Producer的配置文件对象
Properties prop = new Properties();
//TODO 2.为配置文件对象设置参数
// 2.1 配置bootstrap_servers
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"node2:9092,node3:9092,node4:9092");
// 2.2 配置key和value的序列化类
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//TODO 3.声明并实例化生产者对象
KafkaProducer<String,String> producer =
new KafkaProducer<String, String>(prop);
//TODO 4.发送消息
for(int i = 0;i<5;i++){
//同步发送消息
producer.send(new ProducerRecord<>("topicA","sync_msg"+i)).get();
}
//TODO 5.关闭生产者
producer.close();
}
}
在node2上开启Kafka消费者进行消费
xxxxxxxxxx
[root@node2 ~]# kafka-console-consumer.sh --bootstrap-server node2:9092 --topic topicA
运行SyncCustomProducer类
观察node2上Kafka消费者消费消息的情况
xxxxxxxxxx
[root@node2 ~]# kafka-console-consumer.sh --bootstrap-server node2:9092 --topic topicA
sync_msg0
sync_msg1
sync_msg2
sync_msg3
sync_msg4