Skip to content

Commit

Permalink
Add boilerplate for substrate gadgets
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Braun committed Oct 23, 2023
1 parent 1283f8d commit 7a7e415
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 1 deletion.
18 changes: 17 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,24 @@ authors = ["Thomas P Braun"]
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
edition = "2021"

[features]
substrate = [
"sp-runtime",
"sc-client-api",
"sp-api",
"futures"
]

[dependencies]
sync_wrapper = "0.1.2"
parking_lot = "0.12.1"
tokio = { version = "1.32.0", features = ["sync", "time", "macros", "rt"] }
hex = "0.4.3"
hex = "0.4.3"
async-trait = "0.1.73"

sp-runtime = { optional = true, git = "https://github.com/paritytech/substrate", branch = "polkadot-v1.0.0", default-features = false }
sc-client-api = { optional = true, git = "https://github.com/paritytech/substrate", branch = "polkadot-v1.0.0", default-features = false }
sp-api = { optional = true, git = "https://github.com/paritytech/substrate", branch = "polkadot-v1.0.0", default-features = false }
futures = { optional = true, version = "0.3.28" }

[dev-dependencies]
114 changes: 114 additions & 0 deletions src/gadget/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use async_trait::async_trait;
use std::error::Error;
use std::future::Future;
use std::pin::Pin;

pub struct GadgetManager<'a> {
gadget: Pin<Box<dyn Future<Output = Result<(), GadgetError>> + 'a>>,
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum GadgetError {
FinalityNotificationStreamEnded,
BlockImportNotificationStreamEnded,
ProtocolMessageStreamEnded,
}

#[async_trait]
pub trait AbstractGadget: Send {
type FinalityNotification: Send;
type BlockImportNotification: Send;
type ProtocolMessage: Send;
type Error: Error;

async fn get_next_finality_notification(&self) -> Option<Self::FinalityNotification>;
async fn get_next_block_import_notification(&self) -> Option<Self::BlockImportNotification>;
async fn get_next_protocol_message(&self) -> Option<Self::ProtocolMessage>;

async fn process_finality_notification(
&self,
notification: Self::FinalityNotification,
) -> Result<(), Self::Error>;
async fn process_block_import_notification(
&self,
notification: Self::BlockImportNotification,
) -> Result<(), Self::Error>;
async fn process_protocol_message(
&self,
message: Self::ProtocolMessage,
) -> Result<(), Self::Error>;

async fn process_error(&self, error: Self::Error);
}

impl<'a> GadgetManager<'a> {
pub fn new<T: AbstractGadget + 'a>(gadget: T) -> Self {
let gadget_task = async move {
let gadget = &gadget;

let finality_notification_task = async move {
loop {
if let Some(notification) = gadget.get_next_finality_notification().await {
if let Err(err) = gadget.process_finality_notification(notification).await {
gadget.process_error(err).await;
}
} else {
return Err(GadgetError::FinalityNotificationStreamEnded);
}
}
};

let block_import_notification_task = async move {
loop {
if let Some(notification) = gadget.get_next_block_import_notification().await {
if let Err(err) =
gadget.process_block_import_notification(notification).await
{
gadget.process_error(err).await;
}
} else {
return Err(GadgetError::BlockImportNotificationStreamEnded);
}
}
};

let protocol_message_task = async move {
loop {
if let Some(message) = gadget.get_next_protocol_message().await {
if let Err(err) = gadget.process_protocol_message(message).await {
gadget.process_error(err).await;
}
} else {
return Err(GadgetError::ProtocolMessageStreamEnded);
}
}
};

tokio::select! {
res0 = finality_notification_task => res0,
res1 = block_import_notification_task => res1,
res2 = protocol_message_task => res2
}
};

Self {
gadget: Box::pin(gadget_task),
}
}
}

impl Future for GadgetManager<'_> {
type Output = Result<(), GadgetError>;
fn poll(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.gadget.as_mut().poll(cx)
}
}

