Skip to content

Commit 8e60914

Browse files
committed
fix
Change-Id: Ie2faff9942027a510c4550a9f2bf2df8c6283137
1 parent 63d20eb commit 8e60914

File tree

12 files changed

+296
-80
lines changed

12 files changed

+296
-80
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,29 @@
1818

1919
import com.google.common.annotations.VisibleForTesting;
2020
import com.google.common.collect.Lists;
21+
import java.net.InetSocketAddress;
22+
import java.util.AbstractMap;
23+
import java.util.ArrayList;
24+
import java.util.Arrays;
25+
import java.util.Collections;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Objects;
29+
import java.util.Optional;
30+
import java.util.concurrent.BlockingQueue;
31+
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.ConcurrentMap;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.LinkedBlockingQueue;
36+
import java.util.concurrent.ScheduledExecutorService;
37+
import java.util.concurrent.ScheduledFuture;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.locks.Lock;
40+
import java.util.concurrent.locks.ReentrantLock;
41+
import java.util.function.Function;
42+
import java.util.function.Supplier;
43+
import java.util.stream.Collectors;
2144
import org.apache.rocketmq.auth.authentication.factory.AuthenticationFactory;
2245
import org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager;
2346
import org.apache.rocketmq.auth.authorization.factory.AuthorizationFactory;
@@ -58,11 +81,12 @@
5881
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
5982
import org.apache.rocketmq.broker.offset.BroadcastOffsetManager;
6083
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
61-
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
6284
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
6385
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
6486
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
6587
import org.apache.rocketmq.broker.pop.PopConsumerService;
88+
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
89+
import org.apache.rocketmq.broker.pop.orderly.QueueLevelConsumerManager;
6690
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
6791
import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
6892
import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor;
@@ -156,30 +180,6 @@
156180
import org.apache.rocketmq.store.timer.TimerMessageStore;
157181
import org.apache.rocketmq.store.timer.TimerMetrics;
158182

