@@ -22,6 +22,7 @@ import org.apache.gluten.execution.VeloxResizeBatchesExec
2222import org .apache .spark .sql .catalyst .rules .Rule
2323import org .apache .spark .sql .execution .{ColumnarShuffleExchangeExec , SparkPlan }
2424import org .apache .spark .sql .execution .adaptive .{AQEShuffleReadExec , ShuffleQueryStageExec }
25+ import org .apache .spark .sql .execution .exchange .ReusedExchangeExec
2526
2627/**
2728 * Try to append [[VeloxResizeBatchesExec ]] for shuffle input and output to make the batch sizes in
@@ -49,7 +50,16 @@ case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] {
4950 if resizeBatchesShuffleOutputEnabled &&
5051 shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
5152 VeloxResizeBatchesExec (a, range.min, range.max)
52- // Since it's transformed in a bottom to up order, so we may first encountered
53+ case a @ AQEShuffleReadExec (
54+ ShuffleQueryStageExec (
55+ _,
56+ ReusedExchangeExec (_, shuffle : ColumnarShuffleExchangeExec ),
57+ _),
58+ _)
59+ if resizeBatchesShuffleOutputEnabled &&
60+ shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
61+ VeloxResizeBatchesExec (a, range.min, range.max)
62+ // Since it's transformed in a bottom to up order, so we may first encounter
5363 // ShuffeQueryStageExec, which is transformed to VeloxResizeBatchesExec(ShuffeQueryStageExec),
5464 // then we see AQEShuffleReadExec
5565 case a @ AQEShuffleReadExec (
@@ -61,10 +71,29 @@ case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] {
6171 if resizeBatchesShuffleOutputEnabled &&
6272 shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
6373 VeloxResizeBatchesExec (a.copy(child = s), range.min, range.max)
74+ case a @ AQEShuffleReadExec (
75+ VeloxResizeBatchesExec (
76+ s @ ShuffleQueryStageExec (
77+ _,
78+ ReusedExchangeExec (_, shuffle : ColumnarShuffleExchangeExec ),
79+ _),
80+ _,
81+ _),
82+ _)
83+ if resizeBatchesShuffleOutputEnabled &&
84+ shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
85+ VeloxResizeBatchesExec (a.copy(child = s), range.min, range.max)
6486 case s @ ShuffleQueryStageExec (_, shuffle : ColumnarShuffleExchangeExec , _)
6587 if resizeBatchesShuffleOutputEnabled &&
6688 shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
6789 VeloxResizeBatchesExec (s, range.min, range.max)
90+ case s @ ShuffleQueryStageExec (
91+ _,
92+ ReusedExchangeExec (_, shuffle : ColumnarShuffleExchangeExec ),
93+ _)
94+ if resizeBatchesShuffleOutputEnabled &&
95+ shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
96+ VeloxResizeBatchesExec (s, range.min, range.max)
6897 }
6998 }
7099}
0 commit comments