Skip to content

Commit

Permalink
#000: Review comment fix - Generation of metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Manjunath authored and Manjunath committed Jan 3, 2025
1 parent a458613 commit 8276883
Showing 1 changed file with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import org.apache.flink.formats.common.TimestampFormat
import org.apache.flink.formats.json.JsonToRowDataConverters
import org.apache.flink.metrics.{Counter, SimpleCounter}
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec}
import org.apache.flink.table.data.RowData
import org.slf4j.LoggerFactory
import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec}
import org.sunbird.obsrv.core.util.{JSONUtil, Util}
import org.sunbird.obsrv.streaming.HudiConnectorConfig

Expand All @@ -33,13 +33,23 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
hudiSchemaParser = new HudiSchemaParser()

// Register Flink metrics for inputEventCount and failedEventCount
inputEventCount = getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId).counter(config.inputEventCountMetric)
failedEventCount = getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId).counter(config.failedEventCountMetric)
inputEventCount = getRuntimeContext.getMetricGroup
.addGroup(config.jobName)
.addGroup(datasetId)
.counter(config.inputEventCountMetric)

failedEventCount = getRuntimeContext.getMetricGroup
.addGroup(config.jobName)
.addGroup(datasetId)
.counter(config.failedEventCountMetric)
}

override def map(event: MMap[String, AnyRef]): RowData = {
try {
inputEventCount.inc(event.size)
println("======= map method..")
if (event.nonEmpty) {
inputEventCount.inc(event.size) // Increment by the event size
}
val rowData = convertToRowData(event)
rowData
} catch {
Expand All @@ -59,6 +69,7 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
rowData
}

// Custom method to retrieve the metric values
def getMetrics: Map[String, Long] = {
Map(
"input-event-count" -> inputEventCount.getCount,
Expand Down

0 comments on commit 8276883

Please sign in to comment.