Skip to content

Commit

Permalink
more queue metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
j-white committed May 9, 2024
1 parent c4047c0 commit 6de7572
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 32 deletions.
2 changes: 2 additions & 0 deletions features/step_definitions/cf_mock_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export class CloudflareMockServer {
res.end(durableObjectsQuery);
} else if (body.indexOf('queueBacklogAdaptiveGroups') > -1) {
res.end(queueBacklogQuery);
} else if (body.indexOf('queueMessageOperationsAdaptiveGroups') > -1) {
res.end("{\"data\":{\"viewer\":{\"accounts\":[{\"queueMessageOperationsAdaptiveGroups\":[]}]}},\"errors\":null}");
} else {
res.end(workerQuery);
}
Expand Down
31 changes: 0 additions & 31 deletions gql/queries.graphql
Original file line number Diff line number Diff line change
@@ -1,22 +1,6 @@
query GetQueueAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
viewer {
accounts(filter: {accountTag: $accountTag}) {
queueBacklogAdaptiveGroups(limit: $limit, filter: {
datetimeMinute_geq: $datetimeStart,
datetimeMinute_lt: $datetimeEnd
}) {
dimensions {
queueId
datetimeMinute
}

avg {
bytes
messages
sampleInterval
}
}

queueConsumerMetricsAdaptiveGroups(limit: $limit, filter: {
datetimeMinute_geq: $datetimeStart,
datetimeMinute_lt: $datetimeEnd
Expand All @@ -31,21 +15,6 @@ query GetQueueAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $dateti
sampleInterval
}
}

queueMessageOperationsAdaptiveGroups(limit: $limit, filter: {
datetimeMinute_geq: $datetimeStart,
datetimeMinute_lt: $datetimeEnd
}) {
dimensions {
queueId
datetimeMinute
}

avg {
lagTime
retryCount
}
}
}
}
}
28 changes: 28 additions & 0 deletions gql/queue_operations_query.graphql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Fetches per-queue message-operation analytics for one account over a
# bounded time window. Consumed by the generated GetQueueOperationsAnalyticsQuery
# type in src/gql.rs (graphql_client derive).
query GetQueueOperationsAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
  viewer {
    accounts(filter: {accountTag: $accountTag}) {
      # Adaptive-sampling dataset of queue message operations; filtered to the
      # [datetimeStart, datetimeEnd) window on the per-minute dimension.
      queueMessageOperationsAdaptiveGroups(limit: $limit, filter: {
        datetimeMinute_geq: $datetimeStart,
        datetimeMinute_lt: $datetimeEnd
      }) {
        # Grouping keys — these become Prometheus label values downstream.
        dimensions {
          actionType
          consumerType
          queueId
          outcome
          datetime
        }

        # Billable operations are summed (a single message operation may count
        # as multiple billable operations).
        sum {
          billableOperations
        }

        # Averages over the adaptive sample interval.
        avg {
          lagTime
          retryCount
          sampleInterval
        }
      }
    }
  }
}
80 changes: 80 additions & 0 deletions src/gql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ pub struct GetDurableObjectsAnalyticsQuery;
)]
pub struct GetQueueBacklogAnalyticsQuery;

/// Typed GraphQL query for queue message-operation analytics.
///
/// The `graphql_client` derive generates the request/response types (module
/// `get_queue_operations_analytics_query`) from the schema and the query file
/// at build time; `response_derives` adds `Clone` so response data can be
/// cheaply duplicated while building metrics.
#[derive(GraphQLQuery)]
#[graphql(
    schema_path = "gql/schema.graphql",
    query_path = "gql/queue_operations_query.graphql",
    variables_derives = "Debug",
    response_derives = "Debug,Clone"
)]
pub struct GetQueueOperationsAnalyticsQuery;

#[allow(non_camel_case_types)]
type float32 = f32;

Expand Down Expand Up @@ -340,6 +349,77 @@ pub async fn do_get_queue_backlog_analytics_query(cloudflare_api_url: &String, c
Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp))
}

