Skip to content

Commit

Permalink
[fix][ml] Fix races in ledger initialization timeout handling
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Oct 17, 2024
1 parent 63c6a25 commit 94c59e4
Showing 1 changed file with 65 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,50 @@ private static class PendingInitializeManagedLedger {

private final ManagedLedgerImpl ledger;
private final long createTimeMs;
private boolean done;

PendingInitializeManagedLedger(ManagedLedgerImpl ledger) {
this.ledger = ledger;
this.createTimeMs = System.currentTimeMillis();
}

public synchronized boolean done() {
if (done) {
return false;
}
done = true;
return true;
}

public synchronized boolean close() {
return close(null);
}

public synchronized boolean close(Runnable closeCallback) {
if (done) {
return false;
}
done = true;
ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
if (closeCallback != null) {
closeCallback.run();
}
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to a pending initialization managed ledger", ledger.getName(),
exception);
if (closeCallback != null) {
closeCallback.run();
}
}
}, null);
return true;
}

}

public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfiguration bkClientConfiguration)
Expand Down Expand Up @@ -391,14 +429,17 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.get(name);
if (null != pendingLedger) {
long pendingMs = System.currentTimeMillis() - pendingLedger.createTimeMs;
if (pendingMs > TimeUnit.SECONDS.toMillis(config.getMetadataOperationsTimeoutSeconds())) {
if (pendingMs > TimeUnit.SECONDS.toMillis(config.getMetadataOperationsTimeoutSeconds())
&& pendingInitializeLedgers.remove(name, pendingLedger)) {
log.warn("[{}] Managed ledger has been pending in initialize state more than {} milliseconds,"
+ " remove it from cache to retry ...", name, pendingMs);
ledgers.remove(name, existingFuture);
pendingInitializeLedgers.remove(name, pendingLedger);
+ " remove it from cache to retry ...", name, pendingMs);
pendingLedger.close(() -> {
if (ledgers.remove(name, existingFuture)) {
entryCacheManager.removeEntryCache(name);
}
});
}
}

}
}

Expand All @@ -420,15 +461,20 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
public void initializeComplete() {
log.info("[{}] Successfully initialize managed ledger", name);
pendingInitializeLedgers.remove(name, pendingLedger);
future.complete(newledger);

// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
// May need to trigger offloading
if (config.isTriggerOffloadOnTopicLoad()) {
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
if (pendingLedger.done()) {
log.info("[{}] Successfully initialize managed ledger", name);
future.complete(newledger);

// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
// May need to trigger offloading
if (config.isTriggerOffloadOnTopicLoad()) {
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
}
} else {
future.completeExceptionally(
new ManagedLedgerException("Managed ledger initialization timed out"));
}
}

Expand All @@ -437,26 +483,14 @@ public void initializeFailed(ManagedLedgerException e) {
if (config.isCreateIfMissing()) {
log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage());
}

// Clean the map if initialization fails
ledgers.remove(name, future);
entryCacheManager.removeEntryCache(name);

if (pendingInitializeLedgers.remove(name, pendingLedger)) {
pendingLedger.ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
// no-op
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to a pending initialization managed ledger", name,
exception);
pendingLedger.close(() -> {
if (ledgers.remove(name, existingFuture)) {
entryCacheManager.removeEntryCache(name);
}
}, null);
});
}

future.completeExceptionally(e);
}
}, null);
Expand Down Expand Up @@ -612,8 +646,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {

}, scheduledExecutor.chooseThread());
//close pendingInitializeManagedLedger directly to make sure all callbacks is called.
PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.get(ledgerName);
if (pendingLedger != null && !ledgerFuture.isDone()) {
PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.remove(ledgerName);
if (pendingLedger != null && pendingLedger.close()) {
ledgerFuture.completeExceptionally(new ManagedLedgerException.ManagedLedgerFactoryClosedException());
}
}
Expand Down

0 comments on commit 94c59e4

Please sign in to comment.