diff --git a/Cargo.toml b/Cargo.toml
index 978f3a8..28e9a8c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,7 +11,7 @@ apache-avro = "^0.14"
 base64 = "0.13"
 bytes = "1"
 chrono = "0.4.31"
-clap = { version = "4", features = ["color"] }
+clap = { version = "4", features = ["color", "env"] }
 dipstick = "0.9"
 env_logger = "0"
 futures = "0.3"
diff --git a/src/main.rs b/src/main.rs
index ae2c89f..56a2c50 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -309,65 +309,76 @@ fn parse_seek_offsets(val: &str) -> Vec<(DataTypePartition, DataTypeOffset)> {
 
 fn build_app() -> Command {
     Command::new("kafka-delta-ingest")
-    .version(env!["CARGO_PKG_VERSION"])
-    .about("Daemon for ingesting messages from Kafka and writing them to a Delta table")
-    .subcommand(
-        Command::new("ingest")
-        .about("Starts a stream that consumes from a Kafka topic and writes to a Delta table")
-        .arg(Arg::new("topic")
-            .help("The Kafka topic to stream from")
-            .required(true))
-        .arg(Arg::new("table_location")
-            .help("The Delta table location to write out")
-            .required(true))
-        .arg(Arg::new("kafka")
-            .short('k')
-            .long("kafka")
-            .help("Kafka broker connection string to use")
-            .default_value("localhost:9092"))
-        .arg(Arg::new("consumer_group")
-            .short('g')
-            .long("consumer_group")
-            .help("Consumer group to use when subscribing to Kafka topics")
-            .default_value("kafka_delta_ingest"))
-        .arg(Arg::new("app_id")
-            .short('a')
-            .long("app_id")
-            .help("App ID to use when writing to Delta")
-            .default_value("kafka_delta_ingest"))
-        .arg(Arg::new("seek_offsets")
-            .long("seek_offsets")
-            .help(r#"Only useful when offsets are not already stored in the delta table. A JSON string specifying the partition offset map as the starting point for ingestion. This is *seeking* rather than _starting_ offsets. The first ingested message would be (seek_offset + 1). Ex: {"0":123, "1":321}"#))
-        .arg(Arg::new("auto_offset_reset")
-            .short('o')
-            .long("auto_offset_reset")
-            .help(r#"The default offset reset policy, which is either 'earliest' or 'latest'.
+        .version(env!["CARGO_PKG_VERSION"])
+        .about("Daemon for ingesting messages from Kafka and writing them to a Delta table")
+        .subcommand(
+            Command::new("ingest")
+                .about("Starts a stream that consumes from a Kafka topic and writes to a Delta table")
+                .arg(Arg::new("topic")
+                    .env("KAFKA_TOPIC")
+                    .help("The Kafka topic to stream from")
+                    .required(true))
+                .arg(Arg::new("table_location")
+                    .env("TABLE_LOCATION")
+                    .help("The Delta table location to write out")
+                    .required(true))
+                .arg(Arg::new("kafka")
+                    .short('k')
+                    .long("kafka")
+                    .env("KAFKA_BROKERS")
+                    .help("Kafka broker connection string to use")
+                    .default_value("localhost:9092"))
+                .arg(Arg::new("consumer_group")
+                    .short('g')
+                    .long("consumer_group")
+                    .env("KAFKA_CONSUMER_GROUP")
+                    .help("Consumer group to use when subscribing to Kafka topics")
+                    .default_value("kafka_delta_ingest"))
+                .arg(Arg::new("app_id")
+                    .short('a')
+                    .long("app_id")
+                    .env("APP_ID")
+                    .help("App ID to use when writing to Delta")
+                    .default_value("kafka_delta_ingest"))
+                .arg(Arg::new("seek_offsets")
+                    .long("seek_offsets")
+                    .env("KAFKA_SEEK_OFFSETS")
+                    .help(r#"Only useful when offsets are not already stored in the delta table. A JSON string specifying the partition offset map as the starting point for ingestion. This is *seeking* rather than _starting_ offsets. The first ingested message would be (seek_offset + 1). Ex: {"0":123, "1":321}"#))
+                .arg(Arg::new("auto_offset_reset")
+                    .short('o')
+                    .long("auto_offset_reset")
+                    .env("KAFKA_AUTO_OFFSET_RESET")
+                    .help(r#"The default offset reset policy, which is either 'earliest' or 'latest'.
 The configuration is applied when offsets are not found in delta table or not specified with 'seek_offsets'. This also overrides the kafka consumer's 'auto.offset.reset' config."#)
-            .default_value("earliest"))
-        .arg(Arg::new("allowed_latency")
-            .short('l')
-            .long("allowed_latency")
-            .help("The allowed latency (in seconds) from the time a message is consumed to when it should be written to Delta.")
-            .default_value("300")
-            .value_parser(clap::value_parser!(u64)))
-        .arg(Arg::new("max_messages_per_batch")
-            .short('m')
-            .long("max_messages_per_batch")
-            .help("The maximum number of rows allowed in a parquet batch. This shoulid be the approximate number of bytes described by MIN_BYTES_PER_FILE")
-            .default_value("5000")
-            .value_parser(clap::value_parser!(usize)))
-        .arg(Arg::new("min_bytes_per_file")
-            .short('b')
-            .long("min_bytes_per_file")
-            .help("The target minimum file size (in bytes) for each Delta file. File size may be smaller than this value if ALLOWED_LATENCY does not allow enough time to accumulate the specified number of bytes.")
-            .default_value("134217728")
-            .value_parser(clap::value_parser!(usize)))
-        .arg(Arg::new("transform")
-            .short('t')
-            .long("transform")
-            .action(ArgAction::Append)
-            .help(
-r#"A list of transforms to apply to each Kafka message. Each transform should follow the pattern:
+                    .default_value("earliest"))
+                .arg(Arg::new("allowed_latency")
+                    .short('l')
+                    .long("allowed_latency")
+                    .env("ALLOWED_LATENCY")
+                    .help("The allowed latency (in seconds) from the time a message is consumed to when it should be written to Delta.")
+                    .default_value("300")
+                    .value_parser(clap::value_parser!(u64)))
+                .arg(Arg::new("max_messages_per_batch")
+                    .short('m')
+                    .long("max_messages_per_batch")
+                    .env("MAX_MESSAGES_PER_BATCH")
+                    .help("The maximum number of rows allowed in a parquet batch. This shoulid be the approximate number of bytes described by MIN_BYTES_PER_FILE")
+                    .default_value("5000")
+                    .value_parser(clap::value_parser!(usize)))
+                .arg(Arg::new("min_bytes_per_file")
+                    .short('b')
+                    .long("min_bytes_per_file")
+                    .env("MIN_BYTES_PER_FILE")
+                    .help("The target minimum file size (in bytes) for each Delta file. File size may be smaller than this value if ALLOWED_LATENCY does not allow enough time to accumulate the specified number of bytes.")
+                    .default_value("134217728")
+                    .value_parser(clap::value_parser!(usize)))
+                .arg(Arg::new("transform")
+                    .short('t')
+                    .long("transform")
+                    .env("TRANSFORMS")
+                    .action(ArgAction::Append)
+                    .help(
+                        r#"A list of transforms to apply to each Kafka message. Each transform should follow the pattern:
 "PROPERTY: SOURCE". For example:
 ...
 -t 'modified_date: substr(modified,`0`,`10`)' 'kafka_offset: kafka.offset'
@@ -388,54 +399,61 @@ the following well-known Kafka metadata properties:
 * kafka.topic
 * kafka.timestamp
 "#))
-        .arg(Arg::new("dlq_table_location")
-            .long("dlq_table_location")
-            .required(false)
-            .help("Optional table to write unprocessable entities to"))
-        .arg(Arg::new("dlq_transform")
-            .long("dlq_transform")
-            .required(false)
-            .action(ArgAction::Append)
-            .help("Transforms to apply before writing unprocessable entities to the dlq_location"))
-        .arg(Arg::new("checkpoints")
-            .short('c')
-            .long("checkpoints")
-            .action(ArgAction::SetTrue)
-            .help("If set then kafka-delta-ingest will write checkpoints on every 10th commit"))
-        .arg(Arg::new("kafka_setting")
-            .short('K')
-            .long("kafka_setting")
-            .action(ArgAction::Append)
-            .help(r#"A list of additional settings to include when creating the Kafka consumer.
+                .arg(Arg::new("dlq_table_location")
+                    .long("dlq_table_location")
+                    .env("DLQ_TABLE_LOCATION")
+                    .required(false)
+                    .help("Optional table to write unprocessable entities to"))
+                .arg(Arg::new("dlq_transform")
+                    .long("dlq_transform")
+                    .env("DLQ_TRANSFORMS")
+                    .required(false)
+                    .action(ArgAction::Append)
+                    .help("Transforms to apply before writing unprocessable entities to the dlq_location"))
+                .arg(Arg::new("checkpoints")
+                    .short('c')
+                    .long("checkpoints")
+                    .env("WRITE_CHECKPOINTS")
+                    .action(ArgAction::SetTrue)
+                    .help("If set then kafka-delta-ingest will write checkpoints on every 10th commit"))
+                .arg(Arg::new("kafka_setting")
+                    .short('K')
+                    .long("kafka_setting")
+                    .action(ArgAction::Append)
+                    .help(r#"A list of additional settings to include when creating the Kafka consumer.
 
 This can be used to provide TLS configuration as in:
 ...
 -K "security.protocol=SSL" "ssl.certificate.location=kafka.crt" "ssl.key.location=kafka.key""#))
-        .arg(Arg::new("statsd_endpoint")
-            .short('s')
-            .long("statsd_endpoint")
-            .help("Statsd endpoint for sending stats")
-            .default_value("localhost:8125"))
-        .arg(Arg::new("json")
-            .required(false)
-            .long("json")
-            .help("Schema registry endpoint, local path, or empty string"))
-        .arg(Arg::new("avro")
-            .long("avro")
-            .required(false)
-            .help("Schema registry endpoint, local path, or empty string"))
-        .group(ArgGroup::new("format")
-            .args(["json", "avro"])
-            .required(false))
-        .arg(Arg::new("end")
-            .short('e')
-            .long("ends_at_latest_offsets")
-            .required(false)
-            .num_args(0)
-            .action(ArgAction::SetTrue)
-            .help(""))
-    )
-    .arg_required_else_help(true)
+                .arg(Arg::new("statsd_endpoint")
+                    .short('s')
+                    .long("statsd_endpoint")
+                    .env("STATSD_ENDPOINT")
+                    .help("Statsd endpoint for sending stats")
+                    .default_value("localhost:8125"))
+                .arg(Arg::new("json")
+                    .required(false)
+                    .long("json")
+                    .env("JSON_REGISTRY")
+                    .help("Schema registry endpoint, local path, or empty string"))
+                .arg(Arg::new("avro")
+                    .long("avro")
+                    .env("AVRO_REGISTRY")
+                    .required(false)
+                    .help("Schema registry endpoint, local path, or empty string"))
+                .group(ArgGroup::new("format")
+                    .args(["json", "avro"])
+                    .required(false))
+                .arg(Arg::new("end")
+                    .short('e')
+                    .long("ends_at_latest_offsets")
+                    .env("ENDS_AT_LATEST_OFFSETS")
+                    .required(false)
+                    .num_args(0)
+                    .action(ArgAction::SetTrue)
+                    .help(""))
+        )
+        .arg_required_else_help(true)
 }
 
 fn convert_matches_to_message_format(
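For context on what the new "env" feature in Cargo.toml enables: the minimal sketch below (not part of the patch; the binary name "env-demo" and the printed output are illustrative only) shows how a clap 4 argument declared with `.env(...)` resolves its value, mirroring the `kafka` / `KAFKA_BROKERS` pair from the diff.

    // Minimal sketch of clap 4's env-variable fallback, assuming
    // clap = { version = "4", features = ["env"] } as in the Cargo.toml change above.
    use clap::{Arg, Command};

    fn main() {
        let matches = Command::new("env-demo")
            .arg(
                Arg::new("kafka")
                    .short('k')
                    .long("kafka")
                    .env("KAFKA_BROKERS") // consulted only when --kafka/-k is absent
                    .default_value("localhost:9092"),
            )
            .get_matches();

        // Resolution order: command-line flag, then $KAFKA_BROKERS, then the default value.
        println!("brokers = {}", matches.get_one::<String>("kafka").unwrap());
    }

Running this as `KAFKA_BROKERS=broker-1:9092 ./env-demo` prints `brokers = broker-1:9092`, while passing `-k other:9092` on the command line still takes precedence over the environment variable, and omitting both falls back to `localhost:9092`.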