#000: Review comment fix - Generation of metrics
Manjunath authored and Manjunath committed Jan 3, 2025
1 parent e968e67 commit 1eb2d78
Showing 2 changed files with 26 additions and 11 deletions.
@@ -4,20 +4,24 @@ import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.formats.common.TimestampFormat
 import org.apache.flink.formats.json.JsonToRowDataConverters
-import org.apache.flink.metrics.{Counter, SimpleCounter}
+import org.apache.flink.metrics.Counter
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.flink.table.data.RowData
 import org.slf4j.LoggerFactory
-import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec}
-import org.sunbird.obsrv.core.util.{JSONUtil, Util}
+import org.sunbird.obsrv.core.util.JSONUtil
 import org.sunbird.obsrv.streaming.HudiConnectorConfig
+import org.sunbird.obsrv.util.HudiSchemaParser
 
 import scala.collection.mutable.{Map => MMap}
 
-class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String, inputEventCount: Counter, failedEventCount: Counter) extends RichMapFunction[MMap[String, AnyRef], RowData] {
+class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String)
+  extends RichMapFunction[MMap[String, AnyRef], RowData] {
 
   private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction])
 
+  private var inputEventCount: Counter = _
+  private var failedEventCount: Counter = _
+
   var jsonToRowDataConverters: JsonToRowDataConverters = _
   var objectMapper: ObjectMapper = _
   var hudiSchemaParser: HudiSchemaParser = _
@@ -29,19 +33,28 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String, i
     objectMapper = new ObjectMapper()
     hudiSchemaParser = new HudiSchemaParser()
 
-    // No need to initialize the counters here as they are passed in the constructor
+    // Initialize Flink metrics within the runtime context
+    inputEventCount = getRuntimeContext.getMetricGroup
+      .addGroup(config.jobName)
+      .addGroup(datasetId)
+      .counter(config.inputEventCountMetric)
+
+    failedEventCount = getRuntimeContext.getMetricGroup
+      .addGroup(config.jobName)
+      .addGroup(datasetId)
+      .counter(config.failedEventCountMetric)
   }
 
   override def map(event: MMap[String, AnyRef]): RowData = {
     try {
       if (event.nonEmpty) {
-        inputEventCount.inc()
+        inputEventCount.inc() // Increment the input event counter
       }
       val rowData = convertToRowData(event)
       rowData
     } catch {
       case ex: Exception =>
-        failedEventCount.inc()
+        failedEventCount.inc() // Increment the failed event counter
         logger.error("Failed to process record", ex)
         throw ex
     }
@@ -51,11 +64,13 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String, i
     val eventJson = JSONUtil.serialize(data)
     val flattenedData = hudiSchemaParser.parseJson(datasetId, eventJson)
     val rowType = hudiSchemaParser.rowTypeMap(datasetId)
-    val converter: JsonToRowDataConverters.JsonToRowDataConverter = jsonToRowDataConverters.createRowConverter(rowType)
+    val converter: JsonToRowDataConverters.JsonToRowDataConverter =
+      jsonToRowDataConverters.createRowConverter(rowType)
     val rowData = converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
     rowData
   }
 
+  // Method to retrieve the metric values for testing or logging purposes
   def getMetrics: Map[String, Long] = {
     Map(
       "input-event-count" -> inputEventCount.getCount,
@@ -64,3 +79,4 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String, i
     )
   }
 }
+
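Editor's note on the pattern (not part of the commit): Flink only exposes an operator's metric group through its runtime context, which does not exist until the task is deployed, so counters created in or passed into the constructor are serialized with the function and never reach Flink's metric reporters. Moving registration into open() is the standard fix. A minimal sketch of that lifecycle, with an illustrative class and metric name (not from this repository):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

class CountingMapFunction extends RichMapFunction[String, String] {

  // Not serialized with the function; created on the task manager in open().
  @transient private var processed: Counter = _

  override def open(parameters: Configuration): Unit = {
    // getRuntimeContext is only valid once the operator is running, which is
    // why metrics cannot be registered from the constructor.
    processed = getRuntimeContext.getMetricGroup.counter("events-processed")
  }

  override def map(value: String): String = {
    processed.inc() // one increment per record, visible to Flink's metric system
    value
  }
}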
@@ -52,13 +52,12 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink
     val schemaParser = new HudiSchemaParser()
     val dataSourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live"))
 
-    val inputEventCount = new SimpleCounter()
-    val failedEventCount = new SimpleCounter()
-
 
     dataSourceConfig.map { dataSource =>
       val datasetId = dataSource.datasetId
       val dataStream = getMapDataStream(env, config, List(datasetId), config.kafkaConsumerProperties(), consumerSourceName = s"kafka-${datasetId}", kafkaConnector)
-        .map(new RowDataConverterFunction(config, datasetId, inputEventCount, failedEventCount))
+        .map(new RowDataConverterFunction(config, datasetId))
         .setParallelism(config.downstreamOperatorsParallelism)
 
       val conf: Configuration = new Configuration()
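Editor's note (not part of the commit): with the shared SimpleCounter instances removed from the stream task, each parallel subtask of RowDataConverterFunction now registers its own counters under jobName/datasetId when open() runs. A sketch of how the getMetrics helper could be exercised, assuming Flink's test utilities (flink-streaming-java test-jar) are on the classpath; the event payload is illustrative and would need to match the dataset's registered Hudi schema:

import org.apache.flink.streaming.api.operators.StreamMap
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness

import scala.collection.mutable.{Map => MMap}

val fn = new RowDataConverterFunction(config, datasetId)

// The harness runs open() with a real runtime context, so the counters
// are registered exactly as they would be in a deployed job.
val harness = new OneInputStreamOperatorTestHarness(new StreamMap(fn))
harness.open()

val event: MMap[String, AnyRef] = MMap("id" -> "event-1") // must conform to the dataset schema
harness.processElement(new StreamRecord(event))

assert(fn.getMetrics("input-event-count") == 1L)
harness.close()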
