Skip to content

Commit 4566edd

Browse files
authored
[fix][broker] Fix creation of replicated subscriptions for partitioned topics (#24997)
1 parent 53162ff commit 4566edd

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2220,7 +2220,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
22202220
try {
22212221
pulsar().getAdminClient().topics()
22222222
.createSubscriptionAsync(topicNamePartition.toString(),
2223-
subscriptionName, targetMessageId, false, properties)
2223+
subscriptionName, targetMessageId, replicated, properties)
22242224
.handle((r, ex) -> {
22252225
if (ex != null) {
22262226
// fail the operation on unknown exception or

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2543,6 +2543,27 @@ public void testInvalidBundleErrorResponse() throws Exception {
25432543
}
25442544
}
25452545

2546+
@Test(dataProvider = "trueFalse")
2547+
public void testCreateReplicatedSubscriptionForPartitionedTopic(boolean replicated) throws Exception {
2548+
final String topic = newUniqueName("persistent://" + defaultNamespace + "/topic");
2549+
admin.topics().createPartitionedTopic(topic, 10);
2550+
admin.topics().createSubscription(topic, "sub", MessageId.earliest, replicated);
2551+
for (int i = 0; i < 10; i++) {
2552+
String individualPartition = TopicName.get(topic).getPartition(i).toString();
2553+
TopicStats stats = admin.topics().getStats(individualPartition);
2554+
assertEquals(stats.getSubscriptions().get("sub").isReplicated(), replicated);
2555+
}
2556+
}
2557+
2558+
@Test(dataProvider = "trueFalse")
2559+
public void testCreateReplicatedSubscriptionForNonPartitionedTopic(boolean replicated) throws Exception {
2560+
final String topic = newUniqueName("persistent://" + defaultNamespace + "/topic");
2561+
admin.topics().createNonPartitionedTopic(topic);
2562+
admin.topics().createSubscription(topic, "sub", MessageId.earliest, replicated);
2563+
TopicStats stats = admin.topics().getStats(topic);
2564+
assertEquals(stats.getSubscriptions().get("sub").isReplicated(), replicated);
2565+
}
2566+
25462567
@Test
25472568
public void testMaxSubscriptionsPerTopic() throws Exception {
25482569
restartClusterAfterTest();

0 commit comments

Comments
 (0)