Skip to content

Commit

Permalink
some cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
wlmyng committed Jun 25, 2024
1 parent d1b01a5 commit a1168d7
Showing 1 changed file with 63 additions and 193 deletions.
256 changes: 63 additions & 193 deletions crates/sui-graphql-rpc/src/types/transaction_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,148 +411,40 @@ impl TransactionBlock {
Ok(conn)
}

pub(crate) async fn paginate_v2(
db: &Db,
page: Page<Cursor>,
filter: TransactionBlockFilter,
checkpoint_viewed_at: u64,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>, Error> {
use transactions as tx;

let cursor_viewed_at = page.validate_cursor_consistency()?;
let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);

let (prev, next, results) = db
.execute(move |conn| {
page.paginate_query::<StoredTransaction, _, _, _>(
conn,
checkpoint_viewed_at,
move || {
let mut query = tx::dsl::transactions.into_boxed();

if let Some(f) = &filter.function {
let sub_query = tx_calls::dsl::tx_calls
.select(tx_calls::dsl::tx_sequence_number)
.into_boxed();

query = query.filter(tx::dsl::tx_sequence_number.eq_any(f.apply(
sub_query,
tx_calls::dsl::package,
tx_calls::dsl::module,
tx_calls::dsl::func,
)));
}

if let Some(k) = &filter.kind {
query = query.filter(tx::dsl::transaction_kind.eq(*k as i16))
}

if let Some(c) = &filter.after_checkpoint {
query = query.filter(tx::dsl::checkpoint_sequence_number.gt(*c as i64));
}

if let Some(c) = &filter.at_checkpoint {
query = query.filter(tx::dsl::checkpoint_sequence_number.eq(*c as i64));
}

let before_checkpoint = filter
.before_checkpoint
.map_or(checkpoint_viewed_at + 1, |c| {
c.min(checkpoint_viewed_at + 1)
});
query = query.filter(
tx::dsl::checkpoint_sequence_number.lt(before_checkpoint as i64),
);

if let Some(a) = &filter.sign_address {
let sub_query = tx_senders::dsl::tx_senders
.select(tx_senders::dsl::tx_sequence_number)
.filter(tx_senders::dsl::sender.eq(a.into_vec()));
query = query.filter(tx::dsl::tx_sequence_number.eq_any(sub_query));
}

if let Some(a) = &filter.recv_address {
let sub_query = tx_recipients::dsl::tx_recipients
.select(tx_recipients::dsl::tx_sequence_number)
.filter(tx_recipients::dsl::recipient.eq(a.into_vec()));
query = query.filter(tx::dsl::tx_sequence_number.eq_any(sub_query));
}

if let Some(o) = &filter.input_object {
let sub_query = tx_input_objects::dsl::tx_input_objects
.select(tx_input_objects::dsl::tx_sequence_number)
.filter(tx_input_objects::dsl::object_id.eq(o.into_vec()));
query = query.filter(tx::dsl::tx_sequence_number.eq_any(sub_query));
}

if let Some(o) = &filter.changed_object {
let sub_query = tx_changed_objects::dsl::tx_changed_objects
.select(tx_changed_objects::dsl::tx_sequence_number)
.filter(tx_changed_objects::dsl::object_id.eq(o.into_vec()));
query = query.filter(tx::dsl::tx_sequence_number.eq_any(sub_query));
}

if let Some(txs) = &filter.transaction_ids {
let digests: Vec<_> = txs.iter().map(|d| d.to_vec()).collect();
query = query.filter(tx::dsl::transaction_digest.eq_any(digests));
}

query
},
)
})
.await?;

let mut conn = Connection::new(prev, next);

// The "checkpoint viewed at" sets a consistent upper bound for the nested queries.
for stored in results {
let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
let inner = TransactionBlockInner::try_from(stored)?;
let transaction = TransactionBlock {
inner,
checkpoint_viewed_at,
};
conn.edges.push(Edge::new(cursor, transaction));
}

Ok(conn)
}

