Skip to content

Commit

Permalink
feat: Add iceberg destination
Browse files Browse the repository at this point in the history
  • Loading branch information
fdmsantos committed Nov 8, 2024
1 parent eae8e31 commit fd0a9c7
Show file tree
Hide file tree
Showing 11 changed files with 331 additions and 5 deletions.
35 changes: 33 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Supports all destinations and all Kinesis Firehose Features.
* [Logic Monitor](#logic-monitor)
* [MongoDB](#mongodb)
* [SumoLogic](#sumologic)
* [Iceberg](#iceberg)
* [Server Side Encryption](#server-side-encryption)
* [Data Transformation with Lambda](#data-transformation-with-lambda)
* [Data Format Conversion](#data-format-conversion)
Expand Down Expand Up @@ -525,6 +526,25 @@ module "firehose" {
}
```

#### Iceberg

**To Enabled It:** `destination = "iceberg"`

**Variables Prefix:** `iceberg_`

```hcl
module "firehose" {
source = "fdmsantos/kinesis-firehose/aws"
version = "x.x.x"
name = "firehose-delivery-stream"
destination = "iceberg"
s3_bucket_arn = "<s3_bucket_arn>"
iceberg_catalog_arn = "arn:${data.aws_partition.current.partition}:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:catalog"
iceberg_database_name = "<database>"
iceberg_table_name = "<table>"
}
```

### Server Side Encryption

**Supported By:** Only Direct Put source
Expand Down Expand Up @@ -856,6 +876,7 @@ The destination variable configured in module is mapped to firehose valid destin
| opensearch | opensearch | |
| opensearchserverless | opensearchserverless | |
| snowflake | snowflake | |
| iceberg | iceberg | |
| http_endpoint | http_endpoint | |
| datadog | http_endpoint | The difference regarding http_endpoint is the http_endpoint_url and http_endpoint_name variables aren't support, and it's necessary configure datadog_endpoint_type variable |
| newrelic | http_endpoint | The difference regarding http_endpoint is the http_endpoint_url and http_endpoint_name variables aren't support, and it's necessary configure newrelic_endpoint_type variable |
Expand Down Expand Up @@ -892,20 +913,21 @@ The destination variable configured in module is mapped to firehose valid destin
- [LogicMonitor](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/tree/main/examples/http-endpoint/logicmonitor) - Creates a Kinesis Firehose Stream with Logic Monitor as destination.
- [MongoDB](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/tree/main/examples/http-endpoint/mongodb) - Creates a Kinesis Firehose Stream with MongoDB as destination.
- [SumoLogic](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/tree/main/examples/http-endpoint/sumologic) - Creates a Kinesis Firehose Stream with Sumo Logic as destination.
- [Iceberg](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/tree/main/examples/iceberg/direct-put-to-iceberg) - Creates a Kinesis Firehose Stream with Iceberg as destination.

<!-- BEGINNING OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
## Requirements

| Name | Version |
|------|---------|
| <a name="requirement_terraform"></a> [terraform](#requirement\_terraform) | >= 0.13.1 |
| <a name="requirement_aws"></a> [aws](#requirement\_aws) | >= 5.59 |
| <a name="requirement_aws"></a> [aws](#requirement\_aws) | >= 5.73 |

## Providers

| Name | Version |
|------|---------|
| <a name="provider_aws"></a> [aws](#provider\_aws) | >= 5.59 |
| <a name="provider_aws"></a> [aws](#provider\_aws) | >= 5.73 |

## Modules

Expand All @@ -922,6 +944,7 @@ No modules.
| [aws_iam_policy.cw](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_policy.elasticsearch](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_policy.glue](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_policy.iceberg](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_policy.kinesis](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_policy.lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_policy.msk](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
Expand All @@ -938,6 +961,7 @@ No modules.
| [aws_iam_role_policy_attachment.cw](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
| [aws_iam_role_policy_attachment.elasticsearch](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
| [aws_iam_role_policy_attachment.glue](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
| [aws_iam_role_policy_attachment.iceberg](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
| [aws_iam_role_policy_attachment.kinesis](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
| [aws_iam_role_policy_attachment.lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
| [aws_iam_role_policy_attachment.msk](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
Expand Down Expand Up @@ -968,6 +992,7 @@ No modules.
| [aws_iam_policy_document.cw](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.elasticsearch](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.glue](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.iceberg](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.kinesis](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.msk](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
Expand Down Expand Up @@ -1076,6 +1101,12 @@ No modules.
| <a name="input_http_endpoint_request_configuration_content_encoding"></a> [http\_endpoint\_request\_configuration\_content\_encoding](#input\_http\_endpoint\_request\_configuration\_content\_encoding) | Kinesis Data Firehose uses the content encoding to compress the body of a request before sending the request to the destination | `string` | `"GZIP"` | no |
| <a name="input_http_endpoint_retry_duration"></a> [http\_endpoint\_retry\_duration](#input\_http\_endpoint\_retry\_duration) | Total amount of seconds Firehose spends on retries. This duration starts after the initial attempt fails, It does not include the time periods during which Firehose waits for acknowledgment from the specified destination after each attempt | `number` | `300` | no |
| <a name="input_http_endpoint_url"></a> [http\_endpoint\_url](#input\_http\_endpoint\_url) | The HTTP endpoint URL to which Kinesis Firehose sends your data | `string` | `null` | no |
| <a name="input_iceberg_catalog_arn"></a> [iceberg\_catalog\_arn](#input\_iceberg\_catalog\_arn) | Glue catalog ARN identifier of the destination Apache Iceberg Tables. You must specify the ARN in the format arn:aws:glue:region:account-id:catalog. | `string` | `null` | no |
| <a name="input_iceberg_database_name"></a> [iceberg\_database\_name](#input\_iceberg\_database\_name) | The name of the Apache Iceberg database. | `string` | `null` | no |
| <a name="input_iceberg_destination_config_s3_error_output_prefix"></a> [iceberg\_destination\_config\_s3\_error\_output\_prefix](#input\_iceberg\_destination\_config\_s3\_error\_output\_prefix) | The table specific S3 error output prefix. All the errors that occurred while delivering to this table will be prefixed with this value in S3 destination. | `string` | `null` | no |
| <a name="input_iceberg_destination_config_unique_keys"></a> [iceberg\_destination\_config\_unique\_keys](#input\_iceberg\_destination\_config\_unique\_keys) | A list of unique keys for a given Apache Iceberg table. Firehose will use these for running Create, Update, or Delete operations on the given Iceberg table. | `list(string)` | `[]` | no |
| <a name="input_iceberg_retry_duration"></a> [iceberg\_retry\_duration](#input\_iceberg\_retry\_duration) | The period of time, in seconds between 0 to 7200, during which Firehose retries to deliver data to the specified destination. | `number` | `300` | no |
| <a name="input_iceberg_table_name"></a> [iceberg\_table\_name](#input\_iceberg\_table\_name) | The name of the Apache Iceberg Table. | `string` | `null` | no |
| <a name="input_input_source"></a> [input\_source](#input\_input\_source) | This is the kinesis firehose source | `string` | `"direct-put"` | no |
| <a name="input_kinesis_source_is_encrypted"></a> [kinesis\_source\_is\_encrypted](#input\_kinesis\_source\_is\_encrypted) | Indicates if Kinesis data stream source is encrypted | `bool` | `false` | no |
| <a name="input_kinesis_source_kms_arn"></a> [kinesis\_source\_kms\_arn](#input\_kinesis\_source\_kms\_arn) | Kinesis Source KMS Key to add Firehose role to decrypt the records. | `string` | `null` | no |
Expand Down
65 changes: 65 additions & 0 deletions examples/iceberg/direct-put-to-iceberg/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Iceberg

Configuration in this directory creates kinesis firehose stream with Direct Put as source and Iceberg as destination.

This example can be tested with Demo Data in Kinesis Firehose Console.

## Usage

To run this example you need to execute:

```bash
$ terraform init
$ terraform plan
$ terraform apply
```

Note that this example may create resources which cost money. Run `terraform destroy` when you don't need these resources.

<!-- BEGINNING OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
## Requirements

| Name | Version |
|------|---------|
| <a name="requirement_terraform"></a> [terraform](#requirement\_terraform) | >= 0.13.1 |
| <a name="requirement_aws"></a> [aws](#requirement\_aws) | ~> 5.0 |
| <a name="requirement_random"></a> [random](#requirement\_random) | >= 2.0 |

## Providers

| Name | Version |
|------|---------|
| <a name="provider_aws"></a> [aws](#provider\_aws) | ~> 5.0 |
| <a name="provider_random"></a> [random](#provider\_random) | >= 2.0 |

## Modules

| Name | Source | Version |
|------|--------|---------|
| <a name="module_firehose"></a> [firehose](#module\_firehose) | ../../../ | n/a |

## Resources

| Name | Type |
|------|------|
| [aws_glue_catalog_database.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/glue_catalog_database) | resource |
| [aws_glue_catalog_table.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/glue_catalog_table) | resource |
| [aws_kms_key.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/kms_key) | resource |
| [aws_s3_bucket.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket) | resource |
| [random_pet.this](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/pet) | resource |
| [aws_caller_identity.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source |
| [aws_partition.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/partition) | data source |
| [aws_region.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/region) | data source |

## Inputs

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_name_prefix"></a> [name\_prefix](#input\_name\_prefix) | Name prefix to use in resources | `string` | `"firehose-to-iceberg"` | no |

## Outputs

| Name | Description |
|------|-------------|
| <a name="output_firehose_role"></a> [firehose\_role](#output\_firehose\_role) | Firehose Role |
<!-- END OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
64 changes: 64 additions & 0 deletions examples/iceberg/direct-put-to-iceberg/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
data "aws_caller_identity" "current" {}
data "aws_partition" "current" {}
data "aws_region" "current" {}

resource "random_pet" "this" {
length = 2
}

resource "aws_s3_bucket" "this" {
bucket = "${var.name_prefix}-dest-bucket-${random_pet.this.id}"
force_destroy = true
}

resource "aws_kms_key" "this" {
description = "${var.name_prefix}-kms-key"
deletion_window_in_days = 7
}

resource "aws_glue_catalog_database" "this" {
name = "demo"
}

resource "aws_glue_catalog_table" "this" {
name = "demo"
database_name = aws_glue_catalog_database.this.name
table_type = "EXTERNAL_TABLE"
parameters = {
format = "parquet"
}
open_table_format_input {
iceberg_input {
metadata_operation = "CREATE"
version = 2
}
}
storage_descriptor {
location = "s3://${aws_s3_bucket.this.id}"

columns {
name = "my_column_1"
type = "int"
}
}
}

module "firehose" {
source = "../../../"
name = "${var.name_prefix}-delivery-stream"
destination = "iceberg"
s3_bucket_arn = aws_s3_bucket.this.arn
buffering_interval = 30
buffering_size = 10
iceberg_catalog_arn = "arn:${data.aws_partition.current.partition}:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:catalog"
iceberg_database_name = aws_glue_catalog_database.this.name
iceberg_table_name = aws_glue_catalog_table.this.name
s3_backup_mode = "FailedOnly"
s3_backup_prefix = "backup/"
s3_backup_bucket_arn = aws_s3_bucket.this.arn
s3_backup_buffering_interval = 100
s3_backup_buffering_size = 100
s3_backup_compression = "GZIP"
s3_backup_enable_encryption = true
s3_backup_kms_key_arn = aws_kms_key.this.arn
}
4 changes: 4 additions & 0 deletions examples/iceberg/direct-put-to-iceberg/outputs.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
output "firehose_role" {
description = "Firehose Role"
value = module.firehose.kinesis_firehose_role_arn
}
5 changes: 5 additions & 0 deletions examples/iceberg/direct-put-to-iceberg/variables.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
variable "name_prefix" {
description = "Name prefix to use in resources"
type = string
default = "firehose-to-iceberg"
}
14 changes: 14 additions & 0 deletions examples/iceberg/direct-put-to-iceberg/versions.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
terraform {
required_version = ">= 0.13.1"

required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
random = {
source = "hashicorp/random"
version = ">= 2.0"
}
}
}
35 changes: 35 additions & 0 deletions iam.tf
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ locals {
add_elasticsearch_policy = var.create && var.create_role && local.destination == "elasticsearch"
add_opensearch_policy = var.create && var.create_role && local.destination == "opensearch"
add_opensearchserverless_policy = var.create && var.create_role && local.destination == "opensearchserverless"
add_iceberg_policy = var.create && var.create_role && local.destination == "iceberg"
add_vpc_policy = var.create && var.create_role && var.enable_vpc && var.vpc_use_existing_role && local.is_search_destination
add_secretsmanager_policy = var.create && var.create_role && var.enable_secrets_manager
add_secretsmanager_decrypt_policy = local.add_secretsmanager_policy && var.secret_kms_key_arn != null
Expand Down Expand Up @@ -698,6 +699,40 @@ resource "aws_iam_role_policy_attachment" "application" {
policy_arn = aws_iam_policy.application[0].arn
}

##################
# Iceberg
##################
data "aws_iam_policy_document" "iceberg" {
count = local.add_iceberg_policy ? 1 : 0
statement {
effect = "Allow"
actions = [
"glue:GetTable",
"glue:GetDatabase",
"glue:UpdateTable"
]
resources = [
var.iceberg_catalog_arn,
"${replace(var.iceberg_catalog_arn, ":catalog", "")}:database/${var.iceberg_database_name}",
"${replace(var.iceberg_catalog_arn, ":catalog", "")}:table/${var.iceberg_database_name}/${var.iceberg_table_name}"
]
}
}

resource "aws_iam_policy" "iceberg" {
count = local.add_iceberg_policy ? 1 : 0
name = "${local.role_name}-iceberg"
path = var.policy_path
policy = data.aws_iam_policy_document.iceberg[0].json
tags = var.tags
}

resource "aws_iam_role_policy_attachment" "iceberg" {
count = local.add_iceberg_policy ? 1 : 0
role = aws_iam_role.firehose[0].name
policy_arn = aws_iam_policy.iceberg[0].arn
}

##################
# Secrets Manager
##################
Expand Down
6 changes: 5 additions & 1 deletion locals.tf
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ locals {
logicmonitor : "http_endpoint",
mongodb : "http_endpoint",
sumologic : "http_endpoint",
snowflake : "snowflake"
snowflake : "snowflake",
iceberg : "iceberg"
}
destination = local.destinations[var.destination]
s3_destination = local.destination == "extended_s3" ? true : false
Expand Down Expand Up @@ -177,6 +178,9 @@ locals {
FailedOnly : "FailedDataOnly",
All : "AllData"
}
iceberg : {
FailedOnly : "FailedDataOnly"
}
}
s3_backup_mode = local.use_backup_vars_in_s3_configuration ? local.backup_modes[local.destination][var.s3_backup_mode] : null

Expand Down
Loading

0 comments on commit fd0a9c7

Please sign in to comment.