Skip to content

Commit d3d5963

Browse files
authored
[fix][client] Fix thread-safety of AutoProduceBytesSchema (#25014)
1 parent 0fd0701 commit d3d5963

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@
3333
public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
3434

3535
@Setter
36-
private boolean requireSchemaValidation = true;
37-
private Schema<T> schema;
38-
private boolean userProvidedSchema;
36+
private volatile boolean requireSchemaValidation = true;
37+
private volatile Schema<T> schema;
38+
private final boolean userProvidedSchema;
3939

4040
public AutoProduceBytesSchema() {
41+
this.userProvidedSchema = false;
4142
}
4243

4344
public AutoProduceBytesSchema(Schema<T> schema) {
@@ -87,11 +88,12 @@ public byte[] encode(byte[] message) {
8788

8889
if (requireSchemaValidation) {
8990
// verify if the message can be decoded by the underlying schema
90-
if (schema instanceof KeyValueSchema
91-
&& ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
92-
((KeyValueSchema) schema).getValueSchema().validate(message);
91+
Schema<T> localSchema = schema;
92+
if (localSchema instanceof KeyValueSchema && ((KeyValueSchema) localSchema).getKeyValueEncodingType()
93+
.equals(KeyValueEncodingType.SEPARATED)) {
94+
((KeyValueSchema) localSchema).getValueSchema().validate(message);
9395
} else {
94-
schema.validate(message);
96+
localSchema.validate(message);
9597
}
9698
}
9799

0 commit comments

Comments
 (0)