diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala
index 4aadb60a..7585fede 100644
--- a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala
+++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala
@@ -1,34 +1,58 @@
 package org.sunbird.obsrv.function

 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.formats.common.TimestampFormat
 import org.apache.flink.formats.json.JsonToRowDataConverters
+import org.apache.flink.metrics.SimpleCounter
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
 import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec}
 import org.apache.flink.table.data.RowData
 import org.slf4j.LoggerFactory
+import org.sunbird.obsrv.core.streaming.{JobMetrics, Metrics, MetricsList}
 import org.sunbird.obsrv.core.util.{JSONUtil, Util}
 import org.sunbird.obsrv.streaming.HudiConnectorConfig
+
 import scala.collection.mutable.{Map => MMap}

-class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] {
+class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] with JobMetrics {

+  // Converters and schema utilities, initialised in open()
   var jsonToRowDataConverters: JsonToRowDataConverters = _
   var objectMapper: ObjectMapper = _
   var hudiSchemaParser: HudiSchemaParser = _

   private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction])

+  // Obsrv job metrics: input and failed event counts for this dataset
+  protected val metricsList: MetricsList = getMetricsList()
+  protected val metrics: Metrics = registerMetrics(metricsList.datasets, metricsList.metrics)
+
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
+
+    // Initialize the required converters and parsers
     jsonToRowDataConverters = new JsonToRowDataConverters(false, true, TimestampFormat.SQL)
     objectMapper = new ObjectMapper()
     hudiSchemaParser = new HudiSchemaParser()
+
+    // Register per-dataset counters with Flink's native metric system
+    getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
+      .counter(config.inputEventCountMetric, new SimpleCounter())
+    getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
+      .counter(config.failedEventCountMetric, new SimpleCounter())
   }

   override def map(event: MMap[String, AnyRef]): RowData = {
-    convertToRowData(event)
+    try {
+      metrics.incCounter(datasetId, config.inputEventCountMetric)
+      convertToRowData(event)
+    } catch {
+      case ex: Exception =>
+        metrics.incCounter(datasetId, config.failedEventCountMetric)
+        logger.error("Failed to process record", ex)
+        throw ex
+    }
   }

   def convertToRowData(data: MMap[String, AnyRef]): RowData = {
@@ -40,4 +64,10 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
     rowData
   }

+  def getMetricsList(): MetricsList = {
+    MetricsList(
+      datasets = List(datasetId),
+      metrics = List(config.inputEventCountMetric, config.failedEventCountMetric)
+    )
+  }
 }
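For reference, here is the Flink-native half of the pattern above in isolation: a `RichMapFunction` that registers per-dataset counters in `open()` and increments them in `map()`. This is a minimal sketch, not connector code; the class name `EventCounterFunction`, the pass-through `map` body, and the metric names are illustrative.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Hypothetical stand-alone example; the jobName/datasetId nesting mirrors
// the config.jobName / datasetId metric groups used in the diff above.
class EventCounterFunction(jobName: String, datasetId: String)
  extends RichMapFunction[String, String] {

  @transient private var inputCounter: Counter = _
  @transient private var failedCounter: Counter = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // addGroup returns the same group for the same name, so both
    // counters land under <jobName>.<datasetId> in the metric tree
    val group = getRuntimeContext.getMetricGroup.addGroup(jobName).addGroup(datasetId)
    inputCounter = group.counter("inputEventCount")
    failedCounter = group.counter("failedEventCount")
  }

  override def map(event: String): String = {
    inputCounter.inc()
    try event.trim
    catch { case ex: Exception => failedCounter.inc(); throw ex }
  }
}
```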
diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala
index 908cb3cd..aef0643f 100644
--- a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala
+++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala
@@ -61,4 +61,13 @@ class HudiConnectorConfig(override val config: Config) extends BaseJobConfig[mut
   val hudiCompactionTaskMemory: Int = config.getInt("hudi.write.compaction.max.memory")
   val hudiFsAtomicCreationSupport: String = config.getString("hudi.fs.atomic_creation.support")

+  // Metric names
+  val inputEventCountMetric = "inputEventCount"
+  val failedEventCountMetric = "failedEventCount"
+
+  // Metrics exporter
+  val metricsReportType = config.getString("metrics.reporter.type")
+  val metricsReporterHost = config.getString("metrics.reporter.host")
+  val metricsReporterPort = config.getString("metrics.reporter.port")
+
 }
diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala
index 55b11ff8..56f7748e 100644
--- a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala
+++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala
@@ -89,7 +89,17 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink
     conf.setString(FlinkOptions.PRECOMBINE_FIELD.key, datasetSchema.schema.timestampColumn)
     conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key, datasetSchema.schema.partitionColumn)
     conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key, avroSchema.toString)
-
+    // Enable Hudi metrics and wire up the configured reporter
+    conf.setBoolean("hoodie.metrics.on", true)
+    if (config.metricsReportType.equalsIgnoreCase("PROMETHEUS_PUSHGATEWAY")) {
+      conf.setString("hoodie.metrics.reporter.type", config.metricsReportType)
+      conf.setString("hoodie.metrics.pushgateway.host", config.metricsReporterHost)
+      conf.setString("hoodie.metrics.pushgateway.port", config.metricsReporterPort)
+    } else if (config.metricsReportType.equalsIgnoreCase("JMX")) {
+      conf.setString("hoodie.metrics.reporter.type", config.metricsReportType)
+      conf.setString("hoodie.metrics.jmx.host", config.metricsReporterHost)
+      conf.setString("hoodie.metrics.jmx.port", config.metricsReporterPort)
+    }
     val partitionField = datasetSchema.schema.columnSpec.filter(f => f.name.equalsIgnoreCase(datasetSchema.schema.partitionColumn)).head
     if(partitionField.`type`.equalsIgnoreCase("timestamp") || partitionField.`type`.equalsIgnoreCase("epoch")) {
       conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key, datasetSchema.schema.partitionColumn + "_partition")
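The three new reporter settings in `HudiConnectorConfig` are read as plain strings, including the port, since they are handed to Hudi via `setString`. Below is a hedged sketch of the HOCON shape this assumes, parsed standalone; the key names come from the diff, the values and the `MetricsReporterConfigExample` object are placeholders.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object MetricsReporterConfigExample extends App {
  // Illustrative values only; in the job these keys come from the
  // connector's deployed configuration file.
  val config: Config = ConfigFactory.parseString(
    """
      |metrics.reporter.type = "PROMETHEUS_PUSHGATEWAY"
      |metrics.reporter.host = "pushgateway.monitoring.svc"
      |metrics.reporter.port = "9091"
      |""".stripMargin)

  // Mirrors the reads added to HudiConnectorConfig
  val metricsReportType: String = config.getString("metrics.reporter.type")
  val metricsReporterHost: String = config.getString("metrics.reporter.host")
  val metricsReporterPort: String = config.getString("metrics.reporter.port")

  println(s"$metricsReportType -> $metricsReporterHost:$metricsReporterPort")
}
```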
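The reporter wiring in `HudiConnectorStreamTask` can also be read as a pure mapping from reporter type to `hoodie.*` options. Here is a side-effect-free sketch of that mapping; the `HudiMetricsOptions` object and its `Map`-based return are illustrative, not connector code.

```scala
// Restates the branching added to HudiConnectorStreamTask as data:
// given the configured reporter, which hoodie.* keys get set.
object HudiMetricsOptions {
  def forReporter(reporterType: String, host: String, port: String): Map[String, String] = {
    val base = Map("hoodie.metrics.on" -> "true")
    reporterType.toUpperCase match {
      case "PROMETHEUS_PUSHGATEWAY" => base ++ Map(
        "hoodie.metrics.reporter.type"    -> reporterType,
        "hoodie.metrics.pushgateway.host" -> host,
        "hoodie.metrics.pushgateway.port" -> port)
      case "JMX" => base ++ Map(
        "hoodie.metrics.reporter.type" -> reporterType,
        "hoodie.metrics.jmx.host"      -> host,
        "hoodie.metrics.jmx.port"      -> port)
      // Any other type leaves metrics on without a reporter override,
      // matching the behaviour of the diff above
      case _ => base
    }
  }
}
```

For example, `HudiMetricsOptions.forReporter("JMX", "localhost", "9889")` yields the four key/value pairs the stream task would set on the Flink `Configuration` for a JMX reporter.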