diff --git a/Cargo.toml b/Cargo.toml index 37538f83..2360310d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,24 @@ authors = ["Thomas P Braun"] license = "GPL-3.0-or-later WITH Classpath-exception-2.0" edition = "2021" +[features] +substrate = [ + "sp-runtime", + "sc-client-api", + "sp-api", + "futures" +] + [dependencies] sync_wrapper = "0.1.2" parking_lot = "0.12.1" tokio = { version = "1.32.0", features = ["sync", "time", "macros", "rt"] } -hex = "0.4.3" \ No newline at end of file +hex = "0.4.3" +async-trait = "0.1.73" + +sp-runtime = { optional = true, git = "https://github.com/paritytech/substrate", branch = "polkadot-v1.0.0", default-features = false } +sc-client-api = { optional = true, git = "https://github.com/paritytech/substrate", branch = "polkadot-v1.0.0", default-features = false } +sp-api = { optional = true, git = "https://github.com/paritytech/substrate", branch = "polkadot-v1.0.0", default-features = false } +futures = { optional = true, version = "0.3.28" } + +[dev-dependencies] \ No newline at end of file diff --git a/src/gadget/manager.rs b/src/gadget/manager.rs new file mode 100644 index 00000000..3c8e5e74 --- /dev/null +++ b/src/gadget/manager.rs @@ -0,0 +1,114 @@ +use async_trait::async_trait; +use std::error::Error; +use std::future::Future; +use std::pin::Pin; + +pub struct GadgetManager<'a> { + gadget: Pin> + 'a>>, +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum GadgetError { + FinalityNotificationStreamEnded, + BlockImportNotificationStreamEnded, + ProtocolMessageStreamEnded, +} + +#[async_trait] +pub trait AbstractGadget: Send { + type FinalityNotification: Send; + type BlockImportNotification: Send; + type ProtocolMessage: Send; + type Error: Error; + + async fn get_next_finality_notification(&self) -> Option; + async fn get_next_block_import_notification(&self) -> Option; + async fn get_next_protocol_message(&self) -> Option; + + async fn process_finality_notification( + &self, + notification: Self::FinalityNotification, + ) -> Result<(), Self::Error>; + async fn process_block_import_notification( + &self, + notification: Self::BlockImportNotification, + ) -> Result<(), Self::Error>; + async fn process_protocol_message( + &self, + message: Self::ProtocolMessage, + ) -> Result<(), Self::Error>; + + async fn process_error(&self, error: Self::Error); +} + +impl<'a> GadgetManager<'a> { + pub fn new(gadget: T) -> Self { + let gadget_task = async move { + let gadget = &gadget; + + let finality_notification_task = async move { + loop { + if let Some(notification) = gadget.get_next_finality_notification().await { + if let Err(err) = gadget.process_finality_notification(notification).await { + gadget.process_error(err).await; + } + } else { + return Err(GadgetError::FinalityNotificationStreamEnded); + } + } + }; + + let block_import_notification_task = async move { + loop { + if let Some(notification) = gadget.get_next_block_import_notification().await { + if let Err(err) = + gadget.process_block_import_notification(notification).await + { + gadget.process_error(err).await; + } + } else { + return Err(GadgetError::BlockImportNotificationStreamEnded); + } + } + }; + + let protocol_message_task = async move { + loop { + if let Some(message) = gadget.get_next_protocol_message().await { + if let Err(err) = gadget.process_protocol_message(message).await { + gadget.process_error(err).await; + } + } else { + return Err(GadgetError::ProtocolMessageStreamEnded); + } + } + }; + + tokio::select! { + res0 = finality_notification_task => res0, + res1 = block_import_notification_task => res1, + res2 = protocol_message_task => res2 + } + }; + + Self { + gadget: Box::pin(gadget_task), + } + } +} + +impl Future for GadgetManager<'_> { + type Output = Result<(), GadgetError>; + fn poll( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.gadget.as_mut().poll(cx) + } +} + +impl<'a, T: AbstractGadget + 'a> From for GadgetManager<'a> { + fn from(gadget: T) -> Self { + Self::new(gadget) + } +} diff --git a/src/gadget/mod.rs b/src/gadget/mod.rs new file mode 100644 index 00000000..383e27d7 --- /dev/null +++ b/src/gadget/mod.rs @@ -0,0 +1,3 @@ +pub mod manager; +#[cfg(feature = "substrate")] +pub mod substrate; diff --git a/src/gadget/substrate/mod.rs b/src/gadget/substrate/mod.rs new file mode 100644 index 00000000..e3ec35b9 --- /dev/null +++ b/src/gadget/substrate/mod.rs @@ -0,0 +1,143 @@ +use crate::gadget::manager::AbstractGadget; +use async_trait::async_trait; +use futures::stream::StreamExt; +use sc_client_api::{ + Backend, BlockImportNotification, BlockchainEvents, FinalityNotification, + FinalityNotifications, HeaderBackend, ImportNotifications, +}; +use sp_api::ProvideRuntimeApi; +use sp_runtime::traits::Block; +use std::error::Error; +use std::fmt::{Debug, Display, Formatter}; +use tokio::sync::Mutex; + +pub struct SubstrateGadget { + client: C, + module: Module, + finality_notification_stream: Mutex>, + block_import_notification_stream: Mutex>, + _pd: std::marker::PhantomData<(B, BE, API)>, +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct SubstrateGadgetError {} + +/// Designed to plug-in to the substrate gadget +#[async_trait] +pub trait SubstrateGadgetModule: Send + Sync { + type Error: Error + Send; + type ProtocolMessage: Send; + + async fn get_next_protocol_message(&self) -> Option; + async fn process_finality_notification( + &self, + notification: Gadget::FinalityNotification, + ) -> Result<(), Self::Error>; + async fn process_block_import_notification( + &self, + notification: Gadget::BlockImportNotification, + ) -> Result<(), Self::Error>; + async fn process_protocol_message( + &self, + message: Gadget::ProtocolMessage, + ) -> Result<(), Self::Error>; + async fn process_error(&self, error: Self::Error); +} + +impl Display for SubstrateGadgetError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Debug::fmt(self, f) + } +} + +impl Error for SubstrateGadgetError {} + +pub trait Client: + BlockchainEvents + HeaderBackend + ProvideRuntimeApi + Send +where + B: Block, + BE: Backend, +{ +} + +impl SubstrateGadget +where + B: Block, + BE: Backend, + C: Client, + Module: SubstrateGadgetModule, + Api: Send + Sync, +{ + pub fn new(client: C, module: Module) -> Self { + let finality_notification_stream = client.finality_notification_stream(); + let block_import_notification_stream = client.import_notification_stream(); + + Self { + client, + module, + finality_notification_stream: Mutex::new(finality_notification_stream), + block_import_notification_stream: Mutex::new(block_import_notification_stream), + _pd: std::marker::PhantomData, + } + } +} + +#[async_trait] +impl AbstractGadget for SubstrateGadget +where + B: Block, + BE: Backend, + C: Client, + Module: SubstrateGadgetModule, + Api: Send + Sync, +{ + type FinalityNotification = FinalityNotification; + type BlockImportNotification = BlockImportNotification; + type ProtocolMessage = Module::ProtocolMessage; + type Error = Module::Error; + + async fn get_next_finality_notification(&self) -> Option { + self.finality_notification_stream.lock().await.next().await + } + + async fn get_next_block_import_notification(&self) -> Option { + self.block_import_notification_stream + .lock() + .await + .next() + .await + } + + async fn get_next_protocol_message(&self) -> Option { + self.module.get_next_protocol_message().await + } + + async fn process_finality_notification( + &self, + notification: Self::FinalityNotification, + ) -> Result<(), Self::Error> { + self.module + .process_finality_notification(notification) + .await + } + + async fn process_block_import_notification( + &self, + notification: Self::BlockImportNotification, + ) -> Result<(), Self::Error> { + self.module + .process_block_import_notification(notification) + .await + } + + async fn process_protocol_message( + &self, + message: Self::ProtocolMessage, + ) -> Result<(), Self::Error> { + self.module.process_protocol_message(message).await + } + + async fn process_error(&self, error: Self::Error) { + self.module.process_error(error).await + } +} diff --git a/src/lib.rs b/src/lib.rs index 7ff5f0f4..6feac97d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,2 @@ +pub mod gadget; pub mod job_manager;