13 changes: 13 additions & 0 deletions docs/content/spark/procedures.md
@@ -463,5 +463,18 @@ This section introduces all available Spark procedures for Paimon.
CALL sys.rewrite_file_index(table => "t", where => "day = '2025-08-17'")<br/>
</td>
</tr>
<tr>
<td>copy</td>
<td>
Copy files from a source table to a target table. Arguments:
<li>source_table: the source table identifier. Cannot be empty.</li>
<li>target_table: the target table identifier. Cannot be empty.</li>
<li>where: partition predicate. Left empty for all partitions.</li>
</td>
<td>
CALL sys.copy(source_table => "t1", target_table => "t1_copy")<br/>
CALL sys.copy(source_table => "t1", target_table => "t1_copy", where => "day = '2025-08-17'")<br/>
</td>
</tr>
</tbody>
</table>
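
A minimal sketch of driving the new procedure from a Spark job, assuming a `SparkSession` with the Paimon Spark catalog configured; the table names and partition predicate are the placeholders from the examples above, not part of this PR:

```java
import org.apache.spark.sql.SparkSession;

public class CopyProcedureExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("copy-example").getOrCreate();
        // Copy a single partition; drop the `where` argument to copy all partitions.
        spark.sql(
                "CALL sys.copy(source_table => 't1', target_table => 't1_copy', "
                        + "where => \"day = '2025-08-17'\")");
        spark.stop();
    }
}
```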
@@ -28,6 +28,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
@@ -95,6 +96,26 @@ public static CommitMessage createCommitMessage(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}

    /** Creates a commit message that registers new data files together with their index files. */
    public static CommitMessage createCommitMessage(
            BinaryRow partition,
            int bucket,
            int totalBuckets,
            List<DataFileMeta> dataFileMetas,
            List<IndexFileMeta> indexFileMetas) {
        return new CommitMessageImpl(
                partition,
                bucket,
                totalBuckets,
                new DataIncrement(
                        dataFileMetas,
                        Collections.emptyList(),
                        Collections.emptyList(),
                        indexFileMetas,
                        Collections.emptyList()),
                new CompactIncrement(
                        Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
    }
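
A hedged usage sketch for this new overload. The enclosing class name is not visible in this diff, so `FileMetaUtils` is an assumption, as are the collected partition, bucket, and metadata values; the commit goes through Paimon's batch commit API:

```java
import java.util.Collections;
import java.util.List;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.migrate.FileMetaUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;

public class CommitCopiedFiles {
    // Sketch only: partition, bucket, and the metadata lists are assumed to
    // have been gathered while copying the source files.
    static void commitCopiedFiles(
            FileStoreTable table,
            BinaryRow partition,
            int bucket,
            int totalBuckets,
            List<DataFileMeta> dataFiles,
            List<IndexFileMeta> indexFiles)
            throws Exception {
        CommitMessage message =
                FileMetaUtils.createCommitMessage(
                        partition, bucket, totalBuckets, dataFiles, indexFiles);
        // Commit the copied files to the target table in one batch commit.
        try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
            commit.commit(Collections.singletonList(message));
        }
    }
}
```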

public static DataFileMeta constructFileMeta(
String format,
FileStatus fileStatus,
@@ -23,6 +23,7 @@
import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
import org.apache.paimon.spark.procedure.CompactProcedure;
import org.apache.paimon.spark.procedure.CopyFilesProcedure;
import org.apache.paimon.spark.procedure.CreateBranchProcedure;
import org.apache.paimon.spark.procedure.CreateFunctionProcedure;
import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure;
@@ -111,6 +112,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put(
"trigger_tag_automatic_creation", TriggerTagAutomaticCreationProcedure::builder);
procedureBuilders.put("rewrite_file_index", RewriteFileIndexProcedure::builder);
procedureBuilders.put("copy", CopyFilesProcedure::builder);
return procedureBuilders.build();
}
}
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.copy;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.spark.utils.SparkProcedureUtils;
import org.apache.paimon.table.FileStoreTable;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Copies data files from the source table to the target table. */
public class CopyDataFilesOperator extends CopyFilesOperator {

public CopyDataFilesOperator(SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
super(spark, sourceCatalog, targetCatalog);
}

    public JavaRDD<CopyFileInfo> execute(
            Identifier sourceIdentifier, Identifier targetIdentifier, List<CopyFileInfo> dataFiles)
            throws Exception {
        if (CollectionUtils.isEmpty(dataFiles)) {
            // Nothing to copy; callers are expected to handle the null result.
            return null;
        }
        FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
        FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);
        int readParallelism = SparkProcedureUtils.readParallelism(dataFiles, spark);
        JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // Distribute the planned files and copy each one on the executors.
        return context.parallelize(dataFiles, readParallelism)
                .mapPartitions(new DataFileProcessor(sourceTable, targetTable));
    }

    /** Copies each data file from the source file system to the target file system. */
    public static class DataFileProcessor
            implements FlatMapFunction<Iterator<CopyFileInfo>, CopyFileInfo> {

        private final FileStoreTable sourceTable;
        private final FileStoreTable targetTable;

        public DataFileProcessor(FileStoreTable sourceTable, FileStoreTable targetTable) {
            this.sourceTable = sourceTable;
            this.targetTable = targetTable;
        }

        @Override
        public Iterator<CopyFileInfo> call(Iterator<CopyFileInfo> dataFileIterator)
                throws Exception {
            List<CopyFileInfo> result = new ArrayList<>();
            while (dataFileIterator.hasNext()) {
                CopyFileInfo file = dataFileIterator.next();
                Path sourcePath = new Path(file.sourceFilePath());
                Path targetPath = new Path(file.targetFilePath());
                CopyFilesUtil.copyFiles(
                        sourceTable.fileIO(), targetTable.fileIO(), sourcePath, targetPath, false);
                result.add(file);
            }
            return result.iterator();
        }
    }
}
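
A hypothetical driver-side flow for this operator; the catalogs, identifiers, and planned file list are placeholders, and the null return for an empty file list is handled explicitly:

```java
import java.util.Collections;
import java.util.List;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.spark.copy.CopyDataFilesOperator;
import org.apache.paimon.spark.copy.CopyFileInfo;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

public class CopyDataFilesExample {
    static List<CopyFileInfo> copyDataFiles(
            SparkSession spark,
            Catalog sourceCatalog,
            Catalog targetCatalog,
            List<CopyFileInfo> plannedFiles)
            throws Exception {
        CopyDataFilesOperator operator =
                new CopyDataFilesOperator(spark, sourceCatalog, targetCatalog);
        // Run the distributed copy; identifiers are illustrative.
        JavaRDD<CopyFileInfo> copied =
                operator.execute(
                        Identifier.fromString("db.t1"),
                        Identifier.fromString("db.t1_copy"),
                        plannedFiles);
        // execute() returns null when there is nothing to copy.
        return copied == null ? Collections.emptyList() : copied.collect();
    }
}
```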
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.copy;

import java.io.Serializable;

/** Information about a single data file to be copied. */
public class CopyFileInfo implements Serializable {

private static final long serialVersionUID = 1L;

private final String sourceFilePath;

private final String targetFilePath;

private final byte[] partition;

private final int bucket;

private final int totalBuckets;

private final byte[] dataFileMeta;

    public CopyFileInfo(
            String sourceFilePath,
            String targetFilePath,
            byte[] partition,
            int bucket,
            byte[] dataFileMeta) {
        // Variant without a bucket count: totalBuckets defaults to 0.
        this(sourceFilePath, targetFilePath, partition, bucket, 0, dataFileMeta);
    }

public CopyFileInfo(
String sourceFilePath,
String targetFilePath,
byte[] partition,
int bucket,
int totalBuckets,
byte[] dataFileMeta) {
this.sourceFilePath = sourceFilePath;
this.targetFilePath = targetFilePath;
this.partition = partition;
this.bucket = bucket;
this.totalBuckets = totalBuckets;
this.dataFileMeta = dataFileMeta;
}

public String sourceFilePath() {
return sourceFilePath;
}

public String targetFilePath() {
return targetFilePath;
}

public byte[] partition() {
return partition;
}

public int bucket() {
return bucket;
}

public int totalBuckets() {
return totalBuckets;
}

public byte[] dataFileMeta() {
return dataFileMeta;
}
}
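
One plausible way to fill the two `byte[]` fields, sketched under the assumption that Paimon's `DataFileMetaSerializer` and `SerializationUtils` are used for the round trip (this PR does not show how the fields are populated):

```java
import java.io.IOException;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.utils.SerializationUtils;

public class CopyFileInfoExample {
    static CopyFileInfo toCopyFileInfo(
            String sourcePath,
            String targetPath,
            BinaryRow partition,
            int bucket,
            int totalBuckets,
            DataFileMeta meta)
            throws IOException {
        return new CopyFileInfo(
                sourcePath,
                targetPath,
                // Partition row encoded as bytes so the info stays serializable.
                SerializationUtils.serializeBinaryRow(partition),
                bucket,
                totalBuckets,
                // File metadata encoded as bytes for shipping to executors.
                new DataFileMetaSerializer().serializeToBytes(meta));
    }
}
```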