# rabbitmq-example **Repository Path**: jonathanzyf/rabbitmq-example ## Basic Information - **Project Name**: rabbitmq-example - **Description**: No description available - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-01-05 - **Last Updated**: 2021-04-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 一、Spring RabbitMQ基础知识 ## 运行时线程模型 1. RabbitMQ Client 与Spring RabbitMQ中各层次主要类运行期数据流动示意图。 ![数据流示意图](images/rabbitmq-data-flow.png) 2. Spring RabbitMQ中SimpleMessageListenerContainer在Spring容器启动时主要类交互示意图。 ![Container启动时序图](images/rabbitmq-container-start.png) 3. 运行期接收Queue中数据主要类交互示意图。 ![接收数据时序图](images/rabbitmq-receive-message.png) ## 配置参数 ### TTL |类型|设置对象|目标对象|参数|范围|含义| |---|---|---|---|---|---| |TTL|Policy|Message|message-ttl|per-queue|队列中的消息存在TTL之后仍未被消费,消息会被丢弃(Discard),如果设置了死信参数,该消息会进入死信队列(Dead-Letter)| |TTL|Queue|Message|x-message-ttl|per-queue|同上| |TTL|Message|Message|expiration|per-message|同上| |TTL|Policy|Queue|expires|per-queue|当队列未被使用(没有消费者Consumer、没有重新声明Declare、没有被get)经过TTL时间之后,队列会被删除| |TTL|Queue|Queue|x-expires|per-queue|同上| > Policy ```bash set_policy [-p ] [--priority ] [--apply-to ] ``` ```bash rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues ``` > 目标对象为Message的TTL处理规则 - 如果TTL参数同时设置x-message-ttl、expiration,过期时间以最小值为准。 - 过期的消息不会被投递到消费者处(Consumer),在投递后传输过程中过期的例外。 - 未设置TTL的消息比设置了TTL的消息投递优先级高,即优先投递未设置TTL的消息。 - 统计时,未被处理的过期的消息仍然会被统计进来。 ### 死信 DLX |类型|设置对象|目标对象|参数|范围|含义| |---|---|---|---|---|---| |DLX|Policy|Message|dead-letter-exchange|per-queue|消息触发死信处理时,消息被发布到配置的Exchange| |DLX|Queue|Message|x-dead-letter-exchange|per-queue|同上| |DLK|Queue|Message|x-dead-letter-routing-key|per-queue|如果不设置,则使用消息本身的RoutingKey| > Policy ```bash rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues ``` > 死信触发时机 1. 消息消费时:basic.reject 或 basic.nack,并且requeue为false。@see x-death.reason.rejected 2. 当设置TTL时:消息过期。注意:目标对象为Queue的TTL不触发DL。@see x-death.reason.expired 3. 当队列已满时:消息丢弃。@see x-death.reason.maxlen > 注意 - 死信处理可能会产生循环,例如死信接收的队列就是原始队列。 ```bash _______________________ V | QUEUE --> Consumer --> DLX reason: rejected; 从来没有rejected的消息(expired,maxlen)进入同一个队列时会被丢弃 ``` # 二、动态创建队列 ## 场景说明 多租户场景下(客户1、客户2使用云上的SaaS系统),云上应用程序的数据要同步到客户本地服务中,SaaS应用的同步数据会先加入到消协队列中,然后由数据同步服务同步到各个客户。 服务器部署示例图: ![部署图](images/sync-deploy.png) 运行期数据流示例图: ![数据流](images/data-flow.png) 由于客户本地服务的响应速度较低,并且要求不能并发请求,针对一个客户,同时不能有两个消费者处理其消息。有三种实现方法: 1. 方法一:为每个客户建立了一个队列,每个队列同时只有一个消费者消费。 2. 方法二:为每个客户建立了一个队列,每个队列同时有多个消费者消费。通过限流来控制调用客户服务,限流值为1。 3. 方法三:创建固定个数的队列,同步数据轮询发送到一个队列中,每个队列同时有多个消费者消费。通过限流来控制调用客户服务,限流值为1。 三种方式的优缺点如下: |方法|优点|缺点| |---|---|---| |方法一|实现简单|只有一个消费者存在**单点故障问题**;
如果需要同步的客户数多,会建立很多队列;
为客户创建队列需要额外处理,参见“为客户创建队列分析”| |方法二|多个消费者可以避免单点故障问题|如果需要同步的客户数多,会建立很多队列;
限流需要额外处理;
为客户创建队列需要额外处理,参见“为客户创建队列分析”| |[推荐]方法三|实现简单;多个消费者可以避免单点故障问题;队列数量可控|限流需要额外处理工作;
某些时候某些客户的消息可能不能及时消费| ### 为客户创建队列分析 方法一和方法二需要动态创建队列,比如当新增客户需要同步数据时。 1. 什么时候为客户创建队列?
`人工参与方式`:在应用程序启动时,为所有需要同步数据的客户创建队列;新增客户时,在配置客户请求代理时创建队列。
`自动方式`:在发布同步数据时创建队列(每个进程不重复创建队列),同时通知数据同步服务,有新的队列被创建,数据同步服务创建消息监听程序。 2. 新增客户时,如何创建队列,如何为该队列创建消息监听?
`人工参与方式`:在配置客户请求代理时创建队列,同时创建消息监听。
`自动方式`:在发布同步数据时,如果队列不存在,创建队列;同时通知数据同步服务,创建消息监听程序。 # 三、自动创建队列 本示例主要针对方法一和方法二,包含两个模块,分别模拟SaaS发送消息与数据同步服务。 |模块|角色|功能| |---|---|---| |dynamic-queue-provider|模拟SaaS|创建队列、发送消息| |dynamic-queue-consumer|模拟数据同步服务|创建消息监听、接收消息| |方法|限流器|范围| |---|---|---| |方法一|RamRateLimiter|进程级| |方法二|RedisRateLimiter|分布式| |方法三|RedisRateLimiter|分布式| > dynamic-queue-provider 模拟SaaS发布数据同步消息 ![发送数据同步消息](images/provider-publish-message.png) > dynamic-queue-consumer 模拟数据同步服务实例(可能多个实例)启动 ![发送数据同步消息](images/consumer-on-started.png) > dynamic-queue-consumer 模拟数据同步服务实例(可能多个实例)收到队列创建通知消息 ![发送数据同步消息](images/consumer-on-queue-created.png) ## 自动创建队列-测试 1. 消息发送方启动 2. 执行shell命令 ```shell script curl 'http://localhost:9001/provider/publish?tenantId=1001&message=data001' ``` 执行后产生效果: 1. `提供者`创建Exchange: data-sync-exchange(Direct类型)和data-queue-notify-exchange(Fanout类型) 2. `提供者`创建队列data-sync-queue-**1001**,绑定到data-sync-exchange上,routingKey 为data-sync-**1001** 3. `提供者`向Redis中data-sync-queues(类型为set)增加值data-sync-queue-**1001** 4. `提供者`向data-queue-notify-exchange发布创建队列的消息data-sync-queue-**1001**,由于没有消费者,消息被忽略 5. `提供者`向data-sync-exchange发布一条消息data001,routingKey 为data-sync-**1001**,队列data-sync-queue-**1001**中消息数量为1,值为data001 3. 消息接收方启动 启动后产生效果: 1. `消费者`注册消息通知队列queue-created-notify-01(根据配置),绑定到data-queue-notify-exchange,无需routingKey 2. `消费者`根据Redis中data-sync-queues的值创建队列监听,监听的队列有data-sync-queue-**1001** 3. `消费者`消费队列data-sync-queue-**1001**中的消息data001,队列中消息数量为0 4. 执行shell命令 ```shell script curl 'http://localhost:9001/provider/publish?tenantId=1001&message=data002' ``` 执行后产生效果: 1. `提供者`向data-sync-exchange发布一条消息data002,routingKey 为data-sync-**1001**,队列中消息数量为1 2. `消费者`该消息马上被消费者消费,队列中消息数量为0 5. 执行shell命令 ```shell script curl 'http://localhost:9001/provider/publish?tenantId=2001&message=data201' ``` 执行后产生效果: 1. `提供者`创建队列data-sync-queue-**2001**,绑定到data-sync-exchange上,routingKey 为data-sync-**2001** 2. `提供者`向Redis中data-sync-queues(类型为set)增加值data-sync-queue-**2001** 3. `提供者`向data-queue-notify-exchange发布创建队列的消息data-sync-queue-**2001**,消息被广播到队列queue-created-notify-01中,通知队列中消息数量为1 4. `提供者`向data-sync-exchange发布一条消息data201,routingKey 为data-sync-**2001**,队列data-sync-queue-**2001**中消息数量为1,值为data201 4. `消费者`队列queue-created-notify-01中消息data-sync-queue-**1001**被通知消息监听消费,动态创建队列监听,监听队列data-sync-queue-**2001**的消息,通知队列中消息数量为0 4. `消费者`队列data-sync-queue-**2001**的消息被消费,队列中消息数量为0 6. 状态汇总 |Exchange|Type|Queue| |---|---|---| |data-sync-exchange|Direct|data-sync-queue-**1001**, data-sync-queue-**2001**| |data-queue-notify-exchange|Fanout|queue-created-notify-01| # 四、固定队列个数与延迟队列 本示例主要针对方法三,包含三个部分,分别负责发送消息、数据同步服务、服务超时后进入延迟队列。由于队列数量固定,需要确定租户的消息路由到哪个队列中,这个任务由`RouteStrategy`负责。 ![固定队列个数与延迟队列](images/multi-fixed-latency.png) |模块|角色|功能| |---|---|---| |server|发送消息|创建队列、发送消息| |client|模拟数据同步服务|创建消息监听、接收消息,服务超时后消息进入延迟队列| |latency|延迟队列|达到TTL设定时间后,消息重新进入队列| 选择路由策略,该策略通过 **TenantId** 确定路由 **routingKey** ,默认提供了三种路由策略: |路由策略|说明| |---|---| |`MappingRouteStrategy`|通过配置确定路由| |`ModRouteStrategy`|通过租户ID取模确定路由| |`RoundRobinRouteStrategy`|通过轮询确定路由| ![路由策略](images/route-strategy.png) > server 发布消息,通过路由策略确定进入哪个队列 ![发布消息到固定队列](images/multi-fixed-publish.png) > client 接收消息,请求超时后消息进入延迟队列;延迟时间到期后消息变成死信(Dead Letter),经过RouteExchange重新进入正常队列 ![接收消息与延迟发布](images/multi-fixed-latency-consume.png)