Skip to content

Commit d6736b9

Browse files
morningmanwuwenchihubgeter
authored
branch-3.1: [feat](iceberg) implement iceberg partition batch mode (apache#52095)
bp apache#46398 apache#49025 apache#49489 apache#50434 apache#51185 apache#51694 --------- Co-authored-by: wuwenchi <[email protected]> Co-authored-by: daidai <[email protected]>
1 parent b66feb9 commit d6736b9

File tree

23 files changed

+1264
-124
lines changed

23 files changed

+1264
-124
lines changed

be/src/common/config.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b
306306
return true;
307307
});
308308
DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
309-
DEFINE_Int32(remote_split_source_batch_size, "10240");
309+
DEFINE_Int32(remote_split_source_batch_size, "1000");
310310
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
311311
// number of olap scanner thread pool queue size
312312
DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");

fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.doris.common;
1919

2020

21+
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
2122
import org.apache.doris.metric.Metric;
2223
import org.apache.doris.metric.Metric.MetricUnit;
2324
import org.apache.doris.metric.MetricLabel;
@@ -33,6 +34,7 @@
3334
import java.util.Map;
3435
import java.util.concurrent.BlockingQueue;
3536
import java.util.concurrent.Callable;
37+
import java.util.concurrent.ExecutorService;
3638
import java.util.concurrent.FutureTask;
3739
import java.util.concurrent.LinkedBlockingQueue;
3840
import java.util.concurrent.PriorityBlockingQueue;
@@ -69,6 +71,7 @@
6971

7072
public class ThreadPoolManager {
7173

74+
private static final Logger LOG = LogManager.getLogger(ThreadPoolManager.class);
7275
private static Map<String, ThreadPoolExecutor> nameToThreadPoolMap = Maps.newConcurrentMap();
7376

7477
private static String[] poolMetricTypes = {"pool_size", "active_thread_num", "task_in_queue"};
@@ -140,6 +143,17 @@ public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread,
140143
poolName, needRegisterMetric);
141144
}
142145

146+
public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth(
147+
int numThread,
148+
int queueSize,
149+
String poolName,
150+
boolean needRegisterMetric,
151+
PreExecutionAuthenticator preAuth) {
152+
return newDaemonThreadPoolWithPreAuth(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
153+
new LinkedBlockingQueue<>(queueSize), new BlockedPolicy(poolName, 60),
154+
poolName, needRegisterMetric, preAuth);
155+
}
156+
143157
public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize,
144158
String poolName, int timeoutSeconds,
145159
boolean needRegisterMetric) {
@@ -229,6 +243,40 @@ private static ThreadFactory namedThreadFactory(String poolName) {
229243
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(poolName + "-%d").build();
230244
}
231245

246+
247+
public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth(
248+
int corePoolSize,
249+
int maximumPoolSize,
250+
long keepAliveTime,
251+
TimeUnit unit,
252+
BlockingQueue<Runnable> workQueue,
253+
RejectedExecutionHandler handler,
254+
String poolName,
255+
boolean needRegisterMetric,
256+
PreExecutionAuthenticator preAuth) {
257+
ThreadFactory threadFactory = namedThreadFactoryWithPreAuth(poolName, preAuth);
258+
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
259+
keepAliveTime, unit, workQueue, threadFactory, handler);
260+
if (needRegisterMetric) {
261+
nameToThreadPoolMap.put(poolName, threadPool);
262+
}
263+
return threadPool;
264+
}
265+
266+
private static ThreadFactory namedThreadFactoryWithPreAuth(String poolName, PreExecutionAuthenticator preAuth) {
267+
return new ThreadFactoryBuilder()
268+
.setDaemon(true)
269+
.setNameFormat(poolName + "-%d")
270+
.setThreadFactory(runnable -> new Thread(() -> {
271+
try {
272+
preAuth.execute(runnable);
273+
} catch (Exception e) {
274+
throw new RuntimeException(e);
275+
}
276+
}))
277+
.build();
278+
}
279+
232280
private static class PriorityThreadPoolExecutor<T> extends ThreadPoolExecutor {
233281

234282
private final Comparator<T> comparator;
@@ -384,4 +432,25 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
384432
}
385433
}
386434
}
435+
436+
public static void shutdownExecutorService(ExecutorService executorService) {
437+
// Disable new tasks from being submitted
438+
executorService.shutdown();
439+
try {
440+
// Wait a while for existing tasks to terminate
441+
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
442+
// Cancel currently executing tasks
443+
executorService.shutdownNow();
444+
// Wait a while for tasks to respond to being cancelled
445+
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
446+
LOG.warn("ExecutorService did not terminate");
447+
}
448+
}
449+
} catch (InterruptedException e) {
450+
// (Re-)Cancel if current thread also interrupted
451+
executorService.shutdownNow();
452+
// Preserve interrupt status
453+
Thread.currentThread().interrupt();
454+
}
455+
}
387456
}

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.doris.common.DdlException;
3636
import org.apache.doris.common.FeConstants;
3737
import org.apache.doris.common.Pair;
38+
import org.apache.doris.common.ThreadPoolManager;
3839
import org.apache.doris.common.UserException;
3940
import org.apache.doris.common.Version;
4041
import org.apache.doris.common.io.Text;
@@ -98,6 +99,7 @@
9899
import java.util.Optional;
99100
import java.util.OptionalLong;
100101
import java.util.Set;
102+
import java.util.concurrent.ThreadPoolExecutor;
101103
import java.util.stream.Collectors;
102104