159-
import java.net.InetSocketAddress;
160-
import java.util.AbstractMap;
161-
import java.util.ArrayList;
162-
import java.util.Arrays;
163-
import java.util.Collections;
164-
import java.util.List;
165-
import java.util.Map;
166-
import java.util.Objects;
167-
import java.util.Optional;
168-
import java.util.concurrent.BlockingQueue;
169-
import java.util.concurrent.ConcurrentHashMap;
170-
import java.util.concurrent.ConcurrentMap;
171-
import java.util.concurrent.CountDownLatch;
172-
import java.util.concurrent.ExecutorService;
173-
import java.util.concurrent.LinkedBlockingQueue;
174-
import java.util.concurrent.ScheduledExecutorService;
175-
import java.util.concurrent.ScheduledFuture;
176-
import java.util.concurrent.TimeUnit;
177-
import java.util.concurrent.locks.Lock;
178-
import java.util.concurrent.locks.ReentrantLock;
179-
import java.util.function.Function;
180-
import java.util.function.Supplier;
181-
import java.util.stream.Collectors;
182-
183183
public class BrokerController {
184184
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
185185
private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
@@ -387,7 +387,7 @@ public BrokerController(
387387
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig);
388388
this.producerManager = new ProducerManager(this.brokerStatsManager);
389389
this.consumerFilterManager = new ConsumerFilterManager(this);
390-
this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this);
390+
this.consumerOrderInfoManager = new QueueLevelConsumerManager(this);
391391
this.popInflightMessageCounter = new PopInflightMessageCounter(this);
392392
this.popConsumerService = brokerConfig.isPopConsumerKVServiceInit() ? new PopConsumerService(this) : null;
393393
this.clientHousekeepingService = new ClientHousekeepingService(this);

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public PopConsumerContext handleGetMessageResult(PopConsumerContext context, Get
167167

168168
if (GetMessageStatus.FOUND.equals(result.getStatus()) && !result.getMessageQueueOffset().isEmpty()) {
169169
if (context.isFifo()) {
170-
this.setFifoBlocked(context, context.getGroupId(), topicId, queueId, result.getMessageQueueOffset());
170+
this.setFifoBlocked(context, context.getGroupId(), topicId, queueId, result.getMessageQueueOffset(), result);
171171
}
172172
// build response header here
173173
context.addGetMessageResult(result, topicId, queueId, retryType, offset);
@@ -275,10 +275,10 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,
275275
* Fifo message does not have retry feature in broker
276276
*/
277277
public void setFifoBlocked(PopConsumerContext context,
278-
String groupId, String topicId, int queueId, List<Long> queueOffsetList) {
278+
String groupId, String topicId, int queueId, List<Long> queueOffsetList, GetMessageResult getMessageResult) {
279279
brokerController.getConsumerOrderInfoManager().update(
280280
context.getAttemptId(), false, topicId, groupId, queueId,
281-
context.getPopTime(), context.getInvisibleTime(), queueOffsetList, context.getOrderCountInfoBuilder());
281+
context.getPopTime(), context.getInvisibleTime(), queueOffsetList, context.getOrderCountInfoBuilder(), getMessageResult);
282282
}
283283

284284
public boolean isFifoBlocked(PopConsumerContext context, String groupId, String topicId, int queueId) {
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.broker.pop.orderly;
18+
19+
import java.util.List;
20+
import java.util.concurrent.CompletableFuture;
21+
import org.apache.rocketmq.common.OrderedConsumptionLevel;
22+
import org.apache.rocketmq.store.GetMessageResult;
23+
24+
/**
25+
*
26+
* Ordered Consumption Controller Interface
27+
* This is the top-level interface that encapsulates complete ordered consumption management functionality,
28+
* supporting different concurrency strategy implementations
29+
* <p>
30+
* Design Goals:
31+
* 1. Support queue-level ordered consumption (existing implementation)
32+
* 2. Support message group-level ordered consumption (improve concurrency)
33+
* 3. Support custom ordered consumption strategies
34+
* </p>
35+
*/
36+
public interface ConsumerOrderInfoManager {
37+
38+
/**
39+
* Update the reception status of message list
40+
* Called by handleGetMessageResult when consumer POPs messages, used to record message status and build consumption information
41+
*
42+
* @param attemptId Distinguish different pop requests
43+
* @param isRetry Whether it is a retry topic
44+
* @param topic Topic name
45+
* @param group Consumer group name
46+
* @param queueId Queue ID
47+
* @param popTime Time when messages are popped
48+
* @param invisibleTime Message invisible time
49+
* @param msgQueueOffsetList List of message queue offsets
50+
* @param orderInfoBuilder String builder for constructing order information
51+
* @param getMessageResult Return new result
52+
*/
53+
void update(String attemptId, boolean isRetry, String topic, String group, int queueId,
54+
long popTime, long invisibleTime, List<Long> msgQueueOffsetList,
55+
StringBuilder orderInfoBuilder, GetMessageResult getMessageResult);
56+
57+
/**
58+
* Check whether the current POP request needs to be blocked
59+
* Used to ensure ordered consumption of ordered messages
60+
* Called when consumer POPs messages
61+
*
62+
* @param attemptId Attempt ID
63+
* @param topic Topic name
64+
* @param group Consumer group name
65+
* @param queueId Queue ID
66+
* @param invisibleTime Invisible time
67+
* @return true indicates blocking is needed, false indicates can proceed
68+
*/
69+
boolean checkBlock(String attemptId, String topic, String group, int queueId, long invisibleTime);
70+
71+
/**
72+
* Commit message and calculate next consumption offset
73+
* Called when consumer ACKs messages
74+
*
75+
* @param topic Topic name
76+
* @param group Consumer group name
77+
* @param queueId Queue ID
78+
* @param queueOffset Message queue offset
79+
* @param popTime Pop time, used for validation
80+
* @return -1: invalid, -2: no need to commit, >=0: offset that needs to be committed (indicates messages below this offset have been consumed)
81+
*/
82+
long commitAndNext(String topic, String group, int queueId, long queueOffset, long popTime);
83+
84+
/**
85+
* Update the next visible time of message
86+
* Used for delayed message re-consumption
87+
*
88+
* @param topic Topic name
89+
* @param group Consumer group name
90+
* @param queueId Queue ID
91+
* @param queueOffset Message offset
92+
* @param popTime Pop time, used for validation
93+
* @param nextVisibleTime Next visible time
94+
*/
95+
void updateNextVisibleTime(String topic, String group, int queueId, long queueOffset,
96+
long popTime, long nextVisibleTime);
97+
98+
/**
99+
* Clear the blocking status of specified queue
100+
* Usually called during consumer rebalancing or queue reassignment
101+
*
102+
* @param topic Topic name
103+
* @param group Consumer group name
104+
* @param queueId Queue ID
105+
*/
106+
void clearBlock(String topic, String group, int queueId);
107+
108+
/**
109+
* Get ordered consumption level
110+
* Used to distinguish different implementation strategies
111+
*
112+
* @return Ordered consumption level, such as: QUEUE, MESSAGE_GROUP, etc.
113+
*/
114+
OrderedConsumptionLevel getOrderedConsumptionLevel();
115+
116+
/**
117+
* Start the controller
118+
* Initialize necessary resources, such as timers, thread pools, etc.
119+
*/
120+
void start();
121+
122+
/**
123+
* Shutdown the controller
124+
* Release resources, clean up scheduled tasks, etc.
125+
*/
126+
void shutdown();
127+
128+
/**
129+
* Persist the controller
130+
* Persist the controller's data
131+
*/
132+
void persist();
133+
134+
boolean load();
135+
136+
/**
137+
* Get available message result
138+
* Used to retrieve messages from cache
139+
*/
140+
CompletableFuture<GetMessageResult> getAvailableMessageResult(String attemptId, long popTime, long invisibleTime, String groupId,
141+
String topicId, int queueId, int batchSize, StringBuilder orderCountInfoBuilder);
142+
}

0 commit comments

Comments
 (0)