#000: Review comment fix - Generation of metrics
Manjunath authored and Manjunath committed Jan 3, 2025
1 parent 1eb2d78 commit 1d7dec4
Showing 2 changed files with 67 additions and 23 deletions.
@@ -211,4 +211,6 @@ abstract class WindowBaseProcessFunction[I, O, K](config: BaseJobConfig[O]) exte
     process(key, context, elements, metrics)
   }
 
 }
+
+import scala.collection.concurrent.TrieMap
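For context: scala.collection.concurrent.TrieMap, imported above, is the thread-safe map that backs the Metrics store introduced in the second changed file, whose diff follows. The sketch below is not part of this commit and all names in it are illustrative; it shows the compare-and-swap primitives TrieMap offers. The commit itself serializes updates with synchronized rather than the lock-free replace used here.

import scala.collection.concurrent.TrieMap

object TrieMapSketch {
  private val store = TrieMap[(String, String), Long]()

  // Lock-free read-modify-write: retry with TrieMap's compare-and-swap
  // `replace` until no concurrent writer has raced us.
  def increment(dataset: String, metric: String, delta: Long): Unit = {
    val key = (dataset, metric)
    store.putIfAbsent(key, 0L)
    var done = false
    while (!done) {
      val current = store(key)
      done = store.replace(key, current, current + delta)
    }
  }

  def main(args: Array[String]): Unit = {
    (1 to 4).foreach(_ => increment("d1", "input-data-count", 1))
    println(store(("d1", "input-data-count"))) // prints 4
  }
}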
@@ -4,7 +4,23 @@ import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.formats.common.TimestampFormat
 import org.apache.flink.formats.json.JsonToRowDataConverters
-import org.apache.flink.metrics.Counter
+import org.apache.flink.metrics.{Counter, Gauge}
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.flink.table.data.RowData
 import org.slf4j.LoggerFactory
+import org.sunbird.obsrv.core.util.JSONUtil
+import org.sunbird.obsrv.streaming.HudiConnectorConfig
+import org.sunbird.obsrv.util.HudiSchemaParser
+
+import scala.collection.mutable.{Map => MMap}
+
+
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.formats.common.TimestampFormat
+import org.apache.flink.formats.json.JsonToRowDataConverters
+import org.apache.flink.metrics.Gauge
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.flink.table.data.RowData
+import org.slf4j.LoggerFactory
@@ -19,42 +35,46 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String)

   private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction])
 
-  private var inputEventCount: Counter = _
-  private var failedEventCount: Counter = _
-
-  var jsonToRowDataConverters: JsonToRowDataConverters = _
-  var objectMapper: ObjectMapper = _
-  var hudiSchemaParser: HudiSchemaParser = _
+  private var metrics: Metrics = _
+  private var jsonToRowDataConverters: JsonToRowDataConverters = _
+  private var objectMapper: ObjectMapper = _
+  private var hudiSchemaParser: HudiSchemaParser = _
 
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
 
+    metrics = new Metrics()
     jsonToRowDataConverters = new JsonToRowDataConverters(false, true, TimestampFormat.SQL)
     objectMapper = new ObjectMapper()
     hudiSchemaParser = new HudiSchemaParser()
 
-    // Initialize Flink metrics within the runtime context
-    inputEventCount = getRuntimeContext.getMetricGroup
+    // Register dynamic Gauge for inputEventCount
+    getRuntimeContext.getMetricGroup
       .addGroup(config.jobName)
       .addGroup(datasetId)
-      .counter(config.inputEventCountMetric)
+      .gauge[Long, ScalaGauge[Long]]("input-data-count", ScalaGauge[Long](() =>
+        metrics.getAndReset(datasetId, "input-data-count")
+      ))
 
-    failedEventCount = getRuntimeContext.getMetricGroup
+    // Register dynamic Gauge for failedEventCount
+    getRuntimeContext.getMetricGroup
       .addGroup(config.jobName)
       .addGroup(datasetId)
-      .counter(config.failedEventCountMetric)
+      .gauge[Long, ScalaGauge[Long]]("failed-data-count", ScalaGauge[Long](() =>
+        metrics.getAndReset(datasetId, "failed-data-count")
+      ))
   }
 
   override def map(event: MMap[String, AnyRef]): RowData = {
     try {
       if (event.nonEmpty) {
-        inputEventCount.inc() // Increment the input event counter
+        metrics.increment(datasetId, "input-data-count", 1)
       }
       val rowData = convertToRowData(event)
       rowData
     } catch {
       case ex: Exception =>
-        failedEventCount.inc() // Increment the failed event counter
+        metrics.increment(datasetId, "failed-data-count", 1)
        logger.error("Failed to process record", ex)
        throw ex
     }
@@ -66,17 +86,39 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String)
     val rowType = hudiSchemaParser.rowTypeMap(datasetId)
     val converter: JsonToRowDataConverters.JsonToRowDataConverter =
       jsonToRowDataConverters.createRowConverter(rowType)
-    val rowData = converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
-    rowData
+    converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
   }
 }
 
+
+import scala.collection.concurrent.TrieMap
+
+class Metrics {
+  private val metricStore = TrieMap[(String, String), Long]()
+
+  def increment(dataset: String, metric: String, value: Long): Unit = {
+    metricStore.synchronized {
+      val key = (dataset, metric)
+      val current = metricStore.getOrElse(key, 0L)
+      metricStore.put(key, current + value)
+    }
+  }
+
-  // Method to retrieve the metric values for testing or logging purposes
-  def getMetrics: Map[String, Long] = {
-    Map(
-      "input-event-count" -> inputEventCount.getCount,
-      "failed-event-count" -> failedEventCount.getCount
-    )
+  def getAndReset(dataset: String, metric: String): Long = {
+    metricStore.synchronized {
+      val key = (dataset, metric)
+      val current = metricStore.getOrElse(key, 0L)
+      metricStore.remove(key)
+      current
+    }
+  }
+}
+
+
+import org.apache.flink.metrics.Gauge
+
+case class ScalaGauge[T](getValueFn: () => T) extends Gauge[T] {
+  override def getValue: T = getValueFn()
+}
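Taken together, the commit swaps monotonic Counters for Gauges whose read function drains a shared Metrics store, so each metrics scrape reports only the events accumulated since the previous scrape. Below is a self-contained sketch of that delta-gauge pattern; it is not part of the commit, the Gauge trait is stubbed locally so the sketch runs without a Flink runtime, and the dataset id "d1" is illustrative.

import scala.collection.concurrent.TrieMap

// Local stand-in for org.apache.flink.metrics.Gauge, so the demo compiles
// without a Flink dependency.
trait Gauge[T] { def getValue: T }

case class ScalaGauge[T](getValueFn: () => T) extends Gauge[T] {
  override def getValue: T = getValueFn()
}

class Metrics {
  private val metricStore = TrieMap[(String, String), Long]()

  def increment(dataset: String, metric: String, value: Long): Unit =
    metricStore.synchronized {
      val key = (dataset, metric)
      metricStore.put(key, metricStore.getOrElse(key, 0L) + value)
    }

  // Drains the stored value: each reporter scrape observes only the
  // delta accumulated since the previous scrape.
  def getAndReset(dataset: String, metric: String): Long =
    metricStore.synchronized {
      val key = (dataset, metric)
      val current = metricStore.getOrElse(key, 0L)
      metricStore.remove(key)
      current
    }
}

object GaugeDemo {
  def main(args: Array[String]): Unit = {
    val metrics = new Metrics()
    val gauge = ScalaGauge[Long](() => metrics.getAndReset("d1", "input-data-count"))

    metrics.increment("d1", "input-data-count", 1)
    metrics.increment("d1", "input-data-count", 1)
    println(gauge.getValue) // 2: first scrape reports the accumulated delta
    println(gauge.getValue) // 0: the store was reset by the first scrape
  }
}

The synchronized blocks matter here: TrieMap makes individual operations thread-safe, but getAndReset is a read followed by a remove, and the shared lock prevents a concurrent increment from landing between the two and being lost.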

