大数据全系列 教程
1869个小节阅读:467.8k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
编写自定义序列化类
xxxxxxxxxx
package com.itbaizhan.kafka.producer;
import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class UserSerializer implements Serializer<UserVo> {
private ObjectMapper objectMapper;
public void configure(Map<String, ?> configs, boolean isKey) {
objectMapper = new ObjectMapper();
//Serializer.super.configure(configs, isKey);
}
public byte[] serialize(String topic, UserVo data) {
byte[] ret = null;
try {
ret = objectMapper.writeValueAsString(data)
.getBytes(StandardCharsets.UTF_8);
} catch (IOException e) {
throw new SerializationException("Error when serializing UserVo to byte[],exception is " + e.getMessage());
}
return ret;
}
public void close() {
objectMapper = null;
//Serializer.super.close();
}
}
编写生产者程序
xxxxxxxxxx
package com.itbaizhan.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class UserSerProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//TODO 1.声明并实例化Kafka Producer的配置文件对象
Properties prop = new Properties();
//TODO 2.为配置文件对象设置参数
// 2.1 配置bootstrap_servers
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node2:9092,node3:9092,node4:9092");
// 2.2 配置key和value的序列化类
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
//TODO 3.声明并实例化生产者对象 注意value的泛型类型
KafkaProducer<String,UserVo> producer = new KafkaProducer<String, UserVo>(prop);
//TODO 4.发送消息
UserVo userVo = new UserVo("tuhao",18,"北京");
producer.send(new ProducerRecord<String,UserVo>("topicA", userVo),
new Callback() {
//如下方法在生产者收到acks确认时异步调用
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
//无异常信息,输出主题和分区信息到控制台
System.out.println("topic:"+recordMetadata.topic()
+",partition:"+recordMetadata.partition());
}else{//打印异常信息
System.out.println(e.getMessage());
}
}
});
Thread.sleep(50);
//TODO 5.关闭生产者
producer.close();
}
}
在node2上开启Kafka消费者进行消费
xxxxxxxxxx
[root@node2 ~]# kafka-console-consumer.sh --bootstrap-server node2:9092 --topic topicA
运行UserSerProducer类
观察node2上Kafka消费者消费消息的情况
xxxxxxxxxx
[root@node2 ~]# kafka-console-consumer.sh --bootstrap-server node2:9092 --topic topicA
{"name":"tuhao","age":18,"address":"北京"}
实时效果反馈
1. 关于Kafka生产者消息序列化的描述,正确的是:
A 默认提供了序列化类,如BytesSerializer、IntegerSerializer、StringSerializer等。
B 自定义序列化类需要实现org.apache.kafka.common.serialization.Serializer。
C 生产者序列化机制使用起来比较简单,需要在构造producer对象之前指定参数key.serializer和value.serializer。
D 以上三个选项都正确。
答案:
1=>D