From 55cad4a1fdd9c53c5c371504b69598d1fa92ef90 Mon Sep 17 00:00:00 2001 From: cjc7373 Date: Sat, 11 Nov 2023 21:05:32 +0800 Subject: [PATCH] perf: improve performance of filter_period_intersect (#436) * perf: improve performance of filter_period_intersect * forgot some files * fix lint errors --- aw-query/src/functions.rs | 2 +- aw-transform/benches/bench.rs | 2 +- aw-transform/src/filter_period.rs | 105 +++++++++++++++++++++++------- 3 files changed, 85 insertions(+), 24 deletions(-) diff --git a/aw-query/src/functions.rs b/aw-query/src/functions.rs index 53e7d9bb..c060b9a6 100644 --- a/aw-query/src/functions.rs +++ b/aw-query/src/functions.rs @@ -506,7 +506,7 @@ mod qfunctions { let events: Vec = (&args[0]).try_into()?; let filter_events: Vec = (&args[1]).try_into()?; - let mut filtered_events = aw_transform::filter_period_intersect(&events, &filter_events); + let mut filtered_events = aw_transform::filter_period_intersect(events, filter_events); let mut filtered_tagged_events = Vec::new(); for event in filtered_events.drain(..) { filtered_tagged_events.push(DataType::Event(event)); diff --git a/aw-transform/benches/bench.rs b/aw-transform/benches/bench.rs index 443f7ba3..0631e67c 100644 --- a/aw-transform/benches/bench.rs +++ b/aw-transform/benches/bench.rs @@ -45,7 +45,7 @@ fn bench_filter_period_intersect(c: &mut Criterion) { c.bench_function("1000 events", |b| { b.iter(|| { let events1 = create_events(1000); - filter_period_intersect(&events1, &events2); + filter_period_intersect(events1, events2.clone()); }) }); } diff --git a/aw-transform/src/filter_period.rs b/aw-transform/src/filter_period.rs index 6ab5c00f..6af8d5c2 100644 --- a/aw-transform/src/filter_period.rs +++ b/aw-transform/src/filter_period.rs @@ -1,5 +1,7 @@ use aw_models::Event; -use chrono::{DateTime, Utc}; +use chrono::Duration; + +use crate::sort_by_timestamp; /// Removes events not intersecting with the provided filter_events /// @@ -15,33 +17,55 @@ use chrono::{DateTime, Utc}; /// filter_events: [ ] [ ] /// output: [a ] [a ][b ] /// ``` -pub fn filter_period_intersect(events: &[Event], filter_events: &[Event]) -> Vec { +pub fn filter_period_intersect(events: Vec, filter_events: Vec) -> Vec { + if events.len() == 0 || filter_events.len() == 0 { + return Vec::new(); + } + let mut filtered_events = Vec::new(); + let events = sort_by_timestamp(events); + let filter_events = sort_by_timestamp(filter_events); - // Start with pre-calculating endtimes of events - let mut events_with_endtimes: Vec<(&Event, DateTime)> = Vec::new(); - for event in events { - events_with_endtimes.push((event, event.calculate_endtime())); - } + let mut events_iter = events.into_iter(); + let mut filter_events_iter = filter_events.into_iter(); + let mut cur_event = events_iter.next().unwrap(); + let mut cur_filter_event = filter_events_iter.next().unwrap(); - // Do actual filtering - for filter in filter_events { - let filter_endtime = filter.calculate_endtime(); - for (event, event_endtime) in &events_with_endtimes { - if event.timestamp > filter_endtime { - continue; + loop { + let event_endtime = cur_event.calculate_endtime(); + let filter_endtime = cur_filter_event.calculate_endtime(); + if cur_event.duration == Duration::seconds(0) || event_endtime <= cur_filter_event.timestamp + { + match events_iter.next() { + Some(e) => { + cur_event = e; + continue; + } + None => return filtered_events, } - if *event_endtime < filter.timestamp { - continue; + } + if cur_event.timestamp >= cur_filter_event.calculate_endtime() { + match filter_events_iter.next() { + Some(e) => { + cur_filter_event = e; + continue; + } + None => return filtered_events, } - let mut e = (*event).clone(); - e.timestamp = std::cmp::max(e.timestamp, filter.timestamp); - let endtime = std::cmp::min(*event_endtime, filter_endtime); - e.duration = endtime - e.timestamp; - filtered_events.push(e); } + + let mut e = cur_event.clone(); + e.timestamp = std::cmp::max(e.timestamp, cur_filter_event.timestamp); + let endtime = std::cmp::min(event_endtime, filter_endtime); + e.duration = endtime - e.timestamp; + + // trim current event + let old_timestamp = cur_event.timestamp; + cur_event.timestamp = e.timestamp + e.duration; + cur_event.duration = old_timestamp + cur_event.duration - cur_event.timestamp; + + filtered_events.push(e); } - filtered_events } #[cfg(test)] @@ -81,7 +105,8 @@ mod tests { data: json_map! {"test": json!(1)}, }; - let filtered_events = filter_period_intersect(&vec![e1, e2, e3, e4, e5], &[filter_event]); + let filtered_events = + filter_period_intersect(vec![e1, e2, e3, e4, e5], vec![filter_event.clone()]); assert_eq!(filtered_events.len(), 3); assert_eq!(filtered_events[0].duration, Duration::milliseconds(500)); assert_eq!(filtered_events[1].duration, Duration::milliseconds(1000)); @@ -93,5 +118,41 @@ mod tests { assert_eq!(filtered_events[1].timestamp, dt); let dt: DateTime = DateTime::from_str("2000-01-01T00:00:04.000Z").unwrap(); assert_eq!(filtered_events[2].timestamp, dt); + + let timestamp_01s = DateTime::from_str("2000-01-01T00:00:01Z").unwrap(); + let e = Event { + id: None, + timestamp: timestamp_01s, + duration: Duration::seconds(1), + data: json_map! {"test": json!(1)}, + }; + let mut f2 = filter_event.clone(); + f2.timestamp = DateTime::from_str("2000-01-01T00:00:00Z").unwrap(); + f2.duration = Duration::milliseconds(1500); + let res = filter_period_intersect(vec![e.clone()], vec![f2]); + assert_eq!(res[0].timestamp, timestamp_01s); + assert_eq!(res[0].duration, Duration::milliseconds(500)); + + let timestamp_01_5s = DateTime::from_str("2000-01-01T00:00:01.5Z").unwrap(); + let mut f3 = filter_event.clone(); + f3.timestamp = timestamp_01_5s; + f3.duration = Duration::milliseconds(1000); + let res = filter_period_intersect(vec![e.clone()], vec![f3]); + assert_eq!(res[0].timestamp, timestamp_01_5s); + assert_eq!(res[0].duration, Duration::milliseconds(500)); + + let mut f4 = filter_event.clone(); + f4.timestamp = DateTime::from_str("2000-01-01T00:00:01.5Z").unwrap(); + f4.duration = Duration::milliseconds(100); + let res = filter_period_intersect(vec![e.clone()], vec![f4]); + assert_eq!(res[0].timestamp, timestamp_01_5s); + assert_eq!(res[0].duration, Duration::milliseconds(100)); + + let mut f5 = filter_event.clone(); + f5.timestamp = DateTime::from_str("2000-01-01T00:00:00Z").unwrap(); + f5.duration = Duration::seconds(10); + let res = filter_period_intersect(vec![e.clone()], vec![f5]); + assert_eq!(res[0].timestamp, timestamp_01s); + assert_eq!(res[0].duration, Duration::milliseconds(1000)); } }