Skip to content

Commit 7cf5b2c

Browse files
committed
Set default options from boot.go; move PipelineOpt to SdkHarnessOptions
1 parent 83a8513 commit 7cf5b2c

File tree

4 files changed

+26
-53
lines changed

4 files changed

+26
-53
lines changed

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.beam.runners.dataflow.options;
1919

2020
import com.google.api.services.dataflow.Dataflow;
21-
import java.util.Arrays;
2221
import java.util.List;
2322
import java.util.Map;
2423
import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -267,30 +266,4 @@ public String create(PipelineOptions options) {
267266
List<String> getJdkAddOpenModules();
268267

269268
void setJdkAddOpenModules(List<String> options);
270-
271-
class AvroSerializableClassesFactory implements DefaultValueFactory<List<String>> {
272-
@Override
273-
public List<String> create(PipelineOptions options) {
274-
return Arrays.asList(
275-
"java.math.BigDecimal",
276-
"java.math.BigInteger",
277-
"java.net.URI",
278-
"java.net.URL",
279-
"java.io.File",
280-
"java.lang.Integer");
281-
}
282-
}
283-
284-
/**
285-
* The Avro spec supports the `java-class` schema annotation, which allows fields to be serialized
286-
* and deserialized via their toString/String constructor. As of Avro 1.11.4+, allowed Java
287-
* classes must be explicitly specified via the jvm option. The comma-separated String value of
288-
* this pipeline option will be passed to the Dataflow worker via the
289-
* -Dorg.apache.avro.SERIALIZABLE_CLASSES jvm option.
290-
*/
291-
@Description("Serializable classes required by java-class props in Avro 1.11.4+")
292-
@Default.InstanceFactory(AvroSerializableClassesFactory.class)
293-
List<String> getAvroSerializableClasses();
294-
295-
void setAvroSerializableClasses(List<String> options);
296269
}

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.io.IOException;
3030
import java.io.InputStream;
3131
import java.io.OutputStream;
32-
import java.util.Arrays;
3332
import java.util.List;
3433
import java.util.concurrent.TimeoutException;
3534
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
@@ -253,28 +252,6 @@ public void testDefaultGcpRegionFromGcloud() {
253252
}
254253
}
255254

256-
@Test
257-
public void testDefaultAvroSerializableClasses() {
258-
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
259-
assertEquals(
260-
Arrays.asList(
261-
"java.math.BigDecimal",
262-
"java.math.BigInteger",
263-
"java.net.URI",
264-
"java.net.URL",
265-
"java.io.File",
266-
"java.lang.Integer"),
267-
options.getAvroSerializableClasses());
268-
}
269-
270-
@Test
271-
public void testOverriddenAvroSerializableClasses() {
272-
final List<String> opts = Arrays.asList("foo", "bar");
273-
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
274-
options.setAvroSerializableClasses(opts);
275-
assertEquals(opts, options.getAvroSerializableClasses());
276-
}
277-
278255
/**
279256
* If gcloud gets stuck, test that {@link DefaultGcpRegionFactory#getRegionFromGcloudCli(long)}
280257
* times out instead of blocking forever.

sdks/java/container/boot.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,12 +277,23 @@ func main() {
277277
}
278278
}
279279
// Add trusted Avro serializable classes
280+
var serializableClassesList []string
280281
if serializableClasses, ok := pipelineOptions.GetStructValue().GetFields()["avroSerializableClasses"]; ok {
281-
var serializableClassesSlice []string
282282
for _, cls := range serializableClasses.GetListValue().GetValues() {
283-
serializableClassesSlice = append(serializableClassesSlice, cls.GetStringValue())
283+
serializableClassesList = append(serializableClassesList, cls.GetStringValue())
284284
}
285-
args = append(args, "-Dorg.apache.avro.SERIALIZABLE_CLASSES="+strings.Join(serializableClassesSlice, ","))
285+
} else {
286+
serializableClassesList = []string{
287+
"java.math.BigDecimal",
288+
"java.math.BigInteger",
289+
"java.net.URI",
290+
"java.net.URL",
291+
"java.io.File",
292+
"java.lang.Integer",
293+
}
294+
}
295+
if len(serializableClassesList) > 0 {
296+
args = append(args, "-Dorg.apache.avro.SERIALIZABLE_CLASSES="+strings.Join(serializableClassesList, ","))
286297
}
287298
}
288299
// Automatically open modules for Java 11+

sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,4 +440,16 @@ public Duration create(PipelineOptions options) {
440440
int getElementProcessingTimeoutMinutes();
441441

442442
void setElementProcessingTimeoutMinutes(int value);
443+
444+
/**
445+
* The Avro spec supports the `java-class` schema annotation, which allows fields to be serialized
446+
* and deserialized via their toString/String constructor. As of Avro 1.11.4+, allowed Java
447+
* classes must be explicitly specified via the jvm option. The comma-separated String value of
448+
* this pipeline option will be passed to the Java SDK harness JVM by the container boot via the
449+
* -Dorg.apache.avro.SERIALIZABLE_CLASSES jvm option.
450+
*/
451+
@Description("Serializable classes required by java-class props in Avro 1.11.4+")
452+
List<String> getAvroSerializableClasses();
453+
454+
void setAvroSerializableClasses(List<String> options);
443455
}

0 commit comments

Comments
 (0)