impl<'a, T: AbstractGadget + 'a> From<T> for GadgetManager<'a> {
fn from(gadget: T) -> Self {
Self::new(gadget)
}
}
3 changes: 3 additions & 0 deletions src/gadget/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod manager;
#[cfg(feature = "substrate")]
pub mod substrate;
143 changes: 143 additions & 0 deletions src/gadget/substrate/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use crate::gadget::manager::AbstractGadget;
use async_trait::async_trait;
use futures::stream::StreamExt;
use sc_client_api::{
Backend, BlockImportNotification, BlockchainEvents, FinalityNotification,
FinalityNotifications, HeaderBackend, ImportNotifications,
};
use sp_api::ProvideRuntimeApi;
use sp_runtime::traits::Block;
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use tokio::sync::Mutex;

pub struct SubstrateGadget<B: Block, BE, C, API, Module> {
client: C,
module: Module,
finality_notification_stream: Mutex<FinalityNotifications<B>>,
block_import_notification_stream: Mutex<ImportNotifications<B>>,
_pd: std::marker::PhantomData<(B, BE, API)>,
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct SubstrateGadgetError {}

/// Designed to plug-in to the substrate gadget
#[async_trait]
pub trait SubstrateGadgetModule<Gadget: AbstractGadget>: Send + Sync {
type Error: Error + Send;
type ProtocolMessage: Send;

async fn get_next_protocol_message(&self) -> Option<Gadget::ProtocolMessage>;
async fn process_finality_notification(
&self,
notification: Gadget::FinalityNotification,
) -> Result<(), Self::Error>;
async fn process_block_import_notification(
&self,
notification: Gadget::BlockImportNotification,
) -> Result<(), Self::Error>;
async fn process_protocol_message(
&self,
message: Gadget::ProtocolMessage,
) -> Result<(), Self::Error>;
async fn process_error(&self, error: Self::Error);
}

impl Display for SubstrateGadgetError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Debug::fmt(self, f)
}
}

impl Error for SubstrateGadgetError {}

pub trait Client<B, BE>:
BlockchainEvents<B> + HeaderBackend<B> + ProvideRuntimeApi<B> + Send
where
B: Block,
BE: Backend<B>,
{
}

impl<B, BE, C, Api, Module> SubstrateGadget<B, BE, C, Api, Module>
where
B: Block,
BE: Backend<B>,
C: Client<B, BE, Api = Api>,
Module: SubstrateGadgetModule<Self>,
Api: Send + Sync,
{
pub fn new(client: C, module: Module) -> Self {
let finality_notification_stream = client.finality_notification_stream();
let block_import_notification_stream = client.import_notification_stream();

Self {
client,
module,
finality_notification_stream: Mutex::new(finality_notification_stream),
block_import_notification_stream: Mutex::new(block_import_notification_stream),
_pd: std::marker::PhantomData,
}
}
}

#[async_trait]
impl<B, BE, C, Api, Module> AbstractGadget for SubstrateGadget<B, BE, C, Api, Module>
where
B: Block,
BE: Backend<B>,
C: Client<B, BE, Api = Api>,
Module: SubstrateGadgetModule<Self>,
Api: Send + Sync,
{
type FinalityNotification = FinalityNotification<B>;
type BlockImportNotification = BlockImportNotification<B>;
type ProtocolMessage = Module::ProtocolMessage;
type Error = Module::Error;

async fn get_next_finality_notification(&self) -> Option<Self::FinalityNotification> {
self.finality_notification_stream.lock().await.next().await
}

async fn get_next_block_import_notification(&self) -> Option<Self::BlockImportNotification> {
self.block_import_notification_stream
.lock()
.await
.next()
.await
}

async fn get_next_protocol_message(&self) -> Option<Self::ProtocolMessage> {
self.module.get_next_protocol_message().await
}

async fn process_finality_notification(
&self,
notification: Self::FinalityNotification,
) -> Result<(), Self::Error> {
self.module
.process_finality_notification(notification)
.await
}

async fn process_block_import_notification(
&self,
notification: Self::BlockImportNotification,
) -> Result<(), Self::Error> {
self.module
.process_block_import_notification(notification)
.await
}

async fn process_protocol_message(
&self,
message: Self::ProtocolMessage,
) -> Result<(), Self::Error> {
self.module.process_protocol_message(message).await
}

async fn process_error(&self, error: Self::Error) {
self.module.process_error(error).await
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod gadget;
pub mod job_manager;

0 comments on commit 7a7e415

Please sign in to comment.