Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(over window): skip remaining affected rows when rank is not changed #18950

Open
wants to merge 1 commit into
base: rc/over-window-state-compute-metric
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
Option<RangeInclusive<StateKey>>,
)> {
let input_schema_len = table.get_data_types().len() - calls.len();
let rank_funcs_only = calls.iter().all(|call| call.kind.is_rank());

let mut part_changes = BTreeMap::new();
let mut accessed_entry_count = 0;
let mut compute_count = 0;
Expand All @@ -437,6 +439,10 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

let snapshot = part_with_delta.snapshot();
let delta = part_with_delta.delta();
let last_delta_key = delta
.last_key_value()
.map(|(k, _)| k)
.expect("delta shouldn't be empty");

// Generate delete changes first, because deletes are skipped during iteration over
// `part_with_delta` in the next step.
Expand Down Expand Up @@ -530,12 +536,19 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
let (key, row) = curr_key_cursor
.key_value()
.expect("cursor must be valid until `last_curr_key`");
let mut should_continue = true;

let output = states.slide_no_evict_hint()?;
compute_count += 1;

let old_output = &row.as_inner()[input_schema_len..];
if !old_output.is_empty() && old_output == output {
same_output_count += 1;

if rank_funcs_only && key >= last_delta_key {
// there won't be any more changes after this point, we can stop early
should_continue = false;
}
}

let new_row = OwnedRow::new(
Expand Down Expand Up @@ -568,7 +581,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

curr_key_cursor.move_next();

key != last_curr_key
should_continue && key != last_curr_key
} {}
}

Expand Down