diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java index cc134b57b21e3..9abbf81576d92 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java @@ -33,11 +33,12 @@ public class AutoProduceBytesSchema implements Schema { @Setter - private boolean requireSchemaValidation = true; - private Schema schema; - private boolean userProvidedSchema; + private volatile boolean requireSchemaValidation = true; + private volatile Schema schema; + private final boolean userProvidedSchema; public AutoProduceBytesSchema() { + this.userProvidedSchema = false; } public AutoProduceBytesSchema(Schema schema) { @@ -81,11 +82,12 @@ public byte[] encode(byte[] message) { if (requireSchemaValidation) { // verify if the message can be decoded by the underlying schema - if (schema instanceof KeyValueSchema - && ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) { - ((KeyValueSchema) schema).getValueSchema().validate(message); + Schema localSchema = schema; + if (localSchema instanceof KeyValueSchema && ((KeyValueSchema) localSchema).getKeyValueEncodingType() + .equals(KeyValueEncodingType.SEPARATED)) { + ((KeyValueSchema) localSchema).getValueSchema().validate(message); } else { - schema.validate(message); + localSchema.validate(message); } }