正文
RocketMQ事务消息-demo
小程序:扫一扫查出行
【扫一扫了解最新限行尾号】
复制小程序
【扫一扫了解最新限行尾号】
复制小程序
RocketMQ为4.3.0版本(我这种写法4.2.0不行)
如果你之前用的其他版本,需要去修改下系统的环境变量
maven工程用到的jar包
<dependencies>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.3.0</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency><dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.1</version>
</dependency><dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
生产者代码
package cn.ebiz.rocketmq.transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//01 new 一个有事务基因的生产者
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
//02 注册
producer.setNamesrvAddr("127.0.0.1:9876");
//03 开启
producer.start();
/**
* 04 生产者设置事务监听器,匿名内部类new一个事务监听器,
* 重写“执行本地事务”和“检查本地事务”两个方法,返回值都为
* “本地事务状态”
*/
producer.setTransactionListener(new TransactionListener() {
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tag = msg.getTags();
if(tag.equals("Transaction1")) {
System.out.println("这里处理业务逻辑,比如操作数据库,失败情况下进行回滚");
//如果失败,再次给MQ发送消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("state -- "+new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
for(int i=0;i<2;i++) {
try {
// 05 准备要发送的message,名字,标签,内容
Message msg = new Message("TopicTransaction","Transaction" + i,("Hello RocketMQ "+i).getBytes("UTF-8"));
// 06 用发送事务特有的方法发送消息,而不是简单的producer.send(msg);
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println(msg.getBody());
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
// 07 关闭
producer.shutdown();
}
}
消费者代码
package cn.ebiz.rocketmq.transaction;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer0 {
public static void main(String[] args) throws MQClientException {
//01默认的消息消费FF者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
//02注册
consumer.setNamesrvAddr("127.0.0.1:9876");
//03设置获取原则
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//04订阅
consumer.subscribe("TopicTransaction","*");
//05注册监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
// 06接收消息并打印
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
System.out.println("收到消息: topic:"+topic+" ,tags:"+tags+" ,msg: "+msgBody);
}
} catch (Exception e) {
e.printStackTrace();
// 1s 2s 5s ... 2h
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 07开启
consumer.start();
System.out.println("Consumer Started.");
}
}
//namesrv启动
start mqnamesrv.cmd
//broker启动
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
---------------------
转自:https://blog.csdn.net/weixin_38537747/article/details/82112584