支付通知-rabbitMQ

Author Avatar
ciky 09月 02,2024
  • 在其它设备中阅读本文章
  • 点击生成二维码

支付通知


1 部署RabbitMQ


  • 拉取

    docker pull rabbitmq:3-management
    
  • 运行

    docker run \
     -e RABBITMQ_DEFAULT_USER=ciky \
     -e RABBITMQ_DEFAULT_PASS=190715 \
     --name mq \
     --hostname mq1 \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3-management
    

2 订单服务集成MQ


(1) 添加消息队列依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2) nacos配置

  • rabbitmq-dev.yaml为通用配置文件

    spring:
      rabbitmq:
        host: 192.168.65.129
        port: 5672
        username: xxxx
        password: xxxx
        virtual-host: /
        publisher-confirm-type: correlated #correlated 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
        publisher-returns: false #开启publish-return功能,同样是基于callback机制,需要定义ReturnCallback
        template:
          mandatory: false #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
        listener:
          simple:
            prefetch: 1  #每次只能获取一条消息,处理完成才能获取下一个消息
            acknowledge-mode: none #auto:出现异常时返回unack,消息回滚到mq;没有异常,返回ack ,manual:手动控制,none:丢弃消息,不回滚到mq
            retry:
              enabled: true #开启消费者失败重试
              initial-interval: 1000ms #初识的失败等待时长为1秒
              multiplier: 1 #失败的等待时长倍数,下次等待时长 = multiplier * last-interval
              max-attempts: 3 #最大重试次数
              stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false
    

(3) 引入配置文件

  • 生产方(orders)和消费方(learning)都需要引入
          - data-id: rabbitmq-${spring.profiles.active}.yaml
            group: xuecheng-plus-common
            refresh: true

(4) MQ配置类

  • 生产者配置

    /**
     * @author Mr.M
     * @version 1.0
     * @description mq配置类
     * @date 2023/2/23 16:59
     */
    @Slf4j
    @Configuration
    public class PayNotifyConfig implements ApplicationContextAware {
    
        //交换机
        public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
        //支付结果通知消息类型
        public static final String MESSAGE_TYPE = "payresult_notify";
        //支付通知队列
        public static final String PAYNOTIFY_QUEUE = "paynotify_queue";
    
        //声明交换机,且持久化
        @Bean(PAYNOTIFY_EXCHANGE_FANOUT)
        public FanoutExchange paynotify_exchange_fanout() {
            // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
            return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
        }
        //支付通知队列,且持久化
        @Bean(PAYNOTIFY_QUEUE)
        public Queue course_publish_queue() {
            return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
        }
    
        //交换机和支付通知队列绑定
        @Bean
        public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange);
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            // 获取RabbitTemplate
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            //消息处理service
            MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class);
            // 设置ReturnCallback
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                // 投递失败,记录日志
                log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                        replyCode, replyText, exchange, routingKey, message.toString());
                MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class);
                //将消息再添加到消息表
                mqMessageService.addMessage(mqMessage.getMessageType(),mqMessage.getBusinessKey1(),mqMessage.getBusinessKey2(),mqMessage.getBusinessKey3());
    
            });
        }
    }
    
  • 消费者配置

    @Slf4j
    @Configuration
    public class PayNotifyConfig {
    
        //交换机
        public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
        //支付结果通知消息类型
        public static final String MESSAGE_TYPE = "payresult_notify";
        //支付通知队列
        public static final String PAYNOTIFY_QUEUE = "paynotify_queue";
    
        //声明交换机,且持久化
        @Bean(PAYNOTIFY_EXCHANGE_FANOUT)
        public FanoutExchange paynotify_exchange_fanout() {
            // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
            return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
        }
        //支付通知队列,且持久化
        @Bean(PAYNOTIFY_QUEUE)
        public Queue course_publish_queue() {
            return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
        }
    
        //交换机和支付通知队列绑定
        @Bean
        public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange);
        }
    }
    

3 生产者发送消息


(1) 发送消息方法

/**
 * 发送通知结果
 * @param message
 */
public void notifyPayResult(MqMessage message);
@Override
public void notifyPayResult(MqMessage message) {
    //消息内容
    String jsonString = JSON.toJSONString(message);
    //创建一个持久化消息
    Message messagePER = MessageBuilder
            .withBody(jsonString.getBytes(StandardCharsets.UTF_8))  //指定编码
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)        //持久化
            .build();

    //消息id
    Long id = message.getId();

    //全局消息id
    CorrelationData correlationData = new CorrelationData(id.toString());
    //使用correlationData指定回调方法
    correlationData.getFuture().addCallback( result->{
        if(result.isAck()){
            //消息成功发送到了交换机
            log.debug("发送消息成功:{}",jsonString);
            //将消息从(mq_message)删除
            mqMessageService.completed(id);

        }else{
            //消息发送失败
            log.debug("发送消息失败:{}",jsonString);
        }

    },ex->{
        //发生异常
        log.debug("发送消息异常:{}",jsonString);
    });

    //发送消息
    rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT,"",messagePER,correlationData);
}

