#OBS-I27: Hudi Metrics generation logic changes
Manjunath authored and Manjunath committed Jan 3, 2025
1 parent 8bed04c commit 2579a43
Showing 3 changed files with 66 additions and 44 deletions.
RowDataConverterFunction.scala
@@ -1,63 +1,61 @@
  package org.sunbird.obsrv.function

  import org.apache.flink.api.common.functions.RichMapFunction
- import org.apache.flink.api.scala.metrics.ScalaGauge
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.formats.common.TimestampFormat
  import org.apache.flink.formats.json.JsonToRowDataConverters
- import org.apache.flink.metrics.SimpleCounter
  import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
- import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec}
  import org.apache.flink.table.data.RowData
  import org.slf4j.LoggerFactory
- import org.sunbird.obsrv.core.streaming.{JobMetrics, Metrics, MetricsList}
- import org.sunbird.obsrv.core.util.{JSONUtil, Util}
+ import org.sunbird.obsrv.core.util.JSONUtil
  import org.sunbird.obsrv.streaming.HudiConnectorConfig
+ import org.sunbird.obsrv.util.{HMetrics, HudiSchemaParser, ScalaGauge}

  import scala.collection.mutable.{Map => MMap}

- class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] with JobMetrics {
+ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String)
+   extends RichMapFunction[MMap[String, AnyRef], RowData] {

-   // Variables for converters and object mapper
-   var jsonToRowDataConverters: JsonToRowDataConverters = _
-   var objectMapper: ObjectMapper = _
-   var hudiSchemaParser: HudiSchemaParser = _

    // Logger for logging
    private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction])

-   protected val metricsList: MetricsList = getMetricsList()
-   protected val metrics: Metrics = registerMetrics(metricsList.datasets, metricsList.metrics)
+   // Metrics for time tracking
+   private var startTime: Long = _
+   private var metrics: HMetrics = _
+   private var jsonToRowDataConverters: JsonToRowDataConverters = _
+   private var objectMapper: ObjectMapper = _
+   private var hudiSchemaParser: HudiSchemaParser = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)

      // Initialize required converters and parsers
+     metrics = new HMetrics()
      jsonToRowDataConverters = new JsonToRowDataConverters(false, true, TimestampFormat.SQL)
      objectMapper = new ObjectMapper()
      hudiSchemaParser = new HudiSchemaParser()

      // Register metrics
-     getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
-       .counter(config.inputEventCountMetric, new SimpleCounter())
-     getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
-       .counter(config.failedEventCountMetric, new SimpleCounter())
+     getRuntimeContext.getMetricGroup
+       .addGroup(config.jobName)
+       .addGroup(datasetId)
+       .gauge[Long, ScalaGauge[Long]](config.inputEventCountMetric, ScalaGauge[Long](() =>
+         metrics.getAndReset(datasetId, config.inputEventCountMetric)
+       ))
+
+     getRuntimeContext.getMetricGroup
+       .addGroup(config.jobName)
+       .addGroup(datasetId)
+       .gauge[Long, ScalaGauge[Long]](config.failedEventCountMetric, ScalaGauge[Long](() =>
+         metrics.getAndReset(datasetId, config.failedEventCountMetric)
+       ))
    }

    override def map(event: MMap[String, AnyRef]): RowData = {
      startTime = System.currentTimeMillis()
      try {
-       metrics.incCounter(datasetId, config.inputEventCountMetric, event.size)
+       if (event.nonEmpty) {
+         metrics.increment(datasetId, config.inputEventCountMetric, 1)
+       }
        val rowData = convertToRowData(event)
        rowData
      } catch {
        case ex: Exception =>
-         metrics.incCounter(datasetId, config.failedEventCountMetric)
+         metrics.increment(datasetId, config.failedEventCountMetric, 1)
          logger.error("Failed to process record", ex)
          throw ex
      }
@@ -67,15 +65,8 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
      val eventJson = JSONUtil.serialize(data)
      val flattenedData = hudiSchemaParser.parseJson(datasetId, eventJson)
      val rowType = hudiSchemaParser.rowTypeMap(datasetId)
-     val converter: JsonToRowDataConverters.JsonToRowDataConverter = jsonToRowDataConverters.createRowConverter(rowType)
-     val rowData = converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
-     rowData
-   }
-
-   def getMetricsList(): MetricsList = {
-     MetricsList(
-       datasets = List(datasetId),
-       metrics = List(config.inputEventCountMetric, config.failedEventCountMetric)
-     )
-   }
+     val converter: JsonToRowDataConverters.JsonToRowDataConverter =
+       jsonToRowDataConverters.createRowConverter(rowType)
+     converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
    }
  }
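
For context on the change above: the old code registered Flink SimpleCounters, while the new code registers gauges that drain an HMetrics store on every read. Below is a minimal sketch (not part of the commit) of the resulting scrape-and-reset semantics, using the HMetrics and ScalaGauge utilities added in this commit; the dataset id "d1" is illustrative.

```scala
import org.sunbird.obsrv.util.{HMetrics, ScalaGauge}

object GaugeSemanticsDemo extends App {
  val metrics = new HMetrics()

  // Wired the same way RowDataConverterFunction.open wires its gauges
  val inputGauge = ScalaGauge[Long](() => metrics.getAndReset("d1", "input-event-count"))

  metrics.increment("d1", "input-event-count", 1)
  metrics.increment("d1", "input-event-count", 1)

  println(inputGauge.getValue) // 2 -- a reporter poll sees the count since the last poll
  println(inputGauge.getValue) // 0 -- getAndReset cleared the store on the previous read
}
```

Because each read resets the stored value, a periodic metrics reporter effectively exports per-interval deltas rather than a monotonically growing total.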
HudiConnectorConfig.scala
@@ -63,13 +63,13 @@ class HudiConnectorConfig(override val config: Config) extends BaseJobConfig[mut

    // Metrics

-   val inputEventCountMetric = "inputEventCount"
-   val failedEventCountMetric = "failedEventCount"
+   val inputEventCountMetric = "input-event-count"
+   val failedEventCountMetric = "failed-event-count"

    // Metrics Exporter
-   val metricsReportType = config.getString("metrics.reporter.type")
-   val metricsReporterHost = config.getString("metrics.reporter.host")
-   val metricsReporterPort = config.getString("metrics.reporter.port")
+   val metricsReportType: String = config.getString("metrics.reporter.type")
+   val metricsReporterHost: String = config.getString("metrics.reporter.host")
+   val metricsReporterPort: String = config.getString("metrics.reporter.port")

  }
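
The reporter settings above are read straight from Typesafe Config. A small sketch (with illustrative values, not taken from the commit) of the HOCON keys HudiConnectorConfig expects:

```scala
import com.typesafe.config.ConfigFactory

object ReporterConfigDemo extends App {
  // Illustrative values; the real ones come from the job's configuration file
  val config = ConfigFactory.parseString(
    """
      |metrics.reporter.type = "prometheus"
      |metrics.reporter.host = "localhost"
      |metrics.reporter.port = "9091"
      |""".stripMargin)

  // Mirrors the getString calls in HudiConnectorConfig
  println(config.getString("metrics.reporter.type")) // prometheus
  println(config.getString("metrics.reporter.host")) // localhost
  println(config.getString("metrics.reporter.port")) // 9091
}
```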
HMetrics.scala (new file)
@@ -0,0 +1,31 @@
+ package org.sunbird.obsrv.util
+
+ import org.apache.flink.metrics.Gauge
+
+ import scala.collection.concurrent.TrieMap
+
+ class HMetrics {
+   private val metricStore = TrieMap[(String, String), Long]()
+
+   def increment(dataset: String, metric: String, value: Long): Unit = {
+     metricStore.synchronized {
+       val key = (dataset, metric)
+       val current = metricStore.getOrElse(key, 0L)
+       metricStore.put(key, current + value)
+     }
+   }
+
+   def getAndReset(dataset: String, metric: String): Long = {
+     metricStore.synchronized {
+       val key = (dataset, metric)
+       val current = metricStore.getOrElse(key, 0L)
+       metricStore.remove(key)
+       current
+     }
+   }
+ }
+
+ case class ScalaGauge[T](getValueFn: () => T) extends Gauge[T] {
+   override def getValue: T = getValueFn()
+ }
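
A note on the design: TrieMap makes single map operations thread-safe, but increment's read-modify-write and getAndReset's read-then-remove each span two map operations, so the class serializes them with synchronized blocks. A quick sketch (not part of the commit) exercising this under concurrent updates:

```scala
import org.sunbird.obsrv.util.HMetrics

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object HMetricsConcurrencyDemo extends App {
  val metrics = new HMetrics()

  // 100 concurrent increments; the synchronized blocks keep each update atomic
  val updates = Future.traverse((1 to 100).toList) { _ =>
    Future(metrics.increment("d1", "failed-event-count", 1))
  }
  Await.result(updates, 10.seconds)

  println(metrics.getAndReset("d1", "failed-event-count")) // 100
  println(metrics.getAndReset("d1", "failed-event-count")) // 0 -- the first read removed the key
}
```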
