Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AWS RDS MySQL Multi-AZ Cluster auto-discovery #4406

Merged
merged 6 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/MySQL_HostGroups_Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,9 @@ class MySQL_HostGroups_Manager {
void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us);
unsigned long long Get_Memory_Stats();

int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>> new_servers);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason to pass the vector by value, also, because it's never modified, could be easily converted into const vector<tuple<string, int, int>>&.

void rebuild_hostname_hostgroup_mapping();

void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup);
Expand Down
10 changes: 8 additions & 2 deletions include/MySQL_Monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ struct cmp_str {

#define N_L_ASE 16

#define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com"
#define QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology"

/*

Implementation of monitoring in AWS Aurora will be different than previous modules
Expand Down Expand Up @@ -197,7 +200,8 @@ enum MySQL_Monitor_State_Data_Task_Type {
MON_GROUP_REPLICATION,
MON_REPLICATION_LAG,
MON_GALERA,
MON_AWS_AURORA
MON_AWS_AURORA,
MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY
};

enum class MySQL_Monitor_State_Data_Task_Result {
Expand Down Expand Up @@ -229,6 +233,7 @@ class MySQL_Monitor_State_Data {
char *hostname;
int port;
int writer_hostgroup; // used only by group replication
int reader_hostgroup;
bool writer_is_also_reader; // used only by group replication
int max_transactions_behind; // used only by group replication
int max_transactions_behind_count; // used only by group replication
Expand Down Expand Up @@ -442,6 +447,7 @@ class MySQL_Monitor {
static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql);
static void trigger_dns_cache_update();

void process_discovered_topology(const std::string& originating_server_hostname, vector<MYSQL_ROW> discovered_servers, int reader_hostgroup);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if MYSQL_ROW are just pointers, it's a good practice to make the second argument const vector<MYSQL_ROW>&.


private:
std::vector<table_def_t *> *tables_defs_monitor;
Expand Down Expand Up @@ -553,7 +559,7 @@ class MySQL_Monitor {
* Note: Calling init_async is mandatory before executing tasks asynchronously.
*/
void monitor_ping_async(SQLite3_result* resultset);
void monitor_read_only_async(SQLite3_result* resultset);
void monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check);
void monitor_replication_lag_async(SQLite3_result* resultset);
void monitor_group_replication_async();
void monitor_galera_async();
Expand Down
3 changes: 3 additions & 0 deletions include/MySQL_Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ struct p_th_gauge {
mysql_monitor_ping_interval,
mysql_monitor_ping_timeout,
mysql_monitor_ping_max_failures,
mysql_monitor_aws_rds_topology_discovery_interval,
mysql_monitor_read_only_interval,
mysql_monitor_read_only_timeout,
mysql_monitor_writer_is_also_reader,
Expand Down Expand Up @@ -385,6 +386,8 @@ class MySQL_Threads_Handler
int monitor_ping_max_failures;
//! Monitor ping timeout. Unit: 'ms'.
int monitor_ping_timeout;
//! Monitor aws rds topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'.
int monitor_aws_rds_topology_discovery_interval;
//! Monitor read only timeout. Unit: 'ms'.
int monitor_read_only_interval;
//! Monitor read only timeout. Unit: 'ms'.
Expand Down
2 changes: 2 additions & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,7 @@ __thread int mysql_thread___monitor_connect_timeout;
__thread int mysql_thread___monitor_ping_interval;
__thread int mysql_thread___monitor_ping_max_failures;
__thread int mysql_thread___monitor_ping_timeout;
__thread int mysql_thread___monitor_aws_rds_topology_discovery_interval;
__thread int mysql_thread___monitor_read_only_interval;
__thread int mysql_thread___monitor_read_only_timeout;
__thread int mysql_thread___monitor_read_only_max_timeout_count;
Expand Down Expand Up @@ -1073,6 +1074,7 @@ extern __thread int mysql_thread___monitor_connect_timeout;
extern __thread int mysql_thread___monitor_ping_interval;
extern __thread int mysql_thread___monitor_ping_max_failures;
extern __thread int mysql_thread___monitor_ping_timeout;
extern __thread int mysql_thread___monitor_aws_rds_topology_discovery_interval;
extern __thread int mysql_thread___monitor_read_only_interval;
extern __thread int mysql_thread___monitor_read_only_timeout;
extern __thread int mysql_thread___monitor_read_only_max_timeout_count;
Expand Down
2 changes: 2 additions & 0 deletions include/proxysql_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,6 @@ void close_all_non_term_fd(std::vector<int> excludeFDs);
*/
std::pair<int,const char*> get_dollar_quote_error(const char* version);

long parseLong(const char* s);

#endif
114 changes: 114 additions & 0 deletions lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2200,6 +2200,7 @@ bool MySQL_HostGroups_Manager::commit(
// fill Hostgroup_Manager_Mapping with latest records
update_hostgroup_manager_mappings();


ev_async_send(gtid_ev_loop, gtid_ev_async);

__sync_fetch_and_add(&status.servers_table_version,1);
Expand Down Expand Up @@ -8253,6 +8254,119 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv)
srv->ConnectionsFree->drop_all_connections();
}

