diff --git a/framework/src/main/resources/baseconfig.conf b/framework/src/main/resources/baseconfig.conf
index dd62e21e..5c565253 100644
--- a/framework/src/main/resources/baseconfig.conf
+++ b/framework/src/main/resources/baseconfig.conf
@@ -7,8 +7,8 @@ kafka {
     linger.ms = 10
     compression = "snappy"
   }
-  output.system.event.topic = ${job.env}".system.events"
-  output.failed.topic = ${job.env}".failed"
+  output.system.event.topic = "system.events"
+  output.failed.topic = "failed"
 }
 
 job {
diff --git a/pipeline/cache-indexer/src/main/resources/cache-indexer.conf b/pipeline/cache-indexer/src/main/resources/cache-indexer.conf
index 58a9c9d1..3d11e25c 100644
--- a/pipeline/cache-indexer/src/main/resources/cache-indexer.conf
+++ b/pipeline/cache-indexer/src/main/resources/cache-indexer.conf
@@ -1,8 +1,8 @@
 include "baseconfig.conf"
 
 kafka {
-  output.failed.topic = ${job.env}".masterdata.failed"
-  groupId = ${job.env}"-cache-indexer-group"
+  output.failed.topic = "masterdata.failed"
+  groupId = "cache-indexer-group"
   producer {
     max-request-size = 5242880
   }
diff --git a/pipeline/cache-indexer/src/test/resources/test.conf b/pipeline/cache-indexer/src/test/resources/test.conf
index 7861c8d0..dda6e65f 100644
--- a/pipeline/cache-indexer/src/test/resources/test.conf
+++ b/pipeline/cache-indexer/src/test/resources/test.conf
@@ -2,7 +2,7 @@ include "base-test.conf"
 
 kafka {
-  output.failed.topic = ${job.env}".masterdata.failed"
+  output.failed.topic = "masterdata.failed"
   groupId = ${job.env}"-cache-indexer-group"
   producer {
     max-request-size = 5242880
diff --git a/pipeline/dataset-router/src/main/resources/dataset-router.conf b/pipeline/dataset-router/src/main/resources/dataset-router.conf
index e0091ef6..2768bfe6 100644
--- a/pipeline/dataset-router/src/main/resources/dataset-router.conf
+++ b/pipeline/dataset-router/src/main/resources/dataset-router.conf
@@ -1,9 +1,9 @@
 include "baseconfig.conf"
 
 kafka {
-  input.topic = ${job.env}".transform"
-  stats.topic = ${job.env}".stats"
-  groupId = ${job.env}"-druid-router-group"
+  input.topic = "transform"
+  stats.topic = "stats"
+  groupId = "druid-router-group"
 }
 
 task {
diff --git a/pipeline/denormalizer/src/main/resources/de-normalization.conf b/pipeline/denormalizer/src/main/resources/de-normalization.conf
index 63b5bc9e..4a57e456 100644
--- a/pipeline/denormalizer/src/main/resources/de-normalization.conf
+++ b/pipeline/denormalizer/src/main/resources/de-normalization.conf
@@ -1,10 +1,10 @@
 include "baseconfig.conf"
 
 kafka {
-  input.topic = ${job.env}".unique"
-  output.denorm.topic = ${job.env}".denorm"
-  output.denorm.failed.topic = ${job.env}".failed"
-  groupId = ${job.env}"-denormalizer-group"
+  input.topic = "unique"
+  output.denorm.topic = "denorm"
+  output.denorm.failed.topic = "failed"
+  groupId = "denormalizer-group"
 }
 
 task {
diff --git a/pipeline/extractor/src/main/resources/extractor.conf b/pipeline/extractor/src/main/resources/extractor.conf
index 103649d0..c4202fed 100644
--- a/pipeline/extractor/src/main/resources/extractor.conf
+++ b/pipeline/extractor/src/main/resources/extractor.conf
@@ -1,12 +1,12 @@
 include "baseconfig.conf"
 
 kafka {
-  input.topic = ${job.env}".ingest"
-  output.raw.topic = ${job.env}".raw"
-  output.extractor.duplicate.topic = ${job.env}".failed"
-  output.batch.failed.topic = ${job.env}".failed"
+  input.topic = "ingest"
+  output.raw.topic = "raw"
+  output.extractor.duplicate.topic = "failed"
+  output.batch.failed.topic = "failed"
   event.max.size = "1048576" # Max is only 1MB
-  groupId = ${job.env}"-extractor-group"
+  groupId = "extractor-group"
   producer {
     max-request-size = 5242880
   }
diff --git a/pipeline/hudi-connector/src/main/resources/hudi-writer.conf b/pipeline/hudi-connector/src/main/resources/hudi-writer.conf
index 1a8c63b3..093bb2a4 100644
--- a/pipeline/hudi-connector/src/main/resources/hudi-writer.conf
+++ b/pipeline/hudi-connector/src/main/resources/hudi-writer.conf
@@ -1,11 +1,11 @@
 include "baseconfig.conf"
 
 kafka {
-  input.topic = ${job.env}".hudi.connector.in"
-  output.topic = ${job.env}".hudi.connector.out"
-  output.invalid.topic = ${job.env}".failed"
+  input.topic = "hudi.connector.in"
+  output.topic = "hudi.connector.out"
+  output.invalid.topic = "failed"
   event.max.size = "1048576" # Max is only 1MB
-  groupId = ${job.env}"-hudi-writer-group"
+  groupId = "hudi-writer-group"
   producer {
     max-request-size = 5242880
   }
diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala
index 7585fede..f85979e6 100644
--- a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala
+++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala
@@ -1,63 +1,61 @@
 package org.sunbird.obsrv.function
 
 import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.scala.metrics.ScalaGauge
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.formats.common.TimestampFormat
 import org.apache.flink.formats.json.JsonToRowDataConverters
-import org.apache.flink.metrics.SimpleCounter
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
-import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec}
 import org.apache.flink.table.data.RowData
 import org.slf4j.LoggerFactory
-import org.sunbird.obsrv.core.streaming.{JobMetrics, Metrics, MetricsList}
-import org.sunbird.obsrv.core.util.{JSONUtil, Util}
+import org.sunbird.obsrv.core.util.JSONUtil
 import org.sunbird.obsrv.streaming.HudiConnectorConfig
+import org.sunbird.obsrv.util.{HMetrics, HudiSchemaParser, ScalaGauge}
 
 import scala.collection.mutable.{Map => MMap}
 
-class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] with JobMetrics {
+class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String)
+  extends RichMapFunction[MMap[String, AnyRef], RowData] {
 
-  // Variables for converters and object mapper
-  var jsonToRowDataConverters: JsonToRowDataConverters = _
-  var objectMapper: ObjectMapper = _
-  var hudiSchemaParser: HudiSchemaParser = _
-
-  // Logger for logging
   private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction])
 
-  protected val metricsList: MetricsList = getMetricsList()
-  protected val metrics: Metrics = registerMetrics(metricsList.datasets, metricsList.metrics)
-
-
-
-  // Metrics for time tracking
-  private var startTime: Long = _
+  private var metrics: HMetrics = _
+  private var jsonToRowDataConverters: JsonToRowDataConverters = _
+  private var objectMapper: ObjectMapper = _
+  private var hudiSchemaParser: HudiSchemaParser = _
 
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
-    // Initialize required converters and parsers
+    metrics = new HMetrics()
     jsonToRowDataConverters = new JsonToRowDataConverters(false, true, TimestampFormat.SQL)
     objectMapper = new ObjectMapper()
    hudiSchemaParser = new HudiSchemaParser()
-    // Register metrics
-    getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
-      .counter(config.inputEventCountMetric, new SimpleCounter())
-    getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
-      .counter(config.failedEventCountMetric, new SimpleCounter())
+    getRuntimeContext.getMetricGroup
+      .addGroup(config.jobName)
+      .addGroup(datasetId)
+      .gauge[Long, ScalaGauge[Long]](config.inputEventCountMetric, ScalaGauge[Long](() =>
+        metrics.getAndReset(datasetId, config.inputEventCountMetric)
+      ))
+
+    getRuntimeContext.getMetricGroup
+      .addGroup(config.jobName)
+      .addGroup(datasetId)
+      .gauge[Long, ScalaGauge[Long]](config.failedEventCountMetric, ScalaGauge[Long](() =>
+        metrics.getAndReset(datasetId, config.failedEventCountMetric)
+      ))
   }
 
   override def map(event: MMap[String, AnyRef]): RowData = {
-    startTime = System.currentTimeMillis()
     try {
-      metrics.incCounter(datasetId, config.inputEventCountMetric, event.size)
+      if (event.nonEmpty) {
+        metrics.increment(datasetId, config.inputEventCountMetric, 1)
+      }
      val rowData = convertToRowData(event)
      rowData
    } catch {
      case ex: Exception =>
-        metrics.incCounter(datasetId, config.failedEventCountMetric)
+        metrics.increment(datasetId, config.failedEventCountMetric, 1)
        logger.error("Failed to process record", ex)
        throw ex
    }
@@ -67,15 +65,8 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
     val eventJson = JSONUtil.serialize(data)
     val flattenedData = hudiSchemaParser.parseJson(datasetId, eventJson)
     val rowType = hudiSchemaParser.rowTypeMap(datasetId)
-    val converter: JsonToRowDataConverters.JsonToRowDataConverter = jsonToRowDataConverters.createRowConverter(rowType)
-    val rowData = converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
-    rowData
-  }
-
-  def getMetricsList(): MetricsList = {
-    MetricsList(
-      datasets = List(datasetId),
-      metrics = List(config.inputEventCountMetric, config.failedEventCountMetric)
-    )
+    val converter: JsonToRowDataConverters.JsonToRowDataConverter =
+      jsonToRowDataConverters.createRowConverter(rowType)
+    converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
   }
-}
+}
\ No newline at end of file
diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala
index aef0643f..1bf12167 100644
--- a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala
+++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala
@@ -63,13 +63,13 @@ class HudiConnectorConfig(override val config: Config) extends BaseJobConfig[mut
 
   // Metrics
-  val inputEventCountMetric = "inputEventCount"
-  val failedEventCountMetric = "failedEventCount"
+  val inputEventCountMetric = "input-event-count"
+  val failedEventCountMetric = "failed-event-count"
 
   // Metrics Exporter
-  val metricsReportType = config.getString("metrics.reporter.type")
-  val metricsReporterHost = config.getString("metrics.reporter.host")
-  val metricsReporterPort = config.getString("metrics.reporter.port")
+  val metricsReportType: String = config.getString("metrics.reporter.type")
+  val metricsReporterHost: String = config.getString("metrics.reporter.host")
+  val metricsReporterPort: String = config.getString("metrics.reporter.port")
 
 }
diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/util/HMetrics.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/util/HMetrics.scala
new file mode 100644
index 00000000..3f3d1b1c
--- /dev/null
+++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/util/HMetrics.scala
@@ -0,0 +1,31 @@
+package org.sunbird.obsrv.util
+
+
+import org.apache.flink.metrics.Gauge
+
+import scala.collection.concurrent.TrieMap
+
+class HMetrics {
+  private val metricStore = TrieMap[(String, String), Long]()
+
+  def increment(dataset: String, metric: String, value: Long): Unit = {
+    metricStore.synchronized {
+      val key = (dataset, metric)
+      val current = metricStore.getOrElse(key, 0L)
+      metricStore.put(key, current + value)
+    }
+  }
+
+  def getAndReset(dataset: String, metric: String): Long = {
+    metricStore.synchronized {
+      val key = (dataset, metric)
+      val current = metricStore.getOrElse(key, 0L)
+      metricStore.remove(key)
+      current
+    }
+  }
+}
+
+case class ScalaGauge[T](getValueFn: () => T) extends Gauge[T] {
+  override def getValue: T = getValueFn()
+}
diff --git a/pipeline/master-data-processor/src/main/resources/master-data-processor.conf b/pipeline/master-data-processor/src/main/resources/master-data-processor.conf
index 0d3f2d89..909ff8e0 100644
--- a/pipeline/master-data-processor/src/main/resources/master-data-processor.conf
+++ b/pipeline/master-data-processor/src/main/resources/master-data-processor.conf
@@ -1,20 +1,20 @@
 include "baseconfig.conf"
 
 kafka {
-  input.topic = ${job.env}".masterdata.ingest"
-  output.raw.topic = ${job.env}".masterdata.raw"
-  output.extractor.duplicate.topic = ${job.env}".masterdata.failed"
-  output.failed.topic = ${job.env}".masterdata.failed"
-  output.batch.failed.topic = ${job.env}".masterdata.failed"
+  input.topic = "masterdata.ingest"
+  output.raw.topic = "masterdata.raw"
+  output.extractor.duplicate.topic = "masterdata.failed"
+  output.failed.topic = "masterdata.failed"
+  output.batch.failed.topic = "masterdata.failed"
   event.max.size = "1048576" # Max is only 1MB
-  output.invalid.topic = ${job.env}".masterdata.failed"
-  output.unique.topic = ${job.env}".masterdata.unique"
-  output.duplicate.topic = ${job.env}".masterdata.failed"
-  output.denorm.topic = ${job.env}".masterdata.denorm"
-  output.transform.topic = ${job.env}".masterdata.transform"
-  output.transform.failed.topic = ${job.env}".masterdata.transform.failed"
-  stats.topic = ${job.env}".masterdata.stats"
-  groupId = ${job.env}"-masterdata-pipeline-group"
+  output.invalid.topic = "masterdata.failed"
+  output.unique.topic = "masterdata.unique"
+  output.duplicate.topic = "masterdata.failed"
+  output.denorm.topic = "masterdata.denorm"
+  output.transform.topic = "masterdata.transform"
+  output.transform.failed.topic = "masterdata.transform.failed"
+  stats.topic = "masterdata.stats"
+  groupId = "masterdata-pipeline-group"
   producer {
     max-request-size = 5242880
   }
diff --git a/pipeline/master-data-processor/src/test/resources/test.conf b/pipeline/master-data-processor/src/test/resources/test.conf
index 3533006c..f5338bc5 100644
--- a/pipeline/master-data-processor/src/test/resources/test.conf
+++ b/pipeline/master-data-processor/src/test/resources/test.conf
@@ -6,19 +6,19 @@ job {
 
 kafka {
-  input.topic = ${job.env}".masterdata.ingest"
-  output.raw.topic = ${job.env}".masterdata.raw"
-  output.extractor.duplicate.topic = ${job.env}".masterdata.failed"
-  output.failed.topic = ${job.env}".masterdata.failed"
-  output.batch.failed.topic = ${job.env}".masterdata.failed"
+  input.topic = "masterdata.ingest"
+  output.raw.topic = "masterdata.raw"
+  output.extractor.duplicate.topic = "masterdata.failed"
+  output.failed.topic = "masterdata.failed"
+  output.batch.failed.topic = "masterdata.failed"
   event.max.size = "1048576" # Max is only 1MB
-  output.invalid.topic = ${job.env}".masterdata.failed"
-  output.unique.topic = ${job.env}".masterdata.unique"
-  output.duplicate.topic = ${job.env}".masterdata.failed"
-  output.denorm.topic = ${job.env}".masterdata.denorm"
-  output.transform.topic = ${job.env}".masterdata.transform"
-  output.transform.failed.topic = ${job.env}".masterdata.transform.failed"
-  stats.topic = ${job.env}".masterdata.stats"
+  output.invalid.topic = "masterdata.failed"
+  output.unique.topic = "masterdata.unique"
+  output.duplicate.topic = "masterdata.failed"
+  output.denorm.topic = "masterdata.denorm"
+  output.transform.topic = "masterdata.transform"
+  output.transform.failed.topic = "masterdata.transform.failed"
+  stats.topic = "masterdata.stats"
   groupId = ${job.env}"-masterdata-pipeline-group"
   producer {
     max-request-size = 5242880
diff --git a/pipeline/preprocessor/src/main/resources/pipeline-preprocessor.conf b/pipeline/preprocessor/src/main/resources/pipeline-preprocessor.conf
index 7e845e1d..38a92679 100644
--- a/pipeline/preprocessor/src/main/resources/pipeline-preprocessor.conf
+++ b/pipeline/preprocessor/src/main/resources/pipeline-preprocessor.conf
@@ -1,11 +1,11 @@
 include "baseconfig.conf"
 
 kafka {
-  input.topic = ${job.env}".raw"
-  output.invalid.topic = ${job.env}".failed"
-  output.unique.topic = ${job.env}".unique"
-  output.duplicate.topic = ${job.env}".failed"
-  groupId = ${job.env}"-pipeline-preprocessor-group"
+  input.topic = "raw"
+  output.invalid.topic = "failed"
+  output.unique.topic = "unique"
+  output.duplicate.topic = "failed"
+  groupId = "pipeline-preprocessor-group"
 }
 
 task {
diff --git a/pipeline/transformer/src/main/resources/transformer.conf b/pipeline/transformer/src/main/resources/transformer.conf
index 42fbb22f..0f9c2d4c 100644
--- a/pipeline/transformer/src/main/resources/transformer.conf
+++ b/pipeline/transformer/src/main/resources/transformer.conf
@@ -1,10 +1,10 @@
 include "baseconfig.conf"
 
 kafka {
-  input.topic = ${job.env}".denorm"
-  output.transform.topic = ${job.env}".transform"
-  output.transform.failed.topic = ${job.env}".transform.failed"
-  groupId = ${job.env}"-transformer-group"
+  input.topic = "denorm"
+  output.transform.topic = "transform"
+  output.transform.failed.topic = "transform.failed"
+  groupId = "transformer-group"
   producer {
     max-request-size = 5242880
   }
diff --git a/pipeline/unified-pipeline/src/main/resources/unified-pipeline.conf b/pipeline/unified-pipeline/src/main/resources/unified-pipeline.conf
index 9b1e1bdf..c9c78d54 100644
--- a/pipeline/unified-pipeline/src/main/resources/unified-pipeline.conf
+++ b/pipeline/unified-pipeline/src/main/resources/unified-pipeline.conf
@@ -1,20 +1,20 @@
 include "baseconfig.conf"
 
 kafka {
-  input.topic = ${job.env}".ingest"
-  output.raw.topic = ${job.env}".raw"
-  output.extractor.duplicate.topic = ${job.env}".failed"
-  output.batch.failed.topic = ${job.env}".failed"
+  input.topic = "ingest"
+  output.raw.topic = "raw"
+  output.extractor.duplicate.topic = "failed"
+  output.batch.failed.topic = "failed"
   event.max.size = "1048576" # Max is only 1MB
-  output.invalid.topic = ${job.env}".failed"
-  output.unique.topic = ${job.env}".unique"
-  output.duplicate.topic = ${job.env}".failed"
-  output.denorm.topic = ${job.env}".denorm"
-  output.denorm.failed.topic = ${job.env}".failed"
-  output.transform.topic = ${job.env}".transform"
-  output.transform.failed.topic = ${job.env}".failed"
-  stats.topic = ${job.env}".stats"
-  groupId = ${job.env}"-unified-pipeline-group"
+  output.invalid.topic = "failed"
+  output.unique.topic = "unique"
+  output.duplicate.topic = "failed"
+  output.denorm.topic = "denorm"
+  output.denorm.failed.topic = "failed"
+  output.transform.topic = "transform"
+  output.transform.failed.topic = "failed"
+  stats.topic = "stats"
+  groupId = "unified-pipeline-group"
   producer {
     max-request-size = 5242880
   }
diff --git a/pipeline/unified-pipeline/src/test/resources/test.conf b/pipeline/unified-pipeline/src/test/resources/test.conf
index aa514d54..968d3640 100644
--- a/pipeline/unified-pipeline/src/test/resources/test.conf
+++ b/pipeline/unified-pipeline/src/test/resources/test.conf
@@ -5,20 +5,20 @@ job {
 }
 
 kafka {
-  input.topic = ${job.env}".ingest"
-  output.raw.topic = ${job.env}".raw"
-  output.extractor.duplicate.topic = ${job.env}".failed"
-  output.batch.failed.topic = ${job.env}".failed"
+  input.topic = "ingest"
+  output.raw.topic = "raw"
+  output.extractor.duplicate.topic = "failed"
+  output.batch.failed.topic = "failed"
   event.max.size = "1048576" # Max is only 1MB
-  output.invalid.topic = ${job.env}".failed"
-  output.unique.topic = ${job.env}".unique"
-  output.duplicate.topic = ${job.env}".failed"
-  output.denorm.topic = ${job.env}".denorm"
-  output.denorm.failed.topic = ${job.env}".failed"
-  output.transform.topic = ${job.env}".transform"
-  output.transform.failed.topic = ${job.env}".transform.failed"
-  stats.topic = ${job.env}".stats"
-  groupId = ${job.env}"-single-pipeline-group"
+  output.invalid.topic = "failed"
+  output.unique.topic = "unique"
+  output.duplicate.topic = "failed"
+  output.denorm.topic = "denorm"
+  output.denorm.failed.topic = "failed"
+  output.transform.topic = "transform"
+  output.transform.failed.topic = "transform.failed"
+  stats.topic = "stats"
+  groupId = "single-pipeline-group"
   producer {
     max-request-size = 5242880
   }