Skip to content

Commit

Permalink
[Bridge Node] Better optimize sig aggregation (#19700)
Browse files Browse the repository at this point in the history
## Description 

Adds a path to provide a minimum timeout for signature aggregation with
preference such that, when specified, we will wait at least `min_timeout`
to collect as many signatures as possible before ordering them based on
the provided preference.

Note that the const min timeout value provided here is for verifying
functionality for now, and should ideally be replaced with something
like P95 latency.

## Test plan 

Added a unit test. Also ran on a testnet node and observed that the client
running this generated a certificate with 11 signatures, while the other
client generated a certificate with 28 signatures.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
williampsmith authored Oct 11, 2024
1 parent 5dc3f91 commit a9cd809
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 41 deletions.
92 changes: 83 additions & 9 deletions crates/sui-authority-aggregation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,33 @@ use mysten_metrics::monitored_future;

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use sui_types::base_types::ConciseableName;
use sui_types::committee::{CommitteeTrait, StakeUnit};

use tokio::time::timeout;

/// Boxed future resolving to `Result<T, E>`; this is the type the per-authority map
/// function (`FMap`) returns for each request.
pub type AsyncResult<'a, T, E> = BoxFuture<'a, Result<T, E>>;

/// Optional preferences controlling how responses from authorities are gathered and ordered.
pub struct SigRequestPrefs<K> {
/// Preferred authorities; passed to `shuffle_by_stake` when ordering the committee.
pub ordering_pref: BTreeSet<K>,
/// Window during which responses are collected (ending early once every authority has
/// responded) before the collected responses are reduced in the shuffled order.
pub prefetch_timeout: Duration,
}

/// Outcome returned by the reduce step after folding one authority's response into the state.
pub enum ReduceOutput<R, S> {
/// Keep folding further responses, carrying the updated state `S` forward.
Continue(S),
/// Stop; the aggregation returns the state `S` to the caller as its error value.
Failed(S),
/// Stop; the aggregation returns the final result `R` to the caller.
Success(R),
}

/// This function takes an initial state, then executes an asynchronous function (FMap) for each
/// authority, and folds the results as they become available into the state using an async function (FReduce).
///
/// prefetch_timeout: the minimum amount of time to spend trying to gather results from all authorities
/// before falling back to arrival order.
///
/// total_timeout: the maximum amount of total time to wait for results from all authorities, including
/// time spent prefetching.
pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
'a,
C,
Expand All @@ -35,11 +48,11 @@ pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
>(
committee: Arc<C>,
authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
authority_preferences: Option<&BTreeSet<K>>,
authority_preferences: Option<SigRequestPrefs<K>>,
initial_state: S,
map_each_authority: FMap,
reduce_result: FReduce,
initial_timeout: Duration,
total_timeout: Duration,
) -> Result<
(
R,
Expand All @@ -53,23 +66,84 @@ where
FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>>,
{
let authorities_shuffled = committee.shuffle_by_stake(authority_preferences, None);
let (preference, prefetch_timeout) = if let Some(SigRequestPrefs {
ordering_pref,
prefetch_timeout,
}) = authority_preferences
{
(Some(ordering_pref), Some(prefetch_timeout))
} else {
(None, None)
};
let authorities_shuffled = committee.shuffle_by_stake(preference.as_ref(), None);
let mut accumulated_state = initial_state;
let mut total_timeout = total_timeout;

// First, execute in parallel for each authority FMap.
let mut responses: futures::stream::FuturesUnordered<_> = authorities_shuffled
.clone()
.into_iter()
.map(|name| {
let client = authority_clients[&name].clone();
let execute = map_each_authority.clone();
monitored_future!(async move { (name.clone(), execute(name, client).await,) })
})
.collect();
if let Some(prefetch_timeout) = prefetch_timeout {
let elapsed = Instant::now();
let prefetch_sleep = tokio::time::sleep(prefetch_timeout);
let mut authority_to_result: BTreeMap<K, Result<V, E>> = BTreeMap::new();
tokio::pin!(prefetch_sleep);
// get all the sigs we can within prefetch_timeout
loop {
tokio::select! {
resp = responses.next() => {
match resp {
Some((authority_name, result)) => {
authority_to_result.insert(authority_name, result);
}
None => {
// we have processed responses from the full committee so can stop early
break;
}
}
}
_ = &mut prefetch_sleep => {
break;
}
}
}
// process what we have up to this point
for authority_name in authorities_shuffled {
let authority_weight = committee.weight(&authority_name);
if let Some(result) = authority_to_result.remove(&authority_name) {
accumulated_state = match reduce_result(
accumulated_state,
authority_name,
authority_weight,
result,
)
.await
{
// Continue carries the updated state forward; Failed aborts with the accumulated state.
ReduceOutput::Continue(state) => state,
ReduceOutput::Failed(state) => {
return Err(state);
}
ReduceOutput::Success(result) => {
// The reducer tells us that we have the result needed. Just return it.
return Ok((result, responses));
}
};
}
}
// if we got here, fallback through the if statement to continue in arrival order on
// the remaining validators
total_timeout = total_timeout.saturating_sub(elapsed.elapsed());
}

let current_timeout = initial_timeout;
let mut accumulated_state = initial_state;
// Then, as results become available fold them into the state using FReduce.
while let Ok(Some((authority_name, result))) = timeout(current_timeout, responses.next()).await
{
// As results become available fold them into the state using FReduce.
while let Ok(Some((authority_name, result))) = timeout(total_timeout, responses.next()).await {
let authority_weight = committee.weight(&authority_name);
accumulated_state =
match reduce_result(accumulated_state, authority_name, authority_weight, result).await {
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-bridge/src/action_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,7 @@ mod tests {
sui_tx_digest,
sui_tx_event_index,
Ok(signed_action.clone()),
None,
);
signed_actions.insert(secret.public().into(), signed_action.into_sig().signature);
}
Expand All @@ -1376,6 +1377,7 @@ mod tests {
sui_tx_digest,
sui_tx_event_index,
Err(BridgeError::RestAPIError("small issue".into())),
None,
);
}
}
Expand Down
Loading

0 comments on commit a9cd809

Please sign in to comment.