# RocketMQ 发送消息模式
# 发送同步消息
发送同步消息是指producer向 broker 发送消息,执行 API 时同步等待,直到broker 服务器返回发送结果相对异步发送消息,同步会阻塞线程,性能相对差点,但是可靠性高,这种方式得到广泛使用。
比如:短信通知,邮件通知,站内重要信息通知等,RocketMQTemplate给我们提供了syncSend方法(有多个重载),来实现发送同步消息;
实际场景中需要对发送结果进行判断,例如发送邮件,需要将发送失败记录到日志中,发送失败三次则进行告警,同时过滤当前继续发送后面的消息;
实例:
/** * 发送同步消息 */ public void sendsyncMessage() { for (int i=0;i<10;i++){ SendResult sendResult = rocketMQTemplate.syncSend("java1234-rocketmgq","rocketmq同步消息!"+i); System.out .printIn(sendResult); } }执行完发送同步消息返回执行结果sendResult:
# 发送异步消息
发送异步消息是Producer broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 AP后立即返回,producer发送消息线程不阻塞,消息发送成功或失败的回调任务在一个新的线程中执行。
相对发送同步消息,异步消息性能更高,可靠性略差。适合对响应时间要求高的业务场景。
RocketMQTemplate给我们提供了asyncSend方法(有多个重载),来实现发送异步消息;下面给一个实例:
/** * 发送异步消息 */ pubic void sendAsyncMessage() { for (int i=0;i<10;i++) { rocketMQTemplate.asyncSend("java1234-rocketmg", "rocketmg异步消息!"+i, new SendCallback() { @Override public void onSuccess(sendResult sendResult){ System.out.printIn("发送成功!“); } @Override public void onException(Throwable throwable) { System.out.print1n("发送失败!); } } }执行完发送异步消息返回执行结果:
# 发送单向消息
发送单向消息是指producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果这种方式主要用在不特别关心发送结果的场景。
举例: 日志发送,RocketMoTemplate给我们提供了sendOneWav方法(有多个重载),来实现发送单向消息下面给一个实例:
/** * 发送单向消息 */ public void sendonewayMessage() { for (int i=0;i<10;i++){ rocketMQTemplate.sendOneway("java1234-rocketmg", "rocketmg单向消息!"+i); } }执行完发送单向消息返回执行结果:
# 消费者广播模式和负载均衡模式
如上图,假如我们有多个消费者,消息生产者发送的消息,是每一个消费者都消费一次呢? 还是通过一些机制,比如轮询机制,每个消息只被某一个消费者消费一次呢?
这里涉及到消费者的消费模式,一种是广播模式,还有一种是负载均衡模式;
# 广播模式是每个消费者,都会消费消息;
# 负载均衡模式是每一个消费只会被某一个消费者消费一次:**
业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示;
可以通过@RocketMOMessageListener的messageModel属性值来设置,MesageModel.BOADCASTING是广播式,MessageModel.(CLUSTERING是默)集群负载均衡模式;
集群负载均衡测试,加上messageModel=MessageModel.CLUSTERING;
@Component @RocketMQMessageListener(topic = "topic-java1234", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.CLUSTERING) public class ConsumerService implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt mes) { System.out.println("收到消息:" + mes); } }
# 顺序消息
rocketmg默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的,如图:
有时候,越们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完成等操作,需要顺序执行RocketMQTemplate 给我们提供了SendOrderly方法(有多个重载),来实现发送顺序消息,包括以下:
# 一般我们用第一种发送同步顺序消息;
/** * 发送异步消息 在SendCallback中可处理相关成功失败时的逻辑 * @param alarmPushTopic 主题 * @param msgBody 消息体 * @param hasKey 通道编号 */ public void asyncSendOrderly(String alarmPushTopic, String msgBody, String hasKey) { rocketMqTemplate.asyncSendOrderly(alarmPushTopic, MessageBuilder.withPayload(msgBody).build(), hasKey, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 log.info("RocketMqProducer---->"+sendResult.getMessageQueue().toString()); } @Override public void onException(Throwable e) { // 处理消息发送异常逻辑 log.info("RocketMqProducer--sendAsyncMsg-->e"+e.toString()); } }); }
# 事务消息
事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务;
RocketMO在其消息定义的基础上,对事务消息扩展了两个相关的概念:
# Half(Prepare)Message--半消息(预处理消息)
半消息是一种特殊的消息类型,该状本的消息暂时不能(onsumer消费,当-条事务消息被成功投递到Broker上,但是Broker并没有接收到PrOdUCEr发出的次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息
# Message Status Check--消息状态回
由于网络抖动、Producer重启等原因,可导致Producer向Broker发送的二次确认消息没有成功送达,如果Broker检测到某条事务消息长时间处于半消息状态则会主动向Producer诺发起回查操作,查询该事务消息在Producer端的事务状态(ommit 或 Rolback);
可以看出,Message Status Check主要用来解决分布式事务中的超时问题,执行流程:
# 1、应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ;
# 2、prepare消息发送成功后,应用模块执行数据库事务 (本地事务);
# 3、根据数据库事务执行的结果,再返回Commit或Rollback给MQ;
# 4、如果是Commit,MQ把消息下发给Consumer端,如果是Rollback,直接删掉prepare消息;
# 5、第3步的执行结果如果没响应,或是超时的,启动定时任务回查事务状态(最多重试15次,超过了默认丢弃此消息),处理结果同第4步;
# 6、MQ消费的成功机制由MQ自己保证;
具体实例:
通过rocketMQTemplate的sendMessageInTransaction 方法发送事务消息
/** * 发送事务消息 */ pub]ic void sendTransactionMessage() { // 构造消息 Message msg = MessageBuilder.withpayload("rocketmq事务消息-01").build(); rocketMQTemplate.sendMessageInTransaction("java1234-transaction-rocketmg", msg, null); }定义本地事务处理类,实现 RocketMQLocalTransactionlistener 接口,以及加上@RocketMQTransactinlistener注解,这个类似方法的调用是异步的;
executelocalTransaction方法,当我们处理完业务后,可以根业务处理情况,返回事务执行状态,有bl1back、commit、unknown三种,分别是回淡事务,提交事务和末知,根据事务消息执行流程;
如果是返回 bollback,则直接丢弃消息;
如果是返回 commit,则消费消息;
如果是返回 unknown,则继续等待,然后调checkLocalTransaction方法,最多重试15次,超过了默认丢弃此消息;
checkLocalTransation方法,是当MQ Server未得到MQ发送方应答,或者超时的情况,或者应答是unknown的情况,调用此方法进行检查确认,返回值和面的方法一样;
@RocketMOTransactionlistener class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionstate executeLocalTransaction(Message msg, object arg) { // ... local transaction process, return bollback, commit or unknown System.out.println("executeLocalTransaction"); return RocketMOLocalTransactionstate.UNKNOWN; } @Override public RocketMQLocalTransactionstate checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown System. out .print1n("checkLocalTransaction") ; return RocketMOLocalTransactionstate.COMMIT; } }
# 消息过滤
在消费端进行消息消费的时候,我们根据业务需求,可以对消息进行过流,处理需要的消息;
尤其是广播模式下,消息过滤经常使用;
RocketIMQ提供了TAG和SQL表达式两种消息过滤方式;
# 根据 TAG方式过游消息
消息发送端只能设置一个tag,消息接收端可以设置多个tag;
接收消息端通过|设置多个tag,如下: tag1、tag2、tag3...
上实例,生产端发送三个消息,TAG分别是TAG1TAG2,TAG3
/** *发送带Tag消息,测试根据Tag过滤消息 */ public void sendMessagewithTag(){ // 构造消息1 Message msg1 = MessageBuilder.withpayload("rocketmq过滤消息测试-TAG01").build(); // 构造消息2 Message msg2 = MessageBuilder.withpayload("rocketmq过流消息测试-TAG2").build(); // 构造消息3 Message msg3 = MessageBuilder.withPayload("rocketmq过演消息测试-TAG03").build(); rocketMQTemplate.convertAndSend("java1234-filter-rocketmg"+ ":"+ "TAGl", msg1); rocketMQTemplate.convertAndsend("java1234-filter-rocketmq"+ ":"+ "TAG2", msg2); rocketMQTemplate.convertAndsend("java1234-filter-rocketmq"+ ":"+ "TAG3", msg3); }消费端,通过selectorExpression = "TAG1 TAG2",selectorType = SelectorType.TAG,指定需要消费的TAG
@Component @RocketMOMessagelistener (topic = "java234-filter-rocketmq", consumerGroup ="rocketmq.consumer.group)", selectorExpression = "TAG1 || TAG2", selectorType = SelctorType.TAG) public class ConsumerService implements RocketMQListener<String> { @Override public void onMessage(string s) { System.out.print1n("消费者: 收到消息内容:"+s); } }





