JAVA全系列 教程
3762个小节阅读:7088.2k
目录
C语言快速入门
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
业务逻辑层主要实现了用户提交订单后的业务逻辑。
xxxxxxxxxx
/**
* 添加订单
* @param productId 商品id
* @param payCount 购买数量
*/
void save(Long productId,Integer payCount);
/**
* 提交订单同时保存事务信息
*/
void submitOrderAndSaveTxNo(TxMessage txMessage);
/**
* 提交订单
* @param productId 商品id
* @param payCount 购买数量
*/
void submitOrder(Long productId, Integer payCount);
xxxxxxxxxx
package com.itbaizhan.order.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.order.entity.Order;
import com.itbaizhan.order.entity.TxLog;
import com.itbaizhan.order.mapper.OrderMapper;
import com.itbaizhan.order.mapper.TxLogMapper;
import com.itbaizhan.order.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.itbaizhan.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.UUID;
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-20
*/
@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {
@Resource
RocketMQTemplate rocketMQTemplate;
@Resource
private TxLogMapper txLogMapper;
/**
* 添加
* @param productId 商品id
* @param payCount 购买数量
*/
@Override
public void save(Long productId, Integer payCount) {
Order order = new Order();
// 订单创建时间
order.setCreateTime(LocalDateTime.now());
// 生产订单编号
order.setOrderNo(UUID.randomUUID().toString().replace("-",""));
// 商品id
order.setProductId(productId);
// 购买数量
order.setPayCount(payCount);
baseMapper.insert(order);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void submitOrderAndSaveTxNo(TxMessage txMessage) {
TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
if(txLog != null){
log.info("订单微服务已经执行过事务,商品id为:{},事务编号为:{}",txMessage.getProductId(), txMessage.getTxNo());
return;
}
//生成订单
this.save(txMessage.getProductId(),txMessage.getPayCount());
//生成订单
txLog = new TxLog();
txLog.setTxNo(txMessage.getTxNo());
txLog.setCreateTime(LocalDateTime.now());
//添加事务日志
txLogMapper.insert(txLog);
}
/**
* 提交订单
* @param productId 商品id
* @param payCount 购买数量
*/
@Override
public void submitOrder(Long productId, Integer payCount) {
//生成全局分布式序列号
String txNo = UUID.randomUUID().toString();
TxMessage txMessage = new TxMessage(productId, payCount, txNo);
JSONObject jsonObject = new JSONObject();
jsonObject.put("txMessage", txMessage);
Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build();
//发送事务消息 且该消息不允许消费 tx_order_group: 指定版事务消息组
rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg", message, null);
}
}
xxxxxxxxxx
package com.itbaizhan.order.message;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.order.entity.TxLog;
import com.itbaizhan.order.mapper.TxLogMapper;
import com.itbaizhan.order.service.IOrderService;
import com.itbaizhan.order.service.ITxLogService;
import com.itbaizhan.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* @author itbaizhan
* @version 1.0.0
* @description 监听事务消息
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener {
@Autowired
private IOrderService orderService;
@Resource
private TxLogMapper txLogMapper;
/**
* RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。
* 成功返回: RocketMQLocalTransactionState.COMMIT,失败返回:RocketMQLocalTransactionState.ROLLBACK
*/
@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) {
try {
log.info("订单微服务执行本地事务");
TxMessage txMessage = this.getTxMessage(msg);
//执行本地事务
orderService.submitOrderAndSaveTxNo(txMessage);
//提交事务
log.info("订单微服务提交事务");
// COMMIT:即生产者通知Rocket该消息可以消费
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
//异常回滚事务
log.info("订单微服务回滚事务");
// ROLLBACK:即生产者通知Rocket将该消息删除
return RocketMQLocalTransactionState.ROLLBACK;
}
}
private TxMessage getTxMessage(Message msg) {
String messageString = new String((byte[]) msg.getPayload());
JSONObject jsonObject = JSONObject.parseObject(messageString);
String txStr = jsonObject.getString("txMessage");
return JSONObject.parseObject(txStr, TxMessage.class);
}
}
xxxxxxxxxx
/**
* 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地事务是否已经执行成功,
* 成功返回: RocketMQLocalTransactionState.COMMIT,失败返回:RocketMQLocalTransactionState.ROLLBACK
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("订单微服务查询本地事务");
TxMessage txMessage = this.getTxMessage(msg);
// 获取订单的消息
Integer exists = txLogService.isExistsTx(txMessage.getTxNo());
if (exists != null) {
// COMMIT:即生产者通知Rocket该消息可以消费
return RocketMQLocalTransactionState.COMMIT;
}
// UNKNOWN:即生产者通知Rocket继续查询该消息的状态
return RocketMQLocalTransactionState.UNKNOWN;
}
private TxMessage getTxMessage(Message msg) {
String messageString = new String((byte[]) msg.getPayload());
JSONObject jsonObject = JSONObject.parseObject(messageString);
String txStr = jsonObject.getString("txMessage");
return JSONObject.parseObject(txStr, TxMessage.class);
}