From 5a3037785ff83ed96592fa6eb3f1df58e98fb930 Mon Sep 17 00:00:00 2001 From: anphucbui Date: Wed, 20 Dec 2023 21:53:57 +0000 Subject: [PATCH 1/4] Add support for AWS RDS MySQL Multi-AZ Cluster auto-discovery --- include/MySQL_HostGroups_Manager.h | 18 +++ include/MySQL_Monitor.hpp | 4 + include/MySQL_Thread.h | 3 + include/proxysql_structs.h | 2 + include/proxysql_utils.h | 2 + lib/MySQL_HostGroups_Manager.cpp | 177 +++++++++++++------- lib/MySQL_Monitor.cpp | 248 +++++++++++++++++++++++++++++ lib/MySQL_Thread.cpp | 11 ++ lib/proxysql_utils.cpp | 20 +++ 9 files changed, 426 insertions(+), 59 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 1f5c1d8dcc..ea75310aa6 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -814,6 +814,24 @@ class MySQL_HostGroups_Manager { void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us); unsigned long long Get_Memory_Stats(); + struct serverDetails { + long int hostgroup_id; + string hostname; + uint16_t port; + uint16_t gtid_port; + string status; + int64_t weight; + unsigned int compression; + int64_t max_connections; + unsigned int max_replication_lag; + int32_t use_ssl; + unsigned int max_latency_ms; + string comment; + }; + + int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector servers_to_add, unordered_map hostname_values_mapping); + void rebuild_hostname_hostgroup_mapping(); + void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 25388264f3..1d5eb3bc82 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -56,6 +56,8 @@ struct cmp_str { #define N_L_ASE 16 +#define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com" + /* Implementation of monitoring in AWS Aurora will be different than previous modules @@ -418,6 +420,8 @@ class MySQL_Monitor { static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql); static void trigger_dns_cache_update(); + vector discover_topology(const char* hostname, int port); + void discover_topology_and_add_to_mysql_servers(); private: std::vector *tables_defs_monitor; diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index e1915a549a..20e39a504d 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -305,6 +305,7 @@ struct p_th_gauge { mysql_monitor_ping_interval, mysql_monitor_ping_timeout, mysql_monitor_ping_max_failures, + mysql_monitor_topology_discovery_interval, mysql_monitor_read_only_interval, mysql_monitor_read_only_timeout, mysql_monitor_writer_is_also_reader, @@ -386,6 +387,8 @@ class MySQL_Threads_Handler int monitor_ping_max_failures; //! Monitor ping timeout. Unit: 'ms'. int monitor_ping_timeout; + //! Monitor topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'. + int monitor_topology_discovery_interval; //! Monitor read only timeout. Unit: 'ms'. int monitor_read_only_interval; //! Monitor read only timeout. Unit: 'ms'. diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index d2abf393ea..56e55682c1 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -899,6 +899,7 @@ __thread int mysql_thread___monitor_connect_timeout; __thread int mysql_thread___monitor_ping_interval; __thread int mysql_thread___monitor_ping_max_failures; __thread int mysql_thread___monitor_ping_timeout; +__thread int mysql_thread___monitor_topology_discovery_interval; __thread int mysql_thread___monitor_read_only_interval; __thread int mysql_thread___monitor_read_only_timeout; __thread int mysql_thread___monitor_read_only_max_timeout_count; @@ -1068,6 +1069,7 @@ extern __thread int mysql_thread___monitor_connect_timeout; extern __thread int mysql_thread___monitor_ping_interval; extern __thread int mysql_thread___monitor_ping_max_failures; extern __thread int mysql_thread___monitor_ping_timeout; +extern __thread int mysql_thread___monitor_topology_discovery_interval; extern __thread int mysql_thread___monitor_read_only_interval; extern __thread int mysql_thread___monitor_read_only_timeout; extern __thread int mysql_thread___monitor_read_only_max_timeout_count; diff --git a/include/proxysql_utils.h b/include/proxysql_utils.h index ffe3fe1bba..2984648c38 100644 --- a/include/proxysql_utils.h +++ b/include/proxysql_utils.h @@ -218,4 +218,6 @@ void close_all_non_term_fd(std::vector excludeFDs); */ std::pair get_dollar_quote_error(const char* version); +long parseLong(const char* s); + #endif diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 80f3608271..90e8994437 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -1969,65 +1969,7 @@ bool MySQL_HostGroups_Manager::commit( if (hgsm_mysql_servers_checksum != table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] || hgsm_mysql_replication_hostgroups_checksum != table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]) { - proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n", - hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS], - hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]); - - char* error = NULL; - int cols = 0; - int affected_rows = 0; - SQLite3_result* resultset = NULL; - - const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \ - UNION \ - SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \ - ORDER BY hostname, port"; - - mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); - - hostgroup_server_mapping.clear(); - - if (resultset && resultset->rows_count) { - std::string fetched_server_id; - HostGroup_Server_Mapping* fetched_server_mapping = NULL; - - for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { - SQLite3_row* r = *it; - - const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1]; - - if (fetched_server_mapping == NULL || server_id != fetched_server_id) { - - auto itr = hostgroup_server_mapping.find(server_id); - - if (itr == hostgroup_server_mapping.end()) { - std::unique_ptr server_mapping(new HostGroup_Server_Mapping(this)); - fetched_server_mapping = server_mapping.get(); - hostgroup_server_mapping.insert( std::pair> { - server_id, std::move(server_mapping) - } ); - } - else { - fetched_server_mapping = itr->second.get(); - } - - fetched_server_id = server_id; - } - - HostGroup_Server_Mapping::Node node; - //node.server_status = static_cast(atoi(r->fields[3])); - node.reader_hostgroup_id = atoi(r->fields[4]); - node.writer_hostgroup_id = atoi(r->fields[5]); - node.srv = reinterpret_cast(atoll(r->fields[6])); - - HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER; - fetched_server_mapping->add(type, node); - } - } - delete resultset; - - hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS]; - hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]; + rebuild_hostname_hostgroup_mapping(); } ev_async_send(gtid_ev_loop, gtid_ev_async); @@ -7977,3 +7919,120 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv) srv->status = MYSQL_SERVER_STATUS_OFFLINE_HARD; srv->ConnectionsFree->drop_all_connections(); } + +/** +* @brief Updates replication hostgroups by adding autodiscovered mysql servers. +* @details Adds each server from 'servers_to_add' to the 'runtime_mysql_servers' table. +* We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'. +* @param servers_to_add A vector containing the strings representing the hostnames of servers to add to 'mysql_servers'. +* @param hostname_values_mapping A mapping containing the hostname of the discovered server mapped to the metadata of server from which it was discovered, stored in a 'serverDetails' struct. +* +* @return Returns EXIT_FAILURE code on failure and EXIT_SUCCESS code on success. +*/ +int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector servers_to_add, unordered_map hostname_values_mapping) { + int exit_code = EXIT_SUCCESS; + wrlock(); + + try { + for (string host : servers_to_add) { + long int hostgroup_id = hostname_values_mapping[host].hostgroup_id; + uint16_t port = hostname_values_mapping[host].port; + + uint16_t gtid_port = hostname_values_mapping[host].gtid_port; + int64_t weight = hostname_values_mapping[host].weight; + unsigned int compression = hostname_values_mapping[host].compression; + int64_t max_connections = hostname_values_mapping[host].max_connections; + unsigned int max_replication_lag = hostname_values_mapping[host].max_replication_lag; + int32_t use_ssl = hostname_values_mapping[host].use_ssl; + unsigned int max_latency_ms = hostname_values_mapping[host].max_latency_ms; + + MySrvC* mysrvc = new MySrvC( + const_cast(host.c_str()), port, gtid_port, weight, MYSQL_SERVER_STATUS_ONLINE, + compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, const_cast("Discovered endpoint") + ); + add(mysrvc, hostgroup_id); + + proxy_info( + "Adding new discovered server %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n", + host.c_str(), port, hostgroup_id, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl + ); + } + + purge_mysql_servers_table(); + mydb->execute("DELETE FROM mysql_servers"); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); + generate_mysql_servers_table(); + update_table_mysql_servers_for_monitor(false); + rebuild_hostname_hostgroup_mapping(); + + } catch (...) { + exit_code = EXIT_FAILURE; + } + + wrunlock(); + return exit_code; +} + +/** +* @brief Rebuilds the 'hostname_hostgroup_mapping' +* @details Rebuilds the internal 'hostname_hostgroup_mapping' assuming new data has been entered +* and calculates new checksums for 'mysql_servers' and 'mysql_replication_hostgroups'. +*/ +void MySQL_HostGroups_Manager::rebuild_hostname_hostgroup_mapping() { + proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n", + hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS], + hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]); + + char* error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result* resultset = NULL; + + const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \ + UNION \ + SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \ + ORDER BY hostname, port"; + + mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + + hostgroup_server_mapping.clear(); + + if (resultset && resultset->rows_count) { + std::string fetched_server_id; + HostGroup_Server_Mapping* fetched_server_mapping = NULL; + + for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); it++) { + SQLite3_row *r = *it; + + const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1]; + + if (fetched_server_mapping == NULL || server_id != fetched_server_id) { + auto itr = hostgroup_server_mapping.find(server_id); + + if (itr == hostgroup_server_mapping.end()) { + std::unique_ptr server_mapping(new HostGroup_Server_Mapping(this)); + fetched_server_mapping = server_mapping.get(); + hostgroup_server_mapping.insert( std::pair> { + server_id, std::move(server_mapping) + } ); + } else { + fetched_server_mapping = itr->second.get(); + } + + fetched_server_id = server_id; + } + + HostGroup_Server_Mapping::Node node; + node.reader_hostgroup_id = atoi(r->fields[4]); + node.writer_hostgroup_id = atoi(r->fields[5]); + node.srv = reinterpret_cast(atoll(r->fields[6])); + + HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER; + fetched_server_mapping->add(type, node); + } + } + delete resultset; + + hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS]; + hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]; +} \ No newline at end of file diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 2ec9822433..536d312dbd 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -3256,6 +3256,240 @@ VALGRIND_ENABLE_ERROR_REPORTING; return ret; } + +/** +* @brief Discovers the topology of a server. +* @details Discovers the topology of the server specified by hostname and port. +* The monitor user must explicitly be granted permissions to view 'mysql.rds_topology'. +* @param hostname Hostname of the server. +* @param port Server port. +* +* @return Returns a vector of 'MYSQL_ROW' objects which contain the discovered servers. +*/ +vector MySQL_Monitor::discover_topology(const char* hostname, int port) { + std::unique_ptr mmsd(new MySQL_Monitor_State_Data(MON_CONNECT, const_cast (hostname), port)); + mmsd->mondb = monitordb; + mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); + + unsigned long long start_time = monotonic_time(); + mmsd->t1=start_time; + + bool read_only_success = false; + bool crc = false; + if (mmsd->mysql == NULL) { // we don't have a connection, let's create it + bool rc; + rc = mmsd->create_new_connection(); + if (mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_register(mmsd.get()); + } + crc = true; + if (rc == false) { + unsigned long long now = monotonic_time(); + char *new_error = (char *) malloc(50 + strlen(mmsd->mysql_error_msg)); + snprintf(new_error, sizeof(mmsd->mysql_error_msg), "timeout on creating new connection: %s", mmsd->mysql_error_msg); + free(mmsd->mysql_error_msg); + mmsd->mysql_error_msg = new_error; + proxy_error("Timeout on discover_topology check for %s:%d after %lldms. Unable to create a connection. If the server is overload, increase mysql-monitor_connect_timeout. Error: %s.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000, new_error); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_CONN_TIMEOUT); + goto __exit_monitor_discover_topology; + } + } + + mmsd->interr = 0; // reset the value + mmsd->async_exit_status = mysql_query_start(&mmsd->interr,mmsd->mysql, "SELECT * from mysql.rds_topology"); + while (mmsd->async_exit_status) { + const unsigned long long now = monotonic_time(); + mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); + + if (now > mmsd->t1 + mysql_thread___monitor_read_only_timeout * 1000) { + mmsd->mysql_error_msg = strdup("timeout check"); + proxy_error("Timeout on discover_topology check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_read_only_timeout.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT); + goto __exit_monitor_discover_topology; + } + + if (mmsd->interr) { + // error during query + mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + goto __exit_monitor_discover_topology; + } + + if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) { + mmsd->async_exit_status = mysql_query_cont(&mmsd->interr, mmsd->mysql, mmsd->async_exit_status); + } + } + + if (mmsd->interr) { + // error during query + mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + goto __exit_monitor_discover_topology; + } + + mmsd->async_exit_status = mysql_store_result_start(&mmsd->result,mmsd->mysql); + while (mmsd->async_exit_status && ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0)) { + mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); + const unsigned long long now = monotonic_time(); + + if (now > mmsd->t1 + mysql_thread___monitor_read_only_timeout * 1000) { + mmsd->mysql_error_msg = strdup("timeout check"); + proxy_error("Timeout on discover_topology check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_read_only_timeout.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT); + goto __exit_monitor_discover_topology; + } + + if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) { + mmsd->async_exit_status = mysql_store_result_cont(&mmsd->result, mmsd->mysql, mmsd->async_exit_status); + } + } + + if (mmsd->interr) { // ping failed + mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + } + +__exit_monitor_discover_topology: + if (mmsd->mysql) { + // if we reached here we didn't put the connection back + if (mmsd->mysql_error_msg) { + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); + mysql_close(mmsd->mysql); // if we reached here we should destroy it + mmsd->mysql = NULL; + } else { + if (crc) { + bool rc = mmsd->set_wait_timeout(); + if (rc) { + GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql); + } else { + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); + mysql_close(mmsd->mysql); // set_wait_timeout failed + } + mmsd->mysql = NULL; + } else { // really not sure how we reached here, drop it + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); + mysql_close(mmsd->mysql); + mmsd->mysql = NULL; + } + } + } + + // Process the output of the query, if any + vector discovered_rows; + if (mmsd->result) { + MYSQL_FIELD *fields = mysql_fetch_fields(mmsd->result); + int num_fields = mysql_num_fields(mmsd->result); + + for (int i = 0; i < num_fields; i++) { + MYSQL_ROW curr_row = mysql_fetch_row(mmsd->result); + string discovered_hostname = curr_row[1]; + string discovered_port = curr_row[2]; + + if (strcmp(hostname, curr_row[1]) != 0) { + discovered_rows.push_back(curr_row); + } + } + + mysql_free_result(mmsd->result); + mmsd->result = NULL; + } else { + proxy_info("Unable to query for topology.\n"); + } + + return discovered_rows; +} + +/** +* @brief Discovers the topology of a server and adds the discovered servers to 'mysql_servers'. +* @details Helper method which calls the 'discover_topology' method as well as +* 'MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups' in order to discover topology +* and then add it to 'mysql_servers'. +*/ +void MySQL_Monitor::discover_topology_and_add_to_mysql_servers() { + char *error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result *runtime_mysql_servers = NULL; + + char *query=(char *)"SELECT hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM main.runtime_mysql_servers ORDER BY hostgroup_id, hostname, port"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute_statement(query, &error , &cols , &affected_rows , &runtime_mysql_servers); + + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } else { + set existing_servers; + vector servers_to_add; + unordered_map hostname_values_mapping; + + // Do an initial loop through query results to keep track of existing server hostnames + for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { + SQLite3_row *r1 = *it; + + string current_hostname = r1->fields[1]; + if (std::find(existing_servers.begin(), existing_servers.end(), current_hostname) == existing_servers.end()) { + existing_servers.insert(current_hostname); + } + } + + // Discover topology for each server in runtime_mysql_servers that have an aws endpoint + for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { + SQLite3_row *r1 = *it; + + long int hostgroup = parseLong(r1->fields[0]); + string current_hostname = r1->fields[1]; + long int port = parseLong(r1->fields[2]); + + if (current_hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) { + vector discovered_servers = GloMyMon->discover_topology(current_hostname.c_str(), port); + + if (!discovered_servers.empty()) { + for (MYSQL_ROW s: discovered_servers) { + vector value_vector; + string discovered_id = s[0]; + string discovered_hostname = s[1]; + string discovered_port = s[2]; + + // Add discovered servers that don't already exist in runtime_mysql_servers + if (std::find(existing_servers.begin(), existing_servers.end(), discovered_hostname) == existing_servers.end()) { + servers_to_add.push_back(discovered_hostname); + + MySQL_HostGroups_Manager::serverDetails original_server_values = { + parseLong(r1->fields[0]), // hostgroup_id + r1->fields[1], // hostname + parseLong(discovered_port.c_str()), // port, use from topology discovery instead of from originating server + parseLong(r1->fields[3]), // gtid_port + r1->fields[4], // status, but not using it + parseLong(r1->fields[5]), // weight + parseLong(r1->fields[6]), // compression + parseLong(r1->fields[7]), // max_connections + parseLong(r1->fields[8]), // max_replication_lag + parseLong(r1->fields[9]), // use_ssl + parseLong(r1->fields[10]), // max_latency_ms + r1->fields[11] // comment, but not using it + }; + + hostname_values_mapping[discovered_hostname] = original_server_values; + } + } + } + } + } + if (!servers_to_add.empty()) { + int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(servers_to_add, hostname_values_mapping); + + if (successfully_added_all_servers == EXIT_FAILURE) { + proxy_info("Inserting auto-discovered servers failed.\n"); + } else { + proxy_info("Inserting auto-discovered servers succeeded.\n"); + } + } + } +} + void * MySQL_Monitor::monitor_read_only() { mysql_close(mysql_init(NULL)); // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) @@ -3269,6 +3503,8 @@ void * MySQL_Monitor::monitor_read_only() { unsigned long long t1; unsigned long long t2; unsigned long long next_loop_at=0; + int topology_loop = 0; + int topology_loop_max = mysql_thread___monitor_topology_discovery_interval; while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) { @@ -3287,6 +3523,18 @@ void * MySQL_Monitor::monitor_read_only() { next_loop_at=0; } + if (topology_loop >= topology_loop_max) { + try { + discover_topology_and_add_to_mysql_servers(); + topology_loop = 0; + } catch (std::runtime_error &e) { + proxy_error("Error during topology auto-discovery: %s\n", e.what()); + } catch (...) { + proxy_error("Unknown error during topology auto-discovery.\n"); + } + } + topology_loop += 1; + if (t1 < next_loop_at) { goto __sleep_monitor_read_only; } diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 945ebe332f..1efbbfb191 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -307,6 +307,7 @@ static char * mysql_thread_variables_names[]= { (char *)"monitor_ping_interval", (char *)"monitor_ping_max_failures", (char *)"monitor_ping_timeout", + (char *)"monitor_topology_discovery_interval", (char *)"monitor_read_only_interval", (char *)"monitor_read_only_timeout", (char *)"monitor_read_only_max_timeout_count", @@ -822,6 +823,12 @@ th_metrics_map = std::make_tuple( "Reached maximum ping attempts from monitor.", metric_tags {} ), + std::make_tuple ( + p_th_gauge::mysql_monitor_topology_discovery_interval, + "proxysql_mysql_monitor_topology_discovery_interval", + "How frequently a topology discovery is performed, e.g. a value of 500 means one topology discovery every 500 read-only checks ", + metric_tags {} + ), std::make_tuple ( p_th_gauge::mysql_monitor_read_only_interval, "proxysql_mysql_monitor_read_only_interval_seconds", @@ -912,6 +919,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.monitor_ping_interval=8000; variables.monitor_ping_max_failures=3; variables.monitor_ping_timeout=1000; + variables.monitor_topology_discovery_interval=1000; variables.monitor_read_only_interval=1000; variables.monitor_read_only_timeout=800; variables.monitor_read_only_max_timeout_count=3; @@ -2021,6 +2029,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_ping_timeout"] = make_tuple(&variables.monitor_ping_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_ping_max_failures"] = make_tuple(&variables.monitor_ping_max_failures, 1, 1000*1000, false); + VariablesPointers_int["monitor_topology_discovery_interval"] = make_tuple(&variables.monitor_topology_discovery_interval, 1, 100000, false); VariablesPointers_int["monitor_read_only_interval"] = make_tuple(&variables.monitor_read_only_interval, 100, 7*24*3600*1000, false); VariablesPointers_int["monitor_read_only_timeout"] = make_tuple(&variables.monitor_read_only_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_read_only_max_timeout_count"] = make_tuple(&variables.monitor_read_only_max_timeout_count, 1, 1000*1000, false); @@ -3922,6 +3931,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___monitor_ping_interval=GloMTH->get_variable_int((char *)"monitor_ping_interval"); mysql_thread___monitor_ping_max_failures=GloMTH->get_variable_int((char *)"monitor_ping_max_failures"); mysql_thread___monitor_ping_timeout=GloMTH->get_variable_int((char *)"monitor_ping_timeout"); + mysql_thread___monitor_topology_discovery_interval=GloMTH->get_variable_int((char *)"monitor_topology_discovery_interval"); mysql_thread___monitor_read_only_interval=GloMTH->get_variable_int((char *)"monitor_read_only_interval"); mysql_thread___monitor_read_only_timeout=GloMTH->get_variable_int((char *)"monitor_read_only_timeout"); mysql_thread___monitor_read_only_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_read_only_max_timeout_count"); @@ -5201,6 +5211,7 @@ void MySQL_Threads_Handler::p_update_metrics() { this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_enabled]->Set(this->variables.monitor_enabled); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_timeout]->Set(this->variables.monitor_ping_timeout/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_max_failures]->Set(this->variables.monitor_ping_max_failures); + this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_topology_discovery_interval]->Set(this->variables.monitor_topology_discovery_interval); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_interval]->Set(this->variables.monitor_read_only_interval/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_timeout]->Set(this->variables.monitor_read_only_timeout/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_writer_is_also_reader]->Set(this->variables.monitor_writer_is_also_reader); diff --git a/lib/proxysql_utils.cpp b/lib/proxysql_utils.cpp index 5475afb459..965858c1d6 100644 --- a/lib/proxysql_utils.cpp +++ b/lib/proxysql_utils.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -451,3 +452,22 @@ std::pair get_dollar_quote_error(const char* version) { } } } + +/** +* @brief Parses a string into a long. +* @details Parses a string into a long, with error checks. Throws an exception if parse fails. +* @param s The string to parse. +* +* @return The parsed value of the string as a long. +*/ +long parseLong(const char* s) { + errno = 0; + char *temp; + long val = strtol(s, &temp, 0); + + if (temp == s || *temp != '\0' || ((val == LONG_MIN || val == LONG_MAX) && errno == ERANGE)) { + throw std::runtime_error("Could not parse long."); + } + + return val; +} From bb35b207192f2da253b300e8dbd0a65f79d27df9 Mon Sep 17 00:00:00 2001 From: anphucbui Date: Fri, 2 Feb 2024 16:46:48 +0000 Subject: [PATCH 2/4] Moving core discovery logic to fit into read_only framework as a new task type --- include/MySQL_HostGroups_Manager.h | 4 +- include/MySQL_Monitor.hpp | 9 +- include/MySQL_Thread.h | 6 +- include/proxysql_structs.h | 4 +- lib/MySQL_HostGroups_Manager.cpp | 54 +++-- lib/MySQL_Monitor.cpp | 331 ++++++++++------------------- lib/MySQL_Thread.cpp | 14 +- 7 files changed, 165 insertions(+), 257 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index ea75310aa6..c83b449618 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -816,7 +816,7 @@ class MySQL_HostGroups_Manager { struct serverDetails { long int hostgroup_id; - string hostname; + string originating_hostname; uint16_t port; uint16_t gtid_port; string status; @@ -829,7 +829,7 @@ class MySQL_HostGroups_Manager { string comment; }; - int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector servers_to_add, unordered_map hostname_values_mapping); + int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(unordered_map new_server_values_mapping); void rebuild_hostname_hostgroup_mapping(); void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 1d5eb3bc82..667fc26ddd 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -57,6 +57,7 @@ struct cmp_str { #define N_L_ASE 16 #define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com" +#define QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology" /* @@ -191,7 +192,8 @@ enum MySQL_Monitor_State_Data_Task_Type { MON_GROUP_REPLICATION, MON_REPLICATION_LAG, MON_GALERA, - MON_AWS_AURORA + MON_AWS_AURORA, + MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY }; enum class MySQL_Monitor_State_Data_Task_Result { @@ -420,8 +422,7 @@ class MySQL_Monitor { static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql); static void trigger_dns_cache_update(); - vector discover_topology(const char* hostname, int port); - void discover_topology_and_add_to_mysql_servers(); + void process_discovered_topology(const std::string& originating_server_hostname, vector discovered_servers); private: std::vector *tables_defs_monitor; @@ -533,7 +534,7 @@ class MySQL_Monitor { * Note: Calling init_async is mandatory before executing tasks asynchronously. */ void monitor_ping_async(SQLite3_result* resultset); - void monitor_read_only_async(SQLite3_result* resultset); + void monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check); void monitor_replication_lag_async(SQLite3_result* resultset); void monitor_group_replication_async(); void monitor_galera_async(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 20e39a504d..66d8f740d5 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -305,7 +305,7 @@ struct p_th_gauge { mysql_monitor_ping_interval, mysql_monitor_ping_timeout, mysql_monitor_ping_max_failures, - mysql_monitor_topology_discovery_interval, + mysql_monitor_aws_rds_topology_discovery_interval, mysql_monitor_read_only_interval, mysql_monitor_read_only_timeout, mysql_monitor_writer_is_also_reader, @@ -387,8 +387,8 @@ class MySQL_Threads_Handler int monitor_ping_max_failures; //! Monitor ping timeout. Unit: 'ms'. int monitor_ping_timeout; - //! Monitor topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'. - int monitor_topology_discovery_interval; + //! Monitor aws rds topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'. + int monitor_aws_rds_topology_discovery_interval; //! Monitor read only timeout. Unit: 'ms'. int monitor_read_only_interval; //! Monitor read only timeout. Unit: 'ms'. diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 56e55682c1..81c72d65ec 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -899,7 +899,7 @@ __thread int mysql_thread___monitor_connect_timeout; __thread int mysql_thread___monitor_ping_interval; __thread int mysql_thread___monitor_ping_max_failures; __thread int mysql_thread___monitor_ping_timeout; -__thread int mysql_thread___monitor_topology_discovery_interval; +__thread int mysql_thread___monitor_aws_rds_topology_discovery_interval; __thread int mysql_thread___monitor_read_only_interval; __thread int mysql_thread___monitor_read_only_timeout; __thread int mysql_thread___monitor_read_only_max_timeout_count; @@ -1069,7 +1069,7 @@ extern __thread int mysql_thread___monitor_connect_timeout; extern __thread int mysql_thread___monitor_ping_interval; extern __thread int mysql_thread___monitor_ping_max_failures; extern __thread int mysql_thread___monitor_ping_timeout; -extern __thread int mysql_thread___monitor_topology_discovery_interval; +extern __thread int mysql_thread___monitor_aws_rds_topology_discovery_interval; extern __thread int mysql_thread___monitor_read_only_interval; extern __thread int mysql_thread___monitor_read_only_timeout; extern __thread int mysql_thread___monitor_read_only_max_timeout_count; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 90e8994437..51a46548e9 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -7922,29 +7922,36 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv) /** * @brief Updates replication hostgroups by adding autodiscovered mysql servers. -* @details Adds each server from 'servers_to_add' to the 'runtime_mysql_servers' table. +* @details Adds each server from 'new_server_values_mapping' to the 'runtime_mysql_servers' table. * We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'. -* @param servers_to_add A vector containing the strings representing the hostnames of servers to add to 'mysql_servers'. -* @param hostname_values_mapping A mapping containing the hostname of the discovered server mapped to the metadata of server from which it was discovered, stored in a 'serverDetails' struct. +* @param new_server_values_mapping A mapping containing the hostname of the discovered server mapped to the metadata of server from which it was discovered, stored in a 'serverDetails' struct. * * @return Returns EXIT_FAILURE code on failure and EXIT_SUCCESS code on success. */ -int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector servers_to_add, unordered_map hostname_values_mapping) { +int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(unordered_map new_server_values_mapping) { int exit_code = EXIT_SUCCESS; + bool added_new_server = false; wrlock(); try { - for (string host : servers_to_add) { - long int hostgroup_id = hostname_values_mapping[host].hostgroup_id; - uint16_t port = hostname_values_mapping[host].port; - - uint16_t gtid_port = hostname_values_mapping[host].gtid_port; - int64_t weight = hostname_values_mapping[host].weight; - unsigned int compression = hostname_values_mapping[host].compression; - int64_t max_connections = hostname_values_mapping[host].max_connections; - unsigned int max_replication_lag = hostname_values_mapping[host].max_replication_lag; - int32_t use_ssl = hostname_values_mapping[host].use_ssl; - unsigned int max_latency_ms = hostname_values_mapping[host].max_latency_ms; + for (const auto &s : new_server_values_mapping) { + if (new_server_values_mapping.find(s.first) == new_server_values_mapping.end()) { + continue; + } + + string host = s.first; + MySQL_HostGroups_Manager::serverDetails new_server_values = new_server_values_mapping[host]; + + long int hostgroup_id = new_server_values.hostgroup_id; + uint16_t port = new_server_values.port; + + uint16_t gtid_port = new_server_values.gtid_port; + int64_t weight = new_server_values.weight; + unsigned int compression = new_server_values.compression; + int64_t max_connections = new_server_values.max_connections; + unsigned int max_replication_lag = new_server_values.max_replication_lag; + int32_t use_ssl = new_server_values.use_ssl; + unsigned int max_latency_ms = new_server_values.max_latency_ms; MySrvC* mysrvc = new MySrvC( const_cast(host.c_str()), port, gtid_port, weight, MYSQL_SERVER_STATUS_ONLINE, @@ -7956,15 +7963,18 @@ int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replic "Adding new discovered server %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n", host.c_str(), port, hostgroup_id, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl ); - } - purge_mysql_servers_table(); - mydb->execute("DELETE FROM mysql_servers"); - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); - generate_mysql_servers_table(); - update_table_mysql_servers_for_monitor(false); - rebuild_hostname_hostgroup_mapping(); + added_new_server = true; + } + if (added_new_server) { + purge_mysql_servers_table(); + mydb->execute("DELETE FROM mysql_servers"); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); + generate_mysql_servers_table(); + update_table_mysql_servers_for_monitor(false); + rebuild_hostname_hostgroup_mapping(); + } } catch (...) { exit_code = EXIT_FAILURE; } diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 536d312dbd..79c6988d97 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -592,6 +592,12 @@ void MySQL_Monitor_State_Data::init_async() { task_timeout_ = mysql_thread___monitor_read_only_timeout; task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; break; + case MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY: + query_ = QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY; + async_state_machine_ = ASYNC_QUERY_START; + task_timeout_ = mysql_thread___monitor_read_only_timeout; + task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; + break; #else // TEST_READONLY case MON_READ_ONLY: case MON_INNODB_READ_ONLY: @@ -1597,6 +1603,8 @@ void * monitor_read_only_thread(void *arg) { mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only&@@global.innodb_read_only read_only"); } else if (mmsd->get_task_type() == MON_READ_ONLY__OR__INNODB_READ_ONLY) { mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only|@@global.innodb_read_only read_only"); + } else if (mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) { + mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY); } else { // default mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only read_only"); } @@ -3256,159 +3264,14 @@ VALGRIND_ENABLE_ERROR_REPORTING; return ret; } - -/** -* @brief Discovers the topology of a server. -* @details Discovers the topology of the server specified by hostname and port. -* The monitor user must explicitly be granted permissions to view 'mysql.rds_topology'. -* @param hostname Hostname of the server. -* @param port Server port. -* -* @return Returns a vector of 'MYSQL_ROW' objects which contain the discovered servers. -*/ -vector MySQL_Monitor::discover_topology(const char* hostname, int port) { - std::unique_ptr mmsd(new MySQL_Monitor_State_Data(MON_CONNECT, const_cast (hostname), port)); - mmsd->mondb = monitordb; - mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); - - unsigned long long start_time = monotonic_time(); - mmsd->t1=start_time; - - bool read_only_success = false; - bool crc = false; - if (mmsd->mysql == NULL) { // we don't have a connection, let's create it - bool rc; - rc = mmsd->create_new_connection(); - if (mmsd->mysql) { - GloMyMon->My_Conn_Pool->conn_register(mmsd.get()); - } - crc = true; - if (rc == false) { - unsigned long long now = monotonic_time(); - char *new_error = (char *) malloc(50 + strlen(mmsd->mysql_error_msg)); - snprintf(new_error, sizeof(mmsd->mysql_error_msg), "timeout on creating new connection: %s", mmsd->mysql_error_msg); - free(mmsd->mysql_error_msg); - mmsd->mysql_error_msg = new_error; - proxy_error("Timeout on discover_topology check for %s:%d after %lldms. Unable to create a connection. If the server is overload, increase mysql-monitor_connect_timeout. Error: %s.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000, new_error); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_CONN_TIMEOUT); - goto __exit_monitor_discover_topology; - } - } - - mmsd->interr = 0; // reset the value - mmsd->async_exit_status = mysql_query_start(&mmsd->interr,mmsd->mysql, "SELECT * from mysql.rds_topology"); - while (mmsd->async_exit_status) { - const unsigned long long now = monotonic_time(); - mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); - - if (now > mmsd->t1 + mysql_thread___monitor_read_only_timeout * 1000) { - mmsd->mysql_error_msg = strdup("timeout check"); - proxy_error("Timeout on discover_topology check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_read_only_timeout.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT); - goto __exit_monitor_discover_topology; - } - - if (mmsd->interr) { - // error during query - mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - goto __exit_monitor_discover_topology; - } - - if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) { - mmsd->async_exit_status = mysql_query_cont(&mmsd->interr, mmsd->mysql, mmsd->async_exit_status); - } - } - - if (mmsd->interr) { - // error during query - mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - goto __exit_monitor_discover_topology; - } - - mmsd->async_exit_status = mysql_store_result_start(&mmsd->result,mmsd->mysql); - while (mmsd->async_exit_status && ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0)) { - mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); - const unsigned long long now = monotonic_time(); - - if (now > mmsd->t1 + mysql_thread___monitor_read_only_timeout * 1000) { - mmsd->mysql_error_msg = strdup("timeout check"); - proxy_error("Timeout on discover_topology check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_read_only_timeout.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT); - goto __exit_monitor_discover_topology; - } - - if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) { - mmsd->async_exit_status = mysql_store_result_cont(&mmsd->result, mmsd->mysql, mmsd->async_exit_status); - } - } - - if (mmsd->interr) { // ping failed - mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - } - -__exit_monitor_discover_topology: - if (mmsd->mysql) { - // if we reached here we didn't put the connection back - if (mmsd->mysql_error_msg) { - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); - mysql_close(mmsd->mysql); // if we reached here we should destroy it - mmsd->mysql = NULL; - } else { - if (crc) { - bool rc = mmsd->set_wait_timeout(); - if (rc) { - GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql); - } else { - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); - mysql_close(mmsd->mysql); // set_wait_timeout failed - } - mmsd->mysql = NULL; - } else { // really not sure how we reached here, drop it - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); - mysql_close(mmsd->mysql); - mmsd->mysql = NULL; - } - } - } - - // Process the output of the query, if any - vector discovered_rows; - if (mmsd->result) { - MYSQL_FIELD *fields = mysql_fetch_fields(mmsd->result); - int num_fields = mysql_num_fields(mmsd->result); - - for (int i = 0; i < num_fields; i++) { - MYSQL_ROW curr_row = mysql_fetch_row(mmsd->result); - string discovered_hostname = curr_row[1]; - string discovered_port = curr_row[2]; - - if (strcmp(hostname, curr_row[1]) != 0) { - discovered_rows.push_back(curr_row); - } - } - - mysql_free_result(mmsd->result); - mmsd->result = NULL; - } else { - proxy_info("Unable to query for topology.\n"); - } - - return discovered_rows; -} - /** -* @brief Discovers the topology of a server and adds the discovered servers to 'mysql_servers'. -* @details Helper method which calls the 'discover_topology' method as well as -* 'MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups' in order to discover topology -* and then add it to 'mysql_servers'. +* @brief Processes the discovered servers to eventually add them to 'runtime_mysql_servers'. +* @details This method takes a vector of discovered servers, compares them against the existing servers, and adds the new servers to 'runtime_mysql_servers' with the +* values from their originating server. +* @param originating_server_hostname A string which denotes the hostname of the originating server, from which the discovered servers were queried and found. +* @param discovered_servers A vector of servers discovered when querying the cluster's topology. */ -void MySQL_Monitor::discover_topology_and_add_to_mysql_servers() { +void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, vector discovered_servers) { char *error = NULL; int cols = 0; int affected_rows = 0; @@ -3421,65 +3284,59 @@ void MySQL_Monitor::discover_topology_and_add_to_mysql_servers() { if (error) { proxy_error("Error on %s : %s\n", query, error); } else { - set existing_servers; - vector servers_to_add; - unordered_map hostname_values_mapping; + set existing_runtime_servers; + unordered_map new_server_values_mapping; - // Do an initial loop through query results to keep track of existing server hostnames + // Do an initial loop through the query results to keep track of existing runtime server hostnames for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { SQLite3_row *r1 = *it; - string current_hostname = r1->fields[1]; - if (std::find(existing_servers.begin(), existing_servers.end(), current_hostname) == existing_servers.end()) { - existing_servers.insert(current_hostname); - } - } - - // Discover topology for each server in runtime_mysql_servers that have an aws endpoint - for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { - SQLite3_row *r1 = *it; - - long int hostgroup = parseLong(r1->fields[0]); - string current_hostname = r1->fields[1]; - long int port = parseLong(r1->fields[2]); - - if (current_hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) { - vector discovered_servers = GloMyMon->discover_topology(current_hostname.c_str(), port); - - if (!discovered_servers.empty()) { - for (MYSQL_ROW s: discovered_servers) { - vector value_vector; - string discovered_id = s[0]; - string discovered_hostname = s[1]; - string discovered_port = s[2]; - - // Add discovered servers that don't already exist in runtime_mysql_servers - if (std::find(existing_servers.begin(), existing_servers.end(), discovered_hostname) == existing_servers.end()) { - servers_to_add.push_back(discovered_hostname); - - MySQL_HostGroups_Manager::serverDetails original_server_values = { - parseLong(r1->fields[0]), // hostgroup_id - r1->fields[1], // hostname - parseLong(discovered_port.c_str()), // port, use from topology discovery instead of from originating server - parseLong(r1->fields[3]), // gtid_port - r1->fields[4], // status, but not using it - parseLong(r1->fields[5]), // weight - parseLong(r1->fields[6]), // compression - parseLong(r1->fields[7]), // max_connections - parseLong(r1->fields[8]), // max_replication_lag - parseLong(r1->fields[9]), // use_ssl - parseLong(r1->fields[10]), // max_latency_ms - r1->fields[11] // comment, but not using it - }; - - hostname_values_mapping[discovered_hostname] = original_server_values; - } + string current_runtime_hostname = r1->fields[1]; + if (std::find(existing_runtime_servers.begin(), existing_runtime_servers.end(), current_runtime_hostname) == existing_runtime_servers.end()) { + existing_runtime_servers.insert(current_runtime_hostname); + } + } + + // Loop through discovered servers and process the ones we plan to add + for (MYSQL_ROW s : discovered_servers) { + string current_discovered_id = s[1]; + string current_discovered_hostname = s[2]; + string current_discovered_port = s[3]; + + // We only add the discovered server if it is not the originating server and it does not already exist in 'runtime_mysql_servers' and it is not already saved to be added + bool already_exists = std::find(existing_runtime_servers.begin(), existing_runtime_servers.end(), current_discovered_hostname) != existing_runtime_servers.end(); + bool already_saved = new_server_values_mapping.find(current_discovered_hostname) != new_server_values_mapping.end(); + if (current_discovered_hostname != originating_server_hostname && !already_exists && !already_saved) { + // Search for the originating server's values and store it + for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { + SQLite3_row *r1 = *it; + + string current_runtime_hostname = r1->fields[1]; + if (current_runtime_hostname == originating_server_hostname) { + MySQL_HostGroups_Manager::serverDetails originating_server_values; + + originating_server_values.hostgroup_id = parseLong(r1->fields[0]); + originating_server_values.originating_hostname = current_runtime_hostname; + originating_server_values.port = parseLong(current_discovered_port.c_str()); + originating_server_values.gtid_port = parseLong(r1->fields[3]); + originating_server_values.status = r1->fields[4]; // not used + originating_server_values.weight = parseLong(r1->fields[5]); + originating_server_values.compression = parseLong(r1->fields[6]); + originating_server_values.max_connections = parseLong(r1->fields[7]); + originating_server_values.max_replication_lag = parseLong(r1->fields[8]); + originating_server_values.use_ssl = parseLong(r1->fields[9]); + originating_server_values.max_latency_ms = parseLong(r1->fields[10]); + originating_server_values.comment = r1->fields[11]; // not used + + new_server_values_mapping[current_discovered_hostname] = originating_server_values; } } } } - if (!servers_to_add.empty()) { - int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(servers_to_add, hostname_values_mapping); + + // Add the new servers if any + if (!new_server_values_mapping.empty()) { + int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_server_values_mapping); if (successfully_added_all_servers == EXIT_FAILURE) { proxy_info("Inserting auto-discovered servers failed.\n"); @@ -3504,9 +3361,10 @@ void * MySQL_Monitor::monitor_read_only() { unsigned long long t2; unsigned long long next_loop_at=0; int topology_loop = 0; - int topology_loop_max = mysql_thread___monitor_topology_discovery_interval; + int topology_loop_max = mysql_thread___monitor_aws_rds_topology_discovery_interval; while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) { + bool do_discovery_check = false; unsigned int glover; char *error=NULL; @@ -3523,17 +3381,6 @@ void * MySQL_Monitor::monitor_read_only() { next_loop_at=0; } - if (topology_loop >= topology_loop_max) { - try { - discover_topology_and_add_to_mysql_servers(); - topology_loop = 0; - } catch (std::runtime_error &e) { - proxy_error("Error during topology auto-discovery: %s\n", e.what()); - } catch (...) { - proxy_error("Unknown error during topology auto-discovery.\n"); - } - } - topology_loop += 1; if (t1 < next_loop_at) { goto __sleep_monitor_read_only; @@ -3551,8 +3398,14 @@ void * MySQL_Monitor::monitor_read_only() { goto __end_monitor_read_only_loop; } + if (topology_loop >= topology_loop_max) { + do_discovery_check = true; + topology_loop = 0; + } + topology_loop += 1; + // resultset must be initialized before calling monitor_read_only_async - monitor_read_only_async(resultset); + monitor_read_only_async(resultset, do_discovery_check); if (shutdown) return NULL; __end_monitor_read_only_loop: @@ -7417,7 +7270,7 @@ bool MySQL_Monitor::monitor_read_only_process_ready_tasks(const std::vector mysql_servers; for (auto& mmsd : mmsds) { - + string originating_server_hostname = mmsd->hostname; const auto task_result = mmsd->get_task_result(); assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING); @@ -7487,6 +7340,44 @@ VALGRIND_ENABLE_ERROR_REPORTING; } rc = (*proxy_sqlite3_bind_int64)(statement, 5, read_only); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } else if (fields && mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) { + // Process the read_only field as above and store the first server + vector discovered_servers; + for (k = 0; k < num_fields; k++) { + if (strcmp((char*)"read_only", (char*)fields[k].name) == 0) { + j = k; + } + } + if (j > -1) { + MYSQL_ROW row = mysql_fetch_row(mmsd->result); + if (row) { + discovered_servers.push_back(row); +VALGRIND_DISABLE_ERROR_REPORTING; + if (row[j]) { + if (!strcmp(row[j], "0") || !strcasecmp(row[j], "OFF")) + read_only = 0; + } +VALGRIND_ENABLE_ERROR_REPORTING; + } + } + + // Store the remaining servers + int num_rows = mysql_num_rows(mmsd->result); + for (int i = 1; i < num_rows; i++) { + MYSQL_ROW row = mysql_fetch_row(mmsd->result); + discovered_servers.push_back(row); + } + + // Process the discovered servers and add them to 'runtime_mysql_servers' + if (!discovered_servers.empty()) { + try { + process_discovered_topology(originating_server_hostname, discovered_servers); + } catch (std::runtime_error &e) { + proxy_error("Error during topology auto-discovery: %s\n", e.what()); + } catch (...) { + proxy_error("Unknown error during topology auto-discovery.\n"); + } + } } else { proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); @@ -7547,7 +7438,7 @@ VALGRIND_ENABLE_ERROR_REPORTING; return true; } -void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) { +void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check) { assert(resultset); std::vector> mmsds; @@ -7570,6 +7461,12 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) { } else if (strcasecmp(r->fields[3], (char*)"read_only|innodb_read_only") == 0) { task_type = MON_READ_ONLY__OR__INNODB_READ_ONLY; } + + // Change task type if it's time to do discovery check. Only for aws rds endpoints + string hostname = r->fields[0]; + if (do_discovery_check && hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) { + task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY; + } } std::unique_ptr mmsd( diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 1efbbfb191..544414d436 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -307,7 +307,7 @@ static char * mysql_thread_variables_names[]= { (char *)"monitor_ping_interval", (char *)"monitor_ping_max_failures", (char *)"monitor_ping_timeout", - (char *)"monitor_topology_discovery_interval", + (char *)"monitor_aws_rds_topology_discovery_interval", (char *)"monitor_read_only_interval", (char *)"monitor_read_only_timeout", (char *)"monitor_read_only_max_timeout_count", @@ -824,8 +824,8 @@ th_metrics_map = std::make_tuple( metric_tags {} ), std::make_tuple ( - p_th_gauge::mysql_monitor_topology_discovery_interval, - "proxysql_mysql_monitor_topology_discovery_interval", + p_th_gauge::mysql_monitor_aws_rds_topology_discovery_interval, + "proxysql_mysql_monitor_aws_rds_topology_discovery_interval", "How frequently a topology discovery is performed, e.g. a value of 500 means one topology discovery every 500 read-only checks ", metric_tags {} ), @@ -919,7 +919,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.monitor_ping_interval=8000; variables.monitor_ping_max_failures=3; variables.monitor_ping_timeout=1000; - variables.monitor_topology_discovery_interval=1000; + variables.monitor_aws_rds_topology_discovery_interval=1000; variables.monitor_read_only_interval=1000; variables.monitor_read_only_timeout=800; variables.monitor_read_only_max_timeout_count=3; @@ -2029,7 +2029,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_ping_timeout"] = make_tuple(&variables.monitor_ping_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_ping_max_failures"] = make_tuple(&variables.monitor_ping_max_failures, 1, 1000*1000, false); - VariablesPointers_int["monitor_topology_discovery_interval"] = make_tuple(&variables.monitor_topology_discovery_interval, 1, 100000, false); + VariablesPointers_int["monitor_aws_rds_topology_discovery_interval"] = make_tuple(&variables.monitor_aws_rds_topology_discovery_interval, 1, 100000, false); VariablesPointers_int["monitor_read_only_interval"] = make_tuple(&variables.monitor_read_only_interval, 100, 7*24*3600*1000, false); VariablesPointers_int["monitor_read_only_timeout"] = make_tuple(&variables.monitor_read_only_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_read_only_max_timeout_count"] = make_tuple(&variables.monitor_read_only_max_timeout_count, 1, 1000*1000, false); @@ -3931,7 +3931,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___monitor_ping_interval=GloMTH->get_variable_int((char *)"monitor_ping_interval"); mysql_thread___monitor_ping_max_failures=GloMTH->get_variable_int((char *)"monitor_ping_max_failures"); mysql_thread___monitor_ping_timeout=GloMTH->get_variable_int((char *)"monitor_ping_timeout"); - mysql_thread___monitor_topology_discovery_interval=GloMTH->get_variable_int((char *)"monitor_topology_discovery_interval"); + mysql_thread___monitor_aws_rds_topology_discovery_interval=GloMTH->get_variable_int((char *)"monitor_aws_rds_topology_discovery_interval"); mysql_thread___monitor_read_only_interval=GloMTH->get_variable_int((char *)"monitor_read_only_interval"); mysql_thread___monitor_read_only_timeout=GloMTH->get_variable_int((char *)"monitor_read_only_timeout"); mysql_thread___monitor_read_only_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_read_only_max_timeout_count"); @@ -5211,7 +5211,7 @@ void MySQL_Threads_Handler::p_update_metrics() { this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_enabled]->Set(this->variables.monitor_enabled); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_timeout]->Set(this->variables.monitor_ping_timeout/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_max_failures]->Set(this->variables.monitor_ping_max_failures); - this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_topology_discovery_interval]->Set(this->variables.monitor_topology_discovery_interval); + this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_aws_rds_topology_discovery_interval]->Set(this->variables.monitor_aws_rds_topology_discovery_interval); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_interval]->Set(this->variables.monitor_read_only_interval/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_timeout]->Set(this->variables.monitor_read_only_timeout/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_writer_is_also_reader]->Set(this->variables.monitor_writer_is_also_reader); From d04173b0267f2943f6a3397723c33e3031c10dd6 Mon Sep 17 00:00:00 2001 From: anphucbui Date: Mon, 4 Mar 2024 06:36:21 +0000 Subject: [PATCH 3/4] Update discovery logic to add discovered servers with default values instead of originating server's values, add new field in mmsd for reader hostgroup, and query monitor db instead of admin db --- include/MySQL_HostGroups_Manager.h | 17 +------- include/MySQL_Monitor.hpp | 3 +- lib/MySQL_HostGroups_Manager.cpp | 32 ++++---------- lib/MySQL_Monitor.cpp | 67 ++++++++++-------------------- 4 files changed, 33 insertions(+), 86 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 67d19c25da..d0f05e4d8d 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -1019,22 +1019,7 @@ class MySQL_HostGroups_Manager { void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us); unsigned long long Get_Memory_Stats(); - struct serverDetails { - long int hostgroup_id; - string originating_hostname; - uint16_t port; - uint16_t gtid_port; - string status; - int64_t weight; - unsigned int compression; - int64_t max_connections; - unsigned int max_replication_lag; - int32_t use_ssl; - unsigned int max_latency_ms; - string comment; - }; - - int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(unordered_map new_server_values_mapping); + int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector> new_servers); void rebuild_hostname_hostgroup_mapping(); void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index c6c1dc73c4..bf6cf49dfa 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -233,6 +233,7 @@ class MySQL_Monitor_State_Data { char *hostname; int port; int writer_hostgroup; // used only by group replication + int reader_hostgroup; bool writer_is_also_reader; // used only by group replication int max_transactions_behind; // used only by group replication int max_transactions_behind_count; // used only by group replication @@ -446,7 +447,7 @@ class MySQL_Monitor { static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql); static void trigger_dns_cache_update(); - void process_discovered_topology(const std::string& originating_server_hostname, vector discovered_servers); + void process_discovered_topology(const std::string& originating_server_hostname, vector discovered_servers, int reader_hostgroup); private: std::vector *tables_defs_monitor; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 8b4d0092ef..ed24738fcf 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -8168,40 +8168,26 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv) /** * @brief Updates replication hostgroups by adding autodiscovered mysql servers. -* @details Adds each server from 'new_server_values_mapping' to the 'runtime_mysql_servers' table. +* @details Adds each server from 'new_servers' to the 'runtime_mysql_servers' table. * We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'. -* @param new_server_values_mapping A mapping containing the hostname of the discovered server mapped to the metadata of server from which it was discovered, stored in a 'serverDetails' struct. +* @param new_servers A vector of tuples where each tuple contains the values needed to add each new server. * * @return Returns EXIT_FAILURE code on failure and EXIT_SUCCESS code on success. */ -int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(unordered_map new_server_values_mapping) { +int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector> new_servers) { int exit_code = EXIT_SUCCESS; bool added_new_server = false; wrlock(); try { - for (const auto &s : new_server_values_mapping) { - if (new_server_values_mapping.find(s.first) == new_server_values_mapping.end()) { - continue; - } - - string host = s.first; - MySQL_HostGroups_Manager::serverDetails new_server_values = new_server_values_mapping[host]; - - long int hostgroup_id = new_server_values.hostgroup_id; - uint16_t port = new_server_values.port; - - uint16_t gtid_port = new_server_values.gtid_port; - int64_t weight = new_server_values.weight; - unsigned int compression = new_server_values.compression; - int64_t max_connections = new_server_values.max_connections; - unsigned int max_replication_lag = new_server_values.max_replication_lag; - int32_t use_ssl = new_server_values.use_ssl; - unsigned int max_latency_ms = new_server_values.max_latency_ms; + for (tuple s : new_servers) { + string host = std::get<0>(s); + uint16_t port = std::get<1>(s); + long int hostgroup_id = std::get<2>(s); + // Add the discovered server with default values MySrvC* mysrvc = new MySrvC( - const_cast(host.c_str()), port, gtid_port, weight, MYSQL_SERVER_STATUS_ONLINE, - compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, const_cast("Discovered endpoint") + const_cast(host.c_str()), port, 0, 1, MYSQL_SERVER_STATUS_ONLINE, 0, -1, 0, -1, 0, const_cast("Discovered endpoint") ); add(mysrvc, hostgroup_id); diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index c28bea5dbc..c57119541f 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -3290,77 +3290,51 @@ VALGRIND_ENABLE_ERROR_REPORTING; /** * @brief Processes the discovered servers to eventually add them to 'runtime_mysql_servers'. -* @details This method takes a vector of discovered servers, compares them against the existing servers, and adds the new servers to 'runtime_mysql_servers' with the -* values from their originating server. +* @details This method takes a vector of discovered servers, compares them against the existing servers, and adds the new servers to 'runtime_mysql_servers'. * @param originating_server_hostname A string which denotes the hostname of the originating server, from which the discovered servers were queried and found. * @param discovered_servers A vector of servers discovered when querying the cluster's topology. +* @param reader_hostgroup Reader hostgroup to which we will add the discovered servers. */ -void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, vector discovered_servers) { +void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, vector discovered_servers, int reader_hostgroup) { char *error = NULL; int cols = 0; int affected_rows = 0; SQLite3_result *runtime_mysql_servers = NULL; - char *query=(char *)"SELECT hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM main.runtime_mysql_servers ORDER BY hostgroup_id, hostname, port"; + char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname"; proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); - admindb->execute_statement(query, &error , &cols , &affected_rows , &runtime_mysql_servers); + monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers); if (error) { proxy_error("Error on %s : %s\n", query, error); } else { - set existing_runtime_servers; - unordered_map new_server_values_mapping; + vector> new_servers; + vector saved_hostnames; + saved_hostnames.push_back(originating_server_hostname); - // Do an initial loop through the query results to keep track of existing runtime server hostnames + // Do an initial loop through the query results to save existing runtime server hostnames for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { SQLite3_row *r1 = *it; + string current_runtime_hostname = r1->fields[0]; - string current_runtime_hostname = r1->fields[1]; - if (std::find(existing_runtime_servers.begin(), existing_runtime_servers.end(), current_runtime_hostname) == existing_runtime_servers.end()) { - existing_runtime_servers.insert(current_runtime_hostname); - } + saved_hostnames.push_back(current_runtime_hostname); } - // Loop through discovered servers and process the ones we plan to add + // Loop through discovered servers and process the ones we haven't saved yet for (MYSQL_ROW s : discovered_servers) { - string current_discovered_id = s[1]; string current_discovered_hostname = s[2]; string current_discovered_port = s[3]; - // We only add the discovered server if it is not the originating server and it does not already exist in 'runtime_mysql_servers' and it is not already saved to be added - bool already_exists = std::find(existing_runtime_servers.begin(), existing_runtime_servers.end(), current_discovered_hostname) != existing_runtime_servers.end(); - bool already_saved = new_server_values_mapping.find(current_discovered_hostname) != new_server_values_mapping.end(); - if (current_discovered_hostname != originating_server_hostname && !already_exists && !already_saved) { - // Search for the originating server's values and store it - for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { - SQLite3_row *r1 = *it; - - string current_runtime_hostname = r1->fields[1]; - if (current_runtime_hostname == originating_server_hostname) { - MySQL_HostGroups_Manager::serverDetails originating_server_values; - - originating_server_values.hostgroup_id = parseLong(r1->fields[0]); - originating_server_values.originating_hostname = current_runtime_hostname; - originating_server_values.port = parseLong(current_discovered_port.c_str()); - originating_server_values.gtid_port = parseLong(r1->fields[3]); - originating_server_values.status = r1->fields[4]; // not used - originating_server_values.weight = parseLong(r1->fields[5]); - originating_server_values.compression = parseLong(r1->fields[6]); - originating_server_values.max_connections = parseLong(r1->fields[7]); - originating_server_values.max_replication_lag = parseLong(r1->fields[8]); - originating_server_values.use_ssl = parseLong(r1->fields[9]); - originating_server_values.max_latency_ms = parseLong(r1->fields[10]); - originating_server_values.comment = r1->fields[11]; // not used - - new_server_values_mapping[current_discovered_hostname] = originating_server_values; - } - } + if (find(saved_hostnames.begin(), saved_hostnames.end(), current_discovered_hostname) == saved_hostnames.end()) { + tuple new_server(current_discovered_hostname, parseLong(current_discovered_port.c_str()), reader_hostgroup); + new_servers.push_back(new_server); + saved_hostnames.push_back(current_discovered_hostname); } } // Add the new servers if any - if (!new_server_values_mapping.empty()) { - int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_server_values_mapping); + if (!new_servers.empty()) { + int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers); if (successfully_added_all_servers == EXIT_FAILURE) { proxy_info("Inserting auto-discovered servers failed.\n"); @@ -3394,7 +3368,7 @@ void * MySQL_Monitor::monitor_read_only() { char *error=NULL; SQLite3_result *resultset=NULL; // add support for SSL - char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()"; + char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type, reader_hostgroup FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()"; t1=monotonic_time(); if (!GloMTH) return NULL; // quick exit during shutdown/restart @@ -7399,7 +7373,7 @@ VALGRIND_ENABLE_ERROR_REPORTING; // Process the discovered servers and add them to 'runtime_mysql_servers' if (!discovered_servers.empty()) { try { - process_discovered_topology(originating_server_hostname, discovered_servers); + process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup); } catch (std::runtime_error &e) { proxy_error("Error during topology auto-discovery: %s\n", e.what()); } catch (...) { @@ -7500,6 +7474,7 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d std::unique_ptr mmsd( new MySQL_Monitor_State_Data(task_type, r->fields[0], atoi(r->fields[1]), atoi(r->fields[2]))); + mmsd->reader_hostgroup = atoi(r->fields[4]); // set reader_hostgroup mmsd->mondb = monitordb; mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); From bbb6176ebfdfe46b97a5b86997fe21b852258a5c Mon Sep 17 00:00:00 2001 From: anphucbui Date: Fri, 29 Mar 2024 23:48:18 +0000 Subject: [PATCH 4/4] Remove redundant exception guards, remove parseLong, add checksum computation after adding a new server, and other small changes based on feedback --- include/MySQL_HostGroups_Manager.h | 3 +- include/MySQL_Monitor.hpp | 2 +- include/proxysql_utils.h | 2 - lib/MySQL_HostGroups_Manager.cpp | 173 +++++++++++------------------ lib/MySQL_Monitor.cpp | 30 +++-- lib/proxysql_utils.cpp | 19 ---- 6 files changed, 81 insertions(+), 148 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 7f3e65b074..df011be89b 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -1092,8 +1092,7 @@ class MySQL_HostGroups_Manager { void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us); unsigned long long Get_Memory_Stats(); - int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector> new_servers); - void rebuild_hostname_hostgroup_mapping(); + void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector>& new_servers); void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index bf6cf49dfa..4116ac3898 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -447,7 +447,7 @@ class MySQL_Monitor { static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql); static void trigger_dns_cache_update(); - void process_discovered_topology(const std::string& originating_server_hostname, vector discovered_servers, int reader_hostgroup); + void process_discovered_topology(const std::string& originating_server_hostname, const vector& discovered_servers, int reader_hostgroup); private: std::vector *tables_defs_monitor; diff --git a/include/proxysql_utils.h b/include/proxysql_utils.h index 96b3c372c6..2e127e57c4 100644 --- a/include/proxysql_utils.h +++ b/include/proxysql_utils.h @@ -258,6 +258,4 @@ void close_all_non_term_fd(std::vector excludeFDs); */ std::pair get_dollar_quote_error(const char* version); -long parseLong(const char* s); - #endif diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 925168bea4..14a842a3a8 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -8254,133 +8254,92 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv) srv->ConnectionsFree->drop_all_connections(); } +MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *hostname, int port, char *username) { + string MapKey = string(hostname) + string(rand_del) + to_string(port) + string(rand_del) + string(username); + std::lock_guard lock(Servers_SSL_Params_map_mutex); + auto it = Servers_SSL_Params_map.find(MapKey); + if (it != Servers_SSL_Params_map.end()) { + MySQLServers_SslParams * MSSP = new MySQLServers_SslParams(it->second); + return MSSP; + } else { + MapKey = string(hostname) + string(rand_del) + to_string(port) + string(rand_del) + ""; // search for empty username + it = Servers_SSL_Params_map.find(MapKey); + if (it != Servers_SSL_Params_map.end()) { + MySQLServers_SslParams * MSSP = new MySQLServers_SslParams(it->second); + return MSSP; + } + } + return NULL; +} + /** * @brief Updates replication hostgroups by adding autodiscovered mysql servers. * @details Adds each server from 'new_servers' to the 'runtime_mysql_servers' table. * We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'. * @param new_servers A vector of tuples where each tuple contains the values needed to add each new server. -* -* @return Returns EXIT_FAILURE code on failure and EXIT_SUCCESS code on success. */ -int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector> new_servers) { - int exit_code = EXIT_SUCCESS; - bool added_new_server = false; +void MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector>& new_servers) { + int added_new_server; wrlock(); - try { - for (tuple s : new_servers) { - string host = std::get<0>(s); - uint16_t port = std::get<1>(s); - long int hostgroup_id = std::get<2>(s); - - // Add the discovered server with default values - MySrvC* mysrvc = new MySrvC( - const_cast(host.c_str()), port, 0, 1, MYSQL_SERVER_STATUS_ONLINE, 0, -1, 0, -1, 0, const_cast("Discovered endpoint") - ); - add(mysrvc, hostgroup_id); - - proxy_info( - "Adding new discovered server %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n", - host.c_str(), port, hostgroup_id, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl - ); - - added_new_server = true; - } + // Add the discovered server with default values + for (tuple s : new_servers) { + string host = std::get<0>(s); + uint16_t port = std::get<1>(s); + long int hostgroup_id = std::get<2>(s); + + srv_info_t srv_info { host.c_str(), port, "AWS RDS" }; + srv_opts_t srv_opts { -1, -1, -1 }; - if (added_new_server) { - purge_mysql_servers_table(); - mydb->execute("DELETE FROM mysql_servers"); - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); - generate_mysql_servers_table(); - update_table_mysql_servers_for_monitor(false); - rebuild_hostname_hostgroup_mapping(); - } - } catch (...) { - exit_code = EXIT_FAILURE; + added_new_server = create_new_server_in_hg(hostgroup_id, srv_info, srv_opts); } - wrunlock(); - return exit_code; -} - -/** -* @brief Rebuilds the 'hostname_hostgroup_mapping' -* @details Rebuilds the internal 'hostname_hostgroup_mapping' assuming new data has been entered -* and calculates new checksums for 'mysql_servers' and 'mysql_replication_hostgroups'. -*/ -void MySQL_HostGroups_Manager::rebuild_hostname_hostgroup_mapping() { - proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n", - hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS], - hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]); - - char* error = NULL; - int cols = 0; - int affected_rows = 0; - SQLite3_result* resultset = NULL; - - const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \ - UNION \ - SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \ - ORDER BY hostname, port"; - - mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); - - hostgroup_server_mapping.clear(); - - if (resultset && resultset->rows_count) { - std::string fetched_server_id; - HostGroup_Server_Mapping* fetched_server_mapping = NULL; + // If servers were added, perform necessary updates to internal structures + if (added_new_server > -1) { + purge_mysql_servers_table(); + mydb->execute("DELETE FROM mysql_servers"); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); + generate_mysql_servers_table(); - for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); it++) { - SQLite3_row *r = *it; + // Update the global checksums after 'mysql_servers' regeneration + { + unique_ptr resultset { get_admin_runtime_mysql_servers(mydb) }; + string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) }; + save_runtime_mysql_servers(resultset.release()); - const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1]; + // Update the runtime_mysql_servers checksum with the new checksum + uint64_t raw_checksum = this->runtime_mysql_servers ? this->runtime_mysql_servers->raw_checksum() : 0; + table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = raw_checksum; - if (fetched_server_mapping == NULL || server_id != fetched_server_id) { - auto itr = hostgroup_server_mapping.find(server_id); + // This is required for preserving coherence in the checksums, otherwise they would be inconsistent with `commit` generated checksums + SpookyHash rep_hgs_hash {}; + bool init = false; + uint64_t servers_v2_hash = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS_V2]; - if (itr == hostgroup_server_mapping.end()) { - std::unique_ptr server_mapping(new HostGroup_Server_Mapping(this)); - fetched_server_mapping = server_mapping.get(); - hostgroup_server_mapping.insert( std::pair> { - server_id, std::move(server_mapping) - } ); - } else { - fetched_server_mapping = itr->second.get(); + if (servers_v2_hash) { + if (init == false) { + init = true; + rep_hgs_hash.Init(19, 3); } - - fetched_server_id = server_id; + + rep_hgs_hash.Update(&servers_v2_hash, sizeof(servers_v2_hash)); } - HostGroup_Server_Mapping::Node node; - node.reader_hostgroup_id = atoi(r->fields[4]); - node.writer_hostgroup_id = atoi(r->fields[5]); - node.srv = reinterpret_cast(atoll(r->fields[6])); - - HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER; - fetched_server_mapping->add(type, node); - } - } - delete resultset; + CUCFT1( + rep_hgs_hash, init, "mysql_replication_hostgroups", "writer_hostgroup", + table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS] + ); - hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS]; - hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]; -} + proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str()); -MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *hostname, int port, char *username) { - string MapKey = string(hostname) + string(rand_del) + to_string(port) + string(rand_del) + string(username); - std::lock_guard lock(Servers_SSL_Params_map_mutex); - auto it = Servers_SSL_Params_map.find(MapKey); - if (it != Servers_SSL_Params_map.end()) { - MySQLServers_SslParams * MSSP = new MySQLServers_SslParams(it->second); - return MSSP; - } else { - MapKey = string(hostname) + string(rand_del) + to_string(port) + string(rand_del) + ""; // search for empty username - it = Servers_SSL_Params_map.find(MapKey); - if (it != Servers_SSL_Params_map.end()) { - MySQLServers_SslParams * MSSP = new MySQLServers_SslParams(it->second); - return MSSP; + pthread_mutex_lock(&GloVars.checksum_mutex); + update_glovars_mysql_servers_checksum(mysrvs_checksum); + pthread_mutex_unlock(&GloVars.checksum_mutex); } + + update_table_mysql_servers_for_monitor(false); + update_hostgroup_manager_mappings(); } - return NULL; + + wrunlock(); } diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index c57119541f..cda93d1e0f 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -3295,7 +3295,7 @@ VALGRIND_ENABLE_ERROR_REPORTING; * @param discovered_servers A vector of servers discovered when querying the cluster's topology. * @param reader_hostgroup Reader hostgroup to which we will add the discovered servers. */ -void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, vector discovered_servers, int reader_hostgroup) { +void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector& discovered_servers, int reader_hostgroup) { char *error = NULL; int cols = 0; int affected_rows = 0; @@ -3323,10 +3323,18 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s // Loop through discovered servers and process the ones we haven't saved yet for (MYSQL_ROW s : discovered_servers) { string current_discovered_hostname = s[2]; - string current_discovered_port = s[3]; + string current_discovered_port_string = s[3]; + int current_discovered_port_int; + + try { + current_discovered_port_int = stoi(s[3]); + } catch (...) { + proxy_error("Unable to parse the port value during topology discovery: [%s]. Terminating discovery early.", current_discovered_port_string); + return; + } if (find(saved_hostnames.begin(), saved_hostnames.end(), current_discovered_hostname) == saved_hostnames.end()) { - tuple new_server(current_discovered_hostname, parseLong(current_discovered_port.c_str()), reader_hostgroup); + tuple new_server(current_discovered_hostname, current_discovered_port_int, reader_hostgroup); new_servers.push_back(new_server); saved_hostnames.push_back(current_discovered_hostname); } @@ -3334,13 +3342,7 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s // Add the new servers if any if (!new_servers.empty()) { - int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers); - - if (successfully_added_all_servers == EXIT_FAILURE) { - proxy_info("Inserting auto-discovered servers failed.\n"); - } else { - proxy_info("Inserting auto-discovered servers succeeded.\n"); - } + MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers); } } } @@ -7372,13 +7374,7 @@ VALGRIND_ENABLE_ERROR_REPORTING; // Process the discovered servers and add them to 'runtime_mysql_servers' if (!discovered_servers.empty()) { - try { - process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup); - } catch (std::runtime_error &e) { - proxy_error("Error during topology auto-discovery: %s\n", e.what()); - } catch (...) { - proxy_error("Unknown error during topology auto-discovery.\n"); - } + process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup); } } else { proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); diff --git a/lib/proxysql_utils.cpp b/lib/proxysql_utils.cpp index 9e977abde1..fe30562ed7 100644 --- a/lib/proxysql_utils.cpp +++ b/lib/proxysql_utils.cpp @@ -504,22 +504,3 @@ std::pair get_dollar_quote_error(const char* version) { } } } - -/** -* @brief Parses a string into a long. -* @details Parses a string into a long, with error checks. Throws an exception if parse fails. -* @param s The string to parse. -* -* @return The parsed value of the string as a long. -*/ -long parseLong(const char* s) { - errno = 0; - char *temp; - long val = strtol(s, &temp, 0); - - if (temp == s || *temp != '\0' || ((val == LONG_MIN || val == LONG_MAX) && errno == ERANGE)) { - throw std::runtime_error("Could not parse long."); - } - - return val; -}