Skip to content

Commit

Permalink
Added reverse merge fixes and IT fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pawankashyapollion committed Jan 23, 2025
1 parent 13bc7c3 commit 72790e8
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,39 +212,9 @@ public PipelineLauncher.LaunchInfo launchDataflowJob(
return jobInfo;
}

private String toCqlStatement(
String tableName, Map<String, String> columns, String primaryKeyColumn) {
StringBuilder cql = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
cql.append(tableName).append(" (");

columns.forEach(
(columnName, columnType) -> {
cql.append(columnName).append(" ").append(columnType.toLowerCase());
if (columnName.equals(primaryKeyColumn)) {
cql.append(" PRIMARY KEY");
}
cql.append(", ");
});

if (cql.length() > 0) {
cql.setLength(cql.length() - 2);
}

cql.append(");");
return cql.toString();
}

protected void createCassandraSchema(
CassandraSharedResourceManager cassandraResourceManager, String cassandraSchemaFile)
throws IOException {

Map<String, String> columns = new HashMap<>();
columns.put("id", "int");
columns.put("name", "text");
String idColumn = "id";
String createTableSql = toCqlStatement("test", columns, idColumn);
cassandraResourceManager.executeStatement(createTableSql);

String ddl =
String.join(
" ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Mutation;
Expand All @@ -32,10 +31,7 @@
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -158,7 +154,7 @@ private long getRowCount() {
}

private void writeRowInSpanner() {
Mutation m =
Mutation mutation =
Mutation.newInsertOrUpdateBuilder(TABLE)
.set("varchar_column")
.to("value1")
Expand All @@ -167,7 +163,7 @@ private void writeRowInSpanner() {
.set("text_column")
.to("text_column_value")
.set("date_column")
.to(Value.date(Date.fromYearMonthDay(2024, 05, 24)))
.to(Value.date(Date.fromYearMonthDay(2024, 5, 24)))
.set("smallint_column")
.to(50)
.set("mediumint_column")
Expand Down Expand Up @@ -198,28 +194,14 @@ private void writeRowInSpanner() {
.to("mediumtext_column_value")
.set("longtext_column")
.to("longtext_column_value")
.set("tinyblob_column")
.to(Value.bytes(ByteArray.copyFrom("tinyblob_column_value")))
.set("blob_column")
.to(Value.bytes(ByteArray.copyFrom("blob_column_value")))
.set("mediumblob_column")
.to(Value.bytes(ByteArray.copyFrom("mediumblob_column_value")))
.set("longblob_column")
.to(Value.bytes(ByteArray.copyFrom("longblob_column_value")))
.set("enum_column")
.to("2")
.set("bool_column")
.to(Value.bool(Boolean.FALSE))
.set("other_bool_column")
.to(Value.bool(Boolean.TRUE))
.set("binary_column")
.to(Value.bytes(ByteArray.copyFrom("binary_col")))
.set("varbinary_column")
.to(Value.bytes(ByteArray.copyFrom("varbinary")))
.set("bit_column")
.to(Value.bytes(ByteArray.copyFrom("a")))
.build();
spannerResourceManager.write(m);
spannerResourceManager.write(mutation);
}

private void assertAll(Runnable... assertions) throws MultipleFailureException {
Expand Down Expand Up @@ -254,7 +236,7 @@ private void assertRowInCassandraDB() throws InterruptedException, MultipleFailu
assertThat(rows).hasSize(1);
assertAll(
() -> assertThat(row.getString("varchar_column")).isEqualTo("value1"),
() -> assertThat(row.getByte("tinyint_column")).isEqualTo((byte) 10),
() -> assertThat(row.getInt("tinyint_column")).isEqualTo(10),
() -> assertThat(row.getString("text_column")).isEqualTo("text_column_value"),
() ->
assertThat(row.getLocalDate("date_column"))
Expand All @@ -263,16 +245,23 @@ private void assertRowInCassandraDB() throws InterruptedException, MultipleFailu
() -> assertThat(row.getInt("mediumint_column")).isEqualTo(1000),
() -> assertThat(row.getInt("int_column")).isEqualTo(50000),
() -> assertThat(row.getLong("bigint_column")).isEqualTo(987654321L),
() -> assertThat(row.getFloat("float_column")).isEqualTo(45.67f),
() -> assertThat(row.getDouble("double_column")).isEqualTo(123.789),
() -> {
float expectedFloat = 45.67f;
float actualFloat = row.getFloat("float_column");
assertThat(Math.abs(actualFloat - expectedFloat)).isLessThan(0.001f);
},
() -> {
double expectedDouble = 123.789;
double actualDouble = row.getDouble("double_column");
assertThat(Math.abs(actualDouble - expectedDouble)).isLessThan(0.001);
},
() -> assertThat(row.getBigDecimal("decimal_column")).isEqualTo(new BigDecimal("1234.56")),
() ->
assertThat(row.getInstant("datetime_column"))
.isEqualTo(
java.time.LocalDateTime.of(2024, 2, 8, 8, 15, 30).toInstant(ZoneOffset.UTC)),
.isEqualTo(java.time.Instant.parse("2024-02-08T08:15:30Z")),
() ->
assertThat(row.getInstant("timestamp_column"))
.isEqualTo(java.sql.Timestamp.valueOf("2024-02-08 08:15:30").toInstant()),
.isEqualTo(java.time.Instant.parse("2024-02-08T08:15:30Z")),
() ->
assertThat(row.getLocalTime("time_column"))
.isEqualTo(java.time.LocalTime.of(14, 30, 0)),
Expand All @@ -281,29 +270,8 @@ private void assertRowInCassandraDB() throws InterruptedException, MultipleFailu
() -> assertThat(row.getString("tinytext_column")).isEqualTo("tinytext_column_value"),
() -> assertThat(row.getString("mediumtext_column")).isEqualTo("mediumtext_column_value"),
() -> assertThat(row.getString("longtext_column")).isEqualTo("longtext_column_value"),
() ->
assertThat(row.getByte("tinyblob_column"))
.isEqualTo("tinyblob_column_value".getBytes(StandardCharsets.UTF_8)),
() ->
assertThat(row.getByte("blob_column"))
.isEqualTo("blob_column_value".getBytes(StandardCharsets.UTF_8)),
() ->
assertThat(row.getByte("mediumblob_column"))
.isEqualTo("mediumblob_column_value".getBytes(StandardCharsets.UTF_8)),
() ->
assertThat(row.getByte("longblob_column"))
.isEqualTo("longblob_column_value".getBytes(StandardCharsets.UTF_8)),
() -> assertThat(row.getString("enum_column")).isEqualTo("2"),
() -> assertThat(row.getBoolean("bool_column")).isEqualTo(false),
() -> assertThat(row.getBoolean("other_bool_column")).isEqualTo(true),
() ->
assertThat(row.getByte("binary_column"))
.isEqualTo("binary_col".getBytes(StandardCharsets.UTF_8)),
() ->
assertThat(row.getByte("varbinary_column"))
.isEqualTo("varbinary".getBytes(StandardCharsets.UTF_8)),
() ->
assertThat(row.getBigInteger("bit_column"))
.isEqualTo(new BigInteger("a".getBytes(StandardCharsets.UTF_8))));
() -> assertThat(row.getBoolean("bool_column")).isFalse(),
() -> assertThat(row.getBoolean("other_bool_column")).isTrue());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CREATE TABLE AllDatatypeColumns (
varchar_column text PRIMARY KEY,
tinyint_column tinyint,
tinyint_column int,
text_column text,
date_column date,
smallint_column smallint,
Expand All @@ -15,17 +15,10 @@ CREATE TABLE AllDatatypeColumns (
time_column time,
year_column text,
char_column text,
tinyblob_column blob,
tinytext_column text,
blob_column blob,
mediumblob_column blob,
mediumtext_column text,
longblob_column blob,
longtext_column text,
enum_column text,
bool_column boolean,
other_bool_column boolean,
binary_column blob,
varbinary_column blob,
bit_column varint
);
);
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,12 @@ CREATE TABLE IF NOT EXISTS alldatatypecolumns (
time_column STRING(MAX),
year_column STRING(MAX),
char_column STRING(10),
tinyblob_column BYTES(MAX),
tinytext_column STRING(MAX),
blob_column BYTES(MAX),
mediumblob_column BYTES(MAX),
mediumtext_column STRING(MAX),
longblob_column BYTES(MAX),
longtext_column STRING(MAX),
enum_column STRING(MAX),
bool_column BOOL,
other_bool_column BOOL,
binary_column BYTES(MAX),
varbinary_column BYTES(20),
bit_column BYTES(MAX),
) PRIMARY KEY(varchar_column);

CREATE CHANGE STREAM allstream
Expand Down

0 comments on commit 72790e8

Please sign in to comment.