Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
wlmyng committed Oct 28, 2024
1 parent e24a754 commit eed4da8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 261 deletions.
263 changes: 2 additions & 261 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use diesel::query_dsl::methods::FilterDsl;
use diesel::ExpressionMethods;
use futures::future::join_all;
use mysten_metrics::spawn_monitored_task;
use serde::{Deserialize, Serialize};
Expand All @@ -15,26 +13,14 @@ use tracing::{error, info};

use crate::config::RetentionConfig;
use crate::errors::IndexerError;
use crate::execute_delete_range_query;
use crate::handlers::pruners::events::Events;
use crate::handlers::pruners::objects_history::ObjectsHistory;
use crate::handlers::pruners::spawn_pruner;
use crate::handlers::pruners::transactions::Transactions;
use crate::schema::{
checkpoints, event_emit_module, event_emit_package, event_senders, event_struct_instantiation,
event_struct_module, event_struct_name, event_struct_package, events, objects_history,
transactions, tx_affected_addresses, tx_affected_objects, tx_calls_fun, tx_calls_mod,
tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds,
};
use crate::store::pg_partition_manager::PgPartitionManager;
use crate::handlers::pruners::spawn_pruners;
use crate::store::PgIndexerStore;
use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};

const MAX_DELAY_MS: u64 = 10000;

pub struct Pruner {
pub store: PgIndexerStore,
pub partition_manager: PgPartitionManager,
pub retention_policies: HashMap<PrunableTable, u64>,
pub metrics: IndexerMetrics,
}
Expand Down Expand Up @@ -85,245 +71,6 @@ pub enum PrunableTable {
Checkpoints,
}

struct TablePruner {
table: PrunableTable,
store: PgIndexerStore,
partition_manager: PgPartitionManager,
cancel: CancellationToken,
}

