Skip to content

Commit 8451f39

Browse files
committed
consume_from_where not working during initial launch
1 parent 63d20eb commit 8451f39

File tree

5 files changed

+58
-12
lines changed

5 files changed

+58
-12
lines changed

broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,23 @@ public long queryOffset(String group, String topic, int queueId) {
404404
}
405405
}
406406

407+
@Override
408+
public boolean hasOffsetRecord(final String group, final String topic) {
409+
if (!MixAll.isLmq(topic)) {
410+
return super.hasOffsetRecord(group, topic);
411+
}
412+
413+
ByteBuf keyBuf = keyOfConsumerOffset(group, topic, 0);
414+
try {
415+
byte[] slice = configStorage.get(keyBuf.nioBuffer());
416+
return slice != null;
417+
} catch (RocksDBException e) {
418+
throw new RuntimeException(e);
419+
} finally {
420+
keyBuf.release();
421+
}
422+
}
423+
407424
@Override
408425
public void commitPullOffset(String clientHost, String group, String topic, int queueId, long offset) {
409426
if (!MixAll.isLmq(topic)) {

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,20 @@ public long queryOffset(final String group, final String topic, final int queueI
258258
return -1L;
259259
}
260260

261+
public boolean hasOffsetRecord(final String group, final String topic) {
262+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
263+
264+
if (this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
265+
Map<Integer, Long> reset = resetOffsetTable.get(key);
266+
if (reset != null) {
267+
return true;
268+
}
269+
}
270+
271+
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
272+
return map != null;
273+
}
274+
261275
/**
262276
* Query pull offset in pullOffsetTable
263277
* @param group Consumer group

broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,18 @@ public Map<Integer, Long> queryOffset(final String group, final String topic) {
6868
return map;
6969
}
7070

71+
@Override
72+
public boolean hasOffsetRecord(final String group, final String topic) {
73+
if (!MixAll.isLmq(group)) {
74+
return super.hasOffsetRecord(group, topic);
75+
}
76+
// topic@group
77+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
78+
79+
Long offset = lmqOffsetTable.get(key);
80+
return offset != null;
81+
}
82+
7183
@Override
7284
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
7385
final long offset) {

broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -327,18 +327,20 @@ private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingC
327327
response.setCode(ResponseCode.SUCCESS);
328328
response.setRemark(null);
329329
} else {
330-
long minOffset =
331-
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
332-
requestHeader.getQueueId());
333-
if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) {
334-
response.setCode(ResponseCode.QUERY_NOT_FOUND);
335-
response.setRemark("Not found, do not set to zero, maybe this group boot first");
336-
} else if (minOffset <= 0
337-
&& this.brokerController.getMessageStore().checkInMemByConsumeOffset(
338-
requestHeader.getTopic(), requestHeader.getQueueId(), 0, 1)) {
339-
responseHeader.setOffset(0L);
340-
response.setCode(ResponseCode.SUCCESS);
341-
response.setRemark(null);
330+
if (this.brokerController.getConsumerOffsetManager().hasOffsetRecord(requestHeader.getConsumerGroup(), requestHeader.getTopic())) {
331+
long minOffset =
332+
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
333+
requestHeader.getQueueId());
334+
if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) {
335+
response.setCode(ResponseCode.QUERY_NOT_FOUND);
336+
response.setRemark("Not found, do not set to zero, maybe this group boot first");
337+
} else if (minOffset <= 0
338+
&& this.brokerController.getMessageStore().checkInMemByConsumeOffset(
339+
requestHeader.getTopic(), requestHeader.getQueueId(), 0, 1)) {
340+
responseHeader.setOffset(0L);
341+
response.setCode(ResponseCode.SUCCESS);
342+
response.setRemark(null);
343+
}
342344
} else {
343345
response.setCode(ResponseCode.QUERY_NOT_FOUND);
344346
response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");

broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ public void testQueryConsumerOffset() throws RemotingCommandException, Execution
175175

176176
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
177177
when(consumerOffsetManager.queryOffset(anyString(),anyString(),anyInt())).thenReturn(0L);
178+
when(consumerOffsetManager.hasOffsetRecord(anyString(), anyString())).thenReturn(true);
178179
response = consumerManageProcessor.processRequest(handlerContext, request);
179180
assertThat(response).isNotNull();
180181
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);

0 commit comments

Comments
 (0)