Skip to content

Commit 5892de6

Browse files
[ISSUE #9862] [Bug] Fix the issue of missing bornTime in POP requests when broker ACL is enabled (#9863)
1 parent ccf9730 commit 5892de6

File tree

3 files changed

+10
-14
lines changed

3 files changed

+10
-14
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.netty.channel.Channel;
2020
import io.netty.channel.ChannelHandlerContext;
2121
import java.util.Map;
22-
import java.util.Objects;
2322
import java.util.Random;
2423
import org.apache.rocketmq.broker.BrokerController;
2524
import org.apache.rocketmq.broker.longpolling.PollingHeader;
@@ -79,16 +78,17 @@ public void notifyMessageArriving(final String topic, final int queueId) {
7978
@Override
8079
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
8180
RemotingCommand request) throws RemotingCommandException {
82-
request.addExtFieldIfNotExist(BORN_TIME, String.valueOf(System.currentTimeMillis()));
83-
if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) {
84-
request.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis()));
85-
}
8681
Channel channel = ctx.channel();
8782

8883
RemotingCommand response = RemotingCommand.createResponseCommand(NotificationResponseHeader.class);
8984
final NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.readCustomHeader();
9085
final NotificationRequestHeader requestHeader =
91-
(NotificationRequestHeader) request.decodeCommandCustomHeader(NotificationRequestHeader.class);
86+
request.decodeCommandCustomHeader(NotificationRequestHeader.class, true);
87+
if (requestHeader.getBornTime() == 0) {
88+
final long beginTimeMills = this.brokerController.getMessageStore().now();
89+
request.addExtField(BORN_TIME, String.valueOf(beginTimeMills));
90+
requestHeader.setBornTime(beginTimeMills);
91+
}
9292

9393
response.setOpaque(request.getOpaque());
9494

broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.Map.Entry;
32-
import java.util.Objects;
3332
import java.util.Random;
3433
import java.util.concurrent.CompletableFuture;
3534
import java.util.concurrent.ConcurrentHashMap;
@@ -228,18 +227,16 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
228227

229228
final long beginTimeMills = this.brokerController.getMessageStore().now();
230229

231-
// fill bron time to properties if not exist, why we need this?
232-
request.addExtFieldIfNotExist(BORN_TIME, String.valueOf(System.currentTimeMillis()));
233-
if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) {
234-
request.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis()));
235-
}
236-
237230
Channel channel = ctx.channel();
238231
RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
239232
response.setOpaque(request.getOpaque());
240233

241234
final PopMessageRequestHeader requestHeader =
242235
request.decodeCommandCustomHeader(PopMessageRequestHeader.class, true);
236+
if (requestHeader.getBornTime() == 0) {
237+
request.addExtField(BORN_TIME, String.valueOf(beginTimeMills));
238+
requestHeader.setBornTime(beginTimeMills);
239+
}
243240
final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
244241

245242
// Pop mode only supports consumption in cluster load balancing mode

proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,6 @@ public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String bro
199199
@Override
200200
public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
201201
PopMessageRequestHeader requestHeader, long timeoutMillis) {
202-
requestHeader.setBornTime(System.currentTimeMillis());
203202
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader, ctx.getLanguage());
204203
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
205204
SimpleChannel channel = channelManager.createInvocationChannel(ctx);

0 commit comments

Comments
 (0)