diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 5e0752fecc..6514bf1aa8 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -1,3 +1,9 @@ +/** + * @file MySQL_Session.h + * @brief Declaration of the MySQL_Session class and associated types and enums. + */ + + #ifndef __CLASS_MYSQL_SESSION_H #define __CLASS_MYSQL_SESSION_H @@ -13,6 +19,10 @@ using json = nlohmann::json; extern class MySQL_Variables mysql_variables; +/** + * @enum proxysql_session_type + * @brief Defines the types of ProxySQL sessions. + */ enum proxysql_session_type { PROXYSQL_SESSION_MYSQL, PROXYSQL_SESSION_ADMIN, @@ -24,6 +34,10 @@ enum proxysql_session_type { PROXYSQL_SESSION_NONE }; +/** + * @enum ps_type + * @brief Defines types for prepared statement handling. + */ enum ps_type : uint8_t { ps_type_not_set = 0x0, ps_type_prepare_stmt = 0x1, @@ -32,9 +46,14 @@ enum ps_type : uint8_t { std::string proxysql_session_type_str(enum proxysql_session_type session_type); -// these structs will be used for various regex hardcoded -// their initial use will be for sql_log_bin , sql_mode and time_zone -// issues #509 , #815 and #816 +/** + * @class Session_Regex + * @brief Encapsulates regex operations for session handling. + * + * This class is used for matching patterns in SQL queries, specifically for + * settings like sql_log_bin, sql_mode, and time_zone. + * See issues #509 , #815 and #816 + */ class Session_Regex { private: void *opt; @@ -46,6 +65,13 @@ class Session_Regex { bool match(char *m); }; +/** + * @class Query_Info + * @brief Holds information about a SQL query within a session. + * + * This class encapsulates various details about a query such as its text, + * execution times, affected rows, and more, to facilitate query processing and logging. + */ class Query_Info { public: SQP_par_t QueryParserArgs; @@ -84,6 +110,13 @@ class Query_Info { bool is_select_NOT_for_update(); }; +/** + * @class MySQL_Session + * @brief Manages a client session, including query parsing, backend connections, and state transitions. + * + * This class is central to ProxySQL's handling of client connections. It manages the lifecycle + * of a session, processes queries, and communicates with backend MySQL servers. + */ class MySQL_Session { private: diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index da6cd21d47..37c848fece 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -1584,16 +1584,43 @@ int MySQL_HostGroups_Manager::servers_add(SQLite3_result *resultset) { return 0; } +/** + * @brief Execute a SQL query and retrieve the resultset. + * + * This function executes a SQL query using the provided query string and returns the resultset obtained from the + * database operation. It also provides an optional error parameter to capture any error messages encountered during + * query execution. + * + * @param query A pointer to a null-terminated string containing the SQL query to be executed. + * @param error A pointer to a char pointer where any error message encountered during query execution will be stored. + * Pass nullptr if error handling is not required. + * @return A pointer to a SQLite3_result object representing the resultset obtained from the query execution. This + * pointer may be nullptr if the query execution fails or returns an empty result. + */ SQLite3_result * MySQL_HostGroups_Manager::execute_query(char *query, char **error) { int cols=0; int affected_rows=0; SQLite3_result *resultset=NULL; wrlock(); - mydb->execute_statement(query, error , &cols , &affected_rows , &resultset); + mydb->execute_statement(query, error , &cols , &affected_rows , &resultset); wrunlock(); return resultset; } +/** + * @brief Calculate and update the checksum for a specified table in the database. + * + * This function calculates the checksum for a specified table in the database using the provided SpookyHash object. + * The checksum is computed based on the table's contents, sorted by the specified column name. If the initialization + * flag is false, the SpookyHash object is initialized with predefined parameters. The calculated checksum is stored + * in the raw_checksum parameter. + * + * @param myhash A reference to the SpookyHash object used for calculating the checksum. + * @param init A reference to a boolean flag indicating whether the SpookyHash object has been initialized. + * @param TableName The name of the table for which the checksum is to be calculated. + * @param ColumnName The name of the column to be used for sorting the table before calculating the checksum. + * @param raw_checksum A reference to a uint64_t variable where the calculated checksum will be stored. + */ void MySQL_HostGroups_Manager::CUCFT1( SpookyHash& myhash, bool& init, const string& TableName, const string& ColumnName, uint64_t& raw_checksum ) { @@ -1620,6 +1647,18 @@ void MySQL_HostGroups_Manager::CUCFT1( } } +/** + * @brief Compute and update checksum values for specified tables. + * + * This function computes checksum values for specified tables by executing checksum calculation queries for each table. + * It updates the checksum values in the `table_resultset_checksum` array. + * + * @param myhash A reference to a SpookyHash object used for computing the checksums. + * @param init A reference to a boolean flag indicating whether the checksum computation has been initialized. + * @note This function resets the current checksum values for all tables except MYSQL_SERVERS and MYSQL_SERVERS_V2 + * before recomputing the checksums. + * @note The computed checksum values are stored in the `table_resultset_checksum` array. + */ void MySQL_HostGroups_Manager::commit_update_checksums_from_tables(SpookyHash& myhash, bool& init) { // Always reset the current table values before recomputing for (size_t i = 0; i < table_resultset_checksum.size(); i++) { @@ -1836,6 +1875,18 @@ void update_glovars_mysql_servers_v2_checksum( ); } +/** + * @brief Commit and update checksum from the MySQL servers. + * + * This function commits updates and calculates the checksum from the MySQL servers. It performs the following steps: + * 1. Deletes existing data from the 'mysql_servers' table. + * 2. Generates a new 'mysql_servers' table. + * 3. Saves the runtime MySQL servers data obtained from the provided result set or from the database if the result set is null. + * 4. Calculates the checksum of the runtime MySQL servers data and updates the checksum value in the 'table_resultset_checksum' array. + * + * @param runtime_mysql_servers A pointer to the result set containing runtime MySQL servers data. + * @return The raw checksum value calculated from the runtime MySQL servers data. + */ uint64_t MySQL_HostGroups_Manager::commit_update_checksum_from_mysql_servers(SQLite3_result* runtime_mysql_servers) { mydb->execute("DELETE FROM mysql_servers"); generate_mysql_servers_table(); @@ -1853,6 +1904,16 @@ uint64_t MySQL_HostGroups_Manager::commit_update_checksum_from_mysql_servers(SQL return raw_checksum; } +/** + * @brief Commit and update checksum from the MySQL servers V2. + * + * This function commits updates and calculates the checksum from the MySQL servers V2 data. It performs the following steps: + * 1. Saves the provided MySQL servers V2 data if not null, or retrieves and saves the data from the database. + * 2. Calculates the checksum of the MySQL servers V2 data and updates the checksum value in the 'table_resultset_checksum' array. + * + * @param mysql_servers_v2 A pointer to the result set containing MySQL servers V2 data. + * @return The raw checksum value calculated from the MySQL servers V2 data. + */ uint64_t MySQL_HostGroups_Manager::commit_update_checksum_from_mysql_servers_v2(SQLite3_result* mysql_servers_v2) { if (mysql_servers_v2 == nullptr) { unique_ptr resultset { get_mysql_servers_v2() }; @@ -2270,6 +2331,22 @@ uint64_t MySQL_HostGroups_Manager::get_mysql_servers_checksum(SQLite3_result* ru return table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS]; } +/** + * @brief Check if a GTID exists for a given MySQL server connection. + * + * This function checks whether a GTID (Global Transaction Identifier) exists for the specified MySQL server connection. + * It performs the following steps: + * 1. Acquires a read lock on the GTID read-write lock. + * 2. Constructs a string representation of the MySQL server address and port. + * 3. Searches for the GTID information associated with the MySQL server in the GTID map using the constructed string as the key. + * 4. If the GTID information is found and is active, it checks whether the specified GTID exists. + * 5. Releases the read lock on the GTID read-write lock. + * + * @param mysrvc A pointer to the MySQL server connection. + * @param gtid_uuid A pointer to the character array representing the GTID UUID. + * @param gtid_trxid The GTID transaction ID. + * @return True if the specified GTID exists for the MySQL server connection, false otherwise. + */ bool MySQL_HostGroups_Manager::gtid_exists(MySrvC *mysrvc, char * gtid_uuid, uint64_t gtid_trxid) { bool ret = false; pthread_rwlock_rdlock(>id_rwlock); @@ -2368,6 +2445,14 @@ void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() { pthread_rwlock_unlock(>id_rwlock); } +/** + * @brief Purge the MySQL servers table by removing offline hard servers with no active connections. + * + * This function iterates through each host group in the host groups manager and examines each server within the host group. + * For each server that is marked as offline hard and has no active connections (both used and free), it removes the server from the host group. + * After removing the server, it deletes the server object to free up memory. + * This process ensures that offline hard servers with no connections are properly removed from the MySQL servers table. + */ void MySQL_HostGroups_Manager::purge_mysql_servers_table() { for (unsigned int i=0; ilen; i++) { MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); @@ -2527,6 +2612,17 @@ void MySQL_HostGroups_Manager::generate_mysql_servers_table(int *_onlyhg) { delete lst; } +/** + * @brief Generate the mysql_replication_hostgroups table based on incoming data. + * + * This function populates the mysql_replication_hostgroups table in the host groups manager database + * using the incoming replication hostgroups data. It iterates through each row of the incoming data, + * constructs an SQL INSERT query to insert the data into the table, and executes the query. + * If verbose mode is enabled, it logs information about each row being processed. + * + * @note This function assumes that the incoming_replication_hostgroups member variable is not NULL. + * If it is NULL, the function returns without performing any action. + */ void MySQL_HostGroups_Manager::generate_mysql_replication_hostgroups_table() { if (incoming_replication_hostgroups==NULL) return; @@ -2791,6 +2887,19 @@ void MySQL_HostGroups_Manager::update_table_mysql_servers_for_monitor(bool lock) MySQL_Monitor::trigger_dns_cache_update(); } +/** + * @brief Dump data from a specified MySQL table. + * + * This function retrieves data from the specified MySQL table and returns it as a result set. + * The table name determines the SQL query to be executed to fetch the data. If the table is + * one of the predefined tables with special handling (e.g., mysql_servers), additional actions + * such as purging and generating the table may be performed before fetching the data. + * + * @param name The name of the MySQL table from which to dump data. + * @return A SQLite3_result pointer representing the result set containing the dumped data. + * The caller is responsible for managing the memory of the result set. + * @note If the provided table name is not recognized, the function assertion fails. + */ SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql(const string& name) { char * query = (char *)""; if (name == "mysql_aws_aurora_hostgroups") { @@ -2830,12 +2939,30 @@ SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql(const string& name) return resultset; } - +/** + * @brief Create a new MySQL host group container. + * + * This function creates a new instance of the MySQL host group container (`MyHGC`) with + * the specified host group ID and returns a pointer to it. + * + * @param _hid The host group ID for the new container. + * @return A pointer to the newly created `MyHGC` instance. + */ MyHGC * MySQL_HostGroups_Manager::MyHGC_create(unsigned int _hid) { MyHGC *myhgc=new MyHGC(_hid); return myhgc; } +/** + * @brief Find a MySQL host group container by host group ID. + * + * This function searches for a MySQL host group container with the specified host group ID + * in the list of host groups. If found, it returns a pointer to the container; otherwise, + * it returns a null pointer. + * + * @param _hid The host group ID to search for. + * @return A pointer to the found `MyHGC` instance if found; otherwise, a null pointer. + */ MyHGC * MySQL_HostGroups_Manager::MyHGC_find(unsigned int _hid) { if (MyHostGroups->len < 100) { // for few HGs, we use the legacy search @@ -2858,6 +2985,18 @@ MyHGC * MySQL_HostGroups_Manager::MyHGC_find(unsigned int _hid) { return NULL; } +/** + * @brief Lookup or create a MySQL host group container by host group ID. + * + * This function looks up a MySQL host group container with the specified host group ID. If + * found, it returns a pointer to the existing container; otherwise, it creates a new container + * with the specified host group ID, adds it to the list of host groups, and returns a pointer + * to it. + * + * @param _hid The host group ID to lookup or create. + * @return A pointer to the found or newly created `MyHGC` instance. + * @note The function assertion fails if a newly created container is not found. + */ MyHGC * MySQL_HostGroups_Manager::MyHGC_lookup(unsigned int _hid) { MyHGC *myhgc=NULL; myhgc=MyHGC_find(_hid); @@ -2877,58 +3016,123 @@ void MySQL_HostGroups_Manager::increase_reset_counter() { status.myconnpoll_reset++; wrunlock(); } + +/** + * @brief Pushes a MySQL_Connection back to the connection pool. + * + * This method is responsible for returning a MySQL_Connection object back to its associated connection pool + * after it has been used. It performs various checks and optimizations before deciding whether to return + * the connection to the pool or destroy it. + * + * @param c The MySQL_Connection object to be pushed back to the pool. + * @param _lock Boolean flag indicating whether to acquire a lock before performing the operation. Default is true. + * + * @note The method assumes that the provided MySQL_Connection object has a valid parent server (MySrvC). + * If the parent server is not valid, unexpected behavior may occur. + * + * @note The method also assumes that the global thread handler (GloMTH) is available and initialized properly. + * If the global thread handler is not initialized, certain checks may fail, leading to unexpected behavior. + */ void MySQL_HostGroups_Manager::push_MyConn_to_pool(MySQL_Connection *c, bool _lock) { + // Ensure that the provided connection has a valid parent server assert(c->parent); - MySrvC *mysrvc=NULL; + + MySrvC *mysrvc = nullptr; // Pointer to the parent server object + + // Acquire a lock if specified if (_lock) wrlock(); + + // Reset the auto-increment delay token associated with the connection c->auto_increment_delay_token = 0; + + // Increment the counter tracking the number of connections pushed back to the pool status.myconnpoll_push++; - mysrvc=(MySrvC *)c->parent; + + // Obtain a pointer to the parent server (MySrvC) + mysrvc = static_cast(c->parent); + + // Log debug information about the connection being returned to the pool proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status); + + // Remove the connection from the list of used connections for the parent server mysrvc->ConnectionsUsed->remove(c); - if (GloMTH == NULL) { goto __exit_push_MyConn_to_pool; } + + // If the global thread handler (GloMTH) is not available, skip further processing + if (GloMTH == nullptr) { + goto __exit_push_MyConn_to_pool; + } + + // If the largest query length exceeds the threshold, destroy the connection if (c->largest_query_length > (unsigned int)GloMTH->variables.threshold_query_length) { proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d . largest_query_length = %lu\n", c, mysrvc->address, mysrvc->port, mysrvc->status, c->largest_query_length); delete c; goto __exit_push_MyConn_to_pool; - } + } + + // If the server is online and the connection is in the idle state if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) { if (c->async_state_machine==ASYNC_IDLE) { if (GloMTH == NULL) { goto __exit_push_MyConn_to_pool; } + // Check if the connection has too many prepared statements if (c->local_stmts->get_num_backend_stmts() > (unsigned int)GloMTH->variables.max_stmts_per_connection) { + // Log debug information about destroying the connection due to too many prepared statements proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d because has too many prepared statements\n", c, mysrvc->address, mysrvc->port, mysrvc->status); // delete c; - mysrvc->ConnectionsUsed->add(c); - destroy_MyConn_from_pool(c, false); + mysrvc->ConnectionsUsed->add(c); // Add the connection back to the list of used connections + destroy_MyConn_from_pool(c, false); // Destroy the connection from the pool } else { - c->optimize(); - mysrvc->ConnectionsFree->add(c); + c->optimize(); // Optimize the connection + mysrvc->ConnectionsFree->add(c); // Add the connection to the list of free connections } } else { + // Log debug information about destroying the connection proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status); - delete c; + delete c; // Destroy the connection } } else { + // Log debug information about destroying the connection proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status); - delete c; + delete c; // Destroy the connection } + +// Exit point for releasing the lock __exit_push_MyConn_to_pool: if (_lock) - wrunlock(); + wrunlock(); // Release the lock if acquired } +/** + * @brief Pushes an array of MySQL_Connection objects back to the connection pool. + * + * This method is responsible for returning an array of MySQL_Connection objects back to their associated + * connection pool after they have been used. It iterates through the array and calls the push_MyConn_to_pool + * method for each connection without acquiring a lock for each individual push operation. + * + * @param ca An array of MySQL_Connection pointers representing the connections to be pushed back to the pool. + * @param cnt The number of connections in the array. + * + * @note This method assumes that the array of connections is valid and does not contain any nullptr entries. + * Unexpected behavior may occur if the array contains invalid pointers. + */ void MySQL_HostGroups_Manager::push_MyConn_to_pool_array(MySQL_Connection **ca, unsigned int cnt) { - unsigned int i=0; - MySQL_Connection *c=NULL; + unsigned int i=0; // Index variable for iterating through the array + MySQL_Connection *c = nullptr; // Pointer to hold the current connection from the array c=ca[i]; + + // Acquire a write lock to perform the operations atomically wrlock(); + + // Iterate through the array of connections while (iget_random_MySrvC(gtid_uuid, gtid_trxid, max_lag_ms, sess); if (mysrvc) { // a MySrvC exists. If not, we return NULL = no targets + // Attempt to get a random MySQL_Connection from the server's free connection pool conn=mysrvc->ConnectionsFree->get_random_MyConn(sess, ff); + + // If a connection is obtained, mark it as used and update connection pool statistics if (conn) { mysrvc->ConnectionsUsed->add(conn); status.myconnpoll_get_ok++; mysrvc->update_max_connections_used(); } } + + // Release the write lock after accessing the connection pool wrunlock(); + + // Debug message indicating the retrieved MySQL_Connection and its server details proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, (conn ? conn->parent->address : "") , (conn ? conn->parent->port : 0 )); + + // Return the retrieved MySQL_Connection (or nullptr if none available) return conn; } @@ -3673,23 +3913,46 @@ void update_hg_attrs_server_defaults(MySrvC* mysrvc, MyHGC* myhgc) { } } +/** + * @brief Adds a MySQL server connection (MySrvC) to the specified hostgroup. + * + * This method adds a MySQL server connection (MySrvC) to the hostgroup identified by the given hostgroup ID (_hid). + * It performs necessary updates to the server metrics and attributes associated with the hostgroup. Additionally, it + * updates the endpoint metrics for the server based on its address and port. + * + * @param mysrvc A pointer to the MySQL server connection (MySrvC) to be added to the hostgroup. + * @param _hid The ID of the hostgroup to which the server connection is being added. + * + * @note The method updates various metrics and attributes associated with the server and hostgroup. It also ensures + * that endpoint metrics are updated to reflect the addition of the server to the hostgroup. + */ void MySQL_HostGroups_Manager::add(MySrvC *mysrvc, unsigned int _hid) { + + // Debug message indicating the addition of the MySQL server connection to the hostgroup proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding MySrvC %p (%s:%d) for hostgroup %d\n", mysrvc, mysrvc->address, mysrvc->port, _hid); + // Construct the endpoint ID using the hostgroup ID, server address, and port + std::string endpoint_id { std::to_string(_hid) + ":" + string { mysrvc->address } + ":" + std::to_string(mysrvc->port) }; + // Since metrics for servers are stored per-endpoint; the metrics for a particular endpoint can live longer than the // 'MySrvC' itself. For example, a failover or a server config change could remove the server from a particular // hostgroup, and a subsequent one bring it back to the original hostgroup. For this reason, everytime a 'mysrvc' is // created and added to a particular hostgroup, we update the endpoint metrics for it. - std::string endpoint_id { std::to_string(_hid) + ":" + string { mysrvc->address } + ":" + std::to_string(mysrvc->port) }; + // Update server metrics based on endpoint ID mysrvc->bytes_recv = get_prometheus_counter_val(this->status.p_conn_pool_bytes_data_recv_map, endpoint_id); mysrvc->bytes_sent = get_prometheus_counter_val(this->status.p_conn_pool_bytes_data_sent_map, endpoint_id); mysrvc->connect_ERR = get_prometheus_counter_val(this->status.p_connection_pool_conn_err_map, endpoint_id); mysrvc->connect_OK = get_prometheus_counter_val(this->status.p_connection_pool_conn_ok_map, endpoint_id); mysrvc->queries_sent = get_prometheus_counter_val(this->status.p_connection_pool_queries_map, endpoint_id); + // Lookup the hostgroup by ID and add the server connection to it MyHGC *myhgc=MyHGC_lookup(_hid); + + // Update server defaults with hostgroup attributes update_hg_attrs_server_defaults(mysrvc, myhgc); + + // Add the server to the hostgroup's servers list myhgc->mysrvs->add(mysrvc); } @@ -4114,6 +4377,31 @@ void MySQL_HostGroups_Manager::save_mysql_servers_v2(SQLite3_result* s) { incoming_mysql_servers_v2 = s; } +/** + * @brief Retrieves the current SQLite3 result set associated with the specified MySQL table name. + * + * This method retrieves the current SQLite3 result set corresponding to the specified MySQL table name. + * The method is used to obtain the result set for various MySQL tables, such as hostgroups, replication configurations, + * SSL parameters, and runtime server information. + * + * @param name The name of the MySQL table for which the current SQLite3 result set is to be retrieved. + * Supported table names include: + * - "mysql_aws_aurora_hostgroups" + * - "mysql_galera_hostgroups" + * - "mysql_group_replication_hostgroups" + * - "mysql_replication_hostgroups" + * - "mysql_hostgroup_attributes" + * - "mysql_servers_ssl_params" + * - "cluster_mysql_servers" + * - "mysql_servers_v2" + * + * @return A pointer to the current SQLite3 result set associated with the specified MySQL table name. + * If the table name is not recognized or no result set is available for the specified table, NULL is returned. + * + * @note The method assumes that the result sets are stored in class member variables, and it returns the pointer to + * the appropriate result set based on the provided table name. If the table name is not recognized, an assertion + * failure occurs, indicating an invalid table name. + */ SQLite3_result* MySQL_HostGroups_Manager::get_current_mysql_table(const string& name) { if (name == "mysql_aws_aurora_hostgroups") { return this->incoming_aws_aurora_hostgroups; @@ -4132,7 +4420,7 @@ SQLite3_result* MySQL_HostGroups_Manager::get_current_mysql_table(const string& } else if (name == "mysql_servers_v2") { return this->incoming_mysql_servers_v2; } else { - assert(0); + assert(0); // Assertion failure for unrecognized table name } return NULL; } @@ -5191,27 +5479,50 @@ SQLite3_result * MySQL_HostGroups_Manager::SQL3_Get_ConnPool_Stats() { return result; } - +/** + * @brief Retrieves memory usage statistics for the MySQL host groups manager. + * + * This method calculates the total memory usage of the MySQL host groups manager, including memory allocated for + * host groups, server connections, and MySQL connections. It iterates over all host groups and their associated + * server connections to compute the memory usage. + * + * @return The total memory usage of the MySQL host groups manager in bytes. + */ unsigned long long MySQL_HostGroups_Manager::Get_Memory_Stats() { + // Initialize the memory size counter unsigned long long intsize=0; + // Acquire write lock to ensure thread safety during memory calculation wrlock(); - MySrvC *mysrvc=NULL; - for (unsigned int i=0; ilen; i++) { + MySrvC *mysrvc=NULL; // Pointer to a MySQL server connection + + // Iterate over all hostgroups + for (unsigned int i=0; ilen; i++) { + // Add memory size for the hostgroup object intsize+=sizeof(MyHGC); - MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); + // Get the hostgroup object + MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); unsigned int j,k; + // Get the number of server connections in the hostgroup unsigned int l=myhgc->mysrvs->cnt(); + // Iterate over all server connections in the hostgroup if (l) { for (j=0; jmysrvs->idx(j); + // Calculate memory usage for each connection in the "ConnectionsFree" list intsize+=((mysrvc->ConnectionsUsed->conns_length())*sizeof(MySQL_Connection *)); for (k=0; kConnectionsFree->conns_length(); k++) { - //MySQL_Connection *myconn=(MySQL_Connection *)mysrvc->ConnectionsFree->conns->index(k); + // Get a MySQL connection MySQL_Connection *myconn=mysrvc->ConnectionsFree->index(k); + // Add memory size for MySQL connection object and MYSQL struct intsize+=sizeof(MySQL_Connection)+sizeof(MYSQL); + // Add memory size for the MySQL packet buffer intsize+=myconn->mysql->net.max_packet; + // Add memory size for the default stack size of the asynchronous context intsize+=(4096*15); // ASYNC_CONTEXT_DEFAULT_STACK_SIZE + // Add memory size for result set object if present if (myconn->MyRS) { intsize+=myconn->MyRS->current_size(); } @@ -5220,7 +5531,9 @@ unsigned long long MySQL_HostGroups_Manager::Get_Memory_Stats() { } } } + // Release the write lock wrunlock(); + // Return the total memory usage return intsize; } diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 432abdc8a6..644c14d50d 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -137,6 +137,12 @@ extern ClickHouse_Authentication *GloClickHouseAuth; extern ClickHouse_Server *GloClickHouseServer; #endif /* PROXYSQLCLICKHOUSE */ + +/** + * @brief Converts session type to a human-readable string. + * @param session_type The session type to convert. + * @return A string representing the session type. + */ std::string proxysql_session_type_str(enum proxysql_session_type session_type) { if (session_type == PROXYSQL_SESSION_MYSQL) { return "PROXYSQL_SESSION_MYSQL"; @@ -155,6 +161,14 @@ std::string proxysql_session_type_str(enum proxysql_session_type session_type) { } }; +/** + * @brief Constructs a Session_Regex object with the specified pattern. + * + * This constructor initializes a Session_Regex object with the provided pattern. + * It sets up the regular expression engine with case insensitivity. + * + * @param[in] p The regular expression pattern. + */ Session_Regex::Session_Regex(char *p) { s=strdup(p); re2::RE2::Options *opt2=new re2::RE2::Options(RE2::Quiet); @@ -163,17 +177,33 @@ Session_Regex::Session_Regex(char *p) { re=(RE2 *)new RE2(s, *opt2); } + +/** + * @brief Destroys the Session_Regex object. + * + * This destructor releases the memory allocated for the regular expression pattern, + * the regular expression object, and its associated options. + */ Session_Regex::~Session_Regex() { free(s); delete (RE2 *)re; delete (re2::RE2::Options *)opt; } +/** + * @brief Matches the given input against the regular expression pattern. + * + * This function attempts to match the input string against the regular expression pattern. + * + * @param[in] m The input string to match. + * @return true if the input matches the pattern, false otherwise. + */ bool Session_Regex::match(char *m) { bool rc=false; rc=RE2::PartialMatch(m,*(RE2 *)re); return rc; } + KillArgs::KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, unsigned long i, int kt, int _use_ssl, MySQL_Thread* _mt) : KillArgs(u, p, h, P, _hid, i, kt, _use_ssl, _mt, NULL) { // resolving DNS if available in Cache @@ -185,6 +215,7 @@ KillArgs::KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, } } } + KillArgs::KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, unsigned long i, int kt, int _use_ssl, MySQL_Thread *_mt, char *ip) { username=strdup(u); password=strdup(p); @@ -217,23 +248,39 @@ const char* KillArgs::get_host_address() const { return host_address; } + +/** + * @brief Thread function to kill a query or connection on a MySQL server. + * + * This function is executed in a separate thread to kill a query or connection on a MySQL server. + * It establishes a connection to the MySQL server and sends a kill command to terminate the specified query or connection. + * + * @param[in] arg A pointer to a KillArgs structure containing the necessary parameters for killing the query or connection. + * @return nullptr. + */ void* kill_query_thread(void *arg) { KillArgs *ka=(KillArgs *)arg; + //! It initializes a new MySQL_Thread object to handle MySQL-related operations. std::unique_ptr mysql_thr(new MySQL_Thread()); + //! Retrieves the current time and refreshes thread variables. mysql_thr->curtime=monotonic_time(); mysql_thr->refresh_variables(); + //! Initializes ssl_params to NULL, which holds SSL parameters for the MySQL connection. MySQLServers_SslParams * ssl_params = NULL; + //! Initializes a new MySQL connection using mysql_init(NULL). MYSQL *mysql=mysql_init(NULL); if (!mysql) { goto __exit_kill_query_thread; } + //! Sets specific connection attributes such as program_name and _server_host using mysql_options4(). mysql_options4(mysql, MYSQL_OPT_CONNECT_ATTR_ADD, "program_name", "proxysql_killer"); mysql_options4(mysql, MYSQL_OPT_CONNECT_ATTR_ADD, "_server_host", ka->hostname); + //! If SSL is enabled and port information is available, it retrieves SSL parameters for the server from MyHGM and configures the MySQL connection accordingly. if (ka->use_ssl && ka->port) { ssl_params = MyHGM->get_Server_SSL_Params(ka->hostname, ka->port, ka->username); MySQL_Connection::set_ssl_params(mysql,ssl_params); @@ -241,6 +288,8 @@ void* kill_query_thread(void *arg) { } MYSQL *ret; + + //! Depending on the type of operation (kill_type), constructs a KILL command string (buf) to terminate the specified query or connection. if (ka->port) { switch (ka->kill_type) { case KILL_QUERY: @@ -301,9 +350,11 @@ void* kill_query_thread(void *arg) { sprintf(buf,"KILL %lu", ka->id); break; } + //! Executes the KILL command using mysql_query() on the established MySQL connection. Note that this call is blocking. // FIXME: these 2 calls are blocking, fortunately on their own thread mysql_query(mysql,buf); __exit_kill_query_thread: + //! clean-up if (mysql) mysql_close(mysql); delete ka; @@ -319,6 +370,11 @@ extern Query_Cache *GloQC; extern ProxySQL_Admin *GloAdmin; extern MySQL_Threads_Handler *GloMTH; + +/** + * @brief Default constructor. + * Initializes all member variables to their default values. + */ Query_Info::Query_Info() { MyComQueryCmd=MYSQL_COM_QUERY___NONE; QueryPointer=NULL; @@ -338,6 +394,10 @@ Query_Info::Query_Info() { stmt_client_id=0; } +/** + * @brief Destructor. + * Frees resources associated with QueryParserArgs and stmt_info. + */ Query_Info::~Query_Info() { GloQPro->query_parser_free(&QueryParserArgs); if (stmt_info) { @@ -345,6 +405,12 @@ Query_Info::~Query_Info() { } } +/** + * @brief Initializes query information. + * @param _p Pointer to the query data. + * @param len Length of the query data. + * @param mysql_header Flag indicating whether MySQL header is present. + */ void Query_Info::begin(unsigned char *_p, int len, bool mysql_header) { MyComQueryCmd=MYSQL_COM_QUERY___NONE; QueryPointer=NULL; @@ -371,6 +437,11 @@ void Query_Info::begin(unsigned char *_p, int len, bool mysql_header) { stmt_client_id=0; } + +/** + * @brief Finalizes query information. + * Updates query counters and performs clean-up. + */ void Query_Info::end() { query_parser_update_counters(); query_parser_free(); @@ -396,6 +467,12 @@ void Query_Info::end() { } } +/** + * @brief Initializes query information with the given parameters. + * @param _p Pointer to the query data. + * @param len Length of the query data. + * @param mysql_header Flag indicating whether MySQL header is present. + */ void Query_Info::init(unsigned char *_p, int len, bool mysql_header) { QueryLength=(mysql_header ? len-5 : len); QueryPointer=(mysql_header ? _p+5 : _p); @@ -409,19 +486,33 @@ void Query_Info::init(unsigned char *_p, int len, bool mysql_header) { rows_sent=0; } +/** + * @brief Initializes the query parser. + */ void Query_Info::query_parser_init() { GloQPro->query_parser_init(&QueryParserArgs,(char *)QueryPointer,QueryLength,0); } +/** + * @brief Retrieves the command type of the query from the query parser. + * @return The command type of the query. + */ enum MYSQL_COM_QUERY_command Query_Info::query_parser_command_type() { MyComQueryCmd=GloQPro->query_parser_command_type(&QueryParserArgs); return MyComQueryCmd; } +/** + * @brief Frees resources associated with the query parser. + */ void Query_Info::query_parser_free() { GloQPro->query_parser_free(&QueryParserArgs); } +/** + * @brief Updates query counters and resets member variables. + * @return The number of rows affected by the query. + */ unsigned long long Query_Info::query_parser_update_counters() { if (stmt_info) { MyComQueryCmd=stmt_info->MyComQueryCmd; @@ -435,10 +526,18 @@ unsigned long long Query_Info::query_parser_update_counters() { return ret; } +/** + * @brief Retrieves the digest text of the query from the query parser. + * @return The digest text of the query. + */ char * Query_Info::get_digest_text() { return GloQPro->get_digest_text(&QueryParserArgs); } +/** + * @brief Checks if the query is a SELECT statement with the NOT FOR UPDATE clause. + * @return True if the query is a SELECT statement with the NOT FOR UPDATE clause, false otherwise. + */ bool Query_Info::is_select_NOT_for_update() { if (stmt_info) { // we are processing a prepared statement. We already have the information return stmt_info->is_select_NOT_for_update; @@ -567,7 +666,9 @@ void MySQL_Session::set_status(enum session_status e) { status=e; } - +/** + * @brief Constructs a new MySQL session object. + */ MySQL_Session::MySQL_Session() { thread_session_id=0; //handler_ret = 0; @@ -640,6 +741,9 @@ MySQL_Session::MySQL_Session() { use_ldap_auth = false; } +/** + * @brief Initializes the MySQL session. + */ void MySQL_Session::init() { transaction_persistent_hostgroup=-1; transaction_persistent=false; @@ -648,6 +752,9 @@ void MySQL_Session::init() { SLDH=new StmtLongDataHandler(); } +/** + * @brief Resets the MySQL session to its initial state. + */ void MySQL_Session::reset() { autocommit=true; autocommit_handled=false; @@ -694,6 +801,9 @@ void MySQL_Session::reset() { } } +/** + * @brief Destructor for the MySQL session. + */ MySQL_Session::~MySQL_Session() { reset(); // we moved this out to allow CHANGE_USER @@ -747,7 +857,12 @@ MySQL_Session::~MySQL_Session() { } -// scan the pointer array of mysql backends (mybes) looking for a backend for the specified hostgroup_id +/** + * @brief Find a backend associated with the specified hostgroup ID. + * + * @param hostgroup_id The ID of the hostgroup to search for. + * @return A pointer to the MySQL backend associated with the specified hostgroup ID, or nullptr if not found. + */ MySQL_Backend * MySQL_Session::find_backend(int hostgroup_id) { MySQL_Backend *_mybe; unsigned int i; @@ -760,12 +875,23 @@ MySQL_Backend * MySQL_Session::find_backend(int hostgroup_id) { return NULL; // NULL = backend not found }; +/** + * @brief Update expired connections based on specified checks. + * + * This function iterates through the list of backends and their connections + * to determine if any connections have expired based on the provided checks. + * If a connection is found to be expired, its hostgroup ID is added to the + * list of expired connections for further processing. + * + * @param checks A vector of function objects representing checks to determine if a connection has expired. + */ void MySQL_Session::update_expired_conns(const vector>& checks) { - for (uint32_t i = 0; i < mybes->len; i++) { + for (uint32_t i = 0; i < mybes->len; i++) { // iterate through the list of backends MySQL_Backend* mybe = static_cast(mybes->index(i)); MySQL_Data_Stream* myds = mybe != nullptr ? mybe->server_myds : nullptr; MySQL_Connection* myconn = myds != nullptr ? myds->myconn : nullptr; + //! it performs a series of checks to determine if it has expired if (myconn != nullptr) { const bool is_active_transaction = myconn->IsActiveTransaction(); const bool multiplex_disabled = myconn->MultiplexDisabled(false); @@ -775,6 +901,9 @@ void MySQL_Session::update_expired_conns(const vectorreusable==true && is_active_transaction==false && multiplex_disabled==false && is_idle) { for (const function& check : checks) { if (check(myconn)) { + // If a connection is found to be expired based on the provided checks, + // its hostgroup ID is added to the list of expired connections (hgs_expired_conns) + // for further processing. this->hgs_expired_conns.push_back(mybe->hostgroup_id); break; } @@ -784,6 +913,17 @@ void MySQL_Session::update_expired_conns(const vectorserver_myds->DSS=STATE_NOT_INITIALIZED; _mybe->server_myds->init(MYDS_BACKEND_NOT_CONNECTED, this, 0); } + // the newly created backend is added to the session's list of backends (mybes) and a pointer to it is returned. mybes->add(_mybe); return _mybe; }; +/** + * @brief Find or create a MySQL backend associated with the specified hostgroup ID and data stream. + * + * This function first attempts to find an existing MySQL backend associated with the provided + * hostgroup ID. If a backend is found, its pointer is returned. Otherwise, a new MySQL backend + * is created and associated with the hostgroup ID and data stream. If the data stream is not provided + * (_myds is nullptr), a new MySQL_Data_Stream object is created and initialized for the new backend. + * + * @param hostgroup_id The ID of the hostgroup to which the backend belongs. + * @param _myds The MySQL data stream associated with the backend. + * @return A pointer to the MySQL_Backend object found or created. + */ MySQL_Backend * MySQL_Session::find_or_create_backend(int hostgroup_id, MySQL_Data_Stream *_myds) { MySQL_Backend *_mybe=find_backend(hostgroup_id); proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe); + // The pointer to the found or newly created backend is returned. return ( _mybe ? _mybe : create_backend(hostgroup_id, _myds) ); }; +/** + * @brief Reset all MySQL backends associated with this session. + * + * This function resets all MySQL backends associated with the current session. + * It iterates over all backends stored in the session, resets each backend, and then deletes it. + * + */ void MySQL_Session::reset_all_backends() { MySQL_Backend *mybe; while(mybes->len) { @@ -814,6 +975,35 @@ void MySQL_Session::reset_all_backends() { } }; +/** + * @brief Writes data from the session to the network with optional throttling and flow control. + * + * The writeout() function in the MySQL_Session class is responsible for writing data from the session to the network. + * It supports throttling, which limits the rate at which data is sent to the client. Throttling is controlled by the + * mysql_thread___throttle_max_bytes_per_second_to_client configuration parameter. If throttling is disabled (the parameter + * is set to 0), the function bypasses throttling. + * + * This function first ensures that any pending data in the session's data stream (client_myds) is written to the network. + * This ensures that the network buffers are emptied, allowing new data to be sent. + * + * After writing data to the network, the function checks if flow control is necessary. If the total amount of data written + * exceeds the maximum allowed per call (mwpl), or if the data is sent too quickly, the function pauses writing for a brief + * period to control the flow of data. + * + * If throttling is enabled, the function adjusts the throttle based on the amount of data written and the configured maximum + * bytes per second. If the current throughput exceeds the configured limit, the function increases the pause duration to + * regulate the flow of data. + * + * Finally, if the session has a backend associated with it (mybe), and the backend has a server data stream (server_myds), + * the function also writes data from the server data stream to the network. + * + * @note This function assumes that necessary session and network structures are properly initialized. + * + * @see mysql_thread___throttle_max_bytes_per_second_to_client + * @see MySQL_Session::client_myds + * @see MySQL_Session::mybe + * @see MySQL_Backend::server_myds + */ void MySQL_Session::writeout() { int tps = 10; // throttling per second , by default every 100ms int total_written = 0; @@ -903,6 +1093,30 @@ void MySQL_Session::writeout() { proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Writeout Session %p\n" , this->thread, this, this); } +/** + * @brief Handles COMMIT or ROLLBACK commands received from the client. + * + * The handler_CommitRollback() function processes COMMIT or ROLLBACK commands received from the client. It checks + * the command type and verifies if the command matches the expected syntax for COMMIT or ROLLBACK. If the command + * matches, it updates the respective commit or rollback count in the global monitor's status. Additionally, it + * checks for the presence of an active transaction to determine whether to forward the command or reply with an OK + * status. + * + * If there is an active transaction, the function sets the current hostgroup to the hostgroup of the active transaction + * and returns false, indicating that the command should be forwarded. If there are no active transactions, the function + * replies with an OK status and updates the client data stream accordingly. + * + * @param pkt Pointer to the packet containing the COMMIT or ROLLBACK command. + * @return True if the command is successfully handled and replied to, false otherwise. + * + * @see MySQL_Session::FindOneActiveTransaction() + * @see MySQL_Session::current_hostgroup + * @see MySQL_Session::autocommit + * @see MySQL_Session::client_myds + * @see MySQL_Data_Stream::DSS + * @see MySQL_Protocol::generate_pkt_OK() + * @see MySQL_Session::RequestEnd() + */ bool MySQL_Session::handler_CommitRollback(PtrSize_t *pkt) { if (pkt->size <= 5) { return false; } char c=((char *)pkt->ptr)[5]; @@ -965,7 +1179,27 @@ bool MySQL_Session::handler_CommitRollback(PtrSize_t *pkt) { return false; } - +/** + * @brief Handles the SET AUTOCOMMIT command received from the client. + * + * The handler_SetAutocommit() function processes the SET AUTOCOMMIT command received from the client. + * It parses the command to determine the new autocommit value and updates the autocommit status accordingly. + * Additionally, it checks for the presence of active transactions and handles the forwarding of the command if needed. + * The function also replies with an OK status to the client after processing the command. + * + * @param pkt Pointer to the packet containing the SET AUTOCOMMIT command. + * @return True if the command is successfully handled and replied to, false otherwise. + * + * @see MySQL_Session::autocommit_handled + * @see MySQL_Session::sending_set_autocommit + * @see MySQL_Session::current_hostgroup + * @see MySQL_Session::autocommit + * @see MySQL_Session::client_myds + * @see MySQL_Session::NumActiveTransactions() + * @see MySQL_Data_Stream::DSS + * @see MySQL_Protocol::generate_pkt_OK() + * @see MySQL_Session::RequestEnd() + */ bool MySQL_Session::handler_SetAutocommit(PtrSize_t *pkt) { autocommit_handled=false; sending_set_autocommit=false; @@ -1823,6 +2057,27 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C } } +/** + * @brief Handles the process of pinging the server again in the PINGING_SERVER status. + * + * The handler_again___status_PINGING_SERVER() function is responsible for managing the re-pinging process + * when the session status is set to PINGING_SERVER. It asserts the existence of the server connection's data stream + * and connection. It then performs an asynchronous ping operation on the server connection and processes the result. + * Depending on the ping result, it either returns -1 to indicate a termination or continues with further actions. + * + * @return -1 if the session should be terminated, 0 otherwise. + * + * @see MySQL_Session::mybe + * @see MySQL_Backend::server_myds + * @see MySQL_Data_Stream + * @see MySQL_Connection + * @see MySQL_Connection::async_ping() + * @see MySQL_Connection::compute_unknown_transaction_status() + * @see MySQL_Data_Stream::return_MySQL_Connection_To_Pool() + * @see MySQL_Data_Stream::destroy_MySQL_Connection_From_Pool() + * @see MySQL_Session::set_status() + * @see ProxySQL_MySQL_Error_Counter::p_update_mysql_error_counter() + */ int MySQL_Session::handler_again___status_PINGING_SERVER() { assert(mybe->server_myds->myconn); MySQL_Data_Stream *myds=mybe->server_myds; @@ -1870,6 +2125,25 @@ int MySQL_Session::handler_again___status_PINGING_SERVER() { return 0; } +/** + * @brief Handles the process of resetting the connection in the RESETTING_CONNECTION status. + * + * The handler_again___status_RESETTING_CONNECTION() function manages the resetting of the connection when + * the session status is set to RESETTING_CONNECTION. It asserts the existence of the server connection's data stream + * and connection. It then performs an asynchronous change user operation on the server connection and processes the result. + * Depending on the result of the change user operation, it either returns -1 to indicate termination or continues with further actions. + * + * @return -1 if the session should be terminated, 0 otherwise. + * + * @see MySQL_Session::mybe + * @see MySQL_Backend::server_myds + * @see MySQL_Data_Stream + * @see MySQL_Connection + * @see MySQL_Connection::async_change_user() + * @see MySQL_Data_Stream::return_MySQL_Connection_To_Pool() + * @see MySQL_Session::set_status() + * @see ProxySQL_MySQL_Error_Counter::p_update_mysql_error_counter() + */ int MySQL_Session::handler_again___status_RESETTING_CONNECTION() { assert(mybe->server_myds->myconn); MySQL_Data_Stream *myds=mybe->server_myds; @@ -1942,7 +2216,26 @@ int MySQL_Session::handler_again___status_RESETTING_CONNECTION() { return 0; } - +/** + * @brief Initiates a new thread to kill the connection associated with the session. + * + * The handler_again___new_thread_to_kill_connection() function creates a new thread to execute the query + * needed to kill the connection associated with the session. It first retrieves the data stream of the server connection. + * If the connection exists and is associated with a MySQL instance, and if it has not been marked for killing before, + * the function prepares the necessary information for the kill operation and spawns a new thread to execute it. + * The kill operation is performed asynchronously in the new thread. + * + * @note This function is used to handle the situation where the session is in the process of resetting the connection, + * and the connection needs to be forcibly terminated. + * + * @see MySQL_Session::mybe + * @see MySQL_Backend::server_myds + * @see MySQL_Data_Stream + * @see MySQL_Connection + * @see MySQL_Connection_userinfo + * @see KillArgs + * @see kill_query_thread + */ void MySQL_Session::handler_again___new_thread_to_kill_connection() { MySQL_Data_Stream *myds=mybe->server_myds; if (myds->myconn && myds->myconn->mysql) { @@ -1984,6 +2277,20 @@ void MySQL_Session::handler_again___new_thread_to_kill_connection() { // true should jump to handler_again #define NEXT_IMMEDIATE_NEW(new_st) do { set_status(new_st); return true; } while (0) +/** + * @brief Verifies and synchronizes the multi-statement capability between client and server connections. + * + * The handler_again___verify_backend_multi_statement() function verifies whether the multi-statement capability + * of the client connection matches that of the server connection. If there is a mismatch, it synchronizes the + * capability by updating the server connection's options accordingly. + * + * @return Always returns false. + * + * @see MySQL_Session::client_myds + * @see MySQL_Session::mybe + * @see MySQL_Data_Stream + * @see MySQL_Connection + */ bool MySQL_Session::handler_again___verify_backend_multi_statement() { if ((client_myds->myconn->options.client_flag & CLIENT_MULTI_STATEMENTS) != (mybe->server_myds->myconn->options.client_flag & CLIENT_MULTI_STATEMENTS)) { @@ -2013,6 +2320,20 @@ bool MySQL_Session::handler_again___verify_backend_multi_statement() { return false; } +/** + * @brief Verifies and sets the init_connect option for the server connection. + * + * The handler_again___verify_init_connect() function checks if the init_connect option has been sent + * to the server connection. If not, it sets the option to true and sets the init_connect queries + * based on the attributes defined for the hostgroup. It then updates the status for handling the + * init_connect setting. + * + * @return Always returns false. + * + * @see MySQL_Session::mybe + * @see MySQL_Data_Stream + * @see MySQL_Connection + */ bool MySQL_Session::handler_again___verify_init_connect() { if (mybe->server_myds->myconn->options.init_connect_sent==false) { // we needs to set it to true @@ -2048,6 +2369,23 @@ bool MySQL_Session::handler_again___verify_init_connect() { return false; } +/** + * @brief Verifies and sets the session_track_gtids option for the backend connection. + * + * The handler_again___verify_backend_session_track_gtids() function verifies whether the backend + * connection supports session_track_gtids and sets the option accordingly based on the client's + * request. If the backend does not support session_track_gtids, the function returns false. + * If the backend supports session_track_gtids, it sets the option to OWN_GTID if the client + * requests GTIDs or if the option is not configured yet. Otherwise, it sets the option to OFF. + * The function then updates the backend's session_track_gtids setting and switches the status + * to handle the session_track_gtids setting. + * + * @return True if the session_track_gtids option is set for the backend connection, false otherwise. + * + * @see MySQL_Session::mybe + * @see MySQL_Data_Stream + * @see MySQL_Connection + */ bool MySQL_Session::handler_again___verify_backend_session_track_gtids() { bool ret = false; proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session %p , client: %s , backend: %s\n", this, client_myds->myconn->options.session_track_gtids, mybe->server_myds->myconn->options.session_track_gtids); @@ -2112,6 +2450,23 @@ bool MySQL_Session::handler_again___verify_backend_session_track_gtids() { return ret; } +/** + * @brief Verifies and sets the ldap_user_variable option for the backend connection. + * + * The handler_again___verify_ldap_user_variable() function verifies whether the ldap_user_variable + * option needs to be set for the backend connection. If the ldap_user_variable_sent flag is false + * or if ldap_user_variable_value is NULL, the function sets the option to true. If the backend + * connection already has ldap_user_variable_sent set to true and the ldap_user_variable_value + * differs from the client's frontend username, the function resets the ldap_user_variable settings + * and sets the option to true. Finally, if the option needs to be set, the function updates the + * backend's ldap_user_variable settings and switches the status to handle the ldap_user_variable setting. + * + * @return Always returns false. + * + * @see MySQL_Session::mybe + * @see MySQL_Data_Stream + * @see MySQL_Connection + */ bool MySQL_Session::handler_again___verify_ldap_user_variable() { bool ret = false; if (mybe->server_myds->myconn->options.ldap_user_variable_sent==false) { @@ -4841,6 +5196,20 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA() { */ } +/** + * @brief Perform housekeeping tasks before processing packets. + * + * This function is responsible for performing necessary housekeeping tasks + * before processing packets. These tasks include handling expired connections + * for multiplexing scenarios. If multiplexing is enabled, it iterates over + * the list of expired backend connections and either returns them to the connection pool + * or destroys them based on certain conditions. + * + * @note This function assumes that the `hgs_expired_conns` vector contains the IDs + * of the backend connections that have expired. + * + * @return None. + */ void MySQL_Session::housekeeping_before_pkts() { if (mysql_thread___multiplexing) { for (const int hg_id : hgs_expired_conns) { @@ -4869,6 +5238,19 @@ void MySQL_Session::housekeeping_before_pkts() { } // this function was inline +/** + * @brief Process the GTID received from the MySQL connection when the handler return code is 0. + * + * This function is responsible for processing the GTID (Global Transaction Identifier) + * received from the MySQL connection when the handler return code is 0. It extracts + * the GTID and transaction ID from the MySQL connection and performs additional + * actions if necessary based on the configuration. + * + * @param myconn A pointer to the MySQL_Connection object from which to extract the GTID. + * This object contains the necessary information about the connection. + * + * @return None. + */ void MySQL_Session::handler_rc0_Process_GTID(MySQL_Connection *myconn) { if (myconn->get_gtid(mybe->gtid_uuid,&mybe->gtid_trxid)) { if (mysql_thread___client_session_track_gtid) { @@ -4913,7 +5295,9 @@ int MySQL_Session::handler() { } housekeeping_before_pkts(); + // The function get_pkts_from_client() is called to retrieve packets from the client, passing a reference to wrong_pass and the pkt variable. handler_ret = get_pkts_from_client(wrong_pass, pkt); + // If get_pkts_from_client() returns a non-zero value, indicating an error, the function returns that value immediately. if (handler_ret != 0) { return handler_ret; } @@ -4961,22 +5345,30 @@ int MySQL_Session::handler() { case PROCESSING_STMT_EXECUTE: case PROCESSING_QUERY: //fprintf(stderr,"PROCESSING_QUERY\n"); + // Pause Check + // It checks if pause_until is greater than the current time (thread->curtime). + // If so, it returns handler_ret immediately, indicating that processing should be paused until a later time. if (pause_until > thread->curtime) { handler_ret = 0; return handler_ret; } if (mysql_thread___connect_timeout_server_max) { - if (mybe->server_myds->max_connect_time==0) + if (mybe->server_myds->max_connect_time==0) { + // set max_connect_time to the current time plus the specified timeout value mybe->server_myds->max_connect_time=thread->curtime+(long long)mysql_thread___connect_timeout_server_max*1000; + } } else { + // set max_connect_time to zero, indicating no timeout mybe->server_myds->max_connect_time=0; } - if ( + if ( // two conditions + // If the server connection is in a non-idle state (ASYNC_IDLE), and the current time is greater than or equal to mybe->server_myds->wait_until + // This indicates that the server is taking too long to respond. (mybe->server_myds->myconn && mybe->server_myds->myconn->async_state_machine!=ASYNC_IDLE && mybe->server_myds->wait_until && thread->curtime >= mybe->server_myds->wait_until) - // query timed out || - (killed==true) // session was killed by admin - ) { + // If the session has been marked as killed by an admin. + (killed==true) + ) { // Logging and Action // we only log in case on timing out here. Logging for 'killed' is done in the places that hold that contextual information. if (mybe->server_myds->myconn && (mybe->server_myds->myconn->async_state_machine != ASYNC_IDLE) && mybe->server_myds->wait_until && (thread->curtime >= mybe->server_myds->wait_until)) { std::string query {}; @@ -4994,7 +5386,7 @@ int MySQL_Session::handler() { client_addr = client_myds->addr.addr ? client_myds->addr.addr : ""; client_port = client_myds->addr.port; } - + // it logs a warning message indicating the query that caused the timeout, along with client details. proxy_warning( "Killing connection %s:%d because query '%s' from client '%s':%d timed out.\n", mybe->server_myds->myconn->parent->address, @@ -5004,10 +5396,13 @@ int MySQL_Session::handler() { client_port ); } + // it calls handler_again___new_thread_to_kill_connection() to initiate the killing of the connection associated with the session that timed out. handler_again___new_thread_to_kill_connection(); } + // checks if the backend MySQL server associated with the session has been initialized (STATE_NOT_INITIALIZED) if (mybe->server_myds->DSS==STATE_NOT_INITIALIZED) { // we don't have a backend yet + // It saves the current processing status of the session (status) onto the previous_status stack switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility case PROCESSING_QUERY: previous_status.push(PROCESSING_QUERY); @@ -5024,6 +5419,7 @@ int MySQL_Session::handler() { break; // LCOV_EXCL_STOP } + // It transitions the session to the CONNECTING_SERVER state immediately. NEXT_IMMEDIATE(CONNECTING_SERVER); } else { MySQL_Data_Stream *myds=mybe->server_myds; @@ -5115,25 +5511,33 @@ int MySQL_Session::handler() { } } if (status==PROCESSING_STMT_EXECUTE) { + // It attempts to find the backend statement associated with the current global statement ID (stmt_global_id) in the local statement cache of the connection (myconn). CurrentQuery.mysql_stmt=myconn->local_stmts->find_backend_stmt_by_global_id(CurrentQuery.stmt_global_id); if (CurrentQuery.mysql_stmt==NULL) { + // the connection does not have the prepared statement metadata MySQL_STMT_Global_info *stmt_info=NULL; // the connection we too doesn't have the prepared statements prepared // we try to create it now + // In this case, it proceeds to create the prepared statement based on the global statement ID (stmt_global_id). + // It retrieves the prepared statement information (stmt_info) from a global prepared statement cache (GloMyStmt) using the statement ID. stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(CurrentQuery.stmt_global_id); + // It updates the CurrentQuery structure with the query information from the prepared statement (stmt_info). CurrentQuery.QueryLength=stmt_info->query_length; CurrentQuery.QueryPointer=(unsigned char *)stmt_info->query; // NOTE: Update 'first_comment' with the 'first_comment' from the retrieved // 'stmt_info' from the found prepared statement. 'CurrentQuery' requires its // own copy of 'first_comment' because it will later be free by 'QueryInfo::end'. if (stmt_info->first_comment) { + // If the prepared statement contains a first_comment, it updates the first_comment field of CurrentQuery. CurrentQuery.QueryParserArgs.first_comment=strdup(stmt_info->first_comment); } + // It pushes the current processing status (PROCESSING_STMT_EXECUTE) onto the previous_status stack to track the previous state transition. previous_status.push(PROCESSING_STMT_EXECUTE); + // It transitions the processing status to PROCESSING_STMT_PREPARE immediately using the NEXT_IMMEDIATE macro. NEXT_IMMEDIATE(PROCESSING_STMT_PREPARE); - if (CurrentQuery.stmt_global_id!=stmt_info->statement_id) { - PROXY_TRACE(); - } + //if (CurrentQuery.stmt_global_id!=stmt_info->statement_id) { + // PROXY_TRACE(); + //} } } } @@ -5143,14 +5547,22 @@ int MySQL_Session::handler() { SetQueryTimeout(); } int rc; + // declares two timespec variables begint and endt to store the start and end times of the query execution. timespec begint; + timespec endt; + // If the stats_time_backend_query flag in the thread's variables is set, + // it records the start time of the query execution using clock_gettime with CLOCK_THREAD_CPUTIME_ID. if (thread->variables.stats_time_backend_query) { clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint); } + // It calls the RunQuery function to execute the query on the backend. rc = RunQuery(myds, myconn); - timespec endt; + // If the stats_time_backend_query flag is set, it records the end time + // of the query execution using clock_gettime with CLOCK_THREAD_CPUTIME_ID. if (thread->variables.stats_time_backend_query) { clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt); + // It calculates the duration of the query execution by subtracting the start time (begint) from the end time (endt) + // and adds it to the stvar_backend_query_time status variable in the thread's status variables. thread->status_variables.stvar[st_var_backend_query_time] = thread->status_variables.stvar[st_var_backend_query_time] + (endt.tv_sec*1000000000+endt.tv_nsec) - (begint.tv_sec*1000000000+begint.tv_nsec); @@ -5424,7 +5836,17 @@ int MySQL_Session::handler() { } // end ::handler() - +/** + * @brief Handle multiple session statuses. + * + * This function handles multiple session statuses by invoking specific handler + * functions corresponding to each status. It checks the current session status + * and delegates the handling to the appropriate handler function. + * + * @param[out] rc Pointer to an integer variable where the return code from the handler function will be stored. + * + * @return True if the handling was successful and false otherwise. + */ bool MySQL_Session::handler_again___multiple_statuses(int *rc) { bool ret = false; switch(status) { @@ -5975,6 +6397,15 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C // this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +/** + * @brief Rewrite the query packet and update the session state accordingly. + * + * This function is responsible for rewriting the query packet and updating the session state + * based on the rewritten query. It frees the old packet, allocates memory for the new packet, + * copies the header and query information, and updates the session state to reflect the new query. + * + * @param[in,out] pkt Pointer to the packet data structure containing the original query packet. + */ void MySQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t *pkt) { // the query was rewritten l_free(pkt->size,pkt->ptr); // free old pkt @@ -6004,7 +6435,15 @@ void MySQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t *pkt) { } } -// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +/** + * @brief Handle the generation and sending of an OK message packet in response to a successful query execution. + * + * This (formely inline) function is responsible for setting up and sending an OK message packet to the client in response + * to a successful query execution. It updates the session state, generates the OK message packet using the + * appropriate protocol functions, and frees the memory occupied by the original packet. + * + * @param[in,out] pkt Pointer to the packet data structure containing the original packet. + */ void MySQL_Session::handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t *pkt) { gtid_hid = -1; client_myds->DSS=STATE_QUERY_SENT_NET; @@ -6016,7 +6455,15 @@ void MySQL_Session::handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t *pkt) { l_free(pkt->size,pkt->ptr); } -// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +/** + * @brief Handle the generation and sending of an error message packet in response to a query execution error. + * + * This (formely inline) function is responsible for setting up and sending an error message packet to the client in response + * to a query execution error. It updates the session state, generates the error message packet using the appropriate + * protocol functions, and frees the memory occupied by the original packet. + * + * @param[in,out] pkt Pointer to the packet data structure containing the original packet. + */ void MySQL_Session::handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt) { client_myds->DSS=STATE_QUERY_SENT_NET; client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1148,(char *)"42000",qpo->error_msg); @@ -6024,7 +6471,16 @@ void MySQL_Session::handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt) { l_free(pkt->size,pkt->ptr); } -// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +/** + * @brief Handle the generation and sending of an error message packet for a large packet size. + * + * This (formely inline) function is responsible for setting up and sending an error message packet to the client + * in response to receiving a packet larger than the 'max_allowed_packet' size. It updates the session state, + * generates the error message packet using the appropriate protocol functions, and frees the memory + * occupied by the original packet. + * + * @param[in,out] pkt Pointer to the packet data structure containing the original packet. + */ void MySQL_Session::handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t *pkt) { // ER_NET_PACKET_TOO_LARGE client_myds->DSS=STATE_QUERY_SENT_NET; @@ -8178,6 +8634,14 @@ void MySQL_Session::unable_to_parse_set_statement(bool *lock_hostgroup) { } } +/** + * @brief Check if any backend has an active MySQL connection. + * + * This function iterates through all backends associated with the session and checks if any backend has an + * active MySQL connection. If any backend has an active connection, it returns true; otherwise, it returns false. + * + * @return true if any backend has an active MySQL connection, otherwise false. + */ bool MySQL_Session::has_any_backend() { for (unsigned int j=0;j < mybes->len;j++) { MySQL_Backend *tmp_mybe=(MySQL_Backend *)mybes->index(j); @@ -8189,6 +8653,15 @@ bool MySQL_Session::has_any_backend() { return false; } +/** + * @brief Handler for MYSQL_COM_STMT_RESET command in WAITING_CLIENT_DATA state with STATE_SLEEP. + * + * This function handles the MYSQL_COM_STMT_RESET command when the session is in the WAITING_CLIENT_DATA state + * and the data stream state is STATE_SLEEP. It resets the statement with the given statement global ID, + * frees the memory allocated for the packet, generates an OK packet, and sets the session state back to WAITING_CLIENT_DATA. + * + * @param pkt Reference to the packet containing the command and associated data. + */ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_RESET(PtrSize_t& pkt) { uint32_t stmt_global_id=0; memcpy(&stmt_global_id,(char *)pkt.ptr+5,sizeof(uint32_t)); @@ -8203,6 +8676,15 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C status=WAITING_CLIENT_DATA; } +/** + * @brief Handler for MYSQL_COM_STMT_CLOSE command in WAITING_CLIENT_DATA state with STATE_SLEEP. + * + * This function handles the MYSQL_COM_STMT_CLOSE command when the session is in the WAITING_CLIENT_DATA state + * and the data stream state is STATE_SLEEP. It closes the statement with the given client global ID, frees + * associated resources, updates counters, and sets the session state back to WAITING_CLIENT_DATA. + * + * @param pkt Reference to the packet containing the command and associated data. + */ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_CLOSE(PtrSize_t& pkt) { uint32_t client_global_id=0; memcpy(&client_global_id,(char *)pkt.ptr+5,sizeof(uint32_t)); @@ -8273,8 +8755,15 @@ void MySQL_Session::generate_status_one_hostgroup(int hid, std::string& s) { delete resultset; } -void MySQL_Session::reset_warning_hostgroup_flag_and_release_connection() -{ +/** + * @brief Reset the warning hostgroup flag and release the MySQL connection back to the connection pool. + * + * This function resets the warning hostgroup flag and releases the MySQL connection back to the connection pool + * if necessary. If a warning was found in the previous query execution but the current executed query is not + * 'SHOW WARNINGS' or 'SHOW COUNT(*) FROM WARNINGS', it means that the warning has been handled, and the connection + * can be safely returned to the connection pool. + */ +void MySQL_Session::reset_warning_hostgroup_flag_and_release_connection() { if (warning_in_hg > -1) { // if we've reached this point, it means that warning was found in the previous query, but the // current executed query is not 'SHOW WARNINGS' or 'SHOW COUNT(*) FROM WARNINGS', so we can safely reset warning_in_hg and diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 84919d66d9..48ec0e3ea7 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -1153,6 +1153,19 @@ void MySQL_Threads_Handler::commit() { proxy_debug(PROXY_DEBUG_MYSQL_SERVER, 1, "Increasing version number to %d - all threads will notice this and refresh their variables\n", __global_MySQL_Thread_Variables_version); } + +/** + * Retrieves the string value of a specified global variable. + * + * This method searches for a global variable by name and returns its current + * string value. It's used to access configuration settings that are stored + * as strings within the MySQL Threads Handler. If the variable is not found, + * or if it is not a string type, a default or NULL value may be returned. + * + * @param name The name of the variable to retrieve. + * @return A pointer to the string value of the variable, or NULL if the + * variable does not exist or is not of string type. + */ char * MySQL_Threads_Handler::get_variable_string(char *name) { if (!strncmp(name,"monitor_",8)) { if (!strcmp(name,"monitor_username")) return strdup(variables.monitor_username); @@ -1279,6 +1292,21 @@ uint16_t MySQL_Threads_Handler::get_variable_uint16(char *name) { // LCOV_EXCL_STOP } + +/** + * Retrieves the integer value of a specified global variable. + * + * This method is responsible for fetching the value of a global configuration + * variable that is stored as an integer. It looks up the variable by its name + * and returns the integer value associated with it. This is useful for obtaining + * configuration settings that are expected to be numerical values. If the variable + * cannot be found or is not an integer type, a default value may be returned, + * typically indicating an error or not set state. + * + * @param name The name of the variable to retrieve. + * @return The integer value of the variable if found and valid, otherwise a + * default or error-indicating value. + */ int MySQL_Threads_Handler::get_variable_int(const char *name) { // convert name to string, and lowercase std::string nameS = string(name); @@ -1311,6 +1339,17 @@ int MySQL_Threads_Handler::get_variable_int(const char *name) { //VALGRIND_ENABLE_ERROR_REPORTING; } +/** + * Retrieves the value of a specified configuration variable. + * + * This method looks up a global configuration variable by its name and returns its value as a char pointer. + * It's designed to access various types of configuration settings within the MySQL Threads Handler context. + * The function can return values for variables of different types, but the caller must ensure proper type handling. + * If the variable is not found, a NULL pointer is returned. + * + * @param name The name of the variable to retrieve. + * @return The value of the variable as a char pointer, or NULL if the variable does not exist. + */ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public function, accessible from admin //VALGRIND_DISABLE_ERROR_REPORTING; #define INTBUFSIZE 4096 @@ -1472,20 +1511,23 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f } - +/** + * Sets the value of a specified global variable. + * + * This method allows for the modification of global variables within the MySQL Threads Handler + * by specifying the variable's name and the desired new value. It supports changing the values + * of variables across various types, though the input is accepted as a string. Proper conversion + * to the variable's actual type is performed internally. This function is key for dynamic configuration + * updates and runtime adjustments of the proxy's behavior. + * + * @param name The name of the variable to set. + * @param value The new value for the variable, passed as a const char pointer. + * @return True if the variable was successfully updated, false otherwise. + */ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // this is the public function, accessible from admin - // IN: - // name: variable name - // value: variable value - // - // OUT: - // false: unable to change the variable value, either because doesn't exist, or because out of range, or read only - // true: variable value changed - // if (!value) return false; size_t vallen=strlen(value); - // convert name to string, and lowercase std::string nameS = string(name); std::transform(nameS.begin(), nameS.end(), nameS.begin(), [](unsigned char c){ return std::tolower(c); }); @@ -1960,7 +2002,18 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi } -// return variables from both mysql_thread_variables_names AND mysql_tracked_variables +/** + * Retrieves a list of all global configuration variables' names. + * + * This method returns an array of strings, each representing the name of a global configuration + * variable managed by the MySQL Threads Handler. It's utilized to enumerate all available configuration + * settings, facilitating dynamic inspection or modification of the proxy's configuration. The list + * includes variables of all types and categories, providing a comprehensive overview of the proxy's + * configurable parameters. + * + * @return A null-terminated array of char pointers, where each entry is the name of a global variable. + * The last element of the array is NULL to indicate the end of the list. + */ char ** MySQL_Threads_Handler::get_variables_list() { @@ -2170,8 +2223,17 @@ char ** MySQL_Threads_Handler::get_variables_list() { return ret; } -// Returns true if the given name is the name of an existing mysql variable -// scan both mysql_thread_variables_names AND mysql_tracked_variables +/** + * @brief Checks whether the MySQL Threads Handler has a specific variable. + * + * This function checks if the MySQL Threads Handler contains a variable with the given name. + * It first checks if the name starts with "default_" and matches the internal variable names + * stored in the MySQL tracked variables. If not found, it then checks against the list of + * thread variables. + * + * @param name The name of the variable to check. + * @return true if the variable is found, false otherwise. + */ bool MySQL_Threads_Handler::has_variable(const char *name) { if (strlen(name) > 8) { if (strncmp(name, "default_", 8) == 0) { @@ -2201,6 +2263,18 @@ void MySQL_Threads_Handler::print_version() { fprintf(stderr,"Standard MySQL Threads Handler rev. %s -- %s -- %s\n", MYSQL_THREAD_VERSION, __FILE__, __TIMESTAMP__); } +/** + * @brief Initializes the MySQL Threads Handler with the given number of threads and stack size. + * + * This function initializes the MySQL Threads Handler with the specified number of threads and + * stack size. If the stack size is not provided (or is 0), it defaults to DEFAULT_STACK_SIZE. + * If the number of threads is not provided (or is 0), it defaults to DEFAULT_NUM_THREADS. + * After initialization, it sets the stack size using pthread_attr_setstacksize, allocates memory + * for the MySQL threads, and initializes status variables accordingly. + * + * @param num The number of threads to initialize. If 0, defaults to DEFAULT_NUM_THREADS. + * @param stack The size of the stack for each thread. If 0, defaults to DEFAULT_STACK_SIZE. + */ void MySQL_Threads_Handler::init(unsigned int num, size_t stack) { if (stack) { stacksize=stack; @@ -2225,6 +2299,19 @@ void MySQL_Threads_Handler::init(unsigned int num, size_t stack) { #endif // IDLE_THREADS } +/** + * @brief Creates a new MySQL thread and starts its execution. + * + * This function creates a new MySQL thread with the specified thread number, start routine, + * and whether it's an idle thread or not. If idles is set to false, a regular MySQL thread + * is created using pthread_create. If idles is true and idle_threads are enabled, an idle + * MySQL thread is created. After creating the thread, it returns NULL. + * + * @param tn The thread number to assign to the created thread. + * @param start_routine The function the new thread should start executing. + * @param idles A boolean indicating whether the created thread is an idle thread or not. + * @return A pointer to the created MySQL thread. + */ proxysql_mysql_thread_t * MySQL_Threads_Handler::create_thread(unsigned int tn, void *(*start_routine) (void *), bool idles) { if (idles==false) { if (pthread_create(&mysql_threads[tn].thread_id, &attr, start_routine , &mysql_threads[tn]) != 0 ) { @@ -2516,6 +2603,15 @@ void MySQL_Threads_Handler::update_client_host_cache(struct sockaddr* client_soc } } +/** + * @brief Flushes the client host cache. + * + * This function locks the mutex associated with the client host cache, clears the cache, + * and then unlocks the mutex. It is used to remove all entries from the client host cache. + * + * @note This function assumes that the mutex_client_host_cache has been initialized + * and is accessible within the MySQL Threads Handler. + */ void MySQL_Threads_Handler::flush_client_host_cache() { pthread_mutex_lock(&mutex_client_host_cache); client_host_cache.clear(); @@ -2878,6 +2974,18 @@ void MySQL_Thread::unregister_session(int idx) { // this function was inline in MySQL_Thread::run() +/** + * @brief Retrieves multiple idle connections and processes them. + * + * This method retrieves multiple idle connections from the MySQL Hostgroup Manager (MyHGM) and processes them. + * It updates the number of idle connections (num_idles) based on the retrieved connections. For each idle connection, + * it creates a new MySQL session, attaches the connection to the appropriate backend, assigns a file descriptor, + * sets necessary parameters, registers the session with the connection handler, and initiates handling. + * + * @param num_idles Reference to an integer to store the number of idle connections retrieved. + * + * @note This method assumes that MyHGM, my_idle_conns, SESSIONS_FOR_CONNECTIONS_HANDLER, curtime, and relevant MySQL thread variables have been properly initialized and are accessible. + */ void MySQL_Thread::run___get_multiple_idle_connections(int& num_idles) { int i; num_idles=MyHGM->get_multiple_idle_connections(-1, curtime-mysql_thread___ping_interval_server_msec*1000, my_idle_conns, SESSIONS_FOR_CONNECTIONS_HANDLER); @@ -2958,6 +3066,17 @@ void MySQL_Thread::ProcessAllMyDS_BeforePoll() { // this function was inline in MySQL_Thread::run() +/** + * @brief Processes all MySQL Data Streams after polling. + * + * This function iterates through all MySQL polls and processes the associated data streams. + * For each poll, it prints debug information about the file descriptor and its events. + * If a MySQL Data Stream is associated with the poll, it checks for events on the file descriptor. + * If there are no events and a poll timeout is enabled, it checks for sessions timing out. + * If there are events, it checks for invalid file descriptors and handles new connections + * for listener type data streams. For other types of data streams, it processes data and + * handles any potential errors. + */ void MySQL_Thread::ProcessAllMyDS_AfterPoll() { for (unsigned int n = 0; n < mypolls.len; n++) { proxy_debug(PROXY_DEBUG_NET,3, "poll for fd %d events %d revents %d\n", mypolls.fds[n].fd , mypolls.fds[n].events, mypolls.fds[n].revents); @@ -2994,6 +3113,14 @@ void MySQL_Thread::ProcessAllMyDS_AfterPoll() { // this function was inline in MySQL_Thread::run() +/** + * @brief Cleans up the mirror queue by removing excess sessions. + * + * This function checks if the length of the mirror queue exceeds the maximum concurrency limit. + * If it does, it iteratively removes sessions from the mirror queue until its length matches + * the maximum concurrency limit or falls below it. For each removed session, it updates + * the mirror sessions current count and increments the mirror concurrency gauge accordingly. + */ void MySQL_Thread::run___cleanup_mirror_queue() { unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency; if (mirror_queue_mysql_sessions_cache->len > l) { @@ -3007,6 +3134,14 @@ void MySQL_Thread::run___cleanup_mirror_queue() { } // main loop +/** + * @brief Main loop for the MySQL thread. + * + * This method represents the main loop executed by the MySQL thread. It performs various tasks including handling idle connections, + * processing sessions, performing maintenance, and updating variables. The loop continues executing until shutdown is initiated. + * + * @note This method assumes that relevant variables, mutexes, and objects have been properly initialized. + */ void MySQL_Thread::run() { unsigned int n; int rc; @@ -3061,6 +3196,20 @@ void MySQL_Thread::run() { ProcessAllMyDS_BeforePoll(); #ifdef IDLE_THREADS + /** + * @brief Handles session assignment and retrieval between worker and idle threads. + * + * This block of code checks if the global configuration allows idle threads and if the current thread + * is not an idle maintenance thread. If both conditions are met, it randomly selects an idle worker thread + * and assigns sessions to it. Then, it retrieves sessions from the idle thread. + * + * @note This functionality is part of the management of worker and idle threads in the MySQL thread pool. + * It facilitates the distribution of sessions between active worker threads and idle threads to optimize resource utilization. + * + * @param idle_maintenance_thread Flag indicating whether the current thread is an idle maintenance thread. + * @param GloVars Global configuration variables for the MySQL thread. + * @param GloMTH Global MySQL thread handlers object. + */ if (GloVars.global.idle_threads) { if (idle_maintenance_thread==false) { int r=rand()%(GloMTH->num_threads); @@ -3289,6 +3438,20 @@ unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *ses } #ifdef IDLE_THREADS + +/** + * @brief Moves idle threads to kill idle sessions. + * + * This function iterates through a portion of MySQL sessions to scan for idle sessions. + * If the current time exceeds the wait timeout threshold, it marks sessions idle for longer + * than the timeout as killed. It then removes the killed sessions from the session map, + * adjusts the map if necessary, unregisters the sessions, and adds them to the list of + * sessions to be resumed. Additionally, it removes associated data streams from the poll list + * and epoll control, updating relevant indices and pointers. + * + * @note This function assumes that MySQL sessions and related data structures have been + * initialized and are accessible within the MySQL Thread. + */ void MySQL_Thread::idle_thread_to_kill_idle_sessions() { #define SESS_TO_SCAN 128 if (mysess_idx + SESS_TO_SCAN > mysql_sessions->len) { @@ -3364,6 +3527,17 @@ void MySQL_Thread::idle_thread_check_if_worker_thread_has_unprocess_resumed_sess pthread_mutex_unlock(&thr->myexchange.mutex_resumes); } +/** + * @brief Assigns idle sessions to a worker thread for processing. + * + * This function is executed by an idle thread to assign idle sessions to a specified worker thread + * for processing. It locks the mutex associated with the worker thread's session exchange mechanism, + * checks if there are sessions to resume, and if so, transfers them to the worker thread's list + * of sessions to resume. After transferring sessions, it determines whether to send a signal to the + * worker thread to inform it of the presence of new sessions. + * + * @param thr The worker thread to which idle sessions will be assigned. + */ void MySQL_Thread::idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *thr) { bool send_signal = false; // send_signal variable will control if we need to signal or not @@ -3389,6 +3563,18 @@ void MySQL_Thread::idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *t } } +/** + * @brief Assigns idle sessions to an idle thread for processing. + * + * This function is executed by a worker thread to assign idle sessions to a specified idle thread + * for processing. It checks if both the current thread and the idle thread are not in shutdown mode, + * and if there are idle sessions to assign. If conditions are met, it locks the mutex associated with + * the idle thread's session exchange mechanism, transfers idle sessions to the idle thread's list of + * sessions to process, and sends a signal to the idle thread if its session queue was empty before + * transferring sessions. + * + * @param thr The idle thread to which idle sessions will be assigned. + */ void MySQL_Thread::worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr) { if (shutdown==0 && thr->shutdown==0 && idle_mysql_sessions->len) { pthread_mutex_lock(&thr->myexchange.mutex_idles); @@ -3412,18 +3598,31 @@ void MySQL_Thread::worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *t } } +/** + * @brief Worker thread retrieves sessions from the idle thread for processing. + * + * This function is executed by a worker thread to retrieve sessions from the idle thread + * for processing. It locks the mutex associated with the session exchange mechanism, + * checks if there are sessions to resume, and if so, retrieves them from the idle thread's + * list of sessions to be resumed. For each retrieved session, it registers the session, + * adds its associated data stream to the poll list for monitoring read events, and updates + * the poll timestamp. After processing all available sessions, it unlocks the mutex. + * + * @note This function assumes that the worker thread's session exchange mechanism has been + * initialized and is accessible within the MySQL Thread. + */ void MySQL_Thread::worker_thread_gets_sessions_from_idle_thread() { - pthread_mutex_lock(&myexchange.mutex_resumes); - if (myexchange.resume_mysql_sessions->len) { - //unsigned int maxsess=GloMTH->resume_mysql_sessions->len; - while (myexchange.resume_mysql_sessions->len) { - MySQL_Session *mysess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0); - register_session(mysess, false); - MySQL_Data_Stream *myds=mysess->client_myds; - mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); - } - } - pthread_mutex_unlock(&myexchange.mutex_resumes); + pthread_mutex_lock(&myexchange.mutex_resumes); + if (myexchange.resume_mysql_sessions->len) { + //unsigned int maxsess=GloMTH->resume_mysql_sessions->len; + while (myexchange.resume_mysql_sessions->len) { + MySQL_Session *mysess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0); + register_session(mysess, false); + MySQL_Data_Stream *myds=mysess->client_myds; + mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); + } + } + pthread_mutex_unlock(&myexchange.mutex_resumes); } #endif // IDLE_THREADS @@ -3588,6 +3787,16 @@ bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned // this function was inline in MySQL_Thread::process_all_sessions() +/** + * @brief Sort all sessions based on maximum connection time. + * + * This function iterates through all MySQL sessions and sorts them based on their maximum connection time. + * Sessions with a valid maximum connection time are compared, and if one session has a greater maximum connection + * time than another, their positions in the session list are swapped. The sorting is performed in-place. + * + * @note This function assumes that MySQL sessions and their associated data structures have been initialized + * and are accessible within the MySQL Thread. + */ void MySQL_Thread::ProcessAllSessions_SortingSessions() { unsigned int a=0; for (unsigned int n=0; nlen; n++) { @@ -3635,10 +3844,39 @@ void MySQL_Thread::ProcessAllSessions_CompletedMirrorSession(unsigned int& n, My // this function was inline in MySQL_Thread::process_all_sessions() +/** + * @brief Processes a session in the maintenance loop. + * + * This function performs maintenance tasks for a session within the maintenance loop. It handles checks related to session + * timeouts, active transactions, and server table version changes. Depending on the conditions, it may kill the session, + * simulate data in failed backend connections, or update expired connections if multiplexing is enabled. + * + * @param sess The MySQL session to process. + * @param sess_time The time elapsed since the session started, in milliseconds. + * @param total_active_transactions_ Reference to the total number of active transactions across all sessions. + */ void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsigned long long sess_time, unsigned int& total_active_transactions_) { unsigned int numTrx=0; total_active_transactions_ += sess->active_transactions; sess->to_process=1; + /** + * @brief Handles session timeout conditions and associated actions. + * + * This block of code evaluates whether the session has exceeded either the maximum transaction idle time + * or the wait timeout duration. If either condition is met, it takes appropriate action: + * + * - If the session has active transactions, it checks if the maximum transaction time has been exceeded + * and kills the session if necessary. + * - If the session does not have active transactions, it kills the session if it has been inactive for longer + * than the wait timeout duration. + * + * If none of the timeout conditions are met, it continues to evaluate the session's active transactions + * against the maximum transaction time criteria and kills the session if necessary. + * + * @param sess_time The time elapsed since the session started, in milliseconds. + * @param curtime The current time, in milliseconds. + * @param sess The MySQL session to handle. + */ if ( (sess_time/1000 > (unsigned long long)mysql_thread___max_transaction_idle_time) || (sess_time/1000 > (unsigned long long)mysql_thread___wait_timeout) ) { //numTrx = sess->NumActiveTransactions(); numTrx = sess->active_transactions; @@ -3675,6 +3913,18 @@ void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsig } } } + /** + * @brief Handles server table version change and its associated actions. + * + * This block of code checks if the current server table version differs from the previous version, + * indicating a change in server configurations. If there is a change, it performs the following actions: + * + * - If the session is in fast forward mode and has offline backends, it immediately kills the client connection. + * - If the session is not in fast forward mode, it searches for connections that should be terminated, + * and simulates data in them by failing the backend connections instead of killing the sessions. + * + * This block also addresses bug fix #1085 related to handling client connections using an OFFLINE node. + */ if (servers_table_version_current != servers_table_version_previous) { // bug fix for #1085 // Immediatelly kill all client connections using an OFFLINE node when session_fast_forward == true if (sess->session_fast_forward) { @@ -3693,6 +3943,14 @@ void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsig } // Perform the maintenance for expired connections on the session + + /** + * @brief Checks for expired connections and updates them if multiplexing is enabled. + * + * If multiplexing is enabled, this block of code defines lambda functions to check for expired connections + * based on auto-increment delay and connection delay multiplexing criteria. It then creates a vector of these + * functions and passes it to the `update_expired_conns` method of the session object to update expired connections. + */ if (mysql_thread___multiplexing) { const auto auto_incr_delay_multiplex_check = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool { const uint64_t multiplex_timeout_ms = mysql_thread___auto_increment_delay_multiplex_timeout_ms; @@ -3716,6 +3974,34 @@ void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsig } } + +/** + * @brief Processes all active sessions within the MySQL thread. + * + * This function iterates through all active sessions within the MySQL thread and performs various actions based on the session state and conditions. + * + * If the session sorting flag is enabled and there are more than three sessions, it sorts the sessions. + * + * For each session, it performs the following tasks: + * - Checks if the session is a mirror session and handles completed mirror sessions accordingly. + * - Handles client connection establishment timeout if the session is in the CONNECTING_CLIENT state. + * - Executes maintenance tasks on sessions if the MySQL thread is in maintenance mode. + * - Handles unhealthy sessions by closing them and logging audit entries. + * - Executes the session handler if it needs processing and is not paused, handling session termination if needed. + * + * After processing all sessions, if the MySQL thread is in maintenance mode, it updates the total active transactions. + * + * @param sess_sort Flag indicating whether session sorting is enabled. + * @param mysql_sessions Pointer to the list of MySQL sessions. + * @param mysql_thread___connect_timeout_client Timeout value for establishing client connections. + * @param mysql_thread___wait_timeout Timeout value for inactive sessions. + * @param mysql_thread___log_unhealthy_connections Flag indicating whether to log unhealthy connections. + * @param curtime Current timestamp. + * @param maintenance_loop Flag indicating whether the MySQL thread is in maintenance mode. + * @param status_variables Struct containing status variables for the MySQL thread. + * @param total_active_transactions_ Reference variable to store the total active transactions. + * @param rc Variable to store the return code of session handlers. + */ void MySQL_Thread::process_all_sessions() { unsigned int n; unsigned int total_active_transactions_=0; @@ -3839,6 +4125,20 @@ void MySQL_Thread::process_all_sessions() { } } + +/** + * @brief Refreshes MySQL thread variables from global MySQL thread handler. + * + * This method locks the global MySQL thread handler mutex and refreshes various MySQL thread variables + * from the global MySQL thread handler. It retrieves values for variables such as maximum allowed packet size, + * automatic SQL injection detection, firewall whitelist status, TCP keepalive usage, TCP keepalive time, + * connection throttling per second to host groups, maximum transaction idle time, maximum transaction time, + * threshold query length, threshold result set size, maximum query digest length, maximum query length for digests, + * wait timeout, default variables, replication lag evaluation on server load, and session debugging mode. + * + * @note This method assumes that the global MySQL thread handler (GloMTH) and relevant variables such as + * mysql_thread___default_variables have been properly initialized and are accessible. + */ void MySQL_Thread::refresh_variables() { pthread_mutex_lock(&GloVars.global.ext_glomth_mutex); if (GloMTH==NULL) { @@ -4114,6 +4414,18 @@ MySQL_Thread::MySQL_Thread() { thr_SetParser = NULL; } +/** + * @brief Registers a session with the connection handler. + * + * This method registers a session with the connection handler of the MySQL thread. It sets the thread pointer + * of the session to the current MySQL thread, marks the session as being handled by the connections handler, + * and adds the session to the MySQL sessions list. + * + * @param _sess Pointer to the MySQL_Session object to register. + * @param _new Boolean flag indicating whether the session is new. + * + * @note This method assumes that the MySQL sessions list (mysql_sessions) has been properly initialized and is accessible. + */ void MySQL_Thread::register_session_connection_handler(MySQL_Session *_sess, bool _new) { _sess->thread=this; _sess->connections_handler=true; @@ -4121,6 +4433,18 @@ void MySQL_Thread::register_session_connection_handler(MySQL_Session *_sess, boo mysql_sessions->add(_sess); } + +/** + * @brief Unregisters a session from the connection handler. + * + * This method unregisters a session from the connection handler of the MySQL thread. It removes the session + * from the MySQL sessions list based on the provided index. + * + * @param idx Index of the session in the MySQL sessions list to unregister. + * @param _new Boolean flag indicating whether the session is new. + * + * @note This method assumes that the MySQL sessions list (mysql_sessions) has been properly initialized and is accessible. + */ void MySQL_Thread::unregister_session_connection_handler(int idx, bool _new) { assert(_new); mysql_sessions->remove_index_fast(idx); @@ -4562,6 +4886,26 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_GlobalStatus(bool _memory) { } +/** + * @brief Retrieves memory statistics from all MySQL threads. + * + * This function iterates through all MySQL threads, including both active worker threads and idle maintenance threads if enabled, + * to retrieve memory statistics using the `Get_Memory_Stats()` function for each thread. + * + * It first determines the total number of threads to iterate based on the number of active worker threads (`num_threads`) and + * whether idle threads are enabled (`GloVars.global.idle_threads`). If idle threads are enabled, it doubles the count to include + * both active worker threads and idle maintenance threads. + * + * For each thread, it acquires a lock on the thread mutex to safely retrieve memory statistics and then releases the lock. + * If any thread is found to be NULL during iteration, indicating that it is not ready, the function exits early. + * + * @note This function assumes that the `Get_Memory_Stats()` function is implemented for MySQL_Thread objects to retrieve memory statistics. + * + * @param num_threads Number of active worker threads. + * @param mysql_threads Array of active worker threads. + * @param mysql_threads_idles Array of idle maintenance threads. + * @param GloVars Struct containing global variables. + */ void MySQL_Threads_Handler::Get_Memory_Stats() { unsigned int i; unsigned int j; @@ -5220,6 +5564,19 @@ void MySQL_Threads_Handler::p_update_metrics() { this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_history]->Set(this->variables.monitor_history/1000.0); } +/** + * @brief Retrieves memory statistics for the MySQL thread. + * + * This function calculates memory statistics for the MySQL thread, including backend and frontend buffers, + * as well as memory usage for internal session structures. + * + * Memory statistics are stored in the `status_variables` object, particularly in the `stvar` array. + * + * If there are active MySQL sessions associated with the thread, additional memory statistics are computed + * based on session-related data structures and buffers. + * + * @note This function assumes that the `Memory_Stats()` function is implemented for MySQL_Session objects to retrieve session memory statistics. + */ void MySQL_Thread::Get_Memory_Stats() { unsigned int i; status_variables.stvar[st_var_mysql_backend_buffers_bytes]=0; @@ -5245,6 +5602,21 @@ void MySQL_Thread::Get_Memory_Stats() { } +/** + * @brief Retrieves a MySQL connection from the local cache based on specified criteria. + * + * This function retrieves a MySQL connection from the local cache managed by the MySQL_Thread instance. + * It searches for a suitable connection based on the provided parameters such as host group ID, session information, + * GTID UUID, GTID transaction ID, and maximum lag time. If a matching connection is found, it is removed from the + * cache and returned to the caller. + * + * @param _hid The host group ID to which the connection belongs. + * @param sess The MySQL session associated with the connection. + * @param gtid_uuid The GTID UUID used for replication, or NULL if not used. + * @param gtid_trxid The GTID transaction ID. + * @param max_lag_ms The maximum lag time allowed for the connection in milliseconds. + * @return A pointer to the retrieved MySQL connection if found; otherwise, NULL. + */ MySQL_Connection * MySQL_Thread::get_MyConn_local(unsigned int _hid, MySQL_Session *sess, char *gtid_uuid, uint64_t gtid_trxid, int max_lag_ms) { // some sanity check if (sess == NULL) return NULL; @@ -5304,6 +5676,20 @@ MySQL_Connection * MySQL_Thread::get_MyConn_local(unsigned int _hid, MySQL_Sessi return NULL; } + +/** + * @brief Pushes a MySQL connection to the local connection pool. + * + * This function is responsible for adding a MySQL connection to the local connection pool. + * It resets the insert_id and checks if the associated server is online and the connection is idle + * before adding it to the pool. + * + * If the server is online and the connection is idle, the connection is added to the cached connections pool. + * Otherwise, if the server is not online or the connection is not idle, the connection is pushed to the + * global connection pool managed by MySQL_Host_Group_Manager. + * + * @param c Pointer to the MySQL_Connection object to be pushed to the local connection pool. + */ void MySQL_Thread::push_MyConn_local(MySQL_Connection *c) { MySrvC *mysrvc=NULL; mysrvc=(MySrvC *)c->parent; @@ -5318,22 +5704,37 @@ void MySQL_Thread::push_MyConn_local(MySQL_Connection *c) { MyHGM->push_MyConn_to_pool(c); } + +/** + * @brief Returns all locally cached MySQL connections to the global connection pool. + * + * This function is responsible for returning all locally cached MySQL connections to the global connection pool. + * It checks if there are any cached connections available, and if so, it pushes them back to the global connection pool + * managed by MySQL_Host_Group_Manager. After returning the connections, it clears the local cached connections pool. + */ void MySQL_Thread::return_local_connections() { if (cached_connections->len==0) { return; } -/* - MySQL_Connection **ca=(MySQL_Connection **)malloc(sizeof(MySQL_Connection *)*(cached_connections->len+1)); - unsigned int i=0; -*/ -// ca[i]=NULL; MyHGM->push_MyConn_to_pool_array((MySQL_Connection **)cached_connections->pdata, cached_connections->len); -// free(ca); while (cached_connections->len) { cached_connections->remove_index_fast(0); } } +/** + * @brief Scans sessions to kill based on connection and query IDs stored in the kill queue. + * + * This function scans sessions to kill based on the connection and query IDs stored in the kill queue. + * It iterates over the MySQL sessions and checks if any session matches the IDs stored in the kill queue. + * If a session matches, it is marked for termination. + * + * For idle threads, it also scans sessions in the idle and resume sessions queues maintained by the session exchange. + * + * @note This function assumes that the kill queue (kq) contains connection and query IDs to be scanned. + * + * @see MySQL_Thread::Scan_Sessions_to_Kill + */ void MySQL_Thread::Scan_Sessions_to_Kill_All() { if (kq.conn_ids.size() + kq.query_ids.size()) { Scan_Sessions_to_Kill(mysql_sessions); @@ -5372,6 +5773,17 @@ void MySQL_Thread::Scan_Sessions_to_Kill_All() { kq.query_ids.clear(); } + +/** + * @brief Scans sessions in the provided session array to mark sessions for termination based on kill queue IDs. + * + * This function scans sessions in the provided session array (mysess) to identify sessions that match the connection + * and query IDs stored in the kill queue (kq). If a session matches a connection or query ID, it is marked for termination. + * + * @param mysess Pointer to the session array to be scanned. + * + * @note This function assumes that the kill queue (kq) contains connection and query IDs to be scanned. + */ void MySQL_Thread::Scan_Sessions_to_Kill(PtrArray *mysess) { for (unsigned int n=0; nlen && ( kq.conn_ids.size() + kq.query_ids.size() ) ; n++) { MySQL_Session *_sess=(MySQL_Session *)mysess->index(n); @@ -5414,6 +5826,16 @@ void MySQL_Thread::Scan_Sessions_to_Kill(PtrArray *mysess) { } #ifdef IDLE_THREADS +/** + * @brief Moves a session to the idle session array if it meets the idle criteria. + * + * This function checks if a session should be moved to the idle session array based on its idle time + * and other conditions. If the session meets the idle criteria, it is moved to the idle session array. + * + * @param myds Pointer to the MySQL data stream associated with the session. + * @param n The index of the session in the poll array. + * @return True if the session is moved to the idle session array, false otherwise. + */ bool MySQL_Thread::move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n) { unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ; if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) { @@ -5422,15 +5844,6 @@ bool MySQL_Thread::move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) { //unsigned int j; bool has_backends = myds->sess->has_any_backend(); -/* - for (j=0;jsess->mybes->len;j++) { - MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j); - MySQL_Data_Stream *__myds=tmp_mybe->server_myds; - if (__myds->myconn) { - conns++; - } - } -*/ if (has_backends==false) { unsigned long long idle_since = curtime - myds->sess->IdleTime(); mypolls.remove_index_fast(n); @@ -5465,6 +5878,15 @@ bool MySQL_Thread::set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stre } #ifdef IDLE_THREADS +/** + * @brief Moves sessions from the idle thread's session array to the worker thread's session array. + * + * This function is called by the idle maintenance thread to transfer sessions from the idle session array + * managed by the exchange structure to the worker thread's session array. It locks the mutex associated + * with the idle session array, iterates through the sessions, registers each session, adds it to the worker + * thread's poll array, and adds it to the epoll set for monitoring read events. Finally, it updates the session + * map to map thread IDs to the positions in the worker thread's session array. + */ void MySQL_Thread::idle_thread_gets_sessions_from_worker_thread() { pthread_mutex_lock(&myexchange.mutex_idles); while (myexchange.idle_mysql_sessions->len) { @@ -5524,6 +5946,14 @@ void MySQL_Thread::handle_mirror_queue_mysql_sessions() { } } + +/** + * @brief Handles the kill queues by scanning sessions to kill and setting the maintenance loop flag. + * + * This function is responsible for handling the kill queues. It locks the mutex associated with the kill queues, + * scans the sessions to kill if there are pending connections or queries in the kill queues, and sets the maintenance + * loop flag to true to initiate maintenance tasks. After processing the kill queues, it releases the mutex. + */ void MySQL_Thread::handle_kill_queues() { pthread_mutex_lock(&kq.m); if (kq.conn_ids.size() + kq.query_ids.size()) { @@ -5533,6 +5963,16 @@ void MySQL_Thread::handle_kill_queues() { pthread_mutex_unlock(&kq.m); } + +/** + * @brief Checks for timing out session and marks them for processing. + * + * This function checks for timing out sessions and marks them for processing. Although the logic for managing connection timeout + * was removed due to the addition of the MariaDB client library, this function remains as a placeholder. It checks if the session + * has reached its wait_until or pause_until time, and if so, marks the session for processing. + * + * @param n The index of the session in the MySQL_Data_Stream array. + */ void MySQL_Thread::check_timing_out_session(unsigned int n) { // FIXME: this logic was removed completely because we added mariadb client library. Yet, we need to implement a way to manage connection timeout // check for timeout @@ -5551,6 +5991,15 @@ void MySQL_Thread::check_timing_out_session(unsigned int n) { } } + +/** + * @brief Checks for an invalid file descriptor (FD) and raises an error if found. + * + * This function checks if the file descriptor (FD) at the specified index in the `mypolls.fds` array is invalid (`POLLNVAL`). + * If an invalid FD is found, it raises an error and asserts to ensure that the program does not proceed with an invalid FD. + * + * @param n The index of the file descriptor in the `mypolls.fds` array. + */ void MySQL_Thread::check_for_invalid_fd(unsigned int n) { // check if the FD is valid if (mypolls.fds[n].revents==POLLNVAL) { diff --git a/lib/ProxySQL_Poll.cpp b/lib/ProxySQL_Poll.cpp index 3a35c977ea..8876e5f373 100644 --- a/lib/ProxySQL_Poll.cpp +++ b/lib/ProxySQL_Poll.cpp @@ -8,6 +8,22 @@ #include #include "cpp.h" + +/** + * @file ProxySQL_Poll.cpp + * + * These functions provide functionality for managing file descriptors (FDs) and associated data streams in the ProxySQL_Poll class. + * They handle memory allocation, addition, removal, and searching of FDs within the poll object. + * Additionally, they ensure that memory is managed efficiently by dynamically resizing the internal arrays as needed. +*/ + + +/** + * @brief Shrinks the ProxySQL_Poll object by reallocating memory to fit the current number of elements. + * + * This function reduces the size of the ProxySQL_Poll object by reallocating memory to fit the current number of elements. + * It adjusts the size of internal arrays to a size that is a power of two near the current number of elements. + */ void ProxySQL_Poll::shrink() { unsigned int new_size=l_near_pow_2(len+1); fds=(struct pollfd *)realloc(fds,new_size*sizeof(struct pollfd)); @@ -17,6 +33,14 @@ void ProxySQL_Poll::shrink() { size=new_size; } +/** + * @brief Expands the ProxySQL_Poll object to accommodate additional elements. + * + * This function expands the ProxySQL_Poll object to accommodate the specified number of additional elements. + * If the resulting size after expansion exceeds the current size, it reallocates memory to fit the expanded size. + * + * @param more The number of additional elements to accommodate. + */ void ProxySQL_Poll::expand(unsigned int more) { if ( (len+more) > size ) { unsigned int new_size=l_near_pow_2(len+more); @@ -28,6 +52,11 @@ void ProxySQL_Poll::expand(unsigned int more) { } } +/** + * @brief Constructs a new ProxySQL_Poll object. + * + * This constructor initializes a new ProxySQL_Poll object with default values and allocates memory for internal arrays. + */ ProxySQL_Poll::ProxySQL_Poll() { loop_counters=new StatCounters(15,10); poll_timeout=0; @@ -43,7 +72,11 @@ ProxySQL_Poll::ProxySQL_Poll() { last_sent=(unsigned long long *)malloc(size*sizeof(unsigned long long)); } - +/** + * @brief Destroys the ProxySQL_Poll object and frees allocated memory. + * + * This destructor deallocates memory for internal arrays and releases resources associated with the ProxySQL_Poll object. + */ ProxySQL_Poll::~ProxySQL_Poll() { unsigned int i; for (i=0;ipoll_fds_idx=-1; // this prevents further delete @@ -96,6 +147,15 @@ void ProxySQL_Poll::remove_index_fast(unsigned int i) { } } +/** + * @brief Finds the index of a file descriptor (FD) in the ProxySQL_Poll object. + * + * This function searches for a file descriptor (FD) in the ProxySQL_Poll object and returns its index if found. + * If the FD is not found, it returns -1. + * + * @param fd The file descriptor (FD) to search for. + * @return The index of the file descriptor (FD) if found, otherwise -1. + */ int ProxySQL_Poll::find_index(int fd) { unsigned int i; for (i=0; irows_count==0) return 0; uint64_t hash1, hash2; @@ -406,7 +552,13 @@ uint64_t SQLite3_result::raw_checksum() { return hash1; } - +/** + * @brief Calculates the checksum of the SQLite3_result object. + * + * This function calculates the checksum of the SQLite3_result object, including metadata. + * + * @return The checksum of the result. + */ char *SQLite3_result::checksum() { uint64_t hash1=raw_checksum(); char buf[128]; @@ -417,6 +569,14 @@ char *SQLite3_result::checksum() { return strdup(buf); } +/** + * @brief Dumps the content of the SQLite3_result object to the standard error stream. + * + * This function prints the content of the SQLite3_result object to the standard error stream. + * It is useful for debugging purposes to inspect the content of the result object. + * + * @note This function is intended for debugging purposes and should not be used in production code. + */ void SQLite3_result::dump_to_stderr() { if (columns == 0) return; size_t *columns_lengths = (size_t *)malloc(sizeof(size_t)*columns); @@ -518,6 +678,15 @@ void SQLite3_result::dump_to_stderr() { free(columns_lengths); } +/** + * @brief Constructs a new SQLite3_result object from an existing SQLite3_result object. + * + * This constructor initializes a new SQLite3_result object based on the provided source + * SQLite3_result object. It copies the column definitions, rows, and mutex status from + * the source object to the new object. + * + * @param[in] src Pointer to the source SQLite3_result object to copy from. + */ SQLite3_result::SQLite3_result(SQLite3_result *src) { enabled_mutex = false; // default rows_count=0; @@ -538,6 +707,13 @@ SQLite3_result::SQLite3_result(SQLite3_result *src) { } } +/** + * @brief Retrieves the size of the SQLite3_result object. + * + * This function returns the current size (number of rows) of the SQLite3_result object. + * + * @return The size (number of rows) of the result object. + */ unsigned long long SQLite3_result::get_size() { unsigned long long s = sizeof(SQLite3_result); s += column_definition.size() * sizeof(SQLite3_column *); @@ -553,11 +729,31 @@ unsigned long long SQLite3_result::get_size() { return s; } +/** + * @brief Adds a column definition to the SQLite3_result object. + * + * This function creates a new SQLite3_column object with the specified name and type + * and adds it to the column definitions vector of the SQLite3_result object. + * + * @param[in] a The name of the column to add. + * @param[in] b The type of the column to add. + */ void SQLite3_result::add_column_definition(int a, const char *b) { SQLite3_column *cf=new SQLite3_column(a,b); column_definition.push_back(cf); } +/** + * @brief Adds a row to the SQLite3_result object. + * + * This function adds a row to the SQLite3_result object either from the provided + * SQLite statement or from the specified fields array. If the `skip` parameter is set + * to false, a new row is created and added to the rows vector of the SQLite3_result object. + * + * @param[in] stmt The SQLite statement from which to fetch the row data. + * @param[in] skip A boolean indicating whether to skip adding the row (default is false). + * @return An integer representing the result of the operation (SQLITE_ROW on success). + */ int SQLite3_result::add_row(sqlite3_stmt *stmt, bool skip) { int rc=(*proxy_sqlite3_step)(stmt); if (rc!=SQLITE_ROW) return rc; @@ -570,6 +766,15 @@ int SQLite3_result::add_row(sqlite3_stmt *stmt, bool skip) { return SQLITE_ROW; } +/** + * @brief Adds a row to the SQLite3_result object. + * + * This function adds a row to the SQLite3_result object from the provided fields array. + * A new row is created and added to the rows vector of the SQLite3_result object. + * + * @param[in] _fields The array of fields representing the row data. + * @return An integer representing the result of the operation (SQLITE_ROW on success). + */ int SQLite3_result::add_row(char **_fields) { SQLite3_row *row=new SQLite3_row(columns); row->add_fields(_fields); @@ -612,6 +817,15 @@ int SQLite3_result::add_row(const char* _field, ...) { return this->add_row(const_cast(&fields[0])); } +/** + * @brief Adds a row to the SQLite3_result object based on an existing row. + * + * This function creates a new row in the SQLite3_result object and copies the fields + * from the provided existing row to the new row. + * + * @param[in] old_row Pointer to the existing SQLite3_row object from which to copy fields. + * @return int Returns SQLITE_ROW to indicate successful addition of the row. + */ int SQLite3_result::add_row(SQLite3_row *old_row) { SQLite3_row *row=new SQLite3_row(columns); row->add_fields(old_row->fields); @@ -620,6 +834,15 @@ int SQLite3_result::add_row(SQLite3_row *old_row) { return SQLITE_ROW; } +/** + * @brief Constructs a SQLite3_result object based on the result of a SQLite3 statement. + * + * This constructor initializes a SQLite3_result object using the result of a SQLite3 statement. + * It retrieves the column count and column names/types from the statement and adds them as column definitions. + * It then iterates through the result rows, adding each row to the SQLite3_result object. + * + * @param[in] stmt Pointer to the SQLite3 statement. + */ SQLite3_result::SQLite3_result(sqlite3_stmt *stmt) { enabled_mutex = false; // default rows_count=0; @@ -630,6 +853,18 @@ SQLite3_result::SQLite3_result(sqlite3_stmt *stmt) { while (add_row(stmt)==SQLITE_ROW) {}; } +/** + * @brief Constructs a SQLite3_result object based on the result of a SQLite3 statement with pagination support. + * + * This constructor initializes a SQLite3_result object using the result of a SQLite3 statement with pagination support. + * It retrieves the column count and column names/types from the statement and adds them as column definitions. + * It then iterates through the result rows based on the provided offset and limit, adding each row to the SQLite3_result object. + * + * @param[in] stmt Pointer to the SQLite3 statement. + * @param[out] found_rows Pointer to store the total number of found rows. + * @param[in] offset Offset for pagination. + * @param[in] limit Limit for pagination. + */ SQLite3_result::SQLite3_result(sqlite3_stmt *stmt, int * found_rows, unsigned int offset, unsigned int limit) { enabled_mutex = false; // default rows_count=0; @@ -663,6 +898,14 @@ SQLite3_result::SQLite3_result(sqlite3_stmt *stmt, int * found_rows, unsigned in *found_rows = fr; } +/** + * @brief Constructs a SQLite3_result object with a specified number of columns and mutex enablement. + * + * This constructor initializes a SQLite3_result object with a specified number of columns and enables/disables mutex based on the provided flag. + * + * @param[in] num_columns Number of columns. + * @param[in] en_mutex Flag to enable/disable mutex. + */ SQLite3_result::SQLite3_result(int num_columns, bool en_mutex) { rows_count=0; columns=num_columns; @@ -674,6 +917,11 @@ SQLite3_result::SQLite3_result(int num_columns, bool en_mutex) { } } +/** + * @brief Destructor for the SQLite3_result object. + * + * This destructor cleans up memory by deleting allocated column definitions and rows. + */ SQLite3_result::~SQLite3_result() { for (std::vector::iterator it = column_definition.begin() ; it != column_definition.end(); ++it) { SQLite3_column *c=*it; @@ -685,11 +933,25 @@ SQLite3_result::~SQLite3_result() { } } +/** + * @brief Default constructor for the SQLite3_result object. + * + * This constructor initializes a SQLite3_result object with default values. + */ SQLite3_result::SQLite3_result() { enabled_mutex = false; // default columns=0; } +/** + * @brief Loads a SQLite3 plugin. + * + * This function loads a SQLite3 plugin specified by the given plugin_name. + * It initializes function pointers to SQLite3 API functions provided by the plugin. + * If the plugin_name is NULL, it loads the built-in SQLite3 library and initializes function pointers to its API functions. + * + * @param[in] plugin_name The name of the SQLite3 plugin library to load. + */ void SQLite3DB::LoadPlugin(const char *plugin_name) { proxy_sqlite3_config = NULL; proxy_sqlite3_bind_double = NULL;