Skip to content

Commit

Permalink
feat(trace): catch all traces from collector
Browse files Browse the repository at this point in the history
Signed-off-by: GALLLASMILAN <gallas.milan@gmail.com>
  • Loading branch information
GALLLASMILAN committed Jan 8, 2025
1 parent 1ebf0c2 commit 5346f5c
Show file tree
Hide file tree
Showing 21 changed files with 629 additions and 264 deletions.
2 changes: 1 addition & 1 deletion .env.testing.docker
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
NODE_ENV=production
PORT=4318
PORT=4319
AUTH_KEY=valid-api-key
FASTIFY_BODY_LIMIT=10485760

Expand Down
2 changes: 1 addition & 1 deletion .env.testing.local
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
BEE_FRAMEWORK_INSTRUMENTATION_METRICS_ENABLED=false
BEE_FRAMEWORK_INSTRUMENTATION_ENABLED=true
PORT=4318
PORT=4319
AUTH_KEY=valid-api-key
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ node_modules

#infra
.env.docker
scripts/open_telemetry_collector/otelcol
28 changes: 26 additions & 2 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ services:
retries: 5
mlflow:
image: bitnami/mlflow:2.17.2
restart: always
ports:
- '${MLFLOW_EXPOSED_PORT:-8080}:8080'
entrypoint:
Expand All @@ -36,14 +37,35 @@ services:
- 'label=disable'
volumes:
- ./entrypoint.sh:/entrypoint.sh:ro
wait_for_mlflow:
image: curlimages/curl:latest
depends_on:
mlflow:
condition: service_started
entrypoint:
[
'sh',
'-c',
"while ! curl --silent --fail http://mlflow:8080; do echo 'Waiting for API...'; sleep 5; done"
]
otel-collector:
image: otel/opentelemetry-collector-contrib:0.114.0
restart: always
ports:
- '4318:4318' # OTLP HTTP receiver
- '13133:13133' # Health Check
- '55679:55679' # Prometheus scraping port
command: ['--config=/etc/otel-collector-config.yaml']
volumes:
- ./scripts/open_telemetry_collector/otel-collector-config-docker.yaml:/etc/otel-collector-config.yaml
observe_api:
build:
context: .
args:
GIT_TAG: ${TAG:-testing}
BUILD_DATETIME: ${BUILD_DATETIME:-}
ports:
- '${OBSERVE_API_EXPOSED_PORT:-4318}:4318'
- '${OBSERVE_API_EXPOSED_PORT:-4319}:4319'
env_file:
- .env.testing.docker
command: >
Expand All @@ -53,7 +75,7 @@ services:
node ./dist/index.js
"
healthcheck:
test: wget --no-verbose --tries=1 --spider http://0.0.0.0:4318/health || exit 1
test: wget --no-verbose --tries=1 --spider http://0.0.0.0:4319/health || exit 1
interval: 10s
timeout: 5s
retries: 5
Expand All @@ -64,6 +86,8 @@ services:
condition: service_healthy
redis:
condition: service_healthy
otel-collector:
condition: service_started
wait_for_api:
image: curlimages/curl:latest
depends_on:
Expand Down
12 changes: 9 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"proto:generate": "./scripts/open_telemetry_generate_protos/generate_protos.sh",
"start:infra": "docker compose up -d mongo redis mlflow",
"start:dev": "tsx watch ./src/index.ts | pino-pretty --singleLine",
"start:collector:local": "./scripts/open_telemetry_collector/otelcol --config=./scripts/open_telemetry_collector/otel-collector-config-local.yaml",
"stop:infra": "docker compose down",
"install:collector:local": "./scripts/open_telemetry_collector/install-collector.sh",
"dev": "yarn start:dev",
"start": "node ./dist/index.js",
"test": "./scripts/test_local.sh",
Expand All @@ -35,13 +37,16 @@
"@commitlint/config-conventional": "^19.5.0",
"@commitlint/types": "^19.5.0",
"@fastify/type-provider-json-schema-to-ts": "^4.0.1",
"@opentelemetry/exporter-trace-otlp-proto": "^0.55.0",
"@opentelemetry/instrumentation": "^0.55.0",
"@opentelemetry/sdk-node": "^0.55.0",
"@opentelemetry/exporter-trace-otlp-proto": "^0.57.0",
"@opentelemetry/instrumentation": "^0.57.0",
"@opentelemetry/otlp-exporter-base": "^0.57.0",
"@opentelemetry/sdk-node": "^0.57.0",
"@opentelemetry/sdk-trace-base": "^1.30.0",
"@opentelemetry/semantic-conventions": "^1.28.0",
"@release-it/conventional-changelog": "^8.0.1",
"@types/dotenv-safe": "^8.1.6",
"@types/node": "^20.16.5",
"@types/semver": "^7",
"@typescript-eslint/eslint-plugin": "^7.13.0",
"@typescript-eslint/parser": "^7.13.0",
"@vitest/coverage-v8": "^1.6.0",
Expand Down Expand Up @@ -88,6 +93,7 @@
"json-schema-to-ts": "^3.1.0",
"pino": "^9.2.0",
"protobufjs": "^7.4.0",
"semver": "^7.6.3",
"typescript-json-schema": "^0.65.1"
},
"resolutions": {
Expand Down
33 changes: 33 additions & 0 deletions scripts/open_telemetry_collector/install-collector.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/bin/bash
# Copyright 2025 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -e

cd "$(dirname "$0")"

# Check if the 'otelcol' file already exists
if [ ! -f otelcol ]; then
echo "otelcol file not found. Downloading and extracting..."

curl --proto '=https' --tlsv1.2 -fOL https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.115.1/otelcol_0.115.1_darwin_arm64.tar.gz

tar -xvf otelcol_0.115.1_darwin_arm64.tar.gz

## clean unnecessary files
rm otelcol_0.115.1_darwin_arm64.tar.gz
rm README.MD
else
echo "otelcol file already exists. Skipping download and extraction."
fi
36 changes: 36 additions & 0 deletions scripts/open_telemetry_collector/otel-collector-config-docker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
receivers:
otlp:
protocols:
http:
endpoint: 0.0.0.0:4318

exporters:
debug: {}
otlphttp/observe:
endpoint: http://host.docker.internal:4319
tls:
insecure: true
headers:
x-bee-authorization: 'valid-api-key'

extensions:
health_check:
endpoint: http://host.docker.internal:13133

processors:
batch: {}
filter/observe:
traces:
span:
- instrumentation_scope.name != "bee-agent-framework"
memory_limiter:
check_interval: 5s
limit_percentage: 80
spike_limit_percentage: 25

service:
pipelines:
traces/observe:
receivers: [otlp]
processors: [filter/observe, memory_limiter, batch]
exporters: [debug, otlphttp/observe]
36 changes: 36 additions & 0 deletions scripts/open_telemetry_collector/otel-collector-config-local.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
receivers:
otlp:
protocols:
http:
endpoint: 0.0.0.0:4318

exporters:
debug: {}
otlphttp/observe:
endpoint: http://127.0.0.1:4319
tls:
insecure: true
headers:
x-bee-authorization: 'valid-api-key'

extensions:
health_check:
endpoint: http://127.0.0.1:13133

processors:
batch: {}
filter/observe:
traces:
span:
- instrumentation_scope.name != "bee-agent-framework"
memory_limiter:
check_interval: 5s
limit_percentage: 80
spike_limit_percentage: 25

service:
pipelines:
traces/observe:
receivers: [otlp]
processors: [filter/observe, memory_limiter, batch]
exporters: [debug, otlphttp/observe]
9 changes: 7 additions & 2 deletions src/mlflow/queue/mlflow-trace-create.queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ const { queue } = createQueue({
}
});