(2) 修改saveAliPayStatus方法

  • 完成支付操作后,支付宝回调paynotify方法,通知支付结果,保存支付宝支付结果,发送消息
/**
 * 保存支付宝支付结果
 * @payStatusDto 从支付宝查询到的信息
 */
@Override
@Transactional
public void saveAliPayStatus(PayStatusDto payStatusDto){
    // (1) 支付记录号
    String payNo = payStatusDto.getOut_trade_no();
    XcPayRecord xcPayRecordDB = getPayRecordByPayNo(payNo);
    if(xcPayRecordDB == null){
        XueChengPlusException.cast("找不到相关的支付记录");
    }

    // (2) 相关的订单id
    Long orderId = xcPayRecordDB.getOrderId();
    XcOrders xcOrdersDB = xcOrdersMapper.selectById(orderId);
    if(xcOrdersDB == null){
        XueChengPlusException.cast("找不到相关联的订单");
    }

    // (3) 支付状态
    String statusDB = xcPayRecordDB.getStatus();
    if("601002".equals(statusDB)){
        //已经支付
        return;
    }

    //如果支付成功
    String tradeStatus = payStatusDto.getTrade_status();    //从支付宝查询到的支付结果
    if(tradeStatus.equals("TRADE_SUCCESS")){    //支付宝返回的信息为"TRADE_SUCCESS"
        //1. 更新支付记录表(xc_pay_record)的状态
        xcPayRecordDB.setStatus("601002");                      //支付状态:支付成功(601002)
        xcPayRecordDB.setOutPayNo(payStatusDto.getTrade_no());  //支付宝的订单号
        xcPayRecordDB.setOutPayChannel("AliPay");               //第三方支付渠道编号(支付宝:AliPay)
        xcPayRecordDB.setPaySuccessTime(LocalDateTime.now());   //支付成功的时间
        xcPayRecordMapper.updateById(xcPayRecordDB);

        //2. 更新订单表(xc_orders)的状态
        xcOrdersDB.setStatus("600002"); //订单状态:交易成功(600002)
        xcOrdersMapper.updateById(xcOrdersDB);

        //新增内容----------------------------------------------------------------------------
        //3. 将消息写到消息表(mq_message)
        MqMessage mqMessage = mqMessageService.addMessage("payresult_notify", xcOrdersDB.getOutBusinessId(), xcOrdersDB.getOrderType(), null);

        //4. 发送消息
        notifyPayResult(mqMessage);
        //------------------------------------------------------------------------------------
    }
}


4 消费者消费消息


(1) 接收消息方法

/**
 * @Author: ciky
 * @Description: 接受消息通知类
 * @DateTime: 2024/8/6 12:54
 **/
@Slf4j
@Service
public class ReceivePayNotifyService {

    @Autowired
    private MyCourseTableService myCourseTableService;

    @RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE)
    public void receive(Message message){

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        // 1. 解析出消息
        //(1) 发送方发送的是MqMessage类的JSON串,放到MessageBuilder创建出Message消息
        //(2) 所以接收到Message消息的时候,从中取出body,即是MqMessage的Json串
        byte[] body = message.getBody();
        String jsonString = new String(body);
        // 2. 将JSON串转成MqMessage对象
        MqMessage mqMessage = JSON.parseObject(jsonString, MqMessage.class);

        // 3. 解析消息的内容
        String chooseCourseId = mqMessage.getBusinessKey1();    //选课id
        String orderType = mqMessage.getBusinessKey2();         //订单类型

        //学习中心服务只要(购买课程60201)的支付订单的结果
        if(orderType.equals("60201")){
            
            //根据消息内容,更新选课记录表(xc_choose_course),我的课程表(xc_course_tables)
            									//调用方法---------------------------
            boolean b = myCourseTableService.saveChooseCourseSuccess(chooseCourseId);
            if(!b){
                XueChengPlusException.cast("保存选课记录状态失败");
            }
        }
    }
}

(2) 保存选课成功状态

/**
 * 保存选课成功状态
 * @param chooseCourseId
 * @return
 */
public boolean saveChooseCourseSuccess(String chooseCourseId);
@Override
public boolean saveChooseCourseSuccess(String chooseCourseId) {
    // 1. 根据选课id查询选课表(xc_choose_course)
    XcChooseCourse xcChooseCourseDB = chooseCourseMapper.selectById(chooseCourseId);
    if (xcChooseCourseDB == null) {
        log.debug("接收购买课程的消息,根据选课id从数据库找不到选课记录,选课id:{}", chooseCourseId);
        return false;
    }

    // 2. 选课状态
    String status = xcChooseCourseDB.getStatus();
    if (status.equals("701002")) {
        // (1) 待支付(701002)才更新为已支付(701001)
        xcChooseCourseDB.setStatus("701001");
        int i = chooseCourseMapper.updateById(xcChooseCourseDB);
        if(i<=0){
            log.debug("添加选课记录失败:{}",xcChooseCourseDB);
            XueChengPlusException.cast("添加选课记录失败");
        }

        // (2) 向我的课程表(xc_course_tables)插入记录
        XcCourseTables xcCourseTables = addCourseTables(xcChooseCourseDB);
    }
    return true;
}