103105
/**
@@ -167,6 +169,7 @@ public abstract class ExternalCatalog
167169
protected Optional<Boolean> useMetaCache = Optional.empty();
168170
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
169171
protected PreExecutionAuthenticator preExecutionAuthenticator;
172+
protected ThreadPoolExecutor threadPoolWithPreAuth;
170173

171174
private volatile Configuration cachedConf = null;
172175
private byte[] confLock = new byte[0];
@@ -764,6 +767,9 @@ public void onClose() {
764767
if (null != transactionManager) {
765768
transactionManager = null;
766769
}
770+
if (threadPoolWithPreAuth != null) {
771+
ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth);
772+
}
767773
CatalogIf.super.onClose();
768774
}
769775

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class ExternalMetaCacheMgr {
8383
private ExecutorService rowCountRefreshExecutor;
8484
private ExecutorService commonRefreshExecutor;
8585
private ExecutorService fileListingExecutor;
86+
// This executor is used to schedule the getting split tasks
8687
private ExecutorService scheduleExecutor;
8788

8889
// catalog id -> HiveMetaStoreCache

fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,17 +335,19 @@ public void createScanRangeLocations() throws UserException {
335335
if (splitAssignment.getSampleSplit() == null && !isFileStreamType()) {
336336
return;
337337
}
338-
selectedSplitNum = numApproximateSplits();
339-
340338
FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
341339
TFileType locationType = fileSplit.getLocationType();
340+
selectedSplitNum = numApproximateSplits();
341+
if (selectedSplitNum < 0) {
342+
throw new UserException("Approximate split number should not be negative");
343+
}
342344
totalFileSize = fileSplit.getLength() * selectedSplitNum;
343345
long maxWaitTime = sessionVariable.getFetchSplitsMaxWaitTime();
344346
// Not accurate, only used to estimate concurrency.
345347
// Here, we must take the max of 1, because
346348
// in the case of multiple BEs, `numApproximateSplits() / backendPolicy.numBackends()` may be 0,
347349
// and finally numSplitsPerBE is 0, resulting in no data being queried.
348-
int numSplitsPerBE = Math.max(numApproximateSplits() / backendPolicy.numBackends(), 1);
350+
int numSplitsPerBE = Math.max(selectedSplitNum / backendPolicy.numBackends(), 1);
349351
for (Backend backend : backendPolicy.getBackends()) {
350352
SplitSource splitSource = new SplitSource(backend, splitAssignment, maxWaitTime);
351353
splitSources.add(splitSource);

fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java

Lines changed: 62 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
import org.apache.doris.thrift.TScanRangeLocations;
2424

2525
import com.google.common.collect.Multimap;
26+
import org.apache.logging.log4j.LogManager;
27+
import org.apache.logging.log4j.Logger;
2628

29+
import java.io.Closeable;
2730
import java.util.ArrayList;
2831
import java.util.Collection;
2932
import java.util.HashSet;
@@ -33,13 +36,15 @@
3336
import java.util.concurrent.BlockingQueue;
3437
import java.util.concurrent.ConcurrentHashMap;
3538
import java.util.concurrent.LinkedBlockingQueue;
39+
import java.util.concurrent.TimeUnit;
3640
import java.util.concurrent.atomic.AtomicBoolean;
3741

3842
/**
3943
* When file splits are supplied in batch mode, splits are generated lazily and assigned in each call of `getNextBatch`.
4044
* `SplitGenerator` provides the file splits, and `FederationBackendPolicy` assigns these splits to backends.
4145
*/
4246
public class SplitAssignment {
47+
private static final Logger LOG = LogManager.getLogger(SplitAssignment.class);
4348
private final Set<Long> sources = new HashSet<>();
4449
private final FederationBackendPolicy backendPolicy;
4550
private final SplitGenerator splitGenerator;
@@ -50,10 +55,11 @@ public class SplitAssignment {
5055
private final List<String> pathPartitionKeys;
5156
private final Object assignLock = new Object();
5257
private Split sampleSplit = null;
53-
private final AtomicBoolean isStop = new AtomicBoolean(false);
58+
private final AtomicBoolean isStopped = new AtomicBoolean(false);
5459
private final AtomicBoolean scheduleFinished = new AtomicBoolean(false);
5560

5661
private UserException exception = null;
62+
private final List<Closeable> closeableResources = new ArrayList<>();
5763

5864
public SplitAssignment(
5965
FederationBackendPolicy backendPolicy,
@@ -71,21 +77,29 @@ public SplitAssignment(
7177
public void init() throws UserException {
7278
splitGenerator.startSplit(backendPolicy.numBackends());
7379
synchronized (assignLock) {
74-
while (sampleSplit == null && waitFirstSplit()) {
80+
final int waitIntervalTimeMillis = 100;
81+
final int initTimeoutMillis = 30000; // 30s
82+
int waitTotalTime = 0;
83+
while (sampleSplit == null && needMoreSplit()) {
7584
try {
76-
assignLock.wait(100);
85+
assignLock.wait(waitIntervalTimeMillis);
7786
} catch (InterruptedException e) {
7887
throw new UserException(e.getMessage(), e);
7988
}
89+
waitTotalTime += waitIntervalTimeMillis;
90+
if (waitTotalTime > initTimeoutMillis) {
91+
throw new UserException("Failed to get first split after waiting for "
92+
+ (waitTotalTime / 1000) + " seconds.");
93+
}
8094
}
8195
}
8296
if (exception != null) {
8397
throw exception;
8498
}
8599
}
86100

87-
private boolean waitFirstSplit() {
88-
return !scheduleFinished.get() && !isStop.get() && exception == null;
101+
public boolean needMoreSplit() {
102+
return !scheduleFinished.get() && !isStopped.get() && exception == null;
89103
}
90104

91105
private void appendBatch(Multimap<Backend, Split> batch) throws UserException {
@@ -95,8 +109,16 @@ private void appendBatch(Multimap<Backend, Split> batch) throws UserException {
95109
for (Split split : splits) {
96110
locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys));
97111
}
98-
if (!assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>()).offer(locations)) {
99-
throw new UserException("Failed to offer batch split");
112+
while (needMoreSplit()) {
113+
BlockingQueue<Collection<TScanRangeLocations>> queue =
114+
assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>(10000));
115+
try {
116+
if (queue.offer(locations, 100, TimeUnit.MILLISECONDS)) {
117+
break;
118+
}
119+
} catch (InterruptedException e) {
120+
addUserException(new UserException("Failed to offer batch split by interrupted", e));
121+
}
100122
}
101123
}
102124
}
@@ -113,7 +135,7 @@ public Split getSampleSplit() {
113135
return sampleSplit;
114136
}
115137

116-
public void addToQueue(List<Split> splits) {
138+
public void addToQueue(List<Split> splits) throws UserException {
117139
if (splits.isEmpty()) {
118140
return;
119141
}
@@ -123,19 +145,9 @@ public void addToQueue(List<Split> splits) {
123145
sampleSplit = splits.get(0);
124146
assignLock.notify();
125147
}
126-
try {
127-
batch = backendPolicy.computeScanRangeAssignment(splits);
128-
} catch (UserException e) {
129-
exception = e;
130-
}
131-
}
132-
if (batch != null) {
133-
try {
134-
appendBatch(batch);
135-
} catch (UserException e) {
136-
exception = e;
137-
}
148+
batch = backendPolicy.computeScanRangeAssignment(splits);
138149
}
150+
appendBatch(batch);
139151
}
140152

141153
private void notifyAssignment() {
@@ -150,28 +162,54 @@ public BlockingQueue<Collection<TScanRangeLocations>> getAssignedSplits(Backend
150162
}
151163
BlockingQueue<Collection<TScanRangeLocations>> splits = assignment.computeIfAbsent(backend,
152164
be -> new LinkedBlockingQueue<>());
153-
if (scheduleFinished.get() && splits.isEmpty() || isStop.get()) {
165+
if (scheduleFinished.get() && splits.isEmpty() || isStopped.get()) {
154166
return null;
155167
}
156168
return splits;
157169
}
158170

159171
public void setException(UserException e) {
160-
exception = e;
172+
addUserException(e);
161173
notifyAssignment();
162174
}
163175

176+
private void addUserException(UserException e) {
177+
if (exception != null) {
178+
exception.addSuppressed(e);
179+
} else {
180+
exception = e;
181+
}
182+
}
183+
164184
public void finishSchedule() {
165185
scheduleFinished.set(true);
166186
notifyAssignment();
167187
}
168188

169189
public void stop() {
170-
isStop.set(true);
190+
if (isStop()) {
191+
return;
192+
}
193+
isStopped.set(true);
194+
closeableResources.forEach((closeable) -> {
195+
try {
196+
closeable.close();
197+
} catch (Exception e) {
198+
LOG.warn("close resource error:{}", e.getMessage(), e);
199+
// ignore
200+
}
201+
});
171202
notifyAssignment();
203+
if (exception != null) {
204+
throw new RuntimeException(exception);
205+
}
172206
}
173207

174208
public boolean isStop() {
175-
return isStop.get();
209+
return isStopped.get();
210+
}
211+
212+
public void addCloseable(Closeable resource) {
213+
closeableResources.add(resource);
176214
}
177215
}

fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ default int numApproximateSplits() {
5252
return -1;
5353
}
5454

55-
default void startSplit(int numBackends) {
55+
default void startSplit(int numBackends) throws UserException {
5656
}
5757

5858
/**

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,9 @@ public void startSplit(int numBackends) {
223223
if (allFiles.size() > numSplitsPerPartition.get()) {
224224
numSplitsPerPartition.set(allFiles.size());
225225
}
226-
splitAssignment.addToQueue(allFiles);
226+
if (splitAssignment.needMoreSplit()) {
227+
splitAssignment.addToQueue(allFiles);
228+
}
227229
} catch (Exception e) {
228230
batchException.set(new UserException(e.getMessage(), e));
229231
} finally {

0 commit comments

Comments
 (0)