Skip to content

Commit

Permalink
Merge branch 'airbytehq:master' into teradata_master
Browse files Browse the repository at this point in the history
  • Loading branch information
sc250072 authored Oct 15, 2024
2 parents 04e6c97 + 5e5a349 commit 0ef6860
Show file tree
Hide file tree
Showing 862 changed files with 98,111 additions and 57,700 deletions.
44 changes: 0 additions & 44 deletions .github/workflows/cdk-codeflash.yml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private fun resolveValues(
log.warn { "File '$jsonFile' not found for '$cliOptionKey'." }
continue
}
values["$prefix.json"] = jsonFile.readText()
values["$prefix.json"] = jsonFile.readText().replace("$", "\${:$}")
}
return values
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ interface SourceConfiguration : Configuration, SshTunnelConfiguration {
/** Does READ generate states of type GLOBAL? */
val global: Boolean

/** Maximum amount of time that may be set to limit the overall snapshotting duration. */
val maxSnapshotReadDuration: Duration?

/** During the READ operation, how often a feed should checkpoint, ideally. */
val checkpointTargetInterval: Duration

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.read

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.cdk.util.ThreadRenamingCoroutineName
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
import kotlin.time.toKotlinDuration
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.update
Expand Down Expand Up @@ -66,22 +69,32 @@ class RootReader(
}
// Call listener hook.
listener(feedJobs)
// Join on all stream feeds and collect caught exceptions.
val streamExceptions: Map<Stream, Throwable?> =
feeds.filterIsInstance<Stream>().associateWith {
feedJobs[it]?.join()
exceptions[it]
}
// Join on all global feeds and collect caught exceptions.
val globalExceptions: Map<Global, Throwable?> =
feeds.filterIsInstance<Global>().associateWith {
feedJobs[it]?.join()
exceptions[it]
}

// Certain errors on the global feed cause a full stop to all stream reads
if (globalExceptions.values.filterIsInstance<ConfigErrorException>().isNotEmpty()) {
this@supervisorScope.cancel()
}

// Join on all stream feeds and collect caught exceptions.
val streamExceptions: Map<Stream, Throwable?> =
feeds.filterIsInstance<Stream>().associateWith {
try {
feedJobs[it]?.join()
exceptions[it]
} catch (_: CancellationException) {
null
}
}
// Reduce and throw any caught exceptions.
val caughtExceptions: List<Throwable> =
streamExceptions.values.mapNotNull { it } +
globalExceptions.values.mapNotNull { it }
globalExceptions.values.mapNotNull { it } +
streamExceptions.values.mapNotNull { it }
if (caughtExceptions.isNotEmpty()) {
val cause: Throwable =
caughtExceptions.reduce { acc: Throwable, exception: Throwable ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ data class FakeSourceConfiguration(
val cursor: CursorConfiguration,
override val maxConcurrency: Int,
override val checkpointTargetInterval: Duration,
override val maxSnapshotReadDuration: Duration? = null,
) : SourceConfiguration {
override val global: Boolean = cursor is CdcCursor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package io.airbyte.cdk.read
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ArrayNode
import io.airbyte.cdk.ClockFactory
import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.output.BufferingOutputConsumer
Expand Down Expand Up @@ -194,6 +195,51 @@ class RootReaderIntegrationTest {
Assertions.assertFalse(globalStateMessages.isEmpty())
}

/**
 * Runs a read in which the global feed's partitions creator throws a [ConfigErrorException].
 *
 * Asserts that the read fails with a [ConfigErrorException] and that the output consumer
 * received neither per-stream state messages for any test case nor any GLOBAL state message.
 */
@Test
fun testAllStreamsGlobalConfigError() {
    val log = KotlinLogging.logger {}
    val stateManager =
        StateManager(
            global = Global(testCases.map { it.stream }),
            initialGlobalState = null,
            initialStreamStates = testCases.associate { it.stream to null },
        )
    val testOutputConsumer = BufferingOutputConsumer(ClockFactory().fixed())
    // Factory whose global partitions creator fails with a ConfigErrorException.
    val failingFactory =
        ConfigErrorThrowingGlobalPartitionsCreatorFactory(
            Semaphore(CONSTRAINED),
            *testCases.toTypedArray(),
        )
    val rootReader =
        RootReader(
            stateManager,
            slowHeartbeat,
            excessiveTimeout,
            testOutputConsumer,
            listOf(failingFactory),
        )

    // The config error is expected to surface to the caller of read().
    Assertions.assertThrows(ConfigErrorException::class.java) {
        runBlocking(Dispatchers.Default) { rootReader.read() }
    }

    // Dump everything that was emitted, to help diagnose failures.
    for (emitted in testOutputConsumer.messages()) {
        log.info { Jsons.writeValueAsString(emitted) }
    }

    // No stream feed may have emitted a state message.
    testCases.forEach { testCase ->
        log.info { "checking stream feed for ${testCase.name}" }
        val statesForStream: List<AirbyteStateMessage> =
            testOutputConsumer.states().filter { state ->
                state.stream?.streamDescriptor?.name == testCase.name
            }
        Assertions.assertTrue(statesForStream.isEmpty())
    }

    // Nor may the global feed have emitted one.
    log.info { "checking global feed" }
    val globalStates: List<AirbyteStateMessage> =
        testOutputConsumer.states().filter { state ->
            state.type == AirbyteStateMessage.AirbyteStateType.GLOBAL
        }
    Assertions.assertTrue(globalStates.isEmpty())
}

companion object {
// Passed as the permit count of the Semaphore handed to the partitions creator factory
// (see testAllStreamsGlobalConfigError).
const val CONSTRAINED = 2
}
Expand Down Expand Up @@ -571,7 +617,7 @@ class TestPartitionReader(
)
}

class TestPartitionsCreatorFactory(
open class TestPartitionsCreatorFactory(
val resource: Semaphore,
vararg val testCases: TestCase,
) : PartitionsCreatorFactory {
Expand All @@ -582,18 +628,7 @@ class TestPartitionsCreatorFactory(
feed: Feed,
): PartitionsCreator {
if (feed is Global) {
return object : PartitionsCreator {
override fun tryAcquireResources(): PartitionsCreator.TryAcquireResourcesStatus {
return PartitionsCreator.TryAcquireResourcesStatus.READY_TO_RUN
}

override suspend fun run(): List<PartitionReader> {
// Do nothing.
return emptyList()
}

override fun releaseResources() {}
}
return makeGlobalPartitionsCreator()
}
// For a stream feed, pick the CreatorCase in the corresponding TestCase
// which is the successor of the one whose corresponding state is in the StateQuerier.
Expand All @@ -613,6 +648,40 @@ class TestPartitionsCreatorFactory(
resource,
)
}

/**
 * Builds the [PartitionsCreator] used for a [Global] feed.
 *
 * This default implementation is always ready to run, produces no partition readers and has
 * no resources to release. It is `open` so that subclasses can substitute a different
 * behavior for the global feed.
 */
protected open fun makeGlobalPartitionsCreator(): PartitionsCreator =
    object : PartitionsCreator {
        override fun tryAcquireResources(): PartitionsCreator.TryAcquireResourcesStatus =
            PartitionsCreator.TryAcquireResourcesStatus.READY_TO_RUN

        // Do nothing: the global feed yields no partition readers.
        override suspend fun run(): List<PartitionReader> = emptyList()

        override fun releaseResources() {}
    }
}

/**
 * A [TestPartitionsCreatorFactory] whose global partitions creator always fails.
 *
 * The creator reports itself as ready to run and then throws a [ConfigErrorException] from
 * `run`; it holds no resources, so releasing them is a no-op.
 */
class ConfigErrorThrowingGlobalPartitionsCreatorFactory(
    resource: Semaphore,
    vararg testCases: TestCase,
) : TestPartitionsCreatorFactory(resource, *testCases) {
    override fun makeGlobalPartitionsCreator(): PartitionsCreator =
        object : PartitionsCreator {
            override fun tryAcquireResources(): PartitionsCreator.TryAcquireResourcesStatus =
                PartitionsCreator.TryAcquireResourcesStatus.READY_TO_RUN

            // Always fail, simulating a configuration error raised by the global feed.
            override suspend fun run(): List<PartitionReader> =
                throw ConfigErrorException("some config error")

            override fun releaseResources() {}
        }
}

/** Tests should succeed and not timeout. */
Expand Down
1 change: 1 addition & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {

testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20"
testFixturesImplementation "uk.org.webcompere:system-stubs-jupiter:2.1.7"
}

task integrationTest(type: Test) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.NoopNameMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import org.junit.jupiter.api.Test

class MockBasicFunctionalityIntegrationTest :
BasicFunctionalityIntegrationTest(
Expand All @@ -16,4 +17,9 @@ class MockBasicFunctionalityIntegrationTest :
NoopDestinationCleaner,
NoopExpectedRecordMapper,
NoopNameMapper
)
) {
// Delegates straight to the inherited test.
// NOTE(review): presumably re-declared with @Test so the runner picks it up for this class — confirm.
@Test
override fun testBasicWrite() = super.testBasicWrite()
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,52 @@ import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton
import java.nio.file.Path

/**
* To implement a [DestinationConfiguration]:
*
* - Create a class `{MyDestination}Specification` extending [ConfigurationSpecification]
*
* - Add any mixin `...Specification`s from this package (the jackson annotations will be inherited)
*
* - Add any required custom fields to the spec w/ jackson annotations
*
* - Create a class `{MyDestination}Configuration` extending [DestinationConfiguration]
*
* - Add the corresponding mixin `...ConfigurationProvider`s for any added spec mixins
*
* - (Add overrides for any fields provided by the providers)
*
* - Add custom config to the configuration as needed
*
* - Implement `DestinationConfigurationFactory` as a @[Singleton], using the `to...Configuration`
* methods from the specs to map to the provided configuration fields
*
* - (Set your custom fields as needed.)
*
* - Add a @[Factory] injected with [DestinationConfiguration], returning a @[Singleton] downcast to
* your implementation. For example:
*
* ```
* @Factory
* class MyDestinationConfigurationProvider(
* private val config: DestinationConfiguration
* ){
* @Singleton
* fun destinationConfig(): MyDestinationConfiguration =
* config as MyDestinationConfiguration
* }
* ```
*
* Now your configuration will be automatically parsed and available for injection. For example:
*
* ```
* @Singleton
* class MyDestinationWriter(
* private val config: MyDestinationConfiguration // <- automatically injected by micronaut
* ): DestinationWriter {
* // ...
* ```
*/
abstract class DestinationConfiguration : Configuration {
open val recordBatchSizeBytes: Long = 200L * 1024L * 1024L
open val tmpFileDirectory: Path = Path.of("airbyte-cdk-load")
Expand Down
Loading

0 comments on commit 0ef6860

Please sign in to comment.