
Commit

more hackz
j-white committed May 6, 2024
1 parent e6b725e commit 5abd360
Showing 11 changed files with 242 additions and 163 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -3,7 +3,6 @@ name = "worker-rust"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
license-file = "LICENSE"

[lib]
crate-type = ["cdylib"]
@@ -21,6 +20,7 @@ serde_json = "1.0.116"
opentelemetry-http = { version="0.11.1" }
opentelemetry-otlp = { version="0.15.0", features = ["metrics", "http-proto"], default-features = false }
http = { version = "0.2.12"}
tokio = { version = "1.37.0"}

[profile.release]
opt-level = "s" # optimize for size in release builds
107 changes: 107 additions & 0 deletions features/metrics.json
@@ -0,0 +1,107 @@
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my.service"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "my.library",
"version": "1.0.0",
"attributes": [
{
"key": "my.scope.attribute",
"value": {
"stringValue": "some scope attribute"
}
}
]
},
"metrics": [
{
"name": "my.counter",
"unit": "1",
"description": "I'm a Counter",
"sum": {
"aggregationTemporality": 1,
"isMonotonic": true,
"dataPoints": [
{
"asDouble": 5,
"startTimeUnixNano": 1544712660300000000,
"timeUnixNano": 1544712660300000000,
"attributes": [
{
"key": "my.counter.attr",
"value": {
"stringValue": "some value"
}
}
]
}
]
}
},
{
"name": "my.gauge",
"unit": "1",
"description": "I'm a Gauge",
"gauge": {
"dataPoints": [
{
"asDouble": 10,
"timeUnixNano": 1544712660300000000,
"attributes": [
{
"key": "my.gauge.attr",
"value": {
"stringValue": "some value"
}
}
]
}
]
}
},
{
"name": "my.histogram",
"unit": "1",
"description": "I'm a Histogram",
"histogram": {
"aggregationTemporality": 1,
"dataPoints": [
{
"startTimeUnixNano": 1544712660300000000,
"timeUnixNano": 1544712660300000000,
"count": 3,
"sum": 3,
"bucketCounts": [1,1,1],
"explicitBounds": [1],
"min": 1,
"max": 1,
"attributes": [
{
"key": "my.histogram.attr",
"value": {
"stringValue": "some value"
}
}
]
}
]
}
}
]
}
]
}
]
}
34 changes: 24 additions & 10 deletions features/step_definitions/mf.ts
@@ -1,9 +1,18 @@
import {Log, LogLevel, Miniflare} from "miniflare";
import { MockAgent } from "undici";

type MfConfig = {
metricsUrl: string|undefined;
cloudflareApiUrl: string|undefined
};

