diff --git a/features/step_definitions/cf_mock_server.ts b/features/step_definitions/cf_mock_server.ts index 57cd653..13a7ee0 100644 --- a/features/step_definitions/cf_mock_server.ts +++ b/features/step_definitions/cf_mock_server.ts @@ -29,6 +29,8 @@ export class CloudflareMockServer { res.end(durableObjectsQuery); } else if (body.indexOf('queueBacklogAdaptiveGroups') > -1) { res.end(queueBacklogQuery); + } else if (body.indexOf('queueMessageOperationsAdaptiveGroups') > -1) { + res.end("{\"data\":{\"viewer\":{\"accounts\":[{\"queueMessageOperationsAdaptiveGroups\":[]}]}},\"errors\":null}"); } else { res.end(workerQuery); } diff --git a/gql/queries.graphql b/gql/queries.graphql index 60fa633..409c1fa 100644 --- a/gql/queries.graphql +++ b/gql/queries.graphql @@ -1,22 +1,6 @@ query GetQueueAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) { viewer { accounts(filter: {accountTag: $accountTag}) { - queueBacklogAdaptiveGroups(limit: $limit, filter: { - datetimeMinute_geq: $datetimeStart, - datetimeMinute_lt: $datetimeEnd - }) { - dimensions { - queueId - datetimeMinute - } - - avg { - bytes - messages - sampleInterval - } - } - queueConsumerMetricsAdaptiveGroups(limit: $limit, filter: { datetimeMinute_geq: $datetimeStart, datetimeMinute_lt: $datetimeEnd @@ -31,21 +15,6 @@ query GetQueueAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $dateti sampleInterval } } - - queueMessageOperationsAdaptiveGroups(limit: $limit, filter: { - datetimeMinute_geq: $datetimeStart, - datetimeMinute_lt: $datetimeEnd - }) { - dimensions { - queueId - datetimeMinute - } - - avg { - lagTime - retryCount - } - } } } } diff --git a/gql/queue_operations_query.graphql b/gql/queue_operations_query.graphql new file mode 100644 index 0000000..7c2d4c6 --- /dev/null +++ b/gql/queue_operations_query.graphql @@ -0,0 +1,28 @@ +query GetQueueOperationsAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) { + viewer { + accounts(filter: {accountTag: $accountTag}) { + queueMessageOperationsAdaptiveGroups(limit: $limit, filter: { + datetimeMinute_geq: $datetimeStart, + datetimeMinute_lt: $datetimeEnd + }) { + dimensions { + actionType + consumerType + queueId + outcome + datetime + } + + sum { + billableOperations + } + + avg { + lagTime + retryCount + sampleInterval + } + } + } + } +} \ No newline at end of file diff --git a/src/gql.rs b/src/gql.rs index 30029b9..78152e0 100644 --- a/src/gql.rs +++ b/src/gql.rs @@ -45,6 +45,15 @@ pub struct GetDurableObjectsAnalyticsQuery; )] pub struct GetQueueBacklogAnalyticsQuery; +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "gql/schema.graphql", + query_path = "gql/queue_operations_query.graphql", + variables_derives = "Debug", + response_derives = "Debug,Clone" +)] +pub struct GetQueueOperationsAnalyticsQuery; + #[allow(non_camel_case_types)] type float32 = f32; @@ -340,6 +349,77 @@ pub async fn do_get_queue_backlog_analytics_query(cloudflare_api_url: &String, c Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp)) } +pub async fn do_get_queue_operations_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_queue_operations_analytics_query::Variables) -> Result, Box> { + let request_body = GetQueueOperationsAnalyticsQuery::build_query(variables); + //console_log!("request_body: {:?}", request_body); + let client = reqwest::Client::new(); + let res = client.post(cloudflare_api_url) + .bearer_auth(cloudflare_api_key) + .json(&request_body).send().await?; + + if !res.status().is_success() { + console_log!("GraphQL query failed: {:?}", res.status()); + return Err(Box::new(res.error_for_status().unwrap_err())); + } + + let response_body: Response = res.json().await?; + if response_body.errors.is_some() { + console_log!("GraphQL query failed: {:?}", response_body.errors); + return Err(Box::new(worker::Error::JsError("graphql".parse().unwrap()))); + } + let response_data: get_queue_operations_analytics_query::ResponseData = response_body.data.expect("missing response data"); + + let registry = Registry::new(); + let queue_billable_opts = Opts::new("cloudflare_queue_operations_billable", "Number of Billable Operations (some message operations count as multiple billable operations)"); + let queue_billable = CounterVec::new(queue_billable_opts, &["queue_id"]).unwrap(); + registry.register(Box::new(queue_billable.clone())).unwrap(); + + let queue_lag_time_ms_opts = Opts::new("cloudflare_queue_operations_lag_time_ms", "The average time in milliseconds between when the message was written to the queue and the current operation over the sample interval. Will always be 0 for WriteMessage operations."); + let queue_lag_time_ms = GaugeVec::new(queue_lag_time_ms_opts, &["action_type", "consumer_type", "queue_id", "outcome"]).unwrap(); + registry.register(Box::new(queue_lag_time_ms.clone())).unwrap(); + + let queue_retry_count_opts = Opts::new("cloudflare_queue_operations_retry_count", "The average number of retries per message operation. A retry occurs after an unsucessful delivery, if the queue is configured to retry failed attempts. Only applicable to ReadMessage and DeleteMessage operations. Will always be 0 for WriteMessage operations."); + let queue_retry_count = GaugeVec::new(queue_retry_count_opts, &["action_type", "consumer_type", "queue_id", "outcome"]).unwrap(); + registry.register(Box::new(queue_retry_count.clone())).unwrap(); + + let queue_sample_interval_opts = Opts::new("cloudflare_queue_operations_sample_interval", "The average value used for sample interval"); + let queue_sample_interval = GaugeVec::new(queue_sample_interval_opts, &["action_type", "consumer_type", "queue_id", "outcome"]).unwrap(); + registry.register(Box::new(queue_sample_interval.clone())).unwrap(); + + let mut last_datetime: Option