diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index f546a487f84be..8918e17778b16 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -152,12 +152,50 @@ private static class PendingInitializeManagedLedger { private final ManagedLedgerImpl ledger; private final long createTimeMs; + private boolean done; PendingInitializeManagedLedger(ManagedLedgerImpl ledger) { this.ledger = ledger; this.createTimeMs = System.currentTimeMillis(); } + public synchronized boolean done() { + if (done) { + return false; + } + done = true; + return true; + } + + public synchronized boolean close() { + return close(null); + } + + public synchronized boolean close(Runnable closeCallback) { + if (done) { + return false; + } + done = true; + ledger.asyncClose(new CloseCallback() { + @Override + public void closeComplete(Object ctx) { + if (closeCallback != null) { + closeCallback.run(); + } + } + + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}] Failed to a pending initialization managed ledger", ledger.getName(), + exception); + if (closeCallback != null) { + closeCallback.run(); + } + } + }, null); + return true; + } + } public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfiguration bkClientConfiguration) @@ -391,14 +429,17 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.get(name); if (null != pendingLedger) { long pendingMs = System.currentTimeMillis() - pendingLedger.createTimeMs; - if (pendingMs > TimeUnit.SECONDS.toMillis(config.getMetadataOperationsTimeoutSeconds())) { + if (pendingMs > TimeUnit.SECONDS.toMillis(config.getMetadataOperationsTimeoutSeconds()) + && pendingInitializeLedgers.remove(name, pendingLedger)) { log.warn("[{}] Managed ledger has been pending in initialize state more than {} milliseconds," - + " remove it from cache to retry ...", name, pendingMs); - ledgers.remove(name, existingFuture); - pendingInitializeLedgers.remove(name, pendingLedger); + + " remove it from cache to retry ...", name, pendingMs); + pendingLedger.close(() -> { + if (ledgers.remove(name, existingFuture)) { + entryCacheManager.removeEntryCache(name); + } + }); } } - } } @@ -420,15 +461,20 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { @Override public void initializeComplete() { - log.info("[{}] Successfully initialize managed ledger", name); pendingInitializeLedgers.remove(name, pendingLedger); - future.complete(newledger); - - // May need to update the cursor position - newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); - // May need to trigger offloading - if (config.isTriggerOffloadOnTopicLoad()) { - newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + if (pendingLedger.done()) { + log.info("[{}] Successfully initialize managed ledger", name); + future.complete(newledger); + + // May need to update the cursor position + newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); + // May need to trigger offloading + if (config.isTriggerOffloadOnTopicLoad()) { + newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + } + } else { + future.completeExceptionally( + new ManagedLedgerException("Managed ledger initialization timed out")); } } @@ -437,26 +483,14 @@ public void initializeFailed(ManagedLedgerException e) { if (config.isCreateIfMissing()) { log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage()); } - // Clean the map if initialization fails - ledgers.remove(name, future); - entryCacheManager.removeEntryCache(name); - if (pendingInitializeLedgers.remove(name, pendingLedger)) { - pendingLedger.ledger.asyncClose(new CloseCallback() { - @Override - public void closeComplete(Object ctx) { - // no-op - } - - @Override - public void closeFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Failed to a pending initialization managed ledger", name, - exception); + pendingLedger.close(() -> { + if (ledgers.remove(name, existingFuture)) { + entryCacheManager.removeEntryCache(name); } - }, null); + }); } - future.completeExceptionally(e); } }, null); @@ -612,8 +646,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { }, scheduledExecutor.chooseThread()); //close pendingInitializeManagedLedger directly to make sure all callbacks is called. - PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.get(ledgerName); - if (pendingLedger != null && !ledgerFuture.isDone()) { + PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.remove(ledgerName); + if (pendingLedger != null && pendingLedger.close()) { ledgerFuture.completeExceptionally(new ManagedLedgerException.ManagedLedgerFactoryClosedException()); } }