Skip to content

Commit

Permalink
Extract filter parsing into a struct with serde
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Sep 23, 2024
1 parent 2848941 commit 52067c1
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 140 deletions.
62 changes: 34 additions & 28 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ regex = { version = "1", default-features = false }
serde = { version = "1", features = ["derive"] }
thiserror = { version = "1" }
tracing = "0.1"
odata-params = "0.4.0"
odata-params = "0.4"

[dev-dependencies]
datafusion = { version = "42", default-features = false, features = [
Expand All @@ -40,7 +40,7 @@ tokio = { version = "1", default-features = false, features = [
"rt-multi-thread",
] }
tower = "0.5"
tower-http = { version = "0.5", features = ["trace", "cors"] }
tower-http = { version = "0.6", features = ["trace", "cors"] }

[patch.crates-io]
# datafusion = { git = 'https://github.com/apache/arrow-datafusion.git', tag = '42.0.0-rc1' }
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ xh GET 'http://localhost:50051/$metadata'

Query collection:
```sh
xh GET 'http://localhost:50051/tickers.spy/?$select=offset,from_symbol,to_symbol,close&$top=5'
xh GET 'http://localhost:50051/tickers.spy?$select=offset,from_symbol,to_symbol,close&$top=5'
```

## Status
Expand Down
96 changes: 4 additions & 92 deletions src/collection.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
use chrono::DateTime;
use datafusion::{
common::{Column, ScalarValue},
logical_expr::{expr::InList, BinaryExpr, Operator},
prelude::*,
};
use odata_params::filters::{
CompareOperator as ODataOperator, Expr as ODataExpr, Value as ODataValue,
};
use datafusion::prelude::*;

use crate::error::{FilterParsingError, ODataError, UnsupportedFeature};
use crate::{error::ODataError, filter::ODataFilter};

///////////////////////////////////////////////////////////////////////////////

Expand All @@ -23,7 +15,7 @@ pub struct QueryParamsRaw {
#[serde(rename = "$top")]
pub top: Option<u64>,
#[serde(rename = "$filter")]
pub filter: Option<String>,
pub filter: Option<ODataFilter>,
}

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -53,20 +45,12 @@ impl QueryParamsRaw {
let skip = self.skip.map(|v| v as usize);
let top = self.top.map(|v| v as usize);

let filter = match self.filter {
Some(fltr) => {
let parsed_fltr = odata_params::filters::parse_str(fltr)?;
Some(odata_expr_to_df_expr(&parsed_fltr)?)
}
None => None,
};

Ok(QueryParams {
select,
order_by,
skip,
top,
filter,
filter: self.filter.map(Into::into),
})
}
}
Expand Down Expand Up @@ -141,78 +125,6 @@ impl QueryParams {
}
}

fn odata_expr_to_df_expr(res: &ODataExpr) -> Result<Expr, ODataError> {
match res {
ODataExpr::Or(l, r) => Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(odata_expr_to_df_expr(l)?),
Operator::Or,
Box::new(odata_expr_to_df_expr(r)?),
))),
ODataExpr::And(l, r) => Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(odata_expr_to_df_expr(l)?),
Operator::And,
Box::new(odata_expr_to_df_expr(r)?),
))),
ODataExpr::Compare(l, op, r) => Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(odata_expr_to_df_expr(l)?),
odata_op_to_df_op(op),
Box::new(odata_expr_to_df_expr(r)?),
))),
ODataExpr::Value(v) => Ok(Expr::Literal(odata_value_to_df_value(v)?)),
ODataExpr::Not(e) => Ok(Expr::Not(Box::new(odata_expr_to_df_expr(e)?))),
ODataExpr::In(i, l) => Ok(Expr::InList(InList::new(
Box::new(odata_expr_to_df_expr(i)?),
l.iter()
.map(odata_expr_to_df_expr)
.collect::<Result<Vec<Expr>, ODataError>>()?,
false,
))),
ODataExpr::Identifier(s) => Ok(Expr::Column(Column::new_unqualified(s))),
ODataExpr::Function(..) => {
Err(UnsupportedFeature::new("Function within the filter is not supported").into())
}
}
}

fn odata_value_to_df_value(v: &ODataValue) -> Result<ScalarValue, ODataError> {
match v {
ODataValue::String(s) => Ok(ScalarValue::LargeUtf8(Some(s.clone()))),
ODataValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
ODataValue::Null => Ok(ScalarValue::Null),
ODataValue::Number(d) => {
let d = d
.to_string()
.parse::<i64>()
.map_err(|_| FilterParsingError::new("Failed to parse number"))?;
Ok(ScalarValue::Int64(Some(d)))
}
ODataValue::DateTime(d) => Ok(ScalarValue::Date64(Some(d.timestamp()))),
ODataValue::Date(d) => {
let d = d
.and_hms_opt(0, 0, 0)
.ok_or(FilterParsingError::new("Failed to parse date"))?;
let timestamp =
DateTime::<chrono::Utc>::from_naive_utc_and_offset(d, chrono::Utc).timestamp();
Ok(ScalarValue::Date64(Some(timestamp)))
}
ODataValue::Uuid(u) => Ok(ScalarValue::LargeUtf8(Some(u.to_string()))),
ODataValue::Time(_) => {
Err(UnsupportedFeature::new("Time value in filter is not supported").into())
}
}
}

fn odata_op_to_df_op(op: &ODataOperator) -> Operator {
match op {
ODataOperator::Equal => Operator::Eq,
ODataOperator::NotEqual => Operator::NotEq,
ODataOperator::LessThan => Operator::Lt,
ODataOperator::GreaterThan => Operator::Gt,
ODataOperator::LessOrEqual => Operator::LtEq,
ODataOperator::GreaterOrEqual => Operator::GtEq,
}
}

///////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
Loading

0 comments on commit 52067c1

Please sign in to comment.