Skip to content

Commit

Permalink
Merge branch 'v2.7' into v2.x-logging_mem
Browse files Browse the repository at this point in the history
  • Loading branch information
renecannao committed Nov 12, 2024
2 parents 124fb28 + 0f1262f commit 737f3a1
Show file tree
Hide file tree
Showing 12 changed files with 961 additions and 21 deletions.
1 change: 1 addition & 0 deletions deps/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ endif
# patches for replication testing
cd mariadb-client-library/mariadb_client && patch -p0 < ../mariadb_rpl.patch
cd mariadb-client-library/mariadb_client && patch -p0 < ../cmakelists.txt.patch
cd mariadb-client-library/mariadb_client && patch -p0 < ../mariadb_lib.c.metadata_column_check.patch
cd mariadb-client-library/mariadb_client && CC=${CC} CXX=${CXX} ${MAKE} mariadbclient
# cd mariadb-client-library/mariadb_client/include && make my_config.h

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
diff --git libmariadb/mariadb_lib.c libmariadb/mariadb_lib.c
index 027167f1..58b8283a 100644
--- libmariadb/mariadb_lib.c
+++ libmariadb/mariadb_lib.c
@@ -3021,6 +3021,12 @@ MYSQL_FIELD *ma_duplicate_resultset_metadata(MYSQL_FIELD *fields, size_t count,
return result;
}

+static uint8_t mysql_encode_length(uint64_t len) {
+ if (len < 251) { return 1; }
+ if (len < 65536) { return 3; }
+ if (len < 16777216) { return 4; }
+ return 9;
+}

int mthd_my_read_query_result(MYSQL *mysql)
{
@@ -3070,6 +3076,13 @@ get_info:

if (has_metadata)
{
+ // integrity-check: the length encoding of the field count from 'column-count' packet
+ // must match the packet length from header, otherwise packet is malformed.
+ ulong enc_len = mysql_encode_length(field_count);
+ if (enc_len != length) {
+ my_set_error(mysql, CR_MALFORMED_PACKET, SQLSTATE_UNKNOWN, 0);
+ return -1;
+ }
// read packet metadata
mysql->fields =
mthd_my_read_metadata(mysql, field_count, 7 + ma_extended_type_info_rows(mysql));
3 changes: 2 additions & 1 deletion include/MySQL_Monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ struct DNS_Resolve_Data {
std::shared_ptr<DNS_Cache> dns_cache;
std::string hostname;
std::set<std::string> cached_ips;
unsigned int ttl;
unsigned int ttl = 0;
unsigned int refresh_intv = 0;
};


Expand Down
12 changes: 12 additions & 0 deletions include/gen_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,18 @@ inline unsigned long long realtime_time() {
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}

template<int FACTOR, typename T>
inline T overflow_safe_multiply(T val) {
static_assert(std::is_integral<T>::value, "T must be an integer type.");
static_assert(std::is_unsigned_v<T>, "T must be an unsigned integer type.");
static_assert(FACTOR > 0, "Negative factors are not supported.");

if constexpr (FACTOR == 0) return 0;
if (val == 0) return 0;
if (val > std::numeric_limits<T>::max() / FACTOR) return std::numeric_limits<T>::max();
return (val * FACTOR);
}

#endif /* __GEN_FUNCTIONS */

bool Proxy_file_exists(const char *);
Expand Down
29 changes: 25 additions & 4 deletions lib/MySQL_Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <thread>
#include <future>
#include <sstream>
#include <random>
#include "prometheus/counter.h"
#include "MySQL_Protocol.h"
#include "MySQL_HostGroups_Manager.h"
Expand Down Expand Up @@ -4668,6 +4669,13 @@ void* monitor_dns_resolver_thread(void* args) {
if (!ips.empty()) {

bool to_update_cache = false;
int cache_ttl = dns_resolve_data->ttl;
if (dns_resolve_data->ttl > dns_resolve_data->refresh_intv) {
thread_local std::mt19937 gen(std::random_device{}());
const int jitter = static_cast<int>(dns_resolve_data->ttl * 0.025);
std::uniform_int_distribution<int> dis(-jitter, jitter);
cache_ttl += dis(gen);
}

if (!dns_resolve_data->cached_ips.empty()) {

Expand All @@ -4686,14 +4694,14 @@ void* monitor_dns_resolver_thread(void* args) {
// only update dns_records_bookkeeping
if (!to_update_cache) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "DNS cache record already up-to-date. (Hostname:[%s] IP:[%s])\n", dns_resolve_data->hostname.c_str(), debug_iplisttostring(ips).c_str());
dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, std::move(dns_resolve_data->cached_ips), monotonic_time() + (1000 * dns_resolve_data->ttl))));
dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, std::move(dns_resolve_data->cached_ips), monotonic_time() + (1000 * cache_ttl))));
}
}
else
to_update_cache = true;