/**
* @brief Updates replication hostgroups by adding autodiscovered mysql servers.
* @details Adds each server from 'new_servers' to the 'runtime_mysql_servers' table.
* We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'.
* @param new_servers A vector of tuples where each tuple contains the values needed to add each new server.
*
* @return Returns EXIT_FAILURE code on failure and EXIT_SUCCESS code on success.
*/
int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>> new_servers) {
int exit_code = EXIT_SUCCESS;
bool added_new_server = false;
wrlock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing lock on Admin GloAdmin->mysql_servers_wrlock(), since get_admin_runtime_mysql_servers is used, Admin should be locked, in other places were both locks needs to be taken, we take them together for simplicity. See MySQL_HostGroups_Manager::update_aws_aurora_set_reader.


try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment when the function is called about error handling.

for (tuple<string, int, int> s : new_servers) {
string host = std::get<0>(s);
uint16_t port = std::get<1>(s);
long int hostgroup_id = std::get<2>(s);

// Add the discovered server with default values
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a newer abstraction that can be used for server placement MySQL_HostGroups_Manager::create_new_server_in_hg. Is there any motivation not to use it? If not, would be preferred using it for this case. If there is a limitation in the interface it may be simpler to fix it that duplicating code. For reference, here it's an usage of it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I will make use of the new abstraction. For the third field of srv_info_t struct, the kind field, what should I use? I will default to AWS RDS string, but let me know if it should be something else.

MySrvC* mysrvc = new MySrvC(
const_cast<char*>(host.c_str()), port, 0, 1, MYSQL_SERVER_STATUS_ONLINE, 0, -1, 0, -1, 0, const_cast<char*>("Discovered endpoint")
);
add(mysrvc, hostgroup_id);

proxy_info(
"Adding new discovered server %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following hostgroup=%d should be replaced with hostgroup=%ld as hostgroup_id is a long int, or remove the unnecessary casting into long int from the original int that is received inside the tuple.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this logging statement here as create_new_server_in_hg already has the same logging statement

host.c_str(), port, hostgroup_id, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl
);

added_new_server = true;
}

if (added_new_server) {
purge_mysql_servers_table();
mydb->execute("DELETE FROM mysql_servers");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
generate_mysql_servers_table();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block, starting at previous if is missing the update on the global checksums after this line. This section can be taken as an example update_group_replication_add_autodiscovered. Here we can see that the following part is missing:

		// Update the global checksums after 'mysql_servers' regeneration
		{
			unique_ptr<SQLite3_result> resultset { get_admin_runtime_mysql_servers(mydb) };
			string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) };
			save_runtime_mysql_servers(resultset.release());
			proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str());

			pthread_mutex_lock(&GloVars.checksum_mutex);
			update_glovars_mysql_servers_checksum(mysrvs_checksum);
			pthread_mutex_unlock(&GloVars.checksum_mutex);
		}

