Commit: d1 metrics

j-white committed May 8, 2024
1 parent b8f07d2 commit 9b7dfbd
Showing 8 changed files with 331 additions and 60 deletions.
8 changes: 8 additions & 0 deletions README.md
@@ -7,6 +7,14 @@ This worker was written to push Cloudflare Analytics data to an OpenTelemetry co
It is inspired by the [cloudflare-exporter](https://github.com/lablabs/cloudflare-exporter), which is unfortunately no longer maintained.
By running it as a worker and pushing metrics, we avoid the need to deploy a dedicated container and allow the worker to be run on [green compute](https://blog.cloudflare.com/announcing-green-compute).

## Metrics currently supported

- [x] Workers
- [x] D1
- [ ] Queues
- [ ] Durable Objects
- [ ] Zones

## Usage

* Clone the repo
10 changes: 8 additions & 2 deletions features/step_definitions/cf_mock_server.ts
@@ -7,7 +7,8 @@ export class CloudflareMockServer {

start() {
let self = this;
-const fileContents = fs.readFileSync('./features/step_definitions/worker_query_response.json').toString()
const workerQuery = fs.readFileSync('./features/step_definitions/worker_query_response.json').toString();
const d1Query = fs.readFileSync('./features/step_definitions/d1_query_response.json').toString();
this.server = http.createServer((req, res) => {
var body = "";
req.on('readable', function() {
@@ -16,10 +17,15 @@
body += part;
}
});

req.on('end', function() {
res.statusCode = 200;
res.setHeader('Content-Type', 'application/json');
-res.end(fileContents);
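// Route on the request body: GraphQL queries mentioning the D1 dataset get the D1 fixture; everything else gets the workers fixture.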
if (body.indexOf('d1AnalyticsAdaptiveGroups') > -1) {
res.end(d1Query);
} else {
res.end(workerQuery);
}
});
});
this.server.listen(() => {
1 change: 1 addition & 0 deletions features/step_definitions/d1_query_response.json
@@ -0,0 +1 @@
{"data":{"viewer":{"accounts":[{"d1AnalyticsAdaptiveGroups":[]}]}},"errors":null}
29 changes: 29 additions & 0 deletions gql/d1.graphql
@@ -0,0 +1,29 @@
query GetD1AnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
viewer {
accounts(filter: {accountTag: $accountTag}) {
d1AnalyticsAdaptiveGroups(limit: $limit, filter: {
datetimeMinute_geq: $datetimeStart,
datetimeMinute_lt: $datetimeEnd
}) {
dimensions {
databaseId
datetimeMinute
}

sum {
readQueries
rowsRead
rowsWritten
writeQueries
}

quantiles {
queryBatchResponseBytesP50
queryBatchResponseBytesP90
queryBatchTimeMsP50
queryBatchTimeMsP90
}
}
}
}
}
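
For orientation before the Rust changes below: graphql_client generates a get_d1_analytics_query module (snake-cased from the struct name) whose Variables struct mirrors the four query variables above. A minimal call-site sketch, assuming the schema's custom scalars (string, Time) alias to String as in src/gql.rs, and with account_id, api_url, and api_key as placeholder locals:

// Sketch only: values and local variable names are illustrative.
let variables = get_d1_analytics_query::Variables {
    account_tag: account_id.clone(),                     // Cloudflare account tag (string!)
    datetime_start: Some("2024-05-08T00:00:00Z".into()), // Time scalar, inclusive lower bound
    datetime_end: Some("2024-05-08T00:05:00Z".into()),   // Time scalar, exclusive upper bound
    limit: 100,                                          // Int! maps to i64 in graphql_client
};
let metrics = do_get_d1_analytics_query(&api_url, &api_key, variables).await?;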
116 changes: 87 additions & 29 deletions gql/queries.graphql
@@ -1,33 +1,91 @@
-query GetWorkersAnalyticsQuery($accountTag: string, $datetimeStart: string, $datetimeEnd: string, $limit: Int!) {
-viewer {
-accounts(filter: {accountTag: $accountTag}) {
-workersInvocationsAdaptive(limit: $limit, filter: {
-datetime_geq: $datetimeStart,
-datetime_lt: $datetimeEnd
-}) {
-dimensions {
-scriptName
-status
-datetime
-}
-
-sum {
-requests
-errors
-duration
-}
-
-quantiles {
-cpuTimeP50
-cpuTimeP75
-cpuTimeP99
-cpuTimeP999
-durationP50
-durationP75
-durationP99
-durationP999
-}


query GetDurableObjectsAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
viewer {
accounts(filter: {accountTag: $accountTag}) {
durableObjectsInvocationsAdaptiveGroups(limit: $limit, filter: {
datetimeMinute_geq: $datetimeStart,
datetimeMinute_lt: $datetimeEnd
}) {
dimensions {
scriptName
datetimeMinute
}

sum {
errors
requests
responseBodySize
wallTime
}

quantiles {
responseBodySizeP25
responseBodySizeP50
responseBodySizeP75
responseBodySizeP90
responseBodySizeP99
responseBodySizeP999
wallTimeP25
wallTimeP50
wallTimeP75
wallTimeP90
wallTimeP99
wallTimeP999
}
}
}
}
}

query GetQueueAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
viewer {
accounts(filter: {accountTag: $accountTag}) {
queueBacklogAdaptiveGroups(limit: $limit, filter: {
datetimeMinute_geq: $datetimeStart,
datetimeMinute_lt: $datetimeEnd
}) {
dimensions {
queueId
datetimeMinute
}

avg {
bytes
messages
sampleInterval
}
}

queueConsumerMetricsAdaptiveGroups(limit: $limit, filter: {
datetimeMinute_geq: $datetimeStart,
datetimeMinute_lt: $datetimeEnd
}) {
dimensions {
queueId
datetimeMinute
}

avg {
concurrency
sampleInterval
}
}

queueMessageOperationsAdaptiveGroups(limit: $limit, filter: {
datetimeMinute_geq: $datetimeStart,
datetimeMinute_lt: $datetimeEnd
}) {
dimensions {
queueId
datetimeMinute
}

avg {
lagTime
retryCount
}
}
}
}
}
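
Note that the Rust counterparts to these Durable Objects and Queues queries remain commented out in src/gql.rs below, so only the Workers and D1 queries are executed for now; this matches the checklist added to the README.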
33 changes: 33 additions & 0 deletions gql/workers.graphql
@@ -0,0 +1,33 @@
query GetWorkersAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
viewer {
accounts(filter: {accountTag: $accountTag}) {
workersInvocationsAdaptive(limit: $limit, filter: {
datetime_geq: $datetimeStart,
datetime_lt: $datetimeEnd
}) {
dimensions {
scriptName
status
datetime
}

sum {
requests
errors
duration
}

quantiles {
cpuTimeP50
cpuTimeP75
cpuTimeP99
cpuTimeP999
durationP50
durationP75
durationP99
durationP999
}
}
}
}
}
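
This file carries over the workers query formerly defined in gql/queries.graphql, with two type tweaks: $accountTag is now non-null (string!) and the datetime variables are typed as Time rather than string.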
124 changes: 116 additions & 8 deletions src/gql.rs
@@ -11,12 +11,40 @@ use worker::console_log;
// Both json and the GraphQL schema language are supported as sources for the schema
#[derive(GraphQLQuery)]
#[graphql(
-schema_path = "gql/schema.graphql",
-query_path = "gql/queries.graphql",
-response_derives = "Debug,Clone"
schema_path = "gql/schema.graphql",
query_path = "gql/workers.graphql",
variables_derives = "Debug",
response_derives = "Debug,Clone"
)]
pub struct GetWorkersAnalyticsQuery;

#[derive(GraphQLQuery)]
#[graphql(
schema_path = "gql/schema.graphql",
query_path = "gql/d1.graphql",
variables_derives = "Debug",
response_derives = "Debug,Clone"
)]
pub struct GetD1AnalyticsQuery;

// #[derive(GraphQLQuery)]
// #[graphql(
// schema_path = "gql/schema.graphql",
// query_path = "gql/queries.graphql",
// variables_derives = "Debug",
// response_derives = "Debug,Clone"
// )]
// pub struct GetDurableObjectsAnalyticsQuery;
//
// #[derive(GraphQLQuery)]
// #[graphql(
// schema_path = "gql/schema.graphql",
// query_path = "gql/queries.graphql",
// variables_derives = "Debug",
// response_derives = "Debug,Clone"
// )]
// pub struct GetQueueAnalyticsQuery;

#[allow(non_camel_case_types)]
type float32 = f32;

@@ -29,11 +57,15 @@ type Time = String;
#[allow(non_camel_case_types)]
type uint64 = u64;

#[allow(non_camel_case_types)]
type uint32 = u32;

#[allow(non_camel_case_types)]
type float64 = f64;

-pub async fn perform_my_query(cloudflare_api_url: String, cloudflare_api_key: String, variables: get_workers_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
pub async fn do_get_workers_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_workers_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
let request_body = GetWorkersAnalyticsQuery::build_query(variables);
//console_log!("request_body: {:?}", request_body);
let client = reqwest::Client::new();
let res = client.post(cloudflare_api_url)
.bearer_auth(cloudflare_api_key)
@@ -52,19 +84,19 @@ pub async fn perform_my_query(cloudflare_api_url: String, cloudflare_api_key: St
let response_data: get_workers_analytics_query::ResponseData = response_body.data.expect("missing response data");

let registry = Registry::new();
-let worker_requests_opts = Opts::new("cloudflare_worker_requests", "number of requests to the worker");
let worker_requests_opts = Opts::new("cloudflare_worker_requests", "Sum of Requests");
let worker_requests = CounterVec::new(worker_requests_opts, &["script_name"]).unwrap();
registry.register(Box::new(worker_requests.clone())).unwrap();

-let worker_errors_opts = Opts::new("cloudflare_worker_errors", "number of failed requests to the worker");
let worker_errors_opts = Opts::new("cloudflare_worker_errors", "Sum of Errors");
let worker_errors = CounterVec::new(worker_errors_opts, &["script_name"]).unwrap();
registry.register(Box::new(worker_errors.clone())).unwrap();

-let worker_cpu_time_opts = Opts::new("cloudflare_worker_cpu_time", "cpu time processing request");
let worker_cpu_time_opts = Opts::new("cloudflare_worker_cpu_time", "CPU time - microseconds");
let worker_cpu_time = GaugeVec::new(worker_cpu_time_opts, &["script_name", "quantile"]).unwrap();
registry.register(Box::new(worker_cpu_time.clone())).unwrap();

-let worker_duration_opts = Opts::new("cloudflare_worker_duration", "wall clock time processing request");
let worker_duration_opts = Opts::new("cloudflare_worker_duration", "Duration - GB*s");
let worker_duration = GaugeVec::new(worker_duration_opts, &["script_name", "quantile"]).unwrap();
registry.register(Box::new(worker_duration.clone())).unwrap();

@@ -100,6 +132,82 @@ pub async fn perform_my_query(cloudflare_api_url: String, cloudflare_api_key: St
Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp))
}

pub async fn do_get_d1_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_d1_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
let request_body = GetD1AnalyticsQuery::build_query(variables);
//console_log!("request_body: {:?}", request_body);
let client = reqwest::Client::new();
let res = client.post(cloudflare_api_url)
.bearer_auth(cloudflare_api_key)
.json(&request_body).send().await?;

if !res.status().is_success() {
console_log!("GraphQL query failed: {:?}", res.status());
return Err(Box::new(res.error_for_status().unwrap_err()));
}

let response_body: Response<get_d1_analytics_query::ResponseData> = res.json().await?;
if response_body.errors.is_some() {
console_log!("GraphQL query failed: {:?}", response_body.errors);
return Err(Box::new(worker::Error::JsError("graphql".parse().unwrap())));
}
let response_data: get_d1_analytics_query::ResponseData = response_body.data.expect("missing response data");

let registry = Registry::new();
let d1_read_queries_opts = Opts::new("cloudflare_d1_read_queries", "The number of read queries.");
let d1_read_queries = CounterVec::new(d1_read_queries_opts, &["database_id"]).unwrap();
registry.register(Box::new(d1_read_queries.clone())).unwrap();

let d1_rows_read_opts = Opts::new("cloudflare_d1_rows_read", "The number of rows your queries read.");
let d1_rows_read = CounterVec::new(d1_rows_read_opts, &["database_id"]).unwrap();
registry.register(Box::new(d1_rows_read.clone())).unwrap();

let d1_rows_written_opts = Opts::new("cloudflare_d1_rows_written", "The number of rows your queries wrote.");
let d1_rows_written = CounterVec::new(d1_rows_written_opts, &["database_id"]).unwrap();
registry.register(Box::new(d1_rows_written.clone())).unwrap();

let d1_write_queries_opts = Opts::new("cloudflare_d1_write_queries", "The number of write queries.");
let d1_write_queries = CounterVec::new(d1_write_queries_opts, &["database_id"]).unwrap();
registry.register(Box::new(d1_write_queries.clone())).unwrap();

let d1_query_batch_response_bytes_opts = Opts::new("cloudflare_d1_query_batch_response_bytes", "The total number of bytes in the response, including all returned rows and metadata.");
let d1_query_batch_response_bytes = GaugeVec::new(d1_query_batch_response_bytes_opts, &["database_id", "quantile"]).unwrap();
registry.register(Box::new(d1_query_batch_response_bytes.clone())).unwrap();

let d1_query_batch_time_ms_opts = Opts::new("cloudflare_d1_query_batch_time_ms", "Query batch response time in milliseconds.");
let d1_query_batch_time_ms = GaugeVec::new(d1_query_batch_time_ms_opts, &["database_id", "quantile"]).unwrap();
registry.register(Box::new(d1_query_batch_time_ms.clone())).unwrap();

let mut last_datetime: Option<Time> = None;
for account in response_data.clone().viewer.unwrap().accounts.iter() {
for group in account.d1_analytics_adaptive_groups.iter() {
let dimensions = group.dimensions.as_ref().unwrap();
last_datetime = Some(dimensions.datetime_minute.clone());
let database_id = dimensions.database_id.clone();
let sum = group.sum.as_ref().unwrap();
let quantiles = group.quantiles.as_ref().unwrap();

d1_read_queries.with_label_values(&[database_id.as_str()]).inc_by(sum.read_queries as f64);
d1_rows_read.with_label_values(&[database_id.as_str()]).inc_by(sum.rows_read as f64);
d1_rows_written.with_label_values(&[database_id.as_str()]).inc_by(sum.rows_written as f64);
d1_write_queries.with_label_values(&[database_id.as_str()]).inc_by(sum.write_queries as f64);

d1_query_batch_response_bytes.with_label_values(&[database_id.as_str(), "P50"]).set(quantiles.query_batch_response_bytes_p50 as f64);
d1_query_batch_response_bytes.with_label_values(&[database_id.as_str(), "P90"]).set(quantiles.query_batch_response_bytes_p90 as f64);
d1_query_batch_time_ms.with_label_values(&[database_id.as_str(), "P50"]).set(quantiles.query_batch_time_ms_p50 as f64);
d1_query_batch_time_ms.with_label_values(&[database_id.as_str(), "P90"]).set(quantiles.query_batch_time_ms_p90 as f64);
}
}

let timestamp: std::time::SystemTime = last_datetime.map(|datetime| {
let datetime: NaiveDateTime = NaiveDateTime::parse_from_str(&*datetime, "%+").unwrap();
datetime.and_utc().into()
}).unwrap_or_else(|| {
to_std_systemtime(SystemTime::now())
});

Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp))
}

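// Rebase a web_time::SystemTime onto std::time::SystemTime via the duration
// since the Unix epoch. (web_time is presumably used because the std clock
// is unavailable on wasm32 targets such as Workers.)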
fn to_std_systemtime(time: web_time::SystemTime) -> std::time::SystemTime {
let duration = time.duration_since(web_time::SystemTime::UNIX_EPOCH).unwrap();
std::time::SystemTime::UNIX_EPOCH + duration