# RocketMQ 发送消息模式

# 发送同步消息

发送同步消息是指producer向 broker 发送消息,执行 API 时同步等待,直到broker 服务器返回发送结果相对异步发送消息,同步会阻塞线程,性能相对差点,但是可靠性高,这种方式得到广泛使用。

比如:短信通知,邮件通知,站内重要信息通知等,RocketMQTemplate给我们提供了syncSend方法(有多个重载),来实现发送同步消息;

实际场景中需要对发送结果进行判断,例如发送邮件,需要将发送失败记录到日志中,发送失败三次则进行告警,同时过滤当前继续发送后面的消息;

实例:

/**
* 发送同步消息
*/
public void sendsyncMessage() {
   for (int i=0;i<10;i++){
       SendResult sendResult = rocketMQTemplate.syncSend("java1234-rocketmgq","rocketmq同步消息!"+i);
       System.out .printIn(sendResult);
   }
}

执行完发送同步消息返回执行结果sendResult:

./images/7.png

# 发送异步消息

发送异步消息是Producer broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 AP后立即返回,producer发送消息线程不阻塞,消息发送成功或失败的回调任务在一个新的线程中执行。

相对发送同步消息,异步消息性能更高,可靠性略差。适合对响应时间要求高的业务场景。

RocketMQTemplate给我们提供了asyncSend方法(有多个重载),来实现发送异步消息;下面给一个实例:

/**
 * 发送异步消息
 */
pubic void sendAsyncMessage() {
    for (int i=0;i<10;i++) {
    rocketMQTemplate.asyncSend("java1234-rocketmg", "rocketmg异步消息!"+i, new SendCallback() {
        @Override
        public void onSuccess(sendResult sendResult){ 
            System.out.printIn("发送成功!);
        }
        @Override
        public void onException(Throwable throwable) {
            System.out.print1n("发送失败!);
        }
    }
}

执行完发送异步消息返回执行结果:

./images/8.png

# 发送单向消息

发送单向消息是指producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果这种方式主要用在不特别关心发送结果的场景。

举例: 日志发送,RocketMoTemplate给我们提供了sendOneWav方法(有多个重载),来实现发送单向消息下面给一个实例:

/**
 * 发送单向消息
 */
public void sendonewayMessage() {
    for (int i=0;i<10;i++){
        rocketMQTemplate.sendOneway("java1234-rocketmg", "rocketmg单向消息!"+i);
    }
}

执行完发送单向消息返回执行结果:

./images/9.png

# 消费者广播模式和负载均衡模式

./images/10.png

如上图,假如我们有多个消费者,消息生产者发送的消息,是每一个消费者都消费一次呢? 还是通过一些机制,比如轮询机制,每个消息只被某一个消费者消费一次呢?

这里涉及到消费者的消费模式,一种是广播模式,还有一种是负载均衡模式;

# 广播模式是每个消费者,都会消费消息;

# 负载均衡模式是每一个消费只会被某一个消费者消费一次:**

业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示;

可以通过@RocketMOMessageListener的messageModel属性值来设置,MesageModel.BOADCASTING是广播式,MessageModel.(CLUSTERING是默)集群负载均衡模式;

集群负载均衡测试,加上messageModel=MessageModel.CLUSTERING;

@Component
@RocketMQMessageListener(topic = "topic-java1234", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.CLUSTERING)
public class ConsumerService implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt mes) {
        System.out.println("收到消息:" + mes);
    }
}

# 顺序消息

rocketmg默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的,如图:

./images/11.png

有时候,越们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完成等操作,需要顺序执行RocketMQTemplate 给我们提供了SendOrderly方法(有多个重载),来实现发送顺序消息,包括以下:

  • # syncSendOrderly,发送同步顺序消息;

  • # asyncSendOrderly,发送异步顺序消息;

  • # sendOneWayOrderly,发送单向顺序消息;

# 一般我们用第一种发送同步顺序消息;

/**
 * 发送异步消息 在SendCallback中可处理相关成功失败时的逻辑
 * @param alarmPushTopic 主题
 * @param msgBody 消息体
 * @param hasKey 通道编号
 */
public void asyncSendOrderly(String alarmPushTopic, String msgBody, String hasKey) {
    rocketMqTemplate.asyncSendOrderly(alarmPushTopic, MessageBuilder.withPayload(msgBody).build(), hasKey, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            // 处理消息发送成功逻辑
            log.info("RocketMqProducer---->"+sendResult.getMessageQueue().toString());
        }
        @Override
        public void onException(Throwable e) {
            // 处理消息发送异常逻辑
            log.info("RocketMqProducer--sendAsyncMsg-->e"+e.toString());
        }
    });
}

# 事务消息

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务;

