Skip to content

Commit

Permalink
[bridge] let monitor handle blocklist event (#18792)
Browse files Browse the repository at this point in the history
## Description 

as title 

## Test plan 

added unit tests

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
longbowlu authored Jul 26, 2024
1 parent 3dd9ddd commit 93caf44
Showing 1 changed file with 323 additions and 4 deletions.
327 changes: 323 additions & 4 deletions crates/sui-bridge/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use tokio::time::Duration;

use crate::client::bridge_authority_aggregator::BridgeAuthorityAggregator;
use crate::crypto::BridgeAuthorityPublicKeyBytes;
use crate::events::CommitteeMemberUrlUpdateEvent;
use crate::events::SuiBridgeEvent;
use crate::events::{BlocklistValidatorEvent, CommitteeMemberUrlUpdateEvent};
use crate::retry_with_max_elapsed_time;
use crate::sui_client::{SuiClient, SuiClientInner};
use crate::types::BridgeCommittee;
Expand Down Expand Up @@ -74,10 +74,20 @@ where
bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new(
new_committee,
))));
info!("Committee updated");
info!("Committee updated with CommitteeMemberUrlUpdateEvent");
}
SuiBridgeEvent::BlocklistValidatorEvent(_) => {
// TODO
SuiBridgeEvent::BlocklistValidatorEvent(event) => {
info!("Received BlocklistValidatorEvent: {:?}", event);
let new_committee = get_latest_bridge_committee_with_blocklist_event(
sui_client.clone(),
event,
Duration::from_secs(10),
)
.await;
bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new(
new_committee,
))));
info!("Committee updated with BlocklistValidatorEvent");
}
SuiBridgeEvent::TokenRegistrationEvent(_) => (),
SuiBridgeEvent::NewTokenEvent(_) => {
Expand Down Expand Up @@ -134,6 +144,59 @@ async fn get_latest_bridge_committee_with_url_update_event<C: SuiClientInner>(
}
}

