Skip to content

Commit b63dfcd

Browse files
author
RongtongJin
committed
refactor: remove getTimerCheckpoint() from BrokerController
1 parent 89e021e commit b63dfcd

File tree

7 files changed

+15
-18
lines changed

7 files changed

+15
-18
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2601,10 +2601,6 @@ public boolean isIsolated() {
26012601
return this.isIsolated;
26022602
}
26032603

2604-
public TimerCheckpoint getTimerCheckpoint() {
2605-
return timerCheckpoint;
2606-
}
2607-
26082604
public TopicRouteInfoManager getTopicRouteInfoManager() {
26092605
return this.topicRouteInfoManager;
26102606
}

broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,12 @@ private boolean syncMetadataReverse(String brokerAddr) {
182182
}
183183
}
184184

185-
if (null != this.brokerController.getTimerCheckpoint() && this.brokerController.getTimerCheckpoint().getDataVersion().compare(timerCheckpoint.getDataVersion()) <= 0) {
185+
if (null != this.brokerController.getTimerMessageStore().getTimerCheckpoint() && this.brokerController.getTimerMessageStore().getTimerCheckpoint().getDataVersion().compare(timerCheckpoint.getDataVersion()) <= 0) {
186186
LOGGER.info("{}'s timerCheckpoint data version is larger than master broker, {}'s timerCheckpoint will be used.", brokerAddr, brokerAddr);
187-
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(timerCheckpoint.getLastReadTimeMs());
188-
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(timerCheckpoint.getMasterTimerQueueOffset());
189-
this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(timerCheckpoint.getDataVersion());
190-
this.brokerController.getTimerCheckpoint().flush();
187+
this.brokerController.getTimerMessageStore().getTimerCheckpoint().setLastReadTimeMs(timerCheckpoint.getLastReadTimeMs());
188+
this.brokerController.getTimerMessageStore().getTimerCheckpoint().setMasterTimerQueueOffset(timerCheckpoint.getMasterTimerQueueOffset());
189+
this.brokerController.getTimerMessageStore().getTimerCheckpoint().getDataVersion().assignNewOne(timerCheckpoint.getDataVersion());
190+
this.brokerController.getTimerMessageStore().getTimerCheckpoint().flush();
191191
}
192192

193193
for (BrokerAttachedPlugin brokerAttachedPlugin : brokerController.getBrokerAttachedPlugins()) {

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -878,7 +878,7 @@ private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCom
878878

879879
private RemotingCommand getTimerCheckPoint(ChannelHandlerContext ctx, RemotingCommand request) {
880880
final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "Unknown");
881-
TimerCheckpoint timerCheckpoint = this.brokerController.getTimerCheckpoint();
881+
TimerCheckpoint timerCheckpoint = this.brokerController.getTimerMessageStore().getTimerCheckpoint();
882882
if (null == timerCheckpoint) {
883883
LOGGER.error("AdminBrokerProcessor#getTimerCheckPoint: checkpoint is null, caller={}", ctx.channel().remoteAddress());
884884
response.setCode(ResponseCode.SYSTEM_ERROR);

broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,10 +239,10 @@ public void syncTimerCheckPoint() {
239239
if (null != brokerController.getMessageStore().getTimerMessageStore() &&
240240
!brokerController.getTimerMessageStore().isShouldRunningDequeue()) {
241241
TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak);
242-
if (null != this.brokerController.getTimerCheckpoint()) {
243-
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
244-
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
245-
this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
242+
if (null != this.brokerController.getTimerMessageStore().getTimerCheckpoint()) {
243+
this.brokerController.getTimerMessageStore().getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
244+
this.brokerController.getTimerMessageStore().getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
245+
this.brokerController.getTimerMessageStore().getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
246246
}
247247
}
248248
} catch (Exception e) {

broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,13 +1179,14 @@ public void testListAcl() throws RemotingCommandException {
11791179

11801180
@Test
11811181
public void testGetTimeCheckPoint() throws RemotingCommandException {
1182-
when(this.brokerController.getTimerCheckpoint()).thenReturn(null);
1182+
when(this.brokerController.getTimerMessageStore()).thenReturn(timerMessageStore);
1183+
when(this.timerMessageStore.getTimerCheckpoint()).thenReturn(null);
11831184
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_CHECK_POINT, null);
11841185
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
11851186
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
11861187
assertThat(response.getRemark()).isEqualTo("The checkpoint is null");
11871188

1188-
when(this.brokerController.getTimerCheckpoint()).thenReturn(new TimerCheckpoint());
1189+
when(this.timerMessageStore.getTimerCheckpoint()).thenReturn(new TimerCheckpoint());
11891190
response = adminBrokerProcessor.processRequest(handlerContext, request);
11901191
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
11911192
}

broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void init() {
112112
when(brokerController.getQueryAssignmentProcessor()).thenReturn(queryAssignmentProcessor);
113113
when(brokerController.getMessageStore()).thenReturn(messageStore);
114114
when(brokerController.getTimerMessageStore()).thenReturn(timerMessageStore);
115-
when(brokerController.getTimerCheckpoint()).thenReturn(timerCheckpoint);
115+
when(timerMessageStore.getTimerCheckpoint()).thenReturn(timerCheckpoint);
116116
when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion());
117117
when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>());
118118
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);

test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ public void testGetMetadataReverse_timerCheckPoint() throws Exception {
282282

283283
awaitUntilSlaveOK();
284284

285-
await().atMost(Duration.ofMinutes(1)).until(() -> master1With3Replicas.getTimerCheckpoint().getMasterTimerQueueOffset() >= MESSAGE_COUNT);
285+
await().atMost(Duration.ofMinutes(1)).until(() -> master1With3Replicas.getTimerMessageStore().getTimerCheckpoint().getMasterTimerQueueOffset() >= MESSAGE_COUNT);
286286

287287
pushConsumer.shutdown();
288288
}

0 commit comments

Comments
 (0)