diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala index 9678381d..9d6ca981 100644 --- a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala +++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala @@ -4,20 +4,24 @@ import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.configuration.Configuration import org.apache.flink.formats.common.TimestampFormat import org.apache.flink.formats.json.JsonToRowDataConverters -import org.apache.flink.metrics.{Counter, SimpleCounter} +import org.apache.flink.metrics.Counter import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper import org.apache.flink.table.data.RowData import org.slf4j.LoggerFactory -import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec} -import org.sunbird.obsrv.core.util.{JSONUtil, Util} +import org.sunbird.obsrv.core.util.JSONUtil import org.sunbird.obsrv.streaming.HudiConnectorConfig +import org.sunbird.obsrv.util.HudiSchemaParser import scala.collection.mutable.{Map => MMap} -class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String, inputEventCount: Counter, failedEventCount: Counter) extends RichMapFunction[MMap[String, AnyRef], RowData] { +class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) + extends RichMapFunction[MMap[String, AnyRef], RowData] { private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction]) + private var inputEventCount: Counter = _ + private var failedEventCount: Counter = _ + var jsonToRowDataConverters: JsonToRowDataConverters = _ var objectMapper: ObjectMapper = _ var hudiSchemaParser: HudiSchemaParser = _ @@ -29,19 +33,28 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String, i objectMapper = new ObjectMapper() hudiSchemaParser = new HudiSchemaParser() - // No need to initialize the counters here as they are passed in the constructor + // Initialize Flink metrics within the runtime context + inputEventCount = getRuntimeContext.getMetricGroup + .addGroup(config.jobName) + .addGroup(datasetId) + .counter(config.inputEventCountMetric) + + failedEventCount = getRuntimeContext.getMetricGroup + .addGroup(config.jobName) + .addGroup(datasetId) + .counter(config.failedEventCountMetric) } override def map(event: MMap[String, AnyRef]): RowData = { try { if (event.nonEmpty) { - inputEventCount.inc() + inputEventCount.inc() // Increment the input event counter } val rowData = convertToRowData(event) rowData } catch { case ex: Exception => - failedEventCount.inc() + failedEventCount.inc() // Increment the failed event counter logger.error("Failed to process record", ex) throw ex } @@ -51,11 +64,13 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String, i val eventJson = JSONUtil.serialize(data) val flattenedData = hudiSchemaParser.parseJson(datasetId, eventJson) val rowType = hudiSchemaParser.rowTypeMap(datasetId) - val converter: JsonToRowDataConverters.JsonToRowDataConverter = jsonToRowDataConverters.createRowConverter(rowType) + val converter: JsonToRowDataConverters.JsonToRowDataConverter = + jsonToRowDataConverters.createRowConverter(rowType) val rowData = converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData] rowData } + // Method to retrieve the metric values for testing or logging purposes def getMetrics: Map[String, Long] = { Map( "input-event-count" -> inputEventCount.getCount, @@ -64,3 +79,4 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String, i } } + diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala index ac220fcf..a289c0c1 100644 --- a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala +++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala @@ -52,13 +52,12 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink val schemaParser = new HudiSchemaParser() val dataSourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live")) - val inputEventCount = new SimpleCounter() - val failedEventCount = new SimpleCounter() + dataSourceConfig.map { dataSource => val datasetId = dataSource.datasetId val dataStream = getMapDataStream(env, config, List(datasetId), config.kafkaConsumerProperties(), consumerSourceName = s"kafka-${datasetId}", kafkaConnector) - .map(new RowDataConverterFunction(config, datasetId, inputEventCount, failedEventCount)) + .map(new RowDataConverterFunction(config, datasetId)) .setParallelism(config.downstreamOperatorsParallelism) val conf: Configuration = new Configuration()