Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2601,10 +2601,6 @@ public boolean isIsolated() {
return this.isIsolated;
}

public TimerCheckpoint getTimerCheckpoint() {
return timerCheckpoint;
}

public TopicRouteInfoManager getTopicRouteInfoManager() {
return this.topicRouteInfoManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ private boolean syncMetadataReverse(String brokerAddr) {
}
}

if (null != this.brokerController.getTimerCheckpoint() && this.brokerController.getTimerCheckpoint().getDataVersion().compare(timerCheckpoint.getDataVersion()) <= 0) {
if (null != this.brokerController.getTimerMessageStore().getTimerCheckpoint() && this.brokerController.getTimerMessageStore().getTimerCheckpoint().getDataVersion().compare(timerCheckpoint.getDataVersion()) <= 0) {
LOGGER.info("{}'s timerCheckpoint data version is larger than master broker, {}'s timerCheckpoint will be used.", brokerAddr, brokerAddr);
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(timerCheckpoint.getLastReadTimeMs());
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(timerCheckpoint.getMasterTimerQueueOffset());
this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(timerCheckpoint.getDataVersion());
this.brokerController.getTimerCheckpoint().flush();
this.brokerController.getTimerMessageStore().getTimerCheckpoint().setLastReadTimeMs(timerCheckpoint.getLastReadTimeMs());
this.brokerController.getTimerMessageStore().getTimerCheckpoint().setMasterTimerQueueOffset(timerCheckpoint.getMasterTimerQueueOffset());
this.brokerController.getTimerMessageStore().getTimerCheckpoint().getDataVersion().assignNewOne(timerCheckpoint.getDataVersion());
this.brokerController.getTimerMessageStore().getTimerCheckpoint().flush();
}

for (BrokerAttachedPlugin brokerAttachedPlugin : brokerController.getBrokerAttachedPlugins()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCom

private RemotingCommand getTimerCheckPoint(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "Unknown");
TimerCheckpoint timerCheckpoint = this.brokerController.getTimerCheckpoint();
TimerCheckpoint timerCheckpoint = this.brokerController.getTimerMessageStore().getTimerCheckpoint();
if (null == timerCheckpoint) {
LOGGER.error("AdminBrokerProcessor#getTimerCheckPoint: checkpoint is null, caller={}", ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,10 @@ public void syncTimerCheckPoint() {
if (null != brokerController.getMessageStore().getTimerMessageStore() &&
!brokerController.getTimerMessageStore().isShouldRunningDequeue()) {
TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak);
if (null != this.brokerController.getTimerCheckpoint()) {
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
if (null != this.brokerController.getTimerMessageStore().getTimerCheckpoint()) {
this.brokerController.getTimerMessageStore().getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
this.brokerController.getTimerMessageStore().getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
this.brokerController.getTimerMessageStore().getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,13 +1179,14 @@ public void testListAcl() throws RemotingCommandException {

@Test
public void testGetTimeCheckPoint() throws RemotingCommandException {
when(this.brokerController.getTimerCheckpoint()).thenReturn(null);
when(this.brokerController.getTimerMessageStore()).thenReturn(timerMessageStore);
when(this.timerMessageStore.getTimerCheckpoint()).thenReturn(null);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_CHECK_POINT, null);
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(response.getRemark()).isEqualTo("The checkpoint is null");

when(this.brokerController.getTimerCheckpoint()).thenReturn(new TimerCheckpoint());
when(this.timerMessageStore.getTimerCheckpoint()).thenReturn(new TimerCheckpoint());
response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void init() {
when(brokerController.getQueryAssignmentProcessor()).thenReturn(queryAssignmentProcessor);
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTimerMessageStore()).thenReturn(timerMessageStore);
when(brokerController.getTimerCheckpoint()).thenReturn(timerCheckpoint);
when(timerMessageStore.getTimerCheckpoint()).thenReturn(timerCheckpoint);
when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion());
when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>());
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public void testGetMetadataReverse_timerCheckPoint() throws Exception {

awaitUntilSlaveOK();

await().atMost(Duration.ofMinutes(1)).until(() -> master1With3Replicas.getTimerCheckpoint().getMasterTimerQueueOffset() >= MESSAGE_COUNT);
await().atMost(Duration.ofMinutes(1)).until(() -> master1With3Replicas.getTimerMessageStore().getTimerCheckpoint().getMasterTimerQueueOffset() >= MESSAGE_COUNT);

pushConsumer.shutdown();
}
Expand Down
Loading