From 6b88d5dd74c7360025cfcb8863065e567026d13f Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 23 Jan 2025 15:00:33 -0800 Subject: [PATCH 1/2] parallelize data cleaner; handle failure case --- .../S3DataLakeDestinationCleaner.kt | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt index 9854ddd24fef..61b521481779 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt @@ -9,6 +9,9 @@ import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.isNamespaceOld import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceRegex import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableCleaner import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import org.apache.iceberg.catalog.Catalog import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.SupportsNamespaces @@ -24,12 +27,24 @@ class S3DataLakeDestinationCleaner(private val catalog: Catalog) : DestinationCl // we're passing explicit TableIdentifier to clearTable, so just use SimpleTableIdGenerator val tableCleaner = S3DataLakeTableCleaner(S3DataLakeUtil(SimpleTableIdGenerator())) - namespaces.forEach { namespace -> - catalog.listTables(namespace).forEach { tableId -> - val table = catalog.loadTable(tableId) - tableCleaner.clearTable(catalog, tableId, table.io(), table.location()) + runBlocking(Dispatchers.IO) { + namespaces.forEach { namespace -> + launch { + catalog.listTables(namespace).forEach { tableId -> + try { + val table = catalog.loadTable(tableId) + tableCleaner.clearTable(catalog, tableId, table.io(), table.location()) + } catch (e: Exception) { + // catalog.loadTable will fail if the table has no files. + // In this case, we can just hard drop the table, because we know it has + // no + // corresponding files. + catalog.dropTable(tableId) + } + } + catalog.dropNamespace(namespace) + } } - catalog.dropNamespace(namespace) } } } From 3ef9326f5a925e2aab9971ea8fa2a313e3e6d49f Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 24 Jan 2025 16:24:19 -0800 Subject: [PATCH 2/2] fix comment format --- .../destination/s3_data_lake/S3DataLakeDestinationCleaner.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt index 61b521481779..5c0090627758 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt @@ -37,8 +37,7 @@ class S3DataLakeDestinationCleaner(private val catalog: Catalog) : DestinationCl } catch (e: Exception) { // catalog.loadTable will fail if the table has no files. // In this case, we can just hard drop the table, because we know it has - // no - // corresponding files. + // no corresponding files. catalog.dropTable(tableId) } }