Str 304 migrate prover guest code #339

Draft · wants to merge 18 commits into base: main
11 changes: 11 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -197,6 +197,8 @@ toml = "0.5"
tower = "0.4"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
uuid = { version = "1.0", features = ["v4", "serde"] }


# This is needed for custom build of SP1
[profile.release.build-override]
13 changes: 10 additions & 3 deletions bin/prover-client/Cargo.toml
@@ -9,31 +9,38 @@ path = "src/main.rs"
required-features = ["prover"]

[dependencies]
alpen-express-btcio = { workspace = true }
alpen-express-common = { workspace = true }
alpen-express-db = { workspace = true }
alpen-express-primitives = { workspace = true }
alpen-express-rocksdb = { workspace = true }
alpen-express-rpc-types = { workspace = true }
alpen-express-state = { workspace = true }
express-proofimpl-btc-blockspace = { workspace = true }
express-proofimpl-evm-ee-stf = { workspace = true }
express-proofimpl-l1-batch = { workspace = true }
express-prover-client-rpc-api = { workspace = true }
express-sp1-adapter = { workspace = true, features = ["prover"] }
express-sp1-guest-builder = { path = "../../provers/sp1" }
express-zkvm = { workspace = true }


anyhow = { workspace = true }
argh = { workspace = true }
async-trait = { workspace = true }
bincode = { workspace = true }
bitcoin = { workspace = true }
borsh = { workspace = true }
jsonrpsee = { workspace = true, features = ["http-client"] }
rayon = "1.8.0"
reth-rpc-types = { workspace = true }
rockbound = { workspace = true }
serde = { workspace = true }
strata-tx-parser = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
uuid = { version = "1.0", features = ["v4", "serde"] }

uuid = { workspace = true }

[features]
prover = []
13 changes: 13 additions & 0 deletions bin/prover-client/src/args.rs
@@ -32,6 +32,15 @@ pub struct Args {
#[argh(option, description = "reth rpc host:port")]
pub reth_rpc: String,

#[argh(option, description = "bitcoind RPC host")]
pub bitcoind_url: String,

#[argh(option, description = "bitcoind RPC user")]
pub bitcoind_user: String,

#[argh(option, description = "bitcoind RPC password")]
pub bitcoind_password: String,

#[argh(option, description = "enable prover client dev rpc", default = "true")]
pub enable_dev_rpcs: bool,
}
@@ -53,4 +62,8 @@ impl Args {
pub fn get_reth_rpc_url(&self) -> String {
self.reth_rpc.to_string()
}

pub fn get_btc_rpc_url(&self) -> String {
format!("http://{}", self.bitcoind_url)
}
}
12 changes: 3 additions & 9 deletions bin/prover-client/src/config.rs
@@ -1,11 +1,5 @@
// Number of prover workers to spawn
pub const NUM_PROVER_WORKER: usize = 10;
pub const NUM_PROVER_WORKERS: usize = 5;

// Wait time in seconds for the prover manager loop, in seconds
pub const PROVER_MANAGER_WAIT_TIME: u64 = 5;

// Interval between dispatching block proving tasks, in seconds
pub const BLOCK_PROVING_TASK_DISPATCH_INTERVAL: u64 = 1;

// Starting block height for EL block proving tasks
pub const EL_START_BLOCK_HEIGHT: u64 = 1;
// Wait time in seconds for the prover manager loop
pub const PROVER_MANAGER_INTERVAL: u64 = 2;
55 changes: 55 additions & 0 deletions bin/prover-client/src/dispatcher.rs
@@ -0,0 +1,55 @@
use std::{fmt::Debug, sync::Arc};

use tracing::info;
use uuid::Uuid;

use crate::{errors::ProvingTaskError, proving_ops::ops::ProvingOperations, task::TaskTracker};

/// Generic dispatcher for block proving tasks.
#[derive(Debug, Clone)]
pub struct TaskDispatcher<O>
where
O: ProvingOperations,
{
operations: O,
task_tracker: Arc<TaskTracker>,
}

impl<O> TaskDispatcher<O>
where
O: ProvingOperations + Clone + Send + Sync + 'static,
O::Params: Debug + Clone,
{
/// Creates a new task dispatcher.
pub fn new(operations: O, task_tracker: Arc<TaskTracker>) -> Self {
Self {
operations,
task_tracker,
}
}

/// Creates a proving task for the given params.
pub async fn create_task(&self, param: O::Params) -> Result<Uuid, ProvingTaskError> {
info!(
"Creating proving task for block {:?} {:?}",
param,
self.operations.block_type()
);
let input = self
.operations
.fetch_input(param.clone())
.await
.map_err(|e| ProvingTaskError::FetchInput {
param: format!("{:?}", param),
task_type: self.operations.block_type(),
source: e,
})?;
self.operations
.append_task(self.task_tracker.clone(), input)
.await
}

pub fn task_tracker(&self) -> Arc<TaskTracker> {
self.task_tracker.clone()
}
}
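
The dispatcher above is generic over a ProvingOperations trait that isn't shown in this diff. Inferred from the calls TaskDispatcher makes (block_type, fetch_input, append_task), the trait presumably looks roughly like the sketch below; the associated Input type and the exact signatures are assumptions, not the actual definition from the PR.

```rust
use std::sync::Arc;

use async_trait::async_trait;
use uuid::Uuid;

use crate::{
    errors::{ProvingTaskError, ProvingTaskType},
    task::TaskTracker,
};

/// Hypothetical shape of the operations trait, inferred from how the
/// dispatcher uses it.
#[async_trait]
pub trait ProvingOperations {
    /// Identifies what to prove (e.g. a block number or block hash).
    type Params;
    /// Input fetched from the node that the prover will consume.
    type Input;

    /// The kind of proving task these operations produce.
    fn block_type(&self) -> ProvingTaskType;

    /// Fetches the prover input for the given params from the relevant client.
    async fn fetch_input(&self, param: Self::Params) -> anyhow::Result<Self::Input>;

    /// Registers a proving task with the tracker and returns its task id.
    async fn append_task(
        &self,
        task_tracker: Arc<TaskTracker>,
        input: Self::Input,
    ) -> Result<Uuid, ProvingTaskError>;
}
```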
Review comment from a collaborator:

I'm not sure if I'm reading how this is used correctly, but this seems like a bit of a red flag for how this type is designed. It looks like it's simultaneously trying to be both a handle for dispatching tasks and an OO-style worker type that you run. This is a red flag because .start() is being called on the same (cloned) instances that .create_task() is called on in the RpcContext type. So what is the data flow actually supposed to look like here?

As it stands, there's sort of spooky action at a distance. In .start(), we loop and try to spontaneously start new jobs at what happens to be the same rate as blocks are being produced. This should work, but if something breaks elsewhere then we break ourselves. We should only try to start generating proofs when we know we're probably going to succeed. Doing it at a fixed interval is not robust: there's inevitably going to be some clock skew, which would cause spurious errors (because the data wasn't ready when we tried to start the work) or add latency (because our clock phase shifted).

We should split the .start() function out into a toplevel function that takes the dispatcher state and the input as separate arguments. Then we could have a builder that launches the dispatcher in the background and returns the handle used to communicate with the worker. That handle is what would actually be used to submit work items to the dispatcher, from something else that more closely follows what we're waiting on to produce proofs.

Did I make sense there?
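
A rough sketch of the split described in the comment: the loop becomes a free function spawned in the background, and callers only get a handle for submitting work. The DispatcherHandle type, spawn_dispatcher name, and channel size are invented for illustration (not part of this PR), and it assumes the dispatcher and its TaskTracker are Send + Sync so the loop can be spawned on tokio.

```rust
use tokio::sync::mpsc;

use crate::{dispatcher::TaskDispatcher, proving_ops::ops::ProvingOperations};

/// Handle returned to callers; the only way to submit work to the dispatcher.
#[derive(Clone)]
pub struct DispatcherHandle<P> {
    tx: mpsc::Sender<P>,
}

impl<P> DispatcherHandle<P> {
    /// Submits a proving request; the background loop picks it up when ready.
    pub async fn submit(&self, params: P) -> anyhow::Result<()> {
        self.tx
            .send(params)
            .await
            .map_err(|_| anyhow::anyhow!("dispatcher loop has shut down"))
    }
}

/// Launches the dispatch loop in the background and returns the handle.
pub fn spawn_dispatcher<O>(dispatcher: TaskDispatcher<O>) -> DispatcherHandle<O::Params>
where
    O: ProvingOperations + Clone + Send + Sync + 'static,
    O::Params: std::fmt::Debug + Clone + Send + 'static,
{
    let (tx, mut rx) = mpsc::channel(64);
    tokio::spawn(async move {
        // Work only starts when a caller submits params, i.e. when the data
        // it depends on is known to exist, rather than on a fixed interval.
        while let Some(params) = rx.recv().await {
            if let Err(e) = dispatcher.create_task(params).await {
                tracing::warn!(?e, "failed to create proving task");
            }
        }
    });
    DispatcherHandle { tx }
}
```

With something like this, RpcContext would hold DispatcherHandle values instead of cloned dispatchers, and whatever watches for new blocks or checkpoints would submit params through the handle only once the corresponding data is available.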

40 changes: 35 additions & 5 deletions bin/prover-client/src/errors.rs
@@ -2,13 +2,43 @@ use thiserror::Error;

// Define custom error type
#[derive(Error, Debug)]
pub enum ELProvingTaskError {
#[error("Failed to fetch EL block prover input for block number {block_num}: {source}")]
FetchElBlockProverInputError {
block_num: u64,
pub enum ProvingTaskError {
#[error("Failed to fetch {task_type} input for {param}: {source}")]
FetchInput {
param: String,
task_type: ProvingTaskType,
source: anyhow::Error,
},

#[error("Failed to serialize the EL block prover input")]
SerializationError(#[from] bincode::Error),
Serialization(#[from] bincode::Error),

#[error("Failed to borsh deserialize the input")]
BorshSerialization(#[from] borsh::io::Error),

#[error("Failed to create dependency task: {0}")]
DependencyTaskCreation(String),
}

// Enum representing the kind of proving task: BTC, EL, CL, and their batch variants
#[derive(Debug, Clone, Copy)]
pub enum ProvingTaskType {
Btc,
EL,
CL,
ClBatch,
BtcBatch,
}

impl std::fmt::Display for ProvingTaskType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let block_type_str = match self {
ProvingTaskType::Btc => "BTC",
ProvingTaskType::EL => "EL",
ProvingTaskType::CL => "CL",
ProvingTaskType::ClBatch => "CL Batch",
ProvingTaskType::BtcBatch => "BTC Batch",
};
write!(f, "{}", block_type_str)
}
}
82 changes: 63 additions & 19 deletions bin/prover-client/src/main.rs
@@ -2,59 +2,103 @@

use std::sync::Arc;

use alpen_express_btcio::rpc::BitcoinClient;
use alpen_express_common::logging;
use args::Args;
use config::EL_START_BLOCK_HEIGHT;
use dispatcher::TaskDispatcher;
use express_sp1_adapter::SP1Host;
use express_zkvm::ProverOptions;
use jsonrpsee::http_client::HttpClientBuilder;
use manager::ProverManager;
use proving_ops::{
btc_ops::BtcOperations, checkpoint_ops::CheckpointOperations, cl_ops::ClOperations,
el_ops::ElOperations, l1_batch_ops::L1BatchOperations, l2_batch_ops::L2BatchOperations,
};
use rpc_server::{ProverClientRpc, RpcContext};
use task::TaskTracker;
use task_dispatcher::ELBlockProvingTaskScheduler;
use tracing::info;

mod args;
mod config;
mod db;
mod dispatcher;
mod errors;
mod manager;
mod primitives;
mod prover;
mod proving_ops;
mod rpc_server;
mod task;
mod task_dispatcher;

#[tokio::main]
async fn main() {
logging::init();
info!("running alpen express prover client in dev mode");

let args: Args = argh::from_env();
let task_tracker = Arc::new(TaskTracker::new());

let el_rpc_client = HttpClientBuilder::default()
let el_client = HttpClientBuilder::default()
.build(args.get_reth_rpc_url())
.expect("failed to connect to the el client");

let el_proving_task_scheduler = ELBlockProvingTaskScheduler::new(
el_rpc_client,
task_tracker.clone(),
EL_START_BLOCK_HEIGHT,
let cl_client = HttpClientBuilder::default()
.build(args.get_sequencer_rpc_url())
.expect("failed to connect to the cl client");

let btc_client = Arc::new(
BitcoinClient::new(
args.get_btc_rpc_url(),
args.bitcoind_user.clone(),
args.bitcoind_password.clone(),
)
.unwrap(),
);
let rpc_context = RpcContext::new(el_proving_task_scheduler.clone());
let prover_manager: ProverManager<SP1Host> = ProverManager::new(task_tracker);

let task_tracker = Arc::new(TaskTracker::new());

// Create L1 operations
let btc_ops = BtcOperations::new(btc_client.clone());
let btc_dispatcher = TaskDispatcher::new(btc_ops, task_tracker.clone());

// Create EL operations
let el_ops = ElOperations::new(el_client.clone());
let el_dispatcher = TaskDispatcher::new(el_ops, task_tracker.clone());

let cl_ops = ClOperations::new(cl_client.clone(), Arc::new(el_dispatcher.clone()));
let cl_dispatcher = TaskDispatcher::new(cl_ops, task_tracker.clone());

let l1_batch_ops = L1BatchOperations::new(Arc::new(btc_dispatcher.clone()), btc_client.clone());
let l1_batch_dispatcher = TaskDispatcher::new(l1_batch_ops, task_tracker.clone());

let l2_batch_ops = L2BatchOperations::new(Arc::new(cl_dispatcher.clone()));
let l2_batch_dispatcher = TaskDispatcher::new(l2_batch_ops, task_tracker.clone());

let checkpoint_ops = CheckpointOperations::new(
Arc::new(l1_batch_dispatcher.clone()),
Arc::new(l2_batch_dispatcher.clone()),
);

let checkpoint_dispatcher = TaskDispatcher::new(checkpoint_ops, task_tracker.clone());

let rpc_context = RpcContext::new(
btc_dispatcher.clone(),
el_dispatcher.clone(),
cl_dispatcher.clone(),
l1_batch_dispatcher.clone(),
l2_batch_dispatcher.clone(),
checkpoint_dispatcher.clone(),
);

let prover_options = ProverOptions {
use_mock_prover: false,
enable_compression: true,
..Default::default()
};
let prover_manager: ProverManager<SP1Host> = ProverManager::new(task_tracker, prover_options);

// run prover manager in background
tokio::spawn(async move { prover_manager.run().await });

// run el proving task dispatcher
tokio::spawn(async move {
el_proving_task_scheduler
.clone()
.listen_for_new_blocks()
.await
});

// run rpc server
let rpc_url = args.get_dev_rpc_url();
run_rpc_server(rpc_context, rpc_url, args.enable_dev_rpcs)