支付通知-rabbitMQ
支付通知
1 部署RabbitMQ
-
拉取
docker pull rabbitmq:3-management
-
运行
docker run \ -e RABBITMQ_DEFAULT_USER=ciky \ -e RABBITMQ_DEFAULT_PASS=190715 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
2 订单服务集成MQ
(1) 添加消息队列依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2) nacos配置
-
rabbitmq-dev.yaml为通用配置文件
spring: rabbitmq: host: 192.168.65.129 port: 5672 username: xxxx password: xxxx virtual-host: / publisher-confirm-type: correlated #correlated 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback publisher-returns: false #开启publish-return功能,同样是基于callback机制,需要定义ReturnCallback template: mandatory: false #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息 listener: simple: prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息 acknowledge-mode: none #auto:出现异常时返回unack,消息回滚到mq;没有异常,返回ack ,manual:手动控制,none:丢弃消息,不回滚到mq retry: enabled: true #开启消费者失败重试 initial-interval: 1000ms #初识的失败等待时长为1秒 multiplier: 1 #失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 #最大重试次数 stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false
(3) 引入配置文件
- 生产方(orders)和消费方(learning)都需要引入
- data-id: rabbitmq-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true
(4) MQ配置类
-
生产者配置
/** * @author Mr.M * @version 1.0 * @description mq配置类 * @date 2023/2/23 16:59 */ @Slf4j @Configuration public class PayNotifyConfig implements ApplicationContextAware { //交换机 public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout"; //支付结果通知消息类型 public static final String MESSAGE_TYPE = "payresult_notify"; //支付通知队列 public static final String PAYNOTIFY_QUEUE = "paynotify_queue"; //声明交换机,且持久化 @Bean(PAYNOTIFY_EXCHANGE_FANOUT) public FanoutExchange paynotify_exchange_fanout() { // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false); } //支付通知队列,且持久化 @Bean(PAYNOTIFY_QUEUE) public Queue course_publish_queue() { return QueueBuilder.durable(PAYNOTIFY_QUEUE).build(); } //交换机和支付通知队列绑定 @Bean public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); //消息处理service MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class); // 设置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 投递失败,记录日志 log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class); //将消息再添加到消息表 mqMessageService.addMessage(mqMessage.getMessageType(),mqMessage.getBusinessKey1(),mqMessage.getBusinessKey2(),mqMessage.getBusinessKey3()); }); } }
-
消费者配置
@Slf4j @Configuration public class PayNotifyConfig { //交换机 public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout"; //支付结果通知消息类型 public static final String MESSAGE_TYPE = "payresult_notify"; //支付通知队列 public static final String PAYNOTIFY_QUEUE = "paynotify_queue"; //声明交换机,且持久化 @Bean(PAYNOTIFY_EXCHANGE_FANOUT) public FanoutExchange paynotify_exchange_fanout() { // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false); } //支付通知队列,且持久化 @Bean(PAYNOTIFY_QUEUE) public Queue course_publish_queue() { return QueueBuilder.durable(PAYNOTIFY_QUEUE).build(); } //交换机和支付通知队列绑定 @Bean public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }
3 生产者发送消息
(1) 发送消息方法
/**
* 发送通知结果
* @param message
*/
public void notifyPayResult(MqMessage message);
@Override
public void notifyPayResult(MqMessage message) {
//消息内容
String jsonString = JSON.toJSONString(message);
//创建一个持久化消息
Message messagePER = MessageBuilder
.withBody(jsonString.getBytes(StandardCharsets.UTF_8)) //指定编码
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
.build();
//消息id
Long id = message.getId();
//全局消息id
CorrelationData correlationData = new CorrelationData(id.toString());
//使用correlationData指定回调方法
correlationData.getFuture().addCallback( result->{
if(result.isAck()){
//消息成功发送到了交换机
log.debug("发送消息成功:{}",jsonString);
//将消息从(mq_message)删除
mqMessageService.completed(id);
}else{
//消息发送失败
log.debug("发送消息失败:{}",jsonString);
}
},ex->{
//发生异常
log.debug("发送消息异常:{}",jsonString);
});
//发送消息
rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT,"",messagePER,correlationData);
}
(2) 修改saveAliPayStatus方法
- 完成支付操作后,支付宝回调paynotify方法,通知支付结果,保存支付宝支付结果,发送消息
/**
* 保存支付宝支付结果
* @payStatusDto 从支付宝查询到的信息
*/
@Override
@Transactional
public void saveAliPayStatus(PayStatusDto payStatusDto){
// (1) 支付记录号
String payNo = payStatusDto.getOut_trade_no();
XcPayRecord xcPayRecordDB = getPayRecordByPayNo(payNo);
if(xcPayRecordDB == null){
XueChengPlusException.cast("找不到相关的支付记录");
}
// (2) 相关的订单id
Long orderId = xcPayRecordDB.getOrderId();
XcOrders xcOrdersDB = xcOrdersMapper.selectById(orderId);
if(xcOrdersDB == null){
XueChengPlusException.cast("找不到相关联的订单");
}
// (3) 支付状态
String statusDB = xcPayRecordDB.getStatus();
if("601002".equals(statusDB)){
//已经支付
return;
}
//如果支付成功
String tradeStatus = payStatusDto.getTrade_status(); //从支付宝查询到的支付结果
if(tradeStatus.equals("TRADE_SUCCESS")){ //支付宝返回的信息为"TRADE_SUCCESS"
//1. 更新支付记录表(xc_pay_record)的状态
xcPayRecordDB.setStatus("601002"); //支付状态:支付成功(601002)
xcPayRecordDB.setOutPayNo(payStatusDto.getTrade_no()); //支付宝的订单号
xcPayRecordDB.setOutPayChannel("AliPay"); //第三方支付渠道编号(支付宝:AliPay)
xcPayRecordDB.setPaySuccessTime(LocalDateTime.now()); //支付成功的时间
xcPayRecordMapper.updateById(xcPayRecordDB);
//2. 更新订单表(xc_orders)的状态
xcOrdersDB.setStatus("600002"); //订单状态:交易成功(600002)
xcOrdersMapper.updateById(xcOrdersDB);
//新增内容----------------------------------------------------------------------------
//3. 将消息写到消息表(mq_message)
MqMessage mqMessage = mqMessageService.addMessage("payresult_notify", xcOrdersDB.getOutBusinessId(), xcOrdersDB.getOrderType(), null);
//4. 发送消息
notifyPayResult(mqMessage);
//------------------------------------------------------------------------------------
}
}
4 消费者消费消息
(1) 接收消息方法
/**
* @Author: ciky
* @Description: 接受消息通知类
* @DateTime: 2024/8/6 12:54
**/
@Slf4j
@Service
public class ReceivePayNotifyService {
@Autowired
private MyCourseTableService myCourseTableService;
@RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE)
public void receive(Message message){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 1. 解析出消息
//(1) 发送方发送的是MqMessage类的JSON串,放到MessageBuilder创建出Message消息
//(2) 所以接收到Message消息的时候,从中取出body,即是MqMessage的Json串
byte[] body = message.getBody();
String jsonString = new String(body);
// 2. 将JSON串转成MqMessage对象
MqMessage mqMessage = JSON.parseObject(jsonString, MqMessage.class);
// 3. 解析消息的内容
String chooseCourseId = mqMessage.getBusinessKey1(); //选课id
String orderType = mqMessage.getBusinessKey2(); //订单类型
//学习中心服务只要(购买课程60201)的支付订单的结果
if(orderType.equals("60201")){
//根据消息内容,更新选课记录表(xc_choose_course),我的课程表(xc_course_tables)
//调用方法---------------------------
boolean b = myCourseTableService.saveChooseCourseSuccess(chooseCourseId);
if(!b){
XueChengPlusException.cast("保存选课记录状态失败");
}
}
}
}
(2) 保存选课成功状态
/**
* 保存选课成功状态
* @param chooseCourseId
* @return
*/
public boolean saveChooseCourseSuccess(String chooseCourseId);
@Override
public boolean saveChooseCourseSuccess(String chooseCourseId) {
// 1. 根据选课id查询选课表(xc_choose_course)
XcChooseCourse xcChooseCourseDB = chooseCourseMapper.selectById(chooseCourseId);
if (xcChooseCourseDB == null) {
log.debug("接收购买课程的消息,根据选课id从数据库找不到选课记录,选课id:{}", chooseCourseId);
return false;
}
// 2. 选课状态
String status = xcChooseCourseDB.getStatus();
if (status.equals("701002")) {
// (1) 待支付(701002)才更新为已支付(701001)
xcChooseCourseDB.setStatus("701001");
int i = chooseCourseMapper.updateById(xcChooseCourseDB);
if(i<=0){
log.debug("添加选课记录失败:{}",xcChooseCourseDB);
XueChengPlusException.cast("添加选课记录失败");
}
// (2) 向我的课程表(xc_course_tables)插入记录
XcCourseTables xcCourseTables = addCourseTables(xcChooseCourseDB);
}
return true;
}