Skip to content

Commit

Permalink
#000: Review comment fix - Generation of metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Manjunath authored and Manjunath committed Jan 3, 2025
1 parent 88e7e41 commit 9767d5e
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,11 @@ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) e

override def map(event: MMap[String, AnyRef]): RowData = {
try {
println("======= map method..")
println(s"Event size: ${event.size}")

if (event.nonEmpty) {
inputEventCount.inc(event.size) // Increment by the event size
inputEventCount.inc()
}
val rowData = convertToRowData(event)
println(s"Current inputEventCount: ${inputEventCount.getCount}")

rowData
} catch {
case ex: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.data.RowData
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
import org.apache.hudi.configuration.{FlinkOptions, OptionsResolver}
import org.apache.hudi.sink.utils.Pipelines
Expand Down

0 comments on commit 9767d5e

Please sign in to comment.