Skip to content

Commit eadbc6e

Browse files
authored
Kerberos auth python (#36211)
* Add the FileAwareFactoryFn and the KerberosConsumerFactoryFn classes to support consumer factories which pull files from GCS. * Revert "Add the FileAwareFactoryFn and the KerberosConsumerFactoryFn classes to support consumer factories which pull files from GCS." This reverts commit f8f69d9. * Add tests for file aware factory fn * Add changes to the build and integration files for manual testing. Be sure to remove these later as they cannot stay. * Migrate to a new module such that kafka remains GCP Agnostic. * Clean up classes for PR review * Move the existing module files to the extensions repo. This module will contain the factory functions to be utilized by users and the cross lang expansion service. * Modify the base class to use GCS client instead of GCS FileSystems. This is a more lightweight dependency for the expansion service. * Migrate to a new module such that kafka remains GCP Agnostic. * Move the existing module files to the extensions repo. This module will contain the factory functions to be utilized by users and the cross lang expansion service. * Add plumbing for python use case. * Remove accidentally committed python modules * Trigger CI build * Clean up typing.
1 parent 6d9df8c commit eadbc6e

File tree

4 files changed

+96
-43
lines changed

4 files changed

+96
-43
lines changed

sdks/java/io/expansion-service/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ dependencies {
7676
permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761
7777
implementation project(":sdks:java:io:kafka:upgrade")
7878
permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761
79+
implementation project(":sdks:java:extensions:kafka-factories")
80+
permitUnusedDeclared project(":sdks:java:extensions:kafka-factories")
7981

8082
if (JavaVersion.current().compareTo(JavaVersion.VERSION_11) >= 0 && project.findProperty('testJavaVersion') != '8') {
8183
// iceberg ended support for Java 8 in 1.7.0

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.HashSet;
3636
import java.util.List;
3737
import java.util.Map;
38+
import java.util.Objects;
3839
import java.util.Optional;
3940
import java.util.Set;
4041
import java.util.regex.Pattern;
@@ -94,6 +95,7 @@
9495
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
9596
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
9697
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
98+
import org.apache.beam.sdk.util.InstanceBuilder;
9799
import org.apache.beam.sdk.util.Preconditions;
98100
import org.apache.beam.sdk.util.construction.PTransformMatchers;
99101
import org.apache.beam.sdk.util.construction.ReplacementOutputs;
@@ -930,6 +932,34 @@ static <K, V> void setupExternalBuilder(
930932
builder.setOffsetDeduplication(false);
931933
builder.setRedistributeByRecordKey(false);
932934
}
935+
936+
if (config.consumerFactoryFnClass != null) {
937+
if (config.consumerFactoryFnClass.contains("KerberosConsumerFactoryFn")) {
938+
try {
939+
if (!config.consumerFactoryFnParams.containsKey("krb5Location")) {
940+
throw new IllegalArgumentException(
941+
"The KerberosConsumerFactoryFn requires a location for the krb5.conf file. "
942+
+ "Please provide either a GCS location or Google Secret Manager location for this file.");
943+
}
944+
String krb5Location = config.consumerFactoryFnParams.get("krb5Location");
945+
builder.setConsumerFactoryFn(
946+
InstanceBuilder.ofType(
947+
new TypeDescriptor<
948+
SerializableFunction<
949+
Map<String, Object>, Consumer<byte[], byte[]>>>() {})
950+
.fromClassName(config.consumerFactoryFnClass)
951+
.withArg(String.class, Objects.requireNonNull(krb5Location))
952+
.build());
953+
} catch (Exception e) {
954+
throw new RuntimeException(
955+
"Unable to construct FactoryFn "
956+
+ config.consumerFactoryFnClass
957+
+ ": "
958+
+ e.getMessage(),
959+
e);
960+
}
961+
}
962+
}
933963
}
934964

935965
private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -1000,6 +1030,8 @@ public static class Configuration {
10001030
private Boolean offsetDeduplication;
10011031
private Boolean redistributeByRecordKey;
10021032
private Long dynamicReadPollIntervalSeconds;
1033+
private String consumerFactoryFnClass;
1034+
private Map<String, String> consumerFactoryFnParams;
10031035

10041036
public void setConsumerConfig(Map<String, String> consumerConfig) {
10051037
this.consumerConfig = consumerConfig;
@@ -1068,6 +1100,14 @@ public void setRedistributeByRecordKey(Boolean redistributeByRecordKey) {
10681100
public void setDynamicReadPollIntervalSeconds(Long dynamicReadPollIntervalSeconds) {
10691101
this.dynamicReadPollIntervalSeconds = dynamicReadPollIntervalSeconds;
10701102
}
1103+
1104+
public void setConsumerFactoryFnClass(String consumerFactoryFnClass) {
1105+
this.consumerFactoryFnClass = consumerFactoryFnClass;
1106+
}
1107+
1108+
public void setConsumerFactoryFnParams(Map<String, String> consumerFactoryFnParams) {
1109+
this.consumerFactoryFnParams = consumerFactoryFnParams;
1110+
}
10711111
}
10721112
}
10731113

sdks/python/apache_beam/io/kafka.py

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100

101101
# pytype: skip-file
102102

103+
import collections
103104
import typing
104105

105106
import numpy as np
@@ -110,22 +111,21 @@
110111

111112
ReadFromKafkaSchema = typing.NamedTuple(
112113
'ReadFromKafkaSchema',
113-
[
114-
('consumer_config', typing.Mapping[str, str]),
115-
('topics', typing.List[str]),
116-
('key_deserializer', str),
117-
('value_deserializer', str),
118-
('start_read_time', typing.Optional[int]),
119-
('max_num_records', typing.Optional[int]),
120-
('max_read_time', typing.Optional[int]),
121-
('commit_offset_in_finalize', bool),
122-
('timestamp_policy', str),
123-
('consumer_polling_timeout', typing.Optional[int]),
124-
('redistribute', typing.Optional[bool]),
125-
('redistribute_num_keys', typing.Optional[np.int32]),
126-
('allow_duplicates', typing.Optional[bool]),
127-
('dynamic_read_poll_interval_seconds', typing.Optional[int]),
128-
])
114+
[('consumer_config', typing.Mapping[str, str]),
115+
('topics', typing.List[str]), ('key_deserializer', str),
116+
('value_deserializer', str), ('start_read_time', typing.Optional[int]),
117+
('max_num_records', typing.Optional[int]),
118+
('max_read_time', typing.Optional[int]),
119+
('commit_offset_in_finalize', bool), ('timestamp_policy', str),
120+
('consumer_polling_timeout', typing.Optional[int]),
121+
('redistribute', typing.Optional[bool]),
122+
('redistribute_num_keys', typing.Optional[np.int32]),
123+
('allow_duplicates', typing.Optional[bool]),
124+
('dynamic_read_poll_interval_seconds', typing.Optional[int]),
125+
('consumer_factory_fn_class', typing.Optional[str]),
126+
(
127+
'consumer_factory_fn_params',
128+
typing.Optional[collections.abc.Mapping[str, str]])])
129129

130130

131131
def default_io_expansion_service(append_args=None):
@@ -173,7 +173,9 @@ def __init__(
173173
redistribute_num_keys=np.int32(0),
174174
allow_duplicates=False,
175175
dynamic_read_poll_interval_seconds: typing.Optional[int] = None,
176-
):
176+
consumer_factory_fn_class: typing.Optional[str] = None,
177+
consumer_factory_fn_params: typing.Optional[
178+
collections.abc.Mapping] = None):
177179
"""
178180
Initializes a read operation from Kafka.
179181
@@ -216,6 +218,13 @@ def __init__(
216218
:param dynamic_read_poll_interval_seconds: The interval in seconds at which
217219
to check for new partitions. If not None, dynamic partition discovery
218220
is enabled.
221+
:param consumer_factory_fn_class: A fully qualified classpath to an
222+
existing provided consumerFactoryFn. If not None, this will construct
223+
Kafka consumers with a custom configuration.
224+
:param consumer_factory_fn_params: A map which specifies the parameters for
225+
the provided consumer_factory_fn_class. If not None, the values in this
226+
map will be used when constructing the consumer_factory_fn_class object.
227+
This cannot be null if the consumer_factory_fn_class is not null.
219228
"""
220229
if timestamp_policy not in [ReadFromKafka.processing_time_policy,
221230
ReadFromKafka.create_time_policy,
@@ -242,7 +251,9 @@ def __init__(
242251
redistribute_num_keys=redistribute_num_keys,
243252
allow_duplicates=allow_duplicates,
244253
dynamic_read_poll_interval_seconds=
245-
dynamic_read_poll_interval_seconds)),
254+
dynamic_read_poll_interval_seconds,
255+
consumer_factory_fn_class=consumer_factory_fn_class,
256+
consumer_factory_fn_params=consumer_factory_fn_params)),
246257
expansion_service or default_io_expansion_service())
247258

248259

settings.gradle.kts

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
import com.gradle.enterprise.gradleplugin.internal.extension.BuildScanExtensionWithHiddenFeatures
1919

2020
pluginManagement {
21-
plugins {
22-
id("org.javacc.javacc") version "3.0.3" // enable the JavaCC parser generator
23-
}
21+
plugins {
22+
id("org.javacc.javacc") version "3.0.3" // enable the JavaCC parser generator
23+
}
2424
}
2525

2626
plugins {
27-
id("com.gradle.develocity") version "3.19"
28-
id("com.gradle.common-custom-user-data-gradle-plugin") version "2.4.0"
27+
id("com.gradle.develocity") version "3.19"
28+
id("com.gradle.common-custom-user-data-gradle-plugin") version "2.2.1"
2929
}
3030

3131

@@ -36,32 +36,32 @@ val isGithubActionsBuild = arrayOf("GITHUB_REPOSITORY", "GITHUB_RUN_ID").all { S
3636
val isCi = isJenkinsBuild || isGithubActionsBuild
3737

3838
develocity {
39-
server = "https://develocity.apache.org"
40-
projectId = "beam"
39+
server = "https://develocity.apache.org"
40+
projectId = "beam"
4141

42-
buildScan {
43-
uploadInBackground = !isCi
44-
publishing.onlyIf { it.isAuthenticated }
45-
obfuscation {
46-
ipAddresses { addresses -> addresses.map { "0.0.0.0" } }
42+
buildScan {
43+
uploadInBackground = !isCi
44+
publishing.onlyIf { it.isAuthenticated }
45+
obfuscation {
46+
ipAddresses { addresses -> addresses.map { "0.0.0.0" } }
47+
}
4748
}
48-
}
4949
}
5050

5151
buildCache {
52-
local {
53-
isEnabled = true
54-
}
55-
remote<HttpBuildCache> {
56-
url = uri("https://beam-cache.apache.org/cache/")
57-
isAllowUntrustedServer = false
58-
credentials {
59-
username = System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME")
60-
password = System.getenv("GRADLE_ENTERPRISE_CACHE_PASSWORD")
52+
local {
53+
isEnabled = true
54+
}
55+
remote<HttpBuildCache> {
56+
url = uri("https://beam-cache.apache.org/cache/")
57+
isAllowUntrustedServer = false
58+
credentials {
59+
username = System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME")
60+
password = System.getenv("GRADLE_ENTERPRISE_CACHE_PASSWORD")
61+
}
62+
isEnabled = !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
63+
isPush = isCi && !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
6164
}
62-
isEnabled = !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
63-
isPush = isCi && !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
64-
}
6565
}
6666

6767
rootProject.name = "beam"

0 commit comments

Comments
 (0)