Skip to content

Commit

Permalink
Added sql_query expression function
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Feb 15, 2024
1 parent 53f0222 commit 7f5538a
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 13 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 90 additions & 3 deletions crates/smtp/src/core/eval.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{borrow::Cow, net::IpAddr, sync::Arc, vec::IntoIter};
use std::{borrow::Cow, cmp::Ordering, net::IpAddr, sync::Arc, vec::IntoIter};

use directory::Directory;
use mail_auth::IpLookupStrategy;
use sieve::Sieve;
use store::{Deserialize, LookupStore};
use smtp_proto::IntoString;
use store::{Deserialize, LookupStore, Rows, Value};
use utils::{
config::if_block::IfBlock,
expr::{Expression, Variable},
Expand All @@ -29,7 +30,8 @@ pub const F_IS_LOCAL_DOMAIN: u32 = 0;
pub const F_IS_LOCAL_ADDRESS: u32 = 1;
pub const F_KEY_GET: u32 = 2;
pub const F_KEY_EXISTS: u32 = 3;
pub const F_DNS_QUERY: u32 = 4;
pub const F_SQL_QUERY: u32 = 4;
pub const F_DNS_QUERY: u32 = 5;

pub const VARIABLES_MAP: &[(&str, u32)] = &[
("rcpt", V_RECIPIENT),
Expand All @@ -51,6 +53,7 @@ pub const FUNCTIONS_MAP: &[(&str, u32, u32)] = &[
("key_get", F_KEY_GET, 2),
("key_exists", F_KEY_EXISTS, 2),
("dns_query", F_DNS_QUERY, 2),
("sql_query", F_SQL_QUERY, 3),
];

impl SMTP {
Expand Down Expand Up @@ -200,6 +203,7 @@ impl SMTP {
.into()
}
F_DNS_QUERY => self.dns_query(params).await,
F_SQL_QUERY => self.sql_query(params).await,
_ => Variable::default(),
}
}
Expand Down Expand Up @@ -294,6 +298,65 @@ impl SMTP {
})
}

async fn sql_query<'x>(&self, mut arguments: FncParams<'x>) -> Variable<'x> {
let store = self.get_lookup_store(arguments.next_as_string().as_ref());
let query = arguments.next_as_string();

if query.is_empty() {
tracing::warn!(
context = "eval:sql_query",
event = "invalid",
reason = "Empty query string",
);
return Variable::default();
}

// Obtain arguments
let arguments = match arguments.next() {
Variable::Array(l) => l.into_iter().map(to_store_value).collect(),
v => vec![to_store_value(v)],
};

// Run query
if query
.as_bytes()
.get(..6)
.map_or(false, |q| q.eq_ignore_ascii_case(b"SELECT"))
{
if let Ok(mut rows) = store.query::<Rows>(&query, arguments).await {
match rows.rows.len().cmp(&1) {
Ordering::Equal => {
let mut row = rows.rows.pop().unwrap().values;
match row.len().cmp(&1) {
Ordering::Equal if !matches!(row.first(), Some(Value::Null)) => {
row.pop().map(into_variable).unwrap()
}
Ordering::Less => Variable::default(),
_ => Variable::Array(
row.into_iter().map(into_variable).collect::<Vec<_>>(),
),
}
}
Ordering::Less => Variable::default(),
Ordering::Greater => rows
.rows
.into_iter()
.map(|r| {
Variable::Array(
r.values.into_iter().map(into_variable).collect::<Vec<_>>(),
)
})
.collect::<Vec<_>>()
.into(),
}
} else {
false.into()
}
} else {
store.query::<usize>(&query, arguments).await.is_ok().into()
}
}

async fn dns_query<'x>(&self, mut arguments: FncParams<'x>) -> Variable<'x> {
let entry = arguments.next_as_string();
let record_type = arguments.next_as_string();
Expand Down Expand Up @@ -386,6 +449,10 @@ impl<'x> FncParams<'x> {
pub fn next_as_string(&mut self) -> Cow<'x, str> {
self.params.next().unwrap().into_string()
}

pub fn next(&mut self) -> Variable<'x> {
self.params.next().unwrap()
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -414,3 +481,23 @@ impl VariableWrapper {
self.0
}
}

fn to_store_value(value: Variable) -> Value {
match value {
Variable::String(v) => Value::Text(v),
Variable::Integer(v) => Value::Integer(v),
Variable::Float(v) => Value::Float(v),
v => Value::Text(v.to_string().into_owned().into()),
}
}

fn into_variable(value: Value) -> Variable {
match value {
Value::Integer(v) => Variable::Integer(v),
Value::Bool(v) => Variable::Integer(i64::from(v)),
Value::Float(v) => Variable::Float(v),
Value::Text(v) => Variable::String(v),
Value::Blob(v) => Variable::String(v.into_owned().into_string().into()),
Value::Null => Variable::default(),
}
}
12 changes: 8 additions & 4 deletions crates/smtp/src/scripts/plugins/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,20 @@ pub fn exec(ctx: PluginContext<'_>) -> Variable {
let span = ctx.span;

// Obtain store name
let store = ctx.arguments[0].to_string();
let store = if let Some(store_) = ctx.core.shared.lookup_stores.get(store.as_ref()) {
store_
let store = match &ctx.arguments[0] {
Variable::String(v) if !v.is_empty() => ctx.core.shared.lookup_stores.get(v.as_ref()),
_ => Some(&ctx.core.shared.default_lookup_store),
};

let store = if let Some(store) = store {
store
} else {
tracing::warn!(
parent: span,
context = "sieve:query",
event = "failed",
reason = "Unknown store",
store = %store,
store = ctx.arguments[0].to_string().as_ref(),
);
return false.into();
};
Expand Down
2 changes: 1 addition & 1 deletion crates/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ resolver = "2"
[dependencies]
utils = { path = "../utils" }
nlp = { path = "../nlp" }
rocksdb = { version = "0.21", optional = true, features = ["multi-threaded-cf"] }
rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] }
foundationdb = { version = "0.8.0", features = ["embedded-fdb-include"], optional = true }
rusqlite = { version = "0.30.0", features = ["bundled"], optional = true }
rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls"], optional = true }
Expand Down

0 comments on commit 7f5538a

Please sign in to comment.