Added env names for ingest arguments
Nekit2217 authored and rtyler committed May 25, 2024
1 parent 4c039a8 commit ef4b050
Showing 2 changed files with 122 additions and 103 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -11,7 +11,7 @@ apache-avro = "^0.14"
 base64 = "0.13"
 bytes = "1"
 chrono = "0.4.31"
-clap = { version = "4", features = ["color"] }
+clap = { version = "4", features = ["color", "env"] }
 dipstick = "0.9"
 env_logger = "0"
 futures = "0.3"
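The one-line Cargo.toml change turns on clap's optional `env` feature, which is what makes the `Arg::env(...)` calls in src/main.rs below compile: when an argument is missing from the command line, clap falls back to the named environment variable, and only then to any default. A minimal sketch of that precedence, assuming clap 4 with the `env` feature and a hypothetical `demo` binary that is not part of this commit:

use clap::{Arg, Command};

fn main() {
    let matches = Command::new("demo")
        .arg(
            Arg::new("kafka")
                .short('k')
                .long("kafka")
                .env("KAFKA_BROKERS") // consulted only when -k/--kafka is absent
                .default_value("localhost:9092"), // used when flag and env var are both absent
        )
        .get_matches();

    // Resolution order: --kafka / -k value, then $KAFKA_BROKERS, then the default.
    let brokers = matches
        .get_one::<String>("kafka")
        .expect("argument has a default");
    println!("connecting to {brokers}");
}

With this in place, `KAFKA_BROKERS=broker:9092 demo` and `demo --kafka broker:9092` should resolve to the same value.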
223 changes: 121 additions & 102 deletions src/main.rs
@@ -309,65 +309,77 @@ fn parse_seek_offsets(val: &str) -> Vec<(DataTypePartition, DataTypeOffset)> {
 
 fn build_app() -> Command {
     Command::new("kafka-delta-ingest")
-        .version(env!["CARGO_PKG_VERSION"])
-        .about("Daemon for ingesting messages from Kafka and writing them to a Delta table")
-        .subcommand(
-            Command::new("ingest")
-                .about("Starts a stream that consumes from a Kafka topic and writes to a Delta table")
-                .arg(Arg::new("topic")
-                    .help("The Kafka topic to stream from")
-                    .required(true))
-                .arg(Arg::new("table_location")
-                    .help("The Delta table location to write out")
-                    .required(true))
-                .arg(Arg::new("kafka")
-                    .short('k')
-                    .long("kafka")
-                    .help("Kafka broker connection string to use")
-                    .default_value("localhost:9092"))
-                .arg(Arg::new("consumer_group")
-                    .short('g')
-                    .long("consumer_group")
-                    .help("Consumer group to use when subscribing to Kafka topics")
-                    .default_value("kafka_delta_ingest"))
-                .arg(Arg::new("app_id")
-                    .short('a')
-                    .long("app_id")
-                    .help("App ID to use when writing to Delta")
-                    .default_value("kafka_delta_ingest"))
-                .arg(Arg::new("seek_offsets")
-                    .long("seek_offsets")
-                    .help(r#"Only useful when offsets are not already stored in the delta table. A JSON string specifying the partition offset map as the starting point for ingestion. This is *seeking* rather than _starting_ offsets. The first ingested message would be (seek_offset + 1). Ex: {"0":123, "1":321}"#))
-                .arg(Arg::new("auto_offset_reset")
-                    .short('o')
-                    .long("auto_offset_reset")
-                    .help(r#"The default offset reset policy, which is either 'earliest' or 'latest'.
+        .version(env!["CARGO_PKG_VERSION"])
+        .about("Daemon for ingesting messages from Kafka and writing them to a Delta table")
+        .subcommand(
+            Command::new("ingest")
+                .about("Starts a stream that consumes from a Kafka topic and writes to a Delta table")
+                .arg(Arg::new("topic")
+                    .env("KAFKA_TOPIC")
+                    .help("The Kafka topic to stream from")
+                    .required(true))
+                .arg(Arg::new("table_location")
+                    .env("TABLE_LOCATION")
+                    .help("The Delta table location to write out")
+                    .required(true)
+                )
+                .arg(Arg::new("kafka")
+                    .short('k')
+                    .long("kafka")
+                    .env("KAFKA_BROKERS")
+                    .help("Kafka broker connection string to use")
+                    .default_value("localhost:9092"))
+                .arg(Arg::new("consumer_group")
+                    .short('g')
+                    .long("consumer_group")
+                    .env("KAFKA_CONSUMER_GROUP")
+                    .help("Consumer group to use when subscribing to Kafka topics")
+                    .default_value("kafka_delta_ingest"))
+                .arg(Arg::new("app_id")
+                    .short('a')
+                    .long("app_id")
+                    .env("APP_ID")
+                    .help("App ID to use when writing to Delta")
+                    .default_value("kafka_delta_ingest"))
+                .arg(Arg::new("seek_offsets")
+                    .long("seek_offsets")
+                    .env("KAFKA_SEEK_OFFSETS")
+                    .help(r#"Only useful when offsets are not already stored in the delta table. A JSON string specifying the partition offset map as the starting point for ingestion. This is *seeking* rather than _starting_ offsets. The first ingested message would be (seek_offset + 1). Ex: {"0":123, "1":321}"#))
+                .arg(Arg::new("auto_offset_reset")
+                    .short('o')
+                    .long("auto_offset_reset")
+                    .env("KAFKA_AUTO_OFFSET_RESET")
+                    .help(r#"The default offset reset policy, which is either 'earliest' or 'latest'.
 The configuration is applied when offsets are not found in delta table or not specified with 'seek_offsets'. This also overrides the kafka consumer's 'auto.offset.reset' config."#)
.default_value("earliest"))
.arg(Arg::new("allowed_latency")
.short('l')
.long("allowed_latency")
.help("The allowed latency (in seconds) from the time a message is consumed to when it should be written to Delta.")
.default_value("300")
.value_parser(clap::value_parser!(u64)))
.arg(Arg::new("max_messages_per_batch")
.short('m')
.long("max_messages_per_batch")
.help("The maximum number of rows allowed in a parquet batch. This shoulid be the approximate number of bytes described by MIN_BYTES_PER_FILE")
.default_value("5000")
.value_parser(clap::value_parser!(usize)))
.arg(Arg::new("min_bytes_per_file")
.short('b')
.long("min_bytes_per_file")
.help("The target minimum file size (in bytes) for each Delta file. File size may be smaller than this value if ALLOWED_LATENCY does not allow enough time to accumulate the specified number of bytes.")
.default_value("134217728")
.value_parser(clap::value_parser!(usize)))
.arg(Arg::new("transform")
.short('t')
.long("transform")
.action(ArgAction::Append)
.help(
r#"A list of transforms to apply to each Kafka message. Each transform should follow the pattern:
.default_value("earliest"))
.arg(Arg::new("allowed_latency")
.short('l')
.long("allowed_latency")
.env("ALLOWED_LATENCY")
.help("The allowed latency (in seconds) from the time a message is consumed to when it should be written to Delta.")
.default_value("300")
.value_parser(clap::value_parser!(u64)))
.arg(Arg::new("max_messages_per_batch")
.short('m')
.long("max_messages_per_batch")
.env("MAX_MESSAGES_PER_BATCH")
.help("The maximum number of rows allowed in a parquet batch. This shoulid be the approximate number of bytes described by MIN_BYTES_PER_FILE")
.default_value("5000")
.value_parser(clap::value_parser!(usize)))
.arg(Arg::new("min_bytes_per_file")
.short('b')
.long("min_bytes_per_file")
.env("MIN_BYTES_PER_FILE")
.help("The target minimum file size (in bytes) for each Delta file. File size may be smaller than this value if ALLOWED_LATENCY does not allow enough time to accumulate the specified number of bytes.")
.default_value("134217728")
.value_parser(clap::value_parser!(usize)))
.arg(Arg::new("transform")
.short('t')
.long("transform")
.env("TRANSFORMS")
.action(ArgAction::Append)
.help(
r#"A list of transforms to apply to each Kafka message. Each transform should follow the pattern:
"PROPERTY: SOURCE". For example:
... -t 'modified_date: substr(modified,`0`,`10`)' 'kafka_offset: kafka.offset'
@@ -388,54 +400,61 @@ the following well-known Kafka metadata properties:
 * kafka.topic
 * kafka.timestamp
 "#))
-                .arg(Arg::new("dlq_table_location")
-                    .long("dlq_table_location")
-                    .required(false)
-                    .help("Optional table to write unprocessable entities to"))
-                .arg(Arg::new("dlq_transform")
-                    .long("dlq_transform")
-                    .required(false)
-                    .action(ArgAction::Append)
-                    .help("Transforms to apply before writing unprocessable entities to the dlq_location"))
-                .arg(Arg::new("checkpoints")
-                    .short('c')
-                    .long("checkpoints")
-                    .action(ArgAction::SetTrue)
-                    .help("If set then kafka-delta-ingest will write checkpoints on every 10th commit"))
-                .arg(Arg::new("kafka_setting")
-                    .short('K')
-                    .long("kafka_setting")
-                    .action(ArgAction::Append)
-                    .help(r#"A list of additional settings to include when creating the Kafka consumer.
+                .arg(Arg::new("dlq_table_location")
+                    .long("dlq_table_location")
+                    .env("DLQ_TABLE_LOCATION")
+                    .required(false)
+                    .help("Optional table to write unprocessable entities to"))
+                .arg(Arg::new("dlq_transform")
+                    .long("dlq_transform")
+                    .env("DLQ_TRANSFORMS")
+                    .required(false)
+                    .action(ArgAction::Append)
+                    .help("Transforms to apply before writing unprocessable entities to the dlq_location"))
+                .arg(Arg::new("checkpoints")
+                    .short('c')
+                    .long("checkpoints")
+                    .env("WRITE_CHECKPOINTS")
+                    .action(ArgAction::SetTrue)
+                    .help("If set then kafka-delta-ingest will write checkpoints on every 10th commit"))
+                .arg(Arg::new("kafka_setting")
+                    .short('K')
+                    .long("kafka_setting")
+                    .action(ArgAction::Append)
+                    .help(r#"A list of additional settings to include when creating the Kafka consumer.
 This can be used to provide TLS configuration as in:
 ... -K "security.protocol=SSL" "ssl.certificate.location=kafka.crt" "ssl.key.location=kafka.key""#))
-                .arg(Arg::new("statsd_endpoint")
-                    .short('s')
-                    .long("statsd_endpoint")
-                    .help("Statsd endpoint for sending stats")
-                    .default_value("localhost:8125"))
-                .arg(Arg::new("json")
-                    .required(false)
-                    .long("json")
-                    .help("Schema registry endpoint, local path, or empty string"))
-                .arg(Arg::new("avro")
-                    .long("avro")
-                    .required(false)
-                    .help("Schema registry endpoint, local path, or empty string"))
-                .group(ArgGroup::new("format")
-                    .args(["json", "avro"])
-                    .required(false))
-                .arg(Arg::new("end")
-                    .short('e')
-                    .long("ends_at_latest_offsets")
-                    .required(false)
-                    .num_args(0)
-                    .action(ArgAction::SetTrue)
-                    .help(""))
-        )
-        .arg_required_else_help(true)
+                .arg(Arg::new("statsd_endpoint")
+                    .short('s')
+                    .long("statsd_endpoint")
+                    .env("STATSD_ENDPOINT")
+                    .help("Statsd endpoint for sending stats")
+                    .default_value("localhost:8125"))
+                .arg(Arg::new("json")
+                    .required(false)
+                    .long("json")
+                    .env("JSON_REGISTRY")
+                    .help("Schema registry endpoint, local path, or empty string"))
+                .arg(Arg::new("avro")
+                    .long("avro")
+                    .env("AVRO_REGISTRY")
+                    .required(false)
+                    .help("Schema registry endpoint, local path, or empty string"))
+                .group(ArgGroup::new("format")
+                    .args(["json", "avro"])
+                    .required(false))
+                .arg(Arg::new("end")
+                    .short('e')
+                    .long("ends_at_latest_offsets")
+                    .env("TURN_OFF_AT_KAFKA_TOPIC_END")
+                    .required(false)
+                    .num_args(0)
+                    .action(ArgAction::SetTrue)
+                    .help(""))
+        )
+        .arg_required_else_help(true)
 }
 
 fn convert_matches_to_message_format(
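Worth noting: clap resolves environment variables before enforcing `required(true)`, so the two required positional arguments (`topic` and `table_location`) can now also be supplied via `KAFKA_TOPIC` and `TABLE_LOCATION`, letting the daemon be configured entirely through the environment (e.g. in a container). A small self-contained sketch of that behavior, assuming clap 4 with the `env` feature; it mirrors the `topic` argument from the diff above but is not code from this commit:

use clap::{Arg, Command};

fn main() {
    // Simulate the variable being set by the container or orchestrator.
    std::env::set_var("KAFKA_TOPIC", "web_requests");

    let matches = Command::new("kafka-delta-ingest")
        .arg(
            Arg::new("topic")
                .env("KAFKA_TOPIC")
                .help("The Kafka topic to stream from")
                .required(true),
        )
        // No positional argument on the simulated command line.
        .try_get_matches_from(["kafka-delta-ingest"])
        .expect("env var should satisfy the required argument");

    assert_eq!(
        matches.get_one::<String>("topic").map(String::as_str),
        Some("web_requests")
    );
}

If that holds, an invocation like `KAFKA_TOPIC=web_requests TABLE_LOCATION=file:///tmp/table kafka-delta-ingest ingest` should start without any positional arguments.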
