diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index 20bbcab6f..56324bd24 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -465,7 +465,6 @@ class PgSQL_Connection_Placeholder { bool AutocommitFalse_AndSavepoint(); bool MultiplexDisabled(bool check_delay_token = true); bool IsKeepMultiplexEnabledVariables(char *query_digest_text); - void ProcessQueryAndSetStatusFlags(char *query_digest_text); void optimize(); void close_mysql(); @@ -491,6 +490,7 @@ class PgSQL_Connection_Placeholder { bool IsKnownActiveTransaction() { assert(0); return false; } bool IsActiveTransaction() { assert(0); return false; } PG_ASYNC_ST handler(short event) { assert(0); return ASYNC_IDLE; } + void ProcessQueryAndSetStatusFlags(char* query_digest_text); /********* End of remove ******************/ }; @@ -617,6 +617,7 @@ class PgSQL_Connection : public PgSQL_Connection_Placeholder { void optimize() {} void update_bytes_recv(uint64_t bytes_recv); void update_bytes_sent(uint64_t bytes_sent); + void ProcessQueryAndSetStatusFlags(char* query_digest_text); inline const PGconn* get_pg_connection() const { return pgsql_conn; } inline int get_pg_server_version() { return PQserverVersion(pgsql_conn); } @@ -665,6 +666,8 @@ class PgSQL_Connection : public PgSQL_Connection_Placeholder { // Handles the COPY OUT response from the server. // Returns true if it consumes all buffer data, or false if the threshold for result size is reached bool handle_copy_out(const PGresult* result, uint64_t* processed_bytes); + static void notice_handler_cb(void* arg, const PGresult* result); + static void unhandled_notice_cb(void* arg, const PGresult* result); }; #endif /* __CLASS_PGSQL_CONNECTION_H */ diff --git a/include/PgSQL_Protocol.h b/include/PgSQL_Protocol.h index 026449c9d..041739afe 100644 --- a/include/PgSQL_Protocol.h +++ b/include/PgSQL_Protocol.h @@ -291,7 +291,7 @@ class PgSQL_Protocol; #define PGSQL_QUERY_RESULT_ERROR 0x08 #define PGSQL_QUERY_RESULT_EMPTY 0x10 #define PGSQL_QUERY_RESULT_COPY_OUT 0x20 -#define PGSQL_QUERY_RESULT_COPY_IN 0x30 +#define PGSQL_QUERY_RESULT_NOTICE 0x40 class PgSQL_Query_Result { public: @@ -471,6 +471,8 @@ class PgSQL_Query_Result { */ unsigned int add_copy_out_response_end(); + unsigned int add_notice(const PGresult* result); + /** * @brief Retrieves the query result set and copies it to a PtrSizeArray. * @@ -829,11 +831,11 @@ class PgSQL_Protocol : public MySQL_Protocol { unsigned int copy_command_completion_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result, bool extract_affected_rows); /** - * @brief Copies an error message from a PGresult to a PgSQL_Query_Result. + * @brief Copies an error/notice message from a PGresult to a PgSQL_Query_Result. * - * This function copies an error message from a `PGresult` object (typically - * obtained from libpq) to a `PgSQL_Query_Result` object. The error message - * contains information about an error that occurred during query execution. + * This function copies an error/notice message from a `PGresult` object (typically + * obtained from libpq) to a `PgSQL_Query_Result` object. The message + * contains information about an error/notice that occurred during query execution. * * @param send A boolean flag indicating whether to send the generated packet * immediately or just generate it. (Currently not supported). @@ -841,14 +843,15 @@ class PgSQL_Protocol : public MySQL_Protocol { * error message will be copied. * @param result A pointer to the `PGresult` object containing the error * message to be copied. - * + * @param is_error A boolean flag indicating whether the message is an error or a notice. + * * @return The number of bytes copied to the `PgSQL_Query_Result` object. * * @note This function extracts the various error fields (severity, code, * message, detail, etc.) from the `PGresult` object and copies them * to the `PgSQL_Query_Result` object. */ - unsigned int copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); + unsigned int copy_error_notice_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result, bool is_error); /** * @brief Copies an empty query response from a PGresult to a diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index ee8e2f57b..f64e837cd 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -1595,6 +1595,10 @@ int MySQL_Protocol::PPHR_1(unsigned char *pkt, unsigned int len, bool& ret, MyPr // this function was inline in process_pkt_handshake_response() , split for readibility bool MySQL_Protocol::PPHR_2(unsigned char *pkt, unsigned int len, bool& ret, MyProt_tmp_auth_vars& vars1) { // process_pkt_handshake_response inner 2 + + // if packet length is less than 4, it's a malformed packet. + if ((len - sizeof(mysql_hdr)) < 4) return false; + vars1.capabilities = CPY4(pkt); // see bug #2916. If CLIENT_MULTI_STATEMENTS is set by the client // we enforce setting CLIENT_MULTI_RESULTS, this is the proper and expected diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 445a3a240..c529cfbdd 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -3954,13 +3954,6 @@ int MySQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) { client_myds->PSarrayIN->remove_index(0,&pkt); } - if (pkt.size <= sizeof(mysql_hdr)) { - proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Malformed packet received\n"); - l_free(pkt.size, pkt.ptr); - handler_ret = -1; - return handler_ret; - } - switch (status) { case WAITING_CLIENT_DATA: if (pkt.size==(0xFFFFFF+sizeof(mysql_hdr))) { // we are handling a multi-packet diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index 0ee2bc82d..1282d0d91 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -1185,171 +1185,6 @@ bool PgSQL_Connection_Placeholder::IsKeepMultiplexEnabledVariables(char *query_d return true; } -void PgSQL_Connection_Placeholder::ProcessQueryAndSetStatusFlags(char *query_digest_text) { - if (query_digest_text==NULL) return; - // unknown what to do with multiplex - int mul=-1; - if (myds) { - if (myds->sess) { - if (myds->sess->qpo) { - mul=myds->sess->qpo->multiplex; - if (mul==0) { - set_status(true, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX); - } else { - if (mul==1) { - set_status(false, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX); - } - } - } - } - } - // checking warnings and disabling multiplexing will be effective only when the pgsql-query_digests is enabled - if (get_status(STATUS_MYSQL_CONNECTION_HAS_WARNINGS) == false) { - if (warning_count > 0) { - // 'warning_in_hg' will be used if the next query is 'SHOW WARNINGS' or - // 'SHOW COUNT(*) WARNINGS' - if (myds && myds->sess) - myds->sess->warning_in_hg = myds->sess->current_hostgroup; - // enabling multiplexing - set_status(true, STATUS_MYSQL_CONNECTION_HAS_WARNINGS); - } - } else { // reset warning_in_hg - const char* dig = query_digest_text; - const size_t dig_len = strlen(dig); - // disable multiplexing and reset the 'warning_in_hg' flag only when the current executed query is not - // 'SHOW WARNINGS' or 'SHOW COUNT(*) WARNINGS', as these queries do not clear the warning message list - // on backend. - if (!((dig_len == 22 && strncasecmp(dig, "SHOW COUNT(*) WARNINGS", 22) == 0) || - (dig_len == 13 && strncasecmp(dig, "SHOW WARNINGS", 13) == 0))) { - if (myds && myds->sess) - myds->sess->warning_in_hg = -1; - warning_count = 0; - // disabling multiplexing - set_status(false, STATUS_MYSQL_CONNECTION_HAS_WARNINGS); - } - } - - if (get_status(STATUS_MYSQL_CONNECTION_USER_VARIABLE)==false) { // we search for variables only if not already set -// if ( -// strncasecmp(query_digest_text,"SELECT @@tx_isolation", strlen("SELECT @@tx_isolation")) -// && -// strncasecmp(query_digest_text,"SELECT @@version", strlen("SELECT @@version")) - if (strncasecmp(query_digest_text,"SET ",4)==0) { - // For issue #555 , multiplexing is disabled if --safe-updates is used (see session_vars definition) - int sqloh = pgsql_thread___set_query_lock_on_hostgroup; - switch (sqloh) { - case 0: // old algorithm - if (mul!=2) { - if (index(query_digest_text,'@')) { // mul = 2 has a special meaning : do not disable multiplex for variables in THIS QUERY ONLY - if (!IsKeepMultiplexEnabledVariables(query_digest_text)) { - set_status(true, STATUS_MYSQL_CONNECTION_USER_VARIABLE); - } -/* deprecating session_vars[] because we are introducing a better algorithm - } else { - for (unsigned int i = 0; i < sizeof(session_vars)/sizeof(char *); i++) { - if (strcasestr(query_digest_text,session_vars[i])!=NULL) { - set_status(true, STATUS_MYSQL_CONNECTION_USER_VARIABLE); - break; - } - } -*/ - } - } - break; - case 1: // new algorithm - if (myds->sess->locked_on_hostgroup > -1) { - // locked_on_hostgroup was set, so some variable wasn't parsed - set_status(true, STATUS_MYSQL_CONNECTION_USER_VARIABLE); - } - break; - default: - break; - } - } else { - if (mul!=2 && index(query_digest_text,'@')) { // mul = 2 has a special meaning : do not disable multiplex for variables in THIS QUERY ONLY - if (!IsKeepMultiplexEnabledVariables(query_digest_text)) { - set_status(true, STATUS_MYSQL_CONNECTION_USER_VARIABLE); - } - } - } - } - if (get_status(STATUS_MYSQL_CONNECTION_PREPARED_STATEMENT)==false) { // we search if prepared was already executed - if (!strncasecmp(query_digest_text,"PREPARE ", strlen("PREPARE "))) { - set_status(true, STATUS_MYSQL_CONNECTION_PREPARED_STATEMENT); - } - } - if (get_status(STATUS_MYSQL_CONNECTION_TEMPORARY_TABLE)==false) { // we search for temporary if not already set - if (!strncasecmp(query_digest_text,"CREATE TEMPORARY TABLE ", strlen("CREATE TEMPORARY TABLE "))) { - set_status(true, STATUS_MYSQL_CONNECTION_TEMPORARY_TABLE); - } - } - if (get_status(STATUS_MYSQL_CONNECTION_LOCK_TABLES)==false) { // we search for lock tables only if not already set - if (!strncasecmp(query_digest_text,"LOCK TABLE", strlen("LOCK TABLE"))) { - set_status(true, STATUS_MYSQL_CONNECTION_LOCK_TABLES); - } - } - if (get_status(STATUS_MYSQL_CONNECTION_LOCK_TABLES)==false) { // we search for lock tables only if not already set - if (!strncasecmp(query_digest_text,"FLUSH TABLES WITH READ LOCK", strlen("FLUSH TABLES WITH READ LOCK"))) { // issue 613 - set_status(true, STATUS_MYSQL_CONNECTION_LOCK_TABLES); - } - } - if (get_status(STATUS_MYSQL_CONNECTION_LOCK_TABLES)==true) { - if (!strncasecmp(query_digest_text,"UNLOCK TABLES", strlen("UNLOCK TABLES"))) { - set_status(false, STATUS_MYSQL_CONNECTION_LOCK_TABLES); - } - } - if (get_status(STATUS_MYSQL_CONNECTION_GET_LOCK)==false) { // we search for get_lock if not already set - if (strcasestr(query_digest_text,"GET_LOCK(")) { - set_status(true, STATUS_MYSQL_CONNECTION_GET_LOCK); - } - } - if (get_status(STATUS_MYSQL_CONNECTION_FOUND_ROWS)==false) { // we search for SQL_CALC_FOUND_ROWS if not already set - if (strcasestr(query_digest_text,"SQL_CALC_FOUND_ROWS")) { - set_status(true, STATUS_MYSQL_CONNECTION_FOUND_ROWS); - } - } - if (get_status(STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT)==false) { - if (pgsql) { - if ( - (pgsql->server_status & SERVER_STATUS_IN_TRANS) - || - ((pgsql->server_status & SERVER_STATUS_AUTOCOMMIT) == 0) - ) { - if (!strncasecmp(query_digest_text,"SAVEPOINT ", strlen("SAVEPOINT "))) { - set_status(true, STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT); - } - } - } - } else { - if ( // get_status(STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT) == true - ( - // make sure we don't have a transaction running - // checking just for COMMIT and ROLLBACK is not enough, because `SET autocommit=1` can commit too - (pgsql->server_status & SERVER_STATUS_AUTOCOMMIT) - && - ( (pgsql->server_status & SERVER_STATUS_IN_TRANS) == 0 ) - ) - || - (strcasecmp(query_digest_text,"COMMIT") == 0) - || - (strcasecmp(query_digest_text,"ROLLBACK") == 0) - ) { - set_status(false, STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT); - } - } - if (pgsql) { - if (myds && myds->sess) { - if (myds->sess->client_myds && myds->sess->client_myds->myconn) { - // if SERVER_STATUS_NO_BACKSLASH_ESCAPES is changed it is likely - // because of sql_mode was changed - // we set the same on the client connection - unsigned int ss = pgsql->server_status & SERVER_STATUS_NO_BACKSLASH_ESCAPES; - myds->sess->client_myds->myconn->set_no_backslash_escapes(ss); - } - } - } -} - void PgSQL_Connection_Placeholder::optimize() { if (pgsql->net.max_packet > 65536) { // FIXME: temporary, maybe for very long time . This needs to become a global variable if ( ( pgsql->net.buff == pgsql->net.read_pos ) && ( pgsql->net.read_pos == pgsql->net.write_pos ) ) { @@ -1961,6 +1796,7 @@ PG_ASYNC_ST PgSQL_Connection::handler(short event) { } else { unknown_transaction_status = false; } + PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::unhandled_notice_cb, this); // should be NULL assert(!pgsql_result); assert(!is_copy_out); @@ -2198,6 +2034,9 @@ void PgSQL_Connection::query_start() { reset_error(); processing_multi_statement = false; async_exit_status = PG_EVENT_NONE; + + PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this); + if (PQsendQuery(pgsql_conn, query.ptr) == 0) { // WARNING: DO NOT RELEASE this PGresult const PGresult* result = PQgetResultFromPGconn(pgsql_conn); @@ -3026,3 +2865,151 @@ bool PgSQL_Connection::handle_copy_out(const PGresult* result, uint64_t* process return true; } + +void PgSQL_Connection::notice_handler_cb(void* arg, const PGresult* result) { + assert(arg); + PgSQL_Connection* conn = (PgSQL_Connection*)arg; + const unsigned int bytes_recv = conn->query_result->add_notice(result); + conn->update_bytes_recv(bytes_recv); +} + +void PgSQL_Connection::unhandled_notice_cb(void* arg, const PGresult* result) { + assert(arg); + PgSQL_Connection* conn = (PgSQL_Connection*)arg; + proxy_error("Unhandled notice: '%s' received from backend [PID: %d] (Host: %s, Port: %d, User: %s, FD: %d, State: %d). Please report this issue for further investigation and enhancements.\n", + PQresultErrorMessage(result), conn->get_pg_backend_pid(), conn->get_pg_host(), atoi(conn->get_pg_port()), conn->get_pg_user(), conn->get_pg_socket_fd(), (int)conn->async_state_machine); +#ifdef DEBUG + assert(0); +#endif +} + +void PgSQL_Connection::ProcessQueryAndSetStatusFlags(char* query_digest_text) { + if (query_digest_text == NULL) return; + // unknown what to do with multiplex + int mul = -1; + if (myds) { + if (myds->sess) { + if (myds->sess->qpo) { + mul = myds->sess->qpo->multiplex; + if (mul == 0) { + set_status(true, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX); + } + else { + if (mul == 1) { + set_status(false, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX); + } + } + } + } + } + + if (get_status(STATUS_MYSQL_CONNECTION_USER_VARIABLE) == false) { // we search for variables only if not already set + if (strncasecmp(query_digest_text, "SET ", 4) == 0) { + // For issue #555 , multiplexing is disabled if --safe-updates is used (see session_vars definition) + int sqloh = pgsql_thread___set_query_lock_on_hostgroup; + switch (sqloh) { + case 0: // old algorithm + if (mul != 2) { + if (index(query_digest_text, '@')) { // mul = 2 has a special meaning : do not disable multiplex for variables in THIS QUERY ONLY + if (!IsKeepMultiplexEnabledVariables(query_digest_text)) { + set_status(true, STATUS_MYSQL_CONNECTION_USER_VARIABLE); + } + } + } + break; + case 1: // new algorithm + if (myds->sess->locked_on_hostgroup > -1) { + // locked_on_hostgroup was set, so some variable wasn't parsed + set_status(true, STATUS_MYSQL_CONNECTION_USER_VARIABLE); + } + break; + default: + break; + } + } + else { + if (mul != 2 && index(query_digest_text, '@')) { // mul = 2 has a special meaning : do not disable multiplex for variables in THIS QUERY ONLY + if (!IsKeepMultiplexEnabledVariables(query_digest_text)) { + set_status(true, STATUS_MYSQL_CONNECTION_USER_VARIABLE); + } + } + } + } + if (get_status(STATUS_MYSQL_CONNECTION_PREPARED_STATEMENT) == false) { // we search if prepared was already executed + if (!strncasecmp(query_digest_text, "PREPARE ", strlen("PREPARE "))) { + set_status(true, STATUS_MYSQL_CONNECTION_PREPARED_STATEMENT); + } + } + if (get_status(STATUS_MYSQL_CONNECTION_TEMPORARY_TABLE) == false) { // we search for temporary if not already set + if (!strncasecmp(query_digest_text, "CREATE TEMPORARY TABLE ", strlen("CREATE TEMPORARY TABLE ")) || + !strncasecmp(query_digest_text, "CREATE TEMP TABLE ", strlen("CREATE TEMP TABLE "))) { + set_status(true, STATUS_MYSQL_CONNECTION_TEMPORARY_TABLE); + } + } + if (get_status(STATUS_MYSQL_CONNECTION_LOCK_TABLES) == false) { // we search for lock tables only if not already set + if (!strncasecmp(query_digest_text, "LOCK TABLE", strlen("LOCK TABLE"))) { + set_status(true, STATUS_MYSQL_CONNECTION_LOCK_TABLES); + } + } + if (get_status(STATUS_MYSQL_CONNECTION_LOCK_TABLES) == false) { // we search for lock tables only if not already set + if (!strncasecmp(query_digest_text, "FLUSH TABLES WITH READ LOCK", strlen("FLUSH TABLES WITH READ LOCK"))) { // issue 613 + set_status(true, STATUS_MYSQL_CONNECTION_LOCK_TABLES); + } + } + if (get_status(STATUS_MYSQL_CONNECTION_LOCK_TABLES) == true) { + if (!strncasecmp(query_digest_text, "UNLOCK TABLES", strlen("UNLOCK TABLES"))) { + set_status(false, STATUS_MYSQL_CONNECTION_LOCK_TABLES); + } + } + if (get_status(STATUS_MYSQL_CONNECTION_GET_LOCK) == false) { // we search for get_lock if not already set + if (strcasestr(query_digest_text, "GET_LOCK(")) { + set_status(true, STATUS_MYSQL_CONNECTION_GET_LOCK); + } + } + /*if (get_status(STATUS_MYSQL_CONNECTION_FOUND_ROWS) == false) { // we search for SQL_CALC_FOUND_ROWS if not already set + if (strcasestr(query_digest_text, "SQL_CALC_FOUND_ROWS")) { + set_status(true, STATUS_MYSQL_CONNECTION_FOUND_ROWS); + } + }*/ + if (get_status(STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT) == false) { + if (pgsql) { + if ( + (pgsql->server_status & SERVER_STATUS_IN_TRANS) + || + ((pgsql->server_status & SERVER_STATUS_AUTOCOMMIT) == 0) + ) { + if (!strncasecmp(query_digest_text, "SAVEPOINT ", strlen("SAVEPOINT "))) { + set_status(true, STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT); + } + } + } + } + else { + if ( // get_status(STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT) == true + ( + // make sure we don't have a transaction running + // checking just for COMMIT and ROLLBACK is not enough, because `SET autocommit=1` can commit too + (pgsql->server_status & SERVER_STATUS_AUTOCOMMIT) + && + ((pgsql->server_status & SERVER_STATUS_IN_TRANS) == 0) + ) + || + (strcasecmp(query_digest_text, "COMMIT") == 0) + || + (strcasecmp(query_digest_text, "ROLLBACK") == 0) + ) { + set_status(false, STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT); + } + } + /*if (pgsql) { + if (myds && myds->sess) { + if (myds->sess->client_myds && myds->sess->client_myds->myconn) { + // if SERVER_STATUS_NO_BACKSLASH_ESCAPES is changed it is likely + // because of sql_mode was changed + // we set the same on the client connection + unsigned int ss = pgsql->server_status & SERVER_STATUS_NO_BACKSLASH_ESCAPES; + myds->sess->client_myds->myconn->set_no_backslash_escapes(ss); + } + } + }*/ +} diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index 4e085d763..95cab93c9 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -1421,7 +1421,7 @@ unsigned int PgSQL_Protocol::copy_row_description_to_PgSQL_Query_Result(bool sen // if (dump_pkt) { __dump_pkt(__func__, _ptr, size); } //#endif - pg_query_result->resultset_size = size; + pg_query_result->resultset_size += size; if (alloced_new_buffer) { // we created new buffer @@ -1544,7 +1544,7 @@ unsigned int PgSQL_Protocol::copy_command_completion_to_PgSQL_Query_Result(bool return size; } -unsigned int PgSQL_Protocol::copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result) { +unsigned int PgSQL_Protocol::copy_error_notice_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result, bool is_error) { assert(pg_query_result); assert(result); @@ -1599,7 +1599,7 @@ unsigned int PgSQL_Protocol::copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Q PG_pkt pgpkt(_ptr, size); - pgpkt.put_char('E'); + pgpkt.put_char(is_error ? 'E' : 'N'); pgpkt.put_uint32(size - 1); if (severity) { pgpkt.put_char('S'); @@ -1843,7 +1843,7 @@ unsigned int PgSQL_Protocol::copy_out_response_start_to_PgSQL_Query_Result(bool // if (dump_pkt) { __dump_pkt(__func__, _ptr, size); } //#endif - pg_query_result->resultset_size = size; + pg_query_result->resultset_size += size; if (alloced_new_buffer) { // we created new buffer @@ -2016,12 +2016,17 @@ unsigned int PgSQL_Query_Result::add_copy_out_response_end() { return res; } +unsigned int PgSQL_Query_Result::add_notice(const PGresult* result) { + const unsigned int res = proto->copy_error_notice_to_PgSQL_Query_Result(false, this, result, false); + result_packet_type |= PGSQL_QUERY_RESULT_NOTICE; + return res; +} unsigned int PgSQL_Query_Result::add_error(const PGresult* result) { unsigned int size = 0; if (result) { - size = proto->copy_error_to_PgSQL_Query_Result(false, this, result); + size = proto->copy_error_notice_to_PgSQL_Query_Result(false, this, result, true); PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, conn->parent->myhgc->hid, conn->parent->address, conn->parent->port, 1907); } else { diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index ff8ec5523..030eb0e64 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -6260,7 +6260,10 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da if (query_result && query_result->get_result_packet_type() != PGSQL_QUERY_RESULT_NO_DATA) { bool transfer_started = query_result->is_transfer_started(); // if there is an error, it will be false so results are not cached - bool is_tuple = query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY); + bool is_tuple = ( + (query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY)) || + (query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_NOTICE | PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY)) + ); const uint64_t num_rows = query_result->get_num_rows(); const uint64_t resultset_size = query_result->get_resultset_size(); const auto _affected_rows = query_result->get_affected_rows(); diff --git a/test/tap/tests/mysql-test_malformed_packet-t.cpp b/test/tap/tests/mysql-test_malformed_packet-t.cpp new file mode 100644 index 000000000..3f6670265 --- /dev/null +++ b/test/tap/tests/mysql-test_malformed_packet-t.cpp @@ -0,0 +1,174 @@ +/** + * @file mysql-test_malformed_packet-t.cpp + * @brief Validates ProxySQL's stability and ensures it does not crash when subjected to + * multiple malformed packets on its admin and backend connections. + */ +#include +#include +#include +#include "mysql.h" +#include "tap.h" +#include "command_line.h" +#include "utils.h" + +constexpr size_t BUFFER_SIZE = 1024; + +#define REPORT_ERROR_AND_EXIT(fmt, ...) \ + do { \ + fprintf(stderr, "File %s, line %d: " fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \ + close(sock); \ + return; \ + } while (0) + +typedef enum { + BACKEND = 0, + ADMIN +} Connection_type_t; + +void execute_test(MYSQL* conn, const std::string& host, int port, const std::vector& data) { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + REPORT_ERROR_AND_EXIT("Socket creation failed"); + } + + struct timeval timeout; + // Set the timeout for receive operations + timeout.tv_sec = 60; + timeout.tv_usec = 0; + if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) { + REPORT_ERROR_AND_EXIT("Failed to set socket timeout"); + } + + sockaddr_in server_addr{}; + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(port); + + if (inet_pton(AF_INET, host.c_str(), &server_addr.sin_addr) <= 0) { + REPORT_ERROR_AND_EXIT("Invalid address or address not supported"); + } + + if (connect(sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) { + REPORT_ERROR_AND_EXIT("Connection to the server failed"); + } + + diag("Connected to the server. Waiting for server greeting..."); + + std::vector buffer(BUFFER_SIZE); + ssize_t bytes_received = recv(sock, buffer.data(), buffer.size(), 0); + if (bytes_received < 0) { + REPORT_ERROR_AND_EXIT("Failed to receive server greeting"); + } + + diag("Server greeting received (length: %ld bytes).", bytes_received); + + diag("Sending malformed packet to the server..."); + ssize_t bytes_sent = send(sock, data.data(), data.size(), 0); + if (bytes_sent < 0) { + REPORT_ERROR_AND_EXIT("Failed to send data"); + } + diag("Done"); + + bytes_received = recv(sock, buffer.data(), buffer.size(), 0); + ok(bytes_received == 0, "Connection closed by server"); + close(sock); + + usleep(1000000); // 1 second delay + + bool query_success = false; + + if (mysql_query(conn, "SELECT 1")) { + fprintf(stderr, "mysql_query() failed: %s\n", mysql_error(conn)); + } else { + + MYSQL_RES* result = mysql_store_result(conn); + if (result == nullptr) { + fprintf(stderr, "mysql_store_result() failed: %s\n", mysql_error(conn)); + } + else { + MYSQL_ROW row = mysql_fetch_row(result); + if (row && strcmp(row[0], "1") == 0) query_success = true; + mysql_free_result(result); + } + } + + ok(query_success, "ProxySQL should be alive. %s", mysql_error(conn)); +} + +MYSQL* setup_mysql_connection(const CommandLine& cl, Connection_type_t conn_type) { + MYSQL* conn = mysql_init(nullptr); + if (conn == nullptr) { + fprintf(stderr, "File %s, line %d, Error: mysql_init() failed\n", __FILE__, __LINE__); + return nullptr; + } + + if (conn_type == ADMIN) { + if (!mysql_real_connect(conn, cl.admin_host, cl.admin_username, cl.admin_password, nullptr, cl.admin_port, nullptr, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(conn)); + mysql_close(conn); + return nullptr; + } + } else { + if (!mysql_real_connect(conn, cl.host, cl.username, cl.password, nullptr, cl.port, nullptr, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(conn)); + mysql_close(conn); + return nullptr; + } + } + + return conn; +} + +int main(int argc, char** argv) { + CommandLine cl; + + if (cl.getEnv()) { + diag("Failed to get the required environmental variables."); + return EXIT_FAILURE; + } + + std::vector> malformed_pkts = { + {0x01, 0x00}, + {0xFF, 0xFF, 0xFF, 0x00, 0x00, 0xFE, 0x00, 0x00}, + {0x03, 0x00, 0x00, 0xFF, 0x00}, + {0x10, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0x11, 0x22, 0x33, 0x44, 0x55}, + {0x03, 0x00, 0x00, 0x00, 0xFF, 0x00}, + {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00}, + {0x05, 0x00, 0x00, 0x00, 0xFF, 0x00, 0x00, 0x00}, + {0x04, 0x00, 0x00, 0xFF, 0x00, 0x00, 0x00}, + {0x03, 0x00, 0x00, 0x2F, 0x2A, 0xE0, 0x00}, + }; + + plan(malformed_pkts.size() * 4); + + { + diag(">>> Sending malformed packets to BACKEND connection <<<"); + MYSQL* conn = setup_mysql_connection(cl, BACKEND); + if (conn == nullptr) { + return EXIT_FAILURE; + } + + for (const auto& pkt : malformed_pkts) { + execute_test(conn, cl.host, cl.port, pkt); + } + + mysql_close(conn); + diag("Done"); + } + + { + diag(">>> Sending malformed packets to ADMIN connection <<<"); + MYSQL* conn = setup_mysql_connection(cl, ADMIN); + if (conn == nullptr) { + return EXIT_FAILURE; + } + + for (const auto& pkt : malformed_pkts) { + execute_test(conn, cl.host, cl.port, pkt); + } + + mysql_close(conn); + diag("Done"); + } + + return exit_status(); +} diff --git a/test/tap/tests/pgsql-notice_test-t.cpp b/test/tap/tests/pgsql-notice_test-t.cpp new file mode 100644 index 000000000..b1199880b --- /dev/null +++ b/test/tap/tests/pgsql-notice_test-t.cpp @@ -0,0 +1,140 @@ +/** + * @file pgsql-notice_test-t.cpp + * @brief This TAP test validates handling of PostgreSQL notices in ProxySQL. + */ + +#include +#include +#include +#include +#include +#include "libpq-fe.h" +#include "command_line.h" +#include "tap.h" +#include "utils.h" + +CommandLine cl; + +using PGConnPtr = std::unique_ptr; + +enum ConnType { + ADMIN, + BACKEND +}; + +PGConnPtr createNewConnection(ConnType conn_type, bool with_ssl) { + + const char* host = (conn_type == BACKEND) ? cl.pgsql_host : cl.pgsql_admin_host; + int port = (conn_type == BACKEND) ? cl.pgsql_port : cl.pgsql_admin_port; + const char* username = (conn_type == BACKEND) ? cl.pgsql_username : cl.admin_username; + const char* password = (conn_type == BACKEND) ? cl.pgsql_password : cl.admin_password; + + std::stringstream ss; + + ss << "host=" << host << " port=" << port; + ss << " user=" << username << " password=" << password; + ss << (with_ssl ? " sslmode=require" : " sslmode=disable"); + + PGconn* conn = PQconnectdb(ss.str().c_str()); + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "Connection failed to '%s': %s", (conn_type == BACKEND ? "Backend" : "Admin"), PQerrorMessage(conn)); + PQfinish(conn); + return PGConnPtr(nullptr, &PQfinish); + } + return PGConnPtr(conn, &PQfinish); +} + +bool executeQuery(PGconn* conn, const char* query) { + + diag("Running: %s", query); + PGresult* res = PQexec(conn, query); + bool success = PQresultStatus(res) == PGRES_COMMAND_OK || + PQresultStatus(res) == PGRES_TUPLES_OK; + if (!success) { + diag("Failed to execute query '%s': %s", + query, PQerrorMessage(conn)); + PQclear(res); + return false; + } + PQclear(res); + return true; +} + +void testNoticeAndWarningHandling(PGconn* admin_conn, PGconn* backend_conn) { + // Set up a notice processor to capture notices + std::vector notices; + auto noticeProcessor = [](void* arg, const char* message) { + auto* notices = static_cast*>(arg); + notices->emplace_back(message); + }; + + PQsetNoticeProcessor(backend_conn, noticeProcessor, ¬ices); + + // Execute a query that generates a notice + const char* noticeQuery = "DO $$ BEGIN RAISE NOTICE 'This is a test notice'; END $$;"; + + if (!executeQuery(backend_conn, noticeQuery)) + return; + + // Check if the notice was captured + ok(notices.size() == 1 && notices[0].find("This is a test notice") != std::string::npos, "Notice message was generated"); + + // Execute a query that generates a warning + const char* warningQuery = "DO $$ BEGIN RAISE WARNING 'This is a test warning'; END $$;"; + if (!executeQuery(backend_conn, warningQuery)) + return; + + // Check if the warning was captured + ok(notices.size() == 2 && notices[1].find("This is a test warning") != std::string::npos, "Warning message was generated"); +} + +std::vector> tests = { + { "Notice and Warning Handling Test", testNoticeAndWarningHandling } +}; + +void execute_tests(bool with_ssl, bool diff_conn) { + + if (diff_conn == false) { + PGConnPtr admin_conn = createNewConnection(ConnType::ADMIN, with_ssl); + PGConnPtr backend_conn = createNewConnection(ConnType::BACKEND, with_ssl); + + if (!admin_conn || !backend_conn) { + BAIL_OUT("Error: failed to connect to the database in file %s, line %d\n", __FILE__, __LINE__); + return; + } + + for (const auto& test : tests) { + diag(">>>> Running %s - Shared Connection: %s <<<<", test.first.c_str(), !diff_conn ? "True" : "False"); + test.second(admin_conn.get(), backend_conn.get()); + diag(">>>> Done <<<<"); + } + } + else { + for (const auto& test : tests) { + diag(">>>> Running %s - Shared Connection: %s <<<<", test.first.c_str(), diff_conn ? "False" : "True"); + + PGConnPtr admin_conn = createNewConnection(ConnType::ADMIN, with_ssl); + PGConnPtr backend_conn = createNewConnection(ConnType::BACKEND, with_ssl); + + if (!admin_conn || !backend_conn) { + BAIL_OUT("Error: failed to connect to the database in file %s, line %d\n", __FILE__, __LINE__); + return; + } + test.second(admin_conn.get(), backend_conn.get()); + diag(">>>> Done <<<<"); + } + } +} + +int main(int argc, char** argv) { + + plan(2 * 2); // Total number of tests planned + + if (cl.getEnv()) + return exit_status(); + + execute_tests(true, false); + execute_tests(false, false); + + return exit_status(); +} diff --git a/test/tap/tests/pgsql-query_cache_test-t.cpp b/test/tap/tests/pgsql-query_cache_test-t.cpp index d9ef758cd..3597626bc 100644 --- a/test/tap/tests/pgsql-query_cache_test-t.cpp +++ b/test/tap/tests/pgsql-query_cache_test-t.cpp @@ -817,6 +817,100 @@ void execute_query_cache_store_empty_result_test(PGconn* admin_conn, PGconn* con return; } +void execute_query_cache_notice_test(PGconn* admin_conn, PGconn* conn) { + + if (!executeQueries(admin_conn, { + "DELETE FROM pgsql_query_rules", + "INSERT INTO pgsql_query_rules (rule_id,active,match_digest,cache_ttl) VALUES (2,1,'^SELECT',4000)", + "LOAD PGSQL QUERY RULES TO RUNTIME", + "UPDATE global_variables SET variable_value=0 WHERE variable_name='pgsql-query_cache_soft_ttl_pct'", + "LOAD PGSQL VARIABLES TO RUNTIME" + })) + return; + + const char* create_function_query = + "CREATE OR REPLACE FUNCTION select_and_warn() RETURNS integer AS $$" + "DECLARE " + " result integer; " + "BEGIN " + " SELECT 1 INTO result; " + " RAISE WARNING 'This is a warning message'; " + " RETURN result; " + "END; " + "$$ LANGUAGE plpgsql;"; + + if (!executeQueries(conn, { create_function_query })) + return; + + metrics.before = getQueryCacheMetrics(admin_conn); + + if (!executeQueries(conn, { "SELECT select_and_warn()" })) + return; + + metrics.after = getQueryCacheMetrics(admin_conn); + + printQueryCacheMetrics(); + + // difference query cache metrics + checkMetricDelta<>("Query_Cache_Memory_bytes", 1, std::greater()); + checkMetricDelta<>("Query_Cache_count_GET", 1, std::equal_to()); + checkMetricDelta<>("Query_Cache_count_GET_OK", 0, std::equal_to()); + checkMetricDelta<>("Query_Cache_count_SET", 1, std::equal_to()); + checkMetricDelta<>("Query_Cache_bytes_IN", 1, std::greater()); + checkMetricDelta<>("Query_Cache_bytes_OUT", 0, std::equal_to()); + checkMetricDelta<>("Query_Cache_Purged", 0, std::equal_to()); + checkMetricDelta<>("Query_Cache_Entries", 1, std::equal_to()); + + metrics.swap(); + + if (!executeQueries(conn, { "SELECT select_and_warn()" })) + return; + + metrics.after = getQueryCacheMetrics(admin_conn); + + printQueryCacheMetrics(); + + checkMetricDelta<>("Query_Cache_Memory_bytes", 0, std::equal_to()); + checkMetricDelta<>("Query_Cache_count_GET", 1, std::equal_to()); + checkMetricDelta<>("Query_Cache_count_GET_OK", 1, std::equal_to()); + checkMetricDelta<>("Query_Cache_count_SET", 0, std::equal_to()); + checkMetricDelta<>("Query_Cache_bytes_IN", 0, std::equal_to()); + checkMetricDelta<>("Query_Cache_bytes_OUT", 1, std::greater()); + checkMetricDelta<>("Query_Cache_Purged", 0, std::equal_to()); + checkMetricDelta<>("Query_Cache_Entries", 0, std::equal_to()); + + metrics.swap(); + + usleep(4000000); + + if (!executeQueries(conn, { "SELECT select_and_warn()" })) + return; + + metrics.after = getQueryCacheMetrics(admin_conn); + + printQueryCacheMetrics(); + + checkMetricDelta<>("Query_Cache_Memory_bytes", 1, std::greater()); + checkMetricDelta<>("Query_Cache_count_GET", 1, std::equal_to()); + checkMetricDelta<>("Query_Cache_count_GET_OK", 0, std::equal_to()); + checkMetricDelta<>("Query_Cache_count_SET", 1, std::equal_to()); + checkMetricDelta<>("Query_Cache_bytes_IN", 1, std::greater()); + checkMetricDelta<>("Query_Cache_bytes_OUT", 0, std::equal_to()); + checkMetricDelta<>("Query_Cache_Purged", 0, std::equal_to()); + checkMetricDelta<>("Query_Cache_Entries", 1, std::equal_to()); + + + executeQueries(conn, { "DROP FUNCTION IF EXISTS select_and_warn()" }); + + if (!executeQueries(admin_conn, { + "DELETE FROM pgsql_query_rules", + "LOAD PGSQL QUERY RULES TO RUNTIME", + "UPDATE global_variables SET variable_value=256 WHERE variable_name='pgsql-query_cache_size_MB'", + "LOAD PGSQL VARIABLES TO RUNTIME", + })) + return; +} + std::vector> tests = { { "Basic Test", execute_basic_test }, { "Data Manipulation Test", execute_data_manipulation_test }, @@ -825,7 +919,8 @@ std::vector> tests = { { "Multi Threaded Test", execute_multi_threaded_test }, { "Multi Threaded Purge Test", execute_multi_threaded_purge_test }, { "Transaction Status Test", execute_transaction_status_test }, - { "Query Cache Store Empty Result Test", execute_query_cache_store_empty_result_test } + { "Query Cache Store Empty Result Test", execute_query_cache_store_empty_result_test }, + { "Query Cache Store Notice Result Test", execute_query_cache_notice_test } }; void execute_tests(bool with_ssl, bool diff_conn) { @@ -872,7 +967,7 @@ void execute_tests(bool with_ssl, bool diff_conn) { int main(int argc, char** argv) { - plan(165*2); // Total number of tests planned + plan(189*2); // Total number of tests planned if (cl.getEnv()) return exit_status(); diff --git a/test/tap/tests/pgsql-reg_test_4707_threshold_resultset_size-t.cpp b/test/tap/tests/pgsql-reg_test_4707_threshold_resultset_size-t.cpp index dc6fb21ed..41fd08267 100644 --- a/test/tap/tests/pgsql-reg_test_4707_threshold_resultset_size-t.cpp +++ b/test/tap/tests/pgsql-reg_test_4707_threshold_resultset_size-t.cpp @@ -100,7 +100,8 @@ int main(int argc, char** argv) { ok(success, "Query executed successfully. %s", PQerrorMessage(backend_conn.get())); std::chrono::duration duration = end - start; - ok(duration.count() < 10.00, "Execution time should be less than 10 ms. Actual: %f ms", duration.count()); + // increased threshold value in case no backend connections are available in the connection pool and a new connection is established. + ok(duration.count() < 50.00, "Execution time should be less than 50 ms. Actual: %f ms", duration.count()); if (!executeQueries(admin_conn.get(), { "SET pgsql-threshold_resultset_size=536870912",