Skip to content

Commit

Permalink
refactor(indexer-alt): make Handler concurrent-pipeline-specific
Browse files Browse the repository at this point in the history
## Description

Sequential pipelines need different parameters and processing logic than
concurrent pipelines, so it no longer makes sense to share the `Handler`
trait.

This change factors out the shared part as its own trait -- `Processor`
-- and moves the rest into the `concurrent` module.

## Test plan

```
sui$ cargo build -p sui-indexer-alt
sui$ cargo nextest run -p sui-indexer-alt
```
  • Loading branch information
amnn committed Oct 28, 2024
1 parent 1709d22 commit ca73e11
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 172 deletions.
22 changes: 12 additions & 10 deletions crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,15 @@ use anyhow::Result;
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{db, models::events::StoredEvEmitMod, schema::ev_emit_mod};

use super::Handler;

use crate::{
db, models::events::StoredEvEmitMod, pipeline::concurrent::Handler, pipeline::Processor,
schema::ev_emit_mod,
};
pub struct EvEmitMod;

#[async_trait::async_trait]
impl Handler for EvEmitMod {
impl Processor for EvEmitMod {
const NAME: &'static str = "ev_emit_mod";

const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

type Value = StoredEvEmitMod;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
Expand Down Expand Up @@ -49,6 +44,13 @@ impl Handler for EvEmitMod {

Ok(values.into_iter().collect())
}
}

#[async_trait::async_trait]
impl Handler for EvEmitMod {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(ev_emit_mod::table)
Expand Down
21 changes: 12 additions & 9 deletions crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@ use anyhow::{Context, Result};
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{db, models::events::StoredEvStructInst, schema::ev_struct_inst};

use super::Handler;
use crate::{
db, models::events::StoredEvStructInst, pipeline::concurrent::Handler, pipeline::Processor,
schema::ev_struct_inst,
};

pub struct EvStructInst;

#[async_trait::async_trait]
impl Handler for EvStructInst {
impl Processor for EvStructInst {
const NAME: &'static str = "ev_struct_inst";

const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

type Value = StoredEvStructInst;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
Expand Down Expand Up @@ -52,6 +48,13 @@ impl Handler for EvStructInst {

Ok(values.into_iter().collect())
}
}

#[async_trait::async_trait]
impl Handler for EvStructInst {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(ev_struct_inst::table)
Expand Down
13 changes: 8 additions & 5 deletions crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use anyhow::{Context, Result};
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{db, models::checkpoints::StoredCheckpoint, schema::kv_checkpoints};

use super::Handler;
use crate::{
db, models::checkpoints::StoredCheckpoint, pipeline::concurrent::Handler, pipeline::Processor,
schema::kv_checkpoints,
};

pub struct KvCheckpoints;

#[async_trait::async_trait]
impl Handler for KvCheckpoints {
impl Processor for KvCheckpoints {
const NAME: &'static str = "kv_checkpoints";

type Value = StoredCheckpoint;
Expand All @@ -29,7 +29,10 @@ impl Handler for KvCheckpoints {
.with_context(|| format!("Serializing checkpoint {sequence_number} contents"))?,
}])
}
}

#[async_trait::async_trait]
impl Handler for KvCheckpoints {
async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(kv_checkpoints::table)
.values(values)
Expand Down
22 changes: 12 additions & 10 deletions crates/sui-indexer-alt/src/handlers/kv_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,15 @@ use anyhow::{Context, Result};
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{db, models::objects::StoredObject, schema::kv_objects};

use super::Handler;
use crate::{
db, models::objects::StoredObject, pipeline::concurrent::Handler, pipeline::Processor,
schema::kv_objects,
};

pub struct KvObjects;

#[async_trait::async_trait]
impl Handler for KvObjects {
impl Processor for KvObjects {
const NAME: &'static str = "kv_objects";

const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

type Value = StoredObject;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
Expand Down Expand Up @@ -56,6 +51,13 @@ impl Handler for KvObjects {
.chain(created_objects)
.collect::<Result<Vec<_>, _>>()
}
}

#[async_trait::async_trait]
impl Handler for KvObjects {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(kv_objects::table)
Expand Down
21 changes: 12 additions & 9 deletions crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@ use anyhow::{Context, Result};
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{db, models::transactions::StoredTransaction, schema::kv_transactions};

use super::Handler;
use crate::{
db, models::transactions::StoredTransaction, pipeline::concurrent::Handler,
pipeline::Processor, schema::kv_transactions,
};

pub struct KvTransactions;

#[async_trait::async_trait]
impl Handler for KvTransactions {
impl Processor for KvTransactions {
const NAME: &'static str = "kv_transactions";

const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

type Value = StoredTransaction;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
Expand Down Expand Up @@ -58,6 +54,13 @@ impl Handler for KvTransactions {

Ok(values)
}
}

#[async_trait::async_trait]
impl Handler for KvTransactions {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(kv_transactions::table)
Expand Down
47 changes: 0 additions & 47 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use sui_types::full_checkpoint_content::CheckpointData;

use crate::db;

pub mod ev_emit_mod;
pub mod ev_struct_inst;
pub mod kv_checkpoints;
pub mod kv_objects;
pub mod kv_transactions;
pub mod tx_affected_objects;
pub mod tx_balance_changes;

/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data into
/// rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handle may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
/// sizes).
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
/// can be done concurrently, to preserve throughput.
#[async_trait::async_trait]
pub trait Handler {
/// Used to identify the pipeline in logs and metrics.
const NAME: &'static str;

/// How much concurrency to use when processing checkpoint data.
const FANOUT: usize = 10;

/// If at least this many rows are pending, the committer will commit them eagerly.
const BATCH_SIZE: usize = 50;

/// If there are more than this many rows pending, the committer will only commit this many in
/// one operation.
const CHUNK_SIZE: usize = 200;

/// If there are more than this many rows pending, the committer applies backpressure.
const MAX_PENDING_SIZE: usize = 1000;

/// The type of value being inserted by the handler.
type Value: Send + Sync + 'static;

/// The processing logic for turning a checkpoint into rows of the table.
fn process(checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;

/// Take a chunk of values and commit them to the database, returning the number of rows
/// affected.
async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>)
-> anyhow::Result<usize>;
}
21 changes: 12 additions & 9 deletions crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@ use anyhow::Result;
use diesel_async::RunQueryDsl;
use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData};

use crate::{db, models::transactions::StoredTxAffectedObject, schema::tx_affected_objects};

use super::Handler;
use crate::{
db, models::transactions::StoredTxAffectedObject, pipeline::concurrent::Handler,
pipeline::Processor, schema::tx_affected_objects,
};

pub struct TxAffectedObjects;

#[async_trait::async_trait]
impl Handler for TxAffectedObjects {
impl Processor for TxAffectedObjects {
const NAME: &'static str = "tx_affected_objects";

const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

type Value = StoredTxAffectedObject;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
Expand Down Expand Up @@ -51,6 +47,13 @@ impl Handler for TxAffectedObjects {

Ok(values)
}
}

#[async_trait::async_trait]
impl Handler for TxAffectedObjects {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(tx_affected_objects::table)
Expand Down
18 changes: 10 additions & 8 deletions crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,16 @@ use sui_types::{
use crate::{
db,
models::transactions::{BalanceChange, StoredTxBalanceChange},
pipeline::concurrent::Handler,
pipeline::Processor,
schema::tx_balance_changes,
};

use super::Handler;

pub struct TxBalanceChanges;

#[async_trait::async_trait]
impl Handler for TxBalanceChanges {
impl Processor for TxBalanceChanges {
const NAME: &'static str = "tx_balance_changes";

const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

type Value = StoredTxBalanceChange;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
Expand Down Expand Up @@ -58,6 +53,13 @@ impl Handler for TxBalanceChanges {

Ok(values)
}
}

#[async_trait::async_trait]
impl Handler for TxBalanceChanges {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(tx_balance_changes::table)
Expand Down
3 changes: 1 addition & 2 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};

use anyhow::{Context, Result};
use db::{Db, DbConfig};
use handlers::Handler;
use ingestion::{IngestionConfig, IngestionService};
use metrics::{IndexerMetrics, MetricsService};
use models::watermarks::CommitterWatermark;
Expand Down Expand Up @@ -132,7 +131,7 @@ impl Indexer {

/// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
/// they will be idle until the ingestion service starts, and serves it checkpoint data.
pub async fn concurrent_pipeline<H: Handler + 'static>(&mut self) -> Result<()> {
pub async fn concurrent_pipeline<H: concurrent::Handler + 'static>(&mut self) -> Result<()> {
if !self.enabled_pipelines.is_empty() && !self.enabled_pipelines.contains(H::NAME) {
info!("Skipping pipeline {}", H::NAME);
return Ok(());
Expand Down
5 changes: 3 additions & 2 deletions crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info};

use crate::{
handlers::Handler,
metrics::IndexerMetrics,
pipeline::{Batched, Indexed, PipelineConfig, WatermarkPart},
pipeline::{Indexed, PipelineConfig, WatermarkPart},
};

use super::{Batched, Handler};

/// Processed values that are waiting to be written to the database. This is an internal type used
/// by the concurrent collector to hold data it is waiting to send to the committer.
struct Pending<H: Handler> {
Expand Down
7 changes: 4 additions & 3 deletions crates/sui-indexer-alt/src/pipeline/concurrent/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use tracing::{debug, error, info, warn};

use crate::{
db::Db,
handlers::Handler,
metrics::IndexerMetrics,
pipeline::{Batched, Break, PipelineConfig, WatermarkPart},
pipeline::{Break, PipelineConfig, WatermarkPart},
};

use super::{Batched, Handler};

/// If the committer needs to retry a commit, it will wait this long initially.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

Expand Down Expand Up @@ -92,7 +93,7 @@ pub(super) fn committer<H: Handler + 'static>(
BE::transient(Break::Err(e.into()))
})?;

let affected = H::commit(&values, &mut conn).await;
let affected = H::commit(values.as_slice(), &mut conn).await;
let elapsed = guard.stop_and_record();

match affected {
Expand Down
Loading

0 comments on commit ca73e11

Please sign in to comment.