RocketMO在其消息定义的基础上,对事务消息扩展了两个相关的概念:

  • # Half(Prepare)Message--半消息(预处理消息)

    半消息是一种特殊的消息类型,该状本的消息暂时不能(onsumer消费,当-条事务消息被成功投递到Broker上,但是Broker并没有接收到PrOdUCEr发出的次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息

  • # Message Status Check--消息状态回

    由于网络抖动、Producer重启等原因,可导致Producer向Broker发送的二次确认消息没有成功送达,如果Broker检测到某条事务消息长时间处于半消息状态则会主动向Producer诺发起回查操作,查询该事务消息在Producer端的事务状态(ommit 或 Rolback);

可以看出,Message Status Check主要用来解决分布式事务中的超时问题,执行流程:

./images/12.png

  • # 1、应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ;

  • # 2、prepare消息发送成功后,应用模块执行数据库事务 (本地事务);

  • # 3、根据数据库事务执行的结果,再返回Commit或Rollback给MQ;

  • # 4、如果是Commit,MQ把消息下发给Consumer端,如果是Rollback,直接删掉prepare消息;

  • # 5、第3步的执行结果如果没响应,或是超时的,启动定时任务回查事务状态(最多重试15次,超过了默认丢弃此消息),处理结果同第4步;

  • # 6、MQ消费的成功机制由MQ自己保证;

具体实例:

通过rocketMQTemplate的sendMessageInTransaction 方法发送事务消息

/**
 * 发送事务消息
 */
pub]ic void sendTransactionMessage() {
  // 构造消息
  Message msg = MessageBuilder.withpayload("rocketmq事务消息-01").build();
  rocketMQTemplate.sendMessageInTransaction("java1234-transaction-rocketmg", msg, null);
} 

定义本地事务处理类,实现 RocketMQLocalTransactionlistener 接口,以及加上@RocketMQTransactinlistener注解,这个类似方法的调用是异步的;

executelocalTransaction方法,当我们处理完业务后,可以根业务处理情况,返回事务执行状态,有bl1back、commit、unknown三种,分别是回淡事务,提交事务和末知,根据事务消息执行流程;

如果是返回 bollback,则直接丢弃消息;

如果是返回 commit,则消费消息;

如果是返回 unknown,则继续等待,然后调checkLocalTransaction方法,最多重试15次,超过了默认丢弃此消息;

checkLocalTransation方法,是当MQ Server未得到MQ发送方应答,或者超时的情况,或者应答是unknown的情况,调用此方法进行检查确认,返回值和面的方法一样;

@RocketMOTransactionlistener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionstate executeLocalTransaction(Message msg, object arg) {
     // ... local transaction process, return bollback, commit or unknown
     System.out.println("executeLocalTransaction");
     return RocketMOLocalTransactionstate.UNKNOWN;
  }
  @Override
  public RocketMQLocalTransactionstate checkLocalTransaction(Message msg) {
     // ... check transaction status and return bollback, commit or unknown
     System. out .print1n("checkLocalTransaction") ;
     return RocketMOLocalTransactionstate.COMMIT;
  }
}

# 消息过滤

在消费端进行消息消费的时候,我们根据业务需求,可以对消息进行过流,处理需要的消息;

尤其是广播模式下,消息过滤经常使用;

RocketIMQ提供了TAG和SQL表达式两种消息过滤方式;

  • # 根据 TAG方式过游消息

消息发送端只能设置一个tag,消息接收端可以设置多个tag;

接收消息端通过|设置多个tag,如下: tag1、tag2、tag3...

上实例,生产端发送三个消息,TAG分别是TAG1TAG2,TAG3

/**
 *发送带Tag消息,测试根据Tag过滤消息
 */
public void sendMessagewithTag(){
    // 构造消息1
    Message msg1 = MessageBuilder.withpayload("rocketmq过滤消息测试-TAG01").build();
    // 构造消息2
    Message msg2 = MessageBuilder.withpayload("rocketmq过流消息测试-TAG2").build();
    // 构造消息3
    Message msg3 = MessageBuilder.withPayload("rocketmq过演消息测试-TAG03").build();


    rocketMQTemplate.convertAndSend("java1234-filter-rocketmg"+ ":"+ "TAGl", msg1);
    rocketMQTemplate.convertAndsend("java1234-filter-rocketmq"+ ":"+ "TAG2", msg2);
    rocketMQTemplate.convertAndsend("java1234-filter-rocketmq"+ ":"+ "TAG3", msg3);
}

消费端,通过selectorExpression = "TAG1 TAG2",selectorType = SelectorType.TAG,指定需要消费的TAG

@Component
@RocketMOMessagelistener (topic = "java234-filter-rocketmq", consumerGroup ="rocketmq.consumer.group)", selectorExpression = "TAG1 || TAG2", selectorType = SelctorType.TAG)
public class ConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(string s) {
        System.out.print1n("消费者: 收到消息内容:"+s);
    }
}