From 094e92edf882342452c8d180f58550da2b016c52 Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Mon, 28 Oct 2024 14:32:49 +0000 Subject: [PATCH] easy(indexer-alt): clarify names of tuning parameters ## Description Trying to make the purposes of various tuning parameters clearer by renaming them. ## Test plan :eyes: --- crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs | 6 +++--- crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs | 6 +++--- crates/sui-indexer-alt/src/handlers/kv_objects.rs | 6 +++--- .../sui-indexer-alt/src/handlers/kv_transactions.rs | 6 +++--- .../src/handlers/tx_affected_objects.rs | 6 +++--- .../src/handlers/tx_balance_changes.rs | 6 +++--- .../src/pipeline/concurrent/collector.rs | 11 +++++++---- crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs | 8 ++++---- .../src/pipeline/sequential/committer.rs | 2 +- crates/sui-indexer-alt/src/pipeline/sequential/mod.rs | 2 +- 10 files changed, 31 insertions(+), 28 deletions(-) diff --git a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs index d09b206690a83..bc3e5f607c2c8 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs @@ -48,9 +48,9 @@ impl Processor for EvEmitMod { #[async_trait::async_trait] impl Handler for EvEmitMod { - const BATCH_SIZE: usize = 100; - const CHUNK_SIZE: usize = 1000; - const MAX_PENDING_SIZE: usize = 10000; + const MIN_EAGER_ROWS: usize = 100; + const MAX_CHUNK_ROWS: usize = 1000; + const MAX_PENDING_ROWS: usize = 10000; async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { Ok(diesel::insert_into(ev_emit_mod::table) diff --git a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs index 17aee23c2c42a..0a55d60172752 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs @@ -52,9 +52,9 @@ impl Processor for EvStructInst { #[async_trait::async_trait] impl Handler for EvStructInst { - const BATCH_SIZE: usize = 100; - const CHUNK_SIZE: usize = 1000; - const MAX_PENDING_SIZE: usize = 10000; + const MIN_EAGER_ROWS: usize = 100; + const MAX_CHUNK_ROWS: usize = 1000; + const MAX_PENDING_ROWS: usize = 10000; async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { Ok(diesel::insert_into(ev_struct_inst::table) diff --git a/crates/sui-indexer-alt/src/handlers/kv_objects.rs b/crates/sui-indexer-alt/src/handlers/kv_objects.rs index 87439edd42997..f645cceab347f 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_objects.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_objects.rs @@ -55,9 +55,9 @@ impl Processor for KvObjects { #[async_trait::async_trait] impl Handler for KvObjects { - const BATCH_SIZE: usize = 100; - const CHUNK_SIZE: usize = 1000; - const MAX_PENDING_SIZE: usize = 10000; + const MIN_EAGER_ROWS: usize = 100; + const MAX_CHUNK_ROWS: usize = 1000; + const MAX_PENDING_ROWS: usize = 10000; async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { Ok(diesel::insert_into(kv_objects::table) diff --git a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs index 66b44739d791a..d3144032705d6 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs @@ -58,9 +58,9 @@ impl Processor for KvTransactions { #[async_trait::async_trait] impl Handler for KvTransactions { - const BATCH_SIZE: usize = 100; - const CHUNK_SIZE: usize = 1000; - const MAX_PENDING_SIZE: usize = 10000; + const MIN_EAGER_ROWS: usize = 100; + const MAX_CHUNK_ROWS: usize = 1000; + const MAX_PENDING_ROWS: usize = 10000; async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { Ok(diesel::insert_into(kv_transactions::table) diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs index caac375ebc1b2..309af2c08a300 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs @@ -51,9 +51,9 @@ impl Processor for TxAffectedObjects { #[async_trait::async_trait] impl Handler for TxAffectedObjects { - const BATCH_SIZE: usize = 100; - const CHUNK_SIZE: usize = 1000; - const MAX_PENDING_SIZE: usize = 10000; + const MIN_EAGER_ROWS: usize = 100; + const MAX_CHUNK_ROWS: usize = 1000; + const MAX_PENDING_ROWS: usize = 10000; async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { Ok(diesel::insert_into(tx_affected_objects::table) diff --git a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs index dd41dd280f3a0..1f97e806fd1ec 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs @@ -57,9 +57,9 @@ impl Processor for TxBalanceChanges { #[async_trait::async_trait] impl Handler for TxBalanceChanges { - const BATCH_SIZE: usize = 100; - const CHUNK_SIZE: usize = 1000; - const MAX_PENDING_SIZE: usize = 10000; + const MIN_EAGER_ROWS: usize = 100; + const MAX_CHUNK_ROWS: usize = 1000; + const MAX_PENDING_ROWS: usize = 10000; async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { Ok(diesel::insert_into(tx_balance_changes::table) diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs b/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs index 76212704dfddf..1bf3459d6817a 100644 --- a/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs +++ b/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs @@ -38,8 +38,11 @@ impl Pending { /// Adds data from this indexed checkpoint to the `batch`, honoring the handler's bounds on /// chunk size. fn batch_into(&mut self, batch: &mut Batched) { - if batch.values.len() + self.values.len() > H::CHUNK_SIZE { - let mut for_batch = self.values.split_off(H::CHUNK_SIZE - batch.values.len()); + if batch.values.len() + self.values.len() > H::MAX_CHUNK_ROWS { + let mut for_batch = self + .values + .split_off(H::MAX_CHUNK_ROWS - batch.values.len()); + std::mem::swap(&mut self.values, &mut for_batch); batch.watermark.push(self.watermark.take(for_batch.len())); batch.values.extend(for_batch); @@ -159,7 +162,7 @@ pub(super) fn collector( } } - Some(indexed) = rx.recv(), if pending_rows < H::MAX_PENDING_SIZE => { + Some(indexed) = rx.recv(), if pending_rows < H::MAX_PENDING_ROWS => { metrics .total_collector_rows_received .with_label_values(&[H::NAME]) @@ -168,7 +171,7 @@ pub(super) fn collector( pending_rows += indexed.values.len(); pending.insert(indexed.checkpoint(), indexed.into()); - if pending_rows >= H::BATCH_SIZE { + if pending_rows >= H::MIN_EAGER_ROWS { poll.reset_immediately() } } diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs b/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs index 0b3c595ee5b35..3c3f91f0f89da 100644 --- a/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs +++ b/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs @@ -48,14 +48,14 @@ const MAX_WATERMARK_UPDATES: usize = 10_000; #[async_trait::async_trait] pub trait Handler: Processor { /// If at least this many rows are pending, the committer will commit them eagerly. - const BATCH_SIZE: usize = 50; + const MIN_EAGER_ROWS: usize = 50; /// If there are more than this many rows pending, the committer will only commit this many in /// one operation. - const CHUNK_SIZE: usize = 200; + const MAX_CHUNK_ROWS: usize = 200; /// If there are more than this many rows pending, the committer applies backpressure. - const MAX_PENDING_SIZE: usize = 1000; + const MAX_PENDING_ROWS: usize = 1000; /// Take a chunk of values and commit them to the database, returning the number of rows /// affected. @@ -88,7 +88,7 @@ impl Batched { /// The batch is full if it has more than enough values to write to the database, or more than /// enough watermarks to update. fn is_full(&self) -> bool { - self.values.len() >= H::CHUNK_SIZE || self.watermark.len() >= MAX_WATERMARK_UPDATES + self.values.len() >= H::MAX_CHUNK_ROWS || self.watermark.len() >= MAX_WATERMARK_UPDATES } } diff --git a/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs b/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs index e9025de635b6b..3bd06518c9c10 100644 --- a/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs +++ b/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs @@ -302,7 +302,7 @@ pub(super) fn committer( // rows to write, and they are already in the batch, or we can process the next // checkpoint to extract them. - if pending_rows < H::MIN_BATCH_ROWS { + if pending_rows < H::MIN_EAGER_ROWS { continue; } diff --git a/crates/sui-indexer-alt/src/pipeline/sequential/mod.rs b/crates/sui-indexer-alt/src/pipeline/sequential/mod.rs index 01f76207031dd..a15efd0f72330 100644 --- a/crates/sui-indexer-alt/src/pipeline/sequential/mod.rs +++ b/crates/sui-indexer-alt/src/pipeline/sequential/mod.rs @@ -39,7 +39,7 @@ mod committer; #[async_trait::async_trait] pub trait Handler: Processor { /// If at least this many rows are pending, the committer will commit them eagerly. - const MIN_BATCH_ROWS: usize = 50; + const MIN_EAGER_ROWS: usize = 50; /// Maximum number of checkpoints to try and write in a single batch. The larger this number /// is, the more chances the pipeline has to merge redundant writes, but the longer each write