From f4718054171b8928ff1dbae4c59eed75f3710ceb Mon Sep 17 00:00:00 2001 From: Jacob Marble Date: Mon, 29 Mar 2021 08:55:42 -0700 Subject: [PATCH] feat: add docs directory Adds a docs/ directory to thoroughly describe the InfluxDB data model/schema, and how it relates to other data models/schemas. As I write the docs, I'm also auditing the otel2influx implementation; some incremental improvements are included as well. --- README.md | 23 ++++++- docs/index.md | 21 ++++++ docs/traces.md | 100 +++++++++++++++++++++++++++++ jaeger-query-plugin/store/store.go | 56 +++++++++++++--- otel2influx/README.md | 8 ++- otel2influx/common.go | 20 +++--- otel2influx/converter_logs.go | 2 +- otel2influx/converter_traces.go | 16 +++-- otel2influx/go.mod | 4 +- otel2influx/go.sum | 4 +- 10 files changed, 224 insertions(+), 30 deletions(-) create mode 100644 docs/index.md create mode 100644 docs/traces.md diff --git a/README.md b/README.md index 7010f737..773568b1 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,24 @@ # InfluxDB Observability -**This is experimental software** +> This is experimental software -This repository aims to be the reference for storing traces, metrics, and logs in InfluxDB/IOx. +This repository is a reference for converting observability signals (traces, metrics, logs) to/from a common InfluxDB/IOx schema. + +The [InfluxDB/IOx storage engine](https://github.com/influxdata/influxdb_iox) is a new time series storage engine, currently under active development. +Its design objectives include critical features for storing and querying observability signals at scale: +- high cardinality +- high capacity +- high performance + +## Schema Reference + +[Schema reference with conversion tables](docs/index.md). + +## otel2influx + +The golang package [`otel2influx`](otel2influx/README.md) converts OpenTelemetry protocol buffer objects to (measurement, tags, fields, timestamp) tuples. +It is imported by [a WIP fork of OpenTelemetry Collector Contrib](https://github.com/influxdata/opentelemetry-collector-contrib/tree/influxdb) and by [a WIP fork of Telegraf](https://github.com/jacobmarble/telegraf/tree/jgm-opentelemetry). + +## jaeger-query-plugin + +The [Jaeger Query Plugin for InfluxDB](jaeger-query-plugin) enables querying traces stored in InfluxDB/IOx via the Jaeger UI. diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 00000000..201d5ca5 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,21 @@ +# InfluxDB/IOx Common Observability Schema + +*Perfect is the enemy of good.* + +Reference for InfluxDB/IOx schema, in terms of the OpenTelemetry data model. +The goal of this schema is to be (1) a common reference for clients writing to and reading from InfluxDB/IOx and (2) a common reference for humans performing ad-hoc queries to troubleshoot observed systems. + +While OpenTelemetry is the primary reference, translation to/from some other common schemas are also provided. + +InfluxDB value types are expressed as tag and field. +Tags and fields have non-empty string keys. +Tags have string values, and fields have basic scalar values: string, int, uint, float, bool. + +Non-finite floating-point field values (+/- infinity and NaN from IEEE 754) are not currently supported by InfluxDB/IOx, but are part of the design spec. +Therefore, no special consideration is given here. + +## Signal Types + +- [Traces](traces.md) +- [Metrics](metrics.md) +- [Logs](logs.md) diff --git a/docs/traces.md b/docs/traces.md new file mode 100644 index 00000000..410d2f88 --- /dev/null +++ b/docs/traces.md @@ -0,0 +1,100 @@ +# Traces + +A trace is a list of spans. +A span is composed of: + +- some specific attributes +- zero-to-many free-form attributes +- logs +- links to other spans + +#### References + +- [OpenTelemetry Tracing Specification](https://github.com/open-telemetry/opentelemetry-specification/tree/v1.1.0/specification/trace) +- [OpenTelemetry Span protocol buffer message](https://github.com/open-telemetry/opentelemetry-proto/blob/v0.7.0/opentelemetry/proto/trace/v1/trace.proto#L48-L227) +- [OpenTracing Specification](https://github.com/opentracing/specification) +- [Jaeger protocol buffers](https://github.com/jaegertracing/jaeger-idl/tree/master/proto) +- [OpenTelemetry -> Jaeger](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.0.1/specification/trace/sdk_exporters/jaeger.md) TODO link to code and documentation +- [Zipkin protocol buffers](https://github.com/openzipkin/zipkin-api/blob/1.0.0/zipkin.proto) +- [OpenTelemetry -> Zipkin](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.0.1/specification/trace/sdk_exporters/zipkin.md) TODO link to code and documentation + +## InfluxDB measurement `spans` + +Influx tag/field | OpenTelemetry Span field | Jaeger Span field | Zipkin Span field +--- | --- | --- | --- +timestamp | `start_time_unix_nano` fixed64 | `start_time` Timestamp | `timestamp` fixed64 (µs) +`trace_id` tag | `trace_id` bytes | `trace_id` bytes | `trace_id` bytes +`span_id` tag | `span_id` bytes | `span_id` bytes | `id` bytes +`parent_span_id` tag | `parent_span_id` bytes | (included in `references`)
type `CHILD_OF` | `parent_id` bytes +`trace_state` tag | `trace_state` string +`name` tag | `name` string | `operation_name` string | `name` string +`kind` tag
(OTel stringified) | `kind` enum SpanKind | `tags["span.kind"]` | `kind` enum Kind +`end_time_unix_nano` field int | `end_time_unix_nano` fixed64 +`duration_nano` field int | | `duration` Duration | `duration` uint64 (µs) +- | `status` Status +`otel.status_code` tag; `OK` or `ERROR` | `status.code` enum StatusCode | `tags["otel.status_code"]`
if `ERROR` then add:
`tags["error"] = true` | `tags["otel.status_code"]`
if `ERROR` then add:
`tags["error"] = true` +`otel.status_description` field string | `status.message` string | `tags["otel.status_description"]` | `tags["error"]`
iff `otel.status_code` == ERROR +- | `instrumentation_library` InstrumentationLibrary +`otel.library.name` tag | `InstrumentationLibrary.name` string | `tags["otel.library.name"]` | `tags["otel.library.name"]` +`otel.library.version` tag | `InstrumentationLibrary.version` string | `tags["otel.library.version"]` | `tags["otel.library.version"]` +- | `resource` Resource | `process` Process +- | `attributes["service.name"]` | `process.service_name` string +(free-form fields)\* | `Resource.attributes` repeated KeyValue. | `process.tags` repeated KeyValue +`otel.resource.dropped_attributes_count` field uint | `Resource.dropped_attributes_count` uint32 +(free-form fields)\* | `attributes` repeated KeyValue | `tags` repeated KeyValue | `tags` map +`otel.span.dropped_attributes_count` field uint | `dropped_attributes_count` uint32 +(see "Influx measurement `logs`") | `events` repeated Event | `logs` repeated Log | `annotations` repeated Annotation +`otel.span.dropped_events_count` field uint. | `dropped_events_count` uint32 +(see "Influx measurement `span-links`") | `links` repeated Link | `references` repeated SpanRef +`otel.span.dropped_links_count` field uint | `dropped_links_count` uint32 +- | | `flags` uint32 +- | | `warnings` string +- | `attributes["zipkin.local_endpoint"]` | | `local_endpoint` Endpoint +- | \*\* | | `remote_endpoint` Endpoint +- | `attributes["zipkin.debug"]` | | `debug` bool +- | `attributes["zipkin.shared"]` | | `shared` bool + +\* To convert from Influx to OTel, use common OTel attribute key prefixes to distinguish resource attributes from span attributes. +This regex matches resource attribute keys: + +``` +^(service\.|telemetry\.|container\.|process\.|host\.|os\.|cloud\.|deployment\.|k8s\.|aws\.|gcp\.|azure\.|faas\.name|faas\.id|faas\.version|faas\.instance|faas\.max_memory) +``` + +\*\* Zipkin's `remote_endpoint` [must be created from several OTel attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#remote-endpoint) + +## InfluxDB measurement `logs` + +Influx tag/field | OpenTelemetry Span.Event field | Jaeger Log field | Zipkin Annotation field +--- | --- | --- | --- +timestamp | `time_unix_nano` fixed64 | `timestamp` Timestamp | `timestamp` fixed64 (µs) +`trace_id` tag | `trace_id` bytes +`span_id` tag | `span_id` bytes +`name` tag | `name` string | `fields["event"]` | `value` string\*\* +`body` field string\* | | `fields["message"]` +? | ? | `fields["stack"]` +(free-form fields) | `attributes` repeated KeyValue | `fields` repeated KeyValue +`otel.event.dropped_attributes_count` field uint | `dropped_attributes_count` uint32 | `fields["otel.event.dropped_attributes_count"]` +(free-form fields) | span resource.attributes + +\* `body` does not exist in the OpenTelemetry Span.Event, but does in OpenTelemetry LogRecord; InfluxDB explicitly names it in the `logs` measurement. + +\*\* `value` is composed as: + +``` +"": {"": "always `FOLLOWS_FROM` diff --git a/jaeger-query-plugin/store/store.go b/jaeger-query-plugin/store/store.go index 997ca861..791691fa 100644 --- a/jaeger-query-plugin/store/store.go +++ b/jaeger-query-plugin/store/store.go @@ -36,8 +36,12 @@ const ( attributeSpanID = "span_id" attributeParentSpanID = "parent_span_id" attributeName = "name" + attributeBody = "body" + attributeSpanKind = "kind" attributeEndTimeUnixNano = "end_time_unix_nano" attributeDurationNano = "duration_nano" + attributeStatusCode = "otel.status_code" + attributeStatusCodeError = "ERROR" attributeLinkedTraceID = "linked_trace_id" attributeLinkedSpanID = "linked_span_id" attributeServiceName = "service.name" @@ -186,11 +190,14 @@ func (s *Store) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Tra var resourceNamespace = regexp.MustCompile(`^(service\.|telemetry\.|container\.|process\.|host\.|os\.|cloud\.|deployment\.|k8s\.|aws\.|gcp\.|azure\.|faas\.name|faas\.id|faas\.version|faas\.instance|faas\.max_memory)`) func recordToSpan(record map[string]interface{}) (*model.Span, error) { - span := new(model.Span) + span := model.Span{ + Process: &model.Process{ + ServiceName: "", + }, + } parentSpanRef := model.SpanRef{ RefType: model.SpanRefType_CHILD_OF, } - process := model.Process{} // TODO add more process attributes var err error for k, v := range record { @@ -218,7 +225,7 @@ func recordToSpan(record map[string]interface{}) (*model.Span, error) { if vv, ok := v.(string); !ok { return nil, fmt.Errorf("service name is type %T", v) } else { - process.ServiceName = vv + span.Process.ServiceName = vv } case attributeName: if vv, ok := v.(string); !ok { @@ -226,6 +233,21 @@ func recordToSpan(record map[string]interface{}) (*model.Span, error) { } else { span.OperationName = vv } + case attributeSpanKind: + if vv, ok := v.(string); !ok { + return nil, fmt.Errorf("span kind is type %T", v) + } else { + switch vv { + case "SPAN_KIND_SERVER": + span.Tags = append(span.Tags, model.String("span.kind", "server")) + case "SPAN_KIND_CLIENT": + span.Tags = append(span.Tags, model.String("span.kind", "client")) + case "SPAN_KIND_PRODUCER": + span.Tags = append(span.Tags, model.String("span.kind", "producer")) + case "SPAN_KIND_CONSUMER": + span.Tags = append(span.Tags, model.String("span.kind", "consumer")) + } + } case attributeDurationNano: if vv, ok := v.(float64); !ok { return nil, fmt.Errorf("duration nanoseconds is type %T", v) @@ -244,9 +266,18 @@ func recordToSpan(record map[string]interface{}) (*model.Span, error) { if err != nil { return nil, err } + case attributeStatusCode: + if vv, ok := v.(string); !ok { + return nil, fmt.Errorf("status code is type %T", v) + } else { + span.Tags = append(span.Tags, model.String(k, vv)) + if v == attributeStatusCodeError { + span.Tags = append(span.Tags, model.Bool("error", true)) + } + } default: if resourceNamespace.MatchString(k) { - process.Tags = append(process.Tags, kvToKeyValue(k, v)) + span.Process.Tags = append(span.Process.Tags, kvToKeyValue(k, v)) } else { span.Tags = append(span.Tags, kvToKeyValue(k, v)) } @@ -259,11 +290,8 @@ func recordToSpan(record map[string]interface{}) (*model.Span, error) { if parentSpanRef.SpanID != 0 { span.References = []model.SpanRef{parentSpanRef} } - if process.ServiceName != "" || len(process.Tags) > 0 { - span.Process = &process - } - return span, nil + return &span, nil } func kvToKeyValue(k string, v interface{}) model.KeyValue { @@ -308,6 +336,18 @@ func recordToLog(record map[string]interface{}) (model.TraceID, model.SpanID, *m } else if spanID, err = model.SpanIDFromString(vv); err != nil { return model.TraceID{}, 0, nil, err } + case attributeName: + if vv, ok := v.(string); !ok { + return model.TraceID{}, 0, nil, fmt.Errorf("log name is type %T", v) + } else { + log.Fields = append(log.Fields, model.String("event", vv)) + } + case attributeBody: + if vv, ok := v.(string); !ok { + return model.TraceID{}, 0, nil, fmt.Errorf("log body is type %T", v) + } else { + log.Fields = append(log.Fields, model.String("message", vv)) + } default: log.Fields = append(log.Fields, kvToKeyValue(k, v)) } diff --git a/otel2influx/README.md b/otel2influx/README.md index f318dde2..9b28d03c 100644 --- a/otel2influx/README.md +++ b/otel2influx/README.md @@ -1,10 +1,16 @@ # OpenTelemetry to InfluxDB Line Protocol Converter -**This is experimental software** +> This is experimental software + +[![Go Reference](https://pkg.go.dev/badge/github.com/influxdata/influxdb-observability/otel2influx.svg)](https://pkg.go.dev/github.com/influxdata/influxdb-observability/otel2influx) This package converts OpenTelemetry traces, metrics, and logs to [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). The schema is optimized for [InfluxDB/IOx](https://github.com/influxdata/influxdb_iox), a timeseries database engine that is currently under development. +[Docker Image: WIP OpenTelemetry Collector Contrib](https://hub.docker.com/r/jacobmarble/opentelemetry-collector-contrib-influxdb) + +[Docker Image: WIP Telegraf](https://hub.docker.com/r/jacobmarble/telegraf-opentelemetry) + ## Definitions ["InfluxDB"](https://www.influxdata.com/products/influxdb/) diff --git a/otel2influx/common.go b/otel2influx/common.go index 62882f4e..ff0ad06f 100644 --- a/otel2influx/common.go +++ b/otel2influx/common.go @@ -25,16 +25,20 @@ const ( attributeSpanKind = "kind" attributeEndTimeUnixNano = "end_time_unix_nano" attributeDurationNano = "duration_nano" - attributeDroppedResourceAttributesCount = "dropped_resource_attributes_count" - attributeDroppedAttributesCount = "dropped_attributes_count" - attributeDroppedEventsCount = "dropped_events_count" + attributeDroppedResourceAttributesCount = "otel.resource.dropped_attributes_count" + attributeDroppedSpanAttributesCount = "otel.span.dropped_attributes_count" + attributeDroppedEventAttributesCount = "otel.event.dropped_attributes_count" + attributeDroppedEventsCount = "otel.span.dropped_events_count" + attributeDroppedLinkAttributesCount = "otel.link.dropped_attributes_count" + attributeDroppedLinksCount = "otel.span.dropped_links_count" attributeLinkedTraceID = "linked_trace_id" attributeLinkedSpanID = "linked_span_id" - attributeDroppedLinksCount = "dropped_links_count" - attributeStatusCode = "status_code" - attributeStatusMessage = "status_message" - attributeInstrumentationLibraryName = "instrumentation_library_name" - attributeInstrumentationLibraryVersion = "instrumentation_library_version" + attributeStatusCode = "otel.status_code" + attributeStatusCodeOK = "OK" + attributeStatusCodeError = "ERROR" + attributeStatusMessage = "otel.status_description" + attributeInstrumentationLibraryName = "otel.library.name" + attributeInstrumentationLibraryVersion = "otel.library.version" attributeSeverityNumber = "severity_number" attributeSeverityText = "severity_text" attributeBody = "body" diff --git a/otel2influx/converter_logs.go b/otel2influx/converter_logs.go index d5a9c110..d46eee4d 100644 --- a/otel2influx/converter_logs.go +++ b/otel2influx/converter_logs.go @@ -85,7 +85,7 @@ func (c *OpenTelemetryToInfluxConverter) writeLogRecord(ctx context.Context, res } } if droppedAttributesCount > 0 { - fields[attributeDroppedAttributesCount] = droppedAttributesCount + fields[attributeDroppedSpanAttributesCount] = droppedAttributesCount } if err := w.WritePoint(ctx, measurement, tags, fields, ts); err != nil { diff --git a/otel2influx/converter_traces.go b/otel2influx/converter_traces.go index 1bbd8621..63bc7137 100644 --- a/otel2influx/converter_traces.go +++ b/otel2influx/converter_traces.go @@ -89,7 +89,7 @@ func (c *OpenTelemetryToInfluxConverter) writeSpan(ctx context.Context, resource } } if droppedAttributesCount > 0 { - fields[attributeDroppedAttributesCount] = droppedAttributesCount + fields[attributeDroppedSpanAttributesCount] = droppedAttributesCount } droppedEventsCount := uint64(span.DroppedEventsCount) @@ -119,8 +119,14 @@ func (c *OpenTelemetryToInfluxConverter) writeSpan(ctx context.Context, resource } if status := span.Status; status != nil { - if code := status.Code; code != otlptrace.Status_STATUS_CODE_UNSET { - fields[attributeStatusCode] = code.String() + switch status.Code { + case otlptrace.Status_STATUS_CODE_UNSET: + case otlptrace.Status_STATUS_CODE_OK: + fields[attributeStatusCode] = attributeStatusCodeOK + case otlptrace.Status_STATUS_CODE_ERROR: + fields[attributeStatusCode] = attributeStatusCodeError + default: + c.logger.Debug("status code not recognized: %q", status.Code) } if message := status.Message; message != "" { @@ -166,7 +172,7 @@ func (c *OpenTelemetryToInfluxConverter) spanEventToLP(traceID, spanID string, r } } if droppedAttributesCount > 0 { - fields[attributeDroppedAttributesCount] = droppedAttributesCount + fields[attributeDroppedEventAttributesCount] = droppedAttributesCount } if len(fields) == 0 { @@ -222,7 +228,7 @@ func (c *OpenTelemetryToInfluxConverter) spanLinkToLP(traceID, spanID string, sp } } if droppedAttributesCount > 0 { - fields[attributeDroppedAttributesCount] = droppedAttributesCount + fields[attributeDroppedLinkAttributesCount] = droppedAttributesCount } if len(fields) == 0 { diff --git a/otel2influx/go.mod b/otel2influx/go.mod index 601246f8..a85f6305 100644 --- a/otel2influx/go.mod +++ b/otel2influx/go.mod @@ -4,7 +4,5 @@ go 1.15 require ( github.com/stretchr/testify v1.5.1 - go.opentelemetry.io/proto/otlp v0.0.0-00010101000000-000000000000 + go.opentelemetry.io/proto/otlp v0.7.0 ) - -replace go.opentelemetry.io/proto/otlp => github.com/open-telemetry/opentelemetry-proto-go/otlp v0.7.0 diff --git a/otel2influx/go.sum b/otel2influx/go.sum index 6edb1593..b2f9e2de 100644 --- a/otel2influx/go.sum +++ b/otel2influx/go.sum @@ -36,8 +36,6 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= -github.com/open-telemetry/opentelemetry-proto-go/otlp v0.7.0 h1:wN7DI+wygjS6jucEPEWqU3066+si49M5x6vF4zH/8Hk= -github.com/open-telemetry/opentelemetry-proto-go/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -45,6 +43,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=