Skip to content

Commit b7a42cd

Browse files
authored
[spark] Support CopyFilesProcedure in spark (#6625)
1 parent 17f3c1e commit b7a42cd

File tree

15 files changed

+1426
-0
lines changed

15 files changed

+1426
-0
lines changed

docs/content/spark/procedures.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,5 +463,18 @@ This section introduces all available spark procedures about paimon.
463463
CALL sys.rewrite_file_index(table => "t", where => "day = '2025-08-17'")<br/>
464464
</td>
465465
</tr>
466+
<tr>
467+
<td>copy</td>
468+
<td>
469+
Copy table files from a source table to a target table. Arguments:
470+
<li>source_table: the source table identifier. Cannot be empty.</li>
471+
<li>target_table: the target table identifier. Cannot be empty.</li>
472+
<li>where: partition predicate. Leave empty to copy all partitions.</li>
473+
</td>
474+
<td>
475+
CALL sys.copy(source_table => "t1", target_table => "t1_copy")<br/>
476+
CALL sys.copy(source_table => "t1", target_table => "t1_copy", where => "day = '2025-08-17'")<br/>
477+
</td>
478+
</tr>
466479
</tbody>
467480
</table>

paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.fs.FileIO;
2929
import org.apache.paimon.fs.FileStatus;
3030
import org.apache.paimon.fs.Path;
31+
import org.apache.paimon.index.IndexFileMeta;
3132
import org.apache.paimon.io.CompactIncrement;
3233
import org.apache.paimon.io.DataFileMeta;
3334
import org.apache.paimon.io.DataIncrement;
@@ -95,6 +96,26 @@ public static CommitMessage createCommitMessage(
9596
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
9697
}
9798

99+
public static CommitMessage createCommitMessage(
100+
BinaryRow partition,
101+
int bucket,
102+
int totalBuckets,
103+
List<DataFileMeta> dataFileMetas,
104+
List<IndexFileMeta> indexFileMetas) {
105+
return new CommitMessageImpl(
106+
partition,
107+
bucket,
108+
totalBuckets,
109+
new DataIncrement(
110+
dataFileMetas,
111+
Collections.emptyList(),
112+
Collections.emptyList(),
113+
indexFileMetas,
114+
Collections.emptyList()),
115+
new CompactIncrement(
116+
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
117+
}
118+
98119
public static DataFileMeta constructFileMeta(
99120
String format,
100121
FileStatus fileStatus,

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
2424
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
2525
import org.apache.paimon.spark.procedure.CompactProcedure;
26+
import org.apache.paimon.spark.procedure.CopyFilesProcedure;
2627
import org.apache.paimon.spark.procedure.CreateBranchProcedure;
2728
import org.apache.paimon.spark.procedure.CreateFunctionProcedure;
2829
import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure;
@@ -113,6 +114,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
113114
procedureBuilders.put(
114115
"trigger_tag_automatic_creation", TriggerTagAutomaticCreationProcedure::builder);
115116
procedureBuilders.put("rewrite_file_index", RewriteFileIndexProcedure::builder);
117+
procedureBuilders.put("copy", CopyFilesProcedure::builder);
116118
return procedureBuilders.build();
117119
}
118120
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.copy;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.Identifier;
23+
import org.apache.paimon.fs.Path;
24+
import org.apache.paimon.spark.utils.SparkProcedureUtils;
25+
import org.apache.paimon.table.FileStoreTable;
26+
27+
import org.apache.commons.collections4.CollectionUtils;
28+
import org.apache.spark.api.java.JavaRDD;
29+
import org.apache.spark.api.java.JavaSparkContext;
30+
import org.apache.spark.api.java.function.FlatMapFunction;
31+
import org.apache.spark.sql.SparkSession;
32+
33+
import java.util.ArrayList;
34+
import java.util.Iterator;
35+
import java.util.List;
36+
37+
/** Copy data files from source table to target table. */
38+
public class CopyDataFilesOperator extends CopyFilesOperator {
39+
40+
public CopyDataFilesOperator(SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
41+
super(spark, sourceCatalog, targetCatalog);
42+
}
43+
44+
public JavaRDD<CopyFileInfo> execute(
45+
Identifier sourceIdentifier, Identifier targetIdentifier, List<CopyFileInfo> dataFiles)
46+
throws Exception {
47+
if (CollectionUtils.isEmpty(dataFiles)) {
48+
return null;
49+
}
50+
FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
51+
FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);
52+
int readParallelism = SparkProcedureUtils.readParallelism(dataFiles, spark);
53+
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
54+
JavaRDD<CopyFileInfo> copyFileInfoRdd =
55+
context.parallelize(dataFiles, readParallelism)
56+
.mapPartitions(new DataFileProcesser(sourceTable, targetTable));
57+
return copyFileInfoRdd;
58+
}
59+
60+
/** Copy data files. */
61+
public static class DataFileProcesser
62+
implements FlatMapFunction<Iterator<CopyFileInfo>, CopyFileInfo> {
63+
64+
private final FileStoreTable sourceTable;
65+
private final FileStoreTable targetTable;
66+
67+
public DataFileProcesser(FileStoreTable sourceTable, FileStoreTable targetTable) {
68+
this.sourceTable = sourceTable;
69+
this.targetTable = targetTable;
70+
}
71+
72+
@Override
73+
public Iterator<CopyFileInfo> call(Iterator<CopyFileInfo> dataFileIterator)
74+
throws Exception {
75+
List<CopyFileInfo> result = new ArrayList<>();
76+
while (dataFileIterator.hasNext()) {
77+
CopyFileInfo file = dataFileIterator.next();
78+
Path sourcePath = new Path(file.sourceFilePath());
79+
Path targetPath = new Path(file.targetFilePath());
80+
CopyFilesUtil.copyFiles(
81+
sourceTable.fileIO(), targetTable.fileIO(), sourcePath, targetPath, false);
82+
result.add(file);
83+
}
84+
return result.iterator();
85+
}
86+
}
87+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.copy;
20+
21+
import java.io.Serializable;
22+
23+
/** The information of copy data file. */
24+
public class CopyFileInfo implements Serializable {
25+
26+
private static final long serialVersionUID = 1L;
27+
28+
private final String sourceFilePath;
29+
30+
private final String targetFilePath;
31+
32+
private final byte[] partition;
33+
34+
private final int bucket;
35+
36+
private final int totalBuckets;
37+
38+
private final byte[] dataFileMeta;
39+
40+
public CopyFileInfo(
41+
String sourceFilePath,
42+
String targetFilePath,
43+
byte[] partition,
44+
int bucket,
45+
byte[] dataFileMeta) {
46+
this.sourceFilePath = sourceFilePath;
47+
this.targetFilePath = targetFilePath;
48+
this.partition = partition;
49+
this.bucket = bucket;
50+
this.totalBuckets = 0;
51+
this.dataFileMeta = dataFileMeta;
52+
}
53+
54+
public CopyFileInfo(
55+
String sourceFilePath,
56+
String targetFilePath,
57+
byte[] partition,
58+
int bucket,
59+
int totalBuckets,
60+
byte[] dataFileMeta) {
61+
this.sourceFilePath = sourceFilePath;
62+
this.targetFilePath = targetFilePath;
63+
this.partition = partition;
64+
this.bucket = bucket;
65+
this.totalBuckets = totalBuckets;
66+
this.dataFileMeta = dataFileMeta;
67+
}
68+
69+
public String sourceFilePath() {
70+
return sourceFilePath;
71+
}
72+
73+
public String targetFilePath() {
74+
return targetFilePath;
75+
}
76+
77+
public byte[] partition() {
78+
return partition;
79+
}
80+
81+
public int bucket() {
82+
return bucket;
83+
}
84+
85+
public int totalBuckets() {
86+
return totalBuckets;
87+
}
88+
89+
public byte[] dataFileMeta() {
90+
return dataFileMeta;
91+
}
92+
}

0 commit comments

Comments
 (0)