Skip to content

Commit

Permalink
#000: Added a metrics registry class to register all the metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Manjunath authored and Manjunath committed Nov 25, 2024
1 parent 1861d94 commit 0b056f3
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.sunbird.obsrv.core.otel

import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.metrics.{LongCounter, Meter}

object MetricRegistry {
private val oTel: OpenTelemetry = OTelService.init()
private val meter: Meter = oTel.meterBuilder("obsrv-pipeline").build()

val errorCount: LongCounter = meter.counterBuilder("event.error.count")
.setDescription("Dataset Error Event Count")
.setUnit("1")
.build()

val processingTimeCounter: LongCounter = meter.counterBuilder("pipeline.processing.time")
.setDescription("Processing Time")
.setUnit("ms")
.build()

val totalProcessingTimeCounter: LongCounter = meter.counterBuilder("pipeline.total.processing.time")
.setDescription("Total Processing Time")
.setUnit("ms")
.build()

val latencyTimeCounter: LongCounter = meter.counterBuilder("pipeline.latency.time")
.setDescription("Latency Time")
.setUnit("ms")
.build()

val extractorEventCounter: LongCounter = meter.counterBuilder("pipeline.extractor.events.count")
.setDescription("Count of Extractor Events")
.setUnit("1")
.build()

val extractorTimeCounter: LongCounter = meter.counterBuilder("pipeline.extractor.time")
.setDescription("Extractor Processing Time")
.setUnit("ms")
.build()

val transformStatusCounter: LongCounter = meter.counterBuilder("pipeline.transform.status")
.setDescription("Data transform Status")
.setUnit("1")
.build()

val transformTimeCounter: LongCounter = meter.counterBuilder("pipeline.transform.time")
.setDescription("Transformation Processing Time")
.setUnit("ms")
.build()

val denormStatusCounter: LongCounter = meter.counterBuilder("pipeline.denorm.status")
.setDescription("Denorm Status")
.setUnit("1")
.build()

val denormTimeCounter: LongCounter = meter.counterBuilder("pipeline.denorm.time")
.setDescription("Denormalization Processing Time")
.setUnit("ms")
.build()

val dedupStatusCounter: LongCounter = meter.counterBuilder("pipeline.de-dup.status")
.setDescription("De-dup Status")
.setUnit("1")
.build()

val dedupTimeCounter: LongCounter = meter.counterBuilder("pipeline.dedup.time")
.setDescription("Deduplication Processing Time")
.setUnit("ms")
.build()

val validatorTimeCounter: LongCounter = meter.counterBuilder("pipeline.validator.time")
.setDescription("Validator Processing Time")
.setUnit("ms")
.build()

val validatorStatusCounter: LongCounter = meter.counterBuilder("pipeline.validator.status")
.setDescription("Validator Status")
.setUnit("1")
.build()

val extractorStatusCounter: LongCounter = meter.counterBuilder("pipeline.extractor.status")
.setDescription("Extractor Status")
.setUnit("1")
.build()

}

