Skip to content

Commit

Permalink
#I250: Added a flags to enable or disable of generating a open teleme…
Browse files Browse the repository at this point in the history
…try events
  • Loading branch information
Manjunath authored and Manjunath committed Dec 2, 2024
1 parent 9705f92 commit 85c1125
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 115 deletions.
3 changes: 2 additions & 1 deletion framework/src/main/resources/baseconfig.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,5 @@ postgres {
database = "postgres"
}

otel.collector.endpoint="http://0.0.0.0:4317"
otel.collector.endpoint="http://0.0.0.0:4317"
otel.enable=false
Original file line number Diff line number Diff line change
Expand Up @@ -2,85 +2,38 @@ package org.sunbird.obsrv.core.otel

import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.metrics.{LongCounter, Meter}
import com.typesafe.config.Config

object MetricRegistry {
private val oTel: OpenTelemetry = OTelService.init()
private val meter: Meter = oTel.meterBuilder("obsrv-pipeline").build()

val errorCount: LongCounter = meter.counterBuilder("event.error.count")
.setDescription("Dataset Error Event Count")
.setUnit("1")
.build()

val processingTimeCounter: LongCounter = meter.counterBuilder("pipeline.processing.time")
.setDescription("Processing Time")
.setUnit("ms")
.build()

val totalProcessingTimeCounter: LongCounter = meter.counterBuilder("pipeline.total.processing.time")
.setDescription("Total Processing Time")
.setUnit("ms")
.build()

val latencyTimeCounter: LongCounter = meter.counterBuilder("pipeline.latency.time")
.setDescription("Latency Time")
.setUnit("ms")
.build()

val extractorEventCounter: LongCounter = meter.counterBuilder("pipeline.extractor.events.count")
.setDescription("Count of Extractor Events")
.setUnit("1")
.build()

val extractorTimeCounter: LongCounter = meter.counterBuilder("pipeline.extractor.time")
.setDescription("Extractor Processing Time")
.setUnit("ms")
.build()

val transformStatusCounter: LongCounter = meter.counterBuilder("pipeline.transform.status")
.setDescription("Data transform Status")
.setUnit("1")
.build()

val transformTimeCounter: LongCounter = meter.counterBuilder("pipeline.transform.time")
.setDescription("Transformation Processing Time")
.setUnit("ms")
.build()

val denormStatusCounter: LongCounter = meter.counterBuilder("pipeline.denorm.status")
.setDescription("Denorm Status")
.setUnit("1")
.build()

val denormTimeCounter: LongCounter = meter.counterBuilder("pipeline.denorm.time")
.setDescription("Denormalization Processing Time")
.setUnit("ms")
.build()

val dedupStatusCounter: LongCounter = meter.counterBuilder("pipeline.de-dup.status")
.setDescription("De-dup Status")
.setUnit("1")
.build()

val dedupTimeCounter: LongCounter = meter.counterBuilder("pipeline.dedup.time")
.setDescription("Deduplication Processing Time")
.setUnit("ms")
.build()

val validatorTimeCounter: LongCounter = meter.counterBuilder("pipeline.validator.time")
.setDescription("Validator Processing Time")
.setUnit("ms")
.build()

val validatorStatusCounter: LongCounter = meter.counterBuilder("pipeline.validator.status")
.setDescription("Validator Status")
.setUnit("1")
.build()

val extractorStatusCounter: LongCounter = meter.counterBuilder("pipeline.extractor.status")
.setDescription("Extractor Status")
.setUnit("1")
.build()

private val config: Config = OTelService.getConfig
private val oTel: Option[OpenTelemetry] =
if (config.getBoolean("otel.enable")) OTelService.init()
else None

private val meter: Option[Meter] = oTel.map(_.meterBuilder("obsrv-pipeline").build())

// Helper method to register a metric
private def register(name: String, description: String, unit: String): Option[LongCounter] = {
meter.map(_.counterBuilder(name)
.setDescription(description)
.setUnit(unit)
.build())
}

// Metric definitions using the helper method
val errorCount: Option[LongCounter] = register("event.error.count", "Dataset Error Event Count", "1")
val processingTimeCounter: Option[LongCounter] = register("pipeline.processing.time", "Processing Time", "ms")
val totalProcessingTimeCounter: Option[LongCounter] = register("pipeline.total.processing.time", "Total Processing Time", "ms")
val latencyTimeCounter: Option[LongCounter] = register("pipeline.latency.time", "Latency Time", "ms")
val extractorEventCounter: Option[LongCounter] = register("pipeline.extractor.events.count", "Count of Extractor Events", "1")
val extractorTimeCounter: Option[LongCounter] = register("pipeline.extractor.time", "Extractor Processing Time", "ms")
val transformStatusCounter: Option[LongCounter] = register("pipeline.transform.status", "Data Transform Status", "1")
val transformTimeCounter: Option[LongCounter] = register("pipeline.transform.time", "Transformation Processing Time", "ms")
val denormStatusCounter: Option[LongCounter] = register("pipeline.denorm.status", "Denormalization Status", "1")
val denormTimeCounter: Option[LongCounter] = register("pipeline.denorm.time", "Denormalization Processing Time", "ms")
val dedupStatusCounter: Option[LongCounter] = register("pipeline.dedup.status", "Deduplication Status", "1")
val dedupTimeCounter: Option[LongCounter] = register("pipeline.dedup.time", "Deduplication Processing Time", "ms")
val validatorTimeCounter: Option[LongCounter] = register("pipeline.validator.time", "Validator Processing Time", "ms")
val validatorStatusCounter: Option[LongCounter] = register("pipeline.validator.status", "Validator Status", "1")
val extractorStatusCounter: Option[LongCounter] = register("pipeline.extractor.status", "Extractor Status", "1")
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@ import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.{LongCounter, Meter}
import org.sunbird.obsrv.core.model.Models.SystemEvent
import com.typesafe.config.{Config, ConfigFactory}

object OTelMetricsGenerator {

private val oTel: OpenTelemetry = OTelService.init()
private val config: Config = OTelService.getConfig
private val oTel: Option[OpenTelemetry] = if (config.getBoolean("otel.enable")) OTelService.init() else None

def generateOTelSystemEventMetric(systemEvent: SystemEvent): Unit = {
val meter: Meter = oTel.meterBuilder("obsrv-pipeline").build()
oTel match {
case Some(openTelemetry) =>
generate(systemEvent, openTelemetry)
case None =>
println("OpenTelemetry is disabled. No metrics generated.")
}
}

def generate(systemEvent: SystemEvent, openTelemetry: OpenTelemetry): Unit = {
val meter: Meter = openTelemetry.meterBuilder("obsrv-pipeline").build()
val errorCount: LongCounter = meter.counterBuilder("event.error.count")
.setDescription("Dataset Error Event Count")
.setUnit("1")
Expand All @@ -25,8 +36,8 @@ object OTelMetricsGenerator {
.put("ctx.dataset_type", systemEvent.ctx.dataset_type.getOrElse("unknown"))
.build()

// Handle error events and add them to the error count
systemEvent.data.error.foreach { errorLog =>
// Create attributes from the ErrorLog
val errorAttributes: Attributes = Attributes.builder()
.put("error.pdata_id", errorLog.pdata_id.toString)
.put("error.pdata_status", errorLog.pdata_status.toString)
Expand All @@ -41,69 +52,95 @@ object OTelMetricsGenerator {
errorCount.add(1, errorAttributes)
}

// Handle pipeline stats
systemEvent.data.pipeline_stats.foreach { stats =>

// Extractor Job Metrics
stats.extractor_events.foreach { events =>
MetricRegistry.extractorEventCounter.add(events.toLong, contextAttributes)
MetricRegistry.extractorEventCounter.foreach { counter =>
counter.add(events.toLong, contextAttributes) // Correct way to add to counter
}
}

stats.extractor_time.foreach { time =>
MetricRegistry.extractorTimeCounter.add(time, contextAttributes)
MetricRegistry.extractorTimeCounter.foreach { counter =>
counter.add(time, contextAttributes)
}
}

stats.extractor_status.foreach { status =>
MetricRegistry.extractorStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
MetricRegistry.extractorStatusCounter.foreach { counter =>
counter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
}
}

// Schema Validator Metrics
stats.validator_status.foreach { status =>
MetricRegistry.validatorStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
MetricRegistry.validatorStatusCounter.foreach { counter =>
counter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
}
}
stats.validator_time.foreach { time =>
MetricRegistry.validatorTimeCounter.add(time, contextAttributes)
MetricRegistry.validatorTimeCounter.foreach { counter =>
counter.add(time, contextAttributes)
}
}

// De-Duplication Metrics
stats.dedup_time.foreach { time =>
MetricRegistry.dedupTimeCounter.add(time, contextAttributes)
MetricRegistry.dedupTimeCounter.foreach { counter =>
counter.add(time, contextAttributes)
}
}

stats.dedup_status.foreach { status =>
MetricRegistry.dedupStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()
)
MetricRegistry.dedupStatusCounter.foreach { counter =>
counter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
}
}

// De-normalisation Metrics

stats.denorm_time.foreach { time =>
MetricRegistry.denormTimeCounter.add(time, contextAttributes)
MetricRegistry.denormTimeCounter.foreach { counter =>
counter.add(time, contextAttributes)
}
}

stats.denorm_status.foreach { status =>
MetricRegistry.denormStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
MetricRegistry.denormStatusCounter.foreach { counter =>
counter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
}
}

// Data transformation Metrics
stats.transform_time.foreach { time =>
MetricRegistry.transformTimeCounter.add(time, contextAttributes)
MetricRegistry.transformTimeCounter.foreach { counter =>
counter.add(time, contextAttributes)
}
}
stats.transform_status.foreach { status =>
MetricRegistry.transformStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
MetricRegistry.transformStatusCounter.foreach { counter =>
counter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
}
}

// Common timestamp Metrics
stats.total_processing_time.foreach { time =>
//val totalProcessingTimeCounter: LongCounter = meter.counterBuilder("pipeline.total.processing.time").setDescription("Total Processing Time").setUnit("ms").build()
MetricRegistry.totalProcessingTimeCounter.add(time, contextAttributes)
MetricRegistry.totalProcessingTimeCounter.foreach { counter =>
counter.add(time, contextAttributes)
}
}

stats.latency_time.foreach { time =>
MetricRegistry.latencyTimeCounter.add(time, contextAttributes)
MetricRegistry.latencyTimeCounter.foreach { counter =>
counter.add(time, contextAttributes)
}
}

stats.processing_time.foreach { time =>
//val processingTimeCounter: LongCounter = meter.counterBuilder("pipeline.processing.time").setDescription("Processing Time").setUnit("ms").build()
MetricRegistry.processingTimeCounter.add(time, contextAttributes)
MetricRegistry.processingTimeCounter.foreach { counter =>
counter.add(time, contextAttributes)
}
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,31 @@ object OTelService {
ConfigFactory.load("baseconfig.conf").withFallback(ConfigFactory.systemEnvironment())
}

def init(): OpenTelemetry = {
def init(): Option[OpenTelemetry] = {
val collectorEndpoint: String = config.getString("otel.collector.endpoint")
val tracerProvider = createTracerProvider()
val meterProvider = createMeterProvider(createOtlpMetricExporter(collectorEndpoint))
val loggerProvider = createLoggerProvider(collectorEndpoint)
val enable: Boolean = config.getBoolean("otel.enable")
if (enable) {
val tracerProvider = createTracerProvider()
val meterProvider = createMeterProvider(createOtlpMetricExporter(collectorEndpoint))
val loggerProvider = createLoggerProvider(collectorEndpoint)

// Build the OpenTelemetry SDK
val openTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.setMeterProvider(meterProvider)
.setLoggerProvider(loggerProvider)
.build()
// Build the OpenTelemetry SDK
val openTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.setMeterProvider(meterProvider)
.setLoggerProvider(loggerProvider)
.build()

sys.addShutdownHook(openTelemetry.close())
openTelemetry
sys.addShutdownHook(openTelemetry.close())
println("Open Telemetry is Enabled")
Some(openTelemetry)
} else {
println("Open Telemetry is Disabled")
None // Return None if OpenTelemetry is not enabled
}
}


private def createOtlpMetricExporter(endpoint: String): OtlpGrpcMetricExporter = {
OtlpGrpcMetricExporter.builder()
.setEndpoint(endpoint)
Expand Down Expand Up @@ -84,4 +92,8 @@ object OTelService {
.put(ResourceAttributes.SERVICE_NAME, serviceName)
.build()
}

def getConfig: Config = {
this.config
}
}

0 comments on commit 85c1125

Please sign in to comment.