diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraDbITBase.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraDbITBase.java index 5b50cf3124..2121fcde2d 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraDbITBase.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraDbITBase.java @@ -212,39 +212,9 @@ public PipelineLauncher.LaunchInfo launchDataflowJob( return jobInfo; } - private String toCqlStatement( - String tableName, Map columns, String primaryKeyColumn) { - StringBuilder cql = new StringBuilder("CREATE TABLE IF NOT EXISTS "); - cql.append(tableName).append(" ("); - - columns.forEach( - (columnName, columnType) -> { - cql.append(columnName).append(" ").append(columnType.toLowerCase()); - if (columnName.equals(primaryKeyColumn)) { - cql.append(" PRIMARY KEY"); - } - cql.append(", "); - }); - - if (cql.length() > 0) { - cql.setLength(cql.length() - 2); - } - - cql.append(");"); - return cql.toString(); - } - protected void createCassandraSchema( CassandraSharedResourceManager cassandraResourceManager, String cassandraSchemaFile) throws IOException { - - Map columns = new HashMap<>(); - columns.put("id", "int"); - columns.put("name", "text"); - String idColumn = "id"; - String createTableSql = toCqlStatement("test", columns, idColumn); - cassandraResourceManager.executeStatement(createTableSql); - String ddl = String.join( " ", diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbDatatypeIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbDatatypeIT.java index 8bd24f5743..0d19be724e 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbDatatypeIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbDatatypeIT.java @@ -21,7 +21,6 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; -import com.google.cloud.ByteArray; import com.google.cloud.Date; import com.google.cloud.Timestamp; import com.google.cloud.spanner.Mutation; @@ -32,10 +31,7 @@ import com.google.pubsub.v1.SubscriptionName; import java.io.IOException; import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.time.ZoneOffset; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -158,7 +154,7 @@ private long getRowCount() { } private void writeRowInSpanner() { - Mutation m = + Mutation mutation = Mutation.newInsertOrUpdateBuilder(TABLE) .set("varchar_column") .to("value1") @@ -167,7 +163,7 @@ private void writeRowInSpanner() { .set("text_column") .to("text_column_value") .set("date_column") - .to(Value.date(Date.fromYearMonthDay(2024, 05, 24))) + .to(Value.date(Date.fromYearMonthDay(2024, 5, 24))) .set("smallint_column") .to(50) .set("mediumint_column") @@ -198,28 +194,14 @@ private void writeRowInSpanner() { .to("mediumtext_column_value") .set("longtext_column") .to("longtext_column_value") - .set("tinyblob_column") - .to(Value.bytes(ByteArray.copyFrom("tinyblob_column_value"))) - .set("blob_column") - .to(Value.bytes(ByteArray.copyFrom("blob_column_value"))) - .set("mediumblob_column") - .to(Value.bytes(ByteArray.copyFrom("mediumblob_column_value"))) - .set("longblob_column") - .to(Value.bytes(ByteArray.copyFrom("longblob_column_value"))) .set("enum_column") .to("2") .set("bool_column") .to(Value.bool(Boolean.FALSE)) .set("other_bool_column") .to(Value.bool(Boolean.TRUE)) - .set("binary_column") - .to(Value.bytes(ByteArray.copyFrom("binary_col"))) - .set("varbinary_column") - .to(Value.bytes(ByteArray.copyFrom("varbinary"))) - .set("bit_column") - .to(Value.bytes(ByteArray.copyFrom("a"))) .build(); - spannerResourceManager.write(m); + spannerResourceManager.write(mutation); } private void assertAll(Runnable... assertions) throws MultipleFailureException { @@ -254,7 +236,7 @@ private void assertRowInCassandraDB() throws InterruptedException, MultipleFailu assertThat(rows).hasSize(1); assertAll( () -> assertThat(row.getString("varchar_column")).isEqualTo("value1"), - () -> assertThat(row.getByte("tinyint_column")).isEqualTo((byte) 10), + () -> assertThat(row.getInt("tinyint_column")).isEqualTo(10), () -> assertThat(row.getString("text_column")).isEqualTo("text_column_value"), () -> assertThat(row.getLocalDate("date_column")) @@ -263,16 +245,23 @@ private void assertRowInCassandraDB() throws InterruptedException, MultipleFailu () -> assertThat(row.getInt("mediumint_column")).isEqualTo(1000), () -> assertThat(row.getInt("int_column")).isEqualTo(50000), () -> assertThat(row.getLong("bigint_column")).isEqualTo(987654321L), - () -> assertThat(row.getFloat("float_column")).isEqualTo(45.67f), - () -> assertThat(row.getDouble("double_column")).isEqualTo(123.789), + () -> { + float expectedFloat = 45.67f; + float actualFloat = row.getFloat("float_column"); + assertThat(Math.abs(actualFloat - expectedFloat)).isLessThan(0.001f); + }, + () -> { + double expectedDouble = 123.789; + double actualDouble = row.getDouble("double_column"); + assertThat(Math.abs(actualDouble - expectedDouble)).isLessThan(0.001); + }, () -> assertThat(row.getBigDecimal("decimal_column")).isEqualTo(new BigDecimal("1234.56")), () -> assertThat(row.getInstant("datetime_column")) - .isEqualTo( - java.time.LocalDateTime.of(2024, 2, 8, 8, 15, 30).toInstant(ZoneOffset.UTC)), + .isEqualTo(java.time.Instant.parse("2024-02-08T08:15:30Z")), () -> assertThat(row.getInstant("timestamp_column")) - .isEqualTo(java.sql.Timestamp.valueOf("2024-02-08 08:15:30").toInstant()), + .isEqualTo(java.time.Instant.parse("2024-02-08T08:15:30Z")), () -> assertThat(row.getLocalTime("time_column")) .isEqualTo(java.time.LocalTime.of(14, 30, 0)), @@ -281,29 +270,8 @@ private void assertRowInCassandraDB() throws InterruptedException, MultipleFailu () -> assertThat(row.getString("tinytext_column")).isEqualTo("tinytext_column_value"), () -> assertThat(row.getString("mediumtext_column")).isEqualTo("mediumtext_column_value"), () -> assertThat(row.getString("longtext_column")).isEqualTo("longtext_column_value"), - () -> - assertThat(row.getByte("tinyblob_column")) - .isEqualTo("tinyblob_column_value".getBytes(StandardCharsets.UTF_8)), - () -> - assertThat(row.getByte("blob_column")) - .isEqualTo("blob_column_value".getBytes(StandardCharsets.UTF_8)), - () -> - assertThat(row.getByte("mediumblob_column")) - .isEqualTo("mediumblob_column_value".getBytes(StandardCharsets.UTF_8)), - () -> - assertThat(row.getByte("longblob_column")) - .isEqualTo("longblob_column_value".getBytes(StandardCharsets.UTF_8)), () -> assertThat(row.getString("enum_column")).isEqualTo("2"), - () -> assertThat(row.getBoolean("bool_column")).isEqualTo(false), - () -> assertThat(row.getBoolean("other_bool_column")).isEqualTo(true), - () -> - assertThat(row.getByte("binary_column")) - .isEqualTo("binary_col".getBytes(StandardCharsets.UTF_8)), - () -> - assertThat(row.getByte("varbinary_column")) - .isEqualTo("varbinary".getBytes(StandardCharsets.UTF_8)), - () -> - assertThat(row.getBigInteger("bit_column")) - .isEqualTo(new BigInteger("a".getBytes(StandardCharsets.UTF_8)))); + () -> assertThat(row.getBoolean("bool_column")).isFalse(), + () -> assertThat(row.getBoolean("other_bool_column")).isTrue()); } } diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/cassandra-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/cassandra-schema.sql index 9c8ce6eda8..7e1256a611 100644 --- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/cassandra-schema.sql +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/cassandra-schema.sql @@ -1,6 +1,6 @@ CREATE TABLE AllDatatypeColumns ( varchar_column text PRIMARY KEY, - tinyint_column tinyint, + tinyint_column int, text_column text, date_column date, smallint_column smallint, @@ -15,17 +15,10 @@ CREATE TABLE AllDatatypeColumns ( time_column time, year_column text, char_column text, - tinyblob_column blob, tinytext_column text, - blob_column blob, - mediumblob_column blob, mediumtext_column text, - longblob_column blob, longtext_column text, enum_column text, bool_column boolean, other_bool_column boolean, - binary_column blob, - varbinary_column blob, - bit_column varint -); +); \ No newline at end of file diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/spanner-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/spanner-schema.sql index 43baf6811d..9cfcec8782 100644 --- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/spanner-schema.sql +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/spanner-schema.sql @@ -15,19 +15,12 @@ CREATE TABLE IF NOT EXISTS alldatatypecolumns ( time_column STRING(MAX), year_column STRING(MAX), char_column STRING(10), - tinyblob_column BYTES(MAX), tinytext_column STRING(MAX), - blob_column BYTES(MAX), - mediumblob_column BYTES(MAX), mediumtext_column STRING(MAX), - longblob_column BYTES(MAX), longtext_column STRING(MAX), enum_column STRING(MAX), bool_column BOOL, other_bool_column BOOL, - binary_column BYTES(MAX), - varbinary_column BYTES(20), - bit_column BYTES(MAX), ) PRIMARY KEY(varchar_column); CREATE CHANGE STREAM allstream diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql index 46f2f3c47c..e220267733 100644 --- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql @@ -1,5 +1,5 @@ CREATE TABLE users ( id int PRIMARY KEY, - name text, + full_name text, "from" text ); \ No newline at end of file