Skip to content

Commit da43c9f

Browse files
committed
feat: concurrent heartbeat to all broker.
1 parent 89e021e commit da43c9f

File tree

3 files changed

+182
-3
lines changed

3 files changed

+182
-3
lines changed

client/src/main/java/org/apache/rocketmq/client/ClientConfig.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ public class ClientConfig {
101101

102102
private boolean enableHeartbeatChannelEventListener = true;
103103

104+
private boolean enableConcurrentHeartbeat = false;
105+
106+
private int concurrentHeartbeatThreadPoolSize = Runtime.getRuntime().availableProcessors();
107+
104108
/**
105109
* The switch for message trace
106110
*/
@@ -240,6 +244,8 @@ public void resetClientConfig(final ClientConfig cc) {
240244
this.namespaceV2 = cc.namespaceV2;
241245
this.enableTrace = cc.enableTrace;
242246
this.traceTopic = cc.traceTopic;
247+
this.enableConcurrentHeartbeat = cc.enableConcurrentHeartbeat;
248+
this.concurrentHeartbeatThreadPoolSize = cc.concurrentHeartbeatThreadPoolSize;
243249
}
244250

245251
public ClientConfig cloneClientConfig() {
@@ -272,6 +278,8 @@ public ClientConfig cloneClientConfig() {
272278
cc.namespaceV2 = namespaceV2;
273279
cc.enableTrace = enableTrace;
274280
cc.traceTopic = traceTopic;
281+
cc.enableConcurrentHeartbeat = enableConcurrentHeartbeat;
282+
cc.concurrentHeartbeatThreadPoolSize = concurrentHeartbeatThreadPoolSize;
275283
return cc;
276284
}
277285

@@ -525,6 +533,22 @@ public void setMaxPageSizeInGetMetadata(int maxPageSizeInGetMetadata) {
525533
this.maxPageSizeInGetMetadata = maxPageSizeInGetMetadata;
526534
}
527535

536+
public boolean isEnableConcurrentHeartbeat() {
537+
return this.enableConcurrentHeartbeat;
538+
}
539+
540+
public void setEnableConcurrentHeartbeat(boolean enableConcurrentHeartbeat) {
541+
this.enableConcurrentHeartbeat = enableConcurrentHeartbeat;
542+
}
543+
544+
public int getConcurrentHeartbeatThreadPoolSize() {
545+
return concurrentHeartbeatThreadPoolSize;
546+
}
547+
548+
public void setConcurrentHeartbeatThreadPoolSize(int concurrentHeartbeatThreadPoolSize) {
549+
this.concurrentHeartbeatThreadPoolSize = concurrentHeartbeatThreadPoolSize;
550+
}
551+
528552
@Override
529553
public String toString() {
530554
return "ClientConfig{" +
@@ -558,6 +582,8 @@ public String toString() {
558582
", enableHeartbeatChannelEventListener=" + enableHeartbeatChannelEventListener +
559583
", enableTrace=" + enableTrace +
560584
", traceTopic='" + traceTopic + '\'' +
585+
", enableConcurrentHeartbeat=" + enableConcurrentHeartbeat +
586+
", concurrentHeartbeatThreadPoolSize=" + concurrentHeartbeatThreadPoolSize +
561587
'}';
562588
}
563589
}

client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

Lines changed: 112 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.rocketmq.common.MQVersion;
4343
import org.apache.rocketmq.common.MixAll;
4444
import org.apache.rocketmq.common.ServiceState;
45+
import org.apache.rocketmq.common.ThreadFactoryImpl;
4546
import org.apache.rocketmq.common.constant.PermName;
4647
import org.apache.rocketmq.common.filter.ExpressionType;
4748
import org.apache.rocketmq.common.message.MessageExt;
@@ -68,6 +69,7 @@
6869
import org.apache.rocketmq.remoting.protocol.route.QueueData;
6970
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
7071

72+
import java.util.ArrayList;
7173
import java.util.Collections;
7274
import java.util.HashMap;
7375
import java.util.HashSet;
@@ -79,7 +81,10 @@
7981
import java.util.Set;
8082
import java.util.concurrent.ConcurrentHashMap;
8183
import java.util.concurrent.ConcurrentMap;
84+
import java.util.concurrent.CountDownLatch;
85+
import java.util.concurrent.ExecutorService;
8286
import java.util.concurrent.Executors;
87+
import java.util.concurrent.RejectedExecutionException;
8388
import java.util.concurrent.ScheduledExecutorService;
8489
import java.util.concurrent.ThreadFactory;
8590
import java.util.concurrent.TimeUnit;
@@ -125,7 +130,7 @@ public class MQClientInstance {
125130
*/
126131
private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<>();
127132

128-
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable = new ConcurrentHashMap<>();
133+
private final ConcurrentMap<String/* Broker Name */, ConcurrentHashMap<String/* address */, Integer>> brokerVersionTable = new ConcurrentHashMap<>();
129134
private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet = new HashSet<>();
130135
private final ConcurrentMap<String, Integer> brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap<>();
131136
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));
@@ -142,6 +147,7 @@ public Thread newThread(Runnable r) {
142147
private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
143148
private ServiceState serviceState = ServiceState.CREATE_JUST;
144149
private final Random random = new Random();
150+
private ExecutorService concurrentHeartbeatExecutor;
145151

146152
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
147153
this(clientConfig, instanceIndex, clientId, null);
@@ -217,6 +223,12 @@ public void onChannelActive(String remoteAddr, Channel channel) {
217223

218224
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
219225

226+
if (this.clientConfig.isEnableConcurrentHeartbeat()) {
227+
this.concurrentHeartbeatExecutor = Executors.newFixedThreadPool(
228+
clientConfig.getConcurrentHeartbeatThreadPoolSize(),
229+
new ThreadFactoryImpl("MQClientConcurrentHeartbeatThread_", true));
230+
}
231+
220232
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
221233
instanceIndex,
222234
this.clientId,
@@ -537,6 +549,8 @@ public boolean sendHeartbeatToAllBrokerWithLock() {
537549
try {
538550
if (clientConfig.isUseHeartbeatV2()) {
539551
return this.sendHeartbeatToAllBrokerV2(false);
552+
} else if (clientConfig.isEnableConcurrentHeartbeat()) {
553+
return this.sendHeartbeatToAllBrokerConcurrently();
540554
} else {
541555
return this.sendHeartbeatToAllBroker();
542556
}
@@ -641,7 +655,7 @@ private boolean sendHeartbeatToBroker(long id, String brokerName, String addr, H
641655
try {
642656
int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
643657
if (!this.brokerVersionTable.containsKey(brokerName)) {
644-
this.brokerVersionTable.put(brokerName, new HashMap<>(4));
658+
this.brokerVersionTable.put(brokerName, new ConcurrentHashMap<>(4));
645659
}
646660
this.brokerVersionTable.get(brokerName).put(addr, version);
647661
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
@@ -721,7 +735,7 @@ private boolean sendHeartbeatToBrokerV2(long id, String brokerName, String addr,
721735
}
722736
version = heartbeatV2Result.getVersion();
723737
if (!this.brokerVersionTable.containsKey(brokerName)) {
724-
this.brokerVersionTable.put(brokerName, new HashMap<>(4));
738+
this.brokerVersionTable.put(brokerName, new ConcurrentHashMap<>(4));
725739
}
726740
this.brokerVersionTable.get(brokerName).put(addr, version);
727741
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
@@ -780,6 +794,100 @@ private boolean sendHeartbeatToAllBrokerV2(boolean isRebalance) {
780794
return true;
781795
}
782796

797+
private class ClientHeartBeatTask {
798+
private final String brokerName;
799+
private final Long brokerId;
800+
private final String brokerAddr;
801+
private final HeartbeatData heartbeatData;
802+
803+
public ClientHeartBeatTask(String brokerName, Long brokerId, String brokerAddr, HeartbeatData heartbeatData) {
804+
this.brokerName = brokerName;
805+
this.brokerId = brokerId;
806+
this.brokerAddr = brokerAddr;
807+
this.heartbeatData = heartbeatData;
808+
}
809+
810+
public void execute() throws Exception {
811+
int version = MQClientInstance.this.mQClientAPIImpl.sendHeartbeat(
812+
brokerAddr, heartbeatData, MQClientInstance.this.clientConfig.getMqClientApiTimeout());
813+
814+
ConcurrentHashMap<String, Integer> inner = MQClientInstance.this.brokerVersionTable
815+
.computeIfAbsent(brokerName, k -> new ConcurrentHashMap<>(4));
816+
inner.put(brokerAddr, version);
817+
}
818+
}
819+
820+
private boolean sendHeartbeatToAllBrokerConcurrently() {
821+
final HeartbeatData heartbeatData = this.prepareHeartbeatData(false);
822+
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
823+
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
824+
825+
if (producerEmpty && consumerEmpty) {
826+
log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
827+
return false;
828+
}
829+
830+
if (this.brokerAddrTable.isEmpty()) {
831+
return false;
832+
}
833+
834+
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
835+
List<ClientHeartBeatTask> tasks = new ArrayList<>();
836+
for (Entry<String, HashMap<Long, String>> entry : this.brokerAddrTable.entrySet()) {
837+
String brokerName = entry.getKey();
838+
HashMap<Long, String> oneTable = entry.getValue();
839+
if (oneTable != null) {
840+
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
841+
Long id = entry1.getKey();
842+
String addr = entry1.getValue();
843+
if (addr == null) continue;
844+
if (consumerEmpty && id != MixAll.MASTER_ID) continue;
845+
tasks.add(new ClientHeartBeatTask(brokerName, id, addr, heartbeatData));
846+
}
847+
}
848+
}
849+
850+
if (tasks.isEmpty()) {
851+
return false;
852+
}
853+
854+
final CountDownLatch latch = new CountDownLatch(tasks.size());
855+
856+
for (ClientHeartBeatTask task : tasks) {
857+
try {
858+
this.concurrentHeartbeatExecutor.execute(() -> {
859+
try {
860+
task.execute();
861+
if (times % 20 == 0) {
862+
log.info("send heart beat to broker[{} {} {}] success", task.brokerName, task.brokerId, task.brokerAddr);
863+
}
864+
} catch (Exception e) {
865+
if (MQClientInstance.this.isBrokerInNameServer(task.brokerAddr)) {
866+
log.warn("send heart beat to broker[{} {} {}] failed", task.brokerName, task.brokerId, task.brokerAddr, e);
867+
} else {
868+
log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it",
869+
task.brokerName, task.brokerId, task.brokerAddr, e);
870+
}
871+
} finally {
872+
latch.countDown();
873+
}
874+
});
875+
} catch (RejectedExecutionException rex) {
876+
log.warn("heartbeat submission rejected for broker[{} {} {}], will skip this round", task.brokerName, task.brokerId, task.brokerAddr, rex);
877+
latch.countDown();
878+
}
879+
}
880+
881+
try {
882+
// wait all tasks finish
883+
latch.await();
884+
} catch (InterruptedException ie) {
885+
log.warn("Interrupted while waiting for broker heartbeat tasks to complete", ie);
886+
Thread.currentThread().interrupt();
887+
}
888+
return true;
889+
}
890+
783891
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
784892
DefaultMQProducer defaultMQProducer) {
785893
try {
@@ -971,6 +1079,7 @@ public void shutdown() {
9711079
this.scheduledExecutorService.shutdown();
9721080
this.mQClientAPIImpl.shutdown();
9731081
this.rebalanceService.shutdown();
1082+
this.concurrentHeartbeatExecutor.shutdown();
9741083

9751084
MQClientManager.getInstance().removeClientFactory(this.clientId);
9761085
log.info("the client factory [{}] shutdown OK", this.clientId);

client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import java.util.Set;
7575
import java.util.concurrent.ConcurrentHashMap;
7676
import java.util.concurrent.ConcurrentMap;
77+
import java.util.concurrent.ExecutorService;
7778

7879
import static org.assertj.core.api.Assertions.assertThat;
7980
import static org.junit.Assert.assertEquals;
@@ -82,9 +83,11 @@
8283
import static org.junit.Assert.assertNull;
8384
import static org.junit.Assert.assertThrows;
8485
import static org.junit.Assert.assertTrue;
86+
import static org.junit.Assert.fail;
8587
import static org.mockito.ArgumentMatchers.any;
8688
import static org.mockito.ArgumentMatchers.anyLong;
8789
import static org.mockito.ArgumentMatchers.eq;
90+
import static org.mockito.Mockito.doAnswer;
8891
import static org.mockito.Mockito.doThrow;
8992
import static org.mockito.Mockito.mock;
9093
import static org.mockito.Mockito.times;
@@ -510,4 +513,45 @@ private List<BrokerData> createBrokerDatas() {
510513
brokerData.setBrokerAddrs(brokerAddrs);
511514
return Collections.singletonList(brokerData);
512515
}
516+
517+
@Test
518+
public void testSendHeartbeatToAllBrokerConcurrently() {
519+
try {
520+
String brokerName = "BrokerA";
521+
HashMap<Long, String> addrMap = new HashMap<>();
522+
addrMap.put(0L, "127.0.0.1:10911");
523+
addrMap.put(1L, "127.0.0.1:10912");
524+
addrMap.put(2L, "127.0.0.1:10913");
525+
brokerAddrTable.put(brokerName, addrMap);
526+
527+
DefaultMQPushConsumerImpl mockConsumer = mock(DefaultMQPushConsumerImpl.class);
528+
when(mockConsumer.subscriptions()).thenReturn(Collections.singleton(new SubscriptionData()));
529+
mqClientInstance.registerConsumer("TestConsumerGroup", mockConsumer);
530+
531+
ClientConfig clientConfig = new ClientConfig();
532+
FieldUtils.writeDeclaredField(clientConfig, "enableConcurrentHeartbeat", true, true);
533+
FieldUtils.writeDeclaredField(mqClientInstance, "clientConfig", clientConfig, true);
534+
535+
ExecutorService mockExecutor = mock(ExecutorService.class);
536+
doAnswer(invocation -> {
537+
try {
538+
Runnable task = invocation.getArgument(0);
539+
task.run();
540+
} catch (Exception e) {
541+
// ignore
542+
}
543+
return null;
544+
}).when(mockExecutor).execute(any(Runnable.class));
545+
FieldUtils.writeDeclaredField(mqClientInstance, "concurrentHeartbeatExecutor", mockExecutor, true);
546+
MQClientAPIImpl mockMqClientAPIImpl = mock(MQClientAPIImpl.class);
547+
FieldUtils.writeDeclaredField(mqClientInstance, "mQClientAPIImpl", mockMqClientAPIImpl, true);
548+
549+
mqClientInstance.sendHeartbeatToAllBrokerWithLock();
550+
551+
assertTrue(true);
552+
553+
} catch (Exception e) {
554+
fail("failed: " + e.getMessage());
555+
}
556+
}
513557
}

0 commit comments

Comments
 (0)