Skip to content

Commit

Permalink
avoid race condition between cache and websocket rest events
Browse files Browse the repository at this point in the history
  • Loading branch information
ddeguglielmo committed Feb 9, 2024
1 parent 85abb0d commit 00dee1d
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 150 deletions.
249 changes: 102 additions & 147 deletions dht-cache/src/domocache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::domopersistentstorage::{DomoPersistentStorage, SqlxStorage};
use crate::utils;
use futures::prelude::*;
use libp2p::gossipsub::IdentTopic as Topic;
use libp2p::identity::Keypair;
use libp2p::{mdns, ping};
Expand Down Expand Up @@ -93,19 +92,12 @@ pub struct DomoCache {
pub is_persistent_cache: bool,
pub local_peer_id: String,
client_tx_channel: Sender<DomoEvent>,
client_rx_channel: Receiver<DomoEvent>,
send_cache_state_timer: tokio::time::Instant,
pub client_rx_channel: Receiver<DomoEvent>,
pub send_cache_state_timer: tokio::time::Instant,
loopback_peers_only: bool
}

enum Event {
Client(DomoEvent),
RefreshTime,
PersistentData(String),
VolatileData(String),
Config(String),
Continue,
}


impl Hash for DomoCache {
fn hash<H: Hasher>(&self, state: &mut H) {
Expand Down Expand Up @@ -155,6 +147,104 @@ impl DomoCache {
}
}

pub async fn reload_timer(&mut self){
self.send_cache_state_timer = tokio::time::Instant::now()
+ Duration::from_secs(u64::from(SEND_CACHE_HASH_PERIOD));
self.send_cache_state().await;
}

pub async fn manage_swarm_event(&mut self, event: SwarmEvent<crate::domolibp2p::OutEvent>) -> Result<DomoEvent, Box<dyn Error>>{
match event {
SwarmEvent::ExpiredListenAddr { address, .. } => {
println!("Address {address:?} expired");
}
SwarmEvent::ConnectionEstablished { peer_id, connection_id, endpoint, .. } => {
println!("Connection established {peer_id:?}, {connection_id:?}, {endpoint:?}");
}
SwarmEvent::ConnectionClosed { peer_id, connection_id, endpoint, num_established: _, cause } => {
println!("Connection closed {peer_id:?}, {connection_id:?}, {endpoint:?} -> {cause:?}");
self.swarm.close_connection(connection_id);
}
SwarmEvent::ListenerError { listener_id, error } => {
println!("Listener Error {listener_id:?} -> {error:?}");
}
SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
println!("Outgoing connection error {peer_id:?}, {connection_id:?} -> {error:?}");
self.swarm.close_connection(connection_id);
}
SwarmEvent::ListenerClosed { .. } => {
println!("Listener Closed");
}
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {address:?}");
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Gossipsub(
libp2p::gossipsub::Event::Message {
propagation_source: _peer_id,
message_id: _id,
message,
},
)) => {
let data = String::from_utf8(message.data).unwrap();
let topic = message.topic.as_str();
match topic {
"domo-persistent-data" => {
let m = self.handle_persistent_message_data(&data).await;
return m;
}
"domo-config" => {
self.handle_config_data(&data).await;
}
"domo-volatile-data" => {
let m = self.handle_volatile_data(&data);
return m;
}
_ => {
log::warn!("Not able to recognize message {topic}");
}
}
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Ping(
ping::Event { peer, connection, result}
)) => {

if let Ok(_res) = result {
println!("PING OK {} {}", peer.to_string(), connection);
} else {
println!("PING FAILED {} {}, CLOSE CONNECTION", peer.to_string(), connection);
self.swarm.close_connection(connection);
}
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns(
mdns::Event::Discovered(list),
)) => {
let _local = OffsetDateTime::now_utc();
for (peer_id, multiaddr) in list {
println!("Discovered peer {peer_id} {multiaddr}");
log::info!("{}", multiaddr);
let is_local_peer = utils::is_local_peer(&multiaddr.to_string());
if self.loopback_peers_only && !is_local_peer {
log::info!("Skipping peer since it is not local");
continue;
}

let dial_opts = DialOpts::peer_id(peer_id)
.condition(PeerCondition::Always)
.build();

let _res = self.swarm.dial(dial_opts);
println!("INSERT INTO MDNS CACHE {} {} ", peer_id.to_string(), get_epoch_ms()/1000);

self.mdns_peers_cache.insert(peer_id.to_string(), get_epoch_ms());
println!("{:?}", self.mdns_peers_cache);

}

}
_ => {}
}
return Err("no messages".into());
}
fn handle_volatile_data(
&self,
message: &str,
Expand Down Expand Up @@ -354,7 +444,7 @@ impl DomoCache {
}
}

