Skip to content

transaction

事务消息的使用限制

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 GroupName 不能与其他类型消息的生产者 GroupName 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 GroupName 查询到生产者。

事务回查的工作原理

  1. 半事务消息:当生产者发送一条事务消息时,这条消息首先会被标记为“半事务消息”,此时它不会被消费者消费。
  2. 本地事务执行:接着,生产者会执行与这条消息关联的本地事务逻辑。
  3. 提交状态确认:生产者需要向RocketMQ服务端提交该事务的状态(提交COMMIT_MESSAGE或回滚ROLLBACK_MESSAGE)。如果因为某些原因(如网络问题、生产者崩溃等),RocketMQ服务端未能接收到这个提交状态,那么这条消息就会处于不确定状态。
  4. 事务回查:在这种情况下,RocketMQ的服务端会在一段时间后(根据配置的参数),对这条未决的消息进行回查。它会调用生产者实现的回查接口(checkLocalTransaction方法),询问这条消息对应的本地事务最终状态是什么。 回查结果处理:根据回查的结果,生产者再次向RocketMQ服务端提交正确的事务状态。如果是提交,则该消息可以被消费者消费;如果是回滚,则该消息会被丢弃。
java
package com.jasper.transaction;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * 使用场景
 * 当生产者成功发送了消息但本地事务失败时,或者消费者已经消费了消息但是后续的本地处理失败了。
 */
@Slf4j
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.setTransactionListener(
                new TransactionListener() {

                    @Override
                    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                        try {
                            System.out.println("更新订单状态和库存");
                            int temp = 3/0;
                            return LocalTransactionState.COMMIT_MESSAGE;
                        } catch (Exception e) {
                            log.info("执行出现异常");
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        }
                    }

                    @Override
                    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                        log.info("回查本地事务状态");
                        return LocalTransactionState.COMMIT_MESSAGE;
                    }
                }
        );

        producer.start();

        Message message = new Message("orderTopic", "tagA", "keyA", "order".getBytes());
        producer.sendMessageInTransaction(message, null);

        producer.shutdown();
    }
}