From 9ca6f0ba9ee9abba300fd0e162c1bc95a8af3ff9 Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Tue, 14 Jan 2025 20:21:50 -0500 Subject: [PATCH] feat: support v1 query API GROUP BY semantics This updates the v1 /query API handler to support InfluxDB v1's unique query response structure when GROUP BY clauses are provided. The distinction is the addition of a "tags" field to each emitted series, containing a map of the GROUP BY tags to the distinct values associated with the data in the "values" field. This required splitting the QueryExecutor into separate query paths for InfluxQL and SQL, which allows InfluxQL query parsing to be handled ahead of query planning. A set of snapshot tests was added to check that it all works. --- Cargo.lock | 11 +- Cargo.toml | 9 +- influxdb3/tests/server/main.rs | 2 +- influxdb3/tests/server/query.rs | 58 +++ ...rver__query__api_v1_query_group_by-10.snap | 111 +++++ ...rver__query__api_v1_query_group_by-11.snap | 88 ++++ ...rver__query__api_v1_query_group_by-12.snap | 115 ++++++ ...rver__query__api_v1_query_group_by-13.snap | 84 ++++ ...rver__query__api_v1_query_group_by-14.snap | 111 +++++ ...rver__query__api_v1_query_group_by-15.snap | 41 ++ ...rver__query__api_v1_query_group_by-16.snap | 41 ++ ...erver__query__api_v1_query_group_by-2.snap | 71 ++++ ...erver__query__api_v1_query_group_by-3.snap | 84 ++++ ...erver__query__api_v1_query_group_by-4.snap | 111 +++++ ...erver__query__api_v1_query_group_by-5.snap | 84 ++++ ...erver__query__api_v1_query_group_by-6.snap | 111 +++++ ...erver__query__api_v1_query_group_by-7.snap | 56 +++ ...erver__query__api_v1_query_group_by-8.snap | 65 +++ ...erver__query__api_v1_query_group_by-9.snap | 84 ++++ .../server__query__api_v1_query_group_by.snap | 62 +++ influxdb3_internal_api/Cargo.toml | 1 + influxdb3_internal_api/src/query_executor.rs | 58 +-- influxdb3_py_api/src/system_py.rs | 11 +- influxdb3_server/Cargo.toml | 2 + influxdb3_server/src/http.rs | 42 +- influxdb3_server/src/http/v1.rs | 380 +++++++++++++++--- influxdb3_server/src/query_executor/mod.rs | 131 ++++-- influxdb3_server/src/query_planner.rs | 126 +++++- 28 files changed, 2000 insertions(+), 150 deletions(-) create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-10.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-11.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-12.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-13.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-14.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-15.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-16.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-2.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-3.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-4.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-5.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-6.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-7.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-8.snap
create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-9.snap create mode 100644 influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by.snap diff --git a/Cargo.lock b/Cargo.lock index d994b338d7a..832a745d01e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2918,6 +2918,7 @@ version = "0.1.0" dependencies = [ "async-trait", "datafusion", + "influxdb_influxql_parser", "iox_query", "iox_query_params", "thiserror 1.0.69", @@ -3052,6 +3053,7 @@ dependencies = [ "influxdb3_telemetry", "influxdb3_wal", "influxdb3_write", + "influxdb_influxql_parser", "iox_catalog", "iox_http", "iox_query", @@ -3071,6 +3073,7 @@ dependencies = [ "pin-project-lite", "pretty_assertions", "pyo3", + "regex", "schema", "secrecy", "serde", @@ -6370,9 +6373,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.42.0" +version = "1.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", "bytes", @@ -6399,9 +6402,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 2b97362fa86..7bb9bda7036 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,8 +71,8 @@ datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", dashmap = "6.1.0" dotenvy = "0.15.7" flate2 = "1.0.27" -futures = "0.3.28" -futures-util = "0.3.30" +futures = "0.3.31" +futures-util = "0.3.31" hashbrown = { version = "0.15.1", features = ["serde"] } hex = "0.4.3" http = "0.2.9" @@ -101,6 +101,7 @@ prost-build = "0.12.6" prost-types = "0.12.6" proptest = { version = "1", default-features = false, features = ["std"] } rand = "0.8.5" +regex = "1.11.1" reqwest = { version = "0.11.27", default-features = false, features = ["rustls-tls", "stream", "json"] } secrecy = "0.8.0" serde = { version = "1.0", features = ["derive"] } @@ -117,8 +118,8 @@ sysinfo = "0.30.8" tempfile = "3.14.0" test-log = { version = "0.2.16", features = ["trace"] } thiserror = "1.0" -tokio = { version = "1.42", features = ["full"] } -tokio-util = "0.7.9" +tokio = { version = "1.43", features = ["full"] } +tokio-util = "0.7.13" tonic = { version = "0.11.0", features = ["tls", "tls-roots"] } tonic-build = "0.11.0" tonic-health = "0.11.0" diff --git a/influxdb3/tests/server/main.rs b/influxdb3/tests/server/main.rs index 9c75f589d4a..ff2d4b8d085 100644 --- a/influxdb3/tests/server/main.rs +++ b/influxdb3/tests/server/main.rs @@ -147,7 +147,7 @@ impl TestServer { // Use the TEST_LOG env var to determine if logs are emitted from the spawned process let emit_logs = if std::env::var("TEST_LOG").is_ok() { // use "info" filter, as would be used in production: - command.env("LOG_FILTER", "info"); + command.env("LOG_FILTER", "debug"); true } else { false diff --git a/influxdb3/tests/server/query.rs b/influxdb3/tests/server/query.rs index 8d376f13f51..51fa441e2d0 100644 --- a/influxdb3/tests/server/query.rs +++ b/influxdb3/tests/server/query.rs @@ -1,3 +1,5 @@ +use core::str; + use crate::TestServer; use futures::StreamExt; use 
hyper::StatusCode; @@ -1582,6 +1584,62 @@ async fn api_v1_query_uri_and_body() { } } +#[tokio::test] +async fn api_v1_query_group_by() { + let server = TestServer::spawn().await; + + server + .write_lp_to_db( + "foo", + "\ + bar,t1=a,t2=aa val=1 2998574931\n\ + bar,t1=b,t2=aa val=2 2998574932\n\ + bar,t1=a,t2=bb val=3 2998574933\n\ + bar,t1=b,t2=bb val=4 2998574934", + Precision::Second, + ) + .await + .unwrap(); + + for (chunked, query) in [ + (false, "select * from bar group by t1"), + (true, "select * from bar group by t1"), + (false, "select * from bar group by t1, t2"), + (true, "select * from bar group by t1, t2"), + (false, "select * from bar group by /t/"), + (true, "select * from bar group by /t/"), + (false, "select * from bar group by /t[1]/"), + (true, "select * from bar group by /t[1]/"), + (false, "select * from bar group by /t[1,2]/"), + (true, "select * from bar group by /t[1,2]/"), + (false, "select * from bar group by t1, t2, t3"), + (true, "select * from bar group by t1, t2, t3"), + (false, "select * from bar group by *"), + (true, "select * from bar group by *"), + (false, "select * from bar group by /not_a_match/"), + (true, "select * from bar group by /not_a_match/"), + ] { + let params = vec![ + ("db", "foo"), + ("q", query), + ("chunked", if chunked { "true" } else { "false" }), + ]; + let stream = server.api_v1_query(¶ms, None).await.bytes_stream(); + let values = stream + .map(|chunk| serde_json::from_slice(&chunk.unwrap()).unwrap()) + .collect::>() + .await; + // Use a snapshot to assert on the output structure. This deserializes each emitted line as + // as JSON first, then combines and collects them into a Vec to serialize into a JSON + // array for the snapshot. + insta::with_settings!({ + description => format!("query: {query}, chunked: {chunked}"), + }, { + insta::assert_json_snapshot!(values); + }); + } +} + #[tokio::test] async fn api_v3_query_sql_distinct_cache() { let server = TestServer::spawn().await; diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-10.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-10.snap new file mode 100644 index 00000000000..f9555d9337d --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-10.snap @@ -0,0 +1,111 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by /t[1,2]/, chunked: true" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-11.snap 
b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-11.snap new file mode 100644 index 00000000000..8022b0794c6 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-11.snap @@ -0,0 +1,88 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by t1, t2, t3, chunked: false" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "bb", + "t3": "" + }, + "values": [ + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "aa", + "t3": "" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "bb", + "t3": "" + }, + "values": [ + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "aa", + "t3": "" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-12.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-12.snap new file mode 100644 index 00000000000..76f9493fe0a --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-12.snap @@ -0,0 +1,115 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by t1, t2, t3, chunked: true" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "aa", + "t3": "" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "bb", + "t3": "" + }, + "values": [ + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "aa", + "t3": "" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "bb", + "t3": "" + }, + "values": [ + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-13.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-13.snap new file mode 100644 index 00000000000..266739bb0e0 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-13.snap @@ -0,0 +1,84 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by *, chunked: false" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ] + ] + }, + { + "columns": [ + "time", + 
"val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-14.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-14.snap new file mode 100644 index 00000000000..8fa39fee9ac --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-14.snap @@ -0,0 +1,111 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by *, chunked: true" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-15.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-15.snap new file mode 100644 index 00000000000..ab892c1d0d3 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-15.snap @@ -0,0 +1,41 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by /not_a_match/, chunked: false" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ], + [ + "2065-01-07T17:28:52Z", + 2.0 + ], + [ + "2065-01-07T17:28:53Z", + 3.0 + ], + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-16.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-16.snap new file mode 100644 index 00000000000..e5136b02d73 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-16.snap @@ -0,0 +1,41 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by /not_a_match/, chunked: true" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ], + [ + "2065-01-07T17:28:52Z", + 2.0 + ], + [ + "2065-01-07T17:28:53Z", + 3.0 + ], + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-2.snap 
b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-2.snap new file mode 100644 index 00000000000..5c862a55355 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-2.snap @@ -0,0 +1,71 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by t1, chunked: true" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "t2", + "val" + ], + "name": "bar", + "tags": { + "t1": "a" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + "aa", + 1.0 + ], + [ + "2065-01-07T17:28:53Z", + "bb", + 3.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "t2", + "val" + ], + "name": "bar", + "tags": { + "t1": "b" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + "aa", + 2.0 + ], + [ + "2065-01-07T17:28:54Z", + "bb", + 4.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-3.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-3.snap new file mode 100644 index 00000000000..f6e395adddd --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-3.snap @@ -0,0 +1,84 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by t1, t2, chunked: false" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-4.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-4.snap new file mode 100644 index 00000000000..783b65cc025 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-4.snap @@ -0,0 +1,111 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by t1, t2, chunked: true" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "bb" + }, + "values": [ + [ + 
"2065-01-07T17:28:54Z", + 4.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-5.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-5.snap new file mode 100644 index 00000000000..0ee623c4fc9 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-5.snap @@ -0,0 +1,84 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by /t/, chunked: false" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-6.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-6.snap new file mode 100644 index 00000000000..ea8fd5ba599 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-6.snap @@ -0,0 +1,111 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by /t/, chunked: true" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-7.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-7.snap new file mode 100644 index 00000000000..8d0641f5c46 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-7.snap @@ -0,0 +1,56 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by /t[1]/, chunked: false" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ], + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a" + }, + "values": [ + [ + 
"2065-01-07T17:28:51Z", + 1.0 + ], + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-8.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-8.snap new file mode 100644 index 00000000000..be3ec2a5c0d --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-8.snap @@ -0,0 +1,65 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by /t[1]/, chunked: true" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ], + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + }, + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ], + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-9.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-9.snap new file mode 100644 index 00000000000..266739bb0e0 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by-9.snap @@ -0,0 +1,84 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by *, chunked: false" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:54Z", + 4.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "b", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + 2.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "bb" + }, + "values": [ + [ + "2065-01-07T17:28:53Z", + 3.0 + ] + ] + }, + { + "columns": [ + "time", + "val" + ], + "name": "bar", + "tags": { + "t1": "a", + "t2": "aa" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + 1.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by.snap b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by.snap new file mode 100644 index 00000000000..2d94ee4a1c3 --- /dev/null +++ b/influxdb3/tests/server/snapshots/server__query__api_v1_query_group_by.snap @@ -0,0 +1,62 @@ +--- +source: influxdb3/tests/server/query.rs +description: "query: select * from bar group by t1, chunked: false" +expression: values +--- +[ + { + "results": [ + { + "series": [ + { + "columns": [ + "time", + "t2", + "val" + ], + "name": "bar", + "tags": { + "t1": "b" + }, + "values": [ + [ + "2065-01-07T17:28:52Z", + "aa", + 2.0 + ], + [ + "2065-01-07T17:28:54Z", + "bb", + 4.0 + ] + ] + }, + { + "columns": [ + "time", + "t2", + "val" + ], + "name": "bar", + "tags": { + "t1": "a" + }, + "values": [ + [ + "2065-01-07T17:28:51Z", + "aa", + 1.0 + ], + [ + "2065-01-07T17:28:53Z", + "bb", + 3.0 + ] + ] + } + ], + "statement_id": 0 + } + ] + } +] diff --git a/influxdb3_internal_api/Cargo.toml b/influxdb3_internal_api/Cargo.toml index e2259cf5c15..affafefcb96 100644 --- a/influxdb3_internal_api/Cargo.toml +++ b/influxdb3_internal_api/Cargo.toml @@ -7,6 +7,7 @@ 
license.workspace = true [dependencies] # Core Crates +influxdb_influxql_parser.workspace = true iox_query.workspace = true iox_query_params.workspace = true trace.workspace = true diff --git a/influxdb3_internal_api/src/query_executor.rs b/influxdb3_internal_api/src/query_executor.rs index 6a44966b7c2..d9fcaef53a4 100644 --- a/influxdb3_internal_api/src/query_executor.rs +++ b/influxdb3_internal_api/src/query_executor.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use datafusion::arrow::error::ArrowError; use datafusion::common::DataFusionError; use datafusion::execution::SendableRecordBatchStream; +use influxdb_influxql_parser::statement::Statement; use iox_query::query_log::QueryLogEntries; use iox_query::{QueryDatabase, QueryNamespace}; use iox_query_params::StatementParams; @@ -30,12 +31,21 @@ pub enum QueryExecutorError { #[async_trait] pub trait QueryExecutor: QueryDatabase + Debug + Send + Sync + 'static { - async fn query( + async fn query_sql( &self, database: &str, q: &str, params: Option, - kind: QueryKind, + span_ctx: Option, + external_span_ctx: Option, + ) -> Result; + + async fn query_influxql( + &self, + database_name: &str, + query_str: &str, + influxql_statement: Statement, + params: Option, span_ctx: Option, external_span_ctx: Option, ) -> Result; @@ -54,21 +64,6 @@ pub trait QueryExecutor: QueryDatabase + Debug + Send + Sync + 'static { fn upcast(&self) -> Arc<(dyn QueryDatabase + 'static)>; } -#[derive(Debug, Clone, Copy)] -pub enum QueryKind { - Sql, - InfluxQl, -} - -impl QueryKind { - pub fn query_type(&self) -> &'static str { - match self { - Self::Sql => "sql", - Self::InfluxQl => "influxql", - } - } -} - #[derive(Debug, Copy, Clone)] pub struct UnimplementedQueryExecutor; @@ -97,27 +92,34 @@ impl QueryDatabase for UnimplementedQueryExecutor { #[async_trait] impl QueryExecutor for UnimplementedQueryExecutor { - async fn query( + async fn query_sql( &self, _database: &str, _q: &str, _params: Option, - _kind: QueryKind, _span_ctx: Option, _external_span_ctx: Option, ) -> Result { - Err(QueryExecutorError::DatabaseNotFound { - db_name: "unimplemented".to_string(), - }) + Err(QueryExecutorError::MethodNotImplemented("query_sql")) + } + + async fn query_influxql( + &self, + _database_name: &str, + _query_str: &str, + _influxql_statement: Statement, + _params: Option, + _span_ctx: Option, + _external_span_ctx: Option, + ) -> Result { + Err(QueryExecutorError::MethodNotImplemented("query_influxql")) } fn show_databases( &self, _include_deleted: bool, ) -> Result { - Err(QueryExecutorError::DatabaseNotFound { - db_name: "unimplemented".to_string(), - }) + Err(QueryExecutorError::MethodNotImplemented("show_databases")) } async fn show_retention_policies( @@ -125,9 +127,9 @@ impl QueryExecutor for UnimplementedQueryExecutor { _database: Option<&str>, _span_ctx: Option, ) -> Result { - Err(QueryExecutorError::DatabaseNotFound { - db_name: "unimplemented".to_string(), - }) + Err(QueryExecutorError::MethodNotImplemented( + "show_retention_policies", + )) } fn upcast(&self) -> Arc<(dyn QueryDatabase + 'static)> { diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs index 792c12a2a80..db2334307b4 100644 --- a/influxdb3_py_api/src/system_py.rs +++ b/influxdb3_py_api/src/system_py.rs @@ -8,7 +8,7 @@ use futures::TryStreamExt; use hashbrown::HashMap; use influxdb3_catalog::catalog::DatabaseSchema; use influxdb3_id::TableId; -use influxdb3_internal_api::query_executor::{QueryExecutor, QueryKind}; +use 
influxdb3_internal_api::query_executor::QueryExecutor; use influxdb3_wal::{FieldData, WriteBatch}; use iox_query_params::StatementParams; use observability_deps::tracing::{error, info, warn}; @@ -139,14 +139,7 @@ impl PyPluginCallApi { // Spawn the async task let handle = tokio::spawn(async move { let res = query_executor - .query( - db_schema_name.as_ref(), - &query, - params, - QueryKind::Sql, - None, - None, - ) + .query_sql(db_schema_name.as_ref(), &query, params, None, None) .await .map_err(|e| PyValueError::new_err(format!("Error executing query: {}", e)))?; diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index 5e3279a2133..7bbdc2f34df 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -11,6 +11,7 @@ authz.workspace = true data_types.workspace = true datafusion_util.workspace = true influxdb-line-protocol.workspace = true +influxdb_influxql_parser.workspace = true iox_catalog.workspace = true iox_http.workspace = true iox_query.workspace = true @@ -67,6 +68,7 @@ mime.workspace = true object_store.workspace = true parking_lot.workspace = true pin-project-lite.workspace = true +regex.workspace = true secrecy.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 1f04678970a..baf80ad9ade 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -23,7 +23,7 @@ use hyper::{Body, Method, Request, Response, StatusCode}; use influxdb3_cache::distinct_cache::{self, CreateDistinctCacheArgs, MaxAge, MaxCardinality}; use influxdb3_cache::last_cache; use influxdb3_catalog::catalog::Error as CatalogError; -use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind}; +use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError}; use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION}; use influxdb3_processing_engine::manager::ProcessingEngineManager; use influxdb3_wal::{PluginType, TriggerSpecificationDefinition}; @@ -32,6 +32,8 @@ use influxdb3_write::write_buffer::Error as WriteBufferError; use influxdb3_write::BufferedWriteRequest; use influxdb3_write::Precision; use influxdb3_write::WriteBuffer; +use influxdb_influxql_parser::select::GroupByClause; +use influxdb_influxql_parser::statement::Statement; use iox_http::write::single_tenant::SingleTenantRequestUnifier; use iox_http::write::v1::V1_NAMESPACE_RP_SEPARATOR; use iox_http::write::{WriteParseError, WriteRequestUnifier}; @@ -547,7 +549,7 @@ where let stream = self .query_executor - .query(&database, &query_str, params, QueryKind::Sql, None, None) + .query_sql(&database, &query_str, params, None, None) .await?; Response::builder() @@ -567,7 +569,7 @@ where info!(?database, %query_str, ?format, "handling query_influxql"); - let stream = self + let (stream, _) = self .query_influxql_inner(database, &query_str, params) .await?; @@ -733,7 +735,7 @@ where database: Option, query_str: &str, params: Option, - ) -> Result { + ) -> Result<(SendableRecordBatchStream, Option)> { let mut statements = rewrite::parse_statements(query_str)?; if statements.len() != 1 { @@ -756,31 +758,29 @@ where } }; - if statement.statement().is_show_databases() { - self.query_executor.show_databases(true) - } else if statement.statement().is_show_retention_policies() { + let statement = statement.to_statement(); + let group_by = match &statement { + Statement::Select(select_statement) => select_statement.group_by.clone(), + _ => None, + }; + + let 
stream = if statement.is_show_databases() { + self.query_executor.show_databases(true)? + } else if statement.is_show_retention_policies() { self.query_executor .show_retention_policies(database.as_deref(), None) - .await + .await? } else { let Some(database) = database else { return Err(Error::InfluxqlNoDatabase); }; self.query_executor - .query( - &database, - // TODO - implement an interface that takes the statement directly, - // so we don't need to double down on the parsing - &statement.to_statement().to_string(), - params, - QueryKind::InfluxQl, - None, - None, - ) - .await - } - .map_err(Into::into) + .query_influxql(&database, query_str, statement, params, None, None) + .await? + }; + + Ok((stream, group_by)) } /// Create a new distinct value cache given the [`DistinctCacheCreateRequest`] arguments in the request diff --git a/influxdb3_server/src/http/v1.rs b/influxdb3_server/src/http/v1.rs index b4b915ea0da..18e00948d13 100644 --- a/influxdb3_server/src/http/v1.rs +++ b/influxdb3_server/src/http/v1.rs @@ -1,5 +1,6 @@ use std::{ collections::{HashMap, VecDeque}, + ops::{Deref, DerefMut}, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -17,15 +18,18 @@ use arrow::{ record_batch::RecordBatch, }; +use arrow_schema::{Field, SchemaRef}; use bytes::Bytes; use chrono::{format::SecondsFormat, DateTime}; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{ready, stream::Fuse, Stream, StreamExt}; use hyper::http::HeaderValue; use hyper::{header::ACCEPT, header::CONTENT_TYPE, Body, Request, Response, StatusCode}; +use influxdb_influxql_parser::select::{Dimension, GroupByClause}; use iox_time::TimeProvider; use observability_deps::tracing::info; -use schema::{INFLUXQL_MEASUREMENT_COLUMN_NAME, TIME_COLUMN_NAME}; +use regex::Regex; +use schema::{InfluxColumnType, INFLUXQL_MEASUREMENT_COLUMN_NAME, TIME_COLUMN_NAME}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -73,9 +77,9 @@ where // TODO - Currently not supporting parameterized queries, see // https://github.com/influxdata/influxdb/issues/24805 - let stream = self.query_influxql_inner(database, &query, None).await?; - let stream = - QueryResponseStream::new(0, stream, chunk_size, format, epoch).map_err(QueryError)?; + let (stream, group_by) = self.query_influxql_inner(database, &query, None).await?; + let stream = QueryResponseStream::new(0, stream, chunk_size, format, epoch, group_by) + .map_err(QueryError)?; let body = Body::wrap_stream(stream); Ok(Response::builder() @@ -249,7 +253,7 @@ enum Precision { /// [`anyhow::Error`] is used as a catch-all because if anything fails during /// that process it will result in a 500 INTERNAL ERROR. 
#[derive(Debug, thiserror::Error)] -#[error("unexpected query error: {0}")] +#[error("unexpected query error: {0:#}")] pub struct QueryError(#[from] anyhow::Error); /// The response structure returned by the v1 query API @@ -354,6 +358,8 @@ struct StatementResponse { #[derive(Debug, Serialize)] struct Series { name: String, + #[serde(skip_serializing_if = "Option::is_none")] + tags: Option>>, columns: Vec, values: Vec, } @@ -362,14 +368,29 @@ struct Series { #[derive(Debug, Serialize)] struct Row(Vec); +impl Deref for Row { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Row { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + /// A buffer for storing records from a stream of [`RecordBatch`]es /// /// The optional `size` indicates whether this is operating in `chunked` mode (see /// [`QueryResponseStream`]), and when specified, gives the size of chunks that will /// be emitted. +#[derive(Debug)] struct ChunkBuffer { size: Option, - series: VecDeque<(String, Vec)>, + series: VecDeque<(String, BufferGroupByTagSet, Vec)>, } impl ChunkBuffer { @@ -382,27 +403,80 @@ impl ChunkBuffer { /// Get the name of the current measurement [`Series`] being streamed fn current_measurement_name(&self) -> Option<&str> { - self.series.front().map(|(n, _)| n.as_str()) + self.series.front().map(|(n, _, _)| n.as_str()) } /// For queries that produce multiple [`Series`], this will be called when /// the current series is completed streaming fn push_next_measurement>(&mut self, name: S) { - self.series.push_front((name.into(), vec![])); + self.series.push_front((name.into(), None, vec![])); } /// Push a new [`Row`] into the current measurement [`Series`] - fn push_row(&mut self, row: Row) -> Result<(), anyhow::Error> { - self.series - .front_mut() - .context("tried to push row with no measurements buffered")? - .1 - .push(row); + /// + /// If the stream is producing tags that are part of a `GROUP BY` clause, then `group_by` should + /// hold a map of those tag keys to tag values for the given row. + fn push_row( + &mut self, + group_by: Option>>, + row: Row, + ) -> Result<(), anyhow::Error> { + let (_, tags, rows) = self + .series + .front() + .context("tried to push row with no measurements buffered")?; + + // Usually series are split on the measurement name. This function is not concerned with + // that split, as the caller does that. However, if we are processing a query with a `GROUP BY` + // clause, then we make the decision here. If the incoming `group_by` tag key/value pairs do + // not match those for the current row set, then we need to start a new entry in the + // `series` on the chunk buffer.
+ use BufferGroupByDecision::*; + let group_by_decision = match (tags, &group_by) { + (None, None) => NotAGroupBy, + (Some(tags), Some(group_by)) => { + if group_by.len() == tags.len() { + if group_by == tags { + NewRowInExistingSet + } else { + NewSet + } + } else { + bail!( + "group by columns in query result and chunk buffer are not the same size" + ); + } + } + (None, Some(_)) => { + if rows.is_empty() { + FirstRowInSeries + } else { + bail!("received inconsistent group by tags in query result"); + } + } + (Some(_), None) => bail!( + "chunk buffer expects group by tags but none were present in the query result" + ), + }; + + match group_by_decision { + NotAGroupBy | NewRowInExistingSet => self.series.front_mut().unwrap().2.push(row), + FirstRowInSeries => { + let (_, tags, rows) = self.series.front_mut().unwrap(); + *tags = group_by; + rows.push(row); + } + NewSet => { + let name = self.series.front().unwrap().0.clone(); + self.series.push_front((name, group_by, vec![row])); + } + } + Ok(()) } /// Flush a single chunk from the [`ChunkBuffer`], if possible - fn flush_one(&mut self) -> Option<(String, Vec)> { + fn flush_one(&mut self) -> Option<(String, BufferGroupByTagSet, Vec)> { if !self.can_flush() { return None; } @@ -411,23 +485,23 @@ impl ChunkBuffer { if self .series .back() - .is_some_and(|(_, rows)| rows.len() <= size) + .is_some_and(|(_, _, rows)| rows.len() <= size) { // the back series is smaller than the chunk size, so we just // pop and take the whole thing: self.series.pop_back() } else { // only drain a chunk's worth from the back series: - self.series - .back_mut() - .map(|(name, rows)| (name.to_owned(), rows.drain(..size).collect())) + self.series.back_mut().map(|(name, tags, rows)| { + (name.to_owned(), tags.clone(), rows.drain(..size).collect()) + }) } } /// The [`ChunkBuffer`] is operating in chunked mode, and can flush a chunk fn can_flush(&self) -> bool { if let (Some(size), Some(m)) = (self.size, self.series.back()) { - m.1.len() >= size || self.series.len() > 1 + m.2.len() >= size || self.series.len() > 1 } else { false } @@ -439,6 +513,24 @@ impl ChunkBuffer { } } +/// Convenience type for representing an optional map of tag name to optional tag values +type BufferGroupByTagSet = Option>>; + +/// Decide how to handle an incoming set of `GROUP BY` tag key value pairs when pushing a row into +/// the `ChunkBuffer` +enum BufferGroupByDecision { + /// The query is not using a `GROUP BY` with tags + NotAGroupBy, + /// This is the first time a row has been pushed to the series with this `GROUP BY` tag + /// key/value combination + FirstRowInSeries, + /// Still adding rows to the current set of `GROUP BY` tag key/value pairs + NewRowInExistingSet, + /// The incoming set of `GROUP BY` tag key/value pairs do not match, so we need to start a + /// new row set in the series. 
+ NewSet, +} + /// The state of the [`QueryResponseStream`] enum State { /// The initial state of the stream; no query results have been streamed @@ -473,7 +565,7 @@ impl State { struct QueryResponseStream { buffer: ChunkBuffer, input: Fuse, - column_map: HashMap, + column_map: ColumnMap, statement_id: usize, format: QueryFormat, epoch: Option, @@ -490,22 +582,11 @@ impl QueryResponseStream { chunk_size: Option, format: QueryFormat, epoch: Option, + group_by_clause: Option, ) -> Result { - let buffer = ChunkBuffer::new(chunk_size); let schema = input.schema(); - let column_map = schema - .fields - .iter() - .map(|f| f.name().to_owned()) - .enumerate() - .flat_map(|(i, n)| { - if n != INFLUXQL_MEASUREMENT_COLUMN_NAME && i > 0 { - Some((n, i - 1)) - } else { - None - } - }) - .collect(); + let buffer = ChunkBuffer::new(chunk_size); + let column_map = ColumnMap::new(schema, group_by_clause)?; Ok(Self { buffer, column_map, @@ -543,7 +624,8 @@ impl QueryResponseStream { let schema = batch.schema(); for row_index in 0..batch.num_rows() { - let mut row = vec![Value::Null; column_map.len()]; + let mut row = vec![Value::Null; column_map.row_size()]; + let mut tags = None; for (col_index, column) in columns.iter().enumerate() { let field = schema.field(col_index); @@ -577,32 +659,43 @@ impl QueryResponseStream { cell_value = convert_ns_epoch(cell_value, precision)? } } - let col_position = column_map - .get(column_name) - .context("failed to retrieve column position")?; - row[*col_position] = cell_value; + if let Some(index) = column_map.as_row_index(column_name) { + row[index] = cell_value; + } else if column_map.is_group_by_tag(column_name) { + let tag_val = match cell_value { + Value::Null => None, + Value::String(s) => Some(s), + other => bail!( + "tag column {column_name} expected as a string or null, got {other:?}" + ), + }; + tags.get_or_insert_with(HashMap::new) + .insert(column_name.to_string(), tag_val); + } else if column_map.is_orphan_group_by_tag(column_name) { + tags.get_or_insert_with(HashMap::new) + .insert(column_name.to_string(), Some(String::default())); + } else { + bail!("failed to retrieve column position for column with name {column_name}"); + } } - self.buffer.push_row(Row(row))?; + self.buffer.push_row(tags.take(), Row(row))?; } Ok(()) } - fn columns(&self) -> Vec { - let mut columns = vec!["".to_string(); self.column_map.len()]; - self.column_map - .iter() - .for_each(|(k, i)| k.clone_into(&mut columns[*i])); - columns + fn column_names(&self) -> Vec { + self.column_map.row_column_names() } /// Flush a single chunk, or time series, when operating in chunked mode fn flush_one(&mut self) -> QueryResponse { - let columns = self.columns(); + let columns = self.column_names(); // this unwrap is okay because we only ever call flush_one // after calling can_flush on the buffer: - let (name, values) = self.buffer.flush_one().unwrap(); + let (name, tags, values) = self.buffer.flush_one().unwrap(); let series = vec![Series { name, + tags, columns, values, }]; @@ -618,13 +711,14 @@ impl QueryResponseStream { /// Flush the entire buffer fn flush_all(&mut self) -> QueryResponse { - let columns = self.columns(); + let columns = self.column_names(); let series = self .buffer .series .drain(..) 
- .map(|(name, values)| Series { + .map(|(name, tags, values)| Series { name, + tags, columns: columns.clone(), values, }) @@ -771,6 +865,189 @@ fn cast_column_value(column: &ArrayRef, row_index: usize) -> Result, + /// How many columns are in the `values` set, i.e., that are not `GROUP BY` tags + row_size: usize, +} + +enum ColumnType { + /// A value to be included in the `values` set, at the given `index` + Value { index: usize }, + /// A tag that is part of the `GROUP BY` clause, either explicitly, or by a regex/wildcard match + GroupByTag, + /// This is a case where a GROUP BY clause contains a field which doesn't exist in the table + /// + /// For example, + /// ```text + /// select * from foo group by t1, t2 + /// ``` + /// If `t1` is a tag in the table, but `t2` is not, nor is a field in the table, then the v1 + /// /query API response will include `t2` in the `series.[].tags` property in the results with + /// an empty string for a value (`""`). + OrphanGroupByTag, +} + +impl ColumnMap { + /// Create a new `ColumnMap` + fn new( + schema: SchemaRef, + group_by_clause: Option, + ) -> Result { + let mut map = HashMap::new(); + let group_by = if let Some(clause) = group_by_clause { + GroupByEval::from_clause(clause)? + } else { + None + }; + let mut index = 0; + for field in schema + .fields() + .into_iter() + .filter(|f| f.name() != INFLUXQL_MEASUREMENT_COLUMN_NAME) + { + if group_by + .as_ref() + .is_some_and(|gb| is_tag(field) && gb.evaluate_tag(field.name())) + { + map.insert(field.name().to_string(), ColumnType::GroupByTag); + } else if group_by.as_ref().is_some_and(|gb| { + field.metadata().is_empty() && gb.contains_explicit_col_name(field.name()) + }) { + map.insert(field.name().to_string(), ColumnType::OrphanGroupByTag); + } else { + map.insert(field.name().to_string(), ColumnType::Value { index }); + index += 1; + } + } + Ok(Self { + map, + row_size: index, + }) + } + + fn row_size(&self) -> usize { + self.row_size + } + + fn row_column_names(&self) -> Vec { + let mut result = vec![None; self.row_size]; + self.map.iter().for_each(|(name, c)| { + if let ColumnType::Value { index } = c { + result[*index].replace(name.to_owned()); + } + }); + result.into_iter().flatten().collect() + } + + /// If this column is part of the `values` row data, get its index, or `None` otherwise + fn as_row_index(&self, column_name: &str) -> Option { + self.map.get(column_name).and_then(|col| match col { + ColumnType::Value { index } => Some(*index), + ColumnType::GroupByTag | ColumnType::OrphanGroupByTag => None, + }) + } + + /// This column is a `GROUP BY` tag + fn is_group_by_tag(&self, column_name: &str) -> bool { + self.map + .get(column_name) + .is_some_and(|col| matches!(col, ColumnType::GroupByTag)) + } + + /// This column is an orphan `GROUP BY` tag + fn is_orphan_group_by_tag(&self, column_name: &str) -> bool { + self.map + .get(column_name) + .is_some_and(|col| matches!(col, ColumnType::OrphanGroupByTag)) + } +} + +// TODO: this is defined in schema crate, so needs to be made pub there: +const COLUMN_METADATA_KEY: &str = "iox::column::type"; + +/// Decide based on metadata if this [`Field`] is a tag column +fn is_tag(field: &Arc) -> bool { + field + .metadata() + .get(COLUMN_METADATA_KEY) + .map(|s| InfluxColumnType::try_from(s.as_str())) + .transpose() + .ok() + .flatten() + .is_some_and(|t| matches!(t, InfluxColumnType::Tag)) +} + +/// Derived from a [`GroupByClause`] and used to evaluate whether a given tag column is part of the +/// `GROUP BY` clause in an InfluxQL query +struct 
GroupByEval(Vec); + +/// The kind of `GROUP BY` evaluator +enum GroupByEvalType { + /// An explicit tag name in a `GROUP BY` clause, e.g., `GROUP BY t1, t2` + Tag(String), + /// A regex in a `GROUP BY` that could match 0-or-more tags, e.g., `GROUP BY /t[1,2]/` + Regex(Regex), + /// A wildcard that matches all tags, e.g., `GROUP BY *` + Wildcard, +} + +impl GroupByEval { + /// Convert a [`GroupByClause`] to a [`GroupByEval`] if any of its members match on tag columns + /// + /// This will produce an error if an invalid regex is provided as one of the `GROUP BY` clauses. + /// That will likely be caught upstream during query parsing, but handle it here anyway. + fn from_clause(clause: GroupByClause) -> Result, anyhow::Error> { + let v = clause + .iter() + .filter_map(|dim| match dim { + Dimension::Time(_) => None, + Dimension::VarRef(tag) => Some(Ok(GroupByEvalType::Tag(tag.to_string()))), + Dimension::Regex(regex) => Some( + Regex::new(regex.as_str()) + .map(GroupByEvalType::Regex) + .context("invalid regex in group by clause"), + ), + Dimension::Wildcard => Some(Ok(GroupByEvalType::Wildcard)), + }) + .collect::, anyhow::Error>>()?; + + if v.is_empty() { + Ok(None) + } else { + Ok(Some(Self(v))) + } + } + + /// Check if a tag is matched by this set of `GROUP BY` clauses + fn evaluate_tag(&self, tag_name: &str) -> bool { + self.0.iter().any(|eval| eval.test(tag_name)) + } + + /// Check if the tag name is included explicitly in the `GROUP BY` clause. + /// + /// This is for determining orphan `GROUP BY` tag columns. + fn contains_explicit_col_name(&self, col_name: &str) -> bool { + self.0.iter().any(|eval| match eval { + GroupByEvalType::Tag(t) => t == col_name, + _ => false, + }) + } +} + +impl GroupByEvalType { + /// Test the given `tag_name` against this evaluator + fn test(&self, tag_name: &str) -> bool { + match self { + Self::Tag(t) => t == tag_name, + Self::Regex(regex) => regex.is_match(tag_name), + Self::Wildcard => true, + } + } +} + impl Stream for QueryResponseStream { type Item = Result; @@ -808,6 +1085,7 @@ impl Stream for QueryResponseStream { // this is why the input stream is fused, because we will end up // polling the input stream again if we end up here.
Poll::Ready(Some(Ok(self.flush_all()))) + // Poll::Ready(None) } else if self.state.is_initialized() { // we are still in an initialized state, which means no records were buffered // and therefore we need to emit an empty result set before ending the stream: diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 493350431e1..3d578ff2779 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -21,10 +21,11 @@ use datafusion_util::MemoryStream; use influxdb3_cache::distinct_cache::{DistinctCacheFunction, DISTINCT_CACHE_UDTF_NAME}; use influxdb3_cache::last_cache::{LastCacheFunction, LAST_CACHE_UDTF_NAME}; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; -use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind}; +use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError}; use influxdb3_sys_events::SysEventStore; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_write::WriteBuffer; +use influxdb_influxql_parser::statement::Statement; use iox_query::exec::{Executor, IOxSessionContext, QueryConfig}; use iox_query::provider::ProviderBuilder; use iox_query::query_log::QueryLog; @@ -109,35 +110,66 @@ impl QueryExecutorImpl { sys_events_store, } } + + async fn get_db_namespace( + &self, + database_name: &str, + span_ctx: &Option, + ) -> Result, QueryExecutorError> { + self.namespace( + database_name, + span_ctx.child_span("get_db_namespace"), + false, + ) + .await + .map_err(|_| QueryExecutorError::DatabaseNotFound { + db_name: database_name.to_string(), + })? + .ok_or_else(|| QueryExecutorError::DatabaseNotFound { + db_name: database_name.to_string(), + }) + } } #[async_trait] impl QueryExecutor for QueryExecutorImpl { - async fn query( + async fn query_sql( + &self, + database: &str, + query: &str, + params: Option, + span_ctx: Option, + external_span_ctx: Option, + ) -> Result { + info!(%database, %query, ?params, "executing sql query"); + let db = self.get_db_namespace(database, &span_ctx).await?; + query_database_sql( + db, + query, + params, + span_ctx, + external_span_ctx, + Arc::clone(&self.telemetry_store), + ) + .await + } + + async fn query_influxql( &self, database: &str, query: &str, + influxql_statement: Statement, params: Option, - kind: QueryKind, span_ctx: Option, external_span_ctx: Option, ) -> Result { - info!(%database, %query, ?params, ?kind, "QueryExecutorImpl as QueryExecutor::query"); - let db = self - .namespace(database, span_ctx.child_span("get database"), false) - .await - .map_err(|_| QueryExecutorError::DatabaseNotFound { - db_name: database.to_string(), - })? 
- .ok_or_else(|| QueryExecutorError::DatabaseNotFound { - db_name: database.to_string(), - })?; - - query_database( + info!(database, query, ?params, "executing influxql query"); + let db = self.get_db_namespace(database, &span_ctx).await?; + query_database_influxql( db, query, + influxql_statement, params, - kind, span_ctx, external_span_ctx, Arc::clone(&self.telemetry_store), @@ -232,11 +264,12 @@ impl QueryExecutor for QueryExecutorImpl { } } -async fn query_database( +// NOTE: this method is separated out as it is called from a separate query executor +// implementation in Enterprise +async fn query_database_sql( db: Arc, query: &str, params: Option, - kind: QueryKind, span_ctx: Option, external_span_ctx: Option, telemetry_store: Arc, @@ -245,7 +278,7 @@ async fn query_database( let token = db.record_query( external_span_ctx.as_ref().map(RequestLogContext::ctx), - kind.query_type(), + "sql", Box::new(query.to_string()), params.clone(), ); @@ -258,12 +291,7 @@ async fn query_database( // Perform query planning on a separate threadpool than the IO runtime that is servicing // this request by using `IOxSessionContext::run`. let plan = ctx - .run(async move { - match kind { - QueryKind::Sql => planner.sql(query, params).await, - QueryKind::InfluxQl => planner.influxql(query, params).await, - } - }) + .run(async move { planner.sql(query, params).await }) .await; let plan = match plan.map_err(QueryExecutorError::QueryPlanning) { @@ -292,6 +320,55 @@ async fn query_database( } } +async fn query_database_influxql( + db: Arc, + query_str: &str, + statement: Statement, + params: Option, + span_ctx: Option, + external_span_ctx: Option, + telemetry_store: Arc, +) -> Result { + let params = params.unwrap_or_default(); + let token = db.record_query( + external_span_ctx.as_ref().map(RequestLogContext::ctx), + "influxql", + Box::new(query_str.to_string()), + params.clone(), + ); + + let ctx = db.new_query_context(span_ctx, Default::default()); + let planner = Planner::new(&ctx); + let plan = ctx + .run(async move { planner.influxql(statement, params).await }) + .await; + + let plan = match plan.map_err(QueryExecutorError::QueryPlanning) { + Ok(plan) => plan, + Err(e) => { + token.fail(); + return Err(e); + } + }; + + let token = token.planned(&ctx, Arc::clone(&plan)); + + let token = token.permit(); + + telemetry_store.update_num_queries(); + + match ctx.execute_stream(Arc::clone(&plan)).await { + Ok(query_results) => { + token.success(); + Ok(query_results) + } + Err(err) => { + token.fail(); + Err(QueryExecutorError::ExecuteStream(err)) + } + } +} + #[derive(Debug)] struct RetentionPolicyRow { database: String, @@ -682,7 +759,7 @@ mod tests { parquet_cache::test_cached_obj_store_and_oracle, }; use influxdb3_catalog::catalog::Catalog; - use influxdb3_internal_api::query_executor::{QueryExecutor, QueryKind}; + use influxdb3_internal_api::query_executor::QueryExecutor; use influxdb3_sys_events::SysEventStore; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_wal::{Gen1Duration, WalConfig}; @@ -895,7 +972,7 @@ mod tests { for t in test_cases { let batch_stream = query_executor - .query(db_name, t.query, None, QueryKind::Sql, None, None) + .query_sql(db_name, t.query, None, None, None) .await .unwrap(); let batches: Vec = batch_stream.try_collect().await.unwrap(); diff --git a/influxdb3_server/src/query_planner.rs b/influxdb3_server/src/query_planner.rs index 1f068fa38c6..1f39c88c758 100644 --- a/influxdb3_server/src/query_planner.rs +++ b/influxdb3_server/src/query_planner.rs @@ -1,6 +1,15 
@@ -use std::sync::Arc; +use std::{any::Any, sync::Arc}; -use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; +use arrow_schema::SchemaRef; +use datafusion::{ + error::DataFusionError, + execution::{SendableRecordBatchStream, TaskContext}, + physical_expr::EquivalenceProperties, + physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + }, +}; +use influxdb_influxql_parser::statement::Statement; use iox_query::{exec::IOxSessionContext, frontend::sql::SqlQueryPlanner}; use iox_query_influxql::frontend::planner::InfluxQLQueryPlanner; use iox_query_params::StatementParams; @@ -41,12 +50,119 @@ impl Planner { /// Plan an InfluxQL query and return a DataFusion physical plan pub(crate) async fn influxql( &self, - query: impl AsRef + Send, + statement: Statement, params: impl Into + Send, ) -> Result> { - let query = query.as_ref(); let ctx = self.ctx.child_ctx("rest_api_query_planner_influxql"); - InfluxQLQueryPlanner::query(query, params, &ctx).await + let logical_plan = InfluxQLQueryPlanner::statement_to_plan(statement, params, &ctx).await?; + let input = ctx.create_physical_plan(&logical_plan).await?; + let input_schema = input.schema(); + let mut md = input_schema.metadata().clone(); + md.extend(logical_plan.schema().metadata().clone()); + let schema = Arc::new(arrow::datatypes::Schema::new_with_metadata( + input_schema.fields().clone(), + md, + )); + + Ok(Arc::new(SchemaExec::new(input, schema))) + } +} + +// NOTE: the below code is currently copied from IOx and needs to be made pub so we can +// re-use it. + +/// A physical operator that overrides the `schema` API, +/// to return an amended version owned by `SchemaExec`. The +/// principal use case is to add additional metadata to the schema. +struct SchemaExec { + input: Arc, + schema: SchemaRef, + + /// Cache holding plan properties like equivalences, output partitioning, output ordering etc. + cache: PlanProperties, +} + +impl SchemaExec { + fn new(input: Arc, schema: SchemaRef) -> Self { + let cache = Self::compute_properties(&input, Arc::clone(&schema)); + Self { + input, + schema, + cache, + } + } + + /// This function creates the cache object that stores the plan properties such as equivalence properties, partitioning, ordering, etc. 
+ fn compute_properties(input: &Arc, schema: SchemaRef) -> PlanProperties { + let eq_properties = match input.properties().output_ordering() { + None => EquivalenceProperties::new(schema), + Some(output_ordering) => { + EquivalenceProperties::new_with_orderings(schema, &[output_ordering.to_vec()]) + } + }; + + let output_partitioning = input.output_partitioning().clone(); + + PlanProperties::new(eq_properties, output_partitioning, input.execution_mode()) + } +} + +impl std::fmt::Debug for SchemaExec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.fmt_as(DisplayFormatType::Default, f) + } +} + +impl ExecutionPlan for SchemaExec { + fn name(&self) -> &str { + Self::static_name() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unimplemented!() + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + self.input.execute(partition, context) + } + + fn statistics(&self) -> Result { + Ok(datafusion::physical_plan::Statistics::new_unknown( + &self.schema(), + )) + } +} + +impl DisplayAs for SchemaExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "SchemaExec") + } + } } }