Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -1131,26 +1132,9 @@ private void doSubscribeTopicPartitions(Schema<T> schema,
List<CompletableFuture<Consumer<T>>> subscribeList = new ArrayList<>();
for (int partitionIndex : partitions) {
String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
configurationData.setStartPaused(paused);
ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName,
partitionIndex, subFuture, createIfDoesNotExist, schema);
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();
} else {
newConsumer.resume();
}
Consumer originalValue = consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
if (originalValue != null) {
newConsumer.closeAsync().exceptionally(ex -> {
log.error("[{}] [{}] Failed to close the orphan consumer",
partitionName, subscription, ex);
return null;
});
}
}
subscribeList.add(subFuture);
subscribeList.add(addNewConsumerIfNotExists(partitionName,
() -> createInternalConsumer(configurationData, partitionName, partitionIndex,
new CompletableFuture<>(), createIfDoesNotExist, schema)));
}
return FutureUtil.waitForAll(subscribeList);
});
Expand All @@ -1160,29 +1144,20 @@ private void doSubscribeTopicPartitions(Schema<T> schema,
CompletableFuture<Consumer<T>> subscribeFuture = new CompletableFuture<>();
subscribeAllPartitionsFuture = subscribeFuture.thenAccept(__ -> {});

synchronized (pauseMutex) {
consumers.compute(topicName, (key, existingValue) -> {
if (existingValue != null) {
String errorMessage =
String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. "
+ "Topic is already being subscribed for in other thread.", topic, topicName);
log.warn(errorMessage);
subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
return existingValue;
} else {
internalConfig.setStartPaused(paused);
ConsumerImpl<T> newConsumer = createInternalConsumer(internalConfig, topicName,
-1, subscribeFuture, createIfDoesNotExist, schema);
if (paused) {
newConsumer.pause();
} else {
newConsumer.resume();
}
return newConsumer;
}
});
}

consumers.compute(topicName, (key, existingValue) -> {
if (existingValue != null) {
String errorMessage =
String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. "
+ "Topic is already being subscribed for in other thread.", topic, topicName);
log.warn(errorMessage);
subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
return existingValue;
} else {
ConsumerImpl<T> newConsumer = createInternalConsumer(internalConfig, topicName,
-1, subscribeFuture, createIfDoesNotExist, schema);
return newConsumer;
}
});
}

subscribeAllPartitionsFuture.thenAccept(finalFuture -> {
Expand Down Expand Up @@ -1222,6 +1197,7 @@ private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> conf
.timeout(1, TimeUnit.MILLISECONDS)
.build();
configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
configurationData.setStartPaused(paused);
configurationData = configurationData.clone();
return ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider(),
Expand Down Expand Up @@ -1445,24 +1421,9 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
.stream()
.map(partitionName -> {
int partitionIndex = TopicName.getPartitionIndex(partitionName);
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig();
configurationData.setStartPaused(paused);
ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName,
partitionIndex, subFuture, true, schema);
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();
} else {
newConsumer.resume();
}
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
}
if (log.isDebugEnabled()) {
log.debug("[{}] create consumer {} for partitionName: {}",
topicName, newConsumer.getTopic(), partitionName);
}
return subFuture;
return addNewConsumerIfNotExists(partitionName,
() -> createInternalConsumer(getInternalConsumerConfig(), partitionName,
partitionIndex, new CompletableFuture<>(), true, schema));
})
.collect(Collectors.toList());
// call interceptor
Expand All @@ -1486,6 +1447,33 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
});
}

private CompletableFuture<Consumer<T>> addNewConsumerIfNotExists(String internalTopicName,
Supplier<ConsumerImpl<T>> newConsumerSupplier) {
ConsumerImpl<T> consumer = consumers.compute(internalTopicName, (__, existingConsumer) -> {
if (existingConsumer != null) {
if (existingConsumer.subscribeFuture().isCompletedExceptionally()) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Closing and replacing existing consumer that wasn't completed successfully "
+ "for {}", topic, subscription, internalTopicName);
}
existingConsumer.closeAsync();
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Reusing existing consumer for {}", topic, subscription, internalTopicName);
}
return existingConsumer;
}
}
// create the new consumer
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Creating consumer for {}", topic, subscription, internalTopicName);
}
return newConsumerSupplier.get();
});
// return the subscribe future
return consumer.subscribeFuture();
}

private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
Expand Down
Loading