Skip to content

Commit

Permalink
#000: Metric name changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Manjunath authored and Manjunath committed Jan 3, 2025
1 parent 1c5fded commit 83a5ccd
Showing 1 changed file with 3 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@ import org.sunbird.obsrv.streaming.HudiConnectorConfig

import scala.collection.mutable.{Map => MMap}

class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] {
class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String, inputEventCount: Counter, failedEventCount: Counter) extends RichMapFunction[MMap[String, AnyRef], RowData] {

private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction])

private var inputEventCount: Counter = _
private var failedEventCount: Counter = _

var jsonToRowDataConverters: JsonToRowDataConverters = _
var objectMapper: ObjectMapper = _
var hudiSchemaParser: HudiSchemaParser = _
Expand All @@ -32,22 +29,7 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
objectMapper = new ObjectMapper()
hudiSchemaParser = new HudiSchemaParser()

// Register Flink metrics for inputEventCount and failedEventCount

if (inputEventCount == null) {
inputEventCount = getRuntimeContext.getMetricGroup
.addGroup(config.jobName)
.addGroup(datasetId)
.counter(config.inputEventCountMetric)
}

if (failedEventCount == null) {
failedEventCount = getRuntimeContext.getMetricGroup
.addGroup(config.jobName)
.addGroup(datasetId)
.counter(config.failedEventCountMetric)
}

// No need to initialize the counters here as they are passed in the constructor
}

override def map(event: MMap[String, AnyRef]): RowData = {
Expand All @@ -56,7 +38,6 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
inputEventCount.inc()
}
val rowData = convertToRowData(event)
println(s"Current inputEventCount: ${inputEventCount.getCount}")
rowData
} catch {
case ex: Exception =>
Expand All @@ -75,11 +56,11 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
rowData
}

// Custom method to retrieve the metric values
def getMetrics: Map[String, Long] = {
Map(
"input-event-count" -> inputEventCount.getCount,
"failed-event-count" -> failedEventCount.getCount
)
}
}

0 comments on commit 83a5ccd

Please sign in to comment.