spring boot RocketMQ 集成
RocketMQ学习
1.基本概念
RocketMQ是阿里巴巴团队使用java语言开发的一款分布式消息中间件,是一款低延迟,高可用,拥有海量消息堆积能力和灵活拓展性的消息队列。
rocketmq的官网:http://rocketmq.apache.org
gitee仓库:https://gitee.com/apache/rocketmq?_from=gitee_search
Apache RocketMQ开发者指南:https://www.itmuch.com/books/rocketmq/
RocketMQ主要由 Producer、Broker、Consumer 三部分组成。
1.1、Producer
-
消息生产者,负责生产消息,一般由业务系统负责生产消息。
-
发送消息到Broker服务器。
-
发送方式:
- 同步发送、
- 异步发送、
- 顺序发送、
- 单向发送。
-
同步和异步方式均需要Broker返回确认信息,单向发送不需要。
1.2、Consumer
- 消息消费者,负责消费消息,一般是后台系统负责异步消费。
- 从Broker服务器拉取消息。
- 消费形式:
- 拉取式消费、
- 推动式消费。
1.3、Broker
- Broker 在实际部署过程中对应一台服务器,
- 一个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。
- Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。
Topic: 消息的集合,一个主题有多条消息,每条消息只能属于某一个主题。
Broker Server:代理服务器,负责存储消息、转发消息。
Name Server:名字服务,生产者或消费者通过名字服务查找主题相应的Broker IP列表。
Message:消息,生产和消费数据的最小单位,每条消息必须属于一个主题。
Tag:标签,用于同一主题下区分不同类型的消息,可以根据不同业务目的在同一主题下设置不同标签。
2、架构设计
1 技术架构
RocketMQ架构上主要分为四部分:
- Producer:支持分布式集群方式部署。
- Consumer:支持分布式集群方式部署。
- NameServer:
- 是一个非常简单的Topic路由注册中心,支持Broker的动态注册与发现。
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
- 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
- BrokerServer:
- 主要负责消息的存储、投递和查询以及服务高可用保证。
2 工作流程
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
3、项目使用
官方例子:https://gitee.com/apache/rocketmq/blob/develop/docs/cn/RocketMQ_Example.md
3.1、Rocketmq下载
rocketmq的官网
1、下载zip 文件解压缩到本地磁盘中。例:D:ocketmq
2、启动服务,进入bin,先执行,mqnamesrv.cmd
,再启动broker,通过命令行来指定端口。
mqbroker.cmd -n localhost:9876
3.2、监控平台下载
https://github.com/apache/rocketmq-dashboard
3.3、集成springboot项目
1、添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
2、添加配置信息
rocketmq:
name-server: http://127.0.0.1:9876 #rocketmq服务地址
producer:
group: base_group_syncMsg
send-message-timeout: 5000
retry-times-when-send-failed: 2
max-message-size: 4194304
4、测试例子
生产者
@RestController
@RequestMapping("/mq")
public class ProducerController {
@Autowired
private RocketMQTemplate mqTemplate;
@RequestMapping("/send")
public String testSend(String msg) {
try {
mqTemplate.convertAndSend("TopicTest", msg);
return "success";
} catch (Exception e) {
e.printStackTrace();
return "fail";
}
}
}
消费者
@Component
@RocketMQMessageListener(topic = "TopicTest", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.init("------- Consumer: {}", message);
}
}
4、幂等性
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。
4.1、什么是幂等性
对一个事情进行操作,这个操作可能会执行成百上千次,但是操作结果都是相同的,这就是幂等性。
4.2、常用解决方案
业界主流的幂等性有两种操作:
- 唯一 ID + 指纹码 机制,利用数据库主键去重
- 缺点,每次都要查询数据库
- 利用redis的原子性去实现
- 缺点:可能业务执行失败,但redis标识成功。