From e968e671473ae35f4d306736c90d85ded5150efc Mon Sep 17 00:00:00 2001
From: Manjunath
Date: Fri, 3 Jan 2025 15:17:04 +0530
Subject: [PATCH] #000: Review comment fix - Generation of metrics

---
 .../obsrv/streaming/HudiConnectorStreamTask.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala
index 678c1e2d..ac220fcf 100644
--- a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala
+++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala
@@ -12,6 +12,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.data.RowData
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
+import org.apache.flink.metrics.{Counter, SimpleCounter}
 import org.apache.hudi.configuration.{FlinkOptions, OptionsResolver}
 import org.apache.hudi.sink.utils.Pipelines
 import org.apache.hudi.util.AvroSchemaConverter
@@ -50,10 +51,14 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink
   def process(env: StreamExecutionEnvironment): Unit = {
     val schemaParser = new HudiSchemaParser()
     val dataSourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live"))
-    dataSourceConfig.map{ dataSource =>
+
+    val inputEventCount = new SimpleCounter()
+    val failedEventCount = new SimpleCounter()
+
+    dataSourceConfig.map { dataSource =>
       val datasetId = dataSource.datasetId
       val dataStream = getMapDataStream(env, config, List(datasetId), config.kafkaConsumerProperties(), consumerSourceName = s"kafka-${datasetId}", kafkaConnector)
-        .map(new RowDataConverterFunction(config, datasetId))
+        .map(new RowDataConverterFunction(config, datasetId, inputEventCount, failedEventCount))
         .setParallelism(config.downstreamOperatorsParallelism)
 
       val conf: Configuration = new Configuration()
@@ -70,6 +75,7 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink
         Pipelines.clean(conf, pipeline).setParallelism(config.downstreamOperatorsParallelism)
       }
     }.orElse(List(addDefaultOperator(env, config, kafkaConnector)))
+
     env.execute("Flink-Hudi-Connector")
   }
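
Reviewer note, for context on the metrics wiring. The patch creates two plain
org.apache.flink.metrics.SimpleCounter instances on the client and passes them
into RowDataConverterFunction's constructor. Below is a minimal, self-contained
sketch of the conventional alternative: registering the same two counters with
Flink's metric system inside the operator, so that configured metric reporters
pick them up. This is not the repository's RowDataConverterFunction; the class
name, metric names, and the String input type are illustrative assumptions.

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.table.data.{GenericRowData, RowData, StringData}

// Hypothetical stand-in for RowDataConverterFunction (an assumption, not the
// actual class): counts every incoming event and every failed conversion.
class MetricAwareConverter extends RichMapFunction[String, RowData] {

  // Registered in open() so the counters are created on the task managers
  // and are visible to Flink's metric reporters.
  @transient private var inputEventCount: Counter = _
  @transient private var failedEventCount: Counter = _

  override def open(parameters: Configuration): Unit = {
    val group = getRuntimeContext.getMetricGroup
    inputEventCount = group.counter("input-event-count")
    failedEventCount = group.counter("failed-event-count")
  }

  override def map(event: String): RowData = {
    inputEventCount.inc() // every record that reaches the converter
    try {
      // Stand-in for the real event -> RowData conversion step.
      GenericRowData.of(StringData.fromString(event))
    } catch {
      case ex: Exception =>
        failedEventCount.inc() // records that fail conversion
        throw ex
    }
  }
}

One design consideration: a SimpleCounter handed to the function's constructor
would have to travel with the serialized function, so each parallel subtask
increments its own copy, and those counts are not visible to metric reporters
unless the function also registers them with its operator metric group.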