Skip to content

Conversation

@lhotari
Copy link
Member

@lhotari lhotari commented Nov 24, 2025

Motivation

Geo-replication will fail with schemaValidationEnforced enabled in certain cases currently.
The replication will be in a loop loggin "IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled" warnings and errors.

Example logs from the test that reproduces the issue:

2025-11-24T17:45:05,002 - WARN  - [pulsar-io-64-7:ClientCnx] - [id: 0xca37c05a, L:/127.0.0.1:61297 - R:localhost/127.0.0.1:61194] Received error from server: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled
2025-11-24T17:45:05,002 - ERROR - [pulsar-io-64-7:ProducerImpl] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8] [pulsar.repl.r1-->r2] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled","reqId":1217737249645833457, "remote":"localhost/127.0.0.1:61194", "local":"/127.0.0.1:61297"}
2025-11-24T17:45:05,002 - WARN  - [pulsar-io-64-7:AbstractReplicator] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8 | r1-->r2] Failed to create remote producer (org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled","reqId":1217737249645833457, "remote":"localhost/127.0.0.1:61194", "local":"/127.0.0.1:61297"}), retrying in 0.382 s
2025-11-24T17:45:05,012 - INFO  - [pulsar-io-64-5:AbstractReplicator] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8 | r1-->r2] Starting replicator
2025-11-24T17:45:05,013 - INFO  - [pulsar-io-64-10:ConnectionPool] - [[id: 0x153925b5, L:/127.0.0.1:61298 - R:localhost/127.0.0.1:61194]] Connected to server
2025-11-24T17:45:05,013 - INFO  - [pulsar-io-105-14:ServerCnx] - [/127.0.0.1:61298] connected with clientVersion=Pulsar-Java-v4.2.0-SNAPSHOT, clientProtocolVersion=21, proxyVersion=null
2025-11-24T17:45:05,014 - INFO  - [pulsar-io-64-24:ProducerImpl] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8] [pulsar.repl.r1-->r2] Creating producer on cnx [id: 0xae6e9a97, L:/127.0.0.1:61237 - R:localhost/127.0.0.1:61194]
2025-11-24T17:45:05,015 - WARN  - [pulsar-io-64-18:ClientCnx] - [id: 0xae6e9a97, L:/127.0.0.1:61237 - R:localhost/127.0.0.1:61194] Received error from server: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled
2025-11-24T17:45:05,015 - ERROR - [pulsar-io-64-18:ProducerImpl] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8] [pulsar.repl.r1-->r2] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled","reqId":1217737249645833460, "remote":"localhost/127.0.0.1:61194", "local":"/127.0.0.1:61237"}
2025-11-24T17:45:05,015 - WARN  - [pulsar-io-64-18:AbstractReplicator] - [persistent://public/always-compatible/tp_-206e9fd0-3dab-49eb-99f6-ebfa1a8452d8 | r1-->r2] Failed to create remote producer (org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schemawhen SchemaValidationEnforced is enabled","reqId":1217737249645833460, "remote":"localhost/127.0.0.1:61194", "local":"/127.0.0.1:61237"}), retrying in 0.782 s

The root cause of the problem is in this implementation code in the Pulsar client that geo-replication uses:

AutoProduceBytesSchema autoProduceBytesSchema = (AutoProduceBytesSchema) schema;
if (autoProduceBytesSchema.schemaInitialized()) {
return createProducerAsync(topic, conf, schema, interceptors);
}
return lookup.getSchema(TopicName.get(conf.getTopicName()))
.thenCompose(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent()) {
SchemaInfo schemaInfo = schemaInfoOptional.get();
if (schemaInfo.getType() == SchemaType.PROTOBUF) {
autoProduceBytesSchema.setSchema(new GenericAvroSchema(schemaInfo));
} else {
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfo));
}
} else {
autoProduceBytesSchema.setSchema(Schema.BYTES);
}
return createProducerAsync(topic, conf, schema, interceptors);
});

After the first lookup for the schema, the schema will be cached. When the schema is missing from the remote cluster's topic, it will always be initialized to Schema.BYTES. If topic is configured to have a schema, this will get ignored by the replicator since it will continue to use Schema.BYTES for replication.

Modifications

  • modify the logic to only use the cached schema when it was provided by the user code with org.apache.pulsar.client.api.Schema#AUTO_PRODUCE_BYTES(org.apache.pulsar.client.api.Schema<?>) method.
  • continue to cache the returned schema since it's needed for producing messages. However, when reinitializing the producer, it will get fetched each time from the broker so that schema updates will be detected. This is needed for fixing the replication issue.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@codecov-commenter
Copy link

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.33%. Comparing base (807dcaf) to head (aef0be6).
⚠️ Report is 3 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #25012      +/-   ##
============================================
- Coverage     74.42%   74.33%   -0.10%     
- Complexity    33586    33701     +115     
============================================
  Files          1920     1920              
  Lines        150279   150301      +22     
  Branches      17448    17457       +9     
============================================
- Hits         111847   111726     -121     
- Misses        29557    29673     +116     
- Partials       8875     8902      +27     
Flag Coverage Δ
inttests 26.36% <66.66%> (-0.39%) ⬇️
systests 22.81% <100.00%> (-0.23%) ⬇️
unittests 73.88% <100.00%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...rg/apache/pulsar/client/impl/PulsarClientImpl.java 75.55% <100.00%> (ø)
...sar/client/impl/schema/AutoProduceBytesSchema.java 70.73% <100.00%> (-1.07%) ⬇️

... and 103 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@lhotari lhotari merged commit ec609af into apache:master Nov 25, 2025
148 of 155 checks passed
lhotari added a commit that referenced this pull request Nov 25, 2025
lhotari added a commit that referenced this pull request Nov 25, 2025
lhotari added a commit that referenced this pull request Nov 27, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants