diff --git a/airbyte-integrations/connectors/destination-teradata/src/main/java/io/airbyte/integrations/destination/teradata/TeradataDestination.java b/airbyte-integrations/connectors/destination-teradata/src/main/java/io/airbyte/integrations/destination/teradata/TeradataDestination.java deleted file mode 100644 index 55aa93c237b4..000000000000 --- a/airbyte-integrations/connectors/destination-teradata/src/main/java/io/airbyte/integrations/destination/teradata/TeradataDestination.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.teradata; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.cdk.integrations.base.IntegrationRunner; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; -import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination; -import io.airbyte.commons.json.Jsons; -import java.io.IOException; -import java.io.PrintWriter; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TeradataDestination extends AbstractJdbcDestination implements Destination { - - private static final Logger LOGGER = LoggerFactory.getLogger(TeradataDestination.class); - /** - * Teradata JDBC driver - */ - public static final String DRIVER_CLASS = "com.teradata.jdbc.TeraDriver"; - /** - * Default schema name - */ - protected static final String DEFAULT_SCHEMA_NAME = "def_airbyte_db"; - protected static final String PARAM_MODE = "mode"; - protected static final String PARAM_SSL = "ssl"; - protected static final String PARAM_SSL_MODE = "ssl_mode"; - protected static final String PARAM_SSLMODE = "sslmode"; - protected static final String PARAM_SSLCA = "sslca"; - protected static final String REQUIRE = "require"; - - protected static final String VERIFY_CA = "verify-ca"; - - protected static final String VERIFY_FULL = "verify-full"; - - protected static final String ALLOW = "allow"; - - protected static final String CA_CERTIFICATE = "ca.pem"; - - protected static final String CA_CERT_KEY = "ssl_ca_certificate"; - - protected static final String ENCRYPTDATA = "ENCRYPTDATA"; - - protected static final String ENCRYPTDATA_ON = "ON"; - - public static void main(String[] args) throws Exception { - new IntegrationRunner(new TeradataDestination()).run(args); - } - - public TeradataDestination() { - super(DRIVER_CLASS, new StandardNameTransformer(), new TeradataSqlOperations()); - } - - private static void createCertificateFile(String fileName, String fileValue) throws IOException { - try (final PrintWriter out = new PrintWriter(fileName, StandardCharsets.UTF_8)) { - out.print(fileValue); - } - } - - @Override - protected Map getDefaultConnectionProperties(final JsonNode config) { - final Map additionalParameters = new HashMap<>(); - if (config.has(PARAM_SSL) && config.get(PARAM_SSL).asBoolean()) { - LOGGER.debug("SSL Enabled"); - if (config.has(PARAM_SSL_MODE)) { - LOGGER.debug("Selected SSL Mode : " + config.get(PARAM_SSL_MODE).get(PARAM_MODE).asText()); - additionalParameters.putAll(obtainConnectionOptions(config.get(PARAM_SSL_MODE))); - } else { - additionalParameters.put(PARAM_SSLMODE, REQUIRE); - } - } - additionalParameters.put(ENCRYPTDATA, ENCRYPTDATA_ON); - return additionalParameters; - } - - private Map obtainConnectionOptions(final JsonNode encryption) { - final Map additionalParameters = new HashMap<>(); - if (!encryption.isNull()) { - final var method = encryption.get(PARAM_MODE).asText(); - switch (method) { - case "verify-ca", "verify-full" -> { - additionalParameters.put(PARAM_SSLMODE, method); - try { - createCertificateFile(CA_CERTIFICATE, encryption.get("ssl_ca_certificate").asText()); - } catch (final IOException ioe) { - throw new RuntimeException("Failed to create certificate file"); - } - additionalParameters.put(PARAM_SSLCA, CA_CERTIFICATE); - } - default -> { - additionalParameters.put(PARAM_SSLMODE, method); - } - } - } - return additionalParameters; - } - - @Override - public JsonNode toJdbcConfig(final JsonNode config) { - final String schema = Optional.ofNullable(config.get(JdbcUtils.SCHEMA_KEY)).map(JsonNode::asText).orElse(DEFAULT_SCHEMA_NAME); - - final String jdbcUrl = String.format("jdbc:teradata://%s/", - config.get(JdbcUtils.HOST_KEY).asText()); - - final ImmutableMap.Builder configBuilder = ImmutableMap.builder() - .put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText()) - .put(JdbcUtils.JDBC_URL_KEY, jdbcUrl) - .put(JdbcUtils.SCHEMA_KEY, schema); - - if (config.has(JdbcUtils.PASSWORD_KEY)) { - configBuilder.put(JdbcUtils.PASSWORD_KEY, config.get(JdbcUtils.PASSWORD_KEY).asText()); - } - - if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) { - configBuilder.put(JdbcUtils.JDBC_URL_PARAMS_KEY, config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText()); - } - return Jsons.jsonNode(configBuilder.build()); - } - -} diff --git a/airbyte-integrations/connectors/destination-teradata/src/main/java/io/airbyte/integrations/destination/teradata/TeradataSqlOperations.java b/airbyte-integrations/connectors/destination-teradata/src/main/java/io/airbyte/integrations/destination/teradata/TeradataSqlOperations.java deleted file mode 100644 index 0106f4dd05cc..000000000000 --- a/airbyte-integrations/connectors/destination-teradata/src/main/java/io/airbyte/integrations/destination/teradata/TeradataSqlOperations.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.teradata; - -import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations; -import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.destination.teradata.util.JSONStruct; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.List; -import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TeradataSqlOperations extends JdbcSqlOperations { - - private static final Logger LOGGER = LoggerFactory.getLogger(TeradataSqlOperations.class); - - @Override - public void insertRecordsInternal(final JdbcDatabase database, - final List records, - final String schemaName, - final String tableName) - throws SQLException { - if (records.isEmpty()) { - return; - } - final String insertQueryComponent = String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES (?, ?, ?)", schemaName, tableName, - JavaBaseConstants.COLUMN_NAME_AB_ID, - JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT); - database.execute(con -> { - try { - - final PreparedStatement pstmt = con.prepareStatement(insertQueryComponent); - - for (final AirbyteRecordMessage record : records) { - - final String uuid = UUID.randomUUID().toString(); - final String jsonData = Jsons.serialize(formatData(record.getData())); - final Timestamp emittedAt = Timestamp.from(Instant.ofEpochMilli(record.getEmittedAt())); - LOGGER.info("uuid: " + uuid); - LOGGER.info("jsonData: " + jsonData); - LOGGER.info("emittedAt: " + emittedAt); - pstmt.setString(1, uuid); - pstmt.setObject(2, new JSONStruct("JSON", new Object[] {jsonData})); - pstmt.setTimestamp(3, emittedAt); - pstmt.addBatch(); - - } - - pstmt.executeBatch(); - - } catch (final SQLException se) { - for (SQLException ex = se; ex != null; ex = ex.getNextException()) { - LOGGER.info(ex.getMessage()); - } - AirbyteTraceMessageUtility.emitSystemErrorTrace(se, - "Connector failed while inserting records to staging table"); - throw new RuntimeException(se); - } catch (final Exception e) { - AirbyteTraceMessageUtility.emitSystemErrorTrace(e, - "Connector failed while inserting records to staging table"); - throw new RuntimeException(e); - } - - }); - } - - @Override - public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception { - try { - database.execute(String.format("CREATE DATABASE \"%s\" AS PERMANENT = 120e6, SPOOL = 120e6;", schemaName)); - } catch (final SQLException e) { - if (e.getMessage().contains("already exists")) { - LOGGER.warn("Database " + schemaName + " already exists."); - } else { - AirbyteTraceMessageUtility.emitSystemErrorTrace(e, "Connector failed while creating schema "); - throw new RuntimeException(e); - } - } - - } - - @Override - public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) - throws SQLException { - try { - database.execute(createTableQuery(database, schemaName, tableName)); - } catch (final SQLException e) { - if (e.getMessage().contains("already exists")) { - LOGGER.warn("Table " + schemaName + "." + tableName + " already exists."); - } else { - AirbyteTraceMessageUtility.emitSystemErrorTrace(e, "Connector failed while creating table "); - throw new RuntimeException(e); - } - } - } - - @Override - public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) { - return String.format( - "CREATE TABLE %s.%s, FALLBACK ( %s VARCHAR(256), %s JSON, %s TIMESTAMP(6)) " + - " UNIQUE PRIMARY INDEX (%s) ", - schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT, JavaBaseConstants.COLUMN_NAME_AB_ID); - } - - @Override - public void dropTableIfExists(final JdbcDatabase database, final String schemaName, final String tableName) - throws SQLException { - try { - database.execute(dropTableIfExistsQueryInternal(schemaName, tableName)); - } catch (final SQLException e) { - AirbyteTraceMessageUtility.emitSystemErrorTrace(e, - "Connector failed while dropping table " + schemaName + "." + tableName); - } - } - - @Override - public String truncateTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) { - try { - return String.format("DELETE %s.%s ALL;\n", schemaName, tableName); - } catch (final Exception e) { - AirbyteTraceMessageUtility.emitSystemErrorTrace(e, - "Connector failed while truncating table " + schemaName + "." + tableName); - } - return ""; - } - - private String dropTableIfExistsQueryInternal(final String schemaName, final String tableName) { - try { - return String.format("DROP TABLE %s.%s;\n", schemaName, tableName); - } catch (final Exception e) { - AirbyteTraceMessageUtility.emitSystemErrorTrace(e, - "Connector failed while dropping table " + schemaName + "." + tableName); - } - return ""; - } - - @Override - public void executeTransaction(final JdbcDatabase database, final List queries) throws Exception { - final StringBuilder appendedQueries = new StringBuilder(); - try { - for (final String query : queries) { - appendedQueries.append(query); - } - database.execute(appendedQueries.toString()); - } catch (final SQLException e) { - AirbyteTraceMessageUtility.emitSystemErrorTrace(e, - "Connector failed while executing queries : " + appendedQueries.toString()); - } - } - -} diff --git a/airbyte-integrations/connectors/destination-teradata/src/main/java/io/airbyte/integrations/destination/teradata/util/JSONStruct.java b/airbyte-integrations/connectors/destination-teradata/src/main/java/io/airbyte/integrations/destination/teradata/util/JSONStruct.java deleted file mode 100644 index ab44bca58dfe..000000000000 --- a/airbyte-integrations/connectors/destination-teradata/src/main/java/io/airbyte/integrations/destination/teradata/util/JSONStruct.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.teradata.util; - -import java.sql.SQLException; -import java.sql.Struct; - -/** - * Utility class to handle Teradata JSON data type. The JSON data type stores text as a CLOB in - * either CHARACTER SET LATIN or CHARACTER SET UNICODE. A JSON value sent to the Teradata database - * using a Struct containing String or a Reader attribute. - */ -public class JSONStruct implements Struct { - - private final Object[] m_attributes; - private final String m_sqlTypeName; - - /** - * Constructs a new JSONStruct with the specified SQL type name and attributes. - * - * @param sqlTypeName The SQL type name. - * @param attributes The attributes of the JSONStruct. - */ - public JSONStruct(String sqlTypeName, Object[] attributes) { - m_sqlTypeName = sqlTypeName; - m_attributes = attributes; - } - - /** - * Retrieves the attributes of this JSONStruct. - * - * @return An array containing the attributes of this JSONStruct. - * @throws SQLException if a database access error occurs. - */ - public Object[] getAttributes() throws SQLException { - return m_attributes; - } - - /** - * Retrieves the SQL type name of this JSONStruct. - * - * @return The SQL type name of this JSONStruct. - * @throws SQLException if a database access error occurs. - */ - public String getSQLTypeName() throws SQLException { - return m_sqlTypeName; - } - - // This method is not supported, but needs to be included - /** - * Retrieves the attributes of this JSONStruct with the specified map. - * - * @param map A map containing the attributes. - * @return An array containing the attributes of this JSONStruct. - * @throws SQLException if a database access error occurs. - */ - public Object[] getAttributes(java.util.Map map) throws SQLException { - // Unsupported Exception - throw new SQLException("getAttributes (Map) NOT SUPPORTED"); - } - -} diff --git a/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/TeradataDestination.kt b/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/TeradataDestination.kt new file mode 100644 index 000000000000..6b81032f0264 --- /dev/null +++ b/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/TeradataDestination.kt @@ -0,0 +1,243 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.teradata + +import com.fasterxml.jackson.databind.JsonNode +import com.google.common.collect.ImmutableMap +import io.airbyte.cdk.db.factory.DataSourceFactory +import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.cdk.integrations.base.IntegrationRunner +import io.airbyte.cdk.integrations.destination.StandardNameTransformer +import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination +import io.airbyte.commons.json.Jsons +import io.airbyte.integrations.destination.teradata.util.TeradataConstants +import java.io.IOException +import java.io.PrintWriter +import java.nio.charset.StandardCharsets +import java.sql.SQLException +import java.util.* +import java.util.regex.Pattern +import javax.sql.DataSource +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** + * The TeradataDestination class is responsible for handling the connection to the Teradata database + * as destination from Airbyte. It extends the AbstractJdbcDestination class and implements the + * Destination interface, facilitating the configuration and management of database interactions, + * including setting a query band. + */ +class TeradataDestination : + AbstractJdbcDestination( + TeradataConstants.DRIVER_CLASS, + StandardNameTransformer(), + TeradataSqlOperations(), + ), + Destination { + /** + * Retrieves the data source for the Teradata database connection. + * + * @param config The configuration settings as a JsonNode. + * @return The DataSource object for the Teradata connection. + */ + override fun getDataSource(config: JsonNode): DataSource { + val jdbcConfig = this.toJdbcConfig(config) + val dataSource = + DataSourceFactory.create( + jdbcConfig[JdbcUtils.USERNAME_KEY].asText(), + if (jdbcConfig.has(JdbcUtils.PASSWORD_KEY)) + jdbcConfig[JdbcUtils.PASSWORD_KEY].asText() + else null, + TeradataConstants.DRIVER_CLASS, + jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(), + this.getConnectionProperties(config), + ) + // set session query band + setQueryBand(getDatabase(dataSource)) + return dataSource + } + /** + * Retrieves the JdbcDatabase instance based on the provided DataSource. + * + * @param dataSource The DataSource to create the JdbcDatabase from. + * @return The JdbcDatabase instance. + */ + override fun getDatabase(dataSource: DataSource): JdbcDatabase { + return DefaultJdbcDatabase(dataSource) + } + /** + * Sets the Teradata session query band to identify the source of SQL requests originating from + * Airbyte. + * + * @param jdbcDatabase The JdbcDatabase instance for which to set the query band. + */ + private fun setQueryBand(jdbcDatabase: JdbcDatabase) { + val setQueryBandSql = + TeradataConstants.QUERY_BAND_SET + + Companion.queryBand + + TeradataConstants.QUERY_BAND_SESSION + try { + jdbcDatabase.execute(setQueryBandSql) + } catch (ex: SQLException) { + LOGGER.error("Error occurred while setting session query band: {}", ex.message, ex) + } + } + /** + * Retrieves the default connection properties for the Teradata database based on the provided + * configuration. + * + * @param config The configuration settings as a JsonNode. + * @return A map of default connection properties. + */ + override fun getDefaultConnectionProperties(config: JsonNode): Map { + val additionalParameters: MutableMap = HashMap() + if ( + config.has(TeradataConstants.PARAM_SSL) && + config[TeradataConstants.PARAM_SSL].asBoolean() + ) { + if (config.has(TeradataConstants.PARAM_SSL_MODE)) { + additionalParameters.putAll( + obtainConnectionOptions(config[TeradataConstants.PARAM_SSL_MODE]) + ) + } else { + additionalParameters[TeradataConstants.PARAM_SSLMODE] = TeradataConstants.REQUIRE + } + } + if (config.has(TeradataConstants.QUERY_BAND_KEY)) { + Companion.queryBand = + handleUserQueryBandText( + config[TeradataConstants.QUERY_BAND_KEY].asText(), + ) + } + additionalParameters[TeradataConstants.ENCRYPTDATA] = TeradataConstants.ENCRYPTDATA_ON + return additionalParameters + } + + /** + * Obtains additional connection options like SSL configuration. + * + * @param encryption The JsonNode containing SSL parameters. + * @return A map of additional connection properties. + */ + private fun obtainConnectionOptions(encryption: JsonNode): Map { + val additionalParameters: MutableMap = HashMap() + if (!encryption.isNull) { + val method = encryption[TeradataConstants.PARAM_MODE].asText() + additionalParameters[TeradataConstants.PARAM_SSLMODE] = method + + if (TeradataConstants.VERIFY_CA == method || TeradataConstants.VERIFY_FULL == method) { + try { + createCertificateFile(encryption[TeradataConstants.CA_CERT_KEY].asText()) + additionalParameters[TeradataConstants.PARAM_SSLCA] = + TeradataConstants.CA_CERTIFICATE + } catch (ioe: IOException) { + throw RuntimeException("Failed to create certificate file", ioe) + } + } + } + return additionalParameters + } + + /** Creates certificate file for verify-ca and verify-full ssl connection */ + @Throws(IOException::class) + private fun createCertificateFile(fileValue: String) { + PrintWriter(TeradataConstants.CA_CERTIFICATE, StandardCharsets.UTF_8).use { out -> + out.print(fileValue) + } + } + /** + * Handles and validates the user-defined query band text. + * + * @param queryBandText The user-defined query band text. + * @return The validated query band text, ensuring required parameters are presentin required + * format. + */ + private fun handleUserQueryBandText(queryBandText: String?): String { + if (queryBandText.isNullOrBlank()) { + return Companion.queryBand + } + var updatedQueryBand = StringBuilder(queryBandText) + // checking org doesn't exist in query_band, appending 'org=teradata-internal-telem' + // If it exists, user might have set some value of their own, so doing nothing in that case + val orgMatcher = Pattern.compile("org\\s*=").matcher(queryBandText) + if (!orgMatcher.find()) { + if (queryBandText.isNotBlank() && !queryBandText.endsWith(";")) { + updatedQueryBand.append(";") + } + updatedQueryBand.append(TeradataConstants.DEFAULT_QUERY_BAND_ORG) + } + + // Ensure appname contains airbyte is present or append it if it exists with different value + val appNameMatcher = Pattern.compile("appname\\s*=\\s*([^;]*)").matcher(updatedQueryBand) + if (appNameMatcher.find()) { + val appNameValue = appNameMatcher.group(1).trim { it <= ' ' } + if (!appNameValue.lowercase(Locale.getDefault()).contains("airbyte")) { + updatedQueryBand = + StringBuilder( + updatedQueryBand + .toString() + .replace( + "appname\\s*=\\s*([^;]*)".toRegex(), + "appname=" + appNameValue + "_airbyte", + ), + ) + } + } else { + if (updatedQueryBand.isNotEmpty() && !updatedQueryBand.toString().endsWith(";")) { + updatedQueryBand.append(";") + } + updatedQueryBand.append(TeradataConstants.DEFAULT_QUERY_BAND_APPNAME) + } + return updatedQueryBand.toString() + } + /** + * Converts the provided configuration into JDBC configuration settings. + * + * @param config The configuration settings as a JsonNode. + * @return The converted JsonNode containing JDBC configuration. + */ + override fun toJdbcConfig(config: JsonNode): JsonNode { + val schema = + Optional.ofNullable(config[JdbcUtils.SCHEMA_KEY]) + .map { obj: JsonNode -> obj.asText() } + .orElse(TeradataConstants.DEFAULT_SCHEMA_NAME) + val jdbcUrl = String.format("jdbc:teradata://%s/", config[JdbcUtils.HOST_KEY].asText()) + + val configBuilder = + ImmutableMap.builder() + .put(JdbcUtils.USERNAME_KEY, config[JdbcUtils.USERNAME_KEY].asText()) + .put(JdbcUtils.JDBC_URL_KEY, jdbcUrl) + .put(JdbcUtils.SCHEMA_KEY, schema) + + if (config.has(JdbcUtils.PASSWORD_KEY)) { + configBuilder.put(JdbcUtils.PASSWORD_KEY, config[JdbcUtils.PASSWORD_KEY].asText()) + } + if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) { + configBuilder.put( + JdbcUtils.JDBC_URL_PARAMS_KEY, + config[JdbcUtils.JDBC_URL_PARAMS_KEY].asText(), + ) + } + + return Jsons.jsonNode(configBuilder.build()) + } + + val queryBand: String + get() = Companion.queryBand + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(TeradataDestination::class.java) + + private var queryBand = TeradataConstants.DEFAULT_QUERY_BAND + + @Throws(Exception::class) + @JvmStatic + fun main(args: Array) { + IntegrationRunner(TeradataDestination()).run(args) + } + } +} diff --git a/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/TeradataSqlOperations.kt b/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/TeradataSqlOperations.kt new file mode 100644 index 000000000000..3fb613998b60 --- /dev/null +++ b/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/TeradataSqlOperations.kt @@ -0,0 +1,282 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.teradata + +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility +import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations +import io.airbyte.commons.json.Jsons +import io.airbyte.integrations.destination.teradata.util.JSONStruct +import io.airbyte.protocol.models.v0.AirbyteRecordMessage +import java.sql.Connection +import java.sql.SQLException +import java.sql.Timestamp +import java.time.Instant +import java.util.* +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** + * The TeradataSqlOperations class is responsible for performing SQL operations on the Teradata + * database. It extends the JdbcSqlOperations class to provide functionalities specific to the + * Teradata integration, including inserting records, creating schemas and tables, and executing SQL + * transactions. + */ +class TeradataSqlOperations : JdbcSqlOperations() { + @Throws(SQLException::class) + /** + * Inserts a list of records into a specified table in the Teradata database. + * + * @param database The JdbcDatabase instance to interact with the database. + * @param records The list of AirbyteRecordMessage to be inserted. + * @param schemaName The name of the schema where the table resides. + * @param tableName The name of the table where records will be inserted. + * @throws SQLException If an SQL error occurs during the insert operation. + */ + public override fun insertRecordsInternal( + database: JdbcDatabase, + records: List, + schemaName: String, + tableName: String + ) { + if (records.isEmpty()) { + return + } + val insertQueryComponent = + String.format( + "INSERT INTO %s.%s (%s, %s, %s) VALUES (?, ?, ?)", + schemaName, + tableName, + JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT, + ) + database.execute { con: Connection -> + try { + val pstmt = con.prepareStatement(insertQueryComponent) + + for (record in records) { + val uuid = UUID.randomUUID().toString() + val jsonData = + Jsons.serialize( + formatData(record.data), + ) + val emittedAt = Timestamp.from(Instant.ofEpochMilli(record.emittedAt)) + LOGGER.info( + "uuid: $uuid", + ) + LOGGER.info( + "jsonData: $jsonData", + ) + LOGGER.info( + "emittedAt: $emittedAt", + ) + var i = 0 + pstmt.setString(++i, uuid) + pstmt.setObject( + ++i, + JSONStruct( + "JSON", + arrayOf(jsonData), + ), + ) + pstmt.setTimestamp(++i, emittedAt) + pstmt.addBatch() + } + + pstmt.executeBatch() + } catch (se: SQLException) { + handleSQLException(se) + } catch (e: Exception) { + AirbyteTraceMessageUtility.emitSystemErrorTrace( + e, + "Connector failed during inserting records to staging table", + ) + throw RuntimeException(e) + } + } + } + + /** Handles SQL exception */ + private fun handleSQLException(se: SQLException) { + var ex: SQLException? = se + val action = "inserting records to staging table" + while (ex != null) { + LOGGER.error("SQL error during $action: ${ex.message}") + ex = ex.nextException + } + AirbyteTraceMessageUtility.emitSystemErrorTrace(se, "Connector failed during $action") + throw RuntimeException(se) + } + + /** + * Creates a schema in the Teradata database if it does not already exist. + * + * @param database The JdbcDatabase instance to interact with the database. + * @param schemaName The name of the schema to be created. + * @throws Exception If an error occurs while creating the schema. + */ + @Throws(Exception::class) + override fun createSchemaIfNotExists(database: JdbcDatabase, schemaName: String) { + try { + database.execute( + String.format( + "CREATE DATABASE \"%s\" AS PERMANENT = 120e6, SPOOL = 120e6;", + schemaName, + ), + ) + } catch (e: SQLException) { + if (e.message!!.contains("already exists")) { + LOGGER.warn( + "Database $schemaName already exists.", + ) + } else { + AirbyteTraceMessageUtility.emitSystemErrorTrace( + e, + "Connector failed while creating schema ", + ) + throw RuntimeException(e) + } + } + } + /** + * Creates a table in the Teradata database if it does not already exist. + * + * @param database The JdbcDatabase instance to interact with the database. + * @param schemaName The name of the schema where the table resides. + * @param tableName The name of the table to be created. + * @throws SQLException If an SQL error occurs during the creation of the table. + */ + @Throws(SQLException::class) + override fun createTableIfNotExists( + database: JdbcDatabase, + schemaName: String, + tableName: String + ) { + try { + database.execute(createTableQuery(database, schemaName, tableName)) + } catch (e: SQLException) { + if (e.message!!.contains("already exists")) { + LOGGER.warn( + "Table $schemaName.$tableName already exists.", + ) + } else { + AirbyteTraceMessageUtility.emitSystemErrorTrace( + e, + "Connector failed while creating table ", + ) + throw RuntimeException(e) + } + } + } + /** + * Constructs the SQL query for creating a new table in the Teradata database. + * + * @param database The JdbcDatabase instance to interact with the database. + * @param schemaName The name of the schema where the table will be created. + * @param tableName The name of the table to be created. + * @return The SQL query string for creating the table. + */ + override fun createTableQuery( + database: JdbcDatabase, + schemaName: String, + tableName: String + ): String { + return String.format( + "CREATE SET TABLE %s.%s, FALLBACK ( %s VARCHAR(256), %s JSON, %s TIMESTAMP(6)) " + + " UNIQUE PRIMARY INDEX (%s) ", + schemaName, + tableName, + JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT, + JavaBaseConstants.COLUMN_NAME_AB_ID, + ) + } + /** + * Drops a specified table from the Teradata database if it exists. + * + * @param database The JdbcDatabase instance to interact with the database. + * @param schemaName The name of the schema where the table resides. + * @param tableName The name of the table to be dropped. + * @throws SQLException If an SQL error occurs during the drop operation. + */ + @Throws(SQLException::class) + override fun dropTableIfExists(database: JdbcDatabase, schemaName: String, tableName: String) { + try { + database.execute(dropTableIfExistsQueryInternal(schemaName, tableName)) + } catch (e: SQLException) { + AirbyteTraceMessageUtility.emitSystemErrorTrace( + e, + "Connector failed while dropping table $schemaName.$tableName", + ) + } + } + /** + * Constructs the SQL query for truncating a table in the Teradata database. + * + * @param database The JdbcDatabase instance to interact with the database. + * @param schemaName The name of the schema where the table resides. + * @param tableName The name of the table to be truncated. + * @return The SQL query string for truncating the table. + */ + override fun truncateTableQuery( + database: JdbcDatabase, + schemaName: String, + tableName: String + ): String { + try { + return String.format("DELETE %s.%s ALL;\n", schemaName, tableName) + } catch (e: Exception) { + AirbyteTraceMessageUtility.emitSystemErrorTrace( + e, + "Connector failed while truncating table $schemaName.$tableName", + ) + } + return "" + } + + /** Drops given table */ + private fun dropTableIfExistsQueryInternal(schemaName: String, tableName: String): String { + try { + return String.format("DROP TABLE %s.%s;\n", schemaName, tableName) + } catch (e: Exception) { + AirbyteTraceMessageUtility.emitSystemErrorTrace( + e, + "Connector failed while dropping table $schemaName.$tableName", + ) + } + return "" + } + /** + * Executes a list of SQL queries as a single transaction. + * + * @param database The JdbcDatabase instance to interact with the database. + * @param queries The list of SQL queries to be executed. + * @throws Exception If an error occurs during the transaction execution. + */ + @Throws(Exception::class) + override fun executeTransaction(database: JdbcDatabase, queries: List) { + val appendedQueries = StringBuilder() + try { + for (query in queries) { + appendedQueries.append(query) + } + database.execute(appendedQueries.toString()) + } catch (e: SQLException) { + AirbyteTraceMessageUtility.emitSystemErrorTrace( + e, + "Connector failed while executing queries : $appendedQueries", + ) + } + } + + companion object { + private val LOGGER: Logger = + LoggerFactory.getLogger( + TeradataSqlOperations::class.java, + ) + } +} diff --git a/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/util/JSONStruct.kt b/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/util/JSONStruct.kt new file mode 100644 index 000000000000..6d5183cc3bce --- /dev/null +++ b/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/util/JSONStruct.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.teradata.util + +import java.sql.SQLException +import java.sql.Struct + +/** + * Utility class to handle Teradata JSON data type. The JSON data type stores text as a CLOB in + * either CHARACTER SET LATIN or CHARACTER SET UNICODE. A JSON value sent to the Teradata database + * using a Struct containing String or a Reader attribute. + */ +class JSONStruct +/** + * Constructs a new JSONStruct with the specified SQL type name and attributes. + * + * @param sqlTypeName The SQL type name. + * @param attributes The attributes of the JSONStruct. + */ +(private val m_sqlTypeName: String, private val m_attributes: Array) : Struct { + /** + * Retrieves the attributes of this JSONStruct. + * + * @return An array containing the attributes of this JSONStruct. + * @throws SQLException if a database access error occurs. + */ + @Throws(SQLException::class) + override fun getAttributes(): Array { + return m_attributes + } + + /** + * Retrieves the SQL type name of this JSONStruct. + * + * @return The SQL type name of this JSONStruct. + * @throws SQLException if a database access error occurs. + */ + @Throws(SQLException::class) + override fun getSQLTypeName(): String { + return m_sqlTypeName + } + + // This method is not supported, but needs to be included + /** + * Retrieves the attributes of this JSONStruct with the specified map. + * + * @param map A map containing the attributes. + * @return An array containing the attributes of this JSONStruct. + * @throws SQLException if a database access error occurs. + */ + @Throws(SQLException::class) + override fun getAttributes(p0: MutableMap>?): Array { + throw SQLException("getAttributes (Map) NOT SUPPORTED") + } +} diff --git a/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/util/TeradataConstants.kt b/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/util/TeradataConstants.kt new file mode 100644 index 000000000000..3ce2e8218002 --- /dev/null +++ b/airbyte-integrations/connectors/destination-teradata/src/main/kotlin/io/airbyte/integrations/destination/teradata/util/TeradataConstants.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.teradata.util + +interface TeradataConstants { + companion object { + const val DRIVER_CLASS: String = "com.teradata.jdbc.TeraDriver" + const val DEFAULT_SCHEMA_NAME: String = "def_airbyte_db" + + // SSL and Query Band parameters + const val PARAM_MODE: String = "mode" + const val PARAM_SSL: String = "ssl" + const val PARAM_SSL_MODE: String = "ssl_mode" + const val PARAM_SSLMODE: String = "sslmode" + const val PARAM_SSLCA: String = "sslca" + const val ENCRYPTDATA: String = "ENCRYPTDATA" + const val ENCRYPTDATA_ON: String = "ON" + const val CA_CERTIFICATE: String = "ca.pem" + const val ALLOW: String = "allow" + const val REQUIRE: String = "require" + const val VERIFY_CA: String = "verify-ca" + const val VERIFY_FULL: String = "verify-full" + const val CA_CERT_KEY: String = "ssl_ca_certificate" + const val QUERY_BAND_KEY: String = "query_band" + const val DEFAULT_QUERY_BAND: String = "org=teradata-internal-telem;appname=airbyte;" + const val DEFAULT_QUERY_BAND_ORG = "org=teradata-internal-telem;" + const val DEFAULT_QUERY_BAND_APPNAME = "appname=airbyte;" + const val QUERY_BAND_SET = "SET QUERY_BAND='" + const val QUERY_BAND_SESSION = "' FOR SESSION" + } +} diff --git a/airbyte-integrations/connectors/destination-teradata/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-teradata/src/main/resources/spec.json index d1653e702a35..709c9cd91158 100644 --- a/airbyte-integrations/connectors/destination-teradata/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-teradata/src/main/resources/spec.json @@ -164,6 +164,12 @@ "title": "JDBC URL Params", "type": "string", "order": 7 + }, + "query_band": { + "description": "Specifies the custom session query band", + "title": "Query Band", + "type": "string", + "order": 8 } } } diff --git a/airbyte-integrations/connectors/destination-teradata/src/test-integration/java/io/airbyte/integrations/destination/teradata/TeradataDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-teradata/src/test-integration/java/io/airbyte/integrations/destination/teradata/TeradataDestinationAcceptanceTest.java index a363a965ab4a..bdf4c3faa392 100644 --- a/airbyte-integrations/connectors/destination-teradata/src/test-integration/java/io/airbyte/integrations/destination/teradata/TeradataDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-teradata/src/test-integration/java/io/airbyte/integrations/destination/teradata/TeradataDestinationAcceptanceTest.java @@ -7,9 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.cdk.db.factory.DataSourceFactory; -import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.db.jdbc.JdbcSourceOperations; import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.cdk.integrations.base.JavaBaseConstants; @@ -21,6 +19,7 @@ import io.airbyte.integrations.destination.teradata.envclient.TeradataHttpClient; import io.airbyte.integrations.destination.teradata.envclient.dto.*; import io.airbyte.integrations.destination.teradata.envclient.exception.BaseException; +import io.airbyte.integrations.destination.teradata.util.TeradataConstants; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.SQLException; @@ -30,10 +29,7 @@ import java.util.Objects; import java.util.concurrent.ExecutionException; import javax.sql.DataSource; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +50,7 @@ public class TeradataDestinationAcceptanceTest extends JdbcDestinationAcceptance private JsonNode configJson; private JdbcDatabase database; private DataSource dataSource; - private TeradataDestination destination = new TeradataDestination(); - private final JdbcSourceOperations sourceOperations = JdbcUtils.getDefaultSourceOperations(); + private final TeradataDestination destination = new TeradataDestination(); @Override protected String getImageName() { @@ -148,7 +143,7 @@ protected void setup(TestDestinationEnv testEnv, HashSet TEST_SCHEMAS) { try { ((ObjectNode) configJson).put("schema", SCHEMA_NAME); dataSource = getDataSource(configJson); - database = getDatabase(dataSource); + database = destination.getDatabase(dataSource); database.execute(createSchemaQuery); } catch (Exception e) { AirbyteTraceMessageUtility.emitSystemErrorTrace(e, "Database " + SCHEMA_NAME + " creation got failed."); @@ -191,14 +186,10 @@ protected DataSource getDataSource(final JsonNode config) { final JsonNode jdbcConfig = destination.toJdbcConfig(config); return DataSourceFactory.create(jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText(), jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null, - TeradataDestination.DRIVER_CLASS, jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(), + TeradataConstants.DRIVER_CLASS, jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(), getConnectionProperties(config)); } - protected JdbcDatabase getDatabase(final DataSource dataSource) { - return new DefaultJdbcDatabase(dataSource); - } - protected Map getConnectionProperties(final JsonNode config) { final Map customProperties = JdbcUtils.parseJdbcParameters(config, JdbcUtils.JDBC_URL_PARAMS_KEY); @@ -221,4 +212,11 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map getAdditionalParams(final String sslMethod) { switch (sslMethod) { case "verify-ca", "verify-full" -> { additionalParameters = ImmutableMap.of( - TeradataDestination.PARAM_SSL_MODE, Jsons.jsonNode(ImmutableMap.of( - TeradataDestination.PARAM_MODE, sslMethod, - TeradataDestination.CA_CERT_KEY, "dummycertificatecontent"))); + TeradataConstants.PARAM_SSL_MODE, Jsons.jsonNode(ImmutableMap.of( + TeradataConstants.PARAM_MODE, sslMethod, + TeradataConstants.CA_CERT_KEY, "dummycertificatecontent"))); } default -> { additionalParameters = ImmutableMap.of( - TeradataDestination.PARAM_SSL_MODE, Jsons.jsonNode(ImmutableMap.of( - TeradataDestination.PARAM_MODE, sslMethod))); + TeradataConstants.PARAM_SSL_MODE, Jsons.jsonNode(ImmutableMap.of( + TeradataConstants.PARAM_MODE, sslMethod))); } } return additionalParameters; @@ -93,7 +98,7 @@ private Map baseParameters() { private Map sslBaseParameters() { return ImmutableMap.builder() - .put(TeradataDestination.PARAM_SSL, "true") + .put(TeradataConstants.PARAM_SSL, "true") .put(JdbcUtils.HOST_KEY, getHostName()) .put(JdbcUtils.SCHEMA_KEY, getSchemaName()) .put(JdbcUtils.USERNAME_KEY, getUserName()) @@ -121,6 +126,30 @@ JdbcUtils.SCHEMA_KEY, getSchemaName(), JdbcUtils.JDBC_URL_PARAMS_KEY, extraParam)); } + private static Stream provideQueryBandTestCases() { + return Stream.of( + // Each test case includes the input query band and the expected result + org.junit.jupiter.params.provider.Arguments.of("", TeradataConstants.DEFAULT_QUERY_BAND), + org.junit.jupiter.params.provider.Arguments.of(" ", TeradataConstants.DEFAULT_QUERY_BAND), + org.junit.jupiter.params.provider.Arguments.of("appname=test", "appname=test_airbyte;org=teradata-internal-telem;"), + org.junit.jupiter.params.provider.Arguments.of("appname=test;", "appname=test_airbyte;org=teradata-internal-telem;"), + org.junit.jupiter.params.provider.Arguments.of("appname=airbyte", "appname=airbyte;org=teradata-internal-telem;"), + org.junit.jupiter.params.provider.Arguments.of("appname=airbyte;", "appname=airbyte;org=teradata-internal-telem;"), + org.junit.jupiter.params.provider.Arguments.of("org=test;", "org=test;appname=airbyte;"), + org.junit.jupiter.params.provider.Arguments.of("org=test", "org=test;appname=airbyte;"), + org.junit.jupiter.params.provider.Arguments.of("org=teradata-internal-telem", TeradataConstants.DEFAULT_QUERY_BAND), + org.junit.jupiter.params.provider.Arguments.of("org=teradata-internal-telem;", TeradataConstants.DEFAULT_QUERY_BAND), + org.junit.jupiter.params.provider.Arguments.of(TeradataConstants.DEFAULT_QUERY_BAND, TeradataConstants.DEFAULT_QUERY_BAND), + org.junit.jupiter.params.provider.Arguments.of("invalid_queryband", "invalid_queryband;org=teradata-internal-telem;appname=airbyte;"), + org.junit.jupiter.params.provider.Arguments.of("org=teradata-internal-telem;appname=test;", + "org=teradata-internal-telem;appname=test_airbyte;"), + org.junit.jupiter.params.provider.Arguments.of("org=custom;appname=custom;", "org=custom;appname=custom_airbyte;"), + org.junit.jupiter.params.provider.Arguments.of("org=custom;appname=custom", "org=custom;appname=custom_airbyte"), + org.junit.jupiter.params.provider.Arguments.of("org=teradata-internal-telem;appname=airbyte", "org=teradata-internal-telem;appname=airbyte"), + org.junit.jupiter.params.provider.Arguments.of("org = teradata-internal-telem;appname = airbyte", + "org = teradata-internal-telem;appname = airbyte")); + } + @Test void testJdbcUrlAndConfigNoExtraParams() { final JsonNode jdbcConfig = destination.toJdbcConfig(buildConfigNoJdbcParameters()); @@ -153,44 +182,57 @@ void testJdbcUrlExtraParams() { void testDefaultSchemaName() { final JsonNode jdbcConfig = destination.toJdbcConfig(buildConfigDefaultSchema()); assertEquals(EXPECTED_JDBC_URL, jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText()); - assertEquals(TeradataDestination.DEFAULT_SCHEMA_NAME, jdbcConfig.get(JdbcUtils.SCHEMA_KEY).asText()); + assertEquals(TeradataConstants.DEFAULT_SCHEMA_NAME, jdbcConfig.get(JdbcUtils.SCHEMA_KEY).asText()); } @Test void testSSLDisable() { final JsonNode jdbcConfig = createConfig(false); final Map properties = destination.getDefaultConnectionProperties(jdbcConfig); - assertNull(properties.get(TeradataDestination.PARAM_SSLMODE)); + assertNull(properties.get(TeradataConstants.PARAM_SSLMODE)); } @Test void testSSLDefaultMode() { final JsonNode jdbcConfig = createConfig(true); final Map properties = destination.getDefaultConnectionProperties(jdbcConfig); - assertEquals(TeradataDestination.REQUIRE, properties.get(TeradataDestination.PARAM_SSLMODE).toString()); + assertEquals(TeradataConstants.REQUIRE, properties.get(TeradataConstants.PARAM_SSLMODE).toString()); } @Test void testSSLAllowMode() { - final JsonNode jdbcConfig = createConfig(TeradataDestination.ALLOW); + final JsonNode jdbcConfig = createConfig(TeradataConstants.ALLOW); final Map properties = destination.getDefaultConnectionProperties(jdbcConfig); - assertEquals(TeradataDestination.ALLOW, properties.get(TeradataDestination.PARAM_SSLMODE).toString()); + assertEquals(TeradataConstants.ALLOW, properties.get(TeradataConstants.PARAM_SSLMODE).toString()); } @Test void testSSLVerfifyCAMode() { - final JsonNode jdbcConfig = createConfig(TeradataDestination.VERIFY_CA); + final JsonNode jdbcConfig = createConfig(TeradataConstants.VERIFY_CA); final Map properties = destination.getDefaultConnectionProperties(jdbcConfig); - assertEquals(TeradataDestination.VERIFY_CA, properties.get(TeradataDestination.PARAM_SSLMODE).toString()); - assertNotNull(properties.get(TeradataDestination.PARAM_SSLCA).toString()); + assertEquals(TeradataConstants.VERIFY_CA, properties.get(TeradataConstants.PARAM_SSLMODE).toString()); + assertNotNull(properties.get(TeradataConstants.PARAM_SSLCA).toString()); } @Test void testSSLVerfifyFullMode() { - final JsonNode jdbcConfig = createConfig(TeradataDestination.VERIFY_FULL); + final JsonNode jdbcConfig = createConfig(TeradataConstants.VERIFY_FULL); final Map properties = destination.getDefaultConnectionProperties(jdbcConfig); - assertEquals(TeradataDestination.VERIFY_FULL, properties.get(TeradataDestination.PARAM_SSLMODE).toString()); - assertNotNull(properties.get(TeradataDestination.PARAM_SSLCA).toString()); + assertEquals(TeradataConstants.VERIFY_FULL, properties.get(TeradataConstants.PARAM_SSLMODE).toString()); + assertNotNull(properties.get(TeradataConstants.PARAM_SSLCA).toString()); + } + + @ParameterizedTest + @MethodSource("provideQueryBandTestCases") + void testQueryBandCustom(String queryBandInput, String expectedQueryBand) { + Map baseParameters = baseParameters(); // Adjust to your method + ImmutableMap map_custom_QB = ImmutableMap.of( + TeradataConstants.QUERY_BAND_KEY, queryBandInput); + + final JsonNode jdbcConfig = Jsons.jsonNode(MoreMaps.merge(map_custom_QB, baseParameters)); + destination.getDefaultConnectionProperties(jdbcConfig); + + assertEquals(expectedQueryBand, destination.getQueryBand()); } } diff --git a/airbyte-integrations/connectors/destination-teradata/src/test/java/io/airbyte/integrations/destination/teradata/util/JSONStructTest.java b/airbyte-integrations/connectors/destination-teradata/src/test/java/io/airbyte/integrations/destination/teradata/util/JSONStructTest.java index 05184630f026..85572be058d2 100644 --- a/airbyte-integrations/connectors/destination-teradata/src/test/java/io/airbyte/integrations/destination/teradata/util/JSONStructTest.java +++ b/airbyte-integrations/connectors/destination-teradata/src/test/java/io/airbyte/integrations/destination/teradata/util/JSONStructTest.java @@ -48,8 +48,8 @@ void testGetAttributes() throws SQLException { @Test void testGetAttributesException() { SQLException exception = assertThrows(SQLException.class, () -> { - Map map = new HashMap<>(); - struct.getAttributes(map); + Map> inputMap = new HashMap<>(); + struct.getAttributes(inputMap); }); String expectedMessage = "getAttributes (Map) NOT SUPPORTED"; String actualMessage = exception.getMessage(); diff --git a/docs/integrations/destinations/teradata.md b/docs/integrations/destinations/teradata.md index 6aae8d512f78..3b9205c22d17 100644 --- a/docs/integrations/destinations/teradata.md +++ b/docs/integrations/destinations/teradata.md @@ -17,6 +17,9 @@ You'll need the following information to configure the Teradata destination: - **Password** - **Default Schema Name** - Specify the schema (or several schemas separated by commas) to be set in the search-path. These schemas will be used to resolve unqualified object names used in statements executed over this connection. - **JDBC URL Params** (optional) +- **Query Band** (optional) - The [query band ](https://teradata-docs.s3.amazonaws.com/doc/connectivity/jdbc/reference/current/jdbcug_chapter_2.html#BGEGBBAA)is a set of name-value pairs that can be assigned to a Teradata database session. It helps identify the source of SQL requests originating from Airbyte. You can customize the query band to include relevant information such as application name, organization, and user identifiers. + Each entry should be formatted as key=value, separated by semicolons (;). + Example: `appname=myApp;org=myOrganization;` [Refer to this guide for more details](https://downloads.teradata.com/doc/connectivity/jdbc/reference/current/jdbcug_chapter_2.html#BGBHDDGB) @@ -75,10 +78,12 @@ You can also use a pre-existing user but we highly recommend creating a dedicate 6. For **Default Schema**, enter the Default Schema name. The default value is public. 7. For **User** and **Password**, enter the database username and password. 8. To customize the JDBC connection beyond common options, specify additional supported [JDBC URL parameters](https://downloads.teradata.com/doc/connectivity/jdbc/reference/current/jdbcug_chapter_2.html#BGBHDDGB) as key-value pairs separated by the symbol & in the **JDBC URL Params** field. - Example: key1=value1&key2=value2&key3=value3 These parameters will be added at the end of the JDBC URL that the AirByte will use to connect to your Teradata database. +9. To customize the [query band](https://teradata-docs.s3.amazonaws.com/doc/connectivity/jdbc/reference/current/jdbcug_chapter_2.html#BGEGBBAA), specify set of name-value pairs in the **Query Band** field that can be set to the current database session. + + ## Changelog