export class MiniflareDriver {
mockAgent = new MockAgent();
mf: Miniflare | undefined;
config: MfConfig = {
metricsUrl: undefined,
cloudflareApiUrl: undefined,
}

start(options?: {metricsUrl?: string, cloudflareApiUrl?: string}): Miniflare {
this.mockAgent
@@ -29,15 +38,12 @@ export class MiniflareDriver {
}
);

let metricsUrl = "";
let cloudflareApiUrl = "";
if (options !== undefined) {
if (options.metricsUrl !== undefined) {
metricsUrl = options.metricsUrl;
}
if (options.cloudflareApiUrl !== undefined) {
cloudflareApiUrl = options.cloudflareApiUrl;
}
let self = this;
if(self.config.metricsUrl === undefined) {
throw new Error("metricsUrl is not defined!");
}
if(self.config.cloudflareApiUrl === undefined) {
throw new Error("cloudflareApiUrl is not defined!");
}

this.mf = new Miniflare({
@@ -51,6 +57,11 @@ export class MiniflareDriver
compatibilityDate: "2022-04-05",
cache: true,
modules: true,
bindings: {
METRICS_URL: self.config.metricsUrl,
CLOUDFLARE_API_URL: self.config.cloudflareApiUrl,
CLOUDFLARE_API_KEY: "fake-key",
},
modulesRules: [
{ type: "CompiledWasm", include: ["**/*.wasm"], fallthrough: true },
],
@@ -70,7 +81,10 @@ export class MiniflareDriver

async trigger() {
this.start({});
await this.mf?.dispatchFetch("http://localhost:8787/cdn-cgi/mf/scheduled");
const res = await this.mf?.dispatchFetch("http://localhost:8787/");

// await this.mf?.dispatchFetch("http://fake.host/cdn-cgi/mf/scheduled");
console.log("Triggered worker");
this.dispose();
}
}
9 changes: 5 additions & 4 deletions features/step_definitions/o11y_steps.ts
@@ -1,22 +1,23 @@
import {After, Given, When, Then} from '@cucumber/cucumber';
import {cloudflareMockServer, mf, mfConfig, otelServer} from "./state";
import {cloudflareMockServer, mf, otelServer} from "./state";
import {expect} from "chai";

Given('Worker is configured to point to mock Cloudflare API', function () {
cloudflareMockServer.start();
mfConfig.cloudflareApiUrl = cloudflareMockServer.url();
mf.config.cloudflareApiUrl = cloudflareMockServer.url();
});

Given('Worker is configured to send metrics to a mock OpenTelemetry collector', function () {
otelServer.start();
mfConfig.metricsUrl = otelServer.metricsUrl();
mf.config.metricsUrl = otelServer.metricsUrl();
});

When('Worker is triggered', async function () {
await mf.trigger();
});

Then('Worker metrics are published', function () {
Then('Worker metrics are published', async function () {
await new Promise(r => setTimeout(r, 5000));
let metrics = otelServer.getMetrics();
expect(metrics).to.have.length.gte(1);
});
1 change: 1 addition & 0 deletions features/step_definitions/otel_server.ts
@@ -40,6 +40,7 @@ export class OpenTelemetryServer
indexMetrics() {
let self = this;
this.metricNames.clear();
console.log("Indexing metrics", this.metrics);
for (let metrics of this.metrics) {
for (let resourceMetrics of metrics.resourceMetrics) {
for (let scopeMetrics of resourceMetrics.scopeMetrics) {
12 changes: 1 addition & 11 deletions features/step_definitions/state.ts
@@ -6,14 +6,4 @@ const mf = new MiniflareDriver();
const otelServer = new OpenTelemetryServer();
const cloudflareMockServer = new CloudflareMockServer();

type MfConfig = {
metricsUrl: string|undefined;
cloudflareApiUrl: string|undefined
};

const mfConfig: MfConfig = {
metricsUrl: undefined,
cloudflareApiUrl: undefined,
}

export { mf, mfConfig, otelServer, cloudflareMockServer };
export { mf, otelServer, cloudflareMockServer };
64 changes: 48 additions & 16 deletions src/gql.rs
@@ -1,7 +1,13 @@
use std::borrow::Cow;
use std::error::Error;
use std::sync::{Arc, Mutex};
use graphql_client::GraphQLQuery;
use opentelemetry::{global, KeyValue};
use opentelemetry::KeyValue;
use opentelemetry::metrics::Unit;
use opentelemetry_sdk::AttributeSet;
use opentelemetry_sdk::metrics::data::{DataPoint, Metric};
use opentelemetry_sdk::metrics::data::Gauge;
use worker::console_log;

// The paths are relative to the directory where your `Cargo.toml` is located.
// Both json and the GraphQL schema language are supported as sources for the schema
@@ -25,26 +31,52 @@ type Time = String;
#[allow(non_camel_case_types)]
type uint64 = u64;

pub async fn perform_my_query(variables: get_workers_analytics_query::Variables) -> Result<(), Box<dyn Error>> {
pub async fn perform_my_query(cloudflare_api_url: String, cloudflare_api_key: String, variables: get_workers_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
let metrics = Arc::new(Mutex::new(Vec::new()));

let request_body = GetWorkersAnalyticsQuery::build_query(variables);
let client = reqwest::Client::new();
let res = client.post("/graphql").json(&request_body).send().await?;
let res = client.post(cloudflare_api_url)
.bearer_auth(cloudflare_api_key)
.json(&request_body).send().await?;
if !res.status().is_success() {
return Err(Box::new(res.error_for_status().unwrap_err()));
}

let response: get_workers_analytics_query::ResponseData = res.json().await?;
console_log!("Response: {:?}", response);
let _ = response.viewer.unwrap().accounts.iter().map(|account| account.workers_invocations_adaptive.iter().map(|worker| {
// See https://github.com/lablabs/cloudflare-exporter/blob/05e80d9cc5034c5a40b08f7630e6ca5a54c66b20/prometheus.go#L44C61-L44C93
let requests = worker.sum.as_ref().unwrap().requests;
let meter = global::meter("cloudflare_worker_requests");
let gauge = meter
.u64_gauge("count")
.with_description("A gauge of the number of requests to a worker.")
.with_unit(Unit::new("requests"))
.init();
gauge.record(
requests,
&[
KeyValue::new("script_name", worker.dimensions.as_ref().unwrap().script_name.clone())
],
);
let metric = create_metric("cloudflare_worker_requests".to_string(), "A gauge of the number of requests to a worker.".to_string(), requests, "requests".to_string()).unwrap();
metrics.lock().unwrap().push(metric);
}));
Ok(())

let mut metrics_to_return: Vec<Metric> = Vec::new();
let mut vec = metrics.lock().unwrap();
metrics_to_return.extend(vec.drain(..));
Ok(metrics_to_return)
}

fn create_metric(name: String, description: String, value: uint64, unit:String) -> Result<Metric, Box<dyn Error>> {
let key_value = KeyValue::new("key", "value");
let attribute_set: AttributeSet = std::slice::from_ref(&key_value).into();
let data_point = DataPoint {
attributes: attribute_set,
start_time: None,
time: None,
value,
exemplars: vec![],
};
let sample: Gauge<u64> = Gauge {
data_points: vec![data_point],
};
Ok(Metric {
name: Cow::from(name),
description: Cow::from(description),
unit: Unit::new(unit),
data: Box::new(sample),
})
}


42 changes: 0 additions & 42 deletions src/http.rs

This file was deleted.

