Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String COMMIT_TIMEOUT_MS_PROP = "iceberg.control.commit.timeout-ms";
private static final int COMMIT_TIMEOUT_MS_DEFAULT = 30_000;
private static final String COMMIT_THREADS_PROP = "iceberg.control.commit.threads";
private static final String CONNECT_GROUP_ID_PROP = "iceberg.connect.group-id";
private static final String TRANSACTIONAL_PREFIX_PROP =
"iceberg.coordinator.transactional.prefix";
private static final String HADOOP_CONF_DIR_PROP = "iceberg.hadoop-conf-dir";
Expand Down Expand Up @@ -193,12 +192,6 @@ private static ConfigDef newConfigDef() {
DEFAULT_CONTROL_GROUP_PREFIX,
Importance.LOW,
"Prefix of the control consumer group");
configDef.define(
CONNECT_GROUP_ID_PROP,
ConfigDef.Type.STRING,
null,
Importance.LOW,
"Name of the Connect consumer group, should not be set under normal conditions");
configDef.define(
COMMIT_INTERVAL_MS_PROP,
ConfigDef.Type.INT,
Expand Down Expand Up @@ -396,8 +389,8 @@ public String controlGroupIdPrefix() {
return getString(CONTROL_GROUP_ID_PREFIX_PROP);
}

public String connectGroupId() {
String result = getString(CONNECT_GROUP_ID_PROP);
public String sourceConsumerGroupId() {
String result = originalProps.get("consumer.override.group.id");
if (result != null) {
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ abstract class Channel {
private static final Logger LOG = LoggerFactory.getLogger(Channel.class);

private final String controlTopic;
private final String connectGroupId;
private final String sourceConsumerGroupId;
private final Producer<String, byte[]> producer;
private final Consumer<String, byte[]> consumer;
private final SinkTaskContext context;
Expand All @@ -61,7 +61,7 @@ abstract class Channel {
KafkaClientFactory clientFactory,
SinkTaskContext context) {
this.controlTopic = config.controlTopic();
this.connectGroupId = config.connectGroupId();
this.sourceConsumerGroupId = config.sourceConsumerGroupId();
this.context = context;

String transactionalId = config.transactionalPrefix() + name + config.transactionalSuffix();
Expand Down Expand Up @@ -127,7 +127,7 @@ record -> {

Event event = AvroUtil.decode(record.value());

if (event.groupId().equals(connectGroupId)) {
if (event.groupId().equals(sourceConsumerGroupId)) {
LOG.debug("Received event of type: {}", event.type().name());
if (receive(new Envelope(event, record.partition(), record.offset()))) {
LOG.info("Handled event of type: {}", event.type().name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public int compare(TopicPartition o1, TopicPartition o2) {
boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions) {
ConsumerGroupDescription groupDesc;
try (Admin admin = clientFactory.createAdmin()) {
groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(), admin);
groupDesc = KafkaUtils.consumerGroupDescription(config.sourceConsumerGroupId(), admin);
}

Collection<MemberDescription> members = groupDesc.members();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ class Coordinator extends Channel {
KafkaClientFactory clientFactory,
SinkTaskContext context) {
// pass consumer group ID to which we commit low watermark offsets
super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context);
super("coordinator", config.sourceConsumerGroupId() + "-coord", config, clientFactory, context);

this.catalog = catalog;
this.config = config;
this.totalPartitionCount =
members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum();
this.snapshotOffsetsProp =
String.format(
"kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId());
"kafka.connect.offsets.%s.%s", config.controlTopic(), config.sourceConsumerGroupId());
this.exec =
new ThreadPoolExecutor(
config.commitThreads(),
Expand All @@ -117,7 +117,7 @@ void process() {
// send out begin commit
commitState.startNewCommit();
Event event =
new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId()));
new Event(config.sourceConsumerGroupId(), new StartCommit(commitState.currentCommitId()));
send(event);
LOG.info("Commit {} initiated", commitState.currentCommitId());
}
Expand Down Expand Up @@ -174,7 +174,7 @@ private void doCommit(boolean partialCommit) {

Event event =
new Event(
config.connectGroupId(),
config.sourceConsumerGroupId(),
new CommitComplete(commitState.currentCommitId(), validThroughTs));
send(event);

Expand Down Expand Up @@ -297,7 +297,7 @@ private void commitToTable(
Long snapshotId = latestSnapshot(table, branch).snapshotId();
Event event =
new Event(
config.connectGroupId(),
config.sourceConsumerGroupId(),
new CommitToTable(
commitState.currentCommitId(), tableReference, snapshotId, validThroughTs));
send(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected boolean receive(Envelope envelope) {
.map(
writeResult ->
new Event(
config.connectGroupId(),
config.sourceConsumerGroupId(),
new DataWritten(
writeResult.partitionStruct(),
commitId,
Expand All @@ -105,7 +105,8 @@ protected boolean receive(Envelope envelope) {
writeResult.deleteFiles())))
.collect(Collectors.toList());

Event readyEvent = new Event(config.connectGroupId(), new DataComplete(commitId, assignments));
Event readyEvent =
new Event(config.sourceConsumerGroupId(), new DataComplete(commitId, assignments));
events.add(readyEvent);

send(events, results.sourceOffsets());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
public class ChannelTestBase {
protected static final String SRC_TOPIC_NAME = "src-topic";
protected static final String CTL_TOPIC_NAME = "ctl-topic";
protected static final String CONNECT_CONSUMER_GROUP_ID = "cg-connect";
protected static final String SOURCE_CONSUMER_GROUP_ID = "cg-connect";
protected InMemoryCatalog catalog;
protected Table table;
protected IcebergSinkConfig config;
Expand Down Expand Up @@ -87,7 +87,7 @@ private InMemoryCatalog initInMemoryCatalog() {

protected static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id";
protected static final String OFFSETS_SNAPSHOT_PROP =
String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, CONNECT_CONSUMER_GROUP_ID);
String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, SOURCE_CONSUMER_GROUP_ID);
protected static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts";

@BeforeEach
Expand All @@ -100,7 +100,7 @@ public void before() {
config = mock(IcebergSinkConfig.class);
when(config.controlTopic()).thenReturn(CTL_TOPIC_NAME);
when(config.commitThreads()).thenReturn(1);
when(config.connectGroupId()).thenReturn(CONNECT_CONSUMER_GROUP_ID);
when(config.sourceConsumerGroupId()).thenReturn(SOURCE_CONSUMER_GROUP_ID);
when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));

TopicPartitionInfo partitionInfo = mock(TopicPartitionInfo.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testHasLeaderPartition() throws NoSuchFieldException, IllegalAccessE
clientFactoryField.setAccessible(true);

IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.connectGroupId()).thenReturn("test-group");
when(config.sourceConsumerGroupId()).thenReturn("test-group");
configField.set(committer, config);

KafkaClientFactory clientFactory = mock(KafkaClientFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private UUID coordinatorTest(

Event commitResponse =
new Event(
config.connectGroupId(),
config.sourceConsumerGroupId(),
new DataWritten(
StructType.of(),
commitId,
Expand All @@ -198,7 +198,7 @@ private UUID coordinatorTest(

Event commitReady =
new Event(
config.connectGroupId(),
config.sourceConsumerGroupId(),
new DataComplete(
commitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))));
bytes = AvroUtil.encode(commitReady);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void testSave() {
worker.save(ImmutableList.of(rec));

UUID commitId = UUID.randomUUID();
Event commitRequest = new Event(config.connectGroupId(), new StartCommit(commitId));
Event commitRequest = new Event(config.sourceConsumerGroupId(), new StartCommit(commitId));
byte[] bytes = AvroUtil.encode(commitRequest);
consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes));

Expand Down