Skip to content

Commit

Permalink
Add support for percentiles
Browse files Browse the repository at this point in the history
  • Loading branch information
rcoh committed Mar 6, 2018
1 parent 119a752 commit 14a3ccd
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 20 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ nom = "4.0.0-beta1"
clap = "2.30.0"
regex = "0.2"
regex-syntax = "0.2"
terminal_size = "0.1.7"
terminal_size = "0.1.7"
quantiles = "0.7.1"
23 changes: 14 additions & 9 deletions src/lang.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub enum Search {
MatchAll,
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub enum Operator {
Inline(InlineOperator),
Aggregate(AggregateOperator),
Expand All @@ -23,7 +23,7 @@ pub enum InlineOperator {
},
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub enum AggregateFunction {
Count,
Sum {
Expand All @@ -33,19 +33,20 @@ pub enum AggregateFunction {
column: String,
},
Percentile {
percentiles: Vec<u64>,
percentile: f64,
percentile_str: String,
column: String,
},
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub struct AggregateOperator {
pub key_cols: Vec<String>,
pub aggregate_function: AggregateFunction,
pub output_column: Option<String>,
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub struct Query {
pub search: Search,
pub operators: Vec<Operator>,
Expand Down Expand Up @@ -109,10 +110,13 @@ fn is_digit_char(digit: char) -> bool {
named!(p_nn<&str, AggregateFunction>, ws!(
do_parse!(
alt!(tag!("pct") | tag!("percentile") | tag!("p")) >>
d1: map_res!(take_while_m_n!(2, 2, is_digit_char), |d: &str|d.parse::<u64>()) >>
pct: take_while_m_n!(2, 2, is_digit_char) >>
column: delimited!(tag!("("), ident ,tag!(")")) >>
(AggregateFunction::Percentile{column: column.to_string(), percentiles: vec![d1]})

(AggregateFunction::Percentile{
column: column.to_string(),
percentile: (".".to_owned() + pct).parse::<f64>().unwrap(),
percentile_str: pct.to_string()
})
)
));

Expand Down Expand Up @@ -247,7 +251,8 @@ mod tests {
key_cols: vec![],
aggregate_function: AggregateFunction::Percentile {
column: "x".to_string(),
percentiles: vec![50],
percentile: 0.5,
percentile_str: "50".to_string()
},
output_column: None,
})
Expand Down
14 changes: 11 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,17 @@ pub mod pipeline {
))
}
AggregateFunction::Percentile {
column: _column,
percentiles: _percentiles,
} => panic!("Percentile not supported"),
column,
percentile,
percentile_str,
} => {
let column_name = format!("_p{}", percentile_str);
Box::new(operator::Grouper::<operator::Percentile>::new(
op.key_cols.iter().map(AsRef::as_ref).collect(),
&op.output_column.unwrap_or(column_name),
operator::Percentile::empty(column, percentile),
))
}
}
}

Expand Down
62 changes: 55 additions & 7 deletions src/operator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
extern crate itertools;
extern crate ord_subset;
extern crate quantiles;
extern crate regex;
extern crate regex_syntax;
extern crate serde_json;
Expand All @@ -9,6 +10,7 @@ use self::ord_subset::OrdSubsetSliceExt;
use data::{Aggregate, Record, Row};
use data;
use self::serde_json::Value;
use self::quantiles::ckms::CKMS;
pub trait UnaryPreAggOperator {
fn process(&self, rec: &Record) -> Option<Record>;
}
Expand All @@ -18,7 +20,7 @@ pub trait AggregateOperator {
fn process(&mut self, row: &Row);
}

pub trait AccumOperator: Clone {
pub trait AggregateFunction: Clone {
fn process(&mut self, rec: &Record);
fn emit(&self) -> data::Value;
}
Expand All @@ -34,7 +36,7 @@ impl Count {
}
}

impl AccumOperator for Count {
impl AggregateFunction for Count {
fn process(&mut self, _rec: &Record) {
self.count += 1;
}
Expand Down Expand Up @@ -63,7 +65,7 @@ impl Sum {
}
}

impl AccumOperator for Sum {
impl AggregateFunction for Sum {
fn process(&mut self, rec: &Record) {
rec.data
.get(&self.column)
Expand Down Expand Up @@ -105,7 +107,7 @@ impl Average {
}
}

impl AccumOperator for Average {
impl AggregateFunction for Average {
fn process(&mut self, rec: &Record) {
rec.data
.get(&self.column)
Expand All @@ -129,14 +131,60 @@ impl AccumOperator for Average {
}
}

pub struct Grouper<T: AccumOperator> {
/// Percentile aggregate over a single column.
///
/// Numeric values read from `column` are inserted into a CKMS quantile
/// sketch; `emit` queries that sketch at `percentile`.
#[derive(Clone)]
pub struct Percentile {
/// Quantile sketch (from the `quantiles` crate) fed with every numeric value seen.
ckms: CKMS<f64>,
/// Name of the record field the values are read from.
column: String,
/// Quantile handed to the sketch query; the constructor rejects values >= 1.0.
percentile: f64,
/// Messages recorded for values that were neither float nor int.
warnings: Vec<String>,
}

impl Percentile {
    /// Builds a percentile aggregate with no observations yet.
    ///
    /// `column` names the record field to read and `percentile` is the
    /// quantile that `emit` will query, expressed as a fraction (the parser
    /// turns `p50` into `0.5`).
    ///
    /// # Panics
    ///
    /// Panics when `percentile` is `1.0` or greater.
    pub fn empty(column: String, percentile: f64) -> Self {
        // Guard first so an invalid request never allocates a sketch.
        if percentile >= 1.0 {
            panic!("Percentiles must be < 1");
        }

        // 0.001 is the error parameter handed to the CKMS sketch.
        let ckms = CKMS::<f64>::new(0.001);
        let warnings = Vec::new();

        Percentile {
            ckms,
            column,
            percentile,
            warnings,
        }
    }
}

impl AggregateFunction for Percentile {
    /// Feeds the value of `self.column` in `rec` into the quantile sketch.
    ///
    /// Floats are inserted as-is and ints are widened to `f64`. Records that
    /// do not contain the column are ignored; any other kind of value is
    /// skipped and a warning is recorded.
    fn process(&mut self, rec: &Record) {
        if let Some(value) = rec.data.get(&self.column) {
            match *value {
                data::Value::Float(f) => {
                    self.ckms.insert(f);
                }
                data::Value::Int(i) => {
                    self.ckms.insert(i as f64);
                }
                // The previous text was copied from the averaging aggregate
                // and claimed the value was a string; describe what actually
                // happened for this operator.
                _ => self.warnings.push(
                    "Got non-numeric value. Can only compute percentiles of int or float"
                        .to_string(),
                ),
            }
        }
    }

    /// Returns the sketch's answer for the configured percentile as a float,
    /// or the "no value" marker when the sketch has nothing to report.
    fn emit(&self) -> data::Value {
        // `query` yields a (rank, value) pair; only the value is reported.
        match self.ckms.query(self.percentile) {
            Some((_rank, value)) => data::Value::Float(value),
            None => data::Value::no_value(),
        }
    }
}

pub struct Grouper<T: AggregateFunction> {
key_cols: Vec<String>,
agg_col: String,
state: HashMap<Vec<String>, T>,
empty: T,
}

impl<T: AccumOperator> Grouper<T> {
impl<T: AggregateFunction> Grouper<T> {
pub fn new(key_cols: Vec<&str>, agg_col: &str, empty: T) -> Grouper<T> {
Grouper {
key_cols: key_cols.iter().map(|s| s.to_string()).collect(),
Expand All @@ -147,7 +195,7 @@ impl<T: AccumOperator> Grouper<T> {
}
}

impl<T: AccumOperator> AggregateOperator for Grouper<T> {
impl<T: AggregateFunction> AggregateOperator for Grouper<T> {
fn emit(&self) -> data::Aggregate {
let mut data: Vec<(HashMap<String, String>, data::Value)> = self.state
.iter()
Expand Down

0 comments on commit 14a3ccd

Please sign in to comment.