async fn get_latest_bridge_committee_with_blocklist_event<C: SuiClientInner>(
sui_client: Arc<SuiClient<C>>,
event: BlocklistValidatorEvent,
staleness_retry_interval: Duration,
) -> BridgeCommittee {
let mut remaining_retry_times = REFRESH_COMMITTEE_RETRY_TIMES;
loop {
let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
sui_client.get_bridge_committee(),
Duration::from_secs(600)
) else {
error!("Failed to get bridge committee after retry");
continue;
};
let mut any_mismatch = false;
for pk in &event.public_keys {
let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(pk));
let Some(member) = member else {
// This is possible when a node is processing an older event while the member
// quitted at a later point. Or fullnode returns a stale committee that
// the member hasn't joined.
warn!("Committee member not found in the committee: {:?}", pk);
any_mismatch = true;
break;
};
if member.is_blocklisted != event.blocklisted {
warn!(
"Committee member blocklist status does not match onchain record: {:?}",
member
);
any_mismatch = true;
break;
}
}
if !any_mismatch {
return committee;
}
// If there is any match, it could be:
// 1. the query is sent to a stale fullnode that does not have the latest data yet
// 2. the node is processing an older message, and the latest blocklist status has changed again
// In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
tokio::time::sleep(staleness_retry_interval).await;
remaining_retry_times -= 1;
if remaining_retry_times == 0 {
warn!(
"Committee member blocklist status {:?} does not match onchain record after retry",
event
);
return committee;
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -299,6 +362,219 @@ mod tests {
assert!(elapsed < 1000);
}

#[tokio::test]
async fn test_get_latest_bridge_committee_with_blocklist_event() {
telemetry_subscribers::init_for_testing();
let sui_client_mock = SuiMockClient::default();
let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
let pk = kp.public().clone();
let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
let pk_bytes = pk_as_bytes.as_bytes().to_vec();

// Test the case where the onchain status is the same as the event (blocklisted)
let event = BlocklistValidatorEvent {
blocklisted: true,
public_keys: vec![pk.clone()],
};
let summary = BridgeCommitteeSummary {
members: vec![(
pk_bytes.clone(),
MoveTypeCommitteeMember {
sui_address: SuiAddress::random_for_testing_only(),
bridge_pubkey_bytes: pk_bytes.clone(),
voting_power: 10000,
http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
blocklisted: true,
},
)],
member_registration: vec![],
last_committee_update_epoch: 0,
};
sui_client_mock.set_bridge_committee(summary.clone());
let timer = std::time::Instant::now();
let committee = get_latest_bridge_committee_with_blocklist_event(
sui_client.clone(),
event.clone(),
Duration::from_secs(2),
)
.await;
assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
assert!(timer.elapsed().as_millis() < 500);

// Test the case where the onchain status is the same as the event (unblocklisted)
let event = BlocklistValidatorEvent {
blocklisted: false,
public_keys: vec![pk.clone()],
};
let summary = BridgeCommitteeSummary {
members: vec![(
pk_bytes.clone(),
MoveTypeCommitteeMember {
sui_address: SuiAddress::random_for_testing_only(),
bridge_pubkey_bytes: pk_bytes.clone(),
voting_power: 10000,
http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
blocklisted: false,
},
)],
member_registration: vec![],
last_committee_update_epoch: 0,
};
sui_client_mock.set_bridge_committee(summary.clone());
let timer = std::time::Instant::now();
let committee = get_latest_bridge_committee_with_blocklist_event(
sui_client.clone(),
event.clone(),
Duration::from_secs(2),
)
.await;
assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
assert!(timer.elapsed().as_millis() < 500);

// Test the case where the onchain status is older. Then update onchain status in 1 second.
// Since the retry interval is 2 seconds, it should return the next retry.
let old_summary = BridgeCommitteeSummary {
members: vec![(
pk_bytes.clone(),
MoveTypeCommitteeMember {
sui_address: SuiAddress::random_for_testing_only(),
bridge_pubkey_bytes: pk_bytes.clone(),
voting_power: 10000,
http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
blocklisted: true,
},
)],
member_registration: vec![],
last_committee_update_epoch: 0,
};
sui_client_mock.set_bridge_committee(old_summary.clone());
let timer = std::time::Instant::now();
// update unblocklisted in 1 second
let sui_client_mock_clone = sui_client_mock.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
sui_client_mock_clone.set_bridge_committee(summary.clone());
});
let committee = get_latest_bridge_committee_with_blocklist_event(
sui_client.clone(),
event.clone(),
Duration::from_secs(2),
)
.await;
assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
let elapsed = timer.elapsed().as_millis();
assert!(elapsed > 1000 && elapsed < 3000);

// Test the case where the onchain url is newer. It should retry up to
// REFRESH_COMMITTEE_RETRY_TIMES time then return the onchain record.
let newer_summary = BridgeCommitteeSummary {
members: vec![(
pk_bytes.clone(),
MoveTypeCommitteeMember {
sui_address: SuiAddress::random_for_testing_only(),
bridge_pubkey_bytes: pk_bytes.clone(),
voting_power: 10000,
http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
blocklisted: true,
},
)],
member_registration: vec![],
last_committee_update_epoch: 0,
};
sui_client_mock.set_bridge_committee(newer_summary.clone());
let timer = std::time::Instant::now();
let committee = get_latest_bridge_committee_with_blocklist_event(
sui_client.clone(),
event.clone(),
Duration::from_millis(500),
)
.await;
assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
let elapsed = timer.elapsed().as_millis();
assert!(elapsed > 500 * REFRESH_COMMITTEE_RETRY_TIMES as u128);

