Skip to content

Commit

Permalink
#000: Review comment fix - Generation of metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Manjunath authored and Manjunath committed Jan 3, 2025
1 parent 83a5ccd commit e968e67
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.data.RowData
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
+ import org.apache.flink.metrics.{Counter, SimpleCounter}
import org.apache.hudi.configuration.{FlinkOptions, OptionsResolver}
import org.apache.hudi.sink.utils.Pipelines
import org.apache.hudi.util.AvroSchemaConverter
Expand Down Expand Up @@ -50,10 +51,14 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink
def process(env: StreamExecutionEnvironment): Unit = {
val schemaParser = new HudiSchemaParser()
val dataSourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live"))
- dataSourceConfig.map{ dataSource =>
+
+ val inputEventCount = new SimpleCounter()
+ val failedEventCount = new SimpleCounter()
+
+ dataSourceConfig.map { dataSource =>
val datasetId = dataSource.datasetId
val dataStream = getMapDataStream(env, config, List(datasetId), config.kafkaConsumerProperties(), consumerSourceName = s"kafka-${datasetId}", kafkaConnector)
- .map(new RowDataConverterFunction(config, datasetId))
+ .map(new RowDataConverterFunction(config, datasetId, inputEventCount, failedEventCount))
.setParallelism(config.downstreamOperatorsParallelism)

val conf: Configuration = new Configuration()
Expand All @@ -70,6 +75,7 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink
Pipelines.clean(conf, pipeline).setParallelism(config.downstreamOperatorsParallelism)
}
}.orElse(List(addDefaultOperator(env, config, kafkaConnector)))

env.execute("Flink-Hudi-Connector")
}

Expand Down

0 comments on commit e968e67

Please sign in to comment.