Skip to content

Commit 346158f

Browse files
committed
Add unit test of redistribute override
1 parent cf76c1c commit 346158f

File tree

1 file changed

+98
-0
lines changed

1 file changed

+98
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.is;
22+
import static org.hamcrest.Matchers.nullValue;
23+
24+
import java.io.Serializable;
25+
import java.util.Collections;
26+
import org.apache.beam.sdk.Pipeline;
27+
import org.apache.beam.sdk.io.kafka.KafkaIO;
28+
import org.apache.beam.sdk.runners.PTransformOverride;
29+
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
30+
import org.apache.beam.sdk.testing.TestPipeline;
31+
import org.apache.kafka.common.serialization.StringDeserializer;
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
import org.junit.runner.RunWith;
35+
import org.junit.runners.JUnit4;
36+
37+
@RunWith(JUnit4.class)
38+
public class KafkaReadWithRedistributeOverrideTest implements Serializable {
39+
@Rule public transient TestPipeline p = TestPipeline.create();
40+
41+
@Test
42+
public void testOverrideAppliedWhenRedistributeEnabled() {
43+
p.apply(
44+
"MatchingRead",
45+
KafkaIO.<String, String>read()
46+
.withBootstrapServers("localhost:9092")
47+
.withTopic("test_match")
48+
.withKeyDeserializer(StringDeserializer.class)
49+
.withValueDeserializer(StringDeserializer.class)
50+
.withRedistribute());
51+
p.apply(
52+
"NonMatchingRead",
53+
KafkaIO.<String, String>read()
54+
.withBootstrapServers("localhost:9092")
55+
.withTopic("test_nomatch")
56+
.withKeyDeserializer(StringDeserializer.class)
57+
.withValueDeserializer(StringDeserializer.class));
58+
59+
p.replaceAll(
60+
Collections.singletonList(
61+
PTransformOverride.of(
62+
KafkaReadWithRedistributeOverride.matcher(),
63+
new KafkaReadWithRedistributeOverride.Factory<>())));
64+
65+
Pipeline.PipelineVisitor visitor =
66+
new Pipeline.PipelineVisitor.Defaults() {
67+
68+
private boolean matchingVisited = false;
69+
private boolean nonMatchingVisited = false;
70+
71+
@Override
72+
public CompositeBehavior enterCompositeTransform(Node node) {
73+
if (node.getTransform() instanceof KafkaIO.Read) {
74+
KafkaIO.Read<?, ?> read = (KafkaIO.Read<?, ?>) node.getTransform();
75+
if (read.getTopics().contains("test_match")) {
76+
assertThat(read.isRedistributed(), is(true));
77+
assertThat(read.getOffsetDeduplication(), is(true));
78+
matchingVisited = true;
79+
} else if (read.getTopics().contains("test_nomatch")) {
80+
assertThat(read.isRedistributed(), is(false));
81+
assertThat(read.getOffsetDeduplication(), nullValue());
82+
nonMatchingVisited = true;
83+
}
84+
}
85+
return CompositeBehavior.ENTER_TRANSFORM;
86+
}
87+
88+
@Override
89+
public void leaveCompositeTransform(Node node) {
90+
if (node.isRootNode()) {
91+
assertThat("Matching transform was not visited", matchingVisited, is(true));
92+
assertThat("Non-matching transform was not visited", nonMatchingVisited, is(true));
93+
}
94+
}
95+
};
96+
p.traverseTopologically(visitor);
97+
}
98+
}

0 commit comments

Comments
 (0)