Skip to content

Commit 157b96e

Browse files
[GLUTEN-10933][VL] Support resize GPU batch and shuffle reader outputs cudf::table (#11090)
At present the post rule can only see the stage-level plan, so it cannot decide whether the ColumnarExchange output should be in cudf format. For now we assume the ColumnarExchange is always a CudfColumnarExchange; validation will be supported later in injectQueryStagePrepRule, where the whole plan is visible: generate the transformed plan, validate it, then discard it. After CudfColumnarExchange, the batches are concatenated in GpuResizeBufferColumnarBatchExec to output a cudf::table, with the batch size set to the maximum integer because velox-cudf does not control the batch size yet. As a result, the CPU shuffle reader does the decompression and prepares the first batch while the GPU executes the CPU tasks one by one. Performance: after this change, the time of stage 2 in TPCDS Q95 SF100 decreases from 27s to 13s. In the shuffle writer, bool values are split as bytes and timestamps as int64_t nanoseconds to match the cudf format. BroadcastExchange currently outputs RowVector, so the CudfFromVelox operator must accept both RowVector and CudfVector; this is provided by the commit oap-project/velox@3d6ab40. Also fixes the Gluten build with GPU. Related issue: #10933
1 parent 54e7e49 commit 157b96e

File tree

44 files changed

+2284
-559
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+2284
-559
lines changed

.github/workflows/velox_backend_x86.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1318,7 +1318,7 @@ jobs:
13181318
dnf config-manager --add-repo "$repo_url"
13191319
dnf install -y libnvjitlink-devel-12-8
13201320
df -a
1321-
bash dev/buildbundle-veloxbe.sh --run_setup_script=OFF --build_arrow=OFF --spark_version=3.4 --enable_gpu=ON
1321+
bash dev/buildbundle-veloxbe.sh --run_setup_script=OFF --build_arrow=OFF --spark_version=3.4 --build_tests=ON --build_benchmarks=ON --enable_gpu=ON
13221322
ccache -s
13231323
13241324
spark-test-spark40:
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gluten.utils;
18+
19+
import org.apache.gluten.runtime.Runtime;
20+
import org.apache.gluten.runtime.RuntimeAware;
21+
import org.apache.gluten.vectorized.ColumnarBatchInIterator;
22+
23+
public class GpuBufferBatchResizerJniWrapper implements RuntimeAware {
24+
private final Runtime runtime;
25+
26+
private GpuBufferBatchResizerJniWrapper(Runtime runtime) {
27+
this.runtime = runtime;
28+
}
29+
30+
public static GpuBufferBatchResizerJniWrapper create(Runtime runtime) {
31+
return new GpuBufferBatchResizerJniWrapper(runtime);
32+
}
33+
34+
@Override
35+
public long rtHandle() {
36+
return runtime.getHandle();
37+
}
38+
39+
public native long create(int minOutputBatchSize, ColumnarBatchInIterator itr);
40+
}

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.apache.gluten.backendsapi.velox
1818

1919
import org.apache.gluten.backendsapi.MetricsApi
20-
import org.apache.gluten.config.{HashShuffleWriterType, RssSortShuffleWriterType, ShuffleWriterType, SortShuffleWriterType}
20+
import org.apache.gluten.config.{GpuHashShuffleWriterType, HashShuffleWriterType, RssSortShuffleWriterType, ShuffleWriterType, SortShuffleWriterType}
2121
import org.apache.gluten.metrics._
2222
import org.apache.gluten.substrait.{AggregationParams, JoinParams}
2323

@@ -370,7 +370,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
370370
"peakBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak bytes allocated")
371371
)
372372
shuffleWriterType match {
373-
case HashShuffleWriterType =>
373+
case HashShuffleWriterType | GpuHashShuffleWriterType =>
374374
baseMetrics ++ Map(
375375
"splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to split"),
376376
"avgDictionaryFields" -> SQLMetrics

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ object VeloxRuleApi {
109109

110110
// Legacy: Post-transform rules.
111111
injector.injectPostTransform(_ => AppendBatchResizeForShuffleInputAndOutput())
112+
injector.injectPostTransform(_ => GpuBufferBatchResizeForShuffleInputOutput())
112113
injector.injectPostTransform(_ => UnionTransformerRule())
113114
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
114115
injector.injectPostTransform(_ => PartialGenerateRule())
@@ -209,6 +210,7 @@ object VeloxRuleApi {
209210
// Gluten RAS: Post rules.
210211
injector.injectPostTransform(_ => DistinguishIdenticalScans)
211212
injector.injectPostTransform(_ => AppendBatchResizeForShuffleInputAndOutput())
213+
injector.injectPostTransform(_ => GpuBufferBatchResizeForShuffleInputOutput())
212214
injector.injectPostTransform(_ => RemoveTransitions)
213215
injector.injectPostTransform(_ => UnionTransformerRule())
214216
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))

backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
8080

8181
def cudfEnableValidation: Boolean = getConf(CUDF_ENABLE_VALIDATION)
8282

83+
def cudfBatchSize: Int = getConf(CUDF_BATCH_SIZE)
84+
8385
def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)
8486

