From c0a2665a60ab53feac6a1e47214da910b13f9701 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 11:27:29 +0530 Subject: [PATCH 1/8] added metadat and data path in case of dynamic routing --- committer/build.gradle | 36 ++ .../java/org/apache/iceberg/Coordinator.java | 315 ++++++++++++++++++ .../main/java/org/apache/iceberg/Main.java | 7 + .../iceberg/connect/IcebergSinkConfig.java | 34 +- .../iceberg/connect/TableSinkConfig.java | 13 +- .../connect/data/IcebergWriterFactory.java | 11 +- 6 files changed, 413 insertions(+), 3 deletions(-) create mode 100644 committer/build.gradle create mode 100644 committer/src/main/java/org/apache/iceberg/Coordinator.java create mode 100644 committer/src/main/java/org/apache/iceberg/Main.java diff --git a/committer/build.gradle b/committer/build.gradle new file mode 100644 index 000000000000..0de61912d850 --- /dev/null +++ b/committer/build.gradle @@ -0,0 +1,36 @@ +plugins { + id 'java-library' +} + +group = 'org.apache.iceberg' +version = '1.7.0-SNAPSHOT' + +repositories { + mavenCentral() +} + +dependencies { + api project(':iceberg-api') + implementation project(':iceberg-kafka-connect') + implementation project(':iceberg-core') + implementation project(':iceberg-common') + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + implementation project(':iceberg-data') + implementation project(':iceberg-kafka-connect:iceberg-kafka-connect-events') + implementation platform(libs.jackson.bom) + implementation libs.jackson.core + implementation libs.jackson.databind + implementation libs.avro.avro + + compileOnly libs.kafka.clients + compileOnly libs.kafka.connect.api + compileOnly libs.kafka.connect.json + + testImplementation libs.hadoop3.client + testRuntimeOnly project(':iceberg-parquet') + testRuntimeOnly project(':iceberg-orc') +} + +test { + useJUnitPlatform() +} diff --git a/committer/src/main/java/org/apache/iceberg/Coordinator.java b/committer/src/main/java/org/apache/iceberg/Coordinator.java new file mode 100644 index 000000000000..6a260d91bb80 --- /dev/null +++ b/committer/src/main/java/org/apache/iceberg/Coordinator.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.channels.Channel; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.channel.CommitState; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class Coordinator { + + private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; + private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; + private static final Duration POLL_DURATION = Duration.ofSeconds(1); + + private final Catalog catalog; + private final IcebergSinkConfig config; + private final int totalPartitionCount; + private final String snapshotOffsetsProp; + private final ExecutorService exec; + private final CommitState commitState; + private volatile boolean terminated; + + Coordinator( + Catalog catalog, + IcebergSinkConfig config, + Collection members, + KafkaClientFactory clientFactory, + SinkTaskContext context) { + // pass consumer group ID to which we commit low watermark offsets + super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context); + + this.catalog = catalog; + this.config = config; + this.totalPartitionCount = + members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum(); + this.snapshotOffsetsProp = + String.format( + "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId()); + this.exec = ThreadPools.newFixedThreadPool("iceberg-committer", config.commitThreads()); + this.commitState = new CommitState(config); + } + + void process() { + if (commitState.isCommitIntervalReached()) { + // send out begin commit + commitState.startNewCommit(); + Event event = + new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); + send(event); + LOG.info("Commit {} initiated", commitState.currentCommitId()); + } + + consumeAvailable(POLL_DURATION); + + if (commitState.isCommitTimedOut()) { + commit(true); + } + } + + @Override + protected boolean 
receive(Envelope envelope) { + switch (envelope.event().payload().type()) { + case DATA_WRITTEN: + commitState.addResponse(envelope); + return true; + case DATA_COMPLETE: + commitState.addReady(envelope); + if (commitState.isCommitReady(totalPartitionCount)) { + commit(false); + } + return true; + } + return false; + } + + private void commit(boolean partialCommit) { + try { + doCommit(partialCommit); + } catch (Exception e) { + LOG.warn("Commit failed, will try again next cycle", e); + } finally { + commitState.endCurrentCommit(); + } + } + + private void doCommit(boolean partialCommit) { + Map> commitMap = commitState.tableCommitMap(); + + String offsetsJson = offsetsJson(); + OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit); + + Tasks.foreach(commitMap.entrySet()) + .executeWith(exec) + .stopOnFailure() + .run( + entry -> { + commitToTable(entry.getKey(), entry.getValue(), offsetsJson, validThroughTs); + }); + + // we should only get here if all tables committed successfully... + commitConsumerOffsets(); + commitState.clearResponses(); + + Event event = + new Event( + config.connectGroupId(), + new CommitComplete(commitState.currentCommitId(), validThroughTs)); + send(event); + + LOG.info( + "Commit {} complete, committed to {} table(s), valid-through {}", + commitState.currentCommitId(), + commitMap.size(), + validThroughTs); + } + + private String offsetsJson() { + try { + return MAPPER.writeValueAsString(controlTopicOffsets()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void commitToTable( + TableReference tableReference, + List envelopeList, + String offsetsJson, + OffsetDateTime validThroughTs) { + TableIdentifier tableIdentifier = tableReference.identifier(); + Table table; + try { + table = catalog.loadTable(tableIdentifier); + } catch (NoSuchTableException e) { + LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e); + return; + } + + String branch = config.tableConfig(tableIdentifier.toString()).commitBranch(); + + Map committedOffsets = lastCommittedOffsetsForTable(table, branch); + + List payloads = + envelopeList.stream() + .filter( + envelope -> { + Long minOffset = committedOffsets.get(envelope.partition()); + return minOffset == null || envelope.offset() >= minOffset; + }) + .map(envelope -> (DataWritten) envelope.event().payload()) + .collect(Collectors.toList()); + + List dataFiles = + payloads.stream() + .filter(payload -> payload.dataFiles() != null) + .flatMap(payload -> payload.dataFiles().stream()) + .filter(dataFile -> dataFile.recordCount() > 0) + .filter(distinctByKey(ContentFile::location)) + .collect(Collectors.toList()); + + List deleteFiles = + payloads.stream() + .filter(payload -> payload.deleteFiles() != null) + .flatMap(payload -> payload.deleteFiles().stream()) + .filter(deleteFile -> deleteFile.recordCount() > 0) + .filter(distinctByKey(ContentFile::location)) + .collect(Collectors.toList()); + + if (terminated) { + throw new ConnectException("Coordinator is terminated, commit aborted"); + } + + if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { + LOG.info("Nothing to commit to table {}, skipping", tableIdentifier); + } else { + if (deleteFiles.isEmpty()) { + AppendFiles appendOp = table.newAppend(); + if (branch != null) { + appendOp.toBranch(branch); + } + appendOp.set(snapshotOffsetsProp, offsetsJson); + appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); + if (validThroughTs != null) { + appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, 
validThroughTs.toString()); + } + dataFiles.forEach(appendOp::appendFile); + appendOp.commit(); + } else { + RowDelta deltaOp = table.newRowDelta(); + if (branch != null) { + deltaOp.toBranch(branch); + } + deltaOp.set(snapshotOffsetsProp, offsetsJson); + deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); + if (validThroughTs != null) { + deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); + } + dataFiles.forEach(deltaOp::addRows); + deleteFiles.forEach(deltaOp::addDeletes); + deltaOp.commit(); + } + + Long snapshotId = latestSnapshot(table, branch).snapshotId(); + Event event = + new Event( + config.connectGroupId(), + new CommitToTable( + commitState.currentCommitId(), tableReference, snapshotId, validThroughTs)); + send(event); + + LOG.info( + "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}", + tableIdentifier, + snapshotId, + commitState.currentCommitId(), + validThroughTs); + } + } + + private Predicate distinctByKey(Function keyExtractor) { + Map seen = Maps.newConcurrentMap(); + return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; + } + + private Snapshot latestSnapshot(Table table, String branch) { + if (branch == null) { + return table.currentSnapshot(); + } + return table.snapshot(branch); + } + + private Map lastCommittedOffsetsForTable(Table table, String branch) { + Snapshot snapshot = latestSnapshot(table, branch); + while (snapshot != null) { + Map summary = snapshot.summary(); + String value = summary.get(snapshotOffsetsProp); + if (value != null) { + TypeReference> typeRef = new TypeReference>() {}; + try { + return MAPPER.readValue(value, typeRef); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + Long parentSnapshotId = snapshot.parentId(); + snapshot = parentSnapshotId != null ? 
table.snapshot(parentSnapshotId) : null; + } + return ImmutableMap.of(); + } + + void terminate() { + this.terminated = true; + + exec.shutdownNow(); + + // wait for coordinator termination, else cause the sink task to fail + try { + if (!exec.awaitTermination(1, TimeUnit.MINUTES)) { + throw new ConnectException("Timed out waiting for coordinator shutdown"); + } + } catch (InterruptedException e) { + throw new ConnectException("Interrupted while waiting for coordinator shutdown", e); + } + } +} diff --git a/committer/src/main/java/org/apache/iceberg/Main.java b/committer/src/main/java/org/apache/iceberg/Main.java new file mode 100644 index 000000000000..3a6b1ae495ae --- /dev/null +++ b/committer/src/main/java/org/apache/iceberg/Main.java @@ -0,0 +1,7 @@ +package org.apache.iceberg; + +public class Main { + public static void main(String[] args) { + System.out.println("Hello world!"); + } +} \ No newline at end of file diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9650ce16270c..f6dd7cf3c0a6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -56,6 +57,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -235,6 +237,13 @@ private static ConfigDef newConfigDef() { 120000L, Importance.LOW, "config to control coordinator executor keep alive time"); + configDef.define( + DYNAMIC_ROUTE_DATA_METADATA_PREFIX, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "prefix for creation of metadata path and data path in case of dynamic routing" + ); return configDef; } @@ -375,7 +384,20 @@ public TableSinkConfig tableConfig(String tableName) { String commitBranch = tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch()); - return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch); + String metadataPath = "", dataPath = ""; + + if (dynamicTablesEnabled()) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + if (originalProps.containsKey("iceberg.catalog.warehouse")) { + metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + } else { + metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + dataPath = tableConfig.getOrDefault("write.data.path", 
defaultDataPath(tableName)); + } + } + + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -388,6 +410,16 @@ static List stringToList(String value, String regex) { return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList()); } + private String defaultDataPath(String tableName) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + } + + private String defaultMetadataPath(String tableName) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + } + public String controlTopic() { return getString(CONTROL_TOPIC_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 0ecde1f7dd0b..28fdd4f36d15 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -27,13 +27,24 @@ public class TableSinkConfig { private final List idColumns; private final List partitionBy; private final String commitBranch; + private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; this.commitBranch = commitBranch; + this.dataPath = dataPath; + this.metadataPath = metadataPath; + } + + public String getDataPath() { + return dataPath; + } + + public String getMetadataPath() { + return metadataPath; } public Pattern routeRegex() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 92f5af2d7a87..5de1039e77b0 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -89,6 +90,14 @@ Table autoCreateTable(String tableName, SinkRecord sample) { createNamespaceIfNotExist(catalog, identifier.namespace()); List partitionBy = config.tableConfig(tableName).partitionBy(); + + Map tableAutoCreateProps = config.autoCreateProps(); + + if (config.dynamicTablesEnabled()) { + tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); + } + PartitionSpec spec; try { spec = SchemaUtils.createPartitionSpec(schema, partitionBy); @@ -110,7 +119,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { try { result.set( catalog.createTable( - identifier, 
schema, partitionSpec, config.autoCreateProps())); + identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 67619ec140a65c7768af074178c95408d76e605d Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 11:28:07 +0530 Subject: [PATCH 2/8] spotless --- .../iceberg/connect/IcebergSinkConfig.java | 44 +++++++++++++++---- .../iceberg/connect/TableSinkConfig.java | 7 ++- .../connect/data/IcebergWriterFactory.java | 6 +-- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index f6dd7cf3c0a6..8339cf7f89ac 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -57,7 +57,8 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = + "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -242,8 +243,7 @@ private static ConfigDef newConfigDef() { ConfigDef.Type.STRING, "", Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing" - ); + "prefix for creation of metadata path and data path in case of dynamic routing"); return configDef; } @@ -389,15 +389,27 @@ public TableSinkConfig tableConfig(String tableName) { if (dynamicTablesEnabled()) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + metadataPath = + originalProps.get("iceberg.catalog.warehouse") + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/metadata"; + dataPath = + originalProps.get("iceberg.catalog.warehouse") + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/data"; } else { - metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + metadataPath = + tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); } } - return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig( + routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -412,12 +424,26 @@ static List stringToList(String value, String regex) { private String defaultDataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + 
"/" + tableIdentifier.name() + "/data"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + + "/" + + connectorName() + + "/" + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/data"; } private String defaultMetadataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + + "/" + + connectorName() + + "/" + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/metadata"; } public String controlTopic() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 28fdd4f36d15..36879bf6929a 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -30,7 +30,12 @@ public class TableSinkConfig { private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { + Pattern routeRegex, + List idColumns, + List partitionBy, + String commitBranch, + String dataPath, + String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 5de1039e77b0..df87e8d2a958 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -94,7 +94,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) { Map tableAutoCreateProps = config.autoCreateProps(); if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + tableAutoCreateProps.put( + "write.metadata.path", config.tableConfig(tableName).getMetadataPath()); tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); } @@ -118,8 +119,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { notUsed -> { try { result.set( - catalog.createTable( - identifier, schema, partitionSpec, tableAutoCreateProps)); + catalog.createTable(identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 6b15ae402deded39a34843f85f5342dbb9ae28a0 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 17:54:24 +0530 Subject: [PATCH 3/8] Revert "spotless" This reverts commit 67619ec140a65c7768af074178c95408d76e605d. 
--- .../iceberg/connect/IcebergSinkConfig.java | 44 ++++--------------- .../iceberg/connect/TableSinkConfig.java | 7 +-- .../connect/data/IcebergWriterFactory.java | 6 +-- 3 files changed, 13 insertions(+), 44 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 8339cf7f89ac..f6dd7cf3c0a6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -57,8 +57,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = - "iceberg.dynamic-route-data-metadata-prefix"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -243,7 +242,8 @@ private static ConfigDef newConfigDef() { ConfigDef.Type.STRING, "", Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing"); + "prefix for creation of metadata path and data path in case of dynamic routing" + ); return configDef; } @@ -389,27 +389,15 @@ public TableSinkConfig tableConfig(String tableName) { if (dynamicTablesEnabled()) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = - originalProps.get("iceberg.catalog.warehouse") - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/metadata"; - dataPath = - originalProps.get("iceberg.catalog.warehouse") - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/data"; + metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; } else { - metadataPath = - tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); } } - return new TableSinkConfig( - routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -424,26 +412,12 @@ static List stringToList(String value, String regex) { private String defaultDataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) - + "/" - + connectorName() - + "/" - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/data"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; } private String defaultMetadataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return 
getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) - + "/" - + connectorName() - + "/" - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/metadata"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; } public String controlTopic() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 36879bf6929a..28fdd4f36d15 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -30,12 +30,7 @@ public class TableSinkConfig { private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, - List idColumns, - List partitionBy, - String commitBranch, - String dataPath, - String metadataPath) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index df87e8d2a958..5de1039e77b0 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -94,8 +94,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { Map tableAutoCreateProps = config.autoCreateProps(); if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put( - "write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); } @@ -119,7 +118,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) { notUsed -> { try { result.set( - catalog.createTable(identifier, schema, partitionSpec, tableAutoCreateProps)); + catalog.createTable( + identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 8398e4c2675f5e319850c6fe1555007c153ec143 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 17:54:36 +0530 Subject: [PATCH 4/8] Revert "added metadat and data path in case of dynamic routing" This reverts commit c0a2665a60ab53feac6a1e47214da910b13f9701. 
--- committer/build.gradle | 36 -- .../java/org/apache/iceberg/Coordinator.java | 315 ------------------ .../main/java/org/apache/iceberg/Main.java | 7 - .../iceberg/connect/IcebergSinkConfig.java | 34 +- .../iceberg/connect/TableSinkConfig.java | 13 +- .../connect/data/IcebergWriterFactory.java | 11 +- 6 files changed, 3 insertions(+), 413 deletions(-) delete mode 100644 committer/build.gradle delete mode 100644 committer/src/main/java/org/apache/iceberg/Coordinator.java delete mode 100644 committer/src/main/java/org/apache/iceberg/Main.java diff --git a/committer/build.gradle b/committer/build.gradle deleted file mode 100644 index 0de61912d850..000000000000 --- a/committer/build.gradle +++ /dev/null @@ -1,36 +0,0 @@ -plugins { - id 'java-library' -} - -group = 'org.apache.iceberg' -version = '1.7.0-SNAPSHOT' - -repositories { - mavenCentral() -} - -dependencies { - api project(':iceberg-api') - implementation project(':iceberg-kafka-connect') - implementation project(':iceberg-core') - implementation project(':iceberg-common') - implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') - implementation project(':iceberg-data') - implementation project(':iceberg-kafka-connect:iceberg-kafka-connect-events') - implementation platform(libs.jackson.bom) - implementation libs.jackson.core - implementation libs.jackson.databind - implementation libs.avro.avro - - compileOnly libs.kafka.clients - compileOnly libs.kafka.connect.api - compileOnly libs.kafka.connect.json - - testImplementation libs.hadoop3.client - testRuntimeOnly project(':iceberg-parquet') - testRuntimeOnly project(':iceberg-orc') -} - -test { - useJUnitPlatform() -} diff --git a/committer/src/main/java/org/apache/iceberg/Coordinator.java b/committer/src/main/java/org/apache/iceberg/Coordinator.java deleted file mode 100644 index 6a260d91bb80..000000000000 --- a/committer/src/main/java/org/apache/iceberg/Coordinator.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.channels.Channel; -import java.time.Duration; -import java.time.OffsetDateTime; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.connect.IcebergSinkConfig; -import org.apache.iceberg.connect.channel.CommitState; -import org.apache.iceberg.connect.events.CommitComplete; -import org.apache.iceberg.connect.events.CommitToTable; -import org.apache.iceberg.connect.events.DataWritten; -import org.apache.iceberg.connect.events.Event; -import org.apache.iceberg.connect.events.StartCommit; -import org.apache.iceberg.connect.events.TableReference; -import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; -import org.apache.kafka.clients.admin.MemberDescription; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.sink.SinkTaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class Coordinator { - - private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); - private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; - private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; - private static final Duration POLL_DURATION = Duration.ofSeconds(1); - - private final Catalog catalog; - private final IcebergSinkConfig config; - private final int totalPartitionCount; - private final String snapshotOffsetsProp; - private final ExecutorService exec; - private final CommitState commitState; - private volatile boolean terminated; - - Coordinator( - Catalog catalog, - IcebergSinkConfig config, - Collection members, - KafkaClientFactory clientFactory, - SinkTaskContext context) { - // pass consumer group ID to which we commit low watermark offsets - super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context); - - this.catalog = catalog; - this.config = config; - this.totalPartitionCount = - members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum(); - this.snapshotOffsetsProp = - String.format( - "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId()); - this.exec = ThreadPools.newFixedThreadPool("iceberg-committer", config.commitThreads()); - this.commitState = new CommitState(config); - } - - void process() { - if (commitState.isCommitIntervalReached()) { - // send out begin commit - commitState.startNewCommit(); - Event event = - new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); - send(event); - LOG.info("Commit {} initiated", commitState.currentCommitId()); - } - - consumeAvailable(POLL_DURATION); - - if (commitState.isCommitTimedOut()) { - commit(true); - } - } - - @Override - protected boolean 
receive(Envelope envelope) { - switch (envelope.event().payload().type()) { - case DATA_WRITTEN: - commitState.addResponse(envelope); - return true; - case DATA_COMPLETE: - commitState.addReady(envelope); - if (commitState.isCommitReady(totalPartitionCount)) { - commit(false); - } - return true; - } - return false; - } - - private void commit(boolean partialCommit) { - try { - doCommit(partialCommit); - } catch (Exception e) { - LOG.warn("Commit failed, will try again next cycle", e); - } finally { - commitState.endCurrentCommit(); - } - } - - private void doCommit(boolean partialCommit) { - Map> commitMap = commitState.tableCommitMap(); - - String offsetsJson = offsetsJson(); - OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit); - - Tasks.foreach(commitMap.entrySet()) - .executeWith(exec) - .stopOnFailure() - .run( - entry -> { - commitToTable(entry.getKey(), entry.getValue(), offsetsJson, validThroughTs); - }); - - // we should only get here if all tables committed successfully... - commitConsumerOffsets(); - commitState.clearResponses(); - - Event event = - new Event( - config.connectGroupId(), - new CommitComplete(commitState.currentCommitId(), validThroughTs)); - send(event); - - LOG.info( - "Commit {} complete, committed to {} table(s), valid-through {}", - commitState.currentCommitId(), - commitMap.size(), - validThroughTs); - } - - private String offsetsJson() { - try { - return MAPPER.writeValueAsString(controlTopicOffsets()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private void commitToTable( - TableReference tableReference, - List envelopeList, - String offsetsJson, - OffsetDateTime validThroughTs) { - TableIdentifier tableIdentifier = tableReference.identifier(); - Table table; - try { - table = catalog.loadTable(tableIdentifier); - } catch (NoSuchTableException e) { - LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e); - return; - } - - String branch = config.tableConfig(tableIdentifier.toString()).commitBranch(); - - Map committedOffsets = lastCommittedOffsetsForTable(table, branch); - - List payloads = - envelopeList.stream() - .filter( - envelope -> { - Long minOffset = committedOffsets.get(envelope.partition()); - return minOffset == null || envelope.offset() >= minOffset; - }) - .map(envelope -> (DataWritten) envelope.event().payload()) - .collect(Collectors.toList()); - - List dataFiles = - payloads.stream() - .filter(payload -> payload.dataFiles() != null) - .flatMap(payload -> payload.dataFiles().stream()) - .filter(dataFile -> dataFile.recordCount() > 0) - .filter(distinctByKey(ContentFile::location)) - .collect(Collectors.toList()); - - List deleteFiles = - payloads.stream() - .filter(payload -> payload.deleteFiles() != null) - .flatMap(payload -> payload.deleteFiles().stream()) - .filter(deleteFile -> deleteFile.recordCount() > 0) - .filter(distinctByKey(ContentFile::location)) - .collect(Collectors.toList()); - - if (terminated) { - throw new ConnectException("Coordinator is terminated, commit aborted"); - } - - if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { - LOG.info("Nothing to commit to table {}, skipping", tableIdentifier); - } else { - if (deleteFiles.isEmpty()) { - AppendFiles appendOp = table.newAppend(); - if (branch != null) { - appendOp.toBranch(branch); - } - appendOp.set(snapshotOffsetsProp, offsetsJson); - appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); - if (validThroughTs != null) { - appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, 
validThroughTs.toString()); - } - dataFiles.forEach(appendOp::appendFile); - appendOp.commit(); - } else { - RowDelta deltaOp = table.newRowDelta(); - if (branch != null) { - deltaOp.toBranch(branch); - } - deltaOp.set(snapshotOffsetsProp, offsetsJson); - deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); - if (validThroughTs != null) { - deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); - } - dataFiles.forEach(deltaOp::addRows); - deleteFiles.forEach(deltaOp::addDeletes); - deltaOp.commit(); - } - - Long snapshotId = latestSnapshot(table, branch).snapshotId(); - Event event = - new Event( - config.connectGroupId(), - new CommitToTable( - commitState.currentCommitId(), tableReference, snapshotId, validThroughTs)); - send(event); - - LOG.info( - "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}", - tableIdentifier, - snapshotId, - commitState.currentCommitId(), - validThroughTs); - } - } - - private Predicate distinctByKey(Function keyExtractor) { - Map seen = Maps.newConcurrentMap(); - return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; - } - - private Snapshot latestSnapshot(Table table, String branch) { - if (branch == null) { - return table.currentSnapshot(); - } - return table.snapshot(branch); - } - - private Map lastCommittedOffsetsForTable(Table table, String branch) { - Snapshot snapshot = latestSnapshot(table, branch); - while (snapshot != null) { - Map summary = snapshot.summary(); - String value = summary.get(snapshotOffsetsProp); - if (value != null) { - TypeReference> typeRef = new TypeReference>() {}; - try { - return MAPPER.readValue(value, typeRef); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - Long parentSnapshotId = snapshot.parentId(); - snapshot = parentSnapshotId != null ? 
table.snapshot(parentSnapshotId) : null; - } - return ImmutableMap.of(); - } - - void terminate() { - this.terminated = true; - - exec.shutdownNow(); - - // wait for coordinator termination, else cause the sink task to fail - try { - if (!exec.awaitTermination(1, TimeUnit.MINUTES)) { - throw new ConnectException("Timed out waiting for coordinator shutdown"); - } - } catch (InterruptedException e) { - throw new ConnectException("Interrupted while waiting for coordinator shutdown", e); - } - } -} diff --git a/committer/src/main/java/org/apache/iceberg/Main.java b/committer/src/main/java/org/apache/iceberg/Main.java deleted file mode 100644 index 3a6b1ae495ae..000000000000 --- a/committer/src/main/java/org/apache/iceberg/Main.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.iceberg; - -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} \ No newline at end of file diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index f6dd7cf3c0a6..9650ce16270c 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,7 +28,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -57,7 +56,6 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -237,13 +235,6 @@ private static ConfigDef newConfigDef() { 120000L, Importance.LOW, "config to control coordinator executor keep alive time"); - configDef.define( - DYNAMIC_ROUTE_DATA_METADATA_PREFIX, - ConfigDef.Type.STRING, - "", - Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing" - ); return configDef; } @@ -384,20 +375,7 @@ public TableSinkConfig tableConfig(String tableName) { String commitBranch = tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch()); - String metadataPath = "", dataPath = ""; - - if (dynamicTablesEnabled()) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; - } else { - metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); - dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); - } - } - - return new TableSinkConfig(routeRegex, idColumns, 
partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch); }); } @@ -410,16 +388,6 @@ static List stringToList(String value, String regex) { return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList()); } - private String defaultDataPath(String tableName) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; - } - - private String defaultMetadataPath(String tableName) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - } - public String controlTopic() { return getString(CONTROL_TOPIC_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 28fdd4f36d15..0ecde1f7dd0b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -27,24 +27,13 @@ public class TableSinkConfig { private final List idColumns; private final List partitionBy; private final String commitBranch; - private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; this.commitBranch = commitBranch; - this.dataPath = dataPath; - this.metadataPath = metadataPath; - } - - public String getDataPath() { - return dataPath; - } - - public String getMetadataPath() { - return metadataPath; } public Pattern routeRegex() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 5de1039e77b0..92f5af2d7a87 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -20,7 +20,6 @@ import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -90,14 +89,6 @@ Table autoCreateTable(String tableName, SinkRecord sample) { createNamespaceIfNotExist(catalog, identifier.namespace()); List partitionBy = config.tableConfig(tableName).partitionBy(); - - Map tableAutoCreateProps = config.autoCreateProps(); - - if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); - tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); - } - PartitionSpec spec; try { spec = SchemaUtils.createPartitionSpec(schema, partitionBy); @@ -119,7 +110,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { try { result.set( catalog.createTable( - identifier, schema, 
partitionSpec, tableAutoCreateProps)); + identifier, schema, partitionSpec, config.autoCreateProps())); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 554e21b9277125825ec4f27ca7c3adb42a9df8cb Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Thu, 5 Feb 2026 07:19:47 +0530 Subject: [PATCH 5/8] Always take connect group id from the consumer group-id rather than exposing a config --- .../org/apache/iceberg/connect/IcebergSinkConfig.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9650ce16270c..51ec3d59e598 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -86,7 +86,6 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String COMMIT_TIMEOUT_MS_PROP = "iceberg.control.commit.timeout-ms"; private static final int COMMIT_TIMEOUT_MS_DEFAULT = 30_000; private static final String COMMIT_THREADS_PROP = "iceberg.control.commit.threads"; - private static final String CONNECT_GROUP_ID_PROP = "iceberg.connect.group-id"; private static final String TRANSACTIONAL_PREFIX_PROP = "iceberg.coordinator.transactional.prefix"; private static final String HADOOP_CONF_DIR_PROP = "iceberg.hadoop-conf-dir"; @@ -193,12 +192,6 @@ private static ConfigDef newConfigDef() { DEFAULT_CONTROL_GROUP_PREFIX, Importance.LOW, "Prefix of the control consumer group"); - configDef.define( - CONNECT_GROUP_ID_PROP, - ConfigDef.Type.STRING, - null, - Importance.LOW, - "Name of the Connect consumer group, should not be set under normal conditions"); configDef.define( COMMIT_INTERVAL_MS_PROP, ConfigDef.Type.INT, @@ -397,7 +390,7 @@ public String controlGroupIdPrefix() { } public String connectGroupId() { - String result = getString(CONNECT_GROUP_ID_PROP); + String result = getString("consumer.override.group.id"); if (result != null) { return result; } From 8e8d0c1dfc75f21186c41748ab19bd2047f7ff6d Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Thu, 5 Feb 2026 07:28:12 +0530 Subject: [PATCH 6/8] renamed connect group id to source consumer group id to make sense with the nomenclature --- .../org/apache/iceberg/connect/IcebergSinkConfig.java | 2 +- .../org/apache/iceberg/connect/channel/Channel.java | 6 +++--- .../apache/iceberg/connect/channel/CommitterImpl.java | 2 +- .../apache/iceberg/connect/channel/Coordinator.java | 10 +++++----- .../org/apache/iceberg/connect/channel/Worker.java | 4 ++-- .../iceberg/connect/channel/ChannelTestBase.java | 6 +++--- .../iceberg/connect/channel/TestCommitterImpl.java | 2 +- .../iceberg/connect/channel/TestCoordinator.java | 4 ++-- .../org/apache/iceberg/connect/channel/TestWorker.java | 2 +- 9 files changed, 19 insertions(+), 19 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 51ec3d59e598..0a7eee7e7b4b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -389,7 +389,7 @@ public String controlGroupIdPrefix() { return 
getString(CONTROL_GROUP_ID_PREFIX_PROP); } - public String connectGroupId() { + public String sourceConsumerGroupId() { String result = getString("consumer.override.group.id"); if (result != null) { return result; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java index 01cf165de66b..89df9cd192f0 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java @@ -46,7 +46,7 @@ abstract class Channel { private static final Logger LOG = LoggerFactory.getLogger(Channel.class); private final String controlTopic; - private final String connectGroupId; + private final String sourceConsumerGroupId; private final Producer producer; private final Consumer consumer; private final SinkTaskContext context; @@ -61,7 +61,7 @@ abstract class Channel { KafkaClientFactory clientFactory, SinkTaskContext context) { this.controlTopic = config.controlTopic(); - this.connectGroupId = config.connectGroupId(); + this.sourceConsumerGroupId = config.sourceConsumerGroupId(); this.context = context; String transactionalId = config.transactionalPrefix() + name + config.transactionalSuffix(); @@ -127,7 +127,7 @@ record -> { Event event = AvroUtil.decode(record.value()); - if (event.groupId().equals(connectGroupId)) { + if (event.groupId().equals(sourceConsumerGroupId)) { LOG.debug("Received event of type: {}", event.type().name()); if (receive(new Envelope(event, record.partition(), record.offset()))) { LOG.info("Handled event of type: {}", event.type().name()); diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index 04602a66a5e1..056e111fe149 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -77,7 +77,7 @@ public int compare(TopicPartition o1, TopicPartition o2) { boolean hasLeaderPartition(Collection currentAssignedPartitions) { ConsumerGroupDescription groupDesc; try (Admin admin = clientFactory.createAdmin()) { - groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(), admin); + groupDesc = KafkaUtils.consumerGroupDescription(config.sourceConsumerGroupId(), admin); } Collection members = groupDesc.members(); diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java index 068e1e1f6e9c..ce47350881a9 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java @@ -89,7 +89,7 @@ class Coordinator extends Channel { KafkaClientFactory clientFactory, SinkTaskContext context) { // pass consumer group ID to which we commit low watermark offsets - super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context); + super("coordinator", config.sourceConsumerGroupId() + "-coord", config, clientFactory, context); this.catalog = catalog; this.config = config; @@ -97,7 +97,7 @@ class Coordinator extends Channel { 
members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum(); this.snapshotOffsetsProp = String.format( - "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId()); + "kafka.connect.offsets.%s.%s", config.controlTopic(), config.sourceConsumerGroupId()); this.exec = new ThreadPoolExecutor( config.commitThreads(), @@ -117,7 +117,7 @@ void process() { // send out begin commit commitState.startNewCommit(); Event event = - new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); + new Event(config.sourceConsumerGroupId(), new StartCommit(commitState.currentCommitId())); send(event); LOG.info("Commit {} initiated", commitState.currentCommitId()); } @@ -174,7 +174,7 @@ private void doCommit(boolean partialCommit) { Event event = new Event( - config.connectGroupId(), + config.sourceConsumerGroupId(), new CommitComplete(commitState.currentCommitId(), validThroughTs)); send(event); @@ -297,7 +297,7 @@ private void commitToTable( Long snapshotId = latestSnapshot(table, branch).snapshotId(); Event event = new Event( - config.connectGroupId(), + config.sourceConsumerGroupId(), new CommitToTable( commitState.currentCommitId(), tableReference, snapshotId, validThroughTs)); send(event); diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java index 903be7070370..014394b9f6bd 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java @@ -96,7 +96,7 @@ protected boolean receive(Envelope envelope) { .map( writeResult -> new Event( - config.connectGroupId(), + config.sourceConsumerGroupId(), new DataWritten( writeResult.partitionStruct(), commitId, @@ -105,7 +105,7 @@ protected boolean receive(Envelope envelope) { writeResult.deleteFiles()))) .collect(Collectors.toList()); - Event readyEvent = new Event(config.connectGroupId(), new DataComplete(commitId, assignments)); + Event readyEvent = new Event(config.sourceConsumerGroupId(), new DataComplete(commitId, assignments)); events.add(readyEvent); send(events, results.sourceOffsets()); diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java index 63bde7c0c3ff..614c3dbfc391 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java @@ -58,7 +58,7 @@ public class ChannelTestBase { protected static final String SRC_TOPIC_NAME = "src-topic"; protected static final String CTL_TOPIC_NAME = "ctl-topic"; - protected static final String CONNECT_CONSUMER_GROUP_ID = "cg-connect"; + protected static final String SOURCE_CONSUMER_GROUP_ID = "cg-connect"; protected InMemoryCatalog catalog; protected Table table; protected IcebergSinkConfig config; @@ -87,7 +87,7 @@ private InMemoryCatalog initInMemoryCatalog() { protected static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; protected static final String OFFSETS_SNAPSHOT_PROP = - String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, CONNECT_CONSUMER_GROUP_ID); + String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, SOURCE_CONSUMER_GROUP_ID); 
protected static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; @BeforeEach @@ -100,7 +100,7 @@ public void before() { config = mock(IcebergSinkConfig.class); when(config.controlTopic()).thenReturn(CTL_TOPIC_NAME); when(config.commitThreads()).thenReturn(1); - when(config.connectGroupId()).thenReturn(CONNECT_CONSUMER_GROUP_ID); + when(config.sourceConsumerGroupId()).thenReturn(SOURCE_CONSUMER_GROUP_ID); when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); TopicPartitionInfo partitionInfo = mock(TopicPartitionInfo.class); diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java index c6b7c86e4c66..d88e9ef38557 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java @@ -94,7 +94,7 @@ public void testHasLeaderPartition() throws NoSuchFieldException, IllegalAccessE clientFactoryField.setAccessible(true); IcebergSinkConfig config = mock(IcebergSinkConfig.class); - when(config.connectGroupId()).thenReturn("test-group"); + when(config.sourceConsumerGroupId()).thenReturn("test-group"); configField.set(committer, config); KafkaClientFactory clientFactory = mock(KafkaClientFactory.class); diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java index 05526eab5c68..1ad314857e5c 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java @@ -186,7 +186,7 @@ private UUID coordinatorTest( Event commitResponse = new Event( - config.connectGroupId(), + config.sourceConsumerGroupId(), new DataWritten( StructType.of(), commitId, @@ -198,7 +198,7 @@ private UUID coordinatorTest( Event commitReady = new Event( - config.connectGroupId(), + config.sourceConsumerGroupId(), new DataComplete( commitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts)))); bytes = AvroUtil.encode(commitReady); diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java index 6cd5c0c86eab..83e27664f679 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java @@ -92,7 +92,7 @@ public void testSave() { worker.save(ImmutableList.of(rec)); UUID commitId = UUID.randomUUID(); - Event commitRequest = new Event(config.connectGroupId(), new StartCommit(commitId)); + Event commitRequest = new Event(config.sourceConsumerGroupId(), new StartCommit(commitId)); byte[] bytes = AvroUtil.encode(commitRequest); consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes)); From 5208417bd0e5b4452747b85f6bd7b6083d42e047 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Thu, 5 Feb 2026 07:40:23 +0530 Subject: [PATCH 7/8] fixed spotless formatting --- .../main/java/org/apache/iceberg/connect/channel/Worker.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java index 014394b9f6bd..0013b5bbb333 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java @@ -105,7 +105,8 @@ protected boolean receive(Envelope envelope) { writeResult.deleteFiles()))) .collect(Collectors.toList()); - Event readyEvent = new Event(config.sourceConsumerGroupId(), new DataComplete(commitId, assignments)); + Event readyEvent = + new Event(config.sourceConsumerGroupId(), new DataComplete(commitId, assignments)); events.add(readyEvent); send(events, results.sourceOffsets()); From b07f2164deceb9f5bc15ef1215065f2a2e9a178b Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Thu, 5 Feb 2026 08:51:22 +0530 Subject: [PATCH 8/8] fixed integration tests --- .../main/java/org/apache/iceberg/connect/IcebergSinkConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 0a7eee7e7b4b..2d308302e977 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -390,7 +390,7 @@ public String controlGroupIdPrefix() { } public String sourceConsumerGroupId() { - String result = getString("consumer.override.group.id"); + String result = originalProps.get("consumer.override.group.id"); if (result != null) { return result; }
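Note: the standalone sketch below is not part of the patch series; it only illustrates the group-id resolution that patches 5/8 through 8/8 converge on, where consumer.override.group.id is read from the raw connector properties (since keys outside the sink's ConfigDef are not returned by getString()). The plain Map parameter and the "connect-" + connector-name fallback are assumptions made for this sketch, not the connector's actual fields or default.

import java.util.Map;

// Standalone illustration; IcebergSinkConfig itself is not reproduced here.
public class SourceGroupIdSketch {

  // Resolve the source consumer group id: prefer the connector-level consumer
  // override, otherwise fall back to a name derived from the connector name
  // (the fallback naming is an assumption for this sketch only).
  static String sourceConsumerGroupId(Map<String, String> originalProps, String connectorName) {
    String override = originalProps.get("consumer.override.group.id");
    if (override != null) {
      return override;
    }
    return "connect-" + connectorName;
  }

  public static void main(String[] args) {
    Map<String, String> withOverride =
        Map.of("name", "events-sink", "consumer.override.group.id", "cg-connect");
    Map<String, String> withoutOverride = Map.of("name", "events-sink");

    System.out.println(sourceConsumerGroupId(withOverride, "events-sink"));    // cg-connect
    System.out.println(sourceConsumerGroupId(withoutOverride, "events-sink")); // connect-events-sink
  }
}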