Merge pull request #104 from Sanketika-Obsrv/hudi-metrics
Observability Requirements of Hudi
ravismula authored Dec 30, 2024
2 parents 46256e8 + 180cbb7 commit 8bed04c
Showing 3 changed files with 62 additions and 3 deletions.
RowDataConverterFunction.scala
@@ -1,34 +1,66 @@
package org.sunbird.obsrv.function

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.Configuration
import org.apache.flink.formats.common.TimestampFormat
import org.apache.flink.formats.json.JsonToRowDataConverters
import org.apache.flink.metrics.SimpleCounter
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec}
import org.apache.flink.table.data.RowData
import org.slf4j.LoggerFactory
import org.sunbird.obsrv.core.streaming.{JobMetrics, Metrics, MetricsList}
import org.sunbird.obsrv.core.util.{JSONUtil, Util}
import org.sunbird.obsrv.streaming.HudiConnectorConfig

import scala.collection.mutable.{Map => MMap}

class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] with JobMetrics {

  // Converters and schema parser, initialized in open()
  var jsonToRowDataConverters: JsonToRowDataConverters = _
  var objectMapper: ObjectMapper = _
  var hudiSchemaParser: HudiSchemaParser = _

  private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction])

  // Per-dataset metrics registered through the JobMetrics trait
  protected val metricsList: MetricsList = getMetricsList()
  protected val metrics: Metrics = registerMetrics(metricsList.datasets, metricsList.metrics)

  // Processing start time of the record currently being mapped
  private var startTime: Long = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

    // Initialize required converters and parsers
    jsonToRowDataConverters = new JsonToRowDataConverters(false, true, TimestampFormat.SQL)
    objectMapper = new ObjectMapper()
    hudiSchemaParser = new HudiSchemaParser()

    // Register per-dataset counters under the <jobName>.<datasetId> metric group
    getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
      .counter(config.inputEventCountMetric, new SimpleCounter())
    getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
      .counter(config.failedEventCountMetric, new SimpleCounter())
  }

  override def map(event: MMap[String, AnyRef]): RowData = {
    startTime = System.currentTimeMillis()
    try {
      metrics.incCounter(datasetId, config.inputEventCountMetric, event.size)
      val rowData = convertToRowData(event)
      rowData
    } catch {
      case ex: Exception =>
        metrics.incCounter(datasetId, config.failedEventCountMetric)
        logger.error("Failed to process record", ex)
        throw ex
    }
  }

  def convertToRowData(data: MMap[String, AnyRef]): RowData = {
    // … (conversion body unchanged; collapsed in the diff at @@ -40,4 +72,10 @@) …
    rowData
  }

  def getMetricsList(): MetricsList = {
    MetricsList(
      datasets = List(datasetId),
      metrics = List(config.inputEventCountMetric, config.failedEventCountMetric)
    )
  }
}
HudiConnectorConfig.scala
@@ -61,4 +61,15 @@ class HudiConnectorConfig(override val config: Config) extends BaseJobConfig[mut
  val hudiCompactionTaskMemory: Int = config.getInt("hudi.write.compaction.max.memory")
  val hudiFsAtomicCreationSupport: String = config.getString("hudi.fs.atomic_creation.support")

  // Metric names
  val inputEventCountMetric = "inputEventCount"
  val failedEventCountMetric = "failedEventCount"

  // Metrics exporter (reporter) settings
  val metricsReportType = config.getString("metrics.reporter.type")
  val metricsReporterHost = config.getString("metrics.reporter.host")
  val metricsReporterPort = config.getString("metrics.reporter.port")

}
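
Since HudiConnectorConfig resolves these three keys with getString and the hunk shows no defaults, the connector's job configuration has to supply them. A minimal sketch of what that could look like with Typesafe Config; the object name, host and port below are placeholders for illustration, not values from the repository:

import com.typesafe.config.{Config, ConfigFactory}

object MetricsConfigExample {
  def main(args: Array[String]): Unit = {
    // Placeholder values; real deployments would point at their own
    // Prometheus Pushgateway (or a JMX host/port).
    val conf: Config = ConfigFactory.parseString(
      """
        |metrics.reporter.type = "PROMETHEUS_PUSHGATEWAY"
        |metrics.reporter.host = "pushgateway"
        |metrics.reporter.port = "9091"
        |""".stripMargin)

    // The same lookups HudiConnectorConfig performs
    println(conf.getString("metrics.reporter.type")) // PROMETHEUS_PUSHGATEWAY
    println(conf.getString("metrics.reporter.host")) // pushgateway
    println(conf.getString("metrics.reporter.port")) // 9091
  }
}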
HudiConnectorStreamTask.scala
@@ -89,7 +89,17 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink
    conf.setString(FlinkOptions.PRECOMBINE_FIELD.key, datasetSchema.schema.timestampColumn)
    conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key, datasetSchema.schema.partitionColumn)
    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key, avroSchema.toString)

    // Enable Hudi metrics and wire up the reporter selected by metrics.reporter.type
    conf.setBoolean("hoodie.metrics.on", true)
    if (config.metricsReportType.equalsIgnoreCase("PROMETHEUS_PUSHGATEWAY")) {
      conf.setString("hoodie.metrics.reporter.type", config.metricsReportType)
      conf.setString("hoodie.metrics.pushgateway.host", config.metricsReporterHost)
      conf.setString("hoodie.metrics.pushgateway.port", config.metricsReporterPort)
    }
    if (config.metricsReportType.equalsIgnoreCase("JMX")) {
      conf.setString("hoodie.metrics.reporter.type", config.metricsReportType)
      conf.setString("hoodie.metrics.jmx.host", config.metricsReporterHost)
      conf.setString("hoodie.metrics.jmx.port", config.metricsReporterPort)
    }

    val partitionField = datasetSchema.schema.columnSpec.filter(f => f.name.equalsIgnoreCase(datasetSchema.schema.partitionColumn)).head
    if (partitionField.`type`.equalsIgnoreCase("timestamp") || partitionField.`type`.equalsIgnoreCase("epoch")) {
      conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key, datasetSchema.schema.partitionColumn + "_partition")
      // … (rest of the hunk and file unchanged) …
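
For reference, when metrics.reporter.type is PROMETHEUS_PUSHGATEWAY the branch above resolves to the Hudi options shown in this standalone sketch against Flink's Configuration API; the object name, host and port are placeholders, and the JMX branch would set hoodie.metrics.jmx.host/port instead:

import org.apache.flink.configuration.Configuration

object HudiMetricsConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Same keys the stream task sets; host/port values are placeholders
    conf.setBoolean("hoodie.metrics.on", true)
    conf.setString("hoodie.metrics.reporter.type", "PROMETHEUS_PUSHGATEWAY")
    conf.setString("hoodie.metrics.pushgateway.host", "pushgateway")
    conf.setString("hoodie.metrics.pushgateway.port", "9091")

    println(conf.getString("hoodie.metrics.reporter.type", "")) // PROMETHEUS_PUSHGATEWAY
  }
}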
