diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala index be2e0c3a..dfe984d8 100644 --- a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala +++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/function/RowDataConverterFunction.scala @@ -40,32 +40,16 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e hudiSchemaParser = new HudiSchemaParser() // Register Flink metrics for inputEventCount and failedEventCount - inputEventCount = getRuntimeContext.getMetricGroup.counter("inputEventCountdata") - failedEventCount = getRuntimeContext.getMetricGroup.counter("failedEventCountdata") + inputEventCount = getRuntimeContext.getMetricGroup.counter("input-event-count") + failedEventCount = getRuntimeContext.getMetricGroup.counter("input-event-count") } override def map(event: MMap[String, AnyRef]): RowData = { startTime = System.currentTimeMillis() try { - // Increment the input event count - inputEventCount.inc() - - // Optionally, increment by the size of the event (if it's a collection) inputEventCount.inc(event.size) - - // Convert the event data into RowData for further processing val rowData = convertToRowData(event) - - logger.info("Metric inputEventCount: " + inputEventCount) - logger.info("Metric failedEventCount: " + failedEventCount) - - logger.info("Before increment: inputEventCount=" + inputEventCount.getCount) - inputEventCount.inc() - logger.info("After increment: inputEventCount=" + inputEventCount.getCount) - - - rowData } catch { case ex: Exception => @@ -89,8 +73,8 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e // Custom method to retrieve the metric values (e.g., for exposing them in a monitoring system) def getMetrics: Map[String, Long] = { Map( - "inputEventCountdata" -> inputEventCount.getCount, - "failedEventCountdata" -> failedEventCount.getCount + "input-event-count" -> inputEventCount.getCount, + "failed-event-count" -> failedEventCount.getCount ) } }