Env for ingest args #177

Merged · 4 commits · May 25, 2024
Cargo.toml (2 changes: 1 addition, 1 deletion)
@@ -11,7 +11,7 @@
 apache-avro = "^0.14"
 base64 = "0.13"
 bytes = "1"
 chrono = "0.4.31"
-clap = { version = "4", features = ["color"] }
+clap = { version = "4", features = ["color", "env"] }
 dipstick = "0.9"
 env_logger = "0"
 futures = "0.3"
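The only manifest change is enabling clap's `env` cargo feature, which makes the `Arg::env` builder method available: when a flag is absent from the command line, clap reads the named environment variable before falling back to the default. A minimal standalone sketch of that mechanism (not part of this PR; the `demo` command name is made up, and `KAFKA_BROKERS` mirrors the variable used in the diff below):

```rust
// Minimal sketch of clap's "env" feature (requires features = ["env"]).
// Illustrative only; it is not code from this PR.
use clap::{Arg, Command};

fn main() {
    let matches = Command::new("demo")
        .arg(
            Arg::new("kafka")
                .short('k')
                .long("kafka")
                .env("KAFKA_BROKERS") // consulted only when -k/--kafka is absent
                .default_value("localhost:9092"),
        )
        .get_matches();

    // Resolution order: CLI flag, then KAFKA_BROKERS, then the default value.
    let brokers = matches.get_one::<String>("kafka").expect("has a default");
    println!("connecting to {brokers}");
}
```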
src/main.rs (222 changes: 120 additions, 102 deletions)
@@ -309,65 +309,76 @@ fn parse_seek_offsets(val: &str) -> Vec<(DataTypePartition, DataTypeOffset)> {

 fn build_app() -> Command {
     Command::new("kafka-delta-ingest")
-        .version(env!["CARGO_PKG_VERSION"])
-        .about("Daemon for ingesting messages from Kafka and writing them to a Delta table")
-        .subcommand(
-            Command::new("ingest")
-                .about("Starts a stream that consumes from a Kafka topic and writes to a Delta table")
-                .arg(Arg::new("topic")
-                    .help("The Kafka topic to stream from")
-                    .required(true))
-                .arg(Arg::new("table_location")
-                    .help("The Delta table location to write out")
-                    .required(true))
-                .arg(Arg::new("kafka")
-                    .short('k')
-                    .long("kafka")
-                    .help("Kafka broker connection string to use")
-                    .default_value("localhost:9092"))
-                .arg(Arg::new("consumer_group")
-                    .short('g')
-                    .long("consumer_group")
-                    .help("Consumer group to use when subscribing to Kafka topics")
-                    .default_value("kafka_delta_ingest"))
-                .arg(Arg::new("app_id")
-                    .short('a')
-                    .long("app_id")
-                    .help("App ID to use when writing to Delta")
-                    .default_value("kafka_delta_ingest"))
-                .arg(Arg::new("seek_offsets")
-                    .long("seek_offsets")
-                    .help(r#"Only useful when offsets are not already stored in the delta table. A JSON string specifying the partition offset map as the starting point for ingestion. This is *seeking* rather than _starting_ offsets. The first ingested message would be (seek_offset + 1). Ex: {"0":123, "1":321}"#))
-                .arg(Arg::new("auto_offset_reset")
-                    .short('o')
-                    .long("auto_offset_reset")
-                    .help(r#"The default offset reset policy, which is either 'earliest' or 'latest'.
+        .version(env!["CARGO_PKG_VERSION"])
+        .about("Daemon for ingesting messages from Kafka and writing them to a Delta table")
+        .subcommand(
+            Command::new("ingest")
+                .about("Starts a stream that consumes from a Kafka topic and writes to a Delta table")
+                .arg(Arg::new("topic")
+                    .env("KAFKA_TOPIC")
+                    .help("The Kafka topic to stream from")
+                    .required(true))
+                .arg(Arg::new("table_location")
+                    .env("TABLE_LOCATION")
+                    .help("The Delta table location to write out")
+                    .required(true))
+                .arg(Arg::new("kafka")
+                    .short('k')
+                    .long("kafka")
+                    .env("KAFKA_BROKERS")
+                    .help("Kafka broker connection string to use")
+                    .default_value("localhost:9092"))
+                .arg(Arg::new("consumer_group")
+                    .short('g')
+                    .long("consumer_group")
+                    .env("KAFKA_CONSUMER_GROUP")
+                    .help("Consumer group to use when subscribing to Kafka topics")
+                    .default_value("kafka_delta_ingest"))
+                .arg(Arg::new("app_id")
+                    .short('a')
+                    .long("app_id")
+                    .env("APP_ID")
+                    .help("App ID to use when writing to Delta")
+                    .default_value("kafka_delta_ingest"))
+                .arg(Arg::new("seek_offsets")
+                    .long("seek_offsets")
+                    .env("KAFKA_SEEK_OFFSETS")
+                    .help(r#"Only useful when offsets are not already stored in the delta table. A JSON string specifying the partition offset map as the starting point for ingestion. This is *seeking* rather than _starting_ offsets. The first ingested message would be (seek_offset + 1). Ex: {"0":123, "1":321}"#))
+                .arg(Arg::new("auto_offset_reset")
+                    .short('o')
+                    .long("auto_offset_reset")
+                    .env("KAFKA_AUTO_OFFSET_RESET")
+                    .help(r#"The default offset reset policy, which is either 'earliest' or 'latest'.
 The configuration is applied when offsets are not found in delta table or not specified with 'seek_offsets'. This also overrides the kafka consumer's 'auto.offset.reset' config."#)
.default_value("earliest"))
.arg(Arg::new("allowed_latency")
.short('l')
.long("allowed_latency")
.help("The allowed latency (in seconds) from the time a message is consumed to when it should be written to Delta.")
.default_value("300")
.value_parser(clap::value_parser!(u64)))
.arg(Arg::new("max_messages_per_batch")
.short('m')
.long("max_messages_per_batch")
.help("The maximum number of rows allowed in a parquet batch. This shoulid be the approximate number of bytes described by MIN_BYTES_PER_FILE")
.default_value("5000")
.value_parser(clap::value_parser!(usize)))
.arg(Arg::new("min_bytes_per_file")
.short('b')
.long("min_bytes_per_file")
.help("The target minimum file size (in bytes) for each Delta file. File size may be smaller than this value if ALLOWED_LATENCY does not allow enough time to accumulate the specified number of bytes.")
.default_value("134217728")
.value_parser(clap::value_parser!(usize)))
.arg(Arg::new("transform")
.short('t')
.long("transform")
.action(ArgAction::Append)
.help(
r#"A list of transforms to apply to each Kafka message. Each transform should follow the pattern:
.default_value("earliest"))
.arg(Arg::new("allowed_latency")
.short('l')
.long("allowed_latency")
.env("ALLOWED_LATENCY")
.help("The allowed latency (in seconds) from the time a message is consumed to when it should be written to Delta.")
.default_value("300")
.value_parser(clap::value_parser!(u64)))
.arg(Arg::new("max_messages_per_batch")
.short('m')
.long("max_messages_per_batch")
.env("MAX_MESSAGES_PER_BATCH")
.help("The maximum number of rows allowed in a parquet batch. This shoulid be the approximate number of bytes described by MIN_BYTES_PER_FILE")
.default_value("5000")
.value_parser(clap::value_parser!(usize)))
.arg(Arg::new("min_bytes_per_file")
.short('b')
.long("min_bytes_per_file")
.env("MIN_BYTES_PER_FILE")
.help("The target minimum file size (in bytes) for each Delta file. File size may be smaller than this value if ALLOWED_LATENCY does not allow enough time to accumulate the specified number of bytes.")
.default_value("134217728")
.value_parser(clap::value_parser!(usize)))
.arg(Arg::new("transform")
.short('t')
.long("transform")
.env("TRANSFORMS")
.action(ArgAction::Append)
.help(
r#"A list of transforms to apply to each Kafka message. Each transform should follow the pattern:
"PROPERTY: SOURCE". For example:

... -t 'modified_date: substr(modified,`0`,`10`)' 'kafka_offset: kafka.offset'
@@ -388,54 +399,61 @@ the following well-known Kafka metadata properties:
 * kafka.topic
 * kafka.timestamp
 "#))
.arg(Arg::new("dlq_table_location")
.long("dlq_table_location")
.required(false)
.help("Optional table to write unprocessable entities to"))
.arg(Arg::new("dlq_transform")
.long("dlq_transform")
.required(false)
.action(ArgAction::Append)
.help("Transforms to apply before writing unprocessable entities to the dlq_location"))
.arg(Arg::new("checkpoints")
.short('c')
.long("checkpoints")
.action(ArgAction::SetTrue)
.help("If set then kafka-delta-ingest will write checkpoints on every 10th commit"))
.arg(Arg::new("kafka_setting")
.short('K')
.long("kafka_setting")
.action(ArgAction::Append)
.help(r#"A list of additional settings to include when creating the Kafka consumer.
.arg(Arg::new("dlq_table_location")
.long("dlq_table_location")
.env("DLQ_TABLE_LOCATION")
.required(false)
.help("Optional table to write unprocessable entities to"))
.arg(Arg::new("dlq_transform")
.long("dlq_transform")
.env("DLQ_TRANSFORMS")
.required(false)
.action(ArgAction::Append)
.help("Transforms to apply before writing unprocessable entities to the dlq_location"))
.arg(Arg::new("checkpoints")
.short('c')
.long("checkpoints")
.env("WRITE_CHECKPOINTS")
.action(ArgAction::SetTrue)
.help("If set then kafka-delta-ingest will write checkpoints on every 10th commit"))
.arg(Arg::new("kafka_setting")
.short('K')
.long("kafka_setting")
.action(ArgAction::Append)
.help(r#"A list of additional settings to include when creating the Kafka consumer.

 This can be used to provide TLS configuration as in:

 ... -K "security.protocol=SSL" "ssl.certificate.location=kafka.crt" "ssl.key.location=kafka.key""#))
.arg(Arg::new("statsd_endpoint")
.short('s')
.long("statsd_endpoint")
.help("Statsd endpoint for sending stats")
.default_value("localhost:8125"))
.arg(Arg::new("json")
.required(false)
.long("json")
.help("Schema registry endpoint, local path, or empty string"))
.arg(Arg::new("avro")
.long("avro")
.required(false)
.help("Schema registry endpoint, local path, or empty string"))
.group(ArgGroup::new("format")
.args(["json", "avro"])
.required(false))
.arg(Arg::new("end")
.short('e')
.long("ends_at_latest_offsets")
.required(false)
.num_args(0)
.action(ArgAction::SetTrue)
.help(""))
)
.arg_required_else_help(true)
.arg(Arg::new("statsd_endpoint")
.short('s')
.long("statsd_endpoint")
.env("STATSD_ENDPOINT")
.help("Statsd endpoint for sending stats")
.default_value("localhost:8125"))
.arg(Arg::new("json")
.required(false)
.long("json")
.env("JSON_REGISTRY")
.help("Schema registry endpoint, local path, or empty string"))
.arg(Arg::new("avro")
.long("avro")
.env("AVRO_REGISTRY")
.required(false)
.help("Schema registry endpoint, local path, or empty string"))
.group(ArgGroup::new("format")
.args(["json", "avro"])
.required(false))
.arg(Arg::new("end")
.short('e')
.long("ends_at_latest_offsets")
.env("ENDS_AT_LATEST_OFFSETS")
.required(false)
.num_args(0)
.action(ArgAction::SetTrue)
.help(""))
)
.arg_required_else_help(true)
 }

 fn convert_matches_to_message_format(
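With these fallbacks in place, every ingest option can come from the environment (for example `KAFKA_TOPIC`, `TABLE_LOCATION`, and `KAFKA_BROKERS` in a container spec) while explicit flags still take precedence. A hedged sketch of a test against the `build_app()` from this diff, assuming clap's documented precedence of CLI value over environment variable over default (the positional values `t` and `loc` are placeholders, and the module layout is an assumption):

```rust
// Illustrative only: exercises the precedence this PR relies on.
// `build_app` is the function from this diff; the rest is test scaffolding.
#[cfg(test)]
mod env_precedence {
    use super::build_app;

    #[test]
    fn env_var_overrides_default_but_not_cli() {
        // Fine in a single-threaded test; see std::env::set_var docs for caveats.
        std::env::set_var("KAFKA_BROKERS", "broker-from-env:9092");

        // No -k flag: the env var should beat the "localhost:9092" default.
        let m = build_app().get_matches_from(["kafka-delta-ingest", "ingest", "t", "loc"]);
        let (_, ingest) = m.subcommand().expect("ingest subcommand");
        assert_eq!(
            ingest.get_one::<String>("kafka").map(String::as_str),
            Some("broker-from-env:9092")
        );

        // Explicit -k flag: the CLI value should beat the env var.
        let m = build_app().get_matches_from([
            "kafka-delta-ingest", "ingest", "t", "loc", "-k", "cli:9092",
        ]);
        let (_, ingest) = m.subcommand().expect("ingest subcommand");
        assert_eq!(
            ingest.get_one::<String>("kafka").map(String::as_str),
            Some("cli:9092")
        );
    }
}
```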