pub(crate) async fn my_head(
db: &Db,
page: Page<Cursor>,
filter: TransactionBlockFilter,
checkpoint_viewed_at: u64,
scan_limit: Option<u64>,
) -> Result<Connection<String, TransactionBlock>, Error> {
// handle inconsistencies - immediately return empty result
let num_filters = filter.num_filters();

let mut conn = Connection::new(false, false);

if !filter.is_consistent() {
return Ok(conn);
}

if num_filters > 1 && scan_limit.is_none() {
return Err(Error::Client("Arbitrary combinations of `recvAddress`, `inputObject`, `changedObject`, and `function` requires specifying a `scanLimit`".to_string()));
}

let lb_cp = calculate_lower_bound(filter.after_checkpoint, filter.at_checkpoint);
let ub_cp = calculate_upper_bound(filter.at_checkpoint, filter.before_checkpoint);

use amnn_0_hybrid_cp_tx::dsl as cp;
use amnn_0_hybrid_transactions::dsl as tx;
// determine upper and lower bounds, apply scan limit

// do the queries for tx_sequence_number
let (prev, next, transactions): (bool, bool, Vec<StoredTransaction>) = db
.execute_repeatable(move |conn| {
// in the asc case, the lower cp checkpoint is the max(at, after+1), tx check is >= cp.max
let lb = calculate_lower_bound(filter.at_checkpoint, filter.after_checkpoint);
let ub = calculate_upper_bound(filter.at_checkpoint, filter.before_checkpoint);

// Fetching transaction sequence numbers from database based on checkpoint bounds
let lb_txs = match lb {
let lb_tx_seq_num = match lb_cp {
Some(lb_value) => {
let sequence_number: Option<i64> = conn
.first(move || {
cp::amnn_0_hybrid_cp_tx
.select(cp::max_tx_sequence_number)
.select(cp::min_tx_sequence_number)
.filter(cp::checkpoint_sequence_number.eq(lb_value as i64))
})
.optional()?;
Expand All @@ -562,12 +454,12 @@ impl TransactionBlock {
None => None,
};

let ub_txs = match ub {
let ub_tx_seq_num = match ub_cp {
Some(ub_value) => {
let sequence_number: Option<i64> = conn
.first(move || {
cp::amnn_0_hybrid_cp_tx
.select(cp::min_tx_sequence_number)
.select(cp::max_tx_sequence_number)
.filter(cp::checkpoint_sequence_number.eq(ub_value as i64))
})
.optional()?;
Expand All @@ -578,22 +470,28 @@ impl TransactionBlock {
};

let before_cursor = page.before().map(|c| c.tx_sequence_number);

let after_cursor = page.after().map(|c| c.tx_sequence_number);

// TODO: apply scan limit too - and also figure out desc case
let lo = calculate_lower_bound(lb_txs, after_cursor);
let hi = calculate_upper_bound(ub_txs, before_cursor);

// determine the upper bound

// final merge with the cursor we've been given
let mut lo = max_option(lb_tx_seq_num, after_cursor);
let mut hi = min_option(ub_tx_seq_num, before_cursor);

// Finally, if a scan_limit is specified, apply it to `lo` or `hi` depending on the
// presence of `first` or `last`.
if let Some(scan_limit) = scan_limit {
if num_filters > 1 {
if page.is_from_front() {
hi = min_option(hi, lo.map(|lo| lo.saturating_add(scan_limit)));
} else {
lo = max_option(lo, hi.map(|hi| hi.saturating_sub(scan_limit)));
}
}
}

let sender = filter.sign_address;

let mut subqueries = vec![];

if !filter.no_filters() {
if num_filters > 0 {
if let Some(f) = &filter.function {
subqueries.push(match f {
FqNameFilter::ByModule(filter) => match filter {
Expand All @@ -613,8 +511,14 @@ impl TransactionBlock {
if let Some(kind) = &filter.kind {
subqueries.push((select_kind(kind.clone(), sender, lo, hi), "tx_kinds"));
}
// Only need to explicitly query `tx_senders` if no other filters are specified.
// Otherwise, the other lookup tables will already leverage this information.
// Would like to chain this, but currently unstable:
// https://github.com/rust-lang/rust/issues/53667
if let Some(sender) = &filter.sign_address {
subqueries.push((select_sender(sender, lo, hi), "tx_senders"));
if num_filters == 1 {
subqueries.push((select_sender(sender, lo, hi), "tx_senders"));
}
}
if let Some(recv) = &filter.recv_address {
subqueries.push((select_recipient(recv, sender, lo, hi), "tx_recipients"));
Expand Down Expand Up @@ -659,8 +563,10 @@ impl TransactionBlock {
}
}

// TODO (wlmyng): this is kind of hacky
// we can also use page.apply for multi-filter case
// Issue the query to fetch the set of `tx_sequence_number` that will then be used
// to fetch remaining contents from the `transactions` table.

// TODO (wlmyng): this typing is a bit hacky, and we likely don't need it
println!("Submitting the query now");
let (prev, next, results) =
page.paginate_raw_query::<TxLookup>(conn, checkpoint_viewed_at, subquery)?;
Expand All @@ -679,7 +585,7 @@ impl TransactionBlock {
})
.await?;

let mut conn = Connection::new(prev, next);
conn = Connection::new(prev, next);
for stored in transactions {
let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
let inner = TransactionBlockInner::try_from(stored)?;
Expand All @@ -690,11 +596,7 @@ impl TransactionBlock {
conn.edges.push(Edge::new(cursor, transaction));
}

return Ok(conn);

Ok(conn)

// finally, the multi-get
}
}

Expand Down Expand Up @@ -731,32 +633,16 @@ impl TransactionBlockFilter {
})
}

pub(crate) fn is_single_filter(&self) -> bool {
let filters_count = [
self.recv_address.is_some(),
self.input_object.is_some(),
self.changed_object.is_some(),
self.function.is_some(),
]
.iter()
.filter(|&is_set| *is_set)
.count();

filters_count <= 1
}

pub(crate) fn no_filters(&self) -> bool {
let filters_count = [
pub(crate) fn num_filters(&self) -> usize {
[
self.recv_address.is_some(),
self.input_object.is_some(),
self.changed_object.is_some(),
self.function.is_some(),
]
.iter()
.filter(|&is_set| *is_set)
.count();

filters_count == 0
.count()
}

pub(crate) fn is_consistent(&self) -> bool {
Expand Down Expand Up @@ -853,19 +739,6 @@ impl Target<Cursor> for TxLookup {
}
}

// impl<DB: diesel::backend::Backend> Queryable<BigInt, DB> for TxLookup
// where
// i64: FromSql<BigInt, DB>,
// {
// type Row = Self;

// fn build(row: Self::Row) -> deserialize::Result<Self> {
// Ok(TxLookup {
// tx_sequence_number: row.tx_sequence_number,
// })
// }
// }

impl RawPaginated<Cursor> for TxLookup {
fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
filter!(
Expand Down Expand Up @@ -1163,34 +1036,31 @@ pub(crate) fn select_ids(
}
}

fn calculate_lower_bound(at: Option<u64>, after: Option<u64>) -> Option<u64> {
match (at, after) {
(Some(at_value), Some(after_value)) => Some(std::cmp::max(at_value, after_value + 1)),
(Some(at_value), None) => Some(at_value),
(None, Some(after_value)) => Some(after_value + 1),
fn max_option(a: Option<u64>, b: Option<u64>) -> Option<u64> {
match (a, b) {
(Some(a_val), Some(b_val)) => Some(std::cmp::max(a_val, b_val)),
(Some(val), None) | (None, Some(val)) => Some(val),
(None, None) => None,
}
}

fn calculate_upper_bound(at: Option<u64>, before: Option<u64>) -> Option<u64> {
match (at, before) {
(Some(at_value), Some(before_value)) => {
if before_value > 0 {
Some(std::cmp::min(at_value, before_value - 1))
} else {
// Handle the case where before_value is 0 and cannot be decremented
Some(at_value)
}
}
(Some(at_value), None) => Some(at_value),
(None, Some(before_value)) => {
if before_value > 0 {
Some(before_value - 1)
} else {
// Handle the case where before_value is 0 and cannot be decremented
None
}
}
fn min_option(a: Option<u64>, b: Option<u64>) -> Option<u64> {
match (a, b) {
(Some(a_val), Some(b_val)) => Some(std::cmp::min(a_val, b_val)),
(Some(val), None) | (None, Some(val)) => Some(val),
(None, None) => None,
}
}

/// Given two optional checkpoints, return the larger of the two. We increment `after` by 1 so that
/// we can uniformly select the `min_tx_sequence_number` for the lower bound checkpoint.
fn calculate_lower_bound(after: Option<u64>, at: Option<u64>) -> Option<u64> {
max_option(after.map(|x| x + 1), at)
}

/// Given two optional checkpoints, return the smaller of the two. We decrement `before` by 1 so
/// that we can uniformly select the `max_tx_sequence_number` for the upper bound checkpoint.
/// Assumes that `before` must be a positive integer.
fn calculate_upper_bound(at: Option<u64>, before: Option<u64>) -> Option<u64> {
min_option(at, before.map(|x| x.saturating_sub(1)))
}

0 comments on commit a1168d7

Please sign in to comment.