Skip to content

Commit

Permalink
source-mysql, source-mssql: parallelize test execution (airbytehq#32772)
Browse files Browse the repository at this point in the history
Co-authored-by: postamar <postamar@users.noreply.github.com>
  • Loading branch information
postamar and postamar authored Nov 27, 2023
1 parent cdd3952 commit d97a399
Show file tree
Hide file tree
Showing 126 changed files with 3,988 additions and 5,800 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.5.0 | 2023-11-22 | [\#32656](https://github.com/airbytehq/airbyte/pull/32656) | Introduce TestDatabase test fixture, refactor database source test base classes. |
| 0.4.11 | 2023-11-14 | [\#32526](https://github.com/airbytehq/airbyte/pull/32526) | Clean up memory manager logs. |
| 0.4.10 | 2023-11-13 | [\#32285](https://github.com/airbytehq/airbyte/pull/32285) | Fix UUID codec ordering for MongoDB connector |
| 0.4.9 | 2023-11-13 | [\#32468](https://github.com/airbytehq/airbyte/pull/32468) | Further error grouping improvements for DV2 connectors |
Expand Down
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ dependencies {
testImplementation libs.testcontainers.jdbc
testImplementation libs.testcontainers.mysql
testImplementation libs.testcontainers.postgresql
testImplementation libs.testcontainers.mssqlserver
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

// bouncycastle is pinned to version-match the transitive dependency from kubernetes client-java
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
package io.airbyte.cdk.db;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import org.testcontainers.containers.PostgreSQLContainer;

public class PostgresUtils {

Expand All @@ -26,74 +23,4 @@ public static PgLsn getLsn(final JdbcDatabase database) throws SQLException {
return PgLsn.fromPgString(jsonNodes.get(0).get("pg_current_wal_lsn").asText());
}

@VisibleForTesting
public static Certificate getCertificate(final PostgreSQLContainer<?> container) throws IOException, InterruptedException {
container.execInContainer("su", "-c", "psql -U test -c \"CREATE USER postgres WITH PASSWORD 'postgres';\"");
container.execInContainer("su", "-c", "psql -U test -c \"GRANT CONNECT ON DATABASE \"test\" TO postgres;\"");
container.execInContainer("su", "-c", "psql -U test -c \"ALTER USER postgres WITH SUPERUSER;\"");

container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out ca.key");
container.execInContainer("su", "-c", "openssl req -new -x509 -sha256 -key ca.key -out ca.crt -subj \"/CN=127.0.0.1\"");
container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out server.key");
container.execInContainer("su", "-c", "openssl req -new -sha256 -key server.key -out server.csr -subj \"/CN=localhost\"");
container.execInContainer("su", "-c",
"openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 365 -sha256");
container.execInContainer("su", "-c", "cp server.key /etc/ssl/private/");
container.execInContainer("su", "-c", "cp server.crt /etc/ssl/private/");
container.execInContainer("su", "-c", "cp ca.crt /etc/ssl/private/");
container.execInContainer("su", "-c", "chmod og-rwx /etc/ssl/private/server.* /etc/ssl/private/ca.*");
container.execInContainer("su", "-c", "chown postgres:postgres /etc/ssl/private/server.crt /etc/ssl/private/server.key /etc/ssl/private/ca.crt");
container.execInContainer("su", "-c", "echo \"ssl = on\" >> /var/lib/postgresql/data/postgresql.conf");
container.execInContainer("su", "-c", "echo \"ssl_cert_file = '/etc/ssl/private/server.crt'\" >> /var/lib/postgresql/data/postgresql.conf");
container.execInContainer("su", "-c", "echo \"ssl_key_file = '/etc/ssl/private/server.key'\" >> /var/lib/postgresql/data/postgresql.conf");
container.execInContainer("su", "-c", "echo \"ssl_ca_file = '/etc/ssl/private/ca.crt'\" >> /var/lib/postgresql/data/postgresql.conf");
container.execInContainer("su", "-c", "mkdir root/.postgresql");
container.execInContainer("su", "-c",
"echo \"hostssl all all 127.0.0.1/32 cert clientcert=verify-full\" >> /var/lib/postgresql/data/pg_hba.conf");

final var caCert = container.execInContainer("su", "-c", "cat ca.crt").getStdout().trim();

container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out client.key");
container.execInContainer("su", "-c", "openssl req -new -sha256 -key client.key -out client.csr -subj \"/CN=postgres\"");
container.execInContainer("su", "-c",
"openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 365 -sha256");
container.execInContainer("su", "-c", "cp client.crt ~/.postgresql/postgresql.crt");
container.execInContainer("su", "-c", "cp client.key ~/.postgresql/postgresql.key");
container.execInContainer("su", "-c", "chmod 0600 ~/.postgresql/postgresql.crt ~/.postgresql/postgresql.key");
container.execInContainer("su", "-c", "cp ca.crt root/.postgresql/ca.crt");
container.execInContainer("su", "-c", "chown postgres:postgres ~/.postgresql/ca.crt");

container.execInContainer("su", "-c", "psql -U test -c \"SELECT pg_reload_conf();\"");

final var clientKey = container.execInContainer("su", "-c", "cat client.key").getStdout().trim();
final var clientCert = container.execInContainer("su", "-c", "cat client.crt").getStdout().trim();
return new Certificate(caCert, clientCert, clientKey);
}

public static class Certificate {

private final String caCertificate;
private final String clientCertificate;
private final String clientKey;

public Certificate(final String caCertificate, final String clientCertificate, final String clientKey) {
this.caCertificate = caCertificate;
this.clientCertificate = clientCertificate;
this.clientKey = clientKey;
}

public String getCaCertificate() {
return caCertificate;
}

public String getClientCertificate() {
return clientCertificate;
}

public String getClientKey() {
return clientKey;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import com.zaxxer.hikari.HikariDataSource;
import java.io.Closeable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import javax.sql.DataSource;

/**
Expand Down Expand Up @@ -188,10 +190,10 @@ private static class DataSourceBuilder {
private DataSourceBuilder() {}

/**
* Retrieves connectionTimeout value from connection properties in seconds, default minimum timeout
* Retrieves connectionTimeout value from connection properties in millis, default minimum timeout
* is 60 seconds since Hikari default of 30 seconds is not enough for acceptance tests. In the case
* the value is 0, pass the value along as Hikari and Postgres use default max value for 0 timeout
* value
* value.
*
* NOTE: HikariCP uses milliseconds for all time values:
* https://github.com/brettwooldridge/HikariCP#gear-configuration-knobs-baby whereas Postgres is
Expand All @@ -203,27 +205,32 @@ private DataSourceBuilder() {}
* @return DataSourceBuilder class used to create dynamic fields for DataSource
*/
private static long getConnectionTimeoutMs(final Map<String, String> connectionProperties, String driverClassName) {
// TODO: the usage of CONNECT_TIMEOUT is Postgres specific, may need to extend for other databases
if (driverClassName.equals(DatabaseDriver.POSTGRESQL.getDriverClassName())) {
final String pgPropertyConnectTimeout = CONNECT_TIMEOUT.getName();
// If the PGProperty.CONNECT_TIMEOUT was set by the user, then take its value, if not take the
// default
if (connectionProperties.containsKey(pgPropertyConnectTimeout)
&& (Long.parseLong(connectionProperties.get(pgPropertyConnectTimeout)) >= 0)) {
return Duration.ofSeconds(Long.parseLong(connectionProperties.get(pgPropertyConnectTimeout))).toMillis();
} else {
return Duration.ofSeconds(Long.parseLong(Objects.requireNonNull(CONNECT_TIMEOUT.getDefaultValue()))).toMillis();
}
final Optional<Duration> parsedConnectionTimeout = switch (DatabaseDriver.findByDriverClassName(driverClassName)) {
case POSTGRESQL -> maybeParseDuration(connectionProperties.get(CONNECT_TIMEOUT.getName()), ChronoUnit.SECONDS)
.or(() -> maybeParseDuration(CONNECT_TIMEOUT.getDefaultValue(), ChronoUnit.SECONDS));
case MYSQL -> maybeParseDuration(connectionProperties.get("connectTimeout"), ChronoUnit.MILLIS);
case MSSQLSERVER -> maybeParseDuration(connectionProperties.get("loginTimeout"), ChronoUnit.SECONDS);
default -> maybeParseDuration(connectionProperties.get(CONNECT_TIMEOUT_KEY), ChronoUnit.SECONDS)
// Enforce minimum timeout duration for unspecified data sources.
.filter(d -> d.compareTo(CONNECT_TIMEOUT_DEFAULT) >= 0);
};
return parsedConnectionTimeout.orElse(CONNECT_TIMEOUT_DEFAULT).toMillis();
}

private static Optional<Duration> maybeParseDuration(final String stringValue, TemporalUnit unit) {
if (stringValue == null) {
return Optional.empty();
}
final long number;
try {
number = Long.parseLong(stringValue);
} catch (NumberFormatException __) {
return Optional.empty();
}
final Duration connectionTimeout;
connectionTimeout =
connectionProperties.containsKey(CONNECT_TIMEOUT_KEY) ? Duration.ofSeconds(Long.parseLong(connectionProperties.get(CONNECT_TIMEOUT_KEY)))
: CONNECT_TIMEOUT_DEFAULT;
if (connectionTimeout.getSeconds() == 0) {
return connectionTimeout.toMillis();
} else {
return (connectionTimeout.compareTo(CONNECT_TIMEOUT_DEFAULT) > 0 ? connectionTimeout : CONNECT_TIMEOUT_DEFAULT).toMillis();
if (number < 0) {
return Optional.empty();
}
return Optional.of(Duration.of(number, unit));
}

public DataSourceBuilder withConnectionProperties(final Map<String, String> connectionProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.testcontainers.containers.Network;
import org.testcontainers.images.builder.ImageFromDockerfile;

public class SshBastionContainer {
public class SshBastionContainer implements AutoCloseable {

private static final String SSH_USER = "sshuser";
private static final String SSH_PASSWORD = "secret";
Expand All @@ -36,21 +36,27 @@ public void initAndStartBastion(final Network network) {
bastion.start();
}

public JsonNode getTunnelMethod(final SshTunnel.TunnelMethod tunnelMethod,
final boolean innerAddress)
throws IOException, InterruptedException {
final var containerAddress = innerAddress ? getInnerContainerAddress(bastion) : getOuterContainerAddress(bastion);
return Jsons.jsonNode(ImmutableMap.builder()
.put("tunnel_host",
Objects.requireNonNull(containerAddress.left))
.put("tunnel_method", tunnelMethod)
.put("tunnel_port", containerAddress.right)
.put("tunnel_user", SSH_USER)
.put("tunnel_user_password", tunnelMethod.equals(SSH_PASSWORD_AUTH) ? SSH_PASSWORD : "")
.put("ssh_key", tunnelMethod.equals(SSH_KEY_AUTH) ? bastion.execInContainer("cat", "var/bastion/id_rsa").getStdout() : "")
.build());
}

public JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod,
final ImmutableMap.Builder<Object, Object> builderWithSchema,
final boolean innerAddress)
throws IOException, InterruptedException {
final var containerAddress = innerAddress ? getInnerContainerAddress(bastion) : getOuterContainerAddress(bastion);
return Jsons.jsonNode(builderWithSchema
.put("tunnel_method", Jsons.jsonNode(ImmutableMap.builder()
.put("tunnel_host",
Objects.requireNonNull(containerAddress.left))
.put("tunnel_method", tunnelMethod)
.put("tunnel_port", containerAddress.right)
.put("tunnel_user", SSH_USER)
.put("tunnel_user_password", tunnelMethod.equals(SSH_PASSWORD_AUTH) ? SSH_PASSWORD : "")
.put("ssh_key", tunnelMethod.equals(SSH_KEY_AUTH) ? bastion.execInContainer("cat", "var/bastion/id_rsa").getStdout() : "")
.build()))
.put("tunnel_method", getTunnelMethod(tunnelMethod, innerAddress))
.build());
}

Expand Down Expand Up @@ -83,6 +89,11 @@ public void stopAndClose() {
bastion.close();
}

@Override
public void close() {
stopAndClose();
}

public GenericContainer getContainer() {
return bastion;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.4.11
version=0.5.0
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.containers.MySQLContainer;

/**
Expand Down Expand Up @@ -80,7 +81,7 @@ void testCreatingMySQLDataSourceWithConnectionTimeoutSetBelowDefault() {
try (MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:8.0")) {
mySQLContainer.start();
final Map<String, String> connectionProperties = Map.of(
CONNECT_TIMEOUT, "30");
CONNECT_TIMEOUT, "5000");
final DataSource dataSource = DataSourceFactory.create(
mySQLContainer.getUsername(),
mySQLContainer.getPassword(),
Expand All @@ -89,7 +90,23 @@ void testCreatingMySQLDataSourceWithConnectionTimeoutSetBelowDefault() {
connectionProperties);
assertNotNull(dataSource);
assertEquals(HikariDataSource.class, dataSource.getClass());
assertEquals(60000, ((HikariDataSource) dataSource).getHikariConfigMXBean().getConnectionTimeout());
assertEquals(5000, ((HikariDataSource) dataSource).getHikariConfigMXBean().getConnectionTimeout());
}
}

@Test
void testCreatingMsSQLServerDataSourceWithConnectionTimeoutSetBelowDefault() {
try (var mssqlServerContainer = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense()) {
mssqlServerContainer.start();
final DataSource dataSource = DataSourceFactory.create(
mssqlServerContainer.getUsername(),
mssqlServerContainer.getPassword(),
mssqlServerContainer.getDriverClassName(),
mssqlServerContainer.getJdbcUrl(),
Map.of("loginTimeout", "5"));
assertNotNull(dataSource);
assertEquals(HikariDataSource.class, dataSource.getClass());
assertEquals(5000, ((HikariDataSource) dataSource).getHikariConfigMXBean().getConnectionTimeout());
}
}

Expand Down
Loading

0 comments on commit d97a399

Please sign in to comment.