大数据全系列 教程
1869个小节阅读:467.5k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
Interceptor可能运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外,若指定了多个Interceptor,则producer将按照指定顺序调用它们,同时把每个Interceptor中捕获的异常记录写入到错误日志中而不是向上传递。这在使用过程中要特别留意。
实现一个简单的双interceptor组成的拦截链。第一个Interceptor会在消息发送前将时间戳信息加到消息value的前面,第二个Interceptor在消息发送后更新成功发送消息数或失败发送消息数。
xxxxxxxxxx
package com.itbaizhan.kafka.producer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TimeStampInterceptor implements ProducerInterceptor<String,String> {
public void configure(Map<String, ?> configs) {
}
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<String, String>(record.topic(),record.partition(),
record.timestamp(), record.key(),
System.currentTimeMillis()+","+record.value());
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
public void close() {
}
}
xxxxxxxxxx
package com.itbaizhan.kafka.producer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CounterIntercepter implements ProducerInterceptor<String,String> {
private int errorCounter = 0;
private int successCounter = 0;
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if(exception==null){
successCounter++;
}else{
errorCounter++;
}
}
public void close() {
System.out.println("successful send:"+successCounter);
System.out.println("failed send:"+errorCounter);
}
public void configure(Map<String, ?> configs) {
}
}
复制SyncCustomProducer为SyncCustomProducerInterceptor
xxxxxxxxxx
package com.itbaizhan.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SyncCustomProducerInterceptor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//......
//构造拦截器链
List<String> interceptors = new ArrayList<>();
interceptors.add("com.itbaizhan.kafka.producer.TimeStampInterceptor");
interceptors.add("com.itbaizhan.kafka.producer.CounterIntercepter");
//配置拦截器链
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
interceptors);
//TODO 3.声明并实例化生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
//......
//TODO 5.一定要关闭关闭生产者,这样才会调用interceptor的close方法
producer.close();
}
}
在node2上开启Kafka消费者进行消费
xxxxxxxxxx
[root@node2 ~]# kafka-console-consumer.sh --bootstrap-server node2:9092 --topic topicA
运行SyncCustomProducerInterceptor类
观察node2上Kafka消费者消费消息的情况
xxxxxxxxxx
[root@node2 ~]# kafka-console-consumer.sh --bootstrap-server node2:9092 --topic topicA
1648713968033,sync_msg0
1648713969753,sync_msg3
1648713969723,sync_msg2
1648713969643,sync_msg1
1648713969766,sync_msg4
控制台输出
xxxxxxxxxx
successful send:5
failed send:0
实时效果反馈
1. 关于Kafka拦截器的描述,错误的是:
A 拦截器(Interceptor)是kafka2.10.0.0版本中引入的新功能。
B 它可以使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
C 同时允许指定多个Interceptor按序作用于同一条消息从而形成一个拦截器链(Interceptor Chain)。
D 自定义拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
答案:
1=>A 拦截器(Interceptor)是kafka0.10.0.0版本中引入的新功能。