JAVA全系列 教程
3762个小节阅读:7095k
目录
C语言快速入门
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
消息有序指的是,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费(FIFO)。
举个容易理解的例子:
通常创建订单后,会经历一系列的操作:【订单创建 -> 订单支付 -> 订单发货 -> 订单配送 -> 订单完成】。在创建完订单后,会发送五条消息到MQ Broker中,消费的时候要按照【订单创建 -> 订单支付 -> 订单发货 -> 订单配送 -> 订单完成】这个顺序去消费,这样的订单才是有效的。
在默认的情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
全局顺序消息的话,我们需要将所有消息都发送到同一个队列,然后消费者端也订阅同一个队列,这样就能实现顺序消费消息的功能。下面通过一个示例说明如何实现全局顺序消息。
实时效果反馈
1.RocketMQ中全局顺序消息是将所有消息都放到___队列?
A 1个
B 2个
C 3个
D 4个
答案
1=>A
(1)、生产者发送消息
xxxxxxxxxx
public class OrderMQProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException {
// 创建DefaultMQProducer类并设定生产者名称
DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
// 设置NameServer地址,如果是集群的话,使用分号;分隔开
mqProducer.setNamesrvAddr("192.168.47.100:9876");
// 启动消息生产者
mqProducer.start();
for (int i = 0; i < 5; i++) {
// 创建消息,并指定Topic(主题),Tag(标签)和消息内容
Message message = new Message("GLOBAL_ORDER_TOPIC", "", ("全局有序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 实现MessageQueueSelector,重写select方法,保证消息都进入同一个队列
// send方法的第一个参数: 需要发送的消息Message
// send方法的第二个参数: 消息队列选择器MessageQueueSelector
// send方法的第三个参数: 消息将要进入的队列下标,这里我们指定消息都发送到下标为1的队列
SendResult sendResult = mqProducer.send(message, new MessageQueueSelector() {
@Override
// select方法第一个参数: 指该Topic下有的队列集合
// 第二个参数: 发送的消息
// 第三个参数: 消息将要进入的队列下标,它与send方法的第三个参数相同
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get((Integer) arg);
}
}, 1);
System.out.println("sendResult = " + sendResult);
}
// 如果不再发送消息,关闭Producer实例
mqProducer.shutdown();
}
}
xxxxxxxxxx
public class OrderMQConsumer {
public static void main(String[] args) throws MQClientException {
// 创建DefaultMQPushConsumer类并设定消费者名称
DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
// 设置NameServer地址,如果是集群的话,使用分号;分隔开
mqPushConsumer.setNamesrvAddr("192.168.47.100:9876");
// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
// 如果不是第一次启动,那么按照上次消费的位置继续消费
mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*
mqPushConsumer.subscribe("GLOBAL_ORDER_TOPIC", "*");
/**
* 与普通消费一样需要注册消息监听器,但是传入的不再是MessageListenerConcurrently
* 而是需要传入MessageListenerOrderly的实现子类,并重写consumeMessage方法。
*/
// 顺序消费同一个队列的消息
mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(false);
for (MessageExt msg : msgs) {
System.out.println("消费线程=" + Thread.currentThread().getName() +
", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
}
// 标记该消息已经被成功消费
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者实例
mqPushConsumer.start();
}
}