diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 76882cc71c1..5a2407f5f75 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -2601,10 +2601,6 @@ public boolean isIsolated() { return this.isIsolated; } - public TimerCheckpoint getTimerCheckpoint() { - return timerCheckpoint; - } - public TopicRouteInfoManager getTopicRouteInfoManager() { return this.topicRouteInfoManager; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java index de2ccb29399..1629838cc2e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java @@ -182,12 +182,12 @@ private boolean syncMetadataReverse(String brokerAddr) { } } - if (null != this.brokerController.getTimerCheckpoint() && this.brokerController.getTimerCheckpoint().getDataVersion().compare(timerCheckpoint.getDataVersion()) <= 0) { + if (null != this.brokerController.getTimerMessageStore().getTimerCheckpoint() && this.brokerController.getTimerMessageStore().getTimerCheckpoint().getDataVersion().compare(timerCheckpoint.getDataVersion()) <= 0) { LOGGER.info("{}'s timerCheckpoint data version is larger than master broker, {}'s timerCheckpoint will be used.", brokerAddr, brokerAddr); - this.brokerController.getTimerCheckpoint().setLastReadTimeMs(timerCheckpoint.getLastReadTimeMs()); - this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(timerCheckpoint.getMasterTimerQueueOffset()); - this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(timerCheckpoint.getDataVersion()); - this.brokerController.getTimerCheckpoint().flush(); + this.brokerController.getTimerMessageStore().getTimerCheckpoint().setLastReadTimeMs(timerCheckpoint.getLastReadTimeMs()); + this.brokerController.getTimerMessageStore().getTimerCheckpoint().setMasterTimerQueueOffset(timerCheckpoint.getMasterTimerQueueOffset()); + this.brokerController.getTimerMessageStore().getTimerCheckpoint().getDataVersion().assignNewOne(timerCheckpoint.getDataVersion()); + this.brokerController.getTimerMessageStore().getTimerCheckpoint().flush(); } for (BrokerAttachedPlugin brokerAttachedPlugin : brokerController.getBrokerAttachedPlugins()) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index e7333ab91ad..effda2663a8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -878,7 +878,7 @@ private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCom private RemotingCommand getTimerCheckPoint(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "Unknown"); - TimerCheckpoint timerCheckpoint = this.brokerController.getTimerCheckpoint(); + TimerCheckpoint timerCheckpoint = this.brokerController.getTimerMessageStore().getTimerCheckpoint(); if (null == timerCheckpoint) { LOGGER.error("AdminBrokerProcessor#getTimerCheckPoint: checkpoint is null, caller={}", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 2e3134016c7..403f8a480b3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -239,10 +239,10 @@ public void syncTimerCheckPoint() { if (null != brokerController.getMessageStore().getTimerMessageStore() && !brokerController.getTimerMessageStore().isShouldRunningDequeue()) { TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak); - if (null != this.brokerController.getTimerCheckpoint()) { - this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs()); - this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset()); - this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion()); + if (null != this.brokerController.getTimerMessageStore().getTimerCheckpoint()) { + this.brokerController.getTimerMessageStore().getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs()); + this.brokerController.getTimerMessageStore().getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset()); + this.brokerController.getTimerMessageStore().getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion()); } } } catch (Exception e) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 572be63e3f6..4c521cfe4a8 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -1179,13 +1179,14 @@ public void testListAcl() throws RemotingCommandException { @Test public void testGetTimeCheckPoint() throws RemotingCommandException { - when(this.brokerController.getTimerCheckpoint()).thenReturn(null); + when(this.brokerController.getTimerMessageStore()).thenReturn(timerMessageStore); + when(this.timerMessageStore.getTimerCheckpoint()).thenReturn(null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_CHECK_POINT, null); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); assertThat(response.getRemark()).isEqualTo("The checkpoint is null"); - when(this.brokerController.getTimerCheckpoint()).thenReturn(new TimerCheckpoint()); + when(this.timerMessageStore.getTimerCheckpoint()).thenReturn(new TimerCheckpoint()); response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java index c9461c42240..85da3824ccf 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java @@ -112,7 +112,7 @@ public void init() { when(brokerController.getQueryAssignmentProcessor()).thenReturn(queryAssignmentProcessor); when(brokerController.getMessageStore()).thenReturn(messageStore); when(brokerController.getTimerMessageStore()).thenReturn(timerMessageStore); - when(brokerController.getTimerCheckpoint()).thenReturn(timerCheckpoint); + when(timerMessageStore.getTimerCheckpoint()).thenReturn(timerCheckpoint); when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion()); when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>()); when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); diff --git a/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java b/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java index b9bb7b2e1e1..4ebf9a85acf 100644 --- a/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java @@ -282,7 +282,7 @@ public void testGetMetadataReverse_timerCheckPoint() throws Exception { awaitUntilSlaveOK(); - await().atMost(Duration.ofMinutes(1)).until(() -> master1With3Replicas.getTimerCheckpoint().getMasterTimerQueueOffset() >= MESSAGE_COUNT); + await().atMost(Duration.ofMinutes(1)).until(() -> master1With3Replicas.getTimerMessageStore().getTimerCheckpoint().getMasterTimerQueueOffset() >= MESSAGE_COUNT); pushConsumer.shutdown(); }