Original file line number Diff line number Diff line change
Expand Up @@ -45,76 +45,63 @@ object OTelMetricsGenerator {

// Extractor Job Metrics
stats.extractor_events.foreach { events =>
val extractorEventCounter: LongCounter = meter.counterBuilder("pipeline.extractor.events.count").setDescription("Count of Extractor Events").setUnit("1").build()
extractorEventCounter.add(events.toLong, contextAttributes)
MetricRegistry.extractorEventCounter.add(events.toLong, contextAttributes)
}
stats.extractor_time.foreach { time =>
val extractorTimeCounter: LongCounter = meter.counterBuilder("pipeline.extractor.time").setDescription("Extractor Processing Time").setUnit("ms").build()
extractorTimeCounter.add(time, contextAttributes)
MetricRegistry.extractorTimeCounter.add(time, contextAttributes)
}

stats.extractor_status.foreach { status =>
val extractorStatusCounter: LongCounter = meter.counterBuilder("pipeline.extractor.status").setDescription("Extractor Status").setUnit("1").build()
extractorStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
MetricRegistry.extractorStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
}

// Schema Validator Metrics
stats.validator_status.foreach { status =>
val validatorStatusCounter: LongCounter = meter.counterBuilder("pipeline.validator.status").setDescription("Validator Status").setUnit("1").build()
validatorStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
MetricRegistry.validatorStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
}
stats.validator_time.foreach { time =>
val validatorTimeCounter: LongCounter = meter.counterBuilder("pipeline.validator.time").setDescription("Validator Processing Time").setUnit("ms").build()
validatorTimeCounter.add(time, contextAttributes)
MetricRegistry.validatorTimeCounter.add(time, contextAttributes)
}

// De-Duplication Metrics
stats.dedup_time.foreach { time =>
val dedupTimeCounter: LongCounter = meter.counterBuilder("pipeline.dedup.time").setDescription("Deduplication Processing Time").setUnit("ms").build()
dedupTimeCounter.add(time, contextAttributes)
MetricRegistry.dedupTimeCounter.add(time, contextAttributes)
}
stats.dedup_status.foreach { status =>
val dedupStatusCounter: LongCounter = meter.counterBuilder("pipeline.de-dup.status").setDescription("De-dup Status").setUnit("1").build()
dedupStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()
MetricRegistry.dedupStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()
)
}

// De-normalisation Metrics

stats.denorm_time.foreach { time =>
val denormTimeCounter: LongCounter = meter.counterBuilder("pipeline.denorm.time").setDescription("Denormalization Processing Time").setUnit("ms").build()
denormTimeCounter.add(time, contextAttributes)
MetricRegistry.denormTimeCounter.add(time, contextAttributes)
}

stats.denorm_status.foreach { status =>
val denormStatusCounter: LongCounter = meter.counterBuilder("pipeline.denorm.status").setDescription("Denorm Status").setUnit("1").build()
denormStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
MetricRegistry.denormStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
}

// Data transformation Metrics
stats.transform_time.foreach { time =>
val transformTimeCounter: LongCounter = meter.counterBuilder("pipeline.transform.time").setDescription("Transformation Processing Time").setUnit("ms").build()
transformTimeCounter.add(time, contextAttributes)
MetricRegistry.transformTimeCounter.add(time, contextAttributes)
}
stats.transform_status.foreach { status =>
val transformStatusCounter: LongCounter = meter.counterBuilder("pipeline.transform.status").setDescription("Data transform Status").setUnit("1").build()
transformStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
MetricRegistry.transformStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build())
}

// Common timestamp Metrics
stats.total_processing_time.foreach { time =>
val totalProcessingTimeCounter: LongCounter = meter.counterBuilder("pipeline.total.processing.time").setDescription("Total Processing Time").setUnit("ms").build()
totalProcessingTimeCounter.add(time, contextAttributes)
//val totalProcessingTimeCounter: LongCounter = meter.counterBuilder("pipeline.total.processing.time").setDescription("Total Processing Time").setUnit("ms").build()
MetricRegistry.totalProcessingTimeCounter.add(time, contextAttributes)
}

stats.latency_time.foreach { time =>
val latencyTimeCounter: LongCounter = meter.counterBuilder("pipeline.latency.time").setDescription("Latency Time").setUnit("ms").build()
latencyTimeCounter.add(time, contextAttributes)
MetricRegistry.latencyTimeCounter.add(time, contextAttributes)
}

stats.processing_time.foreach { time =>
val processingTimeCounter: LongCounter = meter.counterBuilder("pipeline.processing.time").setDescription("Processing Time").setUnit("ms").build()
processingTimeCounter.add(time, contextAttributes)
//val processingTimeCounter: LongCounter = meter.counterBuilder("pipeline.processing.time").setDescription("Processing Time").setUnit("ms").build()
MetricRegistry.processingTimeCounter.add(time, contextAttributes)
}
}

Expand Down

0 comments on commit 0b056f3

Please sign in to comment.