From 9b7dfbd3608a05046d06edf5b52af4b3f0a36830 Mon Sep 17 00:00:00 2001 From: Jesse White Date: Wed, 8 May 2024 12:13:47 -0400 Subject: [PATCH] d1 metrics --- README.md | 8 ++ features/step_definitions/cf_mock_server.ts | 10 +- .../step_definitions/d1_query_response.json | 1 + gql/d1.graphql | 29 ++++ gql/queries.graphql | 116 ++++++++++++---- gql/workers.graphql | 33 +++++ src/gql.rs | 124 ++++++++++++++++-- src/lib.rs | 70 +++++++--- 8 files changed, 331 insertions(+), 60 deletions(-) create mode 100644 features/step_definitions/d1_query_response.json create mode 100644 gql/d1.graphql create mode 100644 gql/workers.graphql diff --git a/README.md b/README.md index 6b1846d..b694de3 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,14 @@ This worker was written to push Cloudflare Analytics data to an OpenTelemetry co It is inspired by the [cloudflare-exporter](https://github.com/lablabs/cloudflare-exporter), which is unfortunately no longer maintained. By running it as a worker and pushing metrics, we avoid the need to deploy a dedicated container and allow the worker to be run on [green compute](https://blog.cloudflare.com/announcing-green-compute). 
+## Metrics currently supported + +- [x] Workers +- [x] D1 +- [ ] Queues +- [ ] Durable Objects +- [ ] Zones + ## Usage * Clone the repo diff --git a/features/step_definitions/cf_mock_server.ts b/features/step_definitions/cf_mock_server.ts index f55b8a0..2b6ed99 100644 --- a/features/step_definitions/cf_mock_server.ts +++ b/features/step_definitions/cf_mock_server.ts @@ -7,7 +7,8 @@ export class CloudflareMockServer { start() { let self = this; - const fileContents = fs.readFileSync('./features/step_definitions/worker_query_response.json').toString() + const workerQuery = fs.readFileSync('./features/step_definitions/worker_query_response.json').toString(); + const d1Query = fs.readFileSync('./features/step_definitions/d1_query_response.json').toString(); this.server = http.createServer((req, res) => { var body = ""; req.on('readable', function() { @@ -16,10 +17,15 @@ export class CloudflareMockServer { body += part; } }); + req.on('end', function() { res.statusCode = 200; res.setHeader('Content-Type', 'application/json'); - res.end(fileContents); + if (body.indexOf('d1AnalyticsAdaptiveGroups') > -1) { + res.end(d1Query); + } else { + res.end(workerQuery); + } }); }); this.server.listen(() => { diff --git a/features/step_definitions/d1_query_response.json b/features/step_definitions/d1_query_response.json new file mode 100644 index 0000000..29906dd --- /dev/null +++ b/features/step_definitions/d1_query_response.json @@ -0,0 +1 @@ +{"data":{"viewer":{"accounts":[{"d1AnalyticsAdaptiveGroups":[]}]}},"errors":null} \ No newline at end of file diff --git a/gql/d1.graphql b/gql/d1.graphql new file mode 100644 index 0000000..d180d24 --- /dev/null +++ b/gql/d1.graphql @@ -0,0 +1,29 @@ +query GetD1AnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!)
{ + viewer { + accounts(filter: {accountTag: $accountTag}) { + d1AnalyticsAdaptiveGroups(limit: $limit, filter: { + datetimeMinute_geq: $datetimeStart, + datetimeMinute_lt: $datetimeEnd + }) { + dimensions { + databaseId + datetimeMinute + } + + sum { + readQueries + rowsRead + rowsWritten + writeQueries + } + + quantiles { + queryBatchResponseBytesP50 + queryBatchResponseBytesP90 + queryBatchTimeMsP50 + queryBatchTimeMsP90 + } + } + } + } +} \ No newline at end of file diff --git a/gql/queries.graphql b/gql/queries.graphql index 8d57c1b..28e4e98 100644 --- a/gql/queries.graphql +++ b/gql/queries.graphql @@ -1,33 +1,91 @@ -query GetWorkersAnalyticsQuery($accountTag: string, $datetimeStart: string, $datetimeEnd: string, $limit: Int!) { - viewer { - accounts(filter: {accountTag: $accountTag}) { - workersInvocationsAdaptive(limit: $limit, filter: { - datetime_geq: $datetimeStart, - datetime_lt: $datetimeEnd - }) { - dimensions { - scriptName - status - datetime - } - - sum { - requests - errors - duration - } - - quantiles { - cpuTimeP50 - cpuTimeP75 - cpuTimeP99 - cpuTimeP999 - durationP50 - durationP75 - durationP99 - durationP999 - } + + +query GetDurableObjectsAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) { + viewer { + accounts(filter: {accountTag: $accountTag}) { + durableObjectsInvocationsAdaptiveGroups(limit: $limit, filter: { + datetimeMinute_geq: $datetimeStart, + datetimeMinute_lt: $datetimeEnd + }) { + dimensions { + scriptName + datetimeMinute + } + + sum { + errors + requests + responseBodySize + wallTime + } + + quantiles { + responseBodySizeP25 + responseBodySizeP50 + responseBodySizeP75 + responseBodySizeP90 + responseBodySizeP99 + responseBodySizeP999 + wallTimeP25 + wallTimeP50 + wallTimeP75 + wallTimeP90 + wallTimeP99 + wallTimeP999 + } + } + } + } +} + +query GetQueueAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) 
{ + viewer { + accounts(filter: {accountTag: $accountTag}) { + queueBacklogAdaptiveGroups(limit: $limit, filter: { + datetimeMinute_geq: $datetimeStart, + datetimeMinute_lt: $datetimeEnd + }) { + dimensions { + queueId + datetimeMinute + } + + avg { + bytes + messages + sampleInterval + } + } + + queueConsumerMetricsAdaptiveGroups(limit: $limit, filter: { + datetimeMinute_geq: $datetimeStart, + datetimeMinute_lt: $datetimeEnd + }) { + dimensions { + queueId + datetimeMinute + } + + avg { + concurrency + sampleInterval + } + } + + queueMessageOperationsAdaptiveGroups(limit: $limit, filter: { + datetimeMinute_geq: $datetimeStart, + datetimeMinute_lt: $datetimeEnd + }) { + dimensions { + queueId + datetimeMinute + } + + avg { + lagTime + retryCount } } } } +} diff --git a/gql/workers.graphql b/gql/workers.graphql new file mode 100644 index 0000000..8740fae --- /dev/null +++ b/gql/workers.graphql @@ -0,0 +1,33 @@ +query GetWorkersAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) 
{ + viewer { + accounts(filter: {accountTag: $accountTag}) { + workersInvocationsAdaptive(limit: $limit, filter: { + datetime_geq: $datetimeStart, + datetime_lt: $datetimeEnd + }) { + dimensions { + scriptName + status + datetime + } + + sum { + requests + errors + duration + } + + quantiles { + cpuTimeP50 + cpuTimeP75 + cpuTimeP99 + cpuTimeP999 + durationP50 + durationP75 + durationP99 + durationP999 + } + } + } + } +} diff --git a/src/gql.rs b/src/gql.rs index a5a7d6f..b7b58eb 100644 --- a/src/gql.rs +++ b/src/gql.rs @@ -11,12 +11,40 @@ use worker::console_log; // Both json and the GraphQL schema language are supported as sources for the schema #[derive(GraphQLQuery)] #[graphql( -schema_path = "gql/schema.graphql", -query_path = "gql/queries.graphql", -response_derives = "Debug,Clone" + schema_path = "gql/schema.graphql", + query_path = "gql/workers.graphql", + variables_derives = "Debug", + response_derives = "Debug,Clone" )] pub struct GetWorkersAnalyticsQuery; +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "gql/schema.graphql", + query_path = "gql/d1.graphql", + variables_derives = "Debug", + response_derives = "Debug,Clone" +)] +pub struct GetD1AnalyticsQuery; + +// #[derive(GraphQLQuery)] +// #[graphql( +// schema_path = "gql/schema.graphql", +// query_path = "gql/queries.graphql", +// variables_derives = "Debug", +// response_derives = "Debug,Clone" +// )] +// pub struct GetDurableObjectsAnalyticsQuery; +// +// #[derive(GraphQLQuery)] +// #[graphql( +// schema_path = "gql/schema.graphql", +// query_path = "gql/queries.graphql", +// variables_derives = "Debug", +// response_derives = "Debug,Clone" +// )] +// pub struct GetQueueAnalyticsQuery; + #[allow(non_camel_case_types)] type float32 = f32; @@ -29,11 +57,15 @@ type Time = String; #[allow(non_camel_case_types)] type uint64 = u64; +#[allow(non_camel_case_types)] +type uint32 = u32; + #[allow(non_camel_case_types)] type float64 = f64; -pub async fn perform_my_query(cloudflare_api_url: String, 
cloudflare_api_key: String, variables: get_workers_analytics_query::Variables) -> Result, Box> { +pub async fn do_get_workers_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_workers_analytics_query::Variables) -> Result, Box> { let request_body = GetWorkersAnalyticsQuery::build_query(variables); + //console_log!("request_body: {:?}", request_body); let client = reqwest::Client::new(); let res = client.post(cloudflare_api_url) .bearer_auth(cloudflare_api_key) @@ -52,19 +84,19 @@ pub async fn perform_my_query(cloudflare_api_url: String, cloudflare_api_key: St let response_data: get_workers_analytics_query::ResponseData = response_body.data.expect("missing response data"); let registry = Registry::new(); - let worker_requests_opts = Opts::new("cloudflare_worker_requests", "number of requests to the worker"); + let worker_requests_opts = Opts::new("cloudflare_worker_requests", "Sum of Requests"); let worker_requests = CounterVec::new(worker_requests_opts, &["script_name"]).unwrap(); registry.register(Box::new(worker_requests.clone())).unwrap(); - let worker_errors_opts = Opts::new("cloudflare_worker_errors", "number of failed requests to the worker"); + let worker_errors_opts = Opts::new("cloudflare_worker_errors", "Sum of Errors"); let worker_errors = CounterVec::new(worker_errors_opts, &["script_name"]).unwrap(); registry.register(Box::new(worker_errors.clone())).unwrap(); - let worker_cpu_time_opts = Opts::new("cloudflare_worker_cpu_time", "cpu time processing request"); + let worker_cpu_time_opts = Opts::new("cloudflare_worker_cpu_time", "CPU time - microseconds"); let worker_cpu_time = GaugeVec::new(worker_cpu_time_opts, &["script_name", "quantile"]).unwrap(); registry.register(Box::new(worker_cpu_time.clone())).unwrap(); - let worker_duration_opts = Opts::new("cloudflare_worker_duration", "wall clock time processing request"); + let worker_duration_opts = Opts::new("cloudflare_worker_duration", "Duration - GB*s"); let 
worker_duration = GaugeVec::new(worker_duration_opts, &["script_name", "quantile"]).unwrap(); registry.register(Box::new(worker_duration.clone())).unwrap(); @@ -100,6 +132,82 @@ pub async fn perform_my_query(cloudflare_api_url: String, cloudflare_api_key: St Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp)) } +pub async fn do_get_d1_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_d1_analytics_query::Variables) -> Result, Box> { + let request_body = GetD1AnalyticsQuery::build_query(variables); + //console_log!("request_body: {:?}", request_body); + let client = reqwest::Client::new(); + let res = client.post(cloudflare_api_url) + .bearer_auth(cloudflare_api_key) + .json(&request_body).send().await?; + + if !res.status().is_success() { + console_log!("GraphQL query failed: {:?}", res.status()); + return Err(Box::new(res.error_for_status().unwrap_err())); + } + + let response_body: Response = res.json().await?; + if response_body.errors.is_some() { + console_log!("GraphQL query failed: {:?}", response_body.errors); + return Err(Box::new(worker::Error::JsError("graphql".parse().unwrap()))); + } + let response_data: get_d1_analytics_query::ResponseData = response_body.data.expect("missing response data"); + + let registry = Registry::new(); + let d1_read_queries_opts = Opts::new("cloudflare_d1_read_queries", "The number of read queries."); + let d1_read_queries = CounterVec::new(d1_read_queries_opts, &["database_id"]).unwrap(); + registry.register(Box::new(d1_read_queries.clone())).unwrap(); + + let d1_rows_read_opts = Opts::new("cloudflare_d1_rows_read", "The number of rows your queries read."); + let d1_rows_read = CounterVec::new(d1_rows_read_opts, &["database_id"]).unwrap(); + registry.register(Box::new(d1_rows_read.clone())).unwrap(); + + let d1_rows_written_opts = Opts::new("cloudflare_d1_rows_written", "The number of rows your queries wrote."); + let d1_rows_written = 
CounterVec::new(d1_rows_written_opts, &["database_id"]).unwrap(); + registry.register(Box::new(d1_rows_written.clone())).unwrap(); + + let d1_write_queries_opts = Opts::new("cloudflare_d1_write_queries", "The number of write queries."); + let d1_write_queries = CounterVec::new(d1_write_queries_opts, &["database_id"]).unwrap(); + registry.register(Box::new(d1_write_queries.clone())).unwrap(); + + let d1_query_batch_response_bytes_opts = Opts::new("cloudflare_d1_query_batch_response_bytes", "The total number of bytes in the response, including all returned rows and metadata."); + let d1_query_batch_response_bytes = GaugeVec::new(d1_query_batch_response_bytes_opts, &["database_id", "quantile"]).unwrap(); + registry.register(Box::new(d1_query_batch_response_bytes.clone())).unwrap(); + + let d1_query_batch_time_ms_opts = Opts::new("cloudflare_d1_query_batch_time_ms", "Query batch response time in milliseconds."); + let d1_query_batch_time_ms = GaugeVec::new(d1_query_batch_time_ms_opts, &["database_id", "quantile"]).unwrap(); + registry.register(Box::new(d1_query_batch_time_ms.clone())).unwrap(); + + let mut last_datetime: Option