大数据全系列 教程
1869个小节阅读:464.8k
目录
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
对于有key的消息而言,Java版本Producer自带的Partitioner会根据murmur2算法计算消息key哈希值,然后对总分区数求模得到消息要被发送到的目标分区号。对应的计算源码:(Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions)
若要使用自定义分区机制,需要完成两件事:
自定义分区器类
在用于构造KafkaProducer的Properties对象中设置partitioner.class参数
需求:假设我们的消息中有一些消息是用于审计功能的,这类消息的key会被固定的分配一个字符串"baizhan"。我们让这类消息发送到topicA主题的最后一个分区上,便于后续统计处理。其它消息采用随机发送的策略。
具体实现步骤:
修改主题topicA的分区数为3
xxxxxxxxxx
[root@node3 ~]# kafka-topics.sh --bootstrap-server node3:9092 --topic topicA --alter --partitions 3
#修改后查看主题topicA的信息
[root@node3 ~]# kafka-topics.sh --bootstrap-server node3:9092 --describe --topic topicA
Topic: topicA TopicId: gSTy9E-8T_aPF95n3EkUjA PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: topicA Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topicA Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: topicA Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
创建分区器类BaizhanPartitioner
编写BaizhanPartitioner类代码
xxxxxxxxxx
package com.itbaizhan.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class BaizhanPartitioner implements Partitioner {
private Random random;
@Override
public void configure(Map<String, ?> configs) {
//该方法实现必要资源的初始化工作
random = new Random();
}
/** 计算信息对应的分区
* @param topic 主题
* @param key 消息的key
* @param keyBytes 消息的key序列化后的字节数组
* @param value 消息的value
* @param valueBytes 消息value序列化后的字节数组
* @param cluster 集群元数据 可以获取分区信息
* @return 息对应的分区号
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//将key转换为字符串
String keyInfo = (String)key;
//获取指定主题的分区对象列表
List<PartitionInfo> partitionInfoList =
cluster.availablePartitionsForTopic(topic);
//获取指定主题下的分区总数量
int partCount = partitionInfoList.size();
//最后一个分区号
int baizhanPartition = partCount -1;
return keyInfo==null || keyInfo.isEmpty()
||!keyInfo.contains("baizhan")
? random.nextInt(partCount-1) : baizhanPartition ;
}
@Override
public void close() {
//该方法实现必要资源的清理工作
random = null;
}
}