diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/DLPTextToBigQueryStreamingIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/DLPTextToBigQueryStreamingIT.java
index 13ecb41f1e..3da99c8ba9 100644
--- a/v1/src/test/java/com/google/cloud/teleport/templates/DLPTextToBigQueryStreamingIT.java
+++ b/v1/src/test/java/com/google/cloud/teleport/templates/DLPTextToBigQueryStreamingIT.java
@@ -20,6 +20,7 @@
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
import com.google.cloud.bigquery.TableId;
+import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.privacy.dlp.v2.CharacterMaskConfig;
import com.google.privacy.dlp.v2.CryptoHashConfig;
@@ -61,7 +62,7 @@
import org.slf4j.LoggerFactory;
/** Integration test for {@link DLPTextToBigQueryStreaming} (Stream_DLP_GCS_Text_to_BigQuery). */
-@Category(TemplateIntegrationTest.class)
+@Category({TemplateIntegrationTest.class, SkipRunnerV2Test.class})
@TemplateIntegrationTest(DLPTextToBigQueryStreaming.class)
@RunWith(JUnit4.class)
public class DLPTextToBigQueryStreamingIT extends TemplateTestBase {
diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToAvroIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToAvroIT.java
index 72174e4025..2c564b4290 100644
--- a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToAvroIT.java
+++ b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToAvroIT.java
@@ -21,6 +21,7 @@
import com.google.cloud.teleport.avro.AvroPubsubMessageRecord;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
+import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -55,7 +56,7 @@
/** Integration test for {@link PubsubToAvro} PubSub to Avro. */
// SkipDirectRunnerTest: PubsubIO doesn't trigger panes on the DirectRunner.
-@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
+@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class, SkipRunnerV2Test.class})
@TemplateIntegrationTest(value = PubsubToAvro.class, template = "Cloud_PubSub_to_Avro")
@RunWith(JUnit4.class)
public class PubSubToAvroIT extends TemplateTestBase {
diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToBigQueryIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToBigQueryIT.java
index 9ae0fe8d86..b5771a532e 100644
--- a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToBigQueryIT.java
+++ b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToBigQueryIT.java
@@ -26,6 +26,7 @@
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
+import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
@@ -58,7 +59,7 @@
import org.junit.runners.JUnit4;
/** Integration test for {@link PubSubToBigQuery} classic template. */
-@Category(TemplateIntegrationTest.class)
+@Category({TemplateIntegrationTest.class, SkipRunnerV2Test.class})
@RunWith(JUnit4.class)
public final class PubSubToBigQueryIT extends TemplateTestBase {
diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToDatadogIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToDatadogIT.java
index 8985fc6fc0..5f1b022e6d 100644
--- a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToDatadogIT.java
+++ b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToDatadogIT.java
@@ -26,6 +26,7 @@
import com.google.cloud.teleport.it.datadog.conditions.DatadogLogEntriesCheck;
import com.google.cloud.teleport.metadata.DirectRunnerTest;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
+import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
@@ -53,7 +54,7 @@
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
/** Integration test for {@link PubSubToDatadog} classic template. */
-@Category(TemplateIntegrationTest.class)
+@Category({TemplateIntegrationTest.class, SkipRunnerV2Test.class})
@TemplateIntegrationTest(PubSubToDatadog.class)
@RunWith(JUnit4.class)
public class PubSubToDatadogIT extends TemplateTestBase {
diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToPubSubIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToPubSubIT.java
index b7173972f2..85b9274e04 100644
--- a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToPubSubIT.java
+++ b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToPubSubIT.java
@@ -19,6 +19,7 @@
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
+import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
@@ -44,7 +45,7 @@
import org.junit.runners.JUnit4;
/** Integration test for {@link PubsubToPubsub} (Cloud_PubSub_to_Cloud_PubSub). */
-@Category(TemplateIntegrationTest.class)
+@Category({TemplateIntegrationTest.class, SkipRunnerV2Test.class})
@TemplateIntegrationTest(PubsubToPubsub.class)
@RunWith(JUnit4.class)
public class PubSubToPubSubIT extends TemplateTestBase {
diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToSplunkIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToSplunkIT.java
index 97f1e2bb54..9a0650424e 100644
--- a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToSplunkIT.java
+++ b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubToSplunkIT.java
@@ -24,6 +24,7 @@
import com.google.cloud.kms.v1.CryptoKey;
import com.google.cloud.teleport.metadata.DirectRunnerTest;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
+import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
@@ -59,7 +60,7 @@
*
* TODO - Change CustomSplunkResourceManager back when Beam 2.55 is released
*/
-@Category(TemplateIntegrationTest.class)
+@Category({TemplateIntegrationTest.class, SkipRunnerV2Test.class})
@TemplateIntegrationTest(PubSubToSplunk.class)
@RunWith(JUnit4.class)
public class PubSubToSplunkIT extends TemplateTestBase {
diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubTopicToBigQueryIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubTopicToBigQueryIT.java
index cac172001f..3169641134 100644
--- a/v1/src/test/java/com/google/cloud/teleport/templates/PubSubTopicToBigQueryIT.java
+++ b/v1/src/test/java/com/google/cloud/teleport/templates/PubSubTopicToBigQueryIT.java
@@ -23,6 +23,7 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableId;
+import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
@@ -48,7 +49,7 @@
import org.junit.runners.JUnit4;
/** Integration test for {@link PubSubToBigQuery} PubSub Topic to Bigquery. */
-@Category(TemplateIntegrationTest.class)
+@Category({TemplateIntegrationTest.class, SkipRunnerV2Test.class})
@TemplateIntegrationTest(value = PubSubToBigQuery.class, template = "PubSub_to_BigQuery")
@RunWith(JUnit4.class)
public final class PubSubTopicToBigQueryIT extends TemplateTestBase {
diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/PubsubToTextIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/PubsubToTextIT.java
index 5709831639..56040f07bb 100644
--- a/v1/src/test/java/com/google/cloud/teleport/templates/PubsubToTextIT.java
+++ b/v1/src/test/java/com/google/cloud/teleport/templates/PubsubToTextIT.java
@@ -20,6 +20,7 @@
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
+import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
@@ -47,7 +48,7 @@
/** Integration test for {@link PubsubToText} (Cloud_PubSub_to_GCS_Text). */
// SkipDirectRunnerTest: PubsubIO doesn't trigger panes on the DirectRunner.
-@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
+@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class, SkipRunnerV2Test.class})
@TemplateIntegrationTest(PubsubToText.class)
@RunWith(JUnit4.class)
public final class PubsubToTextIT extends TemplateTestBase {
diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/TextToPubsubStreamIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/TextToPubsubStreamIT.java
index abe3c2b7c9..d0659ab84f 100644
--- a/v1/src/test/java/com/google/cloud/teleport/templates/TextToPubsubStreamIT.java
+++ b/v1/src/test/java/com/google/cloud/teleport/templates/TextToPubsubStreamIT.java
@@ -20,6 +20,7 @@
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
+import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.io.ByteStreams;
import com.google.pubsub.v1.SubscriptionName;
@@ -55,7 +56,7 @@
import org.slf4j.LoggerFactory;
/** Integration test for {@link TextToPubsubStream} template. */
-@Category(TemplateIntegrationTest.class)
+@Category({TemplateIntegrationTest.class, SkipRunnerV2Test.class})
@TemplateIntegrationTest(TextToPubsubStream.class)
@RunWith(JUnit4.class)
public class TextToPubsubStreamIT extends TemplateTestBase {
diff --git a/v2/googlecloud-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/DLPTextToBigQueryStreamingIT.java b/v2/googlecloud-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/DLPTextToBigQueryStreamingIT.java
index 998dd8e14d..9c1c8230a9 100644
--- a/v2/googlecloud-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/DLPTextToBigQueryStreamingIT.java
+++ b/v2/googlecloud-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/DLPTextToBigQueryStreamingIT.java
@@ -20,6 +20,7 @@
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
import com.google.cloud.bigquery.TableId;
+import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.privacy.dlp.v2.CharacterMaskConfig;
import com.google.privacy.dlp.v2.CryptoHashConfig;
@@ -63,7 +64,7 @@
/**
* Integration test for {@link DLPTextToBigQueryStreaming} (Stream_DLP_GCS_Text_to_BigQuery_Flex).
*/
-@Category(TemplateIntegrationTest.class)
+@Category({TemplateIntegrationTest.class, SkipRunnerV2Test.class})
@TemplateIntegrationTest(DLPTextToBigQueryStreaming.class)
@RunWith(JUnit4.class)
public class DLPTextToBigQueryStreamingIT extends TemplateTestBase {
diff --git a/v2/mongodb-to-googlecloud/src/main/resources/mongodb-cdc-to-bigquery-command-spec.json b/v2/mongodb-to-googlecloud/src/main/resources/mongodb-to-bigquery-cdc-command-spec.json
similarity index 100%
rename from v2/mongodb-to-googlecloud/src/main/resources/mongodb-cdc-to-bigquery-command-spec.json
rename to v2/mongodb-to-googlecloud/src/main/resources/mongodb-to-bigquery-cdc-command-spec.json
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraConnector.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraConnector.java
index dc6d6791a4..e156274498 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraConnector.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraConnector.java
@@ -17,13 +17,10 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
-import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
-import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
import com.datastax.oss.driver.api.core.session.Session;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.schema.CassandraSchemaReference;
import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
import org.jline.utils.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,11 +43,6 @@ public CassandraConnector(
schemaReference);
CqlSessionBuilder builder =
CqlSession.builder().withConfigLoader(getDriverConfigLoader(dataSource));
- builder = setCredentials(builder, dataSource);
- if (dataSource.localDataCenter() != null) {
- builder = builder.addContactPoints(List.copyOf(dataSource.contactPoints()));
- builder = builder.withLocalDatacenter(dataSource.localDataCenter());
- }
if (schemaReference.keyspaceName() != null) {
builder.withKeyspace(schemaReference.keyspaceName());
}
@@ -62,36 +54,9 @@ public CassandraConnector(
schemaReference);
}
- @VisibleForTesting
- protected static CqlSessionBuilder setCredentials(
- CqlSessionBuilder builder, CassandraDataSource cassandraDataSource) {
- if (cassandraDataSource.dbAuth() != null) {
- return builder.withAuthCredentials(
- cassandraDataSource.dbAuth().getUserName().get(),
- cassandraDataSource.dbAuth().getPassword().get());
- } else {
- return builder;
- }
- }
-
@VisibleForTesting
protected static DriverConfigLoader getDriverConfigLoader(CassandraDataSource dataSource) {
- ProgrammaticDriverConfigLoaderBuilder driverConfigLoaderBuilder =
- DriverConfigLoader.programmaticBuilder()
- .withString(
- DefaultDriverOption.REQUEST_CONSISTENCY, dataSource.consistencyLevel().name())
- .withClass(DefaultDriverOption.RETRY_POLICY_CLASS, dataSource.retryPolicy());
- if (dataSource.connectTimeout() != null) {
- driverConfigLoaderBuilder =
- driverConfigLoaderBuilder.withDuration(
- DefaultDriverOption.CONNECTION_CONNECT_TIMEOUT, dataSource.connectTimeout());
- }
- if (dataSource.requestTimeout() != null) {
- driverConfigLoaderBuilder =
- driverConfigLoaderBuilder.withDuration(
- DefaultDriverOption.REQUEST_TIMEOUT, dataSource.requestTimeout());
- }
- return driverConfigLoaderBuilder.build();
+ return dataSource.driverConfigLoader();
}
public Session getSession() {
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraDataSource.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraDataSource.java
index 613dbab277..8fdb5e48fd 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraDataSource.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraDataSource.java
@@ -16,56 +16,69 @@
package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
-import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.config.OptionsMap;
+import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.google.auto.value.AutoValue;
-import com.google.cloud.teleport.v2.source.reader.auth.dbauth.DbAuth;
+import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import java.io.FileNotFoundException;
import java.io.Serializable;
import java.net.InetSocketAddress;
-import java.time.Duration;
import java.util.List;
import javax.annotation.Nullable;
/**
* Encapsulates details of a Cassandra Cluster. Cassandra Cluster can connect to multiple KeySpaces,
- * just like a Mysql instance can have multiple databases. TODO(vardhanvthigle): Take
- * DriverConfiguration as a GCS file for advanced overrides.
+ * just like a Mysql instance can have multiple databases.
*/
@AutoValue
public abstract class CassandraDataSource implements Serializable {
- /** Name of the Cassandra Cluster. */
- public abstract String clusterName();
-
- /** Name of local Datacenter. Must be specified if contactPoints are not empty */
- @Nullable
- public abstract String localDataCenter();
-
- /** Contact points for connecting to a Cassandra Cluster. */
- public abstract ImmutableList contactPoints();
+ /** Options Map. * */
+ abstract OptionsMap optionsMap();
- /** Cassandra Auth details. */
@Nullable
- public abstract DbAuth dbAuth();
+ public abstract String clusterName();
- /** Retry Policy for Cassandra Driver. Defaults to {@link DefaultRetryPolicy}. */
- public abstract Class retryPolicy();
+ public DriverConfigLoader driverConfigLoader() {
+ return CassandraDriverConfigLoader.fromOptionsMap(optionsMap());
+ }
- /** Consistency level for reading the source. Defaults to {@link ConsistencyLevel#QUORUM} */
- public abstract ConsistencyLevel consistencyLevel();
+ /** returns List of ContactPoints. Added for easier compatibility with 3.0 cluster creation. */
+ public ImmutableList contactPoints() {
+ return driverConfigLoader()
+ .getInitialConfig()
+ .getDefaultProfile()
+ .getStringList(TypedDriverOption.CONTACT_POINTS.getRawOption())
+ .stream()
+ .map(
+ contactPoint -> {
+ String[] ipPort = contactPoint.split(":");
+ return new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
+ })
+ .collect(ImmutableList.toImmutableList());
+ }
- /** Connection timeout for Cassandra driver. Set null for driver default. */
- @Nullable
- public abstract Duration connectTimeout();
+ /** Returns local datacenter. Added for easier compatibility with 3.0 cluster creation. */
+ public String localDataCenter() {
+ return driverConfigLoader()
+ .getInitialConfig()
+ .getDefaultProfile()
+ .getString(TypedDriverOption.LOAD_BALANCING_LOCAL_DATACENTER.getRawOption());
+ }
- /** Read timeout for Cassandra driver. Set null for driver default. */
- @Nullable
- public abstract Duration requestTimeout();
+ /** Returns the logged Keyspace. */
+ public String loggedKeySpace() {
+ return driverConfigLoader()
+ .getInitialConfig()
+ .getDefaultProfile()
+ .getString(TypedDriverOption.SESSION_KEYSPACE.getRawOption());
+ }
public static Builder builder() {
- return new AutoValue_CassandraDataSource.Builder()
- .setRetryPolicy(DefaultRetryPolicy.class)
- .setConsistencyLevel(ConsistencyLevel.QUORUM);
+ return new AutoValue_CassandraDataSource.Builder();
}
public abstract Builder toBuilder();
@@ -73,26 +86,55 @@ public static Builder builder() {
@AutoValue.Builder
public abstract static class Builder {
- public abstract Builder setClusterName(String value);
+ public abstract Builder setOptionsMap(OptionsMap value);
- public abstract Builder setLocalDataCenter(@Nullable String value);
+ public abstract Builder setClusterName(@Nullable String value);
- public abstract Builder setContactPoints(ImmutableList value);
+ abstract OptionsMap optionsMap();
- public Builder setContactPoints(List value) {
- return setContactPoints(ImmutableList.copyOf(value));
+ public Builder setOptionsMapFromGcsFile(String gcsPath) throws FileNotFoundException {
+ return this.setOptionsMap(CassandraDriverConfigLoader.getOptionsMapFromFile(gcsPath));
}
- public abstract Builder setDbAuth(@Nullable DbAuth value);
-
- public abstract Builder setRetryPolicy(Class value);
+ public Builder overrideOptionInOptionsMap(
+ TypedDriverOption option, ValueT value) {
+ DriverConfigLoader.fromMap(optionsMap())
+ .getInitialConfig()
+ .getProfiles()
+ .keySet()
+ .forEach(profile -> this.optionsMap().put(profile, option, value));
+ return this;
+ }
- public abstract Builder setConsistencyLevel(ConsistencyLevel value);
+ /**
+ * Allowing UT to set the contact points. In UT environment, the port is dynamically determined.
+ * We can't use a static GCS file to provide the contact points.
+ */
+ @VisibleForTesting
+ public Builder setContactPoints(List contactPoints) {
+ overrideOptionInOptionsMap(
+ TypedDriverOption.CONTACT_POINTS,
+ contactPoints.stream()
+ .map(p -> p.getAddress().getHostAddress() + ":" + p.getPort())
+ .collect(ImmutableList.toImmutableList()));
+ return this;
+ }
- public abstract Builder setConnectTimeout(@Nullable Duration value);
+ /** Set the local Datacenter. */
+ @VisibleForTesting
+ public Builder setLocalDataCenter(String localDataCenter) {
+ overrideOptionInOptionsMap(
+ TypedDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, localDataCenter);
+ return this;
+ }
- public abstract Builder setRequestTimeout(@Nullable Duration value);
+ abstract CassandraDataSource autoBuild();
- public abstract CassandraDataSource build();
+ public CassandraDataSource build() {
+ /* Prefer to use quorum read until we encounter a strong use case to not do so. */
+ this.overrideOptionInOptionsMap(
+ TypedDriverOption.REQUEST_CONSISTENCY, ConsistencyLevel.QUORUM.toString());
+ return autoBuild();
+ }
}
}
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraConnectorTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraConnectorTest.java
index d71b3c4ab0..760755bbda 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraConnectorTest.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraConnectorTest.java
@@ -19,20 +19,11 @@
import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CQLSH;
import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_KEYSPACE;
import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import com.datastax.oss.driver.api.core.ConsistencyLevel;
-import com.datastax.oss.driver.api.core.CqlSessionBuilder;
-import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
-import com.google.cloud.teleport.v2.source.reader.auth.dbauth.LocalCredentialsProvider;
+import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.schema.CassandraSchemaReference;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.SharedEmbeddedCassandra;
import java.io.IOException;
-import java.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -71,6 +62,7 @@ public void testBasic() throws IOException {
CassandraDataSource cassandraDataSource =
CassandraDataSource.builder()
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
+ .setOptionsMap(OptionsMap.driverDefaults())
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
.build();
@@ -95,77 +87,4 @@ public void testBasic() throws IOException {
assertThat(cassandraConnectorWithNullKeySpace.getSession().getKeyspace()).isEmpty();
}
}
-
- @Test
- public void testCredentialsSetter() {
-
- final String testUserName = "testUseramNe";
- final String testPassword = "test";
-
- CassandraDataSource cassandraDataSource =
- CassandraDataSource.builder()
- .setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
- .setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
- .setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
- .build();
- CqlSessionBuilder mockSessionBuilder = mock(CqlSessionBuilder.class);
- // No Auth Set
- CassandraConnector.setCredentials(mockSessionBuilder, cassandraDataSource);
- verify(mockSessionBuilder, never()).withAuthCredentials(anyString(), anyString());
- // Auth set
- CassandraConnector.setCredentials(
- mockSessionBuilder,
- cassandraDataSource.toBuilder()
- .setDbAuth(
- LocalCredentialsProvider.builder()
- .setUserName(testUserName)
- .setPassword(testPassword)
- .build())
- .build());
- verify(mockSessionBuilder, times(1)).withAuthCredentials(testUserName, testPassword);
- }
-
- @Test
- public void testConfigLoader() {
-
- CassandraDataSource cassandraDataSource =
- CassandraDataSource.builder()
- .setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
- .setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
- .setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
- .build();
- assertThat(
- CassandraConnector.getDriverConfigLoader(cassandraDataSource)
- .getInitialConfig()
- .getDefaultProfile()
- .getString(DefaultDriverOption.REQUEST_CONSISTENCY))
- .isEqualTo(ConsistencyLevel.QUORUM.name());
- assertThat(
- CassandraConnector.getDriverConfigLoader(
- cassandraDataSource.toBuilder()
- .setConsistencyLevel(ConsistencyLevel.ONE)
- .build())
- .getInitialConfig()
- .getDefaultProfile()
- .getString(DefaultDriverOption.REQUEST_CONSISTENCY))
- .isEqualTo(ConsistencyLevel.ONE.name());
- assertThat(
- CassandraConnector.getDriverConfigLoader(
- cassandraDataSource.toBuilder()
- .setConnectTimeout(Duration.ofSeconds(42L))
- .build())
- .getInitialConfig()
- .getDefaultProfile()
- .getDuration(DefaultDriverOption.CONNECTION_CONNECT_TIMEOUT))
- .isEqualTo(Duration.ofSeconds(42L));
- assertThat(
- CassandraConnector.getDriverConfigLoader(
- cassandraDataSource.toBuilder()
- .setRequestTimeout(Duration.ofSeconds(42L))
- .build())
- .getInitialConfig()
- .getDefaultProfile()
- .getDuration(DefaultDriverOption.REQUEST_TIMEOUT))
- .isEqualTo(Duration.ofSeconds(42L));
- }
}
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraDataSourceTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraDataSourceTest.java
index 56976abde3..8654208ff1 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraDataSourceTest.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraDataSourceTest.java
@@ -16,37 +16,87 @@
package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper;
import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mockStatic;
-import com.google.cloud.teleport.v2.source.reader.auth.dbauth.LocalCredentialsProvider;
+import com.datastax.oss.driver.api.core.config.OptionsMap;
+import com.datastax.oss.driver.api.core.config.TypedDriverOption;
+import com.google.cloud.teleport.v2.spanner.migrations.utils.JarFileReader;
+import com.google.common.io.Resources;
+import java.io.FileNotFoundException;
import java.net.InetSocketAddress;
+import java.net.URL;
import java.util.List;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
/** Test class for {@link CassandraDataSource}. */
@RunWith(MockitoJUnitRunner.class)
public class CassandraDataSourceTest {
+ MockedStatic mockFileReader;
+
+ @Before
+ public void initialize() {
+ mockFileReader = mockStatic(JarFileReader.class);
+ }
+
@Test
- public void testCassandraDataSoureBasic() {
+ public void testCassandraDataSourceBasic() {
String testCluster = "testCluster";
- String testHost = "testHost";
+ String testHost = "127.0.0.1";
int testPort = 9042;
CassandraDataSource cassandraDataSource =
CassandraDataSource.builder()
.setClusterName(testCluster)
+ .setOptionsMap(OptionsMap.driverDefaults())
.setContactPoints(List.of(new InetSocketAddress(testHost, testPort)))
- .setDbAuth(
- LocalCredentialsProvider.builder()
- .setUserName("test-user-name")
- .setPassword("test")
- .build())
+ .overrideOptionInOptionsMap(TypedDriverOption.AUTH_PROVIDER_USER_NAME, "test-user-name")
+ .overrideOptionInOptionsMap(TypedDriverOption.AUTH_PROVIDER_PASSWORD, "test")
.build();
assertThat(cassandraDataSource.clusterName()).isEqualTo(testCluster);
assertThat(cassandraDataSource.contactPoints())
.isEqualTo(ImmutableList.of(new InetSocketAddress(testHost, testPort)));
- assertThat(cassandraDataSource.dbAuth().getUserName().get()).isEqualTo("test-user-name");
- assertThat(cassandraDataSource.dbAuth().getPassword().get()).isEqualTo("test");
+ assertThat(
+ cassandraDataSource
+ .driverConfigLoader()
+ .getInitialConfig()
+ .getDefaultProfile()
+ .getString(TypedDriverOption.AUTH_PROVIDER_USER_NAME.getRawOption()))
+ .isEqualTo("test-user-name");
+ assertThat(
+ cassandraDataSource
+ .driverConfigLoader()
+ .getInitialConfig()
+ .getDefaultProfile()
+ .getString(TypedDriverOption.AUTH_PROVIDER_PASSWORD.getRawOption()))
+ .isEqualTo("test");
+ }
+
+ @Test
+ public void testLoadFromGCS() throws FileNotFoundException {
+ String testGcsPath = "gs://smt-test-bucket/cassandraConfig.conf";
+ URL testUrl = Resources.getResource("CassandraUT/test-cassandra-config.conf");
+ mockFileReader
+ .when(() -> JarFileReader.saveFilesLocally(testGcsPath))
+ .thenReturn(new URL[] {testUrl});
+ CassandraDataSource cassandraDataSource =
+ CassandraDataSource.builder().setOptionsMapFromGcsFile(testGcsPath).build();
+
+ assertThat(cassandraDataSource.loggedKeySpace()).isEqualTo("test-keyspace");
+ assertThat(cassandraDataSource.localDataCenter()).isEqualTo("datacenter1");
+ assertThat(cassandraDataSource.contactPoints())
+ .isEqualTo(
+ ImmutableList.of(
+ new InetSocketAddress("127.0.0.1", 9042),
+ new InetSocketAddress("127.0.0.1", 9043)));
+ }
+
+ @After
+ public void cleanup() {
+ mockFileReader.close();
}
}
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java
index 9f457d2272..f2ad23759d 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java
@@ -28,6 +28,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper.CassandraConnector;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper.CassandraDataSource;
@@ -82,6 +83,7 @@ public void testCassandraSourceRowMapperBasic() throws RetriableSchemaDiscoveryE
DataSource.ofCassandra(
CassandraDataSource.builder()
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
+ .setOptionsMap(OptionsMap.driverDefaults())
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
.build());
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java
index bee3ab67cb..6542688611 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java
@@ -23,6 +23,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
+import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper.CassandraDataSource;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.SharedEmbeddedCassandra;
import com.google.cloud.teleport.v2.source.reader.io.datasource.DataSource;
@@ -74,6 +75,7 @@ public void testDiscoverTablesBasic() throws IOException, RetriableSchemaDiscove
DataSource cassandraDataSource =
DataSource.ofCassandra(
CassandraDataSource.builder()
+ .setOptionsMap(OptionsMap.driverDefaults())
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
@@ -95,6 +97,7 @@ public void testDiscoverTableSchemaBasic() throws IOException, RetriableSchemaDi
DataSource cassandraDataSource =
DataSource.ofCassandra(
CassandraDataSource.builder()
+ .setOptionsMap(OptionsMap.driverDefaults())
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
@@ -121,6 +124,7 @@ public void testCassandraSchemaDiscoveryDriverException() {
DataSource cassandraDataSource =
DataSource.ofCassandra(
CassandraDataSource.builder()
+ .setOptionsMap(OptionsMap.driverDefaults())
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
.setContactPoints(
sharedEmbeddedCassandra.getInstance().getContactPoints().stream()
@@ -154,6 +158,7 @@ public void testCassandraSchemaDiscoveryArgumentExceptions() {
DataSource cassandraDataSource =
DataSource.ofCassandra(
CassandraDataSource.builder()
+ .setOptionsMap(OptionsMap.driverDefaults())
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/MySQLSourceDbToSpannerLT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/MySQLSourceDbToSpannerLT.java
index 72a98c0d63..526e76541e 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/MySQLSourceDbToSpannerLT.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/MySQLSourceDbToSpannerLT.java
@@ -32,6 +32,10 @@
@RunWith(JUnit4.class)
public class MySQLSourceDbToSpannerLT extends SourceDbToSpannerLTBase {
+ private static final String WORKER_MACHINE_TYPE = "n1-highmem-96";
+ private static final String LAUNCHER_MACHINE_TYPE = "n1-highmem-64";
+ private static final String FETCH_SIZE = "8000";
+
@Test
public void mySQLToSpannerBulk1TBTest() throws IOException, ParseException, InterruptedException {
String username =
@@ -57,6 +61,19 @@ public void mySQLToSpannerBulk1TBTest() throws IOException, ParseException, Inte
}
};
- runLoadTest(expectedCountPerTable);
+ Map params =
+ new HashMap<>() {
+ {
+ put("workerMachineType", WORKER_MACHINE_TYPE);
+ put("fetchSize", FETCH_SIZE);
+ }
+ };
+ Map env =
+ new HashMap<>() {
+ {
+ put("launcherMachineType", LAUNCHER_MACHINE_TYPE);
+ }
+ };
+ runLoadTest(expectedCountPerTable, params, env);
}
}
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLSourceDbToSpannerLT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLSourceDbToSpannerLT.java
index 31834e1b3e..9a4ca51411 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLSourceDbToSpannerLT.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLSourceDbToSpannerLT.java
@@ -33,7 +33,8 @@
public class PostgreSQLSourceDbToSpannerLT extends SourceDbToSpannerLTBase {
@Test
- public void backfill100Gb() throws IOException, ParseException, InterruptedException {
+ public void postgresToSpannerBulk100GBTest()
+ throws IOException, ParseException, InterruptedException {
String username =
accessSecret(
"projects/269744978479/secrets/nokill-sourcedb-postgresql-to-spanner-cloudsql-username/versions/1");
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/SourceDbToSpannerLTBase.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/SourceDbToSpannerLTBase.java
index a0104e4aaf..fefb83de20 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/SourceDbToSpannerLTBase.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/SourceDbToSpannerLTBase.java
@@ -53,12 +53,7 @@ public class SourceDbToSpannerLTBase extends TemplateLoadTestBase {
private static final int MAX_WORKERS = 100;
- private static final int NUM_WORKERS = 20;
-
- private static final String WORKER_MACHINE_TYPE = "n1-highmem-96";
- private static final String LAUNCHER_MACHINE_TYPE = "n1-highmem-64";
-
- private static final String FETCH_SIZE = "8000";
+ private static final int NUM_WORKERS = 10;
private static final Duration JOB_TIMEOUT = Duration.ofHours(3);
private static final Duration CHECK_INTERVAL = Duration.ofMinutes(5);
@@ -93,6 +88,7 @@ public void setUp(
SpannerResourceManager.builder(testName, project, region)
.maybeUseStaticInstance()
.setNodeCount(SPANNER_NODE_COUNT)
+ .setMonitoringClient(monitoringClient)
.build();
gcsResourceManager =
@@ -151,8 +147,6 @@ public void runLoadTest(
put("password", sourceDatabaseResource.password());
put("outputDirectory", "gs://" + artifactBucket + "/" + outputDirectory);
put("jdbcDriverClassName", driverClassName());
- put("fetchSize", FETCH_SIZE);
- put("workerMachineType", WORKER_MACHINE_TYPE);
}
};
params.putAll(templateParameters);
@@ -162,7 +156,6 @@ public void runLoadTest(
LaunchConfig.builder(getClass().getSimpleName(), SPEC_PATH)
.addEnvironment("maxWorkers", MAX_WORKERS)
.addEnvironment("numWorkers", NUM_WORKERS)
- .addEnvironment("launcherMachineType", LAUNCHER_MACHINE_TYPE)
.setParameters(params);
environmentOptions.forEach(options::addEnvironment);
diff --git a/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/test-cassandra-config.conf b/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/test-cassandra-config.conf
new file mode 100644
index 0000000000..3ad4cdb5d6
--- /dev/null
+++ b/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/test-cassandra-config.conf
@@ -0,0 +1,12 @@
+ # Configuration for the DataStax Java driver for Apache Cassandra®.
+ # This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md.
+ # This file is meant to be used only in unit tests to test loading configuration from file.
+ # DO NOT USE FOR PRODUCTION.
+
+ datastax-java-driver {
+ basic.contact-points = ["127.0.0.1:9042", "127.0.0.1:9043"]
+ basic.session-keyspace = "test-keyspace"
+ basic.load-balancing-policy {
+ local-datacenter = "datacenter1"
+ }
+ }
diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java
index 3cfd9c8fcc..cf75ffa86a 100644
--- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java
+++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java
@@ -512,7 +512,10 @@ static String handleRecordFieldType(String fieldName, GenericRecord element, Sch
// Convert to timestamp string.
Long totalMicros = TimeUnit.DAYS.toMicros(Long.valueOf(element.get("date").toString()));
totalMicros += Long.valueOf(element.get("time").toString());
- Instant timestamp = Instant.ofEpochSecond(TimeUnit.MICROSECONDS.toSeconds(totalMicros));
+ Instant timestamp =
+ Instant.ofEpochSecond(
+ TimeUnit.MICROSECONDS.toSeconds(totalMicros),
+ TimeUnit.MICROSECONDS.toNanos(totalMicros % TimeUnit.SECONDS.toMicros(1)));
return timestamp.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
} else if (fieldSchema.getName().equals("interval")) {
// TODO: For MySQL, we ignore the months field. This might require source-specific handling
diff --git a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java
index e5c9ff4a94..2cd46b196d 100644
--- a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java
+++ b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java
@@ -299,9 +299,9 @@ public void testHandleRecordFieldType() {
result =
GenericRecordTypeConvertor.handleRecordFieldType(
"date_time_column",
- AvroTestingHelper.createDatetimeRecord(738991, 48035000000L),
+ AvroTestingHelper.createDatetimeRecord(20091, 31703699206L),
AvroTestingHelper.DATETIME_SCHEMA);
- assertEquals("Test datetime conversion: ", "3993-04-16T13:20:35Z", result);
+ assertEquals("Test datetime conversion: ", "2025-01-03T08:48:23.699206Z", result);
// Tests for interval type.
result =
diff --git a/v2/spanner-to-sourcedb/README.md b/v2/spanner-to-sourcedb/README.md
index abf8a4889a..ca7c95c034 100644
--- a/v2/spanner-to-sourcedb/README.md
+++ b/v2/spanner-to-sourcedb/README.md
@@ -189,6 +189,9 @@ In addition, there are following application metrics exposed by the job:
| severe_error_count | The number of permanent errors. |
| skipped_record_count | The count of records that were skipped from reverse replication. |
| success_record_count | The number of successfully processed records. This also accounts for the records that were not written to source if the source already had updated data. |
+| custom_transformation_exception | Number of exception encountered in the custom transformation jar |
+| filtered_events_\ | Number of events filtered via custom transformation per shard |
+| apply_custom_transformation_impl_latency_ms | Time taken for the execution of custom transformation logic. |
These can be used to track the pipeline progress.
@@ -323,7 +326,7 @@ Reverse transformation can not be supported for following scenarios out of the b
9. DELETES on Spanner that have Primary key columns different from the Source database column - such records will be dropped
10. Primary key of the source table cannot be determined - such records will be dropped
-In the above cases, custom code will need to be written to perform reverse transformation.Refer the [customization](#customize) section for the source code to extended and write these custom transforms.
+In the above cases, custom code will need to be written to perform reverse transformation.Refer the [customization](#customize) section for the source code to extend and write these custom transforms.
## When to perform cut-back
@@ -400,6 +403,16 @@ Steps to perfrom customization:
3. Invoke the reverse replication flow by passing the custom jar path and custom class path.
4. If any custom parameters are needed in the custom shard identification logic, they can be passed via the *shardingCustomParameters* input to the runner. These parameters will be passed to the *init* method of the custom class. The *init* method is invoked once per worker setup.
+### Reverse transformation customization
+
+In order to make it easier for users to provide custom reverse transformation logic, the template accepts a GCS path that points to a custom jar, the custom class name and a GCS path for writing the filtered records. These parameters are used to invoke custom logic to perform reverse transformation.
+
+Steps to perform customization:
+1. Write custom transformation logic [CustomTransformationFetcher.java](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationFetcher.java). Details of the MigrationTransformationRequest class can be found [here](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/spanner-migrations-sdk/src/main/java/com/google/cloud/teleport/v2/spanner/utils/MigrationTransformationRequest.java).
+2. Build the [JAR](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/v2/spanner-custom-shard) and upload the jar to GCS
+3. Invoke the reverse replication flow by passing the custom jar path and the custom class path.
+4. If any events are filtered/skipped during the reverse transformation they can be written to a GCS directory by passing the GCS path via the *filterEventsDirectoryName* input to the runner.
+5. If any custom parameters are needed in the custom transformation logic, they can be passed via the *transformationCustomParameters* input to the runner. These parameters will be passed to the *init* method of the custom class. The *init* method is invoked once per worker setup.
## Cost
diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java
index 5f755e8ea7..22b0d38c9b 100644
--- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java
+++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java
@@ -46,6 +46,7 @@
import org.apache.beam.it.jdbc.MySQLResourceManager;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -156,6 +157,7 @@ public static void cleanUp() throws IOException {
}
@Test
+ @Ignore("This test seems flaky, ignoring it for now to unblock other tests")
public void spannerToSourceDbWithCustomTransformation() throws InterruptedException {
assertThatPipeline(jobInfo).isRunning();
// Write row in Spanner