Skip to content

Commit

Permalink
Factor out common notify read code
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark committed Jun 28, 2024
1 parent d9c0c7e commit f3d95ba
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 69 deletions.
25 changes: 25 additions & 0 deletions crates/mysten-common/src/sync/notify_read.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use futures::future::{join_all, Either};
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::error::Error;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::mem;
Expand Down Expand Up @@ -115,6 +117,29 @@ impl<K: Eq + Hash + Clone, V: Clone> NotifyRead<K, V> {
}
}

impl<K: Eq + Hash + Clone + Unpin, V: Clone + Unpin> NotifyRead<K, V> {
pub async fn read<E: Error>(
&self,
keys: &[K],
fetch: impl FnOnce(&[K]) -> Result<Vec<Option<V>>, E>,
) -> Result<Vec<V>, E> {
let registrations = self.register_all(keys);

let results = fetch(keys)?;

let results = results
.into_iter()
.zip(registrations)
.map(|(a, r)| match a {
// Note that Some() clause also drops registration that is already fulfilled
Some(ready) => Either::Left(futures::future::ready(ready)),
None => Either::Right(r),
});

Ok(join_all(results).await)
}
}

/// Registration resolves to the value but also provides safe cancellation
/// When Registration is dropped before it is resolved, we de-register from the pending list
pub struct Registration<'a, K: Eq + Hash + Clone, V: Clone> {
Expand Down
29 changes: 8 additions & 21 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1325,27 +1325,14 @@ impl AuthorityPerEpochStore {
&self,
checkpoints: Vec<CheckpointSequenceNumber>,
) -> SuiResult<Vec<Accumulator>> {
// We need to register waiters _before_ reading from the database to avoid
// race conditions
let registrations = self.checkpoint_state_notify_read.register_all(&checkpoints);
let accumulators = self
.tables()?
.state_hash_by_checkpoint
.multi_get(checkpoints)?;

// Zipping together registrations and accumulators ensures returned order is
// the same as order of digests
let results =
accumulators
.into_iter()
.zip(registrations.into_iter())
.map(|(a, r)| match a {
// Note that Some() clause also drops registration that is already fulfilled
Some(ready) => Either::Left(futures::future::ready(ready)),
None => Either::Right(r),
});

Ok(join_all(results).await)
self.checkpoint_state_notify_read
.read(&checkpoints, |checkpoints| -> SuiResult<_> {
Ok(self
.tables()?
.state_hash_by_checkpoint
.multi_get(checkpoints)?)
})
.await
}

pub async fn notify_read_running_root(
Expand Down
30 changes: 6 additions & 24 deletions crates/sui-core/src/execution_cache/passthrough_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ use crate::authority::AuthorityStore;
use crate::state_accumulator::AccumulatorStore;
use crate::transaction_outputs::TransactionOutputs;

use either::Either;
use futures::{
future::{join_all, BoxFuture},
FutureExt,
};
use futures::{future::BoxFuture, FutureExt};
use mysten_common::sync::notify_read::NotifyRead;
use prometheus::Registry;
use std::sync::Arc;
Expand Down Expand Up @@ -219,25 +215,11 @@ impl TransactionCacheRead for PassthroughCache {
&'a self,
digests: &'a [TransactionDigest],
) -> BoxFuture<'a, SuiResult<Vec<TransactionEffectsDigest>>> {
async move {
let registrations = self
.executed_effects_digests_notify_read
.register_all(digests);

let executed_effects_digests = self.multi_get_executed_effects_digests(digests)?;

let results = executed_effects_digests
.into_iter()
.zip(registrations)
.map(|(a, r)| match a {
// Note that Some() clause also drops registration that is already fulfilled
Some(ready) => Either::Left(futures::future::ready(ready)),
None => Either::Right(r),
});

Ok(join_all(results).await)
}
.boxed()
self.executed_effects_digests_notify_read
.read(digests, |digests| {
self.multi_get_executed_effects_digests(digests)
})
.boxed()
}

fn multi_get_events(
Expand Down
30 changes: 6 additions & 24 deletions crates/sui-core/src/execution_cache/writeback_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ use crate::transaction_outputs::TransactionOutputs;

use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use either::Either;
use futures::{
future::{join_all, BoxFuture},
FutureExt,
};
use futures::{future::BoxFuture, FutureExt};
use moka::sync::Cache as MokaCache;
use mysten_common::sync::notify_read::NotifyRead;
use parking_lot::Mutex;
Expand Down Expand Up @@ -1735,25 +1731,11 @@ impl TransactionCacheRead for WritebackCache {
&'a self,
digests: &'a [TransactionDigest],
) -> BoxFuture<'a, SuiResult<Vec<TransactionEffectsDigest>>> {
async move {
let registrations = self
.executed_effects_digests_notify_read
.register_all(digests);

let executed_effects_digests = self.multi_get_executed_effects_digests(digests)?;

let results = executed_effects_digests
.into_iter()
.zip(registrations)
.map(|(a, r)| match a {
// Note that Some() clause also drops registration that is already fulfilled
Some(ready) => Either::Left(futures::future::ready(ready)),
None => Either::Right(r),
});

Ok(join_all(results).await)
}
.boxed()
self.executed_effects_digests_notify_read
.read(digests, |digests| {
self.multi_get_executed_effects_digests(digests)
})
.boxed()
}

fn multi_get_events(
Expand Down

0 comments on commit f3d95ba

Please sign in to comment.