Skip to content

Commit 89e021e

Browse files
RongtongJinRongtongJin
andauthored
[ISSUE #9898] Remove AbstractBrokerRunnable and replace with Runnable
Change-Id: I94104151c452d09cbe195a3e1126a473b662a337 Co-authored-by: RongtongJin <[email protected]>
1 parent 1bb2168 commit 89e021e

File tree

9 files changed

+45
-105
lines changed

9 files changed

+45
-105
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@
121121
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
122122
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
123123
import org.apache.rocketmq.broker.util.HookUtils;
124-
import org.apache.rocketmq.common.AbstractBrokerRunnable;
125124
import org.apache.rocketmq.common.BrokerConfig;
126125
import org.apache.rocketmq.common.BrokerIdentity;
127126
import org.apache.rocketmq.common.MixAll;
@@ -1814,9 +1813,9 @@ public void start() throws Exception {
18141813
this.registerBrokerAll(true, false, true);
18151814
}
18161815

1817-
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
1816+
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
18181817
@Override
1819-
public void run0() {
1818+
public void run() {
18201819
try {
18211820
if (System.currentTimeMillis() < shouldStartTime) {
18221821
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
@@ -1836,9 +1835,9 @@ public void run0() {
18361835
if (this.brokerConfig.isEnableSlaveActingMaster()) {
18371836
scheduleSendHeartbeat();
18381837

1839-
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
1838+
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new Runnable() {
18401839
@Override
1841-
public void run0() {
1840+
public void run() {
18421841
try {
18431842
BrokerController.this.syncBrokerMemberGroup();
18441843
} catch (Throwable e) {
@@ -1869,9 +1868,9 @@ public void run() {
18691868
}
18701869

18711870
protected void scheduleSendHeartbeat() {
1872-
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
1871+
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new Runnable() {
18731872
@Override
1874-
public void run0() {
1873+
public void run() {
18751874
if (isIsolated) {
18761875
return;
18771876
}

broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.concurrent.atomic.AtomicBoolean;
2727

2828
import org.apache.rocketmq.broker.BrokerController;
29-
import org.apache.rocketmq.common.AbstractBrokerRunnable;
3029
import org.apache.rocketmq.common.constant.LoggerName;
3130
import org.apache.rocketmq.common.utils.ThreadUtils;
3231
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -48,9 +47,9 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
4847
public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
4948
this.brokerController = brokerController;
5049

51-
scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(brokerController.getBrokerConfig()) {
50+
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
5251
@Override
53-
public void run0() {
52+
public void run() {
5453
try {
5554
notifyConsumerChange();
5655
} catch (Exception e) {

broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.function.Supplier;
2525
import org.apache.rocketmq.broker.BrokerController;
26-
import org.apache.rocketmq.common.AbstractBrokerRunnable;
2726
import org.apache.rocketmq.common.Pair;
2827
import org.apache.rocketmq.common.ThreadFactoryImpl;
2928
import org.apache.rocketmq.common.UtilAll;
@@ -80,9 +79,9 @@ public static RequestTask castRunnable(final Runnable runnable) {
8079
}
8180

8281
public void start() {
83-
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.brokerController.getBrokerConfig()) {
82+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
8483
@Override
85-
public void run0() {
84+
public void run() {
8685
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
8786
cleanExpiredRequest();
8887
}

broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
4747
import org.apache.rocketmq.client.producer.SendResult;
4848
import org.apache.rocketmq.client.producer.SendStatus;
49-
import org.apache.rocketmq.common.AbstractBrokerRunnable;
5049
import org.apache.rocketmq.common.BrokerIdentity;
5150
import org.apache.rocketmq.common.LockCallback;
5251
import org.apache.rocketmq.common.MixAll;
@@ -356,18 +355,14 @@ public void sendHeartbeatViaDataVersion(
356355
requestHeader.setClusterName(clusterName);
357356

358357
for (final String namesrvAddr : nameServerAddressList) {
359-
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
360-
361-
@Override
362-
public void run0() {
363-
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
364-
request.setBody(dataVersion.encode());
358+
brokerOuterExecutor.execute(() -> {
359+
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
360+
request.setBody(dataVersion.encode());
365361

366-
try {
367-
BrokerOuterAPI.this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMillis);
368-
} catch (Exception e) {
369-
LOGGER.error("sendHeartbeat Exception " + namesrvAddr, e);
370-
}
362+
try {
363+
BrokerOuterAPI.this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMillis);
364+
} catch (Exception e) {
365+
LOGGER.error("sendHeartbeat Exception " + namesrvAddr, e);
371366
}
372367
});
373368
}
@@ -389,9 +384,9 @@ public void sendHeartbeat(final String clusterName,
389384

390385
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
391386
for (final String namesrvAddr : nameServerAddressList) {
392-
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
387+
brokerOuterExecutor.execute(new Runnable() {
393388
@Override
394-
public void run0() {
389+
public void run() {
395390
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);
396391

397392
try {
@@ -532,9 +527,9 @@ public List<RegisterBrokerResult> registerBrokerAll(
532527
requestHeader.setBodyCrc32(bodyCrc32);
533528
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
534529
for (final String namesrvAddr : nameServerAddressList) {
535-
brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
530+
brokerOuterExecutor.execute(new Runnable() {
536531
@Override
537-
public void run0() {
532+
public void run() {
538533
try {
539534
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
540535
if (result != null) {
@@ -719,9 +714,9 @@ public List<Boolean> needRegister(
719714
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
720715
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
721716
for (final String namesrvAddr : nameServerAddressList) {
722-
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
717+
brokerOuterExecutor.execute(new Runnable() {
723718
@Override
724-
public void run0() {
719+
public void run() {
725720
try {
726721
QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
727722
requestHeader.setBrokerAddr(brokerAddr);
@@ -1501,9 +1496,9 @@ public void sendHeartbeatToController(final String controllerAddress,
15011496
requestHeader.setHeartbeatTimeoutMills(controllerHeartBeatTimeoutMills);
15021497
requestHeader.setElectionPriority(electionPriority);
15031498
requestHeader.setBrokerId(brokerId);
1504-
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
1499+
brokerOuterExecutor.execute(new Runnable() {
15051500
@Override
1506-
public void run0() {
1501+
public void run() {
15071502
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);
15081503

15091504
try {

common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
2323
import org.apache.rocketmq.broker.ConfigContext;
2424
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
25-
import org.apache.rocketmq.common.AbstractBrokerRunnable;
2625
import org.apache.rocketmq.common.BrokerConfig;
2726
import org.apache.rocketmq.common.BrokerIdentity;
2827
import org.apache.rocketmq.common.MixAll;
@@ -156,9 +155,9 @@ public boolean initialize() {
156155
this.updateNamesrvAddr();
157156
LOG.info("Set user specified name server address: {}", this.brokerContainerConfig.getNamesrvAddr());
158157
// also auto update namesrv if specify
159-
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
158+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
160159
@Override
161-
public void run0() {
160+
public void run() {
162161
try {
163162
BrokerContainer.this.updateNamesrvAddr();
164163
} catch (Throwable e) {
@@ -167,10 +166,10 @@ public void run0() {
167166
}
168167
}, 1000 * 10, this.brokerContainerConfig.getUpdateNamesrvAddrInterval(), TimeUnit.MILLISECONDS);
169168
} else if (this.brokerContainerConfig.isFetchNamesrvAddrByAddressServer()) {
170-
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
169+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
171170

172171
@Override
173-
public void run0() {
172+
public void run() {
174173
try {
175174
BrokerContainer.this.brokerOuterAPI.fetchNameServerAddr();
176175
} catch (Throwable e) {
@@ -180,9 +179,9 @@ public void run0() {
180179
}, 1000 * 10, this.brokerContainerConfig.getFetchNamesrvAddrInterval(), TimeUnit.MILLISECONDS);
181180
}
182181

183-
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
182+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
184183
@Override
185-
public void run0() {
184+
public void run() {
186185
try {
187186
BrokerContainer.this.brokerOuterAPI.refreshMetadata();
188187
} catch (Exception e) {

container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.rocketmq.auth.config.AuthConfig;
2222
import org.apache.rocketmq.broker.BrokerController;
2323
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
24-
import org.apache.rocketmq.common.AbstractBrokerRunnable;
2524
import org.apache.rocketmq.common.BrokerConfig;
2625
import org.apache.rocketmq.common.MixAll;
2726
import org.apache.rocketmq.remoting.RemotingServer;
@@ -82,9 +81,9 @@ public void start() throws Exception {
8281
this.registerBrokerAll(true, false, true);
8382
}
8483

85-
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
84+
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
8685
@Override
87-
public void run0() {
86+
public void run() {
8887
try {
8988
if (System.currentTimeMillis() < shouldStartTime) {
9089
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
@@ -104,9 +103,9 @@ public void run0() {
104103
if (this.brokerConfig.isEnableSlaveActingMaster()) {
105104
scheduleSendHeartbeat();
106105

107-
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
106+
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new Runnable() {
108107
@Override
109-
public void run0() {
108+
public void run() {
110109
try {
111110
InnerBrokerController.this.syncBrokerMemberGroup();
112111
} catch (Throwable e) {

store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import java.util.concurrent.atomic.AtomicLong;
5858
import java.util.function.Supplier;
5959
import org.apache.commons.lang3.StringUtils;
60-
import org.apache.rocketmq.common.AbstractBrokerRunnable;
6160
import org.apache.rocketmq.common.BoundaryType;
6261
import org.apache.rocketmq.common.BrokerConfig;
6362
import org.apache.rocketmq.common.BrokerIdentity;
@@ -1775,23 +1774,23 @@ private void createTempFile() throws IOException {
17751774

17761775
private void addScheduleTask() {
17771776

1778-
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
1777+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
17791778
@Override
1780-
public void run0() {
1779+
public void run() {
17811780
DefaultMessageStore.this.cleanFilesPeriodically();
17821781
}
17831782
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
17841783

1785-
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
1784+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
17861785
@Override
1787-
public void run0() {
1786+
public void run() {
17881787
DefaultMessageStore.this.checkSelf();
17891788
}
17901789
}, 1, 10, TimeUnit.MINUTES);
17911790

1792-
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
1791+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
17931792
@Override
1794-
public void run0() {
1793+
public void run() {
17951794
if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
17961795
try {
17971796
if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
@@ -1810,9 +1809,9 @@ public void run0() {
18101809
}
18111810
}, 1, 1, TimeUnit.SECONDS);
18121811

1813-
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
1812+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
18141813
@Override
1815-
public void run0() {
1814+
public void run() {
18161815
DefaultMessageStore.this.storeCheckpoint.flush();
18171816
}
18181817
}, 1, 1, TimeUnit.SECONDS);

store/src/main/java/org/apache/rocketmq/store/index/IndexService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.List;
2424
import java.util.concurrent.locks.ReadWriteLock;
2525
import java.util.concurrent.locks.ReentrantReadWriteLock;
26-
import org.apache.rocketmq.common.AbstractBrokerRunnable;
2726
import org.apache.rocketmq.common.UtilAll;
2827
import org.apache.rocketmq.common.constant.LoggerName;
2928
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -339,9 +338,9 @@ public IndexFile getAndCreateLastIndexFile() {
339338
if (indexFile != null) {
340339
final IndexFile flushThisFile = prevIndexFile;
341340

342-
Thread flushThread = new Thread(new AbstractBrokerRunnable(defaultMessageStore.getBrokerConfig()) {
341+
Thread flushThread = new Thread(new Runnable() {
343342
@Override
344-
public void run0() {
343+
public void run() {
345344
IndexService.this.flush(flushThisFile);
346345
}
347346
}, "FlushIndexFileThread");

0 commit comments

Comments
 (0)