Skip to content

Commit

Permalink
fix(otlp): replace otlp trace attr type from string to jsonb (#4918)
Browse files Browse the repository at this point in the history
* chore: minor update

* chore: replace otlp trace attr type from string to jsonb

* chore: add new util file and remove useless code

* chore: add license header

* chore: remove unused error

* chore: adjust otlp traces column order

* chore: update test

* chore: minor fix

---------

Co-authored-by: shuiyisong <xixing.sys@gmail.com>
  • Loading branch information
paomian and shuiyisong authored Nov 8, 2024
1 parent 1a02fc3 commit 0e0c4fa
Show file tree
Hide file tree
Showing 15 changed files with 394 additions and 208 deletions.
10 changes: 2 additions & 8 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use pipeline::PipelineWay;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::otlp::plugin::TraceParserRef;
use servers::query_handler::OpenTelemetryProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
Expand Down Expand Up @@ -64,6 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
async fn traces(
&self,
request: ExportTraceServiceRequest,
table_name: String,
ctx: QueryContextRef,
) -> ServerResult<Output> {
self.plugins
Expand All @@ -77,13 +77,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;

let (table_name, spans) = match self.plugins.get::<TraceParserRef>() {
Some(parser) => (parser.table_name(), parser.parse(request)),
None => (
otlp::trace::TRACE_TABLE_NAME.to_string(),
otlp::trace::parse(request),
),
};
let spans = otlp::trace::parse(request);

let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;

Expand Down
11 changes: 10 additions & 1 deletion src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid table name"))]
InvalidTableName {
#[snafu(source)]
error: tonic::metadata::errors::ToStrError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to initialize a watcher for file {}", path))]
FileWatch {
path: String,
Expand Down Expand Up @@ -620,7 +628,8 @@ impl ErrorExt for Error {
| UnsupportedContentType { .. }
| TimestampOverflow { .. }
| OpenTelemetryLog { .. }
| UnsupportedJsonDataTypeForTag { .. } => StatusCode::InvalidArguments,
| UnsupportedJsonDataTypeForTag { .. }
| InvalidTableName { .. } => StatusCode::InvalidArguments,

Catalog { source, .. } => source.status_code(),
RowWriter { source, .. } => source.status_code(),
Expand Down
16 changes: 13 additions & 3 deletions src/servers/src/grpc/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use session::context::{Channel, QueryContext};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response, Status};

use crate::error;
use crate::http::header::constants::GREPTIME_TRACE_TABLE_NAME_HEADER_NAME;
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

pub struct OtlpService {
Expand All @@ -46,7 +48,15 @@ impl TraceService for OtlpService {
&self,
request: Request<ExportTraceServiceRequest>,
) -> StdResult<Response<ExportTraceServiceResponse>, Status> {
let (_headers, extensions, req) = request.into_parts();
let (headers, extensions, req) = request.into_parts();

let table_name = match headers.get(GREPTIME_TRACE_TABLE_NAME_HEADER_NAME) {
Some(table_name) => table_name
.to_str()
.context(error::InvalidTableNameSnafu)?
.to_string(),
None => TRACE_TABLE_NAME.to_string(),
};

let mut ctx = extensions
.get::<QueryContext>()
Expand All @@ -55,7 +65,7 @@ impl TraceService for OtlpService {
ctx.set_channel(Channel::Otlp);
let ctx = Arc::new(ctx);

let _ = self.handler.traces(req, ctx).await?;
let _ = self.handler.traces(req, table_name, ctx).await?;

Ok(Response::new(ExportTraceServiceResponse {
partial_success: None,
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/http/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub mod constants {
pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version";
pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name";
}

pub static GREPTIME_DB_HEADER_FORMAT: HeaderName =
Expand Down
109 changes: 68 additions & 41 deletions src/servers/src/http/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use axum::response::IntoResponse;
use axum::{async_trait, Extension};
use bytes::Bytes;
use common_telemetry::tracing;
use http::HeaderMap;
use opentelemetry_proto::tonic::collector::logs::v1::{
ExportLogsServiceRequest, ExportLogsServiceResponse,
};
Expand All @@ -41,11 +42,13 @@ use snafu::prelude::*;

use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME;
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, PipelineSnafu, Result};
use crate::error::{self, InvalidUtf8ValueSnafu, PipelineSnafu, Result};
use crate::http::header::constants::{
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
};
use crate::otlp::logs::LOG_TABLE_NAME;
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

#[axum_macros::debug_handler]
Expand Down Expand Up @@ -80,10 +83,18 @@ pub async fn metrics(
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
header: HeaderMap,
Extension(mut query_ctx): Extension<QueryContext>,
bytes: Bytes,
) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
let db = query_ctx.get_db_string();
let table_name = extract_string_value_from_header(
&header,
GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
Some(TRACE_TABLE_NAME),
)?
// safety here, we provide default value for table_name
.unwrap();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
Expand All @@ -92,7 +103,7 @@ pub async fn traces(
let request =
ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
handler
.traces(request, query_ctx)
.traces(request, table_name, query_ctx)
.await
.map(|o| OtlpResponse {
resp_body: ExportTraceServiceResponse {
Expand All @@ -107,17 +118,31 @@ pub struct PipelineInfo {
pub pipeline_version: Option<String>,
}

fn pipeline_header_error(
header: &HeaderValue,
key: &str,
) -> StdResult<String, (http::StatusCode, String)> {
let header_utf8 = str::from_utf8(header.as_bytes());
match header_utf8 {
Ok(s) => Ok(s.to_string()),
Err(_) => Err((
fn parse_header_value_to_string(header: &HeaderValue) -> Result<String> {
String::from_utf8(header.as_bytes().to_vec()).context(InvalidUtf8ValueSnafu)
}

fn extract_string_value_from_header(
headers: &HeaderMap,
header: &str,
default_table_name: Option<&str>,
) -> Result<Option<String>> {
let table_name = headers.get(header);
match table_name {
Some(name) => parse_header_value_to_string(name).map(Some),
None => match default_table_name {
Some(name) => Ok(Some(name.to_string())),
None => Ok(None),
},
}
}

fn utf8_error(header_name: &str) -> impl Fn(error::Error) -> (StatusCode, String) + use<'_> {
move |_| {
(
StatusCode::BAD_REQUEST,
format!("`{}` header is not valid UTF-8 string type.", key),
)),
format!("`{}` header is not valid UTF-8 string type.", header_name),
)
}
}

Expand All @@ -129,28 +154,27 @@ where
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let pipeline_name = parts.headers.get(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME);
let pipeline_version = parts.headers.get(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME);
let headers = &parts.headers;
let pipeline_name =
extract_string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, None)
.map_err(utf8_error(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME))?;
let pipeline_version = extract_string_value_from_header(
headers,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
None,
)
.map_err(utf8_error(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME))?;
match (pipeline_name, pipeline_version) {
(Some(name), Some(version)) => Ok(PipelineInfo {
pipeline_name: Some(pipeline_header_error(
name,
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
)?),
pipeline_version: Some(pipeline_header_error(
version,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
)?),
pipeline_name: Some(name),
pipeline_version: Some(version),
}),
(None, _) => Ok(PipelineInfo {
pipeline_name: None,
pipeline_version: None,
}),
(Some(name), None) => Ok(PipelineInfo {
pipeline_name: Some(pipeline_header_error(
name,
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
)?),
pipeline_name: Some(name),
pipeline_version: None,
}),
}
Expand All @@ -169,16 +193,16 @@ where
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let table_name = parts.headers.get(GREPTIME_LOG_TABLE_NAME_HEADER_NAME);
let table_name = extract_string_value_from_header(
&parts.headers,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
Some(LOG_TABLE_NAME),
)
.map_err(utf8_error(GREPTIME_LOG_TABLE_NAME_HEADER_NAME))?
// safety here, we provide default value for table_name
.unwrap();

match table_name {
Some(name) => Ok(TableInfo {
table_name: pipeline_header_error(name, GREPTIME_LOG_TABLE_NAME_HEADER_NAME)?,
}),
None => Ok(TableInfo {
table_name: "opentelemetry_logs".to_string(),
}),
}
Ok(TableInfo { table_name })
}
}

Expand All @@ -192,16 +216,19 @@ where
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let select = parts.headers.get(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME);
let select = extract_string_value_from_header(
&parts.headers,
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME,
None,
)
.map_err(utf8_error(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME))?;

match select {
Some(name) => {
let select_header =
pipeline_header_error(name, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;
if select_header.is_empty() {
if name.is_empty() {
Ok(SelectInfoWrapper(Default::default()))
} else {
Ok(SelectInfoWrapper(SelectInfo::from(select_header)))
Ok(SelectInfoWrapper(SelectInfo::from(name)))
}
}
None => Ok(SelectInfoWrapper(Default::default())),
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@

pub mod logs;
pub mod metrics;
pub mod plugin;
pub mod trace;
mod utils;
41 changes: 3 additions & 38 deletions src/servers/src/otlp/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ use pipeline::{Array, Map, PipelineWay, SchemaInfo, SelectInfo, Value as Pipelin
use snafu::{ensure, ResultExt};

use super::trace::attributes::OtlpAnyValue;
use super::utils::{bytes_to_hex_string, key_value_to_jsonb};
use crate::error::{
IncompatibleSchemaSnafu, OpenTelemetryLogSnafu, Result, UnsupportedJsonDataTypeForTagSnafu,
};
use crate::otlp::trace::span::bytes_to_hex_string;

pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";

/// Convert OpenTelemetry metrics to GreptimeDB insert requests
///
Expand Down Expand Up @@ -772,43 +774,6 @@ fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<String, PipelineValue
map
}

fn any_value_to_jsonb(value: any_value::Value) -> JsonbValue<'static> {
match value {
any_value::Value::StringValue(s) => JsonbValue::String(s.into()),
any_value::Value::IntValue(i) => JsonbValue::Number(JsonbNumber::Int64(i)),
any_value::Value::DoubleValue(d) => JsonbValue::Number(JsonbNumber::Float64(d)),
any_value::Value::BoolValue(b) => JsonbValue::Bool(b),
any_value::Value::ArrayValue(a) => {
let values = a
.values
.into_iter()
.map(|v| match v.value {
Some(value) => any_value_to_jsonb(value),
None => JsonbValue::Null,
})
.collect();
JsonbValue::Array(values)
}
any_value::Value::KvlistValue(kv) => key_value_to_jsonb(kv.values),
any_value::Value::BytesValue(b) => JsonbValue::String(bytes_to_hex_string(&b).into()),
}
}

fn key_value_to_jsonb(key_values: Vec<KeyValue>) -> JsonbValue<'static> {
let mut map = BTreeMap::new();
for kv in key_values {
let value = match kv.value {
Some(value) => match value.value {
Some(value) => any_value_to_jsonb(value),
None => JsonbValue::Null,
},
None => JsonbValue::Null,
};
map.insert(kv.key.clone(), value);
}
JsonbValue::Object(map)
}

fn log_body_to_string(body: &AnyValue) -> String {
let otlp_value = OtlpAnyValue::from(body);
otlp_value.to_string()
Expand Down
28 changes: 0 additions & 28 deletions src/servers/src/otlp/plugin.rs

This file was deleted.

Loading

0 comments on commit 0e0c4fa

Please sign in to comment.