Skip to content

Commit

Permalink
[S3 Data Lake] Rename every class to remove references to Iceberg v2 - Part 2 (airbytehq#51022)
Browse files (browse the repository at this point in the history)

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
frifriSF59 and octavia-squidington-iii authored Jan 9, 2025
1 parent 9dd5283 commit c1f28cb
Show file tree
Hide file tree
Showing 23 changed files with 179 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.8
dockerImageTag: 0.2.9
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,25 @@ package io.airbyte.integrations.destination.s3_data_lake

import io.airbyte.cdk.load.check.DestinationChecker
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.integrations.destination.s3_data_lake.io.IcebergTableCleaner
import io.airbyte.integrations.destination.s3_data_lake.io.IcebergUtil
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableCleaner
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import javax.inject.Singleton
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types

@Singleton
class IcebergV2Checker(
private val icebergTableCleaner: IcebergTableCleaner,
private val icebergUtil: IcebergUtil,
class S3DataLakeChecker(
private val s3DataLakeTableCleaner: S3DataLakeTableCleaner,
private val s3DataLakeUtil: S3DataLakeUtil,
private val tableIdGenerator: TableIdGenerator,
) : DestinationChecker<IcebergV2Configuration> {
) : DestinationChecker<S3DataLakeConfiguration> {

override fun check(config: IcebergV2Configuration) {
override fun check(config: S3DataLakeConfiguration) {
catalogValidation(config)
}
private fun catalogValidation(config: IcebergV2Configuration) {
val catalogProperties = icebergUtil.toCatalogProperties(config)
val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, catalogProperties)
private fun catalogValidation(config: S3DataLakeConfiguration) {
val catalogProperties = s3DataLakeUtil.toCatalogProperties(config)
val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, catalogProperties)

val testTableIdentifier = DestinationStream.Descriptor(TEST_NAMESPACE, TEST_TABLE)

Expand All @@ -34,14 +34,14 @@ class IcebergV2Checker(
Types.NestedField.optional(2, "data", Types.StringType.get()),
)
val table =
icebergUtil.createTable(
s3DataLakeUtil.createTable(
testTableIdentifier,
catalog,
testTableSchema,
catalogProperties,
)

icebergTableCleaner.clearTable(
s3DataLakeTableCleaner.clearTable(
catalog,
tableIdGenerator.toTableIdentifier(testTableIdentifier),
table.io(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const val DEFAULT_STAGING_BRANCH = "airbyte_staging"
const val TEST_NAMESPACE = "airbyte_test_namespace"
const val TEST_TABLE = "airbyte_test_table"

data class IcebergV2Configuration(
data class S3DataLakeConfiguration(
override val awsAccessKeyConfiguration: AWSAccessKeyConfiguration,
override val s3BucketConfiguration: S3BucketConfiguration,
override val icebergCatalogConfiguration: IcebergCatalogConfiguration
Expand All @@ -31,12 +31,12 @@ data class IcebergV2Configuration(
S3BucketConfigurationProvider

@Singleton
class IcebergV2ConfigurationFactory :
DestinationConfigurationFactory<IcebergV2Specification, IcebergV2Configuration> {
class S3DataLakeConfigurationFactory :
DestinationConfigurationFactory<S3DataLakeSpecification, S3DataLakeConfiguration> {
override fun makeWithoutExceptionHandling(
pojo: IcebergV2Specification
): IcebergV2Configuration {
return IcebergV2Configuration(
pojo: S3DataLakeSpecification
): S3DataLakeConfiguration {
return S3DataLakeConfiguration(
awsAccessKeyConfiguration = pojo.toAWSAccessKeyConfiguration(),
s3BucketConfiguration = pojo.toS3BucketConfiguration(),
icebergCatalogConfiguration = pojo.toIcebergCatalogConfiguration(),
Expand All @@ -45,9 +45,9 @@ class IcebergV2ConfigurationFactory :
}

@Factory
class IcebergV2ConfigurationProvider(private val config: DestinationConfiguration) {
class S3DataLakeConfigurationProvider(private val config: DestinationConfiguration) {
@Singleton
fun get(): IcebergV2Configuration {
return config as IcebergV2Configuration
fun get(): S3DataLakeConfiguration {
return config as S3DataLakeConfiguration
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import jakarta.inject.Singleton

@Singleton
@JsonSchemaTitle("Iceberg V2 Destination Specification")
class IcebergV2Specification :
class S3DataLakeSpecification :
ConfigurationSpecification(),
AWSAccessKeySpecification,
S3BucketSpecification,
Expand Down Expand Up @@ -61,7 +61,7 @@ class IcebergV2Specification :
}

@Singleton
class IcebergV2SpecificationExtension : DestinationSpecificationExtension {
class S3DataLakeSpecificationExtension : DestinationSpecificationExtension {
override val supportedSyncModes =
listOf(
DestinationSyncMode.OVERWRITE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.StreamProcessingFailed
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.s3_data_lake.io.IcebergTableCleaner
import io.airbyte.integrations.destination.s3_data_lake.io.IcebergTableWriterFactory
import io.airbyte.integrations.destination.s3_data_lake.io.IcebergUtil
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableCleaner
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import io.github.oshai.kotlinlogging.KotlinLogging
import org.apache.iceberg.Table

@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
class IcebergStreamLoader(
class S3DataLakeStreamLoader(
override val stream: DestinationStream,
private val table: Table,
private val icebergTableWriterFactory: IcebergTableWriterFactory,
private val icebergUtil: IcebergUtil,
private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory,
private val s3DataLakeUtil: S3DataLakeUtil,
private val pipeline: MapperPipeline,
private val stagingBranchName: String,
private val mainBranchName: String
Expand All @@ -35,17 +35,17 @@ class IcebergStreamLoader(
totalSizeBytes: Long,
endOfStream: Boolean
): Batch {
icebergTableWriterFactory
s3DataLakeTableWriterFactory
.create(
table = table,
generationId = icebergUtil.constructGenerationIdSuffix(stream),
generationId = s3DataLakeUtil.constructGenerationIdSuffix(stream),
importType = stream.importType
)
.use { writer ->
log.info { "Writing records to branch $stagingBranchName" }
records.forEach { record ->
val icebergRecord =
icebergUtil.toRecord(
s3DataLakeUtil.toRecord(
record = record,
stream = stream,
tableSchema = table.schema(),
Expand Down Expand Up @@ -84,10 +84,10 @@ class IcebergStreamLoader(
}
val generationIdsToDelete =
(0 until stream.minimumGenerationId).map(
icebergUtil::constructGenerationIdSuffix
s3DataLakeUtil::constructGenerationIdSuffix
)
val icebergTableCleaner = IcebergTableCleaner(icebergUtil = icebergUtil)
icebergTableCleaner.deleteGenerationId(
val s3DataLakeTableCleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil)
s3DataLakeTableCleaner.deleteGenerationId(
table,
stagingBranchName,
generationIdsToDelete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.iceberg.parquet.IcebergParquetPipelineFactory
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.s3_data_lake.io.IcebergTableWriterFactory
import io.airbyte.integrations.destination.s3_data_lake.io.IcebergUtil
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import javax.inject.Singleton
import org.apache.iceberg.Schema

@Singleton
class IcebergV2Writer(
private val icebergTableWriterFactory: IcebergTableWriterFactory,
private val icebergConfiguration: IcebergV2Configuration,
private val icebergUtil: IcebergUtil
class S3DataLakeWriter(
private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory,
private val icebergConfiguration: S3DataLakeConfiguration,
private val s3DataLakeUtil: S3DataLakeUtil
) : DestinationWriter {

override fun createStreamLoader(stream: DestinationStream): StreamLoader {
val properties = icebergUtil.toCatalogProperties(config = icebergConfiguration)
val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
val properties = s3DataLakeUtil.toCatalogProperties(config = icebergConfiguration)
val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
val pipeline = IcebergParquetPipelineFactory().create(stream)
val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
val schema = s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
val table =
icebergUtil.createTable(
s3DataLakeUtil.createTable(
streamDescriptor = stream.descriptor,
catalog = catalog,
schema = schema,
Expand All @@ -35,11 +35,11 @@ class IcebergV2Writer(

existingAndIncomingSchemaShouldBeSame(catalogSchema = schema, tableSchema = table.schema())

return IcebergStreamLoader(
return S3DataLakeStreamLoader(
stream = stream,
table = table,
icebergTableWriterFactory = icebergTableWriterFactory,
icebergUtil = icebergUtil,
s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory,
s3DataLakeUtil = s3DataLakeUtil,
pipeline = pipeline,
stagingBranchName = DEFAULT_STAGING_BRANCH,
mainBranchName = icebergConfiguration.icebergCatalogConfiguration.mainBranchName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class GlueTableIdGenerator : TableIdGenerator {
}

@Factory
class TableIdGeneratorFactory(private val icebergConfiguration: IcebergV2Configuration) {
class TableIdGeneratorFactory(private val icebergConfiguration: S3DataLakeConfiguration) {
@Singleton
fun create() =
when (icebergConfiguration.icebergCatalogConfiguration.catalogConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.apache.iceberg.io.SupportsPrefixOperations
* catalog implementations do not clear the underlying files written to table storage.
*/
@Singleton
class IcebergTableCleaner(private val icebergUtil: IcebergUtil) {
class S3DataLakeTableCleaner(private val s3DataLakeUtil: S3DataLakeUtil) {

/**
* Clears the table identified by the provided [TableIdentifier]. This removes all data and
Expand Down Expand Up @@ -49,7 +49,7 @@ class IcebergTableCleaner(private val icebergUtil: IcebergUtil) {
val genIdsToDelete =
generationIdSuffix
.filter {
icebergUtil.assertGenerationIdSuffixIsOfValidFormat(it)
s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(it)
true
}
.toSet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.iceberg.util.PropertyUtil
* and whether primary keys are configured on the destination table's schema.
*/
@Singleton
class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) {
class S3DataLakeTableWriterFactory(private val s3DataLakeUtil: S3DataLakeUtil) {
/**
* Creates a new [BaseTaskWriter] based on the configuration of the destination target [Table].
*
Expand All @@ -39,7 +39,7 @@ class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) {
* @return The [BaseTaskWriter] that writes records to the target [Table].
*/
fun create(table: Table, generationId: String, importType: ImportType): BaseTaskWriter<Record> {
icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationId)
s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(generationId)
val format =
FileFormat.valueOf(
table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.airbyte.cdk.load.data.withAirbyteMeta
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.integrations.destination.s3_data_lake.ACCESS_KEY_ID
import io.airbyte.integrations.destination.s3_data_lake.GlueCredentialsProvider
import io.airbyte.integrations.destination.s3_data_lake.IcebergV2Configuration
import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeConfiguration
import io.airbyte.integrations.destination.s3_data_lake.SECRET_ACCESS_KEY
import io.airbyte.integrations.destination.s3_data_lake.TableIdGenerator
import io.github.oshai.kotlinlogging.KotlinLogging
Expand Down Expand Up @@ -68,7 +68,7 @@ data class AWSSystemCredentials(
* will be removed when we change all of this to use Micronaut
*/
@Singleton
class IcebergUtil(
class S3DataLakeUtil(
private val tableIdGenerator: TableIdGenerator,
val awsSystemCredentials: AWSSystemCredentials? = null
) {
Expand Down Expand Up @@ -202,7 +202,7 @@ class IcebergUtil(
* @param config The destination's configuration
* @return The Iceberg [Catalog] configuration properties.
*/
fun toCatalogProperties(config: IcebergV2Configuration): Map<String, String> {
fun toCatalogProperties(config: S3DataLakeConfiguration): Map<String, String> {
val icebergCatalogConfig = config.icebergCatalogConfiguration
val catalogConfig = icebergCatalogConfig.catalogConfiguration
val region = config.s3BucketConfiguration.s3BucketRegion.region
Expand All @@ -226,7 +226,7 @@ class IcebergUtil(
}

private fun buildS3Properties(
config: IcebergV2Configuration,
config: S3DataLakeConfiguration,
icebergCatalogConfig: IcebergCatalogConfiguration
): Map<String, String> {
return buildMap {
Expand All @@ -242,7 +242,7 @@ class IcebergUtil(
}

private fun buildNessieProperties(
config: IcebergV2Configuration,
config: S3DataLakeConfiguration,
catalogConfig: NessieCatalogConfiguration,
s3Properties: Map<String, String>
): Map<String, String> {
Expand Down Expand Up @@ -276,7 +276,7 @@ class IcebergUtil(
}

private fun buildGlueProperties(
config: IcebergV2Configuration,
config: S3DataLakeConfiguration,
catalogConfig: GlueCatalogConfiguration,
icebergCatalogConfig: IcebergCatalogConfiguration
): Map<String, String> {
Expand All @@ -302,7 +302,7 @@ class IcebergUtil(

private fun buildRoleBasedClientProperties(
roleArn: String,
config: IcebergV2Configuration
config: S3DataLakeConfiguration
): Map<String, String> {
val region = config.s3BucketConfiguration.s3BucketRegion.region
val (accessKeyId, secretAccessKey, externalId) =
Expand Down Expand Up @@ -339,7 +339,9 @@ class IcebergUtil(
)
}

private fun buildKeyBasedClientProperties(config: IcebergV2Configuration): Map<String, String> {
private fun buildKeyBasedClientProperties(
config: S3DataLakeConfiguration
): Map<String, String> {
val awsAccessKeyId =
requireNotNull(config.awsAccessKeyConfiguration.accessKeyId) {
"AWS Access Key ID is required for key-based authentication"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ package io.airbyte.integrations.destination.s3_data_lake

import io.airbyte.cdk.load.check.CheckIntegrationTest
import io.airbyte.cdk.load.check.CheckTestConfig
import io.airbyte.integrations.destination.s3_data_lake.IcebergV2TestUtil.GLUE_CONFIG_PATH
import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTestUtil.GLUE_CONFIG_PATH

class IcebergV2CheckTest :
CheckIntegrationTest<IcebergV2Specification>(
class S3DataLakeCheckTest :
CheckIntegrationTest<S3DataLakeSpecification>(
successConfigFilenames =
listOf(
CheckTestConfig(GLUE_CONFIG_PATH),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import java.util.UUID
import org.apache.iceberg.data.IcebergGenerics
import org.apache.iceberg.data.Record

object IcebergV2DataDumper : DestinationDataDumper {
object S3DataLakeDataDumper : DestinationDataDumper {

private fun toAirbyteValue(record: Record): ObjectValue {
return ObjectValue(
Expand Down Expand Up @@ -71,9 +71,9 @@ object IcebergV2DataDumper : DestinationDataDumper {
spec: ConfigurationSpecification,
stream: DestinationStream
): List<OutputRecord> {
val config = IcebergV2TestUtil.getConfig(spec)
val config = S3DataLakeTestUtil.getConfig(spec)
val catalog =
IcebergV2TestUtil.getCatalog(config, IcebergV2TestUtil.getAWSSystemCredentials())
S3DataLakeTestUtil.getCatalog(config, S3DataLakeTestUtil.getAWSSystemCredentials())
val table =
catalog.loadTable(
TableIdGeneratorFactory(config).create().toTableIdentifier(stream.descriptor)
Expand Down
Loading

0 comments on commit c1f28cb

Please sign in to comment.