package com.jasper.transaction;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* 使用场景
* 当生产者成功发送了消息但本地事务失败时,或者消费者已经消费了消息但是后续的本地处理失败了。
*/
@Slf4j
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(
new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
System.out.println("更新订单状态和库存");
int temp = 3/0;
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.info("执行出现异常");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
log.info("回查本地事务状态");
return LocalTransactionState.COMMIT_MESSAGE;
}
}
);
producer.start();
Message message = new Message("orderTopic", "tagA", "keyA", "order".getBytes());
producer.sendMessageInTransaction(message, null);
producer.shutdown();
}
}