跳转网站正在建设中,seo优化销售话术,网站整体风格设计,徐州网站开发口碑好基于RocketMQ release-4.9.3#xff0c;深入的介绍了DefaultMQPushConsumer延迟消息源码。 文章目录1 load加载延迟消息数据1.1 parseDelayLevel解析延迟等级2 start启动调度消息服务3 DeliverDelayedMessageTimerTask投递延迟消息任务3.1 executeOnTimeup执行延迟消息投递3.2… 基于RocketMQ release-4.9.3深入的介绍了DefaultMQPushConsumer延迟消息源码。 文章目录1 load加载延迟消息数据1.1 parseDelayLevel解析延迟等级2 start启动调度消息服务3 DeliverDelayedMessageTimerTask投递延迟消息任务3.1 executeOnTimeup执行延迟消息投递3.2 scheduleNextTimerTask下一个调度任务3.3 correctDeliverTimestamp校验投递时间3.4 messageTimeup恢复正常消息3.5 syncDeliver同步投递消息4 延迟消息的总结并发消息消费失败引发消费重试时默认情况下重试16次从延迟等级level310s开始每次延迟时间递增时间到了又会发送到重试topic去消费这其中就涉及到RocketMQ的延迟消息可以说RocketMQ并发消息消费失败引发消费重试就是基于topic替换和延迟消息这两个技术实现的。
此前我们学习了RocketMQ的消费重试我们知道在判断消息为延迟消息的时候即DelayTimeLevel大于0那么替换topic为SCHEDULE_TOPIC_XXXX替换queueId为延迟队列id id level - 1如果延迟级别大于最大级别则设置为最大级别18默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。最后保存真实topic到消息的REAL_TOPIC属性保存queueId到消息的REAL_QID属性方便后面恢复。
实际上普通的延迟消息也会进行topic替换那么发送到SCHEDULE_TOPIC_XXXX对应的消息队列里面的延迟消息到底是做到能够在给定的延迟时间之后取出来重新投递的呢下面我们来看看RocketMQ延迟消息的源码。
实际上RocketMQ通过ScheduleMessageService调度消息服务实现延迟定时消息。ScheduleMessageService继承了ConfigManager在DefaultMessageStore实例化的时候被实例化。
1 load加载延迟消息数据
在broker启动执行DefaultMessageStore#load方法加载Commit Log、Consume Queue、index file等文件将数据加载到内存中并完成数据的恢复的时候同样会执行ScheduleMessageService#load方法加载延迟消息数据初始化delayLevelTable和offsetTable。
首先调用父类的ConfigManager#load方法在broker启动部分就讲过源码了将延迟消息文件${user.home}/store/config/delayOffset.json加载到内存的offsetTable集合中delayOffset.json中保存着延迟topic每个队列的消费进度消费偏移量。
/*** ScheduleMessageService的方法* p* 加载延迟消息数据初始化delayLevelTable和offsetTable*/
Override
public boolean load() {//调用父类ConfigManager#load方法将延迟消息文件${user.home}/store/config/delayOffset.json加载到内存的offsetTable集合中//delayOffset.json中保存着延迟topic每个队列的消费进度消费偏移量boolean result super.load();//解析延迟级别到delayLevelTable集合中result result this.parseDelayLevel();//矫正每个延迟队列的偏移量result result this.correctDelayOffset();return result;
}
/*** ScheduleMessageService的方法* p* 获取延迟消息文件路径${user.home}/store/config/delayOffset.json*/
Override
public String configFilePath() {//${user.home}/store/config/delayOffset.jsonreturn StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
}/*** ScheduleMessageService的方法* p* json字符串转换为offsetTable对象*/
Override
public void decode(String jsonString) {if (jsonString ! null) {DelayOffsetSerializeWrapper delayOffsetSerializeWrapper DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);if (delayOffsetSerializeWrapper ! null) {this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());}}
}
延迟消息文件${user.home}/store/config/delayOffset.json的内容它保存着延迟队列对应的消费偏移量。
{offsetTable:{3:2,4:1}
}1.1 parseDelayLevel解析延迟等级
延迟等级字符串存储在MessageStoreConfig的messageDelayLevel属性中默认值为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h即18个等级因此也是可以配置的但是单位仅支持s、m、h、d分别表示秒、分、时、天。
该方法解析延迟等级以及对应的延迟时间到delayLevelTable中单位统一转换为毫秒注意延迟等级从1开始。
/*** ScheduleMessageService的方法* 解析延迟等级到delayLevelTable中* return*/
public boolean parseDelayLevel() {//时间单位表HashMapString, Long timeUnitTable new HashMapString, Long();timeUnitTable.put(s, 1000L);timeUnitTable.put(m, 1000L * 60);timeUnitTable.put(h, 1000L * 60 * 60);timeUnitTable.put(d, 1000L * 60 * 60 * 24);//从MessageStoreConfig中获取延迟等级字符串messageDelayLevelString levelString this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();try {//通过空格拆分String[] levelArray levelString.split( );for (int i 0; i levelArray.length; i) {//获取每个等级的延迟时间String value levelArray[i];//获取延迟单位String ch value.substring(value.length() - 1);//获取对应的延迟单位的时间毫秒Long tu timeUnitTable.get(ch);//延迟等级从1开始int level i 1;//如果当前等级已经大于最大等级则赋值为最大等级if (level this.maxDelayLevel) {this.maxDelayLevel level;}//延迟时间long num Long.parseLong(value.substring(0, value.length() - 1));//计算该等级的延迟时间毫秒long delayTimeMillis tu * num;//存入delayLevelTable中this.delayLevelTable.put(level, delayTimeMillis);if (this.enableAsyncDeliver) {this.deliverPendingTable.put(level, new LinkedBlockingQueue());}}} catch (Exception e) {log.error(parseDelayLevel exception, e);log.info(levelString String {}, levelString);return false;}return true;
}
2 start启动调度消息服务
ScheduleMessageService依靠内部的定时任务实现延迟消息ScheduleMessageService通过start方法完成启动。
在broker的启动过程中会执行DefaultMessageStore的start方法中该方法内部通过handleScheduleMessageService方法执行ScheduleMessageService的start方法。
该方法的大概逻辑为
初始化延迟消息投递线程池deliverExecutorService该线程池是一个调度任务线程池ScheduledThreadPoolExecutor核心线程数就是最大的延迟等级默认18。遍历所有的延迟等级为每一个延迟等级构建一个对应的DeliverDelayedMessageTimerTask调度任务放到deliverExecutorService中默认延迟1000ms后执行。构建一个延迟队列消费偏移量持久化的定时调度任务首次延迟1000ms之后执行后续每次执行间隔flushDelayOffsetInterval时间默认10s。
/*** ScheduleMessageService的方法* p* 启动调度消息服务*/
public void start() {//将启动标志CAS的从false改为true该服务只能启动一次if (started.compareAndSet(false, true)) {//调用父类的load方法将延迟消息文件${user.home}/store/config/delayOffset.json加载到内存的offsetTable集合中//fix(dledger): reload the delay offset when master changed (#2518)super.load();/** 1 初始化延迟消息投递线程池核心线程数就是最大的延迟等级默认18*/this.deliverExecutorService new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl(ScheduleMessageTimerThread_));//异步投递默认不支持if (this.enableAsyncDeliver) {this.handleExecutorService new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl(ScheduleMessageExecutorHandleThread_));}/** 2 对所有的延迟等级构建一个对应的DeliverDelayedMessageTimerTask调度任务默认延迟1000ms后执行*/for (Map.EntryInteger, Long entry : this.delayLevelTable.entrySet()) {//延迟等级Integer level entry.getKey();//延迟时间毫秒Long timeDelay entry.getValue();//根据延迟等级获取对应的延迟队列的消费偏移量如果没有则设置为0Long offset this.offsetTable.get(level);if (null offset) {offset 0L;}//延迟时间不为null那么为该等级的延迟队列构建一个DeliverDelayedMessageTimerTask调度任务默认延迟1000ms后执行if (timeDelay ! null) {if (this.enableAsyncDeliver) {this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);}//DeliverDelayedMessageTimerTask构造参数包括对应的延迟等级以及最新消费偏移量this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);}}/** 3 构建一个延迟队列消费偏移量持久化的定时调度任务首次延迟1000ms之后执行后续每次执行间隔flushDelayOffsetInterval时间默认10s*/this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {if (started.get()) {ScheduleMessageService.this.persist();}} catch (Throwable e) {log.error(scheduleAtFixedRate flush exception, e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);}
}3 DeliverDelayedMessageTimerTask投递延迟消息任务
在start方法中ScheduleMessageService会为每一个延迟等级创建一个DeliverDelayedMessageTimerTask投递延迟消息任务不同延迟等级的消息放到不同的延迟队列里面被不同的Task处理。
采用不同的队列处理同一个延迟等级的消息的方式不再需要进行消息排序避免了消息排序的复杂逻辑能比较简单的实现有限等级的延迟消息RocketMQ的开源版本不支持任意时间的延迟消息这也是它的一个限制吧
DeliverDelayedMessageTimerTask是一个线程任务下面来看看它的run方法主要是调用executeOnTimeup执行消息投递。
Override
public void run() {try {//如果服务已启动那么继续执行if (isStarted()) {//执行消息投递this.executeOnTimeup();}} catch (Exception e) {// XXX: warn and notify melog.error(ScheduleMessageService, executeOnTimeup exception, e);//抛出异常新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService10000ms后执行本次任务结束this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);}
}3.1 executeOnTimeup执行延迟消息投递
延迟消息的核心逻辑实现执行延迟消息消息投递。
调用findConsumeQueue方法根据topic和延迟队列id从consumeQueueTable查找需要写入的ConsumeQueue如果没找到就新建即ConsumeQueue文件是延迟创建的。该方法的源码我们在ReputMessageService异步构建ConsumeQueue和IndexFile部分已经讲过了。如果没找到对应的消息队列调用scheduleNextTimerTask方法新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService100ms后执行本次任务结束。调用getIndexBuffer方法根据逻辑offset定位到物理偏移量然后截取该偏移量之后的一段Buffer其包含要拉取的消息的索引数据及对应consumeQueue文件之后的全部索引数据即这里截取的Buffer可能包含多条索引数据。该方法的源码我们在broker处理拉取消息请求部分已经讲过了。遍历缓存buffer中的消息根据tagsCode投递时间判断消息是否到期如果到期则回复真实消息并且投递安到真实topic以及对应的queueId中。 获取该条目对应的消息的tagsCode对于延迟消息tagsCode被替换为延迟消息的发送时间在CommitLog#checkMessageAndReturnSize方法中源码此前讲过了。如果投递时间小于当前时间那么可以投递该延迟消息。如果投递时间大于当前时间那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService指定新的消费偏移量100ms后执行本次任务结束。根据消息物理偏移量从commitLog中找到该条消息调用messageTimeup方法构建内部消息对象设置topic为REAL_TOPIC属性值即原始topic设置queueId为REAL_QID属性值即原始queueId。即恢复为正常消息。最后调用syncDeliver方法投递该消息消息将会被投递到原始topic和队列中这样就可以被消费了。 遍历结束更新下一个offset新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService指定新的消费偏移量100ms后执行本次任务结束。保证线程任务的活性。
/*** DeliverDelayedMessageTimerTask的方法* p* 执行延迟消息消息投递*/
public void executeOnTimeup() {/** 1 根据topic和延迟队列id从consumeQueueTable查找需要写入的ConsumeQueue如果没找到就新建即ConsumeQueue文件是延迟创建的。* 该方法的源码我们在ReputMessageService异步构建ConsumeQueue和IndexFile部分已经讲过了*/ConsumeQueue cq ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));/** 2 如果没找到对应的消息队列新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService100ms后执行本次任务结束*/if (cq null) {this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);return;}/** 3 根据逻辑offset定位到物理偏移量然后截取该偏移量之后的一段Buffer其包含要拉取的消息的索引数据及对应consumeQueue文件之后的全部索引数据。* 这里截取的Buffer可能包含多条索引数据因为需要批量拉取多条消息以及进行消息过滤。* 该方法的源码我们在broker处理拉取消息请求部分已经讲过了*/SelectMappedBufferResult bufferCQ cq.getIndexBuffer(this.offset);//没获取到缓存bufferif (bufferCQ null) {long resetOffset;//如果当前消息队列的最小偏移量 大于 当前偏移量那么当前偏移量无效设置新的offset为最小偏移量if ((resetOffset cq.getMinOffsetInQueue()) this.offset) {log.error(schedule CQ offset invalid. offset{}, cqMinOffset{}, queueId{},this.offset, resetOffset, cq.getQueueId());}//如果当前消息队列的最大偏移量 小于 当前偏移量那么当前偏移量无效设置新的offset为最大偏移量else if ((resetOffset cq.getMaxOffsetInQueue()) this.offset) {log.error(schedule CQ offset invalid. offset{}, cqMaxOffset{}, queueId{},this.offset, resetOffset, cq.getQueueId());} else {resetOffset this.offset;}//新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService指定新的消费偏移量100ms后执行本次任务结束this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);return;}/** 3 遍历缓存buffer中的消息根据tagsCode投递时间判断消息是否到期如果到期则回复真实消息并且投递安到真实topic以及对应的queueId中*///下一个消费的offsetlong nextOffset this.offset;try {//i表示consumeQueue消息索引大小int i 0;ConsumeQueueExt.CqExtUnit cqExtUnit new ConsumeQueueExt.CqExtUnit();//遍历截取的Buffer中的consumeQueue消息索引固定长度20bfor (; i bufferCQ.getSize() isStarted(); i ConsumeQueue.CQ_STORE_UNIT_SIZE) {//获取该条目对应的消息在commitlog文件中的物理偏移量long offsetPy bufferCQ.getByteBuffer().getLong();//获取该条目对应的消息在commitlog文件中的总长度int sizePy bufferCQ.getByteBuffer().getInt();//获取该条目对应的消息的tagsCode对于延迟消息tagsCode被替换为延迟消息的发送时间CommitLog#checkMessageAndReturnSize方法中long tagsCode bufferCQ.getByteBuffer().getLong();//如果tagsCode是扩展文件地址if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode cqExtUnit.getTagsCode();} else {//cant find ext content.So re compute tags code.log.error([BUG] cant find consume queue extend file content!addr{}, offsetPy{}, sizePy{},tagsCode, offsetPy, sizePy);long msgStoreTime defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode computeDeliverTimestamp(delayLevel, msgStoreTime);}}//当前时间戳long now System.currentTimeMillis();//校验投递时间必须小于等于当前时间 延迟时间long deliverTimestamp this.correctDeliverTimestamp(now, tagsCode);//计算下一个offsetnextOffset offset (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//如果投递时间大于当前时间那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService指定新的消费偏移量100ms后执行本次任务结束long countdown deliverTimestamp - now;if (countdown 0) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}//根据消息物理偏移量从commitLog中找到该条消息。MessageExt msgExt ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt null) {continue;}//构建内部消息对象设置topic为REAL_TOPIC属性值即原始topic设置queueId为REAL_QID属性值即原始queueId。即恢复为正常消息MessageExtBrokerInner msgInner ScheduleMessageService.this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error([BUG] the real topic of schedule msg is {}, discard the msg. msg{},msgInner.getTopic(), msgInner);continue;}boolean deliverSuc;/** 消息投递*/if (ScheduleMessageService.this.enableAsyncDeliver) {//异步投递默认不支持deliverSuc this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);} else {//默认同步投递deliverSuc this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);}//如果投递失败那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService指定新的消费偏移量100ms后执行本次任务结束if (!deliverSuc) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}}//遍历结束更新下一个offsetnextOffset this.offset (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);} catch (Exception e) {log.error(ScheduleMessageService, messageTimeup execute error, offset {}, nextOffset, e);} finally {//释放内存bufferCQ.release();}/** 4 新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService指定新的消费偏移量100ms后执行本次任务结束** 保证线程任务的活性*/this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}3.2 scheduleNextTimerTask下一个调度任务
如果没找到对应的消息队列或者没找到缓存buffer或者没有过期的消息获取投递失败等原因将会新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService100ms后执行本次任务结束。
/*** DeliverDelayedMessageTimerTask的方法* p* 新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService100ms后执行本次任务结束** param offset 消费偏移量* param delay 延迟时间*/
public void scheduleNextTimerTask(long offset, long delay) {ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
}3.3 correctDeliverTimestamp校验投递时间
校验投递时间要求投递时间最晚不大于保证投递时间小于等于当前时间 延迟时间。
/*** DeliverDelayedMessageTimerTask的方法* p* 校验投递时间*/
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {//投递时间戳long result deliverTimestamp;//当前时间 延迟时间long maxTimestamp now ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);//保证投递时间小于等于当前时间 延迟时间if (deliverTimestamp maxTimestamp) {result now;}return result;
}3.4 messageTimeup恢复正常消息
构建一个MessageExtBrokerInner恢复为正常消息。设置topic的值为REAL_TOPIC属性值这是原始topic可能是重试topic或者真实topic。设置queueId的值为REAL_QID属性值这是原始queueId可能是重试queueId或者真实queueId。
/*** DeliverDelayedMessageTimerTask的方法* p* 还原原始消息设置topic为REAL_TOPIC属性值即原始topic设置queueId为REAL_QID属性值即原始queueId。即恢复为正常消息** param msgExt 延迟消息* return 真实消息*/
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {//构建MessageExtBrokerInner对象MessageExtBrokerInner msgInner new MessageExtBrokerInner();msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());TopicFilterType topicFilterType MessageExt.parseTopicFilterType(msgInner.getSysFlag());//延迟消息的tagsCode为投递时间现在来计算真正的tagsCodeValuelong tagsCodeValue MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());msgInner.setTagsCode(tagsCodeValue);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());//不需要等待存储完成后才返回msgInner.setWaitStoreMsgOK(false);MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);//设置topic的值为REAL_TOPIC属性值这是原始topic可能是重试topic或者真实topicmsgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));//设置queueId的值为REAL_QID属性值这是原始queueId可能是重试queueId或者真实queueIdString queueIdStr msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);int queueId Integer.parseInt(queueIdStr);msgInner.setQueueId(queueId);return msgInner;
}3.5 syncDeliver同步投递消息
syncDeliver内部调用asyncPutMessage方法同步投递消息投递完毕之后更新offsetTable中的对应延迟队列的消费偏移量。
这里有个bug第三个参数应该使用当前偏移量而不是最开始的偏移量在4.9.4版本已经修复。
/*** DeliverDelayedMessageTimerTask的方法* 同步投递* 这里有个bug第三个参数应该使用当前偏移量而不是最开始的偏移量在4.9.4版本已经修复** param msgInner 内部消息对象* param msgId 消息id* param offset 当前消费偏移量* param offsetPy 消息物理偏移量* param sizePy 消息大小* return*/
private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,int sizePy) {//投递消息内部调用asyncPutMessage方法PutResultProcess resultProcess deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);//投递结果PutMessageResult result resultProcess.get();boolean sendStatus result ! null result.getPutMessageStatus() PutMessageStatus.PUT_OK;if (sendStatus) {//如果发送成功那么更新offsetTable中的消费偏移量ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());}return sendStatus;
}4 延迟消息的总结
投递的消息在broker处理过成功会判断如果是延迟消息即DelayTimeLevel大于0那么替换topic为SCHEDULE_TOPIC_XXXX替换queueId为延迟队列id id level - 1如果延迟级别大于最大级别则设置为最大级别18默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。
最后保存真实topic到消息的REAL_TOPIC属性保存queueId到消息的REAL_QID属性方便后面恢复。
而在RocketMQ端通过ScheduleMessageService调度消息服务处理投递到SCHEDULE_TOPIC_XXXX的延迟消息。
从源码中可以看到每个延迟级别都有一个线程专门处理该级别的延迟消息这样避免了消息的排序具体的处理逻辑其被封装为一个DeliverDelayedMessageTimerTask线程任务。
不同延迟级别的处理线程将会从各自对应的延迟队列中获取延迟消息然后和当前时间比较看消息是否过期如果消息过期那么构造一个新的消息设置topic为REAL_TOPIC属性值即原始topic设置queueId为REAL_QID属性值即原始queueId即恢复为正常消息然后再次进行投递。
另外延迟消息的consumeQueue条目中的tagsCode并不是tag的hashCOde而是该条消息的到期时间。
RocketMQ在内部通过内部topic替换实现延迟消息非常巧妙而且并发消息重试也是使用了延迟消息。