#000: Review comment fix - Generation of metrics
Manjunath authored and Manjunath committed Jan 3, 2025
1 parent f5b4b4c commit 7f69c11
Showing 1 changed file with 29 additions and 25 deletions.
@@ -1,37 +1,34 @@
 package org.sunbird.obsrv.function
 
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.scala.metrics.ScalaGauge
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.formats.common.TimestampFormat
 import org.apache.flink.formats.json.JsonToRowDataConverters
-import org.apache.flink.metrics.SimpleCounter
+import org.apache.flink.metrics.{Counter, SimpleCounter}
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
 import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec}
 import org.apache.flink.table.data.RowData
 import org.slf4j.LoggerFactory
-import org.sunbird.obsrv.core.streaming.{JobMetrics, Metrics, MetricsList}
 import org.sunbird.obsrv.core.util.{JSONUtil, Util}
 import org.sunbird.obsrv.streaming.HudiConnectorConfig
 
 import scala.collection.mutable.{Map => MMap}
 
-class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] with JobMetrics {
-
-  // Variables for converters and object mapper
-  var jsonToRowDataConverters: JsonToRowDataConverters = _
-  var objectMapper: ObjectMapper = _
-  var hudiSchemaParser: HudiSchemaParser = _
+class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] {
 
   // Logger for logging
   private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction])
 
-  private val metricsList: MetricsList = getMetricsList()
-  private val metrics: Metrics = registerMetrics(metricsList.datasets, metricsList.metrics)
+  // Initialize counters for input and failed events
+  private var inputEventCount: Counter = _
+  private var failedEventCount: Counter = _
+
+  // Variables for converters and object mapper
+  var jsonToRowDataConverters: JsonToRowDataConverters = _
+  var objectMapper: ObjectMapper = _
+  var hudiSchemaParser: HudiSchemaParser = _
 
-  // Metrics for time tracking
+  // Initialize the start time
   private var startTime: Long = _
 
   override def open(parameters: Configuration): Unit = {
@@ -42,28 +39,34 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] {
     objectMapper = new ObjectMapper()
     hudiSchemaParser = new HudiSchemaParser()
 
-    // // Register metrics
-    // getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
-    //   .counter(config.inputEventCountMetric, new SimpleCounter())
-    // getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
-    //   .counter(config.failedEventCountMetric, new SimpleCounter())
+    // Register Flink metrics for inputEventCount and failedEventCount
+    inputEventCount = getRuntimeContext.getMetricGroup.counter("inputEventCount")
+    failedEventCount = getRuntimeContext.getMetricGroup.counter("failedEventCount")
   }
 
   override def map(event: MMap[String, AnyRef]): RowData = {
     startTime = System.currentTimeMillis()
-    metrics.incCounter(datasetId, config.inputEventCountMetric)
 
     try {
-      metrics.incCounter(datasetId, config.inputEventCountMetric, event.size)
+      // Increment the input event count
+      inputEventCount.inc()
+
+      // Optionally, increment by the size of the event (if it's a collection)
+      inputEventCount.inc(event.size)
+
+      // Convert the event data into RowData for further processing
       val rowData = convertToRowData(event)
       rowData
     } catch {
       case ex: Exception =>
-        metrics.incCounter(datasetId, config.failedEventCountMetric)
+        // Increment the failed event count in case of failure
+        failedEventCount.inc()
         logger.error("Failed to process record", ex)
        throw ex
     }
   }
 
+  // Method to convert event data into RowData
   def convertToRowData(data: MMap[String, AnyRef]): RowData = {
     val eventJson = JSONUtil.serialize(data)
     val flattenedData = hudiSchemaParser.parseJson(datasetId, eventJson)
@@ -73,10 +76,11 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] {
     rowData
   }
 
-  def getMetricsList(): MetricsList = {
-    MetricsList(
-      datasets = List(datasetId),
-      metrics = List(config.inputEventCountMetric, config.failedEventCountMetric)
+  // Custom method to retrieve the metric values (e.g., for exposing them in a monitoring system)
+  def getMetrics: Map[String, Long] = {
+    Map(
+      "inputEventCount" -> inputEventCount.getCount,
+      "failedEventCount" -> failedEventCount.getCount
     )
   }
 }