Skip to content

Commit 3937788

Browse files
authored
[fix][broker]Fix memory leak when using a customized ManagedLedger implementation (#25016)
1 parent 6849824 commit 3937788

File tree

5 files changed

+1252
-2
lines changed

5 files changed

+1252
-2
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,8 @@ public interface VoidCallback {
338338
void operationFailed(ManagedLedgerException exception);
339339
}
340340

341-
ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String cursorName) {
341+
@VisibleForTesting
342+
protected ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String cursorName) {
342343
this.bookkeeper = bookkeeper;
343344
this.cursorProperties = Collections.emptyMap();
344345
this.ledger = ledger;

pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
5454
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
5555
private static final String DEFAULT_STORAGE_CLASS_NAME = "bookkeeper";
5656
private BookkeeperManagedLedgerStorageClass defaultStorageClass;
57-
private ManagedLedgerFactory managedLedgerFactory;
57+
@VisibleForTesting
58+
protected ManagedLedgerFactory managedLedgerFactory;
5859
private BookKeeper defaultBkClient;
5960
private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
6061
bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().recordStats().buildAsync();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4092,6 +4092,10 @@ public void readEntryComplete(Entry entry, Object ctx) {
40924092
} catch (Exception e) {
40934093
log.warn("[{}] [{}] Error while getting the oldest message", topic, cursor.toString(), e);
40944094
res.complete(false);
4095+
} finally {
4096+
if (entry != null) {
4097+
entry.release();
4098+
}
40954099
}
40964100

40974101
}

0 commit comments

Comments
 (0)