// Test the case where the member onchain url is not found in the committee
// It should return the onchain record after retrying a few times.
let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
let pk2 = kp2.public().clone();
let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
let summary = BridgeCommitteeSummary {
members: vec![(
pk_bytes2.clone(),
MoveTypeCommitteeMember {
sui_address: SuiAddress::random_for_testing_only(),
bridge_pubkey_bytes: pk_bytes2.clone(),
voting_power: 10000,
http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
blocklisted: false,
},
)],
member_registration: vec![],
last_committee_update_epoch: 0,
};
sui_client_mock.set_bridge_committee(summary.clone());
let timer = std::time::Instant::now();
let committee = get_latest_bridge_committee_with_blocklist_event(
sui_client.clone(),
event.clone(),
Duration::from_secs(1),
)
.await;
assert_eq!(
committee.member(&pk_as_bytes2).unwrap().base_url,
"http://newer.url"
);
assert!(committee.member(&pk_as_bytes).is_none());
let elapsed = timer.elapsed().as_millis();
assert!(elapsed > 500 * REFRESH_COMMITTEE_RETRY_TIMES as u128);

// Test any mismtach in the blocklist status should retry a few times
let event = BlocklistValidatorEvent {
blocklisted: true,
public_keys: vec![pk, pk2],
};
let summary = BridgeCommitteeSummary {
members: vec![
(
pk_bytes.clone(),
MoveTypeCommitteeMember {
sui_address: SuiAddress::random_for_testing_only(),
bridge_pubkey_bytes: pk_bytes.clone(),
voting_power: 5000,
http_rest_url: "http://pk.url".to_string().as_bytes().to_vec(),
blocklisted: true,
},
),
(
pk_bytes2.clone(),
MoveTypeCommitteeMember {
sui_address: SuiAddress::random_for_testing_only(),
bridge_pubkey_bytes: pk_bytes2.clone(),
voting_power: 5000,
http_rest_url: "http://pk2.url".to_string().as_bytes().to_vec(),
blocklisted: false,
},
),
],
member_registration: vec![],
last_committee_update_epoch: 0,
};
sui_client_mock.set_bridge_committee(summary.clone());
let timer = std::time::Instant::now();
let committee = get_latest_bridge_committee_with_blocklist_event(
sui_client.clone(),
event.clone(),
Duration::from_millis(500),
)
.await;
assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
assert!(!committee.member(&pk_as_bytes2).unwrap().is_blocklisted);
let elapsed = timer.elapsed().as_millis();
assert!(elapsed > 500 * REFRESH_COMMITTEE_RETRY_TIMES as u128);
}

#[tokio::test]
async fn test_update_bridge_authority_aggregation_with_url_change_event() {
let (monitor_tx, monitor_rx, sui_client_mock, sui_client) = setup();
Expand Down Expand Up @@ -343,6 +619,49 @@ mod tests {
);
}

#[tokio::test]
async fn test_update_bridge_authority_aggregation_with_blocklist_event() {
let (monitor_tx, monitor_rx, sui_client_mock, sui_client) = setup();
let mut authorities = vec![
get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
];
let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
Arc::new(old_committee),
))));
let _handle = tokio::task::spawn(
BridgeMonitor::new(sui_client.clone(), monitor_rx, agg.clone()).run(),
);
authorities[0].is_blocklisted = true;
let to_blocklist = &authorities[0];
let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
let new_committee_summary =
bridge_committee_to_bridge_committee_summary(new_committee.clone());
sui_client_mock.set_bridge_committee(new_committee_summary.clone());
monitor_tx
.send(SuiBridgeEvent::BlocklistValidatorEvent(
BlocklistValidatorEvent {
public_keys: vec![to_blocklist.pubkey.clone()],
blocklisted: true,
},
))
.await
.unwrap();
// Wait for the monitor to process the event
tokio::time::sleep(Duration::from_secs(1)).await;
// Now expect the committee to be updated
assert!(
agg.load()
.committee
.member(&BridgeAuthorityPublicKeyBytes::from(&to_blocklist.pubkey))
.unwrap()
.is_blocklisted,
);
}

fn setup() -> (
mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
Expand Down

0 comments on commit 93caf44

Please sign in to comment.