impl TablePruner {
fn new(
table: PrunableTable,
store: PgIndexerStore,
partition_manager: PgPartitionManager,
cancel: CancellationToken,
) -> Self {
Self {
table,
store,
partition_manager,
cancel,
}
}

async fn run(&mut self) -> IndexerResult<()> {
loop {
if self.cancel.is_cancelled() {
info!("Pruner task cancelled.");
return Ok(());
}

let (watermark, _) = self.store.get_watermark(self.table).await?;

self.prune(0, watermark.pruner_hi as u64).await?;
}
}

async fn prune(&self, prune_lo: u64, prune_hi: u64) -> IndexerResult<()> {
let table_partitions: HashMap<_, _> = self
.partition_manager
.get_table_partitions()
.await?
.into_iter()
.filter(|(table_name, _)| {
self.partition_manager
.get_strategy(table_name)
.is_epoch_partitioned()
})
.collect();

if let Some((min_partition, _)) = table_partitions.get(self.table.as_ref()) {
for epoch in *min_partition..=prune_hi {
self.partition_manager
.drop_table_partition(self.table.as_ref().to_string(), epoch)
.await?;
}
return Ok(());
};

let pool = self.store.pool();
let mut conn = pool.get().await?;

use diesel_async::RunQueryDsl;

if let Err(err) = match self.table {
PrunableTable::ObjectsHistory => execute_delete_range_query!(
&mut conn,
objects_history,
checkpoint_sequence_number,
prune_lo,
prune_hi
),
PrunableTable::Transactions => {
execute_delete_range_query!(
&mut conn,
transactions,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::Events => {
execute_delete_range_query!(
&mut conn,
events,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::EventEmitPackage => {
execute_delete_range_query!(
&mut conn,
event_emit_package,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::EventEmitModule => {
execute_delete_range_query!(
&mut conn,
event_emit_module,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::EventSenders => {
execute_delete_range_query!(
&mut conn,
event_senders,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::EventStructInstantiation => execute_delete_range_query!(
&mut conn,
event_struct_instantiation,
tx_sequence_number,
prune_lo,
prune_hi
),
PrunableTable::EventStructModule => execute_delete_range_query!(
&mut conn,
event_struct_module,
tx_sequence_number,
prune_lo,
prune_hi
),
PrunableTable::EventStructName => {
execute_delete_range_query!(
&mut conn,
event_struct_name,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::EventStructPackage => execute_delete_range_query!(
&mut conn,
event_struct_package,
tx_sequence_number,
prune_lo,
prune_hi
),
PrunableTable::TxAffectedAddresses => execute_delete_range_query!(
&mut conn,
tx_affected_addresses,
tx_sequence_number,
prune_lo,
prune_hi
),
PrunableTable::TxAffectedObjects => execute_delete_range_query!(
&mut conn,
tx_affected_objects,
tx_sequence_number,
prune_lo,
prune_hi
),
PrunableTable::TxCallsPkg => {
execute_delete_range_query!(
&mut conn,
tx_calls_pkg,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::TxCallsMod => {
execute_delete_range_query!(
&mut conn,
tx_calls_mod,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::TxCallsFun => {
execute_delete_range_query!(
&mut conn,
tx_calls_fun,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::TxChangedObjects => {
execute_delete_range_query!(
&mut conn,
tx_changed_objects,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::TxDigests => {
execute_delete_range_query!(
&mut conn,
tx_digests,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::TxInputObjects => {
execute_delete_range_query!(
&mut conn,
tx_input_objects,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::TxKinds => {
execute_delete_range_query!(
&mut conn,
tx_kinds,
tx_sequence_number,
prune_lo,
prune_hi
)
}
PrunableTable::Checkpoints => {
execute_delete_range_query!(
&mut conn,
checkpoints,
sequence_number,
prune_lo,
prune_hi
)
}
} {
error!("Failed to prune table {}: {}", self.table, err);
};

Ok(())
}
}

impl PrunableTable {
pub fn select_reader_lo(&self, cp: u64, tx: u64) -> u64 {
match self {
Expand Down Expand Up @@ -362,12 +109,10 @@ impl Pruner {
retention_config: RetentionConfig,
metrics: IndexerMetrics,
) -> Result<Self, IndexerError> {
let partition_manager = PgPartitionManager::new(store.pool())?;
let retention_policies = retention_config.retention_policies();

Ok(Self {
store,
partition_manager,
retention_policies,
metrics,
})
Expand All @@ -383,11 +128,7 @@ impl Pruner {
cancel_clone
));

let mut table_tasks = vec![
spawn_pruner::<Events>(cancel.clone(), self.store.clone()),
spawn_pruner::<ObjectsHistory>(cancel.clone(), self.store.clone()),
spawn_pruner::<Transactions>(cancel.clone(), self.store.clone()),
];
let mut table_tasks = spawn_pruners(cancel.clone(), self.store.clone());

println!("added table tasks");

Expand Down
28 changes: 28 additions & 0 deletions crates/sui-indexer/src/handlers/pruners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,34 @@ pub fn spawn_pruner<T: Prunable>(
spawn_monitored_task!(run_pruner::<T>(cancel, store))
}

pub fn spawn_pruners(
cancel: CancellationToken,
store: PgIndexerStore,
) -> Vec<JoinHandle<IndexerResult<()>>> {
vec![
spawn_pruner::<checkpoints::Checkpoints>(cancel.clone(), store.clone()),
spawn_pruner::<ev_emit_module::EventEmitModule>(cancel.clone(), store.clone()),
spawn_pruner::<ev_emit_package::EventEmitPackage>(cancel.clone(), store.clone()),
spawn_pruner::<ev_senders::EventSenders>(cancel.clone(), store.clone()),
spawn_pruner::<ev_struct_inst::EventStructInstantiation>(cancel.clone(), store.clone()),
spawn_pruner::<ev_struct_module::EventStructModule>(cancel.clone(), store.clone()),
spawn_pruner::<ev_struct_name::EventStructName>(cancel.clone(), store.clone()),
spawn_pruner::<ev_struct_package::EventStructPackage>(cancel.clone(), store.clone()),
spawn_pruner::<events::Events>(cancel.clone(), store.clone()),
spawn_pruner::<objects_history::ObjectsHistory>(cancel.clone(), store.clone()),
spawn_pruner::<transactions::Transactions>(cancel.clone(), store.clone()),
spawn_pruner::<tx_affected_addresses::TxAffectedAddresses>(cancel.clone(), store.clone()),
spawn_pruner::<tx_affected_objects::TxAffectedObjects>(cancel.clone(), store.clone()),
spawn_pruner::<tx_calls_fun::TxCallsFun>(cancel.clone(), store.clone()),
spawn_pruner::<tx_calls_mod::TxCallsMod>(cancel.clone(), store.clone()),
spawn_pruner::<tx_calls_pkg::TxCallsPkg>(cancel.clone(), store.clone()),
spawn_pruner::<tx_changed_objects::TxChangedObjects>(cancel.clone(), store.clone()),
spawn_pruner::<tx_digests::TxDigests>(cancel.clone(), store.clone()),
spawn_pruner::<tx_input_objects::TxInputObjects>(cancel.clone(), store.clone()),
spawn_pruner::<tx_kinds::TxKinds>(cancel.clone(), store.clone()),
]
}

#[macro_export]
macro_rules! execute_delete_range_query {
($conn:expr, $table:ident, $column:ident, $min:expr, $max:expr) => {
Expand Down

0 comments on commit eed4da8

Please sign in to comment.