/// Queries Cloudflare's GraphQL analytics API for per-queue message-operation
/// statistics (`queueMessageOperationsAdaptiveGroups`) and converts the result
/// into OpenTelemetry metrics via an intermediate Prometheus registry.
///
/// # Arguments
/// * `cloudflare_api_url` - Cloudflare GraphQL endpoint to POST the query to.
/// * `cloudflare_api_key` - API token, sent as a bearer credential.
/// * `variables` - Generated query variables (account tag, time range, limit).
///
/// # Errors
/// Returns an error when the HTTP request fails, the response status is not
/// 2xx, or the GraphQL response body carries a non-null `errors` payload.
pub async fn do_get_queue_operations_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_queue_operations_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
    let request_body = GetQueueOperationsAnalyticsQuery::build_query(variables);
    let client = reqwest::Client::new();
    let res = client.post(cloudflare_api_url)
        .bearer_auth(cloudflare_api_key)
        .json(&request_body).send().await?;

    if !res.status().is_success() {
        console_log!("GraphQL query failed: {:?}", res.status());
        // Status is known non-success here, so error_for_status() is Err.
        return Err(Box::new(res.error_for_status().unwrap_err()));
    }

    let response_body: Response<get_queue_operations_analytics_query::ResponseData> = res.json().await?;
    if response_body.errors.is_some() {
        console_log!("GraphQL query failed: {:?}", response_body.errors);
        return Err(Box::new(worker::Error::JsError("graphql".parse().unwrap())));
    }
    let response_data: get_queue_operations_analytics_query::ResponseData = response_body.data.expect("missing response data");

    // All four series share one label set; define it once so the metric
    // registrations below cannot drift out of sync with the values recorded
    // in the loop.
    const LABELS: &[&str] = &["action_type", "consumer_type", "queue_id", "outcome"];

    let registry = Registry::new();
    let queue_billable_opts = Opts::new("cloudflare_queue_operations_billable", "Number of Billable Operations (some message operations count as multiple billable operations)");
    // BUG FIX: previously registered with only a "queue_id" label while the
    // loop below supplies four label values — a cardinality mismatch that
    // panics in with_label_values at runtime.
    let queue_billable = CounterVec::new(queue_billable_opts, LABELS).unwrap();
    registry.register(Box::new(queue_billable.clone())).unwrap();

    let queue_lag_time_ms_opts = Opts::new("cloudflare_queue_operations_lag_time_ms", "The average time in milliseconds between when the message was written to the queue and the current operation over the sample interval. Will always be 0 for WriteMessage operations.");
    let queue_lag_time_ms = GaugeVec::new(queue_lag_time_ms_opts, LABELS).unwrap();
    registry.register(Box::new(queue_lag_time_ms.clone())).unwrap();

    let queue_retry_count_opts = Opts::new("cloudflare_queue_operations_retry_count", "The average number of retries per message operation. A retry occurs after an unsuccessful delivery, if the queue is configured to retry failed attempts. Only applicable to ReadMessage and DeleteMessage operations. Will always be 0 for WriteMessage operations.");
    let queue_retry_count = GaugeVec::new(queue_retry_count_opts, LABELS).unwrap();
    registry.register(Box::new(queue_retry_count.clone())).unwrap();

    let queue_sample_interval_opts = Opts::new("cloudflare_queue_operations_sample_interval", "The average value used for sample interval");
    let queue_sample_interval = GaugeVec::new(queue_sample_interval_opts, LABELS).unwrap();
    registry.register(Box::new(queue_sample_interval.clone())).unwrap();

    // Track the most recent datetime seen so the emitted metrics can be
    // timestamped with the data's own time rather than "now" when possible.
    let mut last_datetime: Option<Time> = None;
    for account in response_data.clone().viewer.unwrap().accounts.iter() {
        for group in account.queue_message_operations_adaptive_groups.iter() {
            let dimensions = group.dimensions.as_ref().unwrap();
            last_datetime = Some(dimensions.datetime.clone());
            let action_type = dimensions.action_type.clone();
            let consumer_type = dimensions.consumer_type.clone();
            let queue_id = dimensions.queue_id.clone();
            let outcome = dimensions.outcome.clone();

            let sum = group.sum.as_ref().unwrap();
            let avg = group.avg.as_ref().unwrap();

            // Label value order must match LABELS above.
            let label_values = [action_type.as_str(), consumer_type.as_str(),
                queue_id.as_str(), outcome.as_str()];

            queue_billable.with_label_values(&label_values).inc_by(sum.billable_operations as f64);

            queue_lag_time_ms.with_label_values(&label_values).set(avg.lag_time as f64);
            queue_retry_count.with_label_values(&label_values).set(avg.retry_count as f64);
            queue_sample_interval.with_label_values(&label_values).set(avg.sample_interval as f64);
        }
    }

    // "%+" parses an ISO 8601 / RFC 3339 datetime; when no groups were
    // returned, fall back to the current wall-clock time.
    let timestamp: std::time::SystemTime = last_datetime.map(|datetime| {
        let datetime: NaiveDateTime = NaiveDateTime::parse_from_str(&*datetime, "%+").unwrap();
        datetime.and_utc().into()
    }).unwrap_or_else(|| {
        to_std_systemtime(SystemTime::now())
    });

    Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp))
}

fn to_std_systemtime(time: web_time::SystemTime) -> std::time::SystemTime {
let duration = time.duration_since(web_time::SystemTime::UNIX_EPOCH).unwrap();
Expand Down
20 changes: 19 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use prost::Message;
use worker::*;
use worker::js_sys::Uint8Array;
use worker::wasm_bindgen::JsValue;
use crate::gql::{get_workers_analytics_query, do_get_workers_analytics_query, do_get_d1_analytics_query, get_d1_analytics_query, do_get_durableobjects_analytics_query, get_durable_objects_analytics_query, do_get_queue_backlog_analytics_query, get_queue_backlog_analytics_query};
use crate::gql::{get_workers_analytics_query, do_get_workers_analytics_query, do_get_d1_analytics_query, get_d1_analytics_query, do_get_durableobjects_analytics_query, get_durable_objects_analytics_query, do_get_queue_backlog_analytics_query, get_queue_backlog_analytics_query, do_get_queue_operations_analytics_query, get_queue_operations_analytics_query};

mod gql;
mod metrics;
Expand Down Expand Up @@ -139,6 +139,24 @@ async fn do_trigger(env: Env) -> Result<()> {
return Err(Error::JsError(e.to_string()));
}
};

let result = do_get_queue_operations_analytics_query(&cloudflare_api_url, &cloudflare_api_key, get_queue_operations_analytics_query::Variables {
account_tag: cloudflare_account_id.clone(),
datetime_start: Some(start.to_rfc3339()),
datetime_end: Some(end.to_rfc3339()),
limit: 9999,
}).await;
match result {
Ok(metrics) => {
for metric in metrics {
all_metrics.push(metric);
}
},
Err(e) => {
console_log!("Querying Cloudflare API failed: {:?}", e);
return Err(Error::JsError(e.to_string()));
}
};
console_log!("Done fetching!");

do_push_metrics(env, all_metrics).await
Expand Down

0 comments on commit 6de7572

Please sign in to comment.