# Spring Boot 整合 RocketMQ 消息发送与接收
# 第一步:引入 pom.xml配置
<!-- Build settings: source files are read as UTF-8 and compiled at the Java 8 (1.8) source/target level. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<!--
  Import the Spring Boot BOM so that Spring-managed dependency versions are aligned.
  The "import" scope is only honoured inside <dependencyManagement>; declaring it
  directly under <dependencies> is rejected or ignored by Maven.
-->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.3.5.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <!-- RocketMQ Spring Boot starter (supplies RocketMQTemplate and @RocketMQMessageListener used below). -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
</dependencies>
# 第二步:添加 yaml配置
# RocketMQ client settings. Nesting matters: every key below must sit under "rocketmq".
rocketmq:
  # host:port of the RocketMQ name server
  name-server: 127.0.0.1:9876
  producer:
    # Producer group name
    group: producer-group-1
  consumer:
    # Consumer group name; the listener class resolves it via ${rocketmq.consumer.group}
    group: consumer-group-1
# 第三步:消息推送类
@Slf4j
@Component("producerService")
public class ProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送异步消息 在SendCallback中可处理相关成功失败时的逻辑
* @param alarmPushTopic 主题
* @param msgBody 消息体
* @param hashKey 执行队列号,保证消息有序消费
*/
public void asyncSendOrderly(String alarmPushTopic, String msgBody, String hashKey) {
rocketMQTemplate.asyncSendOrderly(alarmPushTopic, MessageBuilder.withPayload(msgBody).build(), "0", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑
log.info("RocketMqProducer---->"+sendResult.getMessageQueue().toString());
}
@Override
public void onException(Throwable e) {
// 处理消息发送异常逻辑
log.info("RocketMqProducer--sendAsyncMsg-->e"+e.toString());
}
});
}
}
# 第四步:消息消费类
/**
 * RocketMQ listener subscribed to topic {@code topic-java1234}; the consumer group
 * is resolved from the {@code rocketmq.consumer.group} property.
 */
@Component
@RocketMQMessageListener(
        topic = "topic-java1234",
        consumerGroup = "${rocketmq.consumer.group}")
public class ConsumerService implements RocketMQListener<MessageExt> {

    /**
     * Prints each received message to standard output.
     *
     * @param message the message handed over by the listener container
     */
    @Override
    public void onMessage(MessageExt message) {
        final String line = "收到消息:" + message;
        System.out.println(line);
    }
}