Skip to content

Commit

Permalink
feat: support v1 query API GROUP BY semantics
Browse files Browse the repository at this point in the history
This updates the v1 /query API handler to handle InfluxDB v1's unique
query response structure when GROUP BY clauses are provided.

The distinction is in the addition of a "tags" field to the emitted series
data that contains a map of the GROUP BY tags along with their distinct
values associated with the data in the "values" field.

This required splitting the QueryExecutor into two query paths for InfluxQL
and SQL, as this allowed for handling InfluxQL query parsing in advance
of query planning.

A set of snapshot tests was added to check that it all works.
  • Loading branch information
hiltontj committed Jan 16, 2025
1 parent 26bf912 commit 9ca6f0b
Show file tree
Hide file tree
Showing 28 changed files with 2,000 additions and 150 deletions.
11 changes: 7 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git",
dashmap = "6.1.0"
dotenvy = "0.15.7"
flate2 = "1.0.27"
futures = "0.3.28"
futures-util = "0.3.30"
futures = "0.3.31"
futures-util = "0.3.31"
hashbrown = { version = "0.15.1", features = ["serde"] }
hex = "0.4.3"
http = "0.2.9"
Expand Down Expand Up @@ -101,6 +101,7 @@ prost-build = "0.12.6"
prost-types = "0.12.6"
proptest = { version = "1", default-features = false, features = ["std"] }
rand = "0.8.5"
regex = "1.11.1"
reqwest = { version = "0.11.27", default-features = false, features = ["rustls-tls", "stream", "json"] }
secrecy = "0.8.0"
serde = { version = "1.0", features = ["derive"] }
Expand All @@ -117,8 +118,8 @@ sysinfo = "0.30.8"
tempfile = "3.14.0"
test-log = { version = "0.2.16", features = ["trace"] }
thiserror = "1.0"
tokio = { version = "1.42", features = ["full"] }
tokio-util = "0.7.9"
tokio = { version = "1.43", features = ["full"] }
tokio-util = "0.7.13"
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
tonic-build = "0.11.0"
tonic-health = "0.11.0"
Expand Down
2 changes: 1 addition & 1 deletion influxdb3/tests/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl TestServer {
// Use the TEST_LOG env var to determine if logs are emitted from the spawned process
let emit_logs = if std::env::var("TEST_LOG").is_ok() {
// use "info" filter, as would be used in production:
command.env("LOG_FILTER", "info");
command.env("LOG_FILTER", "debug");
true
} else {
false
Expand Down
58 changes: 58 additions & 0 deletions influxdb3/tests/server/query.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use core::str;

use crate::TestServer;
use futures::StreamExt;
use hyper::StatusCode;
Expand Down Expand Up @@ -1582,6 +1584,62 @@ async fn api_v1_query_uri_and_body() {
}
}

#[tokio::test]
async fn api_v1_query_group_by() {
    let server = TestServer::spawn().await;

    // Seed four rows spanning two values each for tags t1 and t2, so GROUP BY
    // combinations produce multiple distinct series.
    server
        .write_lp_to_db(
            "foo",
            "\
            bar,t1=a,t2=aa val=1 2998574931\n\
            bar,t1=b,t2=aa val=2 2998574932\n\
            bar,t1=a,t2=bb val=3 2998574933\n\
            bar,t1=b,t2=bb val=4 2998574934",
            Precision::Second,
        )
        .await
        .unwrap();

    // Cover plain tag lists, regex GROUP BYs, a tag that does not exist (t3),
    // the wildcard form, and a regex matching nothing — each in non-chunked
    // and chunked mode (in that order, to keep snapshot numbering stable).
    let queries = [
        "select * from bar group by t1",
        "select * from bar group by t1, t2",
        "select * from bar group by /t/",
        "select * from bar group by /t[1]/",
        "select * from bar group by /t[1,2]/",
        "select * from bar group by t1, t2, t3",
        "select * from bar group by *",
        "select * from bar group by /not_a_match/",
    ];
    for query in queries {
        for chunked in [false, true] {
            let params = vec![
                ("db", "foo"),
                ("q", query),
                ("chunked", if chunked { "true" } else { "false" }),
            ];
            // Deserialize each emitted line as JSON, then gather the lines into
            // a Vec<Value> that serializes as a JSON array for the snapshot.
            let mut stream = server.api_v1_query(&params, None).await.bytes_stream();
            let mut values: Vec<Value> = Vec::new();
            while let Some(chunk) = stream.next().await {
                values.push(serde_json::from_slice(&chunk.unwrap()).unwrap());
            }
            // Assert on the output structure via a snapshot; the description
            // records which query/chunking combination produced it.
            insta::with_settings!({
                description => format!("query: {query}, chunked: {chunked}"),
            }, {
                insta::assert_json_snapshot!(values);
            });
        }
    }
}

#[tokio::test]
async fn api_v3_query_sql_distinct_cache() {
let server = TestServer::spawn().await;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by /t[1,2]/, chunked: true"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
}
],
"statement_id": 0
}
]
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by t1, t2, t3, chunked: false"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
}
]
Loading

0 comments on commit 9ca6f0b

Please sign in to comment.