JAVA全系列 教程
3762个小节阅读:7095.5k
目录
C语言快速入门
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
xxxxxxxxxx
public class OrderMQProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException {
// 创建DefaultMQProducer类并设定生产者名称
DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
// 设置NameServer地址,如果是集群的话,使用分号;分隔开
mqProducer.setNamesrvAddr("10.0.90.86:9876");
// 启动消息生产者
mqProducer.start();
List<Order> orderList = getOrderList();
for (int i = 0; i < orderList.size(); i++) {
String body = "【" + orderList.get(i) + "】订单状态变更消息";
// 创建消息,并指定Topic(主题),Tag(标签)和消息内容
Message msg = new Message("ORDER_STATUS_CHANGE", "", body.getBytes(RemotingHelper.DEFAULT_CHARSET));
// MessageQueueSelector: 消息队列选择器,根据业务唯一标识自定义队列选择算法
/**
* msg:消息对象
* selector:消息队列的选择器
* arg:选择队列的业务标识,如本例中的orderId
*/
SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() {
/**
* @param mqs 队列集合
* @param msg 消息对象
* @param arg 业务标识的参数,对应send()方法传入的第三个参数arg
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//根据arg(实际上是订单id)选择消息发送的队列
long index = (Long) arg % mqs.size();
return mqs.get((int) index);
}
//mqProducer.send()方法第三个参数, 会传递到select()方法的arg参数
}, orderList.get(i).getOrderId());
System.out.println(String.format("消息发送状态:%s, orderId:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
orderList.get(i).getOrderId(),
sendResult.getMessageQueue().getQueueId(),
body));
}
// 如果不再发送消息,关闭Producer实例
mqProducer.shutdown();
}
/**
* 订单状态变更流程: ORDER_CREATE(订单创建) -> ORDER_PAYED(订单已支付) -> ORDER_COMPLETE(订单完成)
*/
public static List<Order> getOrderList() {
List<Order> orderList = new ArrayList<>();
Order orderDemo = new Order();
orderDemo.setOrderId(1L);
orderDemo.setOrderStatus("ORDER_CREATE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(2L);
orderDemo.setOrderStatus("ORDER_CREATE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(1L);
orderDemo.setOrderStatus("ORDER_PAYED");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(2L);
orderDemo.setOrderStatus("ORDER_PAYED");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(2L);
orderDemo.setOrderStatus("ORDER_COMPLETE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(3L);
orderDemo.setOrderStatus("ORDER_CREATE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(4L);
orderDemo.setOrderStatus("ORDER_CREATE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(3L);
orderDemo.setOrderStatus("ORDER_PAYED");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(1L);
orderDemo.setOrderStatus("ORDER_COMPLETE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(3L);
orderDemo.setOrderStatus("ORDER_COMPLETE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(4L);
orderDemo.setOrderStatus("ORDER_PAYED");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(4L);
orderDemo.setOrderStatus("ORDER_COMPLETE");
orderList.add(orderDemo);
return orderList;
}
}
public class Order implements Serializable {
/**
* 订单ID
*/
private Long orderId;
/**
* 订单状态
*/
private String orderStatus;
public Long getOrderId() {
return orderId;
}
public void setOrderId(Long orderId) {
this.orderId = orderId;
}
public String getOrderStatus() {
return orderStatus;
}
public void setOrderStatus(String orderStatus) {
this.orderStatus = orderStatus;
}
@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", orderStatus='" + orderStatus + '\'' +
'}';
}
}
xxxxxxxxxx
public class OrderMQConsumer {
public static void main(String[] args) throws MQClientException {
// 创建DefaultMQPushConsumer类并设定消费者名称
DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
// 设置NameServer地址,如果是集群的话,使用分号;分隔开
mqPushConsumer.setNamesrvAddr("192.168.47.100:9876");
// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
// 如果不是第一次启动,那么按照上次消费的位置继续消费
mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*
mqPushConsumer.subscribe("ORDER_STATUS_CHANGE", "*");
// 注册回调实现类来处理从broker拉取回来的消息
// 注意:顺序消息注册的是MessageListenerOrderly监听器
mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext consumeOrderlyContext) {
consumeOrderlyContext.setAutoCommit(true);
for (MessageExt msg : msgList) {
// 每个queue有唯一的consume线程来消费, 订单对每个queue都是分区有序
System.out.println("消费线程=" + Thread.currentThread().getName() +
", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 标记该消息已经被成功消费
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者实例
mqPushConsumer.start();
}
}