Skip to content

Commit

Permalink
feat: add docs directory
Browse files Browse the repository at this point in the history
Adds a docs/ directory to thoroughly describe the InfluxDB data model/schema, and
how it relates to other data models/schemas. As I write the docs, I'm
also auditing the otel2influx implementation; some incremental
improvements are included as well.
  • Loading branch information
Jacob Marble committed Mar 29, 2021
1 parent 0bf16ca commit f471805
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 30 deletions.
23 changes: 21 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
# InfluxDB Observability

**This is experimental software**
> This is experimental software
This repository aims to be the reference for storing traces, metrics, and logs in InfluxDB/IOx.
This repository is a reference for converting observability signals (traces, metrics, logs) to/from a common InfluxDB/IOx schema.

The [InfluxDB/IOx storage engine](https://github.com/influxdata/influxdb_iox) is a new time series storage engine, currently under active development.
Its design objectives include critical features for storing and querying observability signals at scale:
- high cardinality
- high capacity
- high performance

## Schema Reference

[Schema reference with conversion tables](docs/index.md).

## otel2influx

The golang package [`otel2influx`](otel2influx/README.md) converts OpenTelemetry protocol buffer objects to (measurement, tags, fields, timestamp) tuples.
It is imported by [a WIP fork of OpenTelemetry Collector Contrib](https://github.com/influxdata/opentelemetry-collector-contrib/tree/influxdb) and by [a WIP fork of Telegraf](https://github.com/jacobmarble/telegraf/tree/jgm-opentelemetry).

## jaeger-query-plugin

The [Jaeger Query Plugin for InfluxDB](jaeger-query-plugin) enables querying traces stored in InfluxDB/IOx via the Jaeger UI.
21 changes: 21 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# InfluxDB/IOx Common Observability Schema

*Perfect is the enemy of good.*

Reference for InfluxDB/IOx schema, in terms of the OpenTelemetry data model.
The goal of this schema is to be (1) a common reference for clients writing to and reading from InfluxDB/IOx and (2) a common reference for humans performing ad-hoc queries to troubleshoot observed systems.

While OpenTelemetry is the primary reference, translation to/from some other common schemas are also provided.

InfluxDB value types are expressed as tag and field.
Tags and fields have non-empty string keys.
Tags have string values, and fields have basic scalar values: string, int, uint, float, bool.

Non-finite floating-point field values (+/- infinity and NaN from IEEE 754) are not currently supported by InfluxDB/IOx, but are part of the design spec.
Therefore, no special consideration is given here.

## Signal Types

- [Traces](traces.md)
- [Metrics](metrics.md)
- [Logs](logs.md)
100 changes: 100 additions & 0 deletions docs/traces.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Traces

A trace is a list of spans.
A span is composed of:

- some specific attributes
- zero-to-many free-form attributes
- logs
- links to other spans

#### References

- [OpenTelemetry Tracing Specification](https://github.com/open-telemetry/opentelemetry-specification/tree/v1.1.0/specification/trace)
- [OpenTelemetry Span protocol buffer message](https://github.com/open-telemetry/opentelemetry-proto/blob/v0.7.0/opentelemetry/proto/trace/v1/trace.proto#L48-L227)
- [OpenTracing Specification](https://github.com/opentracing/specification)
- [Jaeger protocol buffers](https://github.com/jaegertracing/jaeger-idl/tree/master/proto)
- [OpenTelemetry -> Jaeger](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.0.1/specification/trace/sdk_exporters/jaeger.md) TODO link to code and documentation
- [Zipkin protocol buffers](https://github.com/openzipkin/zipkin-api/blob/1.0.0/zipkin.proto)
- [OpenTelemetry -> Zipkin](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.0.1/specification/trace/sdk_exporters/zipkin.md) TODO link to code and documentation

## InfluxDB measurement `spans`

Influx tag/field | OpenTelemetry Span field | Jaeger Span field | Zipkin Span field
--- | --- | --- | ---
timestamp | `start_time_unix_nano` fixed64 | `start_time` Timestamp | `timestamp` fixed64 (µs)
`trace_id` tag | `trace_id` bytes | `trace_id` bytes | `trace_id` bytes
`span_id` tag | `span_id` bytes | `span_id` bytes | `id` bytes
`parent_span_id` tag | `parent_span_id` bytes | (included in `references`)<br />type `CHILD_OF` | `parent_id` bytes
`trace_state` tag | `trace_state` string
`name` tag | `name` string | `operation_name` string | `name` string
`kind` tag<br />(OTel stringified) | `kind` enum SpanKind | `tags["span.kind"]` | `kind` enum Kind
`end_time_unix_nano` field int | `end_time_unix_nano` fixed64
`duration_nano` field int | | `duration` Duration | `duration` uint64 (µs)
- | `status` Status
`otel.status_code` tag; `OK` or `ERROR` | `status.code` enum StatusCode | `tags["otel.status_code"]`<br />if `ERROR` then add:<br />`tags["error"] = true` | `tags["otel.status_code"]`<br />if `ERROR` then add:<br />`tags["error"] = true`
`otel.status_description` field string | `status.message` string | `tags["otel.status_description"]` | `tags["error"]`<br />iff `otel.status_code` == ERROR
- | `instrumentation_library` InstrumentationLibrary
`otel.library.name` tag | `InstrumentationLibrary.name` string | `tags["otel.library.name"]` | `tags["otel.library.name"]`
`otel.library.version` tag | `InstrumentationLibrary.version` string | `tags["otel.library.version"]` | `tags["otel.library.version"]`
- | `resource` Resource | `process` Process
- | `attributes["service.name"]` | `process.service_name` string
(free-form fields)\* | `Resource.attributes` repeated KeyValue. | `process.tags` repeated KeyValue
`otel.resource.dropped_attributes_count` field uint | `Resource.dropped_attributes_count` uint32
(free-form fields)\* | `attributes` repeated KeyValue | `tags` repeated KeyValue | `tags` map<string, string>
`otel.span.dropped_attributes_count` field uint | `dropped_attributes_count` uint32
(see "Influx measurement `logs`") | `events` repeated Event | `logs` repeated Log | `annotations` repeated Annotation
`otel.span.dropped_events_count` field uint. | `dropped_events_count` uint32
(see "Influx measurement `span-links`") | `links` repeated Link | `references` repeated SpanRef
`otel.span.dropped_links_count` field uint | `dropped_links_count` uint32
- | | `flags` uint32
- | | `warnings` string
- | `attributes["zipkin.local_endpoint"]` | | `local_endpoint` Endpoint
- | \*\* | | `remote_endpoint` Endpoint
- | `attributes["zipkin.debug"]` | | `debug` bool
- | `attributes["zipkin.shared"]` | | `shared` bool

\* To convert from Influx to OTel, use common OTel attribute key prefixes to distinguish resource attributes from span attributes.
This regex matches resource attribute keys:

```
^(service\.|telemetry\.|container\.|process\.|host\.|os\.|cloud\.|deployment\.|k8s\.|aws\.|gcp\.|azure\.|faas\.name|faas\.id|faas\.version|faas\.instance|faas\.max_memory)
```

\*\* Zipkin's `remote_endpoint` [must be created from several OTel attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#remote-endpoint)

## InfluxDB measurement `logs`

Influx tag/field | OpenTelemetry Span.Event field | Jaeger Log field | Zipkin Annotation field
--- | --- | --- | ---
timestamp | `time_unix_nano` fixed64 | `timestamp` Timestamp | `timestamp` fixed64 (µs)
`trace_id` tag | `trace_id` bytes
`span_id` tag | `span_id` bytes
`name` tag | `name` string | `fields["event"]` | `value` string\*\*
`body` field string\* | | `fields["message"]`
? | ? | `fields["stack"]`
(free-form fields) | `attributes` repeated KeyValue | `fields` repeated KeyValue
`otel.event.dropped_attributes_count` field uint | `dropped_attributes_count` uint32 | `fields["otel.event.dropped_attributes_count"]`
(free-form fields) | span resource.attributes

\* `body` does not exist in the OpenTelemetry Span.Event, but does in OpenTelemetry LogRecord; InfluxDB explicitly names it in the `logs` measurement.

\*\* `value` is composed as:

```
"<name>": {"<attribute key>": "<attribute value", ...}
```

## InfluxDB measurement `span-links`

Influx tag/field | OpenTelemetry Span.Link field | Jaeger SpanRef field
--- | --- | ---
timestamp | (copied from linking span)
`trace_id` tag | (copied from linking span)
`span_id` tag | (copied from linking span)
`linked_trace_id` tag | `trace_id` bytes | `trace_id` bytes
`linked_span_id` tag | `span_id` bytes | `span_id` bytes
`trace_state` tag | `trace_state` string
(free-form fields) | `attributes` repeated KeyValue
`otel.link.dropped_attributes_count` field uint | `dropped_attributes_count` uint32
- | | `ref_type` SpanRefType<br />always `FOLLOWS_FROM`
56 changes: 48 additions & 8 deletions jaeger-query-plugin/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ const (
attributeSpanID = "span_id"
attributeParentSpanID = "parent_span_id"
attributeName = "name"
attributeBody = "body"
attributeSpanKind = "kind"
attributeEndTimeUnixNano = "end_time_unix_nano"
attributeDurationNano = "duration_nano"
attributeStatusCode = "otel.status_code"
attributeStatusCodeError = "ERROR"
attributeLinkedTraceID = "linked_trace_id"
attributeLinkedSpanID = "linked_span_id"
attributeServiceName = "service.name"
Expand Down Expand Up @@ -186,11 +190,14 @@ func (s *Store) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Tra
var resourceNamespace = regexp.MustCompile(`^(service\.|telemetry\.|container\.|process\.|host\.|os\.|cloud\.|deployment\.|k8s\.|aws\.|gcp\.|azure\.|faas\.name|faas\.id|faas\.version|faas\.instance|faas\.max_memory)`)

func recordToSpan(record map[string]interface{}) (*model.Span, error) {
span := new(model.Span)
span := model.Span{
Process: &model.Process{
ServiceName: "<unknown>",
},
}
parentSpanRef := model.SpanRef{
RefType: model.SpanRefType_CHILD_OF,
}
process := model.Process{}
// TODO add more process attributes
var err error
for k, v := range record {
Expand Down Expand Up @@ -218,14 +225,29 @@ func recordToSpan(record map[string]interface{}) (*model.Span, error) {
if vv, ok := v.(string); !ok {
return nil, fmt.Errorf("service name is type %T", v)
} else {
process.ServiceName = vv
span.Process.ServiceName = vv
}
case attributeName:
if vv, ok := v.(string); !ok {
return nil, fmt.Errorf("operation name is type %T", v)
} else {
span.OperationName = vv
}
case attributeSpanKind:
if vv, ok := v.(string); !ok {
return nil, fmt.Errorf("span kind is type %T", v)
} else {
switch vv {
case "SPAN_KIND_SERVER":
span.Tags = append(span.Tags, model.String("span.kind", "server"))
case "SPAN_KIND_CLIENT":
span.Tags = append(span.Tags, model.String("span.kind", "client"))
case "SPAN_KIND_PRODUCER":
span.Tags = append(span.Tags, model.String("span.kind", "producer"))
case "SPAN_KIND_CONSUMER":
span.Tags = append(span.Tags, model.String("span.kind", "consumer"))
}
}
case attributeDurationNano:
if vv, ok := v.(float64); !ok {
return nil, fmt.Errorf("duration nanoseconds is type %T", v)
Expand All @@ -244,9 +266,18 @@ func recordToSpan(record map[string]interface{}) (*model.Span, error) {
if err != nil {
return nil, err
}
case attributeStatusCode:
if vv, ok := v.(string); !ok {
return nil, fmt.Errorf("status code is type %T", v)
} else {
span.Tags = append(span.Tags, model.String(k, vv))
if v == attributeStatusCodeError {
span.Tags = append(span.Tags, model.Bool("error", true))
}
}
default:
if resourceNamespace.MatchString(k) {
process.Tags = append(process.Tags, kvToKeyValue(k, v))
span.Process.Tags = append(span.Process.Tags, kvToKeyValue(k, v))
} else {
span.Tags = append(span.Tags, kvToKeyValue(k, v))
}
Expand All @@ -259,11 +290,8 @@ func recordToSpan(record map[string]interface{}) (*model.Span, error) {
if parentSpanRef.SpanID != 0 {
span.References = []model.SpanRef{parentSpanRef}
}
if process.ServiceName != "" || len(process.Tags) > 0 {
span.Process = &process
}

return span, nil
return &span, nil
}

func kvToKeyValue(k string, v interface{}) model.KeyValue {
Expand Down Expand Up @@ -308,6 +336,18 @@ func recordToLog(record map[string]interface{}) (model.TraceID, model.SpanID, *m
} else if spanID, err = model.SpanIDFromString(vv); err != nil {
return model.TraceID{}, 0, nil, err
}
case attributeName:
if vv, ok := v.(string); !ok {
return model.TraceID{}, 0, nil, fmt.Errorf("log name is type %T", v)
} else {
log.Fields = append(log.Fields, model.String("event", vv))
}
case attributeBody:
if vv, ok := v.(string); !ok {
return model.TraceID{}, 0, nil, fmt.Errorf("log body is type %T", v)
} else {
log.Fields = append(log.Fields, model.String("message", vv))
}
default:
log.Fields = append(log.Fields, kvToKeyValue(k, v))
}
Expand Down
8 changes: 7 additions & 1 deletion otel2influx/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
# OpenTelemetry to InfluxDB Line Protocol Converter

**This is experimental software**
> This is experimental software
[![Go Reference](https://pkg.go.dev/badge/github.com/influxdata/influxdb-observability/otel2influx.svg)](https://pkg.go.dev/github.com/influxdata/influxdb-observability/otel2influx)

This package converts OpenTelemetry traces, metrics, and logs to [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/).
The schema is optimized for [InfluxDB/IOx](https://github.com/influxdata/influxdb_iox), a timeseries database engine that is currently under development.

[Docker Image: WIP OpenTelemetry Collector Contrib](https://hub.docker.com/r/jacobmarble/opentelemetry-collector-contrib-influxdb)

[Docker Image: WIP Telegraf](https://hub.docker.com/r/jacobmarble/telegraf-opentelemetry)

## Definitions

["InfluxDB"](https://www.influxdata.com/products/influxdb/)
Expand Down
20 changes: 12 additions & 8 deletions otel2influx/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@ const (
attributeSpanKind = "kind"
attributeEndTimeUnixNano = "end_time_unix_nano"
attributeDurationNano = "duration_nano"
attributeDroppedResourceAttributesCount = "dropped_resource_attributes_count"
attributeDroppedAttributesCount = "dropped_attributes_count"
attributeDroppedEventsCount = "dropped_events_count"
attributeDroppedResourceAttributesCount = "otel.resource.dropped_attributes_count"
attributeDroppedSpanAttributesCount = "otel.span.dropped_attributes_count"
attributeDroppedEventAttributesCount = "otel.event.dropped_attributes_count"
attributeDroppedEventsCount = "otel.span.dropped_events_count"
attributeDroppedLinkAttributesCount = "otel.link.dropped_attributes_count"
attributeDroppedLinksCount = "otel.span.dropped_links_count"
attributeLinkedTraceID = "linked_trace_id"
attributeLinkedSpanID = "linked_span_id"
attributeDroppedLinksCount = "dropped_links_count"
attributeStatusCode = "status_code"
attributeStatusMessage = "status_message"
attributeInstrumentationLibraryName = "instrumentation_library_name"
attributeInstrumentationLibraryVersion = "instrumentation_library_version"
attributeStatusCode = "otel.status_code"
attributeStatusCodeOK = "OK"
attributeStatusCodeError = "ERROR"
attributeStatusMessage = "otel.status_description"
attributeInstrumentationLibraryName = "otel.library.name"
attributeInstrumentationLibraryVersion = "otel.library.version"
attributeSeverityNumber = "severity_number"
attributeSeverityText = "severity_text"
attributeBody = "body"
Expand Down
2 changes: 1 addition & 1 deletion otel2influx/converter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *OpenTelemetryToInfluxConverter) writeLogRecord(ctx context.Context, res
}
}
if droppedAttributesCount > 0 {
fields[attributeDroppedAttributesCount] = droppedAttributesCount
fields[attributeDroppedSpanAttributesCount] = droppedAttributesCount
}

if err := w.WritePoint(ctx, measurement, tags, fields, ts); err != nil {
Expand Down
16 changes: 11 additions & 5 deletions otel2influx/converter_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (c *OpenTelemetryToInfluxConverter) writeSpan(ctx context.Context, resource
}
}
if droppedAttributesCount > 0 {
fields[attributeDroppedAttributesCount] = droppedAttributesCount
fields[attributeDroppedSpanAttributesCount] = droppedAttributesCount
}

droppedEventsCount := uint64(span.DroppedEventsCount)
Expand Down Expand Up @@ -119,8 +119,14 @@ func (c *OpenTelemetryToInfluxConverter) writeSpan(ctx context.Context, resource
}

if status := span.Status; status != nil {
if code := status.Code; code != otlptrace.Status_STATUS_CODE_UNSET {
fields[attributeStatusCode] = code.String()
switch status.Code {
case otlptrace.Status_STATUS_CODE_UNSET:
case otlptrace.Status_STATUS_CODE_OK:
fields[attributeStatusCode] = attributeStatusCodeOK
case otlptrace.Status_STATUS_CODE_ERROR:
fields[attributeStatusCode] = attributeStatusCodeError
default:
c.logger.Debug("status code not recognized: %q", status.Code)
}

if message := status.Message; message != "" {
Expand Down Expand Up @@ -166,7 +172,7 @@ func (c *OpenTelemetryToInfluxConverter) spanEventToLP(traceID, spanID string, r
}
}
if droppedAttributesCount > 0 {
fields[attributeDroppedAttributesCount] = droppedAttributesCount
fields[attributeDroppedEventAttributesCount] = droppedAttributesCount
}

if len(fields) == 0 {
Expand Down Expand Up @@ -222,7 +228,7 @@ func (c *OpenTelemetryToInfluxConverter) spanLinkToLP(traceID, spanID string, sp
}
}
if droppedAttributesCount > 0 {
fields[attributeDroppedAttributesCount] = droppedAttributesCount
fields[attributeDroppedLinkAttributesCount] = droppedAttributesCount
}

if len(fields) == 0 {
Expand Down
Loading

0 comments on commit f471805

Please sign in to comment.