Checksum re-computation is mandatory in this case after re-generating mysql_servers_table. But in this case there are a couple of extra operations that are required:

			// Update the global checksums after 'mysql_servers' regeneration
			{
				unique_ptr<SQLite3_result> resultset { get_admin_runtime_mysql_servers(mydb) };
				string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) };
				save_runtime_mysql_servers(resultset.release());

				// PROPOSAL: Required to update the runtime_mysql_servers checksum with the new checksum, in a
				// similar way it's done during `commit` via `commit_update_checksum_from_mysql_servers`
	    	    uint64_t raw_checksum = this->runtime_mysql_servers ? this->runtime_mysql_servers->raw_checksum() : 0;
	    	    table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = raw_checksum;

				// PROPOSAL: Required to update with the new 'mysql_group_replication_hostgroups'; this is
				// only required for preserving coherence in the checksums, otherwise they would be
				// inconsistent with `commit` generated checksums. This should be reworked into a function.
				{
					SpookyHash rep_hgs_hash {};
					bool init = false;
					uint64_t servers_v2_hash = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS_V2];

					if (servers_v2_hash) {
						if (init == false) {
							init = true;
							rep_hgs_hash.Init(19, 3);
						}
				
						rep_hgs_hash.Update(&servers_v2_hash, sizeof(servers_v2_hash));
					}

					CUCFT1(
						rep_hgs_hash, init, "mysql_replication_hostgroups", "writer_hostgroup",
						table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]
					);
				}

				proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str());

				pthread_mutex_lock(&GloVars.checksum_mutex);
				update_glovars_mysql_servers_checksum(mysrvs_checksum);
				pthread_mutex_unlock(&GloVars.checksum_mutex);
			}

The comments with PROPOSAL mark the proposed changes to the previously mentioned required block. All this code is a proposal, and has not being tested.

update_table_mysql_servers_for_monitor(false);
rebuild_hostname_hostgroup_mapping();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the previous suggested changes, this function should be replaced by update_hostgroup_manager_mappings. The two functions are identical, only differing in the missing check on hgsm_mysql_servers_checksum in the new rebuild_hostname_hostgroup_mapping function. Updating this checksums is addressed in the previous comment, as it's better for keeping general consistency. If I have not missed anything, I suggest deleting this new function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleting the new rebuild_hostname_hostgroup_mapping and using update_hostgroup_manager_mappings instead

}
} catch (...) {
exit_code = EXIT_FAILURE;
}

wrunlock();
return exit_code;
}

/**
* @brief Rebuilds the 'hostname_hostgroup_mapping'
* @details Rebuilds the internal 'hostname_hostgroup_mapping' assuming new data has been entered
* and calculates new checksums for 'mysql_servers' and 'mysql_replication_hostgroups'.
*/
void MySQL_HostGroups_Manager::rebuild_hostname_hostgroup_mapping() {
proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n",
hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS],
hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]);

char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;

const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \
UNION \
SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \
ORDER BY hostname, port";

mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);

hostgroup_server_mapping.clear();

if (resultset && resultset->rows_count) {
std::string fetched_server_id;
HostGroup_Server_Mapping* fetched_server_mapping = NULL;

for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); it++) {
SQLite3_row *r = *it;

const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1];

if (fetched_server_mapping == NULL || server_id != fetched_server_id) {
auto itr = hostgroup_server_mapping.find(server_id);

if (itr == hostgroup_server_mapping.end()) {
std::unique_ptr<HostGroup_Server_Mapping> server_mapping(new HostGroup_Server_Mapping(this));
fetched_server_mapping = server_mapping.get();
hostgroup_server_mapping.insert( std::pair<std::string,std::unique_ptr<MySQL_HostGroups_Manager::HostGroup_Server_Mapping>> {
server_id, std::move(server_mapping)
} );
} else {
fetched_server_mapping = itr->second.get();
}

fetched_server_id = server_id;
}

HostGroup_Server_Mapping::Node node;
node.reader_hostgroup_id = atoi(r->fields[4]);
node.writer_hostgroup_id = atoi(r->fields[5]);
node.srv = reinterpret_cast<MySrvC*>(atoll(r->fields[6]));

HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER;
fetched_server_mapping->add(type, node);
}
}
delete resultset;

hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS];
hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS];
}

MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *hostname, int port, char *username) {
string MapKey = string(hostname) + string(rand_del) + to_string(port) + string(rand_del) + string(username);
std::lock_guard<std::mutex> lock(Servers_SSL_Params_map_mutex);
Expand Down
Loading