diff --git a/crates/sui-graphql-rpc/src/types/transaction_block.rs b/crates/sui-graphql-rpc/src/types/transaction_block.rs index 55dd7a702179de..d7e06c144e1523 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block.rs @@ -411,115 +411,6 @@ impl TransactionBlock { Ok(conn) } - pub(crate) async fn paginate_v2( - db: &Db, - page: Page, - filter: TransactionBlockFilter, - checkpoint_viewed_at: u64, - scan_limit: Option, - ) -> Result, Error> { - use transactions as tx; - - let cursor_viewed_at = page.validate_cursor_consistency()?; - let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at); - - let (prev, next, results) = db - .execute(move |conn| { - page.paginate_query::( - conn, - checkpoint_viewed_at, - move || { - let mut query = tx::dsl::transactions.into_boxed(); - - if let Some(f) = &filter.function { - let sub_query = tx_calls::dsl::tx_calls - .select(tx_calls::dsl::tx_sequence_number) - .into_boxed(); - - query = query.filter(tx::dsl::tx_sequence_number.eq_any(f.apply( - sub_query, - tx_calls::dsl::package, - tx_calls::dsl::module, - tx_calls::dsl::func, - ))); - } - - if let Some(k) = &filter.kind { - query = query.filter(tx::dsl::transaction_kind.eq(*k as i16)) - } - - if let Some(c) = &filter.after_checkpoint { - query = query.filter(tx::dsl::checkpoint_sequence_number.gt(*c as i64)); - } - - if let Some(c) = &filter.at_checkpoint { - query = query.filter(tx::dsl::checkpoint_sequence_number.eq(*c as i64)); - } - - let before_checkpoint = filter - .before_checkpoint - .map_or(checkpoint_viewed_at + 1, |c| { - c.min(checkpoint_viewed_at + 1) - }); - query = query.filter( - tx::dsl::checkpoint_sequence_number.lt(before_checkpoint as i64), - ); - - if let Some(a) = &filter.sign_address { - let sub_query = tx_senders::dsl::tx_senders - .select(tx_senders::dsl::tx_sequence_number) - .filter(tx_senders::dsl::sender.eq(a.into_vec())); - query = query.filter(tx::dsl::tx_sequence_number.eq_any(sub_query)); - } - - if let Some(a) = &filter.recv_address { - let sub_query = tx_recipients::dsl::tx_recipients - .select(tx_recipients::dsl::tx_sequence_number) - .filter(tx_recipients::dsl::recipient.eq(a.into_vec())); - query = query.filter(tx::dsl::tx_sequence_number.eq_any(sub_query)); - } - - if let Some(o) = &filter.input_object { - let sub_query = tx_input_objects::dsl::tx_input_objects - .select(tx_input_objects::dsl::tx_sequence_number) - .filter(tx_input_objects::dsl::object_id.eq(o.into_vec())); - query = query.filter(tx::dsl::tx_sequence_number.eq_any(sub_query)); - } - - if let Some(o) = &filter.changed_object { - let sub_query = tx_changed_objects::dsl::tx_changed_objects - .select(tx_changed_objects::dsl::tx_sequence_number) - .filter(tx_changed_objects::dsl::object_id.eq(o.into_vec())); - query = query.filter(tx::dsl::tx_sequence_number.eq_any(sub_query)); - } - - if let Some(txs) = &filter.transaction_ids { - let digests: Vec<_> = txs.iter().map(|d| d.to_vec()).collect(); - query = query.filter(tx::dsl::transaction_digest.eq_any(digests)); - } - - query - }, - ) - }) - .await?; - - let mut conn = Connection::new(prev, next); - - // The "checkpoint viewed at" sets a consistent upper bound for the nested queries. - for stored in results { - let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor(); - let inner = TransactionBlockInner::try_from(stored)?; - let transaction = TransactionBlock { - inner, - checkpoint_viewed_at, - }; - conn.edges.push(Edge::new(cursor, transaction)); - } - - Ok(conn) - } - pub(crate) async fn my_head( db: &Db, page: Page, @@ -527,7 +418,7 @@ impl TransactionBlock { checkpoint_viewed_at: u64, scan_limit: Option, ) -> Result, Error> { - // handle inconsistencies - immediately return empty result + let num_filters = filter.num_filters(); let mut conn = Connection::new(false, false); @@ -535,24 +426,25 @@ impl TransactionBlock { return Ok(conn); } + if num_filters > 1 && scan_limit.is_none() { + return Err(Error::Client("Arbitrary combinations of `recvAddress`, `inputObject`, `changedObject`, and `function` requires specifying a `scanLimit`".to_string())); + } + + let lb_cp = calculate_lower_bound(filter.after_checkpoint, filter.at_checkpoint); + let ub_cp = calculate_upper_bound(filter.at_checkpoint, filter.before_checkpoint); + use amnn_0_hybrid_cp_tx::dsl as cp; use amnn_0_hybrid_transactions::dsl as tx; - // determine upper and lower bounds, apply scan limit - // do the queries for tx_sequence_number let (prev, next, transactions): (bool, bool, Vec) = db .execute_repeatable(move |conn| { - // in the asc case, the lower cp checkpoint is the max(at, after+1), tx check is >= cp.max - let lb = calculate_lower_bound(filter.at_checkpoint, filter.after_checkpoint); - let ub = calculate_upper_bound(filter.at_checkpoint, filter.before_checkpoint); - // Fetching transaction sequence numbers from database based on checkpoint bounds - let lb_txs = match lb { + let lb_tx_seq_num = match lb_cp { Some(lb_value) => { let sequence_number: Option = conn .first(move || { cp::amnn_0_hybrid_cp_tx - .select(cp::max_tx_sequence_number) + .select(cp::min_tx_sequence_number) .filter(cp::checkpoint_sequence_number.eq(lb_value as i64)) }) .optional()?; @@ -562,12 +454,12 @@ impl TransactionBlock { None => None, }; - let ub_txs = match ub { + let ub_tx_seq_num = match ub_cp { Some(ub_value) => { let sequence_number: Option = conn .first(move || { cp::amnn_0_hybrid_cp_tx - .select(cp::min_tx_sequence_number) + .select(cp::max_tx_sequence_number) .filter(cp::checkpoint_sequence_number.eq(ub_value as i64)) }) .optional()?; @@ -578,22 +470,28 @@ impl TransactionBlock { }; let before_cursor = page.before().map(|c| c.tx_sequence_number); - let after_cursor = page.after().map(|c| c.tx_sequence_number); - // TODO: apply scan limit too - and also figure out desc case - let lo = calculate_lower_bound(lb_txs, after_cursor); - let hi = calculate_upper_bound(ub_txs, before_cursor); - - // determine the upper bound - - // final merge with the cursor we've been given + let mut lo = max_option(lb_tx_seq_num, after_cursor); + let mut hi = min_option(ub_tx_seq_num, before_cursor); + + // Finally, if a scan_limit is specified, apply it to `lo` or `hi` depending on the + // presence of `first` or `last`. + if let Some(scan_limit) = scan_limit { + if num_filters > 1 { + if page.is_from_front() { + hi = min_option(hi, lo.map(|lo| lo.saturating_add(scan_limit))); + } else { + lo = max_option(lo, hi.map(|hi| hi.saturating_sub(scan_limit))); + } + } + } let sender = filter.sign_address; let mut subqueries = vec![]; - if !filter.no_filters() { + if num_filters > 0 { if let Some(f) = &filter.function { subqueries.push(match f { FqNameFilter::ByModule(filter) => match filter { @@ -613,8 +511,14 @@ impl TransactionBlock { if let Some(kind) = &filter.kind { subqueries.push((select_kind(kind.clone(), sender, lo, hi), "tx_kinds")); } + // Only need to explicitly query `tx_senders` if no other filters are specified. + // Otherwise, the other lookup tables will already leverage this information. + // Would like to chain this, but currently unstable: + // https://github.com/rust-lang/rust/issues/53667 if let Some(sender) = &filter.sign_address { - subqueries.push((select_sender(sender, lo, hi), "tx_senders")); + if num_filters == 1 { + subqueries.push((select_sender(sender, lo, hi), "tx_senders")); + } } if let Some(recv) = &filter.recv_address { subqueries.push((select_recipient(recv, sender, lo, hi), "tx_recipients")); @@ -659,8 +563,10 @@ impl TransactionBlock { } } - // TODO (wlmyng): this is kind of hacky - // we can also use page.apply for multi-filter case + // Issue the query to fetch the set of `tx_sequence_number` that will then be used + // to fetch remaining contents from the `transactions` table. + + // TODO (wlmyng): this typing is a bit hacky, and we likely don't need it println!("Submitting the query now"); let (prev, next, results) = page.paginate_raw_query::(conn, checkpoint_viewed_at, subquery)?; @@ -679,7 +585,7 @@ impl TransactionBlock { }) .await?; - let mut conn = Connection::new(prev, next); + conn = Connection::new(prev, next); for stored in transactions { let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor(); let inner = TransactionBlockInner::try_from(stored)?; @@ -690,11 +596,7 @@ impl TransactionBlock { conn.edges.push(Edge::new(cursor, transaction)); } - return Ok(conn); - Ok(conn) - - // finally, the multi-get } } @@ -731,22 +633,8 @@ impl TransactionBlockFilter { }) } - pub(crate) fn is_single_filter(&self) -> bool { - let filters_count = [ - self.recv_address.is_some(), - self.input_object.is_some(), - self.changed_object.is_some(), - self.function.is_some(), - ] - .iter() - .filter(|&is_set| *is_set) - .count(); - - filters_count <= 1 - } - - pub(crate) fn no_filters(&self) -> bool { - let filters_count = [ + pub(crate) fn num_filters(&self) -> usize { + [ self.recv_address.is_some(), self.input_object.is_some(), self.changed_object.is_some(), @@ -754,9 +642,7 @@ impl TransactionBlockFilter { ] .iter() .filter(|&is_set| *is_set) - .count(); - - filters_count == 0 + .count() } pub(crate) fn is_consistent(&self) -> bool { @@ -853,19 +739,6 @@ impl Target for TxLookup { } } -// impl Queryable for TxLookup -// where -// i64: FromSql, -// { -// type Row = Self; - -// fn build(row: Self::Row) -> deserialize::Result { -// Ok(TxLookup { -// tx_sequence_number: row.tx_sequence_number, -// }) -// } -// } - impl RawPaginated for TxLookup { fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery { filter!( @@ -1163,34 +1036,31 @@ pub(crate) fn select_ids( } } -fn calculate_lower_bound(at: Option, after: Option) -> Option { - match (at, after) { - (Some(at_value), Some(after_value)) => Some(std::cmp::max(at_value, after_value + 1)), - (Some(at_value), None) => Some(at_value), - (None, Some(after_value)) => Some(after_value + 1), +fn max_option(a: Option, b: Option) -> Option { + match (a, b) { + (Some(a_val), Some(b_val)) => Some(std::cmp::max(a_val, b_val)), + (Some(val), None) | (None, Some(val)) => Some(val), (None, None) => None, } } -fn calculate_upper_bound(at: Option, before: Option) -> Option { - match (at, before) { - (Some(at_value), Some(before_value)) => { - if before_value > 0 { - Some(std::cmp::min(at_value, before_value - 1)) - } else { - // Handle the case where before_value is 0 and cannot be decremented - Some(at_value) - } - } - (Some(at_value), None) => Some(at_value), - (None, Some(before_value)) => { - if before_value > 0 { - Some(before_value - 1) - } else { - // Handle the case where before_value is 0 and cannot be decremented - None - } - } +fn min_option(a: Option, b: Option) -> Option { + match (a, b) { + (Some(a_val), Some(b_val)) => Some(std::cmp::min(a_val, b_val)), + (Some(val), None) | (None, Some(val)) => Some(val), (None, None) => None, } } + +/// Given two optional checkpoints, return the larger of the two. We increment `after` by 1 so that +/// we can uniformly select the `min_tx_sequence_number` for the lower bound checkpoint. +fn calculate_lower_bound(after: Option, at: Option) -> Option { + max_option(after.map(|x| x + 1), at) +} + +/// Given two optional checkpoints, return the smaller of the two. We decrement `before` by 1 so +/// that we can uniformly select the `max_tx_sequence_number` for the upper bound checkpoint. +/// Assumes that `before` must be a positive integer. +fn calculate_upper_bound(at: Option, before: Option) -> Option { + min_option(at, before.map(|x| x.saturating_sub(1))) +}