Skip to content

Commit 6f5b551

Browse files
authored
[ISSUE #9918] Fix the message may be renewed once more if the gRPC push consumer is unexpectedly disconnected (#9919)
Change-Id: I00424612c5ee9dd30e5c155dab17262051e5f097
1 parent 7a08b4c commit 6f5b551

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
135135
request.hasAttemptId() ? request.getAttemptId() : null,
136136
timeRemaining
137137
).thenAccept(popResult -> {
138+
Runnable doAfterWrite = null;
138139
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
139140
if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
140141
GrpcClientChannel clientChannel = grpcChannelManager.getChannel(ctx.getClientID());
@@ -145,19 +146,21 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
145146
writer.processThrowableWhenWriteMessage(e, ctx, request, messageExt));
146147
throw e;
147148
}
148-
List<MessageExt> messageExtList = popResult.getMsgFoundList();
149-
for (MessageExt messageExt : messageExtList) {
150-
String receiptHandle = messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
151-
if (receiptHandle != null) {
152-
MessageReceiptHandle messageReceiptHandle =
153-
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
154-
messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
155-
messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle);
149+
doAfterWrite = () -> {
150+
List<MessageExt> messageExtList = popResult.getMsgFoundList();
151+
for (MessageExt messageExt : messageExtList) {
152+
String receiptHandle = messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
153+
if (receiptHandle != null) {
154+
MessageReceiptHandle messageReceiptHandle =
155+
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
156+
messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
157+
messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle);
158+
}
156159
}
157-
}
160+
};
158161
}
159162
}
160-
writer.writeAndComplete(ctx, request, popResult);
163+
writer.writeAndComplete(ctx, request, popResult, doAfterWrite);
161164
})
162165
.exceptionally(t -> {
163166
writer.writeAndComplete(ctx, request, t);

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ public ReceiveMessageResponseStreamWriter(
5454
}
5555

5656
public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest request, PopResult popResult) {
57+
writeAndComplete(ctx, request, popResult, null);
58+
}
59+
60+
public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest request, PopResult popResult, Runnable doAfterWrite) {
5761
PopStatus status = popResult.getPopStatus();
5862
List<MessageExt> messageFoundList = popResult.getMsgFoundList();
5963
try {
@@ -103,6 +107,9 @@ public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest request, Po
103107
.build());
104108
break;
105109
}
110+
if (doAfterWrite != null) {
111+
doAfterWrite.run();
112+
}
106113
} catch (Throwable t) {
107114
writeResponseWithErrorIgnore(
108115
ReceiveMessageResponse.newBuilder().setStatus(ResponseBuilder.getInstance().buildStatus(t)).build());

0 commit comments

Comments
 (0)