export function addMlflowTraceToQueue(input: JobInput) {
queue.add(QueueName['mlflow-trace-create'], input);
export function addMlflowTracesToQueue(jobInputs: JobInput[]) {
queue.addBulk(
jobInputs.map((jobInput) => ({
name: QueueName['mlflow-trace-create'],
data: jobInput
}))
);
}
2 changes: 1 addition & 1 deletion src/span/span.document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export class Span extends BaseDocument {
attributes!: SpanDto['attributes'];

@ManyToOne()
trace!: Ref<Trace>;
trace?: Ref<Trace>;

constructor(input: Span__Output) {
super(new ObjectId());
Expand Down
23 changes: 23 additions & 0 deletions src/span/span.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/

import { QueryOrderNumeric } from '@mikro-orm/mongodb';
import * as semver from 'semver';

import { isValidFrameworkId, ORM } from '../utils/db.js';
import { constants } from '../utils/constants.js';

import { SpanDto, SpanGetOneParams, SpanGetOneQuery } from './span.dto.js';
import { Span } from './span.document.js';

export async function getSpans(props: SpanGetOneParams & SpanGetOneQuery): Promise<{
totalCount: number;
Expand Down Expand Up @@ -47,3 +50,23 @@ export async function getSpans(props: SpanGetOneParams & SpanGetOneQuery): Promi
spans: spans.map((span) => span.toTelemetry(props))
};
}

export async function loadAllNestedSpans(span: Span): Promise<Span[]> {
const { version } = span.attributes;
// Optimised query = use the traceId for loading all nested spans
if (
version &&
semver.valid(version) &&
semver.gte(version, constants.FRAMEWORK_BRAKING_CHANGES.TRACE_ID_FOR_EACH_SPAN)
) {
return ORM.span.find({ attributes: { traceId: span.attributes.traceId } });
}

// The old temporary way (will be removed soon)
const spans = await ORM.span.find({ parentId: span.context.spanId });
if (spans.length === 0) return spans;

const nestedSpans = await Promise.all(spans.map((span) => loadAllNestedSpans(span)));

return [...spans, ...nestedSpans.flat()];
}
3 changes: 2 additions & 1 deletion src/span/span.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { Version } from 'bee-agent-framework';

import { sdk, spanTraceExporterProcessor } from '../testing/telemetry.js';
import { generateTrace, makeRequest } from '../testing/utils.js';
import { generateTrace, makeRequest, waitForTrace } from '../testing/utils.js';

let traceId: string | undefined = undefined;
const prompt = 'hello';
Expand All @@ -28,6 +28,7 @@ describe('span module', () => {
await sdk.start();
traceId = await generateTrace({ prompt });
await spanTraceExporterProcessor.forceFlush();
if (traceId) await waitForTrace({ traceId, includeMlflow: true });
});

afterAll(async () => {
Expand Down
6 changes: 4 additions & 2 deletions src/span/utilt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ export function getAttributeValue({
return attributes.find((attr) => attr.key === key)?.value?.stringValue;
}

export function findMainSpan(spans: Span[]): MainSpan | undefined {
return spans.find((span) => !span.parentId && span.attributes.traceId) as MainSpan | undefined;
export function filterMainSpans(spans: Span[]): MainSpan[] {
return spans.filter(
(span) => span.attributes.traceId && span.name.includes('bee-agent-framework')
) as MainSpan[];
}
39 changes: 31 additions & 8 deletions src/testing/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,45 @@ import { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } from '@opentelemetry/semantic
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';

import { constants } from '../utils/constants.js';

import { buildUrl } from './utils.js';
import { buildUrl } from '../mlflow/utils/api/utils/build-url.js';

const traceExporter = new OTLPTraceExporter({
url: buildUrl('v1/traces'),
headers: {
[constants.BEE_AUTH_HEADER]: constants.AUTH_KEY
},
timeoutMillis: 120_000
});
export const spanTraceExporterProcessor = new node.BatchSpanProcessor(traceExporter);

const resource = new resources.Resource({
[ATTR_SERVICE_NAME]: 'bee-agent-framework',
[ATTR_SERVICE_VERSION]: '0.0.1'
});

// main sdk with the default BatchSpanProcessor and the Collector backend
export const spanTraceExporterProcessor = new node.BatchSpanProcessor(traceExporter);
export const sdk = new NodeSDK({
resource: new resources.Resource({
[ATTR_SERVICE_NAME]: 'bee-agent-framework',
[ATTR_SERVICE_VERSION]: '0.0.1'
}),
resource,
spanProcessors: [spanTraceExporterProcessor]
});

// sdk with the SimpleSpanProcessor and the Collector backend
export const sdkWithSimpleProcessor = new NodeSDK({
resource,
spanProcessors: [new node.SimpleSpanProcessor(traceExporter)]
});

// sdk with the SimpleSpanProcessor and direct Observe backend
export const sdkWithSimpleProcessorAndObserveBackend = new NodeSDK({
resource,
spanProcessors: [
new node.SimpleSpanProcessor(
new OTLPTraceExporter({
url: buildUrl('v1/traces'),
headers: {
[constants.BEE_AUTH_HEADER]: constants.AUTH_KEY
},
timeoutMillis: 120_000
})
)
]
});
Loading

0 comments on commit 5346f5c

Please sign in to comment.