From 776ac9447819858430f50f3cf21a51b7e00e59f8 Mon Sep 17 00:00:00 2001 From: XQ <38197356+xyq2834646405@users.noreply.github.com> Date: Wed, 8 Jan 2025 10:16:56 +0800 Subject: [PATCH] [Feature][JDBC source] pg support char types (#8420) --- .../dialect/psql/PostgresTypeConverter.java | 4 +++- .../jdbc/JdbcPostgresIdentifierIT.java | 18 ++++++++++++------ .../jdbc_postgres_ide_source_and_sink.conf | 2 +- .../seatunnel/jdbc/JdbcPostgresIT.java | 8 ++++++++ .../jdbc_postgres_source_and_sink.conf | 2 +- ...dbc_postgres_source_and_sink_copy_stmt.conf | 2 +- ...jdbc_postgres_source_and_sink_parallel.conf | 6 +++--- ...s_source_and_sink_parallel_upper_lower.conf | 6 +++--- .../jdbc_postgres_source_and_sink_xa.conf | 6 +++--- 9 files changed, 35 insertions(+), 19 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java index 980dd760e9f..af2e55a4b47 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java @@ -75,7 +75,8 @@ public class PostgresTypeConverter implements TypeConverter { public static final String PG_MONEY = "money"; // char <=> character <=> bpchar - public static final String PG_CHAR = "bpchar"; + public static final String PG_CHAR = "char"; + public static final String PG_BPCHAR = "bpchar"; public static final String PG_CHARACTER = "character"; // char[] <=> _character <=> _bpchar public static final String PG_CHAR_ARRAY = "_bpchar"; @@ -189,6 +190,7 @@ public Column convert(BasicTypeDefine typeDefine) { builder.scale(2); break; case PG_CHAR: + case PG_BPCHAR: case PG_CHARACTER: builder.dataType(BasicType.STRING_TYPE); if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java index d3e35b05a22..f59972ee264 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java @@ -93,7 +93,8 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou + " multipolygon geometry(MULTIPOLYGON, 4326),\n" + " geometrycollection geometry(GEOMETRYCOLLECTION, 4326),\n" + " geog geography(POINT, 4326),\n" - + " inet_col INET\n" + + " inet_col INET,\n" + + " char_one_col CHAR(1)\n" + ")"; private static final String PG_SINK_DDL = "CREATE TABLE IF NOT EXISTS test.public.\"PG_IDE_SINK_TABLE\" (\n" @@ -125,7 +126,8 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou + " \"MULTIPOLYGON\" varchar(2000) NULL,\n" + " \"GEOMETRYCOLLECTION\" varchar(2000) NULL,\n" + " \"GEOG\" varchar(2000) NULL,\n" - + " \"INET_COL\" INET NULL\n" + + " \"INET_COL\" INET NULL,\n" + + " \"CHAR_ONE_COL\" CHAR(1) NULL\n" + " )"; private static final String SOURCE_SQL = @@ -158,7 +160,8 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou + "multipolygon,\n" + "geometrycollection,\n" + "geog,\n" - + "inet_col\n" + + "inet_col,\n" + + "char_one_col\n" + " from pg_ide_source_table"; private static final String SINK_SQL = "SELECT\n" @@ -190,7 +193,8 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou + " CAST(\"MULTIPOLYGON\" AS GEOMETRY) AS MULTILINESTRING,\n" + " CAST(\"GEOMETRYCOLLECTION\" AS GEOMETRY) AS GEOMETRYCOLLECTION,\n" + " CAST(\"GEOG\" AS GEOGRAPHY) AS GEOG,\n" - + " \"INET_COL\"\n" + + " \"INET_COL\",\n" + + " \"CHAR_ONE_COL\"\n" + "FROM\n" + " \"PG_IDE_SINK_TABLE\";"; @@ -282,7 +286,8 @@ private void initializeJdbcTable() { + " multipolygon,\n" + " geometrycollection,\n" + " geog,\n" - + " inet_col\n" + + " inet_col,\n" + + " char_one_col\n" + " )\n" + "VALUES\n" + " (\n" @@ -334,7 +339,8 @@ private void initializeJdbcTable() { + " 4326\n" + " ),\n" + " ST_GeographyFromText('POINT(-122.3452 47.5925)'),\n" - + " '192.168.1.1'\n" + + " '192.168.1.1',\n" + + " 'T'\n" + " )"); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf index d56cf5d9816..98b30eb1cdd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf @@ -28,7 +28,7 @@ source{ password = "test" query ="""select gid, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, - multilinestring, multipolygon, geometrycollection, geog,inet_col from pg_ide_source_table""" + multilinestring, multipolygon, geometrycollection, geog,inet_col,char_one_col from pg_ide_source_table""" } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java index 653f9ef2b3a..342e0317e11 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java @@ -86,6 +86,7 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource { + " uuid_col UUID,\n" + " text_col TEXT,\n" + " varchar_col VARCHAR(255),\n" + + " char_one_col CHAR(1),\n" + " char_col CHAR(10),\n" + " boolean_col bool,\n" + " smallint_col int2,\n" @@ -121,6 +122,7 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource { + " uuid_col UUID,\n" + " text_col TEXT,\n" + " varchar_col VARCHAR(255),\n" + + " char_one_col CHAR(1),\n" + " char_col CHAR(10),\n" + " boolean_col bool,\n" + " smallint_col int2,\n" @@ -156,6 +158,7 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource { + "uuid_col, \n" + "text_col,\n" + "varchar_col,\n" + + "char_one_col,\n" + "char_col,\n" + "boolean_col,\n" + "smallint_col,\n" @@ -191,6 +194,7 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource { + "uuid_col, \n" + " text_col,\n" + " varchar_col,\n" + + " char_one_col,\n" + " char_col,\n" + " boolean_col,\n" + " smallint_col,\n" @@ -392,6 +396,7 @@ private void initializeJdbcTable() { + " uuid_col,\n" + " text_col,\n" + " varchar_col,\n" + + " char_one_col,\n" + " char_col,\n" + " boolean_col,\n" + " smallint_col,\n" @@ -429,6 +434,7 @@ private void initializeJdbcTable() { + " gen_random_uuid(),\n" + " 'Hello World',\n" + " 'Test',\n" + + " 'T',\n" + " 'Testing',\n" + " true,\n" + " 10,\n" @@ -562,6 +568,7 @@ public void testCatalogForSaveMode() { + " pg_ide_sink_table_2 (gid,\n" + " text_col,\n" + " varchar_col,\n" + + " char_one_col,\n" + " char_col,\n" + " boolean_col,\n" + " smallint_col,\n" @@ -598,6 +605,7 @@ public void testCatalogForSaveMode() { + "',\n" + " 'Hello World',\n" + " 'Test',\n" + + " 'T',\n" + " 'Testing',\n" + " true,\n" + " 10,\n" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf index 76879bc7692..459e0bcb10c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf @@ -26,7 +26,7 @@ source{ driver = "org.postgresql.Driver" user = "test" password = "test" - query ="""select gid, uuid_col, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + query ="""select gid, uuid_col, text_col, varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog, json_col, jsonb_col,xml_col from pg_e2e_source_table""" partition_column = "varchar_col" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf index fabfdce9ca3..d36c4f351da 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf @@ -26,7 +26,7 @@ source{ driver = "org.postgresql.Driver" user = "test" password = "test" - query ="""select gid, uuid_col, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + query ="""select gid, uuid_col, text_col, varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog, json_col, jsonb_col,xml_col from pg_e2e_source_table""" partition_column = "varchar_col" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf index 19df04f7047..eecc5be946c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf @@ -26,7 +26,7 @@ source{ driver = "org.postgresql.Driver" user = "test" password = "test" - query ="""select gid, uuid_col, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + query ="""select gid, uuid_col, text_col, varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog, json_col, jsonb_col,xml_col from pg_e2e_source_table""" partition_column= "gid" @@ -45,9 +45,9 @@ sink { user = "test" password = "test" connection_check_timeout_sec = 100 - query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, + query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col, varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog, json_col, jsonb_col,xml_col) - VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,? )""" + VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,? )""" } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf index de8d76dd2db..22d56fa568c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf @@ -26,7 +26,7 @@ source{ driver = "org.postgresql.Driver" user = "test" password = "test" - query ="""select gid, uuid_col, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + query ="""select gid, uuid_col, text_col, varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog, json_col, jsonb_col,xml_col from pg_e2e_source_table""" partition_column= "gid" @@ -49,9 +49,9 @@ sink { user = "test" password = "test" connection_check_timeout_sec = 100 - query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, + query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col, varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog, json_col, jsonb_col,xml_col ) - VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ,?)""" + VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ,?)""" } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf index 01fb9b1c4b9..21e1328be39 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf @@ -26,7 +26,7 @@ source { driver = "org.postgresql.Driver" user = "test" password = "test" - query ="""select gid, uuid_col, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + query ="""select gid, uuid_col, text_col, varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog, json_col, jsonb_col ,xml_col from pg_e2e_source_table""" } @@ -42,10 +42,10 @@ sink { user = "test" password = "test" max_retries = 0 - query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, + query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col, varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, geog, json_col, jsonb_col ,xml_col) - VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)""" + VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)""" is_exactly_once = "true"