async fn send_cache_state(&mut self) {
pub async fn send_cache_state(&mut self) {
let m = DomoCacheStateMessage {
peer_id: self.local_peer_id.to_string(),
cache_hash: self.get_cache_hash(),
Expand Down Expand Up @@ -393,141 +483,6 @@ impl DomoCache {
}
}

async fn inner_select(&mut self) -> Event {
use Event::*;
tokio::select!(
// a client of this router published something
m = self.client_rx_channel.recv() => {
let dm = m.unwrap();
return Client(dm);
},

_ = tokio::time::sleep_until(self.send_cache_state_timer) => {
return RefreshTime;
},

event = self.swarm.select_next_some() => {
match event {
SwarmEvent::ExpiredListenAddr { address, .. } => {
println!("Address {address:?} expired");
}
SwarmEvent::ConnectionEstablished { peer_id, connection_id, endpoint, .. } => {
println!("Connection established {peer_id:?}, {connection_id:?}, {endpoint:?}");
}
SwarmEvent::ConnectionClosed { peer_id, connection_id, endpoint, num_established: _, cause } => {
println!("Connection closed {peer_id:?}, {connection_id:?}, {endpoint:?} -> {cause:?}");
self.swarm.close_connection(connection_id);
}
SwarmEvent::ListenerError { listener_id, error } => {
println!("Listener Error {listener_id:?} -> {error:?}");
}
SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
println!("Outgoing connection error {peer_id:?}, {connection_id:?} -> {error:?}");
self.swarm.close_connection(connection_id);
}
SwarmEvent::ListenerClosed { .. } => {
println!("Listener Closed");
}
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {address:?}");
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Gossipsub(
libp2p::gossipsub::Event::Message {
propagation_source: _peer_id,
message_id: _id,
message,
},
)) => {
let data = String::from_utf8(message.data).unwrap();
let topic = message.topic.as_str();
match topic {
"domo-persistent-data" => {
return PersistentData(data);
}
"domo-config" => {
return Config(data);
}
"domo-volatile-data" => {
return VolatileData(data);
}
_ => {
log::warn!("Not able to recognize message {topic}");
}
}
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Ping(
ping::Event { peer, connection, result}
)) => {

if let Ok(_res) = result {
println!("PING OK {} {}", peer.to_string(), connection);
} else {
println!("PING FAILED {} {}, CLOSE CONNECTION", peer.to_string(), connection);
self.swarm.close_connection(connection);
}
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns(
mdns::Event::Discovered(list),
)) => {
let _local = OffsetDateTime::now_utc();
for (peer_id, multiaddr) in list {
println!("Discovered peer {peer_id} {multiaddr}");
log::info!("{}", multiaddr);
let is_local_peer = utils::is_local_peer(&multiaddr.to_string());
if self.loopback_peers_only && !is_local_peer {
log::info!("Skipping peer since it is not local");
continue;
}

let dial_opts = DialOpts::peer_id(peer_id)
.condition(PeerCondition::Always)
.build();

let _res = self.swarm.dial(dial_opts);
println!("INSERT INTO MDNS CACHE {} {} ", peer_id.to_string(), get_epoch_ms()/1000);

self.mdns_peers_cache.insert(peer_id.to_string(), get_epoch_ms());
println!("{:?}", self.mdns_peers_cache);

// self.swarm
// .behaviour_mut()
// .gossipsub
// .add_explicit_peer(&peer);

}

}
_ => {}
}
}
);
Continue
}
pub async fn cache_event_loop(&mut self) -> std::result::Result<DomoEvent, Box<dyn Error>> {
use Event::*;
loop {
match self.inner_select().await {
Client(ev) => {
return Ok(ev);
}
RefreshTime => {
self.send_cache_state_timer = tokio::time::Instant::now()
+ Duration::from_secs(u64::from(SEND_CACHE_HASH_PERIOD));
self.send_cache_state().await;
}
PersistentData(data) => {
return self.handle_persistent_message_data(&data).await;
}
VolatileData(data) => {
return self.handle_volatile_data(&data);
}
Config(data) => {
self.handle_config_data(&data).await;
}
Continue => {}
}
}
}

pub async fn new(conf: sifis_config::Cache) -> Result<Self, Box<dyn Error>> {
if conf.url.is_empty() {
Expand Down
21 changes: 18 additions & 3 deletions src/domobroker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ use crate::websocketmessage::{
};

use serde_json::json;
use futures_util::stream::StreamExt;

pub struct DomoBroker {
pub domo_cache: DomoCache,
pub web_manager: WebApiManager,
}

enum Event {


pub enum Event {
WebSocket(SyncWebSocketDomoMessage),
Rest(RestMessage),
Cache(Result<DomoEvent, Box<dyn Error>>),
Expand All @@ -35,7 +38,7 @@ impl DomoBroker {
})
}

async fn inner_select(&mut self) -> Event {
pub async fn inner_select(&mut self) -> Event {
use Event::*;

tokio::select! {
Expand All @@ -48,9 +51,21 @@ impl DomoBroker {
return Rest(rest_message);
},

m = self.domo_cache.cache_event_loop() => {
m = self.domo_cache.client_rx_channel.recv() => {
if let Some(m) = m {
return Cache(Ok(m));
}
},

_ = tokio::time::sleep_until(self.domo_cache.send_cache_state_timer) => {
self.domo_cache.reload_timer().await;
},

event = self.domo_cache.swarm.select_next_some() => {
let m = self.domo_cache.manage_swarm_event(event).await;
return Cache(m);
}

}

Continue
Expand Down

0 comments on commit 00dee1d

Please sign in to comment.