SpringBoot如何整合RocketMQ事务、广播以及顺序消息
导读:本文共5016.5字符,通常情况下阅读需要17分钟。同时您也可以点击右侧朗读,来听本文内容。按键盘←(左) →(右) 方向键可以翻页。
摘要: 环境:springboot2.3.9RELEASE + RocketMQ4.8.0依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></d... ...
目录
(为您整理了一些要点),点击可以直达。环境:springboot2.3.9RELEASE + RocketMQ4.8.0
依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
配置文件
server:port:8080---rocketmq:nameServer:localhost:9876producer:group:demo-mq
普通消息
发送
@ResourceprivateRocketMQTemplaterocketMQTemplate;publicvoidsend(Stringmessage){rocketMQTemplate.convertAndSend("test-topic:tag2",MessageBuilder.withPayload(message).build());}
接受
@RocketMQMessageListener(topic="test-topic",consumerGroup="consumer01-group",selectorExpression="tag1||tag2")@ComponentpublicclassConsumerListenerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){System.out.println("接收到消息:"+message);}}
顺序消息
发送
@ResourceprivateRocketMQTemplaterocketMQTemplate;publicvoidsendOrder(Stringtopic,Stringmessage,Stringtags,intid){rocketMQTemplate.asyncSendOrderly(topic+":"+tags,MessageBuilder.withPayload(message).build(),"order-"+id,newSendCallback(){@OverridepublicvoidonSuccess(SendResultsendResult){System.err.println("msg-id:"+sendResult.getMsgId()+":"+message+"\tqueueId:"+sendResult.getMessageQueue().getQueueId());}@OverridepublicvoidonException(Throwablee){e.printStackTrace();}});}
这里是根据hashkey将消息发送到不同的队列中
@RocketMQMessageListener(topic="order-topic",consumerGroup="consumer02-group",selectorExpression="tag3||tag4",consumeMode=ConsumeMode.ORDERLY)@ComponentpublicclassConsumerOrderListenerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){System.out.println(Thread.currentThread().getName()+"接收到Order消息:"+message);}}
consumeMode = ConsumeMode.ORDERLY,指明了消息模式为顺序模式,一个队列,一个线程。
结果
当consumeMode = ConsumeMode.CONCURRENTLY执行结果如下:
集群/广播消息模式
发送端
@ResourceprivateRocketMQTemplaterocketMQTemplate;publicvoidsend(Stringtopic,Stringmessage,Stringtags){rocketMQTemplate.send(topic+":"+tags,MessageBuilder.withPayload(message).build());}
集群消息模式
消费端
@RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",selectorExpression="tag6||tag7",messageModel=MessageModel.CLUSTERING)@ComponentpublicclassConsumerBroadListenerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){System.out.println("ConsumerBroadListener1接收到消息:"+message);}}
messageModel = MessageModel.CLUSTERING
测试
启动两个服务分别端口是8080,8081
8080服务
8081服务
集群消息模式下,每个服务分别接收一部分消息,实现了负载均衡
广播消息模式
消费端
@RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",selectorExpression="tag6||tag7",messageModel=MessageModel.BROADCASTING)@ComponentpublicclassConsumerBroadListenerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){System.out.println("ConsumerBroadListener1接收到消息:"+message);}}
messageModel = MessageModel.BROADCASTING
测试
启动两个服务分别端口是8080,8081
8080服务
8081服务
集群消息模式下,每个服务分别都接受了同样的消息。
事务消息
RocketMQ事务的3个状态
TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息
TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。
RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程 整体流程为:
正常事务发送与提交阶段
1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)
2、服务端响应消息写入结果,半消息发送成功
3、开始执行本地事务
4、根据本地事务的执行状态执行Commit或者Rollback操作
事务信息的补偿流程
1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求
2、生产者收到确认回查请求后,检查本地事务的执行状态
3、根据检查后的结果执行Commit或者Rollback操作
补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。
发送端
@ResourceprivateRocketMQTemplaterocketMQTemplate;publicvoidsendTx(Stringtopic,Longid,Stringtags){rocketMQTemplate.sendMessageInTransaction(topic+":"+tags,MessageBuilder.withPayload(newUsers(id,UUID.randomUUID().toString().replaceAll("-",""))).setHeader("BID",UUID.randomUUID().toString().replaceAll("-","")).build(),UUID.randomUUID().toString().replaceAll("-",""));}
生产者对应的监听器
@RocketMQTransactionListenerpublicclassProducerTxListenerimplementsRocketMQLocalTransactionListener{@ResourceprivateBusinessServicebs;@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){//这里执行本地的事务操作,比如保存数据。try{//创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据Stringid=(String)msg.getHeaders().get("BID");Usersusers=newJsonMapper().readValue((byte[])msg.getPayload(),Users.class);System.out.println("消息内容:"+users+"\t参与数据:"+arg+"\t本次事务的唯一编号:"+id);bs.save(users,newUsersLog(users.getId(),id));}catch(Exceptione){e.printStackTrace();returnRocketMQLocalTransactionState.ROLLBACK;}returnRocketMQLocalTransactionState.COMMIT;}@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){//这里检查本地事务是否执行成功Stringid=(String)msg.getHeaders().get("BID");System.out.println("执行查询ID为:"+id+"的数据是否存在");UsersLogusersLog=bs.queryUsersLog(id);if(usersLog==null){returnRocketMQLocalTransactionState.ROLLBACK;}returnRocketMQLocalTransactionState.COMMIT;}}
消费端
@RocketMQMessageListener(topic="tx-topic",consumerGroup="consumer05-group",selectorExpression="tag10")@ComponentpublicclassConsumerTxListenerimplementsRocketMQListener<Users>{@OverridepublicvoidonMessage(Usersusers){System.out.println("TX接收到消息:"+users);}}
Service
@Transactionalpublicbooleansave(Usersusers,UsersLogusersLog){usersRepository.save(users);usersLogRepository.save(usersLog);if(users.getId()==1){thrownewRuntimeException("数据错误");}returntrue;}publicUsersLogqueryUsersLog(Stringbid){returnusersLogRepository.findByBid(bid);}
Controller
@GetMapping("/tx/{id}")publicObjectsendTx(@PathVariable("id")Longid){ps.sendTx("tx-topic",id,"tag10");return"sendtransactionsuccess";}
测试
调用接口后,控制台输出:
从打印日志看出来都保存完毕了后 消费端才接受到消息。
删除数据,再测试ID为1会报错的。
数据库中没有数据。。。
是不是也不是很复杂,2个阶段来处理。
</div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
SpringBoot如何整合RocketMQ事务、广播以及顺序消息的详细内容,希望对您有所帮助,信息来源于网络。