if (to_update_cache) {
dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, ips, monotonic_time() + (1000 * dns_resolve_data->ttl))));
dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, ips, monotonic_time() + (1000 * cache_ttl))));
dns_resolve_data->dns_cache->add(dns_resolve_data->hostname, std::move(ips));
}

Expand Down Expand Up @@ -4841,7 +4849,18 @@ void* MySQL_Monitor::monitor_dns_cache() {

std::list<std::future<std::tuple<bool, DNS_Cache_Record>>> dns_resolve_result;

int delay_us = 100;
if (hostnames.empty() == false) {
delay_us = mysql_thread___monitor_local_dns_cache_refresh_interval / 2 / hostnames.size();
delay_us *= 40;
if (delay_us > 1000000 || delay_us <= 0) {
delay_us = 10000;
}
delay_us = delay_us + rand() % delay_us;
}

if (dns_records_bookkeeping.empty() == false) {

unsigned long long current_time = monotonic_time();

for (auto itr = dns_records_bookkeeping.begin();
Expand All @@ -4861,12 +4880,14 @@ void* MySQL_Monitor::monitor_dns_cache() {
dns_resolve_data->hostname = std::move(itr->hostname_);
dns_resolve_data->cached_ips = std::move(itr->ips_);
dns_resolve_data->ttl = mysql_thread___monitor_local_dns_cache_ttl;
dns_resolve_data->refresh_intv = mysql_thread___monitor_local_dns_cache_refresh_interval;
dns_resolve_data->dns_cache = dns_cache;
dns_resolve_result.emplace_back(dns_resolve_data->result.get_future());

proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Removing expired DNS record from bookkeeper. (Hostname:[%s] IP:[%s])\n", itr->hostname_.c_str(), debug_iplisttostring(dns_resolve_data->cached_ips).c_str());
dns_resolver_queue.add(new WorkItem<DNS_Resolve_Data>(dns_resolve_data.release(), monitor_dns_resolver_thread));
itr = dns_records_bookkeeping.erase(itr);
usleep(delay_us);
continue;
}

Expand All @@ -4881,7 +4902,6 @@ void* MySQL_Monitor::monitor_dns_cache() {

if (qsize > (static_cast<unsigned int>(mysql_thread___monitor_local_dns_resolver_queue_maxsize) / 8)) {
proxy_warning("DNS resolver queue too big: %d. Please refer to https://proxysql.com/documentation/dns-cache/ for further information.\n", qsize);

unsigned int threads_max = num_dns_resolver_max_threads;

if (threads_max > num_threads) {
Expand All @@ -4906,14 +4926,15 @@ void* MySQL_Monitor::monitor_dns_cache() {
}

if (hostnames.empty() == false) {

for (const std::string& hostname : hostnames) {
std::unique_ptr<DNS_Resolve_Data> dns_resolve_data(new DNS_Resolve_Data());
dns_resolve_data->hostname = hostname;
dns_resolve_data->ttl = mysql_thread___monitor_local_dns_cache_ttl;
dns_resolve_data->refresh_intv = mysql_thread___monitor_local_dns_cache_refresh_interval;
dns_resolve_data->dns_cache = dns_cache;
dns_resolve_result.emplace_back(dns_resolve_data->result.get_future());
dns_resolver_queue.add(new WorkItem<DNS_Resolve_Data>(dns_resolve_data.release(), monitor_dns_resolver_thread));
usleep(delay_us);
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/MySQL_Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6013,7 +6013,7 @@ bool MySQL_Thread::set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stre
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) {
if (buffered_data > overflow_safe_multiply<4,unsigned int>(mysql_thread___threshold_resultset_size)) {
mypolls.fds[n].events = 0;
return true;
}
Expand Down
127 changes: 127 additions & 0 deletions lib/ProxySQL_Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,42 @@ int ProxySQL_Config::Write_MySQL_Servers_to_configfile(std::string& data) {
}
}

if (sqlite_resultset)
delete sqlite_resultset;

query = (char *)"SELECT * FROM mysql_hostgroup_attributes";
admindb->execute_statement(query, &error, &cols, &affected_rows, &sqlite_resultset);
if (error) {
proxy_error("Error on read from mysql_hostgroup_attributes: %s\n", error);
return -1;
} else {
if (sqlite_resultset) {
data += "mysql_hostgroup_attributes:\n(\n";
bool isNext = false;
for (auto r : sqlite_resultset->rows) {
if (isNext)
data += ",\n";
data += "\t{\n";
addField(data, "hostgroup_id", r->fields[0], "");
addField(data, "max_num_online_servers", r->fields[1], "");
addField(data, "autocommit", r->fields[2], "");
addField(data, "free_connections_pct", r->fields[3], "");
addField(data, "init_connect", r->fields[4]);
addField(data, "multiplex", r->fields[5], "");
addField(data, "connection_warming", r->fields[6], "");
addField(data, "throttle_connections_per_sec", r->fields[7], "");
addField(data, "ignore_session_variables", r->fields[8]);
addField(data, "hostgroup_settings", r->fields[9]);
addField(data, "servers_defaults", r->fields[10]);
addField(data, "comment", r->fields[11]);

data += "\t}";
isNext = true;
}
data += "\n)\n";
}
}

if (sqlite_resultset)
delete sqlite_resultset;

Expand Down Expand Up @@ -1289,6 +1325,97 @@ int ProxySQL_Config::Read_MySQL_Servers_from_configfile() {
rows++;
}
}
if (root.exists("mysql_hostgroup_attributes") == true) {
const Setting &mysql_hostgroup_attributes = root["mysql_hostgroup_attributes"];
int count = mysql_hostgroup_attributes.getLength();

for (i = 0; i < count; i++) {
const Setting &hostgroup_attributes = mysql_hostgroup_attributes[i];
bool is_first_field = true;
int integer_val = 0;
std::string string_val = "";
std::string fields = "";
std::string values = "";

auto process_field = [&](const std::string &field_name, const std::string &field_value, int is_int) {
if (!is_first_field) {
fields += ", ";
values += ", ";
}
else {
is_first_field = false;
}
fields += field_name;

if (is_int) {
values += field_value;
}
else {
char *cs = strdup(field_value.c_str());
char *ecs = escape_string_single_quotes(cs, false);
values += std::string("'") + ecs + "'";
if (cs != ecs) free(cs);
free(ecs);
}
};

// Only inserting/updating fields which are in configuration file.
// Fields default will be from table schema.

// Parsing integer field
if (hostgroup_attributes.lookupValue("hostgroup_id", integer_val) ) {
process_field("hostgroup_id", to_string(integer_val), true);
}
else {
proxy_error("Admin: detected a mysql_hostgroup_attributes in config file without a mandatory hostgroup_id.\n");
continue;
}
if (hostgroup_attributes.lookupValue("max_num_online_servers", integer_val)) {
process_field("max_num_online_servers", to_string(integer_val), true);
}
if (hostgroup_attributes.lookupValue("autocommit", integer_val)) {
process_field("autocommit", to_string(integer_val), true);
}
if (hostgroup_attributes.lookupValue("free_connections_pct", integer_val)) {
process_field("free_connections_pct", to_string(integer_val), true);
}
if (hostgroup_attributes.lookupValue("multiplex", integer_val)) {
process_field("multiplex", to_string(integer_val), true);
}
if (hostgroup_attributes.lookupValue("connection_warming", integer_val)) {
process_field("connection_warming", to_string(integer_val), true);
}
if (hostgroup_attributes.lookupValue("throttle_connections_per_sec", integer_val)) {
process_field("throttle_connections_per_sec", to_string(integer_val), true);
}
// Parsing string field
if (hostgroup_attributes.lookupValue("init_connect", string_val)) {
process_field("init_connect", string_val, false);
}
if (hostgroup_attributes.lookupValue("ignore_session_variables", string_val)) {
process_field("ignore_session_variables", string_val, false);
}
if (hostgroup_attributes.lookupValue("hostgroup_settings", string_val)) {
process_field("hostgroup_settings", string_val, false);
}
if (hostgroup_attributes.lookupValue("servers_defaults", string_val)) {
process_field("servers_defaults", string_val, false);
}
if (hostgroup_attributes.lookupValue("comment", string_val)) {
process_field("comment", string_val, false);
}

std::string s_query = "INSERT OR REPLACE INTO mysql_hostgroup_attributes (";
s_query += fields + ") VALUES (" + values + ")";

//fprintf(stderr, "%s\n", s_query.c_str());
if (admindb->execute(s_query.c_str()) == false) {
proxy_error("Admin: detected a mysql_hostgroup_attributes invalid value. Failed to insert in the table.\n");
continue;
}
rows++;
}
}
admindb->execute("PRAGMA foreign_keys = ON");
return rows;
}
Expand Down
8 changes: 4 additions & 4 deletions lib/mysql_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ MDB_ASYNC_ST MySQL_Connection::handler(short event) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*8) {
if (buffered_data > overflow_safe_multiply<8,unsigned int>(mysql_thread___threshold_resultset_size)) {
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause . See #1232
break;
}
Expand All @@ -1535,7 +1535,7 @@ MDB_ASYNC_ST MySQL_Connection::handler(short event) {
if (rows_read_inner > 1) {
process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(processed_bytes);
if (
(processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8)
(processed_bytes > overflow_safe_multiply<8,unsigned int>(mysql_thread___threshold_resultset_size))
||
( mysql_thread___throttle_ratio_server_to_client && mysql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client/10*(unsigned long long)mysql_thread___throttle_ratio_server_to_client) )
) {
Expand Down Expand Up @@ -1688,7 +1688,7 @@ MDB_ASYNC_ST MySQL_Connection::handler(short event) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*8) {
if (buffered_data > overflow_safe_multiply<8,unsigned int>(mysql_thread___threshold_resultset_size)) {
next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause . See #1232
break;
}
Expand Down Expand Up @@ -1742,7 +1742,7 @@ MDB_ASYNC_ST MySQL_Connection::handler(short event) {
bytes_info.bytes_recv += br;
processed_bytes+=br; // issue #527 : this variable will store the amount of bytes processed during this event
if (
(processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8)
(processed_bytes > overflow_safe_multiply<8,unsigned int>(mysql_thread___threshold_resultset_size))
||
( mysql_thread___throttle_ratio_server_to_client && mysql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client/10*(unsigned long long)mysql_thread___throttle_ratio_server_to_client) )
) {
Expand Down
Loading

0 comments on commit 737f3a1

Please sign in to comment.