8587
def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)
@@ -634,6 +636,12 @@ object VeloxConfig extends ConfigRegistry {
634636
.booleanConf
635637
.createWithDefault(true)
636638

639+
val CUDF_BATCH_SIZE =
640+
buildConf("spark.gluten.sql.columnar.backend.velox.cudf.batchSize")
641+
.doc("Cudf input batch size after shuffle reader")
642+
.intConf
643+
.createWithDefault(Integer.MAX_VALUE)
644+
637645
val MEMORY_DUMP_ON_EXIT =
638646
buildConf("spark.gluten.monitor.memoryDumpOnExit")
639647
.internal()
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gluten.execution
18+
19+
import org.apache.gluten.backendsapi.BackendsApiManager
20+
import org.apache.gluten.backendsapi.velox.VeloxBatchType
21+
import org.apache.gluten.extension.columnar.transition.Convention
22+
import org.apache.gluten.iterator.ClosableIterator
23+
import org.apache.gluten.runtime.Runtimes
24+
import org.apache.gluten.utils.GpuBufferBatchResizerJniWrapper
25+
import org.apache.gluten.vectorized.{ColumnarBatchInIterator, ColumnarBatchOutIterator}
26+
27+
import org.apache.spark.sql.catalyst.expressions.SortOrder
28+
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
29+
import org.apache.spark.sql.execution.SparkPlan
30+
import org.apache.spark.sql.vectorized.ColumnarBatch
31+
32+
import scala.collection.JavaConverters._
33+
34+
/**
35+
* An operator to resize input BufferBatches generated by shuffle reader, and convert to cudf table.
36+
*/
37+
case class GpuResizeBufferColumnarBatchExec(override val child: SparkPlan, minOutputBatchSize: Int)
38+
extends ColumnarToColumnarExec(child) {
39+
40+
override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
41+
val runtime =
42+
Runtimes.contextInstance(BackendsApiManager.getBackendName, "GpuBufferColumnarBatchResizer")
43+
val outHandle = GpuBufferBatchResizerJniWrapper
44+
.create(runtime)
45+
.create(
46+
minOutputBatchSize,
47+
new ColumnarBatchInIterator(BackendsApiManager.getBackendName, in.asJava))
48+
new ColumnarBatchOutIterator(runtime, outHandle).asScala
49+
}
50+
51+
override protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = {
52+
out.asJava match {
53+
case c: ClosableIterator[ColumnarBatch] => c.close()
54+
case _ =>
55+
}
56+
}
57+
58+
override protected def needRecyclePayload: Boolean = true
59+
60+
override def outputPartitioning: Partitioning = child.outputPartitioning
61+
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
62+
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
63+
copy(child = newChild)
64+
65+
override def batchType(): Convention.BatchType = VeloxBatchType
66+
67+
override def rowType0(): Convention.RowType = Convention.RowType.None
68+
}

backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
3030
*/
3131
case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] {
3232
override def apply(plan: SparkPlan): SparkPlan = {
33+
if (VeloxConfig.get.enableColumnarCudf) {
34+
return plan
35+
}
3336
val resizeBatchesShuffleInputEnabled = VeloxConfig.get.veloxResizeBatchesShuffleInput
3437
val resizeBatchesShuffleOutputEnabled = VeloxConfig.get.veloxResizeBatchesShuffleOutput
3538
if (!resizeBatchesShuffleInputEnabled && !resizeBatchesShuffleOutputEnabled) {

backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ package org.apache.gluten.extension
1818

1919
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
2020
import org.apache.gluten.cudf.VeloxCudfPlanValidatorJniWrapper
21-
import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, TransformSupport, VeloxResizeBatchesExec, WholeStageTransformer}
22-
import org.apache.gluten.extension.CudfNodeValidationRule.setTagForWholeStageTransformer
21+
import org.apache.gluten.exception.GlutenNotSupportException
22+
import org.apache.gluten.execution._
23+
import org.apache.gluten.extension.CudfNodeValidationRule.{createGPUColumnarExchange, setTagForWholeStageTransformer}
2324

2425
import org.apache.spark.sql.catalyst.rules.Rule
2526
import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, GPUColumnarShuffleExchangeExec, SparkPlan}
@@ -31,37 +32,23 @@ case class CudfNodeValidationRule(glutenConf: GlutenConfig) extends Rule[SparkPl
3132
if (!glutenConf.enableColumnarCudf) {
3233
return plan
3334
}
34-
plan.transformUp {
35+
val transformedPlan = plan.transformUp {
3536
case shuffle @ ColumnarShuffleExchangeExec(
3637
_,
37-
v @ VeloxResizeBatchesExec(w: WholeStageTransformer, _, _),
38+
VeloxResizeBatchesExec(w: WholeStageTransformer, _, _),
3839
_,
3940
_,
4041
_) =>
4142
setTagForWholeStageTransformer(w)
42-
if (w.isCudf) {
43-
log.info("VeloxResizeBatchesExec is not supported in GPU")
44-
}
45-
GPUColumnarShuffleExchangeExec(
46-
shuffle.outputPartitioning,
47-
w,
48-
shuffle.shuffleOrigin,
49-
shuffle.projectOutputAttributes,
50-
shuffle.advisoryPartitionSize)
51-
43+
createGPUColumnarExchange(shuffle)
5244
case shuffle @ ColumnarShuffleExchangeExec(_, w: WholeStageTransformer, _, _, _) =>
5345
setTagForWholeStageTransformer(w)
54-
GPUColumnarShuffleExchangeExec(
55-
shuffle.outputPartitioning,
56-
w,
57-
shuffle.shuffleOrigin,
58-
shuffle.projectOutputAttributes,
59-
shuffle.advisoryPartitionSize)
60-
46+
createGPUColumnarExchange(shuffle)
6147
case transformer: WholeStageTransformer =>
6248
setTagForWholeStageTransformer(transformer)
6349
transformer
6450
}
51+
transformedPlan
6552
}
6653
}
6754

@@ -93,4 +80,18 @@ object CudfNodeValidationRule {
9380
transformer.setTagValue(CudfTag.CudfTag, true)
9481
}
9582
}
83+
84+
def createGPUColumnarExchange(shuffle: ColumnarShuffleExchangeExec): SparkPlan = {
85+
val exec = GPUColumnarShuffleExchangeExec(
86+
shuffle.outputPartitioning,
87+
shuffle.child,
88+
shuffle.shuffleOrigin,
89+
shuffle.projectOutputAttributes,
90+
shuffle.advisoryPartitionSize)
91+
val res = exec.doValidate()
92+
if (!res.ok()) {
93+
throw new GlutenNotSupportException(res.reason())
94+
}
95+
exec
96+
}
9697
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gluten.extension
18+
19+
import org.apache.gluten.config.{HashShuffleWriterType, VeloxConfig}
20+
import org.apache.gluten.execution.{GpuResizeBufferColumnarBatchExec, VeloxResizeBatchesExec}
21+
22+
import org.apache.spark.sql.catalyst.rules.Rule
23+
import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, ColumnarShuffleExchangeExecBase, SparkPlan}
24+
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}
25+
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
26+
27+
/**
28+
* Try to append [[VeloxResizeBatchesExec]] / [[GpuResizeBufferColumnarBatchExec]] for shuffle input and output to make
29+
* the batch sizes in good shape.
30+
*/
31+
case class GpuBufferBatchResizeForShuffleInputOutput() extends Rule[SparkPlan] {
32+
override def apply(plan: SparkPlan): SparkPlan = {
33+
if (!VeloxConfig.get.enableColumnarCudf) {
34+
return plan
35+
}
36+
val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
37+
val batchSize = VeloxConfig.get.cudfBatchSize
38+
plan.transformUp {
39+
case shuffle: ColumnarShuffleExchangeExec
40+
if shuffle.shuffleWriterType == HashShuffleWriterType &&
41+
VeloxConfig.get.veloxResizeBatchesShuffleInput =>
42+
val appendBatches =
43+
VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
44+
shuffle.withNewChildren(Seq(appendBatches))
45+
case a @ AQEShuffleReadExec(
46+
ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _),
47+
_) =>
48+
GpuResizeBufferColumnarBatchExec(a, batchSize)
49+
case a @ AQEShuffleReadExec(
50+
ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase), _),
51+
_) =>
52+
GpuResizeBufferColumnarBatchExec(a, batchSize)
53+
// Since the plan is transformed bottom-up, we may first encounter
54+
// ShuffleQueryStageExec, which is transformed to GpuResizeBufferColumnarBatchExec(ShuffleQueryStageExec),
55+
// then we see AQEShuffleReadExec
56+
case a @ AQEShuffleReadExec(
57+
GpuResizeBufferColumnarBatchExec(
58+
s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _),
59+
_),
60+
_) =>
61+
GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize)
62+
case a @ AQEShuffleReadExec(
63+
GpuResizeBufferColumnarBatchExec(
64+
s @ ShuffleQueryStageExec(
65+
_,
66+
ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase),
67+
_),
68+
_),
69+
_) =>
70+
GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize)
71+
case s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _) =>
72+
GpuResizeBufferColumnarBatchExec(s, batchSize)
73+
case s @ ShuffleQueryStageExec(
74+
_,
75+
ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase),
76+
_) =>
77+
GpuResizeBufferColumnarBatchExec(s, batchSize)
78+
}
79+
}
80+
}

backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.spark.shuffle
1818

1919
import org.apache.gluten.backendsapi.BackendsApiManager
2020
import org.apache.gluten.columnarbatch.ColumnarBatches
21-
import org.apache.gluten.config.{GlutenConfig, HashShuffleWriterType, SortShuffleWriterType}
21+
import org.apache.gluten.config.{GlutenConfig, GpuHashShuffleWriterType, HashShuffleWriterType, SortShuffleWriterType}
2222
import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller}
2323
import org.apache.gluten.runtime.Runtimes
2424
import org.apache.gluten.vectorized._
@@ -44,7 +44,7 @@ class ColumnarShuffleWriter[K, V](
4444
private val dep = handle.dependency.asInstanceOf[ColumnarShuffleDependency[K, V, V]]
4545

4646
dep.shuffleWriterType match {
47-
case HashShuffleWriterType | SortShuffleWriterType =>
47+
case HashShuffleWriterType | SortShuffleWriterType | GpuHashShuffleWriterType =>
4848
// Valid shuffle writer types
4949
case _ =>
5050
throw new IllegalArgumentException(
@@ -171,6 +171,17 @@ class ColumnarShuffleWriter[K, V](
171171
conf.get(SHUFFLE_SORT_USE_RADIXSORT),
172172
partitionWriterHandle
173173
)
174+
} else if (dep.shuffleWriterType == GpuHashShuffleWriterType) {
175+
shuffleWriterJniWrapper.createGpuHashShuffleWriter(
176+
numPartitions,
177+
dep.nativePartitioning.getShortName,
178+
GlutenShuffleUtils.getStartPartitionId(
179+
dep.nativePartitioning,
180+
taskContext.partitionId),
181+
nativeBufferSize,
182+
reallocThreshold,
183+
partitionWriterHandle
184+
)
174185
} else {
175186
shuffleWriterJniWrapper.createHashShuffleWriter(
176187
numPartitions,

0 commit comments

Comments
 (0)