Skip to content

Commit 807dcaf

Browse files
authored
[improve][fn] Upgrade Kubernetes client-java version to 23.0.0 and add k8s e2e integration tests (#25000)
1 parent 4566edd commit 807dcaf

File tree

18 files changed

+782
-130
lines changed

18 files changed

+782
-130
lines changed

.github/workflows/pulsar-ci.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,9 @@ jobs:
683683
- name: Upgrade
684684
group: UPGRADE
685685

686+
- name: Kubernetes
687+
group: PULSAR_K8S
688+
686689
steps:
687690
- name: checkout
688691
uses: actions/checkout@v4

build/build_java_test_image.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@
2121
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
2222
cd "$SCRIPT_DIR/.."
2323
mvn -am -pl tests/docker-images/java-test-image -Pcore-modules,-main,integrationTests,docker \
24-
-Dmaven.test.skip=true -DskipSourceReleaseAssembly=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true \
24+
-DskipTests -DskipSourceReleaseAssembly=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true \
2525
"$@" install

build/run_integration_group.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ test_group_sql() {
207207
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-sql.xml -DintegrationTests -DtestForkCount=1 -DtestReuseFork=false
208208
}
209209

210+
test_group_pulsar_k8s() {
211+
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-k8s.xml -DintegrationTests
212+
}
213+
210214
test_group_pulsar_io() {
211215
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source
212216
mvn_run_integration_test --skip-build-deps "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink

distribution/server/src/assemble/LICENSE.bin.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ The Apache Software License, Version 2.0
267267
* Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar
268268
* Gson
269269
- com.google.code.gson-gson-2.8.9.jar
270-
- io.gsonfire-gson-fire-1.8.5.jar
270+
- io.gsonfire-gson-fire-1.9.0.jar
271271
* Guava
272272
- com.google.guava-guava-33.4.8-jre.jar
273273
- com.google.guava-failureaccess-1.0.3.jar
@@ -472,9 +472,9 @@ The Apache Software License, Version 2.0
472472
* Apache Yetus
473473
- org.apache.yetus-audience-annotations-0.12.0.jar
474474
* Kubernetes Client
475-
- io.kubernetes-client-java-18.0.0.jar
476-
- io.kubernetes-client-java-api-18.0.0.jar
477-
- io.kubernetes-client-java-proto-18.0.0.jar
475+
- io.kubernetes-client-java-23.0.0.jar
476+
- io.kubernetes-client-java-api-23.0.0.jar
477+
- io.kubernetes-client-java-proto-23.0.0.jar
478478
* Dropwizard
479479
- io.dropwizard.metrics-metrics-core-4.1.12.1.jar
480480
- io.dropwizard.metrics-metrics-graphite-4.1.12.1.jar

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ flexible messaging model and an intuitive client API.</description>
283283
<jakarta.xml.bind.version>2.3.3</jakarta.xml.bind.version>
284284
<jakarta.validation.version>2.0.2</jakarta.validation.version>
285285
<jna.version>5.12.1</jna.version>
286-
<kubernetesclient.version>18.0.0</kubernetesclient.version>
286+
<kubernetesclient.version>23.0.0</kubernetesclient.version>
287287
<jose4j.version>0.9.4</jose4j.version>
288288
<okhttp3.version>5.3.1</okhttp3.version>
289289
<!-- use okio version that matches the okhttp3 version -->

pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ CompletableFuture<Jwk> getJwkFromKubernetesApiServer(String keyId) {
169169
private CompletableFuture<List<Jwk>> getJwksFromKubernetesApiServer() {
170170
CompletableFuture<List<Jwk>> future = new CompletableFuture<>();
171171
try {
172-
openidApi.getServiceAccountIssuerOpenIDKeysetAsync(new ApiCallback<String>() {
172+
openidApi.getServiceAccountIssuerOpenIDKeyset().executeAsync(new ApiCallback<String>() {
173173
@Override
174174
public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
175175
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);

pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ CompletableFuture<OpenIDProviderMetadata> getOpenIDProviderMetadataForKubernetes
168168
private CompletableFuture<OpenIDProviderMetadata> loadOpenIDProviderMetadataForKubernetesApiServer() {
169169
CompletableFuture<OpenIDProviderMetadata> future = new CompletableFuture<>();
170170
try {
171-
wellKnownApi.getServiceAccountIssuerOpenIDConfigurationAsync(new ApiCallback<>() {
171+
wellKnownApi.getServiceAccountIssuerOpenIDConfiguration().executeAsync(new ApiCallback<>() {
172172
@Override
173173
public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
174174
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);

pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,10 @@ void beforeClass() throws IOException {
117117
""".replace("%s", server.baseUrl()))));
118118

119119
// Set up a correct openid-configuration that the k8s integration test can use
120-
// NOTE: integration tests revealed that the k8s client adds a trailing slash to the openid-configuration
121-
// endpoint.
122120
// NOTE: the jwks_uri is ignored, so we supply one that would fail here to ensure that we are not implicitly
123121
// relying on the jwks_uri.
124122
server.stubFor(
125-
get(urlEqualTo("/k8s/.well-known/openid-configuration/"))
123+
get(urlEqualTo("/k8s/.well-known/openid-configuration"))
126124
.willReturn(aResponse()
127125
.withHeader("Content-Type", "application/json")
128126
.withBody("""
@@ -170,7 +168,7 @@ void beforeClass() throws IOException {
170168
// Set up JWKS endpoint with a valid and an invalid public key
171169
// The url matches are for both the normal and the k8s endpoints
172170
server.stubFor(
173-
get(urlMatching("/keys|/k8s/openid/v1/jwks/"))
171+
get(urlMatching("/keys|/k8s/openid/v1/jwks"))
174172
.willReturn(aResponse()
175173
.withHeader("Content-Type", "application/json")
176174
.withBody(

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,10 @@ public void cleanUpAuthData(Function.FunctionDetails funcDetails, Optional<Funct
179179
// make sure secretName is not null or empty string.
180180
// If deleteNamespacedSecret is called and secret name is null or empty string
181181
// it will delete all the secrets in the namespace
182-
coreClient.deleteNamespacedSecret(secretName,
183-
kubeNamespace, null, null,
184-
0, null, "Foreground", null);
182+
coreClient.deleteNamespacedSecret(secretName, kubeNamespace)
183+
.gracePeriodSeconds(0)
184+
.propagationPolicy("Foreground")
185+
.execute();
185186
} catch (ApiException e) {
186187
// if already deleted
187188
if (e.getCode() == HTTP_NOT_FOUND) {
@@ -205,8 +206,7 @@ public void cleanUpAuthData(Function.FunctionDetails funcDetails, Optional<Funct
205206
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
206207
.supplier(() -> {
207208
try {
208-
coreClient.readNamespacedSecret(secretName, kubeNamespace, null);
209-
209+
coreClient.readNamespacedSecret(secretName, kubeNamespace).execute();
210210
} catch (ApiException e) {
211211
// statefulset is gone
212212
if (e.getCode() == HTTP_NOT_FOUND) {
@@ -304,15 +304,12 @@ private void upsertSecret(String token, Function.FunctionDetails funcDetails, St
304304
.data(buildSecretMap(token));
305305

306306
try {
307-
coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, null, null, null);
307+
coreClient.createNamespacedSecret(kubeNamespace, v1Secret).execute();
308308
} catch (ApiException e) {
309309
if (e.getCode() == HTTP_CONFLICT) {
310310
try {
311-
coreClient
312-
.replaceNamespacedSecret(secretName, kubeNamespace, v1Secret,
313-
null, null, null, null);
311+
coreClient.replaceNamespacedSecret(secretName, kubeNamespace, v1Secret).execute();
314312
return Actions.ActionResult.builder().success(true).build();
315-
316313
} catch (ApiException e1) {
317314
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
318315
return Actions.ActionResult.builder()
@@ -366,7 +363,7 @@ private String createSecret(String token, Function.FunctionDetails funcDetails)
366363
.metadata(new V1ObjectMeta().name(getSecretName(id)))
367364
.data(buildSecretMap(token));
368365
try {
369-
coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, null, null, null);
366+
coreClient.createNamespacedSecret(kubeNamespace, v1Secret).execute();
370367
} catch (ApiException e) {
371368
// already exists
372369
if (e.getCode() == HTTP_CONFLICT) {

0 commit comments

Comments
 (0)