Skip to content

Commit 131cf95

Browse files
committed
[fix] Reduce unnecessary schema duplication for partitioned topics in client and geo-replication
1 parent 807dcaf commit 131cf95

File tree

2 files changed

+7
-3
lines changed

2 files changed

+7
-3
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.apache.pulsar.client.impl.PulsarClientImpl;
6868
import org.apache.pulsar.client.impl.SendCallback;
6969
import org.apache.pulsar.common.api.proto.MarkerType;
70+
import org.apache.pulsar.common.naming.TopicName;
7071
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
7172
import org.apache.pulsar.common.schema.SchemaInfo;
7273
import org.apache.pulsar.common.stats.Rate;
@@ -80,6 +81,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
8081

8182
protected final PersistentTopic topic;
8283
protected final ManagedCursor cursor;
84+
protected final String localSchemaTopicName;
8385

8486
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
8587
private final Object dispatchRateLimiterLock = new Object();
@@ -123,6 +125,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
123125
super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(),
124126
brokerService, replicationClient);
125127
this.topic = localTopic;
128+
this.localSchemaTopicName = TopicName.getPartitionedTopicName(localTopicName).toString();
126129
this.cursor = Objects.requireNonNull(cursor);
127130
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
128131
Codec.decode(cursor.getName()), cursor, null);
@@ -378,7 +381,7 @@ protected CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws Ex
378381
if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) {
379382
return CompletableFuture.completedFuture(null);
380383
}
381-
return client.getSchemaProviderLoadingCache().get(localTopicName)
384+
return client.getSchemaProviderLoadingCache().get(localSchemaTopicName)
382385
.getSchemaByVersion(msg.getSchemaVersion());
383386
}
384387

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,10 +1363,11 @@ protected <T> CompletableFuture<Schema<T>> preProcessSchemaBeforeSubscribe(Pulsa
13631363
String topicName) {
13641364
if (schema != null && schema.supportSchemaVersioning()) {
13651365
final SchemaInfoProvider schemaInfoProvider;
1366+
String schemaTopicName = TopicName.getPartitionedTopicName(topicName).toString();
13661367
try {
1367-
schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
1368+
schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(schemaTopicName);
13681369
} catch (ExecutionException e) {
1369-
log.error("Failed to load schema info provider for topic {}", topicName, e);
1370+
log.error("Failed to load schema info provider for topic {}", schemaTopicName, e);
13701371
return FutureUtil.failedFuture(e.getCause());
13711372
}
13721373
schema = schema.clone();

0 commit comments

Comments
 (0)