- 引入consumer-spring的jar包
- 配置文件进行相关配置
- retry.config.client.datasource.type=mysql
- retry.config.client.datasource.className=io.github.zj.spring.remote.MySqlClientApi
- 对监听方法加上@RetryListener,并对其参数添加@RetryListenerParameter注解
- RetryMqListenerInitialization
- 实现BeanPostProcessor,找到有@RetryListener注解的方法,将相关信息存储到RetryMqContext中
- 重试上下文RetryMqContext两个属性
- consumerInfo:存储@RetryListener注解信息(消费组)
- retryConfMap:RetryMqConf存储关于反射相关信息(retryMqConf.getMethod().invoke)
- RetrySubscribeEventListener:监听容器启动事件
- 遍历RetryMqContext的consumerInfo,启动消费者DefaultMQPushConsumer并缓存
- 设置消息监听器,对拉取到的消息进行反射调用业务方法进行业务处理(RetryMqMessageListenerImpl)
- RetryMqMessageListenerImpl使用Cglib动态代理支持插件化(详细见IntercepterUtil#getProxyObj)
- 注册ClientApi(仿照数据库驱动基于SPI机制实现)
- 在ConsumerConfig中进行注册(触发ClientApiManager.getClientApis())
- ClientApiManager的静态块触发SPI的服务发现机制,相应服务进行初始化
- 基于mysql的实现:MySqlClientApi,在静态块中进行注册ClientApiManager.registerClient(new MySqlClientApi());
- 消费端的消费模型
- 队列负载线程:RebalanceService(基于CountDownLatch2超时机制的定时任务)
- 负载均衡推模式服务:RebalancePushImpl#doRebalance
- 负载均衡算法(1.平均散列:AllocateMessageQueueAveragely)
- 基于当前的队列数、消费者数(把队列分给消费者)
- 添加队列消息拉取任务:pullRequestQueue
- 消息拉取线程:PullMessageService(基于阻塞队列拉取)
- 控制消息拉取速度(ProcessQueue):消息数量、消息大小、消息跨度 -> 延迟拉取:延迟添加拉取任务
- 向mysql拉取消息:ClientApi#pullMessage
- 拉取到消息后的回调:PullCallback#onSuccess
- 设置下次拉取的进度:pullRequest.setNextOffset
- 将消息放入队列快照中:ProcessQueue
- 将消息提交消息消费线程池中
- 消息消费线程:ConsumeMessageConcurrentlyService$ConsumeRequest
- 触发@RetryListener,进行业务消息消费