Hudi Metrics generation #105

Merged (2 commits) on Jan 3, 2025
4 changes: 2 additions & 2 deletions framework/src/main/resources/baseconfig.conf
@@ -7,8 +7,8 @@ kafka {
linger.ms = 10
compression = "snappy"
}
output.system.event.topic = ${job.env}".system.events"
output.failed.topic = ${job.env}".failed"
output.system.event.topic = "system.events"
output.failed.topic = "failed"
}

job {
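A note on the configuration change above: the old values built topic names through HOCON substitution-plus-concatenation with ${job.env}, while the new values are plain literals. A minimal Typesafe Config sketch of how each form resolves (the job.env = "dev" value is an assumption, purely for illustration):

import com.typesafe.config.ConfigFactory

object TopicNameResolution extends App {
  // Old style: the topic name is produced by HOCON value concatenation,
  // ${job.env} + ".system.events"  ->  "dev.system.events"
  val oldConf = ConfigFactory.parseString(
    """
      |job.env = "dev"   // assumed value, only for illustration
      |kafka.output.system.event.topic = ${job.env}".system.events"
      |""".stripMargin).resolve()

  // New style (this PR): the topic name is a fixed literal, independent of job.env.
  val newConf = ConfigFactory.parseString(
    """kafka.output.system.event.topic = "system.events" """).resolve()

  println(oldConf.getString("kafka.output.system.event.topic")) // dev.system.events
  println(newConf.getString("kafka.output.system.event.topic")) // system.events
}

The same pattern applies to every other .conf file changed in this PR.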
4 changes: 2 additions & 2 deletions pipeline/cache-indexer/src/main/resources/cache-indexer.conf
@@ -1,8 +1,8 @@
include "baseconfig.conf"

kafka {
output.failed.topic = ${job.env}".masterdata.failed"
groupId = ${job.env}"-cache-indexer-group"
output.failed.topic = "masterdata.failed"
groupId = "cache-indexer-group"
producer {
max-request-size = 5242880
}
2 changes: 1 addition & 1 deletion pipeline/cache-indexer/src/test/resources/test.conf
@@ -2,7 +2,7 @@ include "base-test.conf"

kafka {

output.failed.topic = ${job.env}".masterdata.failed"
output.failed.topic = "masterdata.failed"
groupId = ${job.env}"-cache-indexer-group"
producer {
max-request-size = 5242880
@@ -1,9 +1,9 @@
include "baseconfig.conf"

kafka {
input.topic = ${job.env}".transform"
stats.topic = ${job.env}".stats"
groupId = ${job.env}"-druid-router-group"
input.topic = "transform"
stats.topic = "stats"
groupId = "druid-router-group"
}

task {
@@ -1,10 +1,10 @@
include "baseconfig.conf"

kafka {
input.topic = ${job.env}".unique"
output.denorm.topic = ${job.env}".denorm"
output.denorm.failed.topic = ${job.env}".failed"
groupId = ${job.env}"-denormalizer-group"
input.topic = "unique"
output.denorm.topic = "denorm"
output.denorm.failed.topic = "failed"
groupId = "denormalizer-group"
}

task {
10 changes: 5 additions & 5 deletions pipeline/extractor/src/main/resources/extractor.conf
@@ -1,12 +1,12 @@
include "baseconfig.conf"

kafka {
input.topic = ${job.env}".ingest"
output.raw.topic = ${job.env}".raw"
output.extractor.duplicate.topic = ${job.env}".failed"
output.batch.failed.topic = ${job.env}".failed"
input.topic = "ingest"
output.raw.topic = "raw"
output.extractor.duplicate.topic = "failed"
output.batch.failed.topic = "failed"
event.max.size = "1048576" # Max is only 1MB
groupId = ${job.env}"-extractor-group"
groupId = "extractor-group"
producer {
max-request-size = 5242880
}
8 changes: 4 additions & 4 deletions pipeline/hudi-connector/src/main/resources/hudi-writer.conf
@@ -1,11 +1,11 @@
include "baseconfig.conf"

kafka {
input.topic = ${job.env}".hudi.connector.in"
output.topic = ${job.env}".hudi.connector.out"
output.invalid.topic = ${job.env}".failed"
input.topic = "hudi.connector.in"
output.topic = "hudi.connector.out"
output.invalid.topic = "failed"
event.max.size = "1048576" # Max is only 1MB
groupId = ${job.env}"-hudi-writer-group"
groupId = "hudi-writer-group"
producer {
max-request-size = 5242880
}
@@ -1,63 +1,61 @@
package org.sunbird.obsrv.function

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.Configuration
import org.apache.flink.formats.common.TimestampFormat
import org.apache.flink.formats.json.JsonToRowDataConverters
import org.apache.flink.metrics.SimpleCounter
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec}
import org.apache.flink.table.data.RowData
import org.slf4j.LoggerFactory
import org.sunbird.obsrv.core.streaming.{JobMetrics, Metrics, MetricsList}
import org.sunbird.obsrv.core.util.{JSONUtil, Util}
import org.sunbird.obsrv.core.util.JSONUtil
import org.sunbird.obsrv.streaming.HudiConnectorConfig
import org.sunbird.obsrv.util.{HMetrics, HudiSchemaParser, ScalaGauge}

import scala.collection.mutable.{Map => MMap}

class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] with JobMetrics {
class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String)
extends RichMapFunction[MMap[String, AnyRef], RowData] {

// Variables for converters and object mapper
var jsonToRowDataConverters: JsonToRowDataConverters = _
var objectMapper: ObjectMapper = _
var hudiSchemaParser: HudiSchemaParser = _

// Logger for logging
private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction])

protected val metricsList: MetricsList = getMetricsList()
protected val metrics: Metrics = registerMetrics(metricsList.datasets, metricsList.metrics)



// Metrics for time tracking
private var startTime: Long = _
private var metrics: HMetrics = _
private var jsonToRowDataConverters: JsonToRowDataConverters = _
private var objectMapper: ObjectMapper = _
private var hudiSchemaParser: HudiSchemaParser = _

override def open(parameters: Configuration): Unit = {
super.open(parameters)

// Initialize required converters and parsers
metrics = new HMetrics()
jsonToRowDataConverters = new JsonToRowDataConverters(false, true, TimestampFormat.SQL)
objectMapper = new ObjectMapper()
hudiSchemaParser = new HudiSchemaParser()

// Register metrics
getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
.counter(config.inputEventCountMetric, new SimpleCounter())
getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(datasetId)
.counter(config.failedEventCountMetric, new SimpleCounter())
getRuntimeContext.getMetricGroup
.addGroup(config.jobName)
.addGroup(datasetId)
.gauge[Long, ScalaGauge[Long]](config.inputEventCountMetric, ScalaGauge[Long](() =>
metrics.getAndReset(datasetId, config.inputEventCountMetric)
))

getRuntimeContext.getMetricGroup
.addGroup(config.jobName)
.addGroup(datasetId)
.gauge[Long, ScalaGauge[Long]](config.failedEventCountMetric, ScalaGauge[Long](() =>
metrics.getAndReset(datasetId, config.failedEventCountMetric)
))
}

override def map(event: MMap[String, AnyRef]): RowData = {
startTime = System.currentTimeMillis()
try {
metrics.incCounter(datasetId, config.inputEventCountMetric, event.size)
if (event.nonEmpty) {
metrics.increment(datasetId, config.inputEventCountMetric, 1)
}
val rowData = convertToRowData(event)
rowData
} catch {
case ex: Exception =>
metrics.incCounter(datasetId, config.failedEventCountMetric)
metrics.increment(datasetId, config.failedEventCountMetric, 1)
logger.error("Failed to process record", ex)
throw ex
}
@@ -67,15 +65,8 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e
val eventJson = JSONUtil.serialize(data)
val flattenedData = hudiSchemaParser.parseJson(datasetId, eventJson)
val rowType = hudiSchemaParser.rowTypeMap(datasetId)
val converter: JsonToRowDataConverters.JsonToRowDataConverter = jsonToRowDataConverters.createRowConverter(rowType)
val rowData = converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
rowData
}

def getMetricsList(): MetricsList = {
MetricsList(
datasets = List(datasetId),
metrics = List(config.inputEventCountMetric, config.failedEventCountMetric)
)
val converter: JsonToRowDataConverters.JsonToRowDataConverter =
jsonToRowDataConverters.createRowConverter(rowType)
converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
}
}
}
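For context on how this converter is used, a minimal wiring sketch follows (the stream and operator names are illustrative, not taken from this repository). Once the map function is attached, Flink exposes the two gauges registered in open() under the <jobName>.<datasetId> metric group:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.data.RowData
import org.sunbird.obsrv.function.RowDataConverterFunction
import org.sunbird.obsrv.streaming.HudiConnectorConfig

import scala.collection.mutable.{Map => MMap}

object ConverterWiringSketch {
  def attachConverter(events: DataStream[MMap[String, AnyRef]],
                      config: HudiConnectorConfig,
                      datasetId: String): DataStream[RowData] = {
    // Each dataset gets its own converter instance, and therefore its own
    // <jobName>.<datasetId> metric group carrying the input/failed event gauges.
    events
      .map(new RowDataConverterFunction(config, datasetId))
      .name(s"row-data-converter-$datasetId")
  }
}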
@@ -63,13 +63,13 @@ class HudiConnectorConfig(override val config: Config) extends BaseJobConfig[mut

// Metrics

val inputEventCountMetric = "inputEventCount"
val failedEventCountMetric = "failedEventCount"
val inputEventCountMetric = "input-event-count"
val failedEventCountMetric = "failed-event-count"

// Metrics Exporter
val metricsReportType = config.getString("metrics.reporter.type")
val metricsReporterHost = config.getString("metrics.reporter.host")
val metricsReporterPort = config.getString("metrics.reporter.port")
val metricsReportType: String = config.getString("metrics.reporter.type")
val metricsReporterHost: String = config.getString("metrics.reporter.host")
val metricsReporterPort: String = config.getString("metrics.reporter.port")


}
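The three getString calls above expect a matching metrics.reporter block in the job configuration. A small Typesafe Config sketch of that block (the values shown are assumptions, not taken from this PR):

import com.typesafe.config.ConfigFactory

object MetricsReporterConfigSketch extends App {
  // Hypothetical values, only to show the keys HudiConnectorConfig reads.
  val conf = ConfigFactory.parseString(
    """
      |metrics.reporter.type = "prometheus"
      |metrics.reporter.host = "localhost"
      |metrics.reporter.port = "9090"
      |""".stripMargin)

  println(conf.getString("metrics.reporter.type")) // prometheus
  println(conf.getString("metrics.reporter.host")) // localhost
  println(conf.getString("metrics.reporter.port")) // 9090
}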
@@ -0,0 +1,31 @@
package org.sunbird.obsrv.util


import org.apache.flink.metrics.Gauge

import scala.collection.concurrent.TrieMap

class HMetrics {
private val metricStore = TrieMap[(String, String), Long]()

def increment(dataset: String, metric: String, value: Long): Unit = {
metricStore.synchronized {
val key = (dataset, metric)
val current = metricStore.getOrElse(key, 0L)
metricStore.put(key, current + value)
}
}

def getAndReset(dataset: String, metric: String): Long = {
metricStore.synchronized {
val key = (dataset, metric)
val current = metricStore.getOrElse(key, 0L)
metricStore.remove(key)
current
}
}
}

case class ScalaGauge[T](getValueFn: () => T) extends Gauge[T] {
override def getValue: T = getValueFn()
}
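A small usage sketch of the class added above (the dataset and metric names are illustrative): getAndReset returns whatever was accumulated since the previous read and clears it, so a reporter polling the gauge sees per-interval deltas rather than a monotonically growing counter.

import org.sunbird.obsrv.util.{HMetrics, ScalaGauge}

object HMetricsUsageSketch extends App {
  val metrics = new HMetrics()

  metrics.increment("dataset-1", "input-event-count", 1)
  metrics.increment("dataset-1", "input-event-count", 1)

  // The first read drains the accumulated value ...
  assert(metrics.getAndReset("dataset-1", "input-event-count") == 2)
  // ... and a subsequent read starts again from zero.
  assert(metrics.getAndReset("dataset-1", "input-event-count") == 0)

  // Wrapped in ScalaGauge, each Flink metric report therefore emits the delta
  // accumulated since the previous report.
  val gauge = ScalaGauge[Long](() => metrics.getAndReset("dataset-1", "input-event-count"))
  println(gauge.getValue) // 0 at this point
}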
@@ -1,20 +1,20 @@
include "baseconfig.conf"

kafka {
input.topic = ${job.env}".masterdata.ingest"
output.raw.topic = ${job.env}".masterdata.raw"
output.extractor.duplicate.topic = ${job.env}".masterdata.failed"
output.failed.topic = ${job.env}".masterdata.failed"
output.batch.failed.topic = ${job.env}".masterdata.failed"
input.topic = "masterdata.ingest"
output.raw.topic = "masterdata.raw"
output.extractor.duplicate.topic = "masterdata.failed"
output.failed.topic = "masterdata.failed"
output.batch.failed.topic = "masterdata.failed"
event.max.size = "1048576" # Max is only 1MB
output.invalid.topic = ${job.env}".masterdata.failed"
output.unique.topic = ${job.env}".masterdata.unique"
output.duplicate.topic = ${job.env}".masterdata.failed"
output.denorm.topic = ${job.env}".masterdata.denorm"
output.transform.topic = ${job.env}".masterdata.transform"
output.transform.failed.topic = ${job.env}".masterdata.transform.failed"
stats.topic = ${job.env}".masterdata.stats"
groupId = ${job.env}"-masterdata-pipeline-group"
output.invalid.topic = "masterdata.failed"
output.unique.topic = "masterdata.unique"
output.duplicate.topic = "masterdata.failed"
output.denorm.topic = "masterdata.denorm"
output.transform.topic = "masterdata.transform"
output.transform.failed.topic = "masterdata.transform.failed"
stats.topic = "masterdata.stats"
groupId = "masterdata-pipeline-group"
producer {
max-request-size = 5242880
}
24 changes: 12 additions & 12 deletions pipeline/master-data-processor/src/test/resources/test.conf
@@ -6,19 +6,19 @@ job {

kafka {

input.topic = ${job.env}".masterdata.ingest"
output.raw.topic = ${job.env}".masterdata.raw"
output.extractor.duplicate.topic = ${job.env}".masterdata.failed"
output.failed.topic = ${job.env}".masterdata.failed"
output.batch.failed.topic = ${job.env}".masterdata.failed"
input.topic = "masterdata.ingest"
output.raw.topic = "masterdata.raw"
output.extractor.duplicate.topic = "masterdata.failed"
output.failed.topic = "masterdata.failed"
output.batch.failed.topic = "masterdata.failed"
event.max.size = "1048576" # Max is only 1MB
output.invalid.topic = ${job.env}".masterdata.failed"
output.unique.topic = ${job.env}".masterdata.unique"
output.duplicate.topic = ${job.env}".masterdata.failed"
output.denorm.topic = ${job.env}".masterdata.denorm"
output.transform.topic = ${job.env}".masterdata.transform"
output.transform.failed.topic = ${job.env}".masterdata.transform.failed"
stats.topic = ${job.env}".masterdata.stats"
output.invalid.topic = "masterdata.failed"
output.unique.topic = "masterdata.unique"
output.duplicate.topic = "masterdata.failed"
output.denorm.topic = "masterdata.denorm"
output.transform.topic = "masterdata.transform"
output.transform.failed.topic = "masterdata.transform.failed"
stats.topic = "masterdata.stats"
groupId = ${job.env}"-masterdata-pipeline-group"
producer {
max-request-size = 5242880
@@ -1,11 +1,11 @@
include "baseconfig.conf"

kafka {
input.topic = ${job.env}".raw"
output.invalid.topic = ${job.env}".failed"
output.unique.topic = ${job.env}".unique"
output.duplicate.topic = ${job.env}".failed"
groupId = ${job.env}"-pipeline-preprocessor-group"
input.topic = "raw"
output.invalid.topic = "failed"
output.unique.topic = "unique"
output.duplicate.topic = "failed"
groupId = "pipeline-preprocessor-group"
}

task {
8 changes: 4 additions & 4 deletions pipeline/transformer/src/main/resources/transformer.conf
@@ -1,10 +1,10 @@
include "baseconfig.conf"

kafka {
input.topic = ${job.env}".denorm"
output.transform.topic = ${job.env}".transform"
output.transform.failed.topic = ${job.env}".transform.failed"
groupId = ${job.env}"-transformer-group"
input.topic = "denorm"
output.transform.topic = "transform"
output.transform.failed.topic = "transform.failed"
groupId = "transformer-group"
producer {
max-request-size = 5242880
}
26 changes: 13 additions & 13 deletions pipeline/unified-pipeline/src/main/resources/unified-pipeline.conf
@@ -1,20 +1,20 @@
include "baseconfig.conf"

kafka {
input.topic = ${job.env}".ingest"
output.raw.topic = ${job.env}".raw"
output.extractor.duplicate.topic = ${job.env}".failed"
output.batch.failed.topic = ${job.env}".failed"
input.topic = "ingest"
output.raw.topic = "raw"
output.extractor.duplicate.topic = "failed"
output.batch.failed.topic = "failed"
event.max.size = "1048576" # Max is only 1MB
output.invalid.topic = ${job.env}".failed"
output.unique.topic = ${job.env}".unique"
output.duplicate.topic = ${job.env}".failed"
output.denorm.topic = ${job.env}".denorm"
output.denorm.failed.topic = ${job.env}".failed"
output.transform.topic = ${job.env}".transform"
output.transform.failed.topic = ${job.env}".failed"
stats.topic = ${job.env}".stats"
groupId = ${job.env}"-unified-pipeline-group"
output.invalid.topic = "failed"
output.unique.topic = "unique"
output.duplicate.topic = "failed"
output.denorm.topic = "denorm"
output.denorm.failed.topic = "failed"
output.transform.topic = "transform"
output.transform.failed.topic = "failed"
stats.topic = "stats"
groupId = "unified-pipeline-group"
producer {
max-request-size = 5242880
}