diff --git a/include/MySQL_Logger.hpp b/include/MySQL_Logger.hpp index 57f5c4bed..deab31ad2 100644 --- a/include/MySQL_Logger.hpp +++ b/include/MySQL_Logger.hpp @@ -2,98 +2,522 @@ #define __CLASS_MYSQL_LOGGER_H #include "proxysql.h" #include "cpp.h" +#include #define PROXYSQL_LOGGER_PTHREAD_MUTEX +class MySQL_Logger; + + +/** + * @class MySQL_Event + * @brief Represents a single event logged by the MySQL logger. + * + * This class encapsulates information about a MySQL event, including the query, timestamps, user information, and other relevant details. + * It provides methods for writing the event data to a file in different formats (binary and JSON). The class manages memory dynamically allocated for its members. + */ class MySQL_Event { - private: - uint32_t thread_id; - char *username; - char *schemaname; - size_t username_len; - size_t schemaname_len; - uint64_t start_time; - uint64_t end_time; - uint64_t query_digest; - char *query_ptr; - size_t query_len; - char *server; - char *client; - size_t server_len; - size_t client_len; - //uint64_t total_length; - unsigned char buf[10]; - enum log_event_type et; - uint64_t hid; - char *extra_info; - bool have_affected_rows; - bool have_rows_sent; - bool have_gtid; - uint64_t affected_rows; - uint64_t last_insert_id; - uint64_t rows_sent; - uint32_t client_stmt_id; - const char * gtid; - public: - MySQL_Event(log_event_type _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len); - uint64_t write(std::fstream *f, MySQL_Session *sess); - uint64_t write_query_format_1(std::fstream *f); - uint64_t write_query_format_2_json(std::fstream *f); - void write_auth(std::fstream *f, MySQL_Session *sess); +private: + uint32_t thread_id; ///< The ID of the thread that executed the query. + char* username; ///< The username associated with the query. Memory managed by the class. + char* schemaname; ///< The schema name associated with the query. Memory managed by the class. + size_t username_len; ///< Length of the username string. + size_t schemaname_len; ///< Length of the schema name string. + uint64_t start_time; ///< Start time of the query in microseconds. + uint64_t end_time; ///< End time of the query in microseconds. + uint64_t query_digest; ///< Digest of the query. + char* query_ptr; ///< Pointer to the query string. Memory managed by the class. + size_t query_len; ///< Length of the query string. + char* server; ///< Server address. Memory managed by the class. + char* client; ///< Client address. Memory managed by the class. + size_t server_len; ///< Length of the server address. + size_t client_len; ///< Length of the client address. + unsigned char buf[10]; ///< Buffer for encoding lengths. + enum log_event_type et; ///< Type of the event. + uint64_t hid; ///< Hostgroup ID. + char* extra_info; ///< Additional information about the event. Memory managed by the class. + bool have_affected_rows; ///< Flag indicating if affected rows are available. + bool have_rows_sent; ///< Flag indicating if rows sent are available. + bool have_gtid; ///< Flag indicating if GTID is available. + bool free_on_delete; ///< Flag indicating whether to free memory in the destructor. + uint64_t affected_rows; ///< Number of rows affected by the query. + uint64_t last_insert_id; ///< Last insert ID. + uint64_t rows_sent; ///< Number of rows sent. + uint32_t client_stmt_id; ///< Client statement ID. + char * gtid; ///< GTID. + char *errmsg; ///< Error message, if generated by ProxySQL (not if generated by the backend) + unsigned int myerrno; ///< MySQL error number + +public: + /** + * @brief Constructor for the MySQL_Event class. + * @param _et The type of the event. + * @param _thread_id The ID of the thread that executed the query. + * @param _username The username associated with the query. + * @param _schemaname The schema name associated with the query. + * @param _start_time The start time of the query in microseconds. + * @param _end_time The end time of the query in microseconds. + * @param _query_digest The digest of the query. + * @param _client The client address. + * @param _client_len The length of the client address. + * + * This constructor initializes the MySQL_Event object with the provided parameters. It does not allocate memory for string members. + */ + MySQL_Event(log_event_type _et, uint32_t _thread_id, char* _username, char* _schemaname, uint64_t _start_time, uint64_t _end_time, uint64_t _query_digest, char* _client, size_t _client_len); + + /** + * @brief Copy constructor for the MySQL_Event class. + * @param other The MySQL_Event object to copy. + * + * This copy constructor creates a deep copy of the provided MySQL_Event object. + */ + MySQL_Event(const MySQL_Event& other); + + /** + * @brief Destructor for the MySQL_Event class. + * + * This destructor deallocates the memory used by the object's string members if `free_on_delete` is true. + */ + ~MySQL_Event(); + + /** + * @brief Writes the event data to a file stream. + * @param f A pointer to the file stream. + * @param sess A pointer to the MySQL_Session object. + * @return The total number of bytes written. + * + * This function writes the event data to the specified file stream based on the event type and the configured log format. + */ + uint64_t write(std::fstream* f, MySQL_Session* sess); + + /** + * @brief Writes the event data in binary format (format 1) to a file stream. + * @param f A pointer to the file stream to write to. Must not be NULL. + * @return The total number of bytes written to the stream. + * + * This function serializes the event data into a binary format according to the MySQL event log format 1 specification. + * It encodes lengths using MySQL's length encoding scheme. + * The function writes the event type, thread ID, username, schema name, client address, hostgroup ID (if available), server address (if available), timestamps, client statement ID (if applicable), affected rows, last insert ID, rows sent, query digest, and query string to the file stream. + * The function writes all fields as defined by the MySQL event log format. + * It handles variable-length fields using MySQL's length encoding, which means that the length of each field is written before the field data itself. + * The function carefully handles potential errors during file writing operations. + */ + uint64_t write_query_format_1(std::fstream* f); + + + /** + * @brief Writes the event data in JSON format (format 2) to a file stream. + * @param f A pointer to the file stream to write to. Must not be NULL. + * @return The total number of bytes written to the stream (always 0 in the current implementation). + * + * This function serializes the event data into a JSON format. + * It converts various data fields into a JSON object and writes this object to the file stream. + * The function uses the nlohmann::json library for JSON serialization. + * This function currently always returns 0. + * The function constructs a JSON object containing relevant event information such as the hostgroup ID, thread ID, event type, username, schema name, client and server addresses, affected rows, last insert ID, rows sent, query string, timestamps, query digest, and client statement ID (if applicable). + * After constructing the JSON object, it serializes it into a string using the `dump()` method of the nlohmann::json library and writes the resulting string to the output file stream. + */ + uint64_t write_query_format_2_json(std::fstream* f); + + /** + * @brief Writes authentication-related event data to a file stream. + * @param f A pointer to the file stream. + * @param sess A pointer to the MySQL_Session object. + */ + void write_auth(std::fstream* f, MySQL_Session* sess); + + /** + * @brief Sets the client statement ID for the event. + * @param client_stmt_id The client statement ID. + */ void set_client_stmt_id(uint32_t client_stmt_id); - void set_query(const char *ptr, int len); - void set_server(int _hid, const char *ptr, int len); - void set_extra_info(char *); + + /** + * @brief Sets the query string for the event. + * @param ptr A pointer to the query string. + * @param len The length of the query string. + * + * This method sets the query string for the event. The provided pointer and length are stored; ownership is not transferred. + */ + void set_query(const char* ptr, int len); + + /** + * @brief Sets the server address and hostgroup ID for the event. + * @param _hid The hostgroup ID. + * @param ptr A pointer to the server address string. + * @param len The length of the server address string. + * + * This method sets the server address and hostgroup ID for the event. The provided pointer and length are stored; ownership is not transferred. + */ + void set_server(int _hid, const char* ptr, int len); + + /** + * @brief Sets additional information for the event. + * @param _err A pointer to the extra information string. + * + * This method sets additional information for the event. A copy of the string is made; the original string is not modified. + */ + void set_extra_info(char* _err); + + /** + * @brief Sets the affected rows and last insert ID for the event. + * @param ar The number of affected rows. + * @param lid The last insert ID. + */ void set_affected_rows(uint64_t ar, uint64_t lid); + + /** + * @brief Sets the number of rows sent for the event. + * @param rs The number of rows sent. + */ void set_rows_sent(uint64_t rs); - void set_gtid(MySQL_Session *sess); + + /** + * @brief Sets the GTID for the event from a MySQL session. + * @param sess A pointer to the MySQL_Session object. + * + * This method extracts the GTID from the provided MySQL session and sets it for the event. + */ + void set_gtid(MySQL_Session* sess); + + /** + * @brief Sets the error message for the event + * @param _myerrno MySQL error code + * @param _errmsg A pointer to the error + */ + void set_errmsg(const unsigned int _myerrno, const char * _errmsg); + + /** + * @brief Declares MySQL_Logger as a friend class, granting it access to private members of MySQL_Event. + */ + friend class MySQL_Logger; }; +/** + * @class MySQL_Logger_CircularBuffer + * @brief A thread-safe circular buffer for storing MySQL events. + * + * This class implements a circular buffer that stores pointers to MySQL_Event objects. + * It provides thread-safe methods for inserting events and retrieving all stored events. + * The buffer automatically manages memory for the stored events. Once an event is inserted, the buffer assumes ownership. + */ +class MySQL_Logger_CircularBuffer { +private: + std::deque event_buffer; ///< The internal deque storing event pointers. + std::mutex mutex; ///< Mutex for thread safety. + std::atomic eventsAddedCount; ///< Total number of events added to the buffer. + std::atomic eventsDroppedCount; ///< Total number of events dropped from the buffer. + +public: + std::atomic buffer_size; ///< Atomic variable to store the buffer size. (Public for direct access) + /** + * @brief Constructor for the MySQL_Logger_CircularBuffer class. + * @param size The initial size of the circular buffer. + */ + MySQL_Logger_CircularBuffer(size_t size); + + /** + * @brief Destructor for the MySQL_Logger_CircularBuffer class. + * + * This destructor deallocates the memory used by the buffer and the MySQL_Event objects it contains. + */ + ~MySQL_Logger_CircularBuffer(); + + /** + * @brief Inserts a new MySQL_Event into the circular buffer. + * @param event A pointer to the MySQL_Event object to insert. The buffer takes ownership. + * + * If the buffer is full, the oldest event is removed before inserting the new event. + */ + void insert(MySQL_Event* event); + + /** + * @brief Retrieves all events from the circular buffer and populates a provided vector. + * @param events A reference to a vector that will be populated with the events from the buffer. + * The caller takes ownership of the events and is responsible for deleting them. + * + * This method clears the buffer after retrieving the events. The function reserves space in the vector to avoid unnecessary reallocations. + */ + void get_all_events(std::vector& events); + + /** + * @brief Returns the current size of the buffer. + * @return The number of events currently in the buffer. + */ + size_t size(); + + /** + * @brief Gets the current size of the buffer. + * @return The current size of the circular buffer. + */ + size_t getBufferSize() const; + + /** + * @brief Sets the size of the buffer. + * @param newSize The new size of the circular buffer. + */ + void setBufferSize(size_t newSize); + + /** + * @brief Returns the total number of events added to the buffer. + * @return The total number of events added to the buffer. + */ + unsigned long long getEventsAddedCount() const { return eventsAddedCount; } + + /** + * @brief Returns the total number of events dropped from the buffer. + * @return The total number of events dropped from the buffer. + */ + unsigned long long getEventsDroppedCount() const { return eventsDroppedCount; } +}; + + +/** + * @class MySQL_Logger + * @brief A class for logging MySQL events and audit entries. + * + * This class manages the logging of MySQL events (queries, connections, etc.) and audit entries to files. It uses circular buffers for efficient event handling. + * It provides methods for configuring log files, opening and closing log files, flushing log buffers, and logging various events. + * The class uses mutexes or rwlocks for thread safety, depending on the compilation settings. + */ class MySQL_Logger { - private: +private: + /** + * @brief Structure to hold configuration and state for event logging. + */ struct { - bool enabled; - char *base_filename; - char *datadir; - unsigned int log_file_id; - unsigned int max_log_file_size; - std::fstream *logfile; + bool enabled; ///< Flag indicating whether event logging is enabled. + char* base_filename; ///< Base filename for event log files. Memory managed by the class. + char* datadir; ///< Directory for event log files. Memory managed by the class. + unsigned int log_file_id; ///< ID of the current event log file. + unsigned int max_log_file_size; ///< Maximum size of an event log file in bytes. + std::fstream* logfile; ///< File stream for event logging. } events; + + /** + * @brief Structure to hold configuration and state for audit logging. + */ struct { - bool enabled; - char *base_filename; - char *datadir; - unsigned int log_file_id; - unsigned int max_log_file_size; - std::fstream *logfile; + bool enabled; ///< Flag indicating whether audit logging is enabled. + char* base_filename; ///< Base filename for audit log files. Memory managed by the class. + char* datadir; ///< Directory for audit log files. Memory managed by the class. + unsigned int log_file_id; ///< ID of the current audit log file. + unsigned int max_log_file_size; ///< Maximum size of an audit log file in bytes. + std::fstream* logfile; ///< File stream for audit logging. } audit; + + /** + * @brief Structure to hold performance metrics for the MySQL event logger. + * + * This structure keeps track of various metrics related to the performance of the event logging system, + * including the number of times events are copied to memory and disk, the total time spent on + * these operations, and the total number of events copied. + * All members are atomic to ensure thread safety. + */ + struct EventLogMetrics { + /** @brief Number of times events were copied to the in-memory database. */ + std::atomic memoryCopyCount; + /** @brief Number of times events were copied to the on-disk database. */ + std::atomic diskCopyCount; + /** @brief Number of times the `get_all_events` method was called. */ + std::atomic getAllEventsCallsCount; + /** @brief Total number of events retrieved by the `get_all_events` method. */ + std::atomic getAllEventsEventsCount; + /** @brief Total time spent copying events to the in-memory database (microseconds). */ + std::atomic totalMemoryCopyTimeMicros; + /** @brief Total time spent copying events to the on-disk database (microseconds). */ + std::atomic totalDiskCopyTimeMicros; + /** @brief Total time spent in `get_all_events` (microseconds). */ + std::atomic totalGetAllEventsDiskCopyTimeMicros; + /** @brief Total number of events copied to the in-memory database. */ + std::atomic totalEventsCopiedToMemory; + /** @brief Total number of events copied to the on-disk database. */ + std::atomic totalEventsCopiedToDisk; + //std::atomic eventsAddedToBufferCount; ///< Total number of events added to the buffer. + //std::atomic eventsCurrentlyInBufferCount; ///< Number of events currently in the buffer. + } metrics; + + + // Mutex or rwlock for thread safety #ifdef PROXYSQL_LOGGER_PTHREAD_MUTEX - pthread_mutex_t wmutex; + pthread_mutex_t wmutex; ///< Pthread mutex for thread safety. #else - rwlock_t rwlock; + rwlock_t rwlock; ///< rwlock for thread safety. #endif + + /** + * @brief Closes the event log file. This function should only be called while holding the write lock. + */ void events_close_log_unlocked(); + + /** + * @brief Opens the event log file. This function should only be called while holding the write lock. + */ void events_open_log_unlocked(); + + /** + * @brief Closes the audit log file. This function should only be called while holding the write lock. + */ void audit_close_log_unlocked(); + + /** + * @brief Opens the audit log file. This function should only be called while holding the write lock. + */ void audit_open_log_unlocked(); + + /** + * @brief Finds the next available ID for an event log file. + * @return The next available ID. + */ unsigned int events_find_next_id(); + + /** + * @brief Finds the next available ID for an audit log file. + * @return The next available ID. + */ unsigned int audit_find_next_id(); - public: + +public: + /** + * @brief Constructor for the MySQL_Logger class. + * + * This constructor initializes the logger with default settings. + */ MySQL_Logger(); + + /** + * @brief Destructor for the MySQL_Logger class. + * + * This destructor deallocates resources used by the logger, including log files and mutexes. + */ ~MySQL_Logger(); + + /** + * @brief Prints the version information of the logger. + */ void print_version(); + + /** + * @brief Flushes the event and audit log buffers to disk. + */ void flush_log(); + + /** + * @brief Flushes the event log buffer to disk. This function should only be called while holding the write lock. + */ void events_flush_log_unlocked(); + + /** + * @brief Flushes the audit log buffer to disk. This function should only be called while holding the write lock. + */ void audit_flush_log_unlocked(); - void events_set_datadir(char *); + + /** + * @brief Sets the data directory for event log files. + * @param s The path to the data directory. + */ + void events_set_datadir(char* s); + + /** + * @brief Sets the base filename for event log files. + */ void events_set_base_filename(); - void audit_set_datadir(char *); + + /** + * @brief Sets the data directory for audit log files. + * @param s The path to the data directory. + */ + void audit_set_datadir(char* s); + + /** + * @brief Sets the base filename for audit log files. + */ void audit_set_base_filename(); - void log_request(MySQL_Session *, MySQL_Data_Stream *); - void log_audit_entry(log_event_type, MySQL_Session *, MySQL_Data_Stream *, char *e = NULL); + + /** + * @brief Logs a request event. + * @param sess A pointer to the MySQL_Session object. + * @param myds A pointer to the MySQL_Data_Stream object. + * @param myerrno MySQL error code, if present. + * @param errmsg A pointer to an error, if present. + * + * This function logs information about a MySQL request, including the query, timestamps, user information, and other relevant details. + * It creates a MySQL_Event object, populates it with data from the session and data stream, and writes it to the event log file and/or the circular buffer. + * The function handles different types of requests (normal queries, prepared statements, etc.) and manages memory carefully. + * The function handles different query types (standard queries, prepared statements). It extracts relevant information from the session object, such as timestamps, user credentials, and query details. + * If the circular buffer is enabled, it creates a copy of the event and adds it to the buffer for later processing. + * The function also checks the size of the log file and flushes it if it exceeds the maximum configured size. + * The function uses mutexes to protect shared resources and avoid race conditions. + * The function assumes ownership of the dynamically allocated memory for the `MySQL_Event` object created within this function. + */ + void log_request(MySQL_Session* sess, MySQL_Data_Stream* myds , const unsigned int myerrno = 0 , const char * errmsg = nullptr); + + /** + * @brief Logs an audit entry. + * @param _et The type of the audit event. + * @param sess A pointer to the MySQL_Session object. + * @param myds A pointer to the MySQL_Data_Stream object. + * @param xi Additional information for the audit entry. + */ + void log_audit_entry(log_event_type _et, MySQL_Session* sess, MySQL_Data_Stream* myds, char* xi = NULL); + + /** + * @brief Flushes the log files. + */ void flush(); + + /** + * @brief Acquires a write lock. + */ void wrlock(); + + /** + * @brief Releases a write lock. + */ void wrunlock(); + + MySQL_Logger_CircularBuffer* MyLogCB; ///< Pointer to the circular buffer for managing events. + + /** + * @brief Inserts a batch of MySQL events into a specified SQLite table. + * @param db A pointer to the SQLite3DB object representing the database connection. + * @param tableName The name of the SQLite table to insert into. + * @param numEvents The number of events to insert. + * @param begin An iterator pointing to the beginning of the range of MySQL_Event* in the vector to insert. + * @return 0 if the insertion was successful, a negative error code otherwise. + * + * This function inserts a batch of MySQL events into the specified SQLite table using bulk insert techniques for efficiency. + * It handles the conversion of MySQL_Event data to a format suitable for SQLite insertion. Error handling includes logging of errors. + * The function uses a prepared statement for bulk insertion. + * The function assumes that the provided events have been allocated with `new` and will not be deleted by this function. + */ + void insertMysqlEventsIntoDb(SQLite3DB * db, const std::string& tableName, size_t numEvents, std::vector::const_iterator begin); + /** + * @brief Processes and inserts MySQL events into in-memory and/or on-disk SQLite databases. + * @param statsdb A pointer to the SQLite3DB object for the in-memory database (can be nullptr). + * @param statsdb_disk A pointer to the SQLite3DB object for the on-disk database (can be nullptr). + * @return The number of events processed. Returns a negative value if an error occurs. + * + * This function retrieves events from the circular buffer, handles in-memory table size limits, and inserts them into the specified SQLite databases. + * If either statsdb or statsdb_disk is nullptr, events are only written to the other database. + * It handles in-memory table size limits by deleting existing entries if necessary. + * The function ensures that the in-memory table size does not exceed a predefined limit (`eventslog_table_memory_size`). + * The function assumes ownership of the MySQL_Event pointers and deletes them after processing. + */ + int processEvents(SQLite3DB * statsdb , SQLite3DB * statsdb_disk); + + /** + * @brief Retrieves all performance metrics from the logger and circular buffer. + * @return An unordered map containing all performance metrics. + * The keys are strings representing the metric names, and the values are unsigned long long integers representing the metric values. + * + * This function gathers all relevant performance metrics from the logger's internal EventLogMetrics structure and the circular buffer. + * It returns these metrics as a key-value map for easy access. + * The function ensures thread safety by acquiring a lock on the circular buffer's mutex before accessing its internal metrics. + */ + std::unordered_map getAllMetrics() const; + + }; diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index fc7327b69..fe324949f 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -197,8 +197,8 @@ class MySQL_Session * @param myds If not null, should point to a MySQL_Data_Stream (backend connection) which connection status * should be updated, and previous query resources cleanup. */ - void RequestEnd(MySQL_Data_Stream *); - void LogQuery(MySQL_Data_Stream *); + void RequestEnd(MySQL_Data_Stream * myds, const unsigned int myerrno = 0, const char * errmsg = nullptr); + void LogQuery(MySQL_Data_Stream * myds, const unsigned int myerrno = 0, const char * errmsg = nullptr); void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session(); int handler_again___status_PINGING_SERVER(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index b238915b4..498549037 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -535,6 +535,9 @@ class MySQL_Threads_Handler int connpoll_reset_queue_length; char *eventslog_filename; int eventslog_filesize; + int eventslog_buffer_history_size; + int eventslog_table_memory_size; + int eventslog_buffer_max_query_length; int eventslog_default_log; int eventslog_format; char *auditlog_filename; diff --git a/include/ProxySQL_Statistics.hpp b/include/ProxySQL_Statistics.hpp index 7e356b819..69284c924 100644 --- a/include/ProxySQL_Statistics.hpp +++ b/include/ProxySQL_Statistics.hpp @@ -79,6 +79,7 @@ #define STATSDB_SQLITE_TABLE_HISTORY_MYSQL_QUERY_DIGEST "CREATE TABLE history_mysql_query_digest (dump_time INT , hostgroup INT , schemaname VARCHAR NOT NULL , username VARCHAR NOT NULL , client_address VARCHAR NOT NULL , digest VARCHAR NOT NULL , digest_text VARCHAR NOT NULL , count_star INTEGER NOT NULL , first_seen INTEGER NOT NULL , last_seen INTEGER NOT NULL , sum_time INTEGER NOT NULL , min_time INTEGER NOT NULL , max_time INTEGER NOT NULL , sum_rows_affected INTEGER NOT NULL , sum_rows_sent INTEGER NOT NULL)" +#define STATSDB_SQLITE_TABLE_HISTORY_MYSQL_QUERY_EVENTS "CREATE TABLE history_mysql_query_events (id INTEGER PRIMARY KEY AUTOINCREMENT , thread_id INTEGER , username TEXT , schemaname TEXT , start_time INTEGER , end_time INTEGER , query_digest TEXT , query TEXT , server TEXT , client TEXT , event_type INTEGER , hid INTEGER , extra_info TEXT , affected_rows INTEGER , last_insert_id INTEGER , rows_sent INTEGER , client_stmt_id INTEGER , gtid TEXT , errno INT , error TEXT)" class ProxySQL_Statistics { SQLite3DB *statsdb_mem; // internal statistics DB std::vector *tables_defs_statsdb_mem; @@ -90,6 +91,7 @@ class ProxySQL_Statistics { unsigned long long next_timer_MySQL_Threads_Handler; unsigned long long next_timer_mysql_query_digest_to_disk; unsigned long long next_timer_system_cpu; + unsigned long long last_timer_mysql_dump_eventslog_to_disk = 0; #ifndef NOJEM unsigned long long next_timer_system_memory; #endif @@ -105,6 +107,7 @@ class ProxySQL_Statistics { int stats_mysql_query_cache; int stats_system_cpu; int stats_mysql_query_digest_to_disk; + int stats_mysql_eventslog_sync_buffer_to_disk; #ifndef NOJEM int stats_system_memory; #endif @@ -117,6 +120,16 @@ class ProxySQL_Statistics { bool MySQL_Threads_Handler_timetoget(unsigned long long); bool mysql_query_digest_to_disk_timetoget(unsigned long long); bool system_cpu_timetoget(unsigned long long); + /** + * @brief Checks if it's time to dump the events log to disk based on the configured interval. + * @param currentTimeMicros The current time in microseconds. + * @return True if it's time to dump the events log, false otherwise. + * + * This function checks if the current time exceeds the last dump time plus the configured dump interval. + * The dump interval is retrieved from the ProxySQL configuration. If the dump interval is 0, no dumping is performed. + */ + bool MySQL_Logger_dump_eventslog_timetoget(unsigned long long currentTimeMicros); + #ifndef NOJEM bool system_memory_timetoget(unsigned long long); #endif diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index 81f7aae6e..2000787e7 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -255,6 +255,7 @@ class ProxySQL_Admin { int stats_mysql_connections; int stats_mysql_query_cache; int stats_mysql_query_digest_to_disk; + int stats_mysql_eventslog_sync_buffer_to_disk; int stats_system_cpu; int stats_system_memory; int mysql_show_processlist_extended; diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 0762e50ca..dd4a5b444 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -893,6 +893,9 @@ __thread char * mysql_thread___ssl_p2s_crlpath; /* variables used by events log */ __thread char * mysql_thread___eventslog_filename; __thread int mysql_thread___eventslog_filesize; +__thread int mysql_thread___eventslog_buffer_history_size; +__thread int mysql_thread___eventslog_table_memory_size; +__thread int mysql_thread___eventslog_buffer_max_query_length; __thread int mysql_thread___eventslog_default_log; __thread int mysql_thread___eventslog_format; @@ -1066,6 +1069,9 @@ extern __thread char * mysql_thread___ssl_p2s_crlpath; /* variables used by events log */ extern __thread char * mysql_thread___eventslog_filename; extern __thread int mysql_thread___eventslog_filesize; +extern __thread int mysql_thread___eventslog_buffer_history_size; +extern __thread int mysql_thread___eventslog_table_memory_size; +extern __thread int mysql_thread___eventslog_buffer_max_query_length; extern __thread int mysql_thread___eventslog_default_log; extern __thread int mysql_thread___eventslog_format; diff --git a/lib/MySQL_Logger.cpp b/lib/MySQL_Logger.cpp index 6dd183846..e97ca8784 100644 --- a/lib/MySQL_Logger.cpp +++ b/lib/MySQL_Logger.cpp @@ -1,6 +1,7 @@ #include #include "proxysql.h" #include "cpp.h" +#include #include "MySQL_Data_Stream.h" #include "query_processor.h" @@ -62,6 +63,83 @@ MySQL_Event::MySQL_Event (log_event_type _et, uint32_t _thread_id, char * _usern rows_sent=0; client_stmt_id=0; gtid = NULL; + errmsg = nullptr; + myerrno = 0; + free_on_delete = false; // by default, this is false. This because pointers do not belong to this object +} + +MySQL_Event::MySQL_Event(const MySQL_Event &other) { + + // Initialize basic members using memcpy + memcpy(this, &other, sizeof(MySQL_Event)); + + // Copy char pointers using strdup (if not null) + if (other.username != nullptr) { + username = strdup(other.username); + } + if (other.schemaname != nullptr) { + schemaname = strdup(other.schemaname); + } + // query_ptr is NOT null terminated + if (other.query_ptr != nullptr) { + size_t maxQueryLen = mysql_thread___eventslog_buffer_max_query_length; + size_t lenToCopy = std::min(other.query_len, maxQueryLen); + query_ptr = (char*)malloc(lenToCopy + 1); // +1 for null terminator + memcpy(query_ptr, other.query_ptr, lenToCopy); + query_ptr[lenToCopy] = '\0'; // Null-terminate the copied string + query_len = lenToCopy; + } + // server is NOT null terminated + if (other.server != nullptr) { + server = (char *)malloc(server_len+1); + memcpy(server, other.server, server_len); + server[server_len] = '\0'; + } + // client is NOT null terminated + if (other.client != nullptr) { + client = (char *)malloc(client_len+1); + memcpy(client, other.client, client_len); + client[client_len] = '\0'; + } + if (other.extra_info != nullptr) { + extra_info = strdup(other.extra_info); + } + if (other.gtid != nullptr) { + gtid = strdup(other.gtid); + } + if (other.errmsg != nullptr) { + errmsg = strdup(other.errmsg); + } + free_on_delete = true; // pointers belong to this object +} + +MySQL_Event::~MySQL_Event() { + if (free_on_delete == true) { + if (username != nullptr) { + free(username); username = nullptr; + } + if (schemaname != nullptr) { + free(schemaname); schemaname = nullptr; + } + if (query_ptr != nullptr) { + free(query_ptr); query_ptr = nullptr; + } + if (server != nullptr) { + free(server); server = nullptr; + } + if (client != nullptr) { + free(client); client = nullptr; + } + if (extra_info != nullptr) { + free(extra_info); extra_info = nullptr; + } + if (gtid != nullptr) { + free(gtid); gtid = nullptr; + } + if (errmsg != nullptr) { + free(errmsg); errmsg = nullptr; + } + } } void MySQL_Event::set_client_stmt_id(uint32_t client_stmt_id) { @@ -90,6 +168,12 @@ void MySQL_Event::set_gtid(MySQL_Session *sess) { } } +void MySQL_Event::set_errmsg(const unsigned int _myerrno, const char * _errmsg) { + myerrno = _myerrno; + if (_errmsg != nullptr) + errmsg = strdup(_errmsg); +} + void MySQL_Event::set_extra_info(char *_err) { extra_info = _err; } @@ -289,7 +373,7 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) { // for performance reason, we are moving the write lock // right before the write to disk //GloMyLogger->wrlock(); - //move wrlock() function to log_request() function, avoid to get a null pointer in a multithreaded environment + //move wrlock() function to log_request() function, avoid to get a null pointer in a multithreaded environment // write total length , fixed size f->write((const char *)&total_bytes,sizeof(uint64_t)); @@ -424,6 +508,10 @@ uint64_t MySQL_Event::write_query_format_2_json(std::fstream *f) { if (have_gtid == true) { j["last_gtid"] = gtid; } + j["errno"] = myerrno; + if (errmsg != nullptr) { + j["error"] = errmsg; + } j["query"] = string(query_ptr,query_len); j["starttime_timestamp_us"] = start_time; { @@ -459,7 +547,7 @@ uint64_t MySQL_Event::write_query_format_2_json(std::fstream *f) { // for performance reason, we are moving the write lock // right before the write to disk //GloMyLogger->wrlock(); - //move wrlock() function to log_request() function, avoid to get a null pointer in a multithreaded environment + //move wrlock() function to log_request() function, avoid to get a null pointer in a multithreaded environment *f << j.dump(-1, ' ', false, json::error_handler_t::replace) << std::endl; return total_bytes; // always 0 @@ -467,7 +555,7 @@ uint64_t MySQL_Event::write_query_format_2_json(std::fstream *f) { extern Query_Processor *GloQPro; -MySQL_Logger::MySQL_Logger() { +MySQL_Logger::MySQL_Logger() : metrics{0, 0, 0, 0, 0, 0, 0, 0, 0} { events.enabled=false; events.base_filename=NULL; events.datadir=NULL; @@ -487,6 +575,7 @@ MySQL_Logger::MySQL_Logger() { audit.logfile=NULL; audit.log_file_id=0; audit.max_log_file_size=100*1024*1024; + MyLogCB = new MySQL_Logger_CircularBuffer(0); }; MySQL_Logger::~MySQL_Logger() { @@ -498,13 +587,14 @@ MySQL_Logger::~MySQL_Logger() { free(audit.datadir); } free(audit.base_filename); + delete MyLogCB; }; void MySQL_Logger::wrlock() { #ifdef PROXYSQL_LOGGER_PTHREAD_MUTEX pthread_mutex_lock(&wmutex); #else - spin_wrlock(&rwlock); + spin_wrlock(&rwlock); #endif }; @@ -512,7 +602,7 @@ void MySQL_Logger::wrunlock() { #ifdef PROXYSQL_LOGGER_PTHREAD_MUTEX pthread_mutex_unlock(&wmutex); #else - spin_wrunlock(&rwlock); + spin_wrunlock(&rwlock); #endif }; @@ -673,9 +763,12 @@ void MySQL_Logger::audit_set_datadir(char *s) { flush_log(); }; -void MySQL_Logger::log_request(MySQL_Session *sess, MySQL_Data_Stream *myds) { - if (events.enabled==false) return; - if (events.logfile==NULL) return; +void MySQL_Logger::log_request(MySQL_Session *sess, MySQL_Data_Stream *myds, const unsigned int myerrno, const char * errmsg) { + int elmhs = mysql_thread___eventslog_buffer_history_size; + if (elmhs == 0) { + if (events.enabled==false) return; + if (events.logfile==NULL) return; + } // 'MySQL_Session::client_myds' could be NULL in case of 'RequestEnd' being called over a freshly created session // due to a failed 'CONNECTION_RESET'. Because this scenario isn't a client request, we just return. if (sess->client_myds==NULL || sess->client_myds->myconn== NULL) return; @@ -767,6 +860,9 @@ void MySQL_Logger::log_request(MySQL_Session *sess, MySQL_Data_Stream *myds) { } me.set_rows_sent(sess->CurrentQuery.rows_sent); me.set_gtid(sess); + if (myerrno != 0) { + me.set_errmsg(myerrno, errmsg); + } int sl=0; char *sa=(char *)""; // default @@ -791,17 +887,30 @@ void MySQL_Logger::log_request(MySQL_Session *sess, MySQL_Data_Stream *myds) { // right before the write to disk //wrlock(); - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump - GloMyLogger->wrlock(); - me.write(events.logfile, sess); + if ((events.enabled == true) && (events.logfile != nullptr)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump + GloMyLogger->wrlock(); + + me.write(events.logfile, sess); - unsigned long curpos=events.logfile->tellp(); - if (curpos > events.max_log_file_size) { - events_flush_log_unlocked(); + unsigned long curpos=events.logfile->tellp(); + if (curpos > events.max_log_file_size) { + events_flush_log_unlocked(); + } + wrunlock(); + } + if (MyLogCB->buffer_size != 0) { + MySQL_Event *me2 = new MySQL_Event(me); + MyLogCB->insert(me2); +#if 0 + for (int i=0; i<10000; i++) { + MySQL_Event *me2 = new MySQL_Event(me); + MyLogCB->insert(me2); + } +#endif // 0 } - wrunlock(); if (cl && sess->client_myds->addr.port) { free(ca); @@ -816,7 +925,7 @@ void MySQL_Logger::log_audit_entry(log_event_type _et, MySQL_Session *sess, MySQ if (audit.logfile==NULL) return; if (sess == NULL) return; - if (sess->client_myds == NULL) return; + if (sess->client_myds == NULL) return; MySQL_Connection_userinfo *ui= NULL; if (sess) { @@ -941,7 +1050,7 @@ void MySQL_Logger::log_audit_entry(log_event_type _et, MySQL_Session *sess, MySQ //wrlock(); //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump - GloMyLogger->wrlock(); + GloMyLogger->wrlock(); me.write(audit.logfile, sess); @@ -1004,15 +1113,15 @@ unsigned int MySQL_Logger::events_find_next_id() { free(eval_dirname); free(eval_filename); } - if (eval_pathname) { - free(eval_pathname); - } + if (eval_pathname) { + free(eval_pathname); + } return maxidx; } else { - /* could not open directory */ + /* could not open directory */ proxy_error("Unable to open datadir: %s\n", eval_dirname); exit(EXIT_FAILURE); - } + } return 0; } @@ -1050,19 +1159,243 @@ unsigned int MySQL_Logger::audit_find_next_id() { free(eval_dirname); free(eval_filename); } - if (eval_pathname) { - free(eval_pathname); - } + if (eval_pathname) { + free(eval_pathname); + } return maxidx; } else { - /* could not open directory */ + /* could not open directory */ proxy_error("Unable to open datadir: %s\n", eval_dirname); exit(EXIT_FAILURE); - } + } return 0; } void MySQL_Logger::print_version() { - fprintf(stderr,"Standard ProxySQL MySQL Logger rev. %s -- %s -- %s\n", PROXYSQL_MYSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__); + fprintf(stderr,"Standard ProxySQL MySQL Logger rev. %s -- %s -- %s\n", PROXYSQL_MYSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__); }; +MySQL_Logger_CircularBuffer::MySQL_Logger_CircularBuffer(size_t size) : event_buffer(size), + eventsAddedCount(0), eventsDroppedCount(0), + buffer_size(size) {} + +MySQL_Logger_CircularBuffer::~MySQL_Logger_CircularBuffer() { + std::lock_guard lock(mutex); + for (MySQL_Event* event : event_buffer) { + delete event; + } +} + +void MySQL_Logger_CircularBuffer::insert(MySQL_Event* event) { + std::lock_guard lock(mutex); + eventsAddedCount++; + if (event_buffer.size() == buffer_size) { + delete event_buffer.front(); + event_buffer.pop_front(); + eventsDroppedCount++; + } + event_buffer.push_back(event); +} + + +size_t MySQL_Logger_CircularBuffer::size() { + std::lock_guard lock(mutex); + return event_buffer.size(); +} + +void MySQL_Logger_CircularBuffer::get_all_events(std::vector& events) { + std::lock_guard lock(mutex); + events.reserve(event_buffer.size()); + events.insert(events.end(), event_buffer.begin(), event_buffer.end()); + event_buffer.clear(); +} + +size_t MySQL_Logger_CircularBuffer::getBufferSize() const { + return buffer_size; +} + +void MySQL_Logger_CircularBuffer::setBufferSize(size_t newSize) { + std::lock_guard lock(mutex); + buffer_size = newSize; +} + + +void MySQL_Logger::insertMysqlEventsIntoDb(SQLite3DB * db, const std::string& tableName, size_t numEvents, std::vector::const_iterator begin){ + int rc = 0; + sqlite3_stmt *statement1=NULL; + sqlite3_stmt *statement32=NULL; + char *query1=NULL; + char *query32=NULL; + const int numcols = 19; + std::string query1s = ""; + std::string query32s = ""; + + std::string coldefs = "(thread_id, username, schemaname, start_time, end_time, query_digest, query, server, client, event_type, hid, extra_info, affected_rows, last_insert_id, rows_sent, client_stmt_id, gtid, errno, error)"; + + query1s = "INSERT INTO " + tableName + coldefs + " VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19)"; + query32s = "INSERT INTO " + tableName + coldefs + " VALUES " + generate_multi_rows_query(32, numcols); + query1 = (char *)query1s.c_str(); + query32 = (char *)query32s.c_str(); + rc = db->prepare_v2(query1, &statement1); + ASSERT_SQLITE_OK(rc, db); + rc = db->prepare_v2(query32, &statement32); + ASSERT_SQLITE_OK(rc, db); + + char digest_hex_str[20]; // 2+sizeof(unsigned long long)*2+2 + + db->execute("BEGIN"); + + int row_idx=0; + int max_bulk_row_idx=numEvents/32; + max_bulk_row_idx=max_bulk_row_idx*32; + for (std::vector::const_iterator it = begin ; it != begin + numEvents; ++it) { + MySQL_Event *event = *it; + int idx=row_idx%32; + + if (row_idxthread_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+2, event->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+3, event->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+4, event->start_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+5, event->end_time); ASSERT_SQLITE_OK(rc, db); + sprintf(digest_hex_str, "0x%016llX", (long long unsigned int)event->query_digest); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+6, digest_hex_str, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+7, event->query_ptr, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); // MySQL_Events from circular-buffer are all null-terminated + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+8, event->server, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+9, event->client, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int)(statement32, (idx*numcols)+10, (int)event->et); ASSERT_SQLITE_OK(rc, db); // Assuming event_type is an enum mapped to integers + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+11, event->hid); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+12, event->extra_info, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+13, event->affected_rows); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+14, event->last_insert_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+15, event->rows_sent); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int)(statement32, (idx*numcols)+16, event->client_stmt_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+17, event->gtid, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int)(statement32, (idx*numcols)+18, event->myerrno); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+19, event->errmsg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + if (idx==31) { + SAFE_SQLITE3_STEP2(statement32); + rc=(*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, db); + rc=(*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, db); + } + } else { // single row + //Bind parameters. Handle potential errors in binding. + rc = (*proxy_sqlite3_bind_int)(statement1, 1, event->thread_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 2, event->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 3, event->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 4, event->start_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 5, event->end_time); ASSERT_SQLITE_OK(rc, db); + sprintf(digest_hex_str, "0x%016llX", (long long unsigned int)event->query_digest); + rc = (*proxy_sqlite3_bind_text)(statement1, 6, digest_hex_str, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 7, event->query_ptr, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); // MySQL_Events from circular-buffer are all null-terminated + rc = (*proxy_sqlite3_bind_text)(statement1, 8, event->server, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 9, event->client, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int)(statement1, 10, (int)event->et); ASSERT_SQLITE_OK(rc, db); // Assuming event_type is an enum mapped to integers + rc = (*proxy_sqlite3_bind_int64)(statement1, 11, event->hid); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 12, event->extra_info, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 13, event->affected_rows); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 14, event->last_insert_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 15, event->rows_sent); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int)(statement1, 16, event->client_stmt_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 17, event->gtid, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int)(statement1, 18, event->myerrno); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 19, event->errmsg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + SAFE_SQLITE3_STEP2(statement1); + rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, db); + rc=(*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, db); + } + row_idx++; + } + (*proxy_sqlite3_finalize)(statement1); + (*proxy_sqlite3_finalize)(statement32); + db->execute("COMMIT"); +} + + +int MySQL_Logger::processEvents(SQLite3DB * statsdb , SQLite3DB * statsdb_disk) { + unsigned long long startTimeMicros = monotonic_time(); + std::vector events = {}; + MyLogCB->get_all_events(events); + + metrics.getAllEventsCallsCount++; + if (events.empty()) return 0; + + unsigned long long afterGetAllEventsTimeMicros = monotonic_time(); + metrics.getAllEventsEventsCount += events.size(); + metrics.totalGetAllEventsDiskCopyTimeMicros += (afterGetAllEventsTimeMicros-startTimeMicros); + + if (statsdb_disk != nullptr) { + // Write to on-disk database first + unsigned long long diskStartTimeMicros = monotonic_time(); + insertMysqlEventsIntoDb(statsdb_disk, "history_mysql_query_events", events.size(), events.begin()); + unsigned long long diskEndTimeMicros = monotonic_time(); + metrics.diskCopyCount++; + metrics.totalDiskCopyTimeMicros += (diskEndTimeMicros - diskStartTimeMicros); + metrics.totalEventsCopiedToDisk += events.size(); + } + + if (statsdb != nullptr) { + unsigned long long memoryStartTimeMicros = monotonic_time(); + size_t maxInMemorySize = mysql_thread___eventslog_table_memory_size; + size_t numEventsToInsert = std::min(events.size(), maxInMemorySize); + + if (events.size() >= maxInMemorySize) { + // delete everything from stats_mysql_query_events + statsdb->execute("DELETE FROM stats_mysql_query_events"); + } else { + // make enough room in stats_mysql_query_events + int current_rows = statsdb->return_one_int((char *)"SELECT COUNT(*) FROM stats_mysql_query_events"); + int rows_to_keep = maxInMemorySize - events.size(); + if (current_rows > rows_to_keep) { + int rows_to_delete = (current_rows - rows_to_keep); + string delete_stmt = "DELETE FROM stats_mysql_query_events ORDER BY id LIMIT " + to_string(rows_to_delete); + statsdb->execute(delete_stmt.c_str()); + } + } + + // Pass iterators to avoid copying + insertMysqlEventsIntoDb(statsdb, "stats_mysql_query_events", numEventsToInsert, events.begin()); + unsigned long long memoryEndTimeMicros = monotonic_time(); + metrics.memoryCopyCount++; + metrics.totalMemoryCopyTimeMicros += (memoryEndTimeMicros - memoryStartTimeMicros); + metrics.totalEventsCopiedToMemory += numEventsToInsert; + } + + // cleanup of all events + for (MySQL_Event* event : events) { + delete event; + } + size_t ret = events.size(); +#if 1 // FIXME: TEMPORARY , TO REMOVE + std::cerr << "Circular:" << endl; + std::cerr << " EventsAddedCount: " << MyLogCB->getEventsAddedCount() << endl; + std::cerr << " EventsDroppedCount: " << MyLogCB->getEventsDroppedCount() << endl; + std::cerr << " Size: " << MyLogCB->size() << endl; + std::cerr << "memoryCopy: Count: " << metrics.memoryCopyCount << " , TimeUs: " << metrics.totalMemoryCopyTimeMicros << endl; + std::cerr << "diskCopy: Count: " << metrics.diskCopyCount << " , TimeUs: " << metrics.totalDiskCopyTimeMicros << endl; +#endif // 1 , FIXME: TEMPORARY , TO REMOVE + return ret; +} + + +std::unordered_map MySQL_Logger::getAllMetrics() const { + std::unordered_map allMetrics; + + allMetrics["memoryCopyCount"] = metrics.memoryCopyCount; + allMetrics["diskCopyCount"] = metrics.diskCopyCount; + allMetrics["getAllEventsCallsCount"] = metrics.getAllEventsCallsCount; + allMetrics["getAllEventsEventsCount"] = metrics.getAllEventsEventsCount; + allMetrics["totalMemoryCopyTimeMicros"] = metrics.totalMemoryCopyTimeMicros; + allMetrics["totalDiskCopyTimeMicros"] = metrics.totalDiskCopyTimeMicros; + allMetrics["totalGetAllEventsDiskCopyTimeMicros"] = metrics.totalGetAllEventsDiskCopyTimeMicros; + allMetrics["totalEventsCopiedToMemory"] = metrics.totalEventsCopiedToMemory; + allMetrics["totalEventsCopiedToDisk"] = metrics.totalEventsCopiedToDisk; + //allMetrics["eventsAddedToBufferCount"] = metrics.eventsAddedToBufferCount; + //allMetrics["eventsDroppedFromBufferCount"] = metrics.eventsDroppedFromBufferCount; + allMetrics["circularBuffereventsAddedCount"] = MyLogCB->getEventsAddedCount(); + allMetrics["circularBufferEventsDroppedCount"] = MyLogCB->getEventsDroppedCount(); + allMetrics["circularBufferEventsSize"] = MyLogCB->size(); + + return allMetrics; +} diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index d7c81a273..da950ec21 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -1452,9 +1452,11 @@ void MySQL_Session::return_proxysql_internal(PtrSize_t *pkt) { } // default client_myds->DSS=STATE_QUERY_SENT_NET; - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1064,(char *)"42000",(char *)"Unknown PROXYSQL INTERNAL command",true); + string errmsg = "Unknown PROXYSQL INTERNAL command"; + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1047,(char *)"08S01", errmsg.c_str() ,true); if (mirror==false) { - RequestEnd(NULL); + MyHGM->add_mysql_errors(current_hostgroup, (char *)"", 0, client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char *)"unknown" ), client_myds->myconn->userinfo->schemaname, 1047, (char *)errmsg.c_str()); + RequestEnd(NULL, 1047, errmsg.c_str()); } else { client_myds->DSS=STATE_SLEEP; status=WAITING_CLIENT_DATA; @@ -1604,7 +1606,7 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) { status=WAITING_CLIENT_DATA; if (mirror==false) { - RequestEnd(NULL); + RequestEnd(NULL, err_info.first, err_info.second); } l_free(pkt->size,pkt->ptr); @@ -1803,9 +1805,11 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) { if ( (pkt->size >= 22 + 5) && (strncasecmp((char *)"LOAD DATA LOCAL INFILE",(char *)pkt->ptr+5, 22)==0) ) { if (mysql_thread___enable_load_data_local_infile == false) { client_myds->DSS=STATE_QUERY_SENT_NET; - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1047,(char *)"HY000",(char *)"Unsupported 'LOAD DATA LOCAL INFILE' command",true); + string errmsg = "Unsupported 'LOAD DATA LOCAL INFILE' command"; + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1047,(char *)"08S01", errmsg.c_str(), true); if (mirror==false) { - RequestEnd(NULL); + MyHGM->add_mysql_errors(current_hostgroup, (char *)"", 0, client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char *)"unknown" ), client_myds->myconn->userinfo->schemaname, 1047, (char *)errmsg.c_str()); + RequestEnd(NULL, 1047, errmsg.c_str()); } else { client_myds->DSS=STATE_SLEEP; status=WAITING_CLIENT_DATA; @@ -3220,7 +3224,8 @@ bool MySQL_Session::handler_again___status_CONNECTING_SERVER(int *_rc) { thread->status_variables.stvar[st_var_max_connect_timeout_err]++; } client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,9001,(char *)"HY000", errmsg.c_str(), true); - RequestEnd(mybe->server_myds); + MyHGM->add_mysql_errors(current_hostgroup, (char *)"", 0, client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char *)"unknown" ), client_myds->myconn->userinfo->schemaname, 9001, (char *)errmsg.c_str()); + RequestEnd(mybe->server_myds, 9001, errmsg.c_str()); string hg_status {}; generate_status_one_hostgroup(current_hostgroup, hg_status); @@ -3353,21 +3358,23 @@ bool MySQL_Session::handler_again___status_CONNECTING_SERVER(int *_rc) { } else { __exit_handler_again___status_CONNECTING_SERVER_with_err: int myerr=mysql_errno(myconn->mysql); + string errmsg = ""; if (myerr) { char sqlstate[10]; + errmsg = string(mysql_error(myconn->mysql)); sprintf(sqlstate,"%s",mysql_sqlstate(myconn->mysql)); - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql),true); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate, errmsg.c_str(), true); } else { - char buf[256]; - sprintf(buf,"Max connect failure while reaching hostgroup %d", current_hostgroup); - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,9002,(char *)"HY000",buf,true); + errmsg = "Max connect failure while reaching hostgroup " + to_string(current_hostgroup); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,9002,(char *)"HY000", errmsg.c_str(), true); if (thread) { thread->status_variables.stvar[st_var_max_connect_timeout_err]++; } } if (session_fast_forward==false) { // see bug #979 - RequestEnd(myds); + MyHGM->add_mysql_errors(current_hostgroup, (char *)"", 0, client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char *)"unknown" ), client_myds->myconn->userinfo->schemaname, 9002, (char *)errmsg.c_str()); + RequestEnd(myds, 9002, errmsg.c_str()); } while (previous_status.size()) { st=previous_status.top(); @@ -3629,7 +3636,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end); client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,9005,(char *)"HY000",buf, true); thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; - RequestEnd(NULL); + RequestEnd(NULL, 9005, buf); free(buf); l_free(pkt.size,pkt.ptr); return; @@ -3802,7 +3809,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end); client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,9005,(char *)"HY000",buf, true); thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; - RequestEnd(NULL); + RequestEnd(NULL, 9005, buf); free(buf); l_free(pkt.size,pkt.ptr); return; @@ -4521,7 +4528,7 @@ int MySQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) { sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end); client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,9005,(char *)"HY000",buf, true); thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; - RequestEnd(NULL); + RequestEnd(NULL, 9005, buf); free(buf); l_free(pkt.size,pkt.ptr); break; @@ -5535,7 +5542,7 @@ int MySQL_Session::handler() { return handler_ret; } handler_minus1_GenerateErrorMessage(myds, myconn, wrong_pass); - RequestEnd(myds); + RequestEnd(myds, myerr); handler_minus1_HandleBackendConnection(myds, myconn); } } else { @@ -5673,6 +5680,7 @@ bool MySQL_Session::handler_again___status_SHOW_WARNINGS(MySQL_Data_Stream* myds myds->revents,(char *)"SHOW WARNINGS", strlen((char *)"SHOW WARNINGS") ); if (rc == 0 || rc == -1) { + int myerr = 0; // Cleanup the connection resulset from 'SHOW WARNINGS' for the next query. if (myconn->MyRS != NULL) { delete myconn->MyRS; @@ -5680,13 +5688,13 @@ bool MySQL_Session::handler_again___status_SHOW_WARNINGS(MySQL_Data_Stream* myds } if (rc == -1) { - int myerr = mysql_errno(myconn->mysql); + myerr = mysql_errno(myconn->mysql); proxy_error( "'SHOW WARNINGS' failed to be executed over backend connection with error: '%d'\n", myerr ); } - RequestEnd(myds); + RequestEnd(myds, myerr); finishQuery(myds,myconn,prepared_stmt_with_no_params); return false; @@ -6177,7 +6185,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C msg = errmsg + ": " + nq; } client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1148,(char *)"42000", msg.c_str()); - RequestEnd(NULL); + RequestEnd(NULL, 1148, msg.c_str()); } client_myds->DSS=STATE_SLEEP; } else { @@ -6263,7 +6271,8 @@ void MySQL_Session::handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t *pkt) { void MySQL_Session::handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt) { client_myds->DSS=STATE_QUERY_SENT_NET; client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1148,(char *)"42000",qpo->error_msg); - RequestEnd(NULL); + MyHGM->add_mysql_errors(current_hostgroup, (char *)"", 0, client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char *)"unknown" ), client_myds->myconn->userinfo->schemaname, 1148, (char *)qpo->error_msg); + RequestEnd(NULL, 1148, qpo->error_msg); l_free(pkt->size,pkt->ptr); } @@ -6280,8 +6289,10 @@ void MySQL_Session::handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt) { void MySQL_Session::handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t *pkt) { // ER_NET_PACKET_TOO_LARGE client_myds->DSS=STATE_QUERY_SENT_NET; - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1153,(char *)"08S01",(char *)"Got a packet bigger than 'max_allowed_packet' bytes", true); - RequestEnd(NULL); + string errmsg = "Got a packet bigger than 'max_allowed_packet' bytes"; + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1153,(char *)"08S01", errmsg.c_str(), true); + MyHGM->add_mysql_errors(current_hostgroup, (char *)"", 0, client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char *)"unknown" ), client_myds->myconn->userinfo->schemaname, 1153, (char *)errmsg.c_str()); + RequestEnd(NULL, 1153, errmsg.c_str()); l_free(pkt->size,pkt->ptr); } @@ -7373,7 +7384,7 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C sprintf(buf,"ProxySQL Error: connection is locked to hostgroup %d but trying to reach hostgroup %d", locked_on_hostgroup, current_hostgroup); client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,9006,(char *)"Y0000",buf); thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; - RequestEnd(NULL); + RequestEnd(NULL, 9006, buf); l_free(pkt->size,pkt->ptr); return true; } @@ -8022,24 +8033,24 @@ unsigned long long MySQL_Session::IdleTime() { // this is called either from RequestEnd(), or at the end of executing // prepared statements -void MySQL_Session::LogQuery(MySQL_Data_Stream *myds) { +void MySQL_Session::LogQuery(MySQL_Data_Stream *myds, const unsigned int myerrno, const char * errmsg) { // we need to access statistics before calling CurrentQuery.end() // so we track the time here CurrentQuery.end_time=thread->curtime; if (qpo) { if (qpo->log==1) { - GloMyLogger->log_request(this, myds); // we send for logging only if logging is enabled for this query + GloMyLogger->log_request(this, myds, myerrno, errmsg); // we send for logging only if logging is enabled for this query } else { if (qpo->log==-1) { if (mysql_thread___eventslog_default_log==1) { - GloMyLogger->log_request(this, myds); // we send for logging only if enabled by default + GloMyLogger->log_request(this, myds, myerrno, errmsg); // we send for logging only if enabled by default } } } } } -void MySQL_Session::RequestEnd(MySQL_Data_Stream *myds) { +void MySQL_Session::RequestEnd(MySQL_Data_Stream *myds, const unsigned int myerrno, const char * errmsg) { // check if multiplexing needs to be disabled char *qdt = NULL; @@ -8060,7 +8071,7 @@ void MySQL_Session::RequestEnd(MySQL_Data_Stream *myds) { break; default: if (session_fast_forward==false) { - LogQuery(myds); + LogQuery(myds, myerrno, errmsg); } break; } diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 70a019af7..b62de85d6 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -349,6 +349,9 @@ static char * mysql_thread_variables_names[]= { (char *)"enable_load_data_local_infile", (char *)"eventslog_filename", (char *)"eventslog_filesize", + (char *)"eventslog_buffer_history_size", + (char *)"eventslog_table_memory_size", + (char *)"eventslog_buffer_max_query_length", (char *)"eventslog_default_log", (char *)"eventslog_format", (char *)"auditlog_filename", @@ -1070,6 +1073,9 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.server_version=strdup((char *)"8.0.11"); // changed in 2.6.0 , was 5.5.30 variables.eventslog_filename=strdup((char *)""); // proxysql-mysql-eventslog is recommended variables.eventslog_filesize=100*1024*1024; + variables.eventslog_buffer_history_size=0; + variables.eventslog_table_memory_size=10000; + variables.eventslog_buffer_max_query_length = 32*1024; variables.eventslog_default_log=0; variables.eventslog_format=1; variables.auditlog_filename=strdup((char *)""); @@ -2270,6 +2276,9 @@ char ** MySQL_Threads_Handler::get_variables_list() { // logs VariablesPointers_int["auditlog_filesize"] = make_tuple(&variables.auditlog_filesize, 1024*1024, 1*1024*1024*1024, false); VariablesPointers_int["eventslog_filesize"] = make_tuple(&variables.eventslog_filesize, 1024*1024, 1*1024*1024*1024, false); + VariablesPointers_int["eventslog_buffer_history_size"] = make_tuple(&variables.eventslog_buffer_history_size, 0, 8*1024*1024, false); + VariablesPointers_int["eventslog_table_memory_size"] = make_tuple(&variables.eventslog_table_memory_size, 0, 8*1024*1024, false); + VariablesPointers_int["eventslog_buffer_max_query_length"] = make_tuple(&variables.eventslog_buffer_max_query_length, 128, 32*1024*1024, false); VariablesPointers_int["eventslog_default_log"] = make_tuple(&variables.eventslog_default_log, 0, 1, false); // various VariablesPointers_int["long_query_time"] = make_tuple(&variables.long_query_time, 0, 20*24*3600*1000, false); @@ -4398,6 +4407,15 @@ void MySQL_Thread::refresh_variables() { REFRESH_VARIABLE_CHAR(server_version); REFRESH_VARIABLE_INT(eventslog_filesize); + REFRESH_VARIABLE_INT(eventslog_table_memory_size); + REFRESH_VARIABLE_INT(eventslog_buffer_history_size); + { + int elmhs = mysql_thread___eventslog_buffer_history_size; + if (GloMyLogger->MyLogCB->getBufferSize() != elmhs) { + GloMyLogger->MyLogCB->setBufferSize(elmhs); + } + } + REFRESH_VARIABLE_INT(eventslog_buffer_max_query_length); REFRESH_VARIABLE_INT(eventslog_default_log); REFRESH_VARIABLE_INT(eventslog_format); REFRESH_VARIABLE_CHAR(eventslog_filename); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 4b3c3b770..9b1e59bf9 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -602,6 +602,10 @@ MHD_Result http_handler(void *cls, struct MHD_Connection *connection, const char #define ADMIN_SQLITE_TABLE_STATS_MYSQL_PREPARED_STATEMENTS_INFO "CREATE TABLE stats_mysql_prepared_statements_info (global_stmt_id INT NOT NULL , schemaname VARCHAR NOT NULL , username VARCHAR NOT NULL , digest VARCHAR NOT NULL , ref_count_client INT NOT NULL , ref_count_server INT NOT NULL , num_columns INT NOT NULL, num_params INT NOT NULL, query VARCHAR NOT NULL)" + +#define ADMIN_SQLITE_TABLE_STATS_MYSQL_QUERY_EVENTS "CREATE TABLE stats_mysql_query_events (id INTEGER PRIMARY KEY AUTOINCREMENT , thread_id INTEGER , username TEXT , schemaname TEXT , start_time INTEGER , end_time INTEGER , query_digest TEXT , query TEXT , server TEXT , client TEXT , event_type INTEGER , hid INTEGER , extra_info TEXT , affected_rows INTEGER , last_insert_id INTEGER , rows_sent INTEGER , client_stmt_id INTEGER , gtid TEXT , errno INT , error TEXT)" + + static char * admin_variables_names[]= { (char *)"admin_credentials", (char *)"stats_credentials", @@ -609,6 +613,7 @@ static char * admin_variables_names[]= { (char *)"stats_mysql_connection_pool", (char *)"stats_mysql_query_cache", (char *)"stats_mysql_query_digest_to_disk", + (char *)"stats_mysql_eventslog_sync_buffer_to_disk", (char *)"stats_system_cpu", (char *)"stats_system_memory", (char *)"mysql_ifaces", @@ -3788,6 +3793,33 @@ void admin_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) { } } + + if (!strncasecmp("DUMP EVENTSLOG ", query_no_space, strlen("DUMP EVENTSLOG "))) { + int num_rows = 0; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "Received command DUMP EVENTSLOG: %s\n", query_no_space); + proxy_info("Received command DUMP EVENTSLOG: %s\n", query_no_space); + + // Use a map for better efficiency and readability + std::map> commandMap = { + {"DUMP EVENTSLOG FROM BUFFER TO MEMORY", {SPA->statsdb, nullptr}}, + {"DUMP EVENTSLOG FROM BUFFER TO DISK", {nullptr, SPA->statsdb_disk}}, + {"DUMP EVENTSLOG FROM BUFFER TO BOTH", {SPA->statsdb, SPA->statsdb_disk}} + }; + + string s = string(query_no_space); + auto it = commandMap.find(s); + if (it != commandMap.end()) { + num_rows = GloMyLogger->processEvents(it->second.first, it->second.second); + SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, num_rows); + } else { + proxy_warning("Received invalid command DUMP EVENTSLOG: %s\n", query_no_space); + SPA->send_MySQL_ERR(&sess->client_myds->myprot, (char *)"Invalid DUMP EVENTSLOG command"); + } + run_query = false; + goto __run_query; + } + + // handle special queries from Cluster // for bug #1188 , ProxySQL Admin needs to know the exact query @@ -5817,6 +5849,14 @@ static void * admin_main_loop(void *arg) curtime2 = curtime2/1000; proxy_info("Automatically saved stats_mysql_query_digest to disk: %llums to write %d entries\n", curtime2-curtime1, r1); } + if (GloProxyStats->MySQL_Logger_dump_eventslog_timetoget(curtime)) { + unsigned long long curtime1=monotonic_time(); + int r1 = GloMyLogger->processEvents(nullptr, SPA->statsdb_disk); + unsigned long long curtime2=monotonic_time(); + curtime1 = curtime1/1000; + curtime2 = curtime2/1000; + proxy_info("Automatically saved history_mysql_query_events to disk: %llums to write %d entries\n", curtime2-curtime1, r1); + } if (GloProxyStats->system_cpu_timetoget(curtime)) { GloProxyStats->system_cpu_sets(); } @@ -6033,12 +6073,14 @@ ProxySQL_Admin::ProxySQL_Admin() : variables.stats_mysql_connections = 60; variables.stats_mysql_query_cache = 60; variables.stats_mysql_query_digest_to_disk = 0; + variables.stats_mysql_eventslog_sync_buffer_to_disk = 0; variables.stats_system_cpu = 60; variables.stats_system_memory = 60; GloProxyStats->variables.stats_mysql_connection_pool = 60; GloProxyStats->variables.stats_mysql_connections = 60; GloProxyStats->variables.stats_mysql_query_cache = 60; GloProxyStats->variables.stats_mysql_query_digest_to_disk = 0; + GloProxyStats->variables.stats_mysql_eventslog_sync_buffer_to_disk = 0; GloProxyStats->variables.stats_system_cpu = 60; #ifndef NOJEM GloProxyStats->variables.stats_system_memory = 60; @@ -6578,6 +6620,8 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) { insert_into_tables_defs(tables_defs_stats,"stats_mysql_prepared_statements_info", ADMIN_SQLITE_TABLE_STATS_MYSQL_PREPARED_STATEMENTS_INFO); insert_into_tables_defs(tables_defs_stats,"stats_mysql_client_host_cache", STATS_SQLITE_TABLE_MYSQL_CLIENT_HOST_CACHE); insert_into_tables_defs(tables_defs_stats,"stats_mysql_client_host_cache_reset", STATS_SQLITE_TABLE_MYSQL_CLIENT_HOST_CACHE_RESET); + insert_into_tables_defs(tables_defs_stats,"stats_mysql_query_events", ADMIN_SQLITE_TABLE_STATS_MYSQL_QUERY_EVENTS); + // ProxySQL Cluster insert_into_tables_defs(tables_defs_admin,"proxysql_servers", ADMIN_SQLITE_TABLE_PROXYSQL_SERVERS); @@ -8434,6 +8478,10 @@ char * ProxySQL_Admin::get_variable(char *name) { sprintf(intbuf,"%d",variables.stats_mysql_query_digest_to_disk); return strdup(intbuf); } + if (!strcasecmp(name,"stats_mysql_eventslog_sync_buffer_to_disk")) { + sprintf(intbuf,"%d",variables.stats_mysql_eventslog_sync_buffer_to_disk); + return strdup(intbuf); + } if (!strcasecmp(name,"stats_system_cpu")) { sprintf(intbuf,"%d",variables.stats_system_cpu); return strdup(intbuf); @@ -8739,6 +8787,16 @@ bool ProxySQL_Admin::set_variable(char *name, char *value, bool lock) { // this return false; } } + if (!strcasecmp(name,"stats_mysql_eventslog_sync_buffer_to_disk")) { + int intv=atoi(value); + if (intv >= 0 && intv <= 24*3600) { + variables.stats_mysql_eventslog_sync_buffer_to_disk=intv; + GloProxyStats->variables.stats_mysql_eventslog_sync_buffer_to_disk=intv; + return true; + } else { + return false; + } + } if (!strcasecmp(name,"stats_system_cpu")) { int intv=atoi(value); if (intv >= 0 && intv <= 600) { diff --git a/lib/ProxySQL_Statistics.cpp b/lib/ProxySQL_Statistics.cpp index 523623109..d584f11ad 100644 --- a/lib/ProxySQL_Statistics.cpp +++ b/lib/ProxySQL_Statistics.cpp @@ -97,6 +97,8 @@ void ProxySQL_Statistics::init() { insert_into_tables_defs(tables_defs_statsdb_disk,"history_mysql_query_digest", STATSDB_SQLITE_TABLE_HISTORY_MYSQL_QUERY_DIGEST); + insert_into_tables_defs(tables_defs_statsdb_disk,"history_mysql_query_events", STATSDB_SQLITE_TABLE_HISTORY_MYSQL_QUERY_EVENTS); + disk_upgrade_mysql_connections(); check_and_build_standard_tables(statsdb_mem, tables_defs_statsdb_disk); @@ -110,6 +112,9 @@ void ProxySQL_Statistics::init() { // statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_digest_last_seen ON history_mysql_query_digest (last_seen)"); statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_digest_dump_time ON history_mysql_query_digest (dump_time)"); statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_status_variable_id_timestamp ON history_mysql_status_variables(variable_id,timestamp)"); + + statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_events_start_time ON history_mysql_query_events(start_time)"); + statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_events_query_digest ON history_mysql_query_events(query_digest)"); } void ProxySQL_Statistics::disk_upgrade_mysql_connections() { @@ -174,6 +179,17 @@ void ProxySQL_Statistics::drop_tables_defs(std::vector *tables_de } } +bool ProxySQL_Statistics::MySQL_Logger_dump_eventslog_timetoget(unsigned long long currentTimeMicros) { + if (variables.stats_mysql_eventslog_sync_buffer_to_disk) { // only proceed if not zero + unsigned long long t = variables.stats_mysql_eventslog_sync_buffer_to_disk; // originally in seconds + t = t * 1000 * 1000; + if (currentTimeMicros > last_timer_mysql_dump_eventslog_to_disk + t) { + last_timer_mysql_dump_eventslog_to_disk = currentTimeMicros; + return true; + } + } + return false; +} bool ProxySQL_Statistics::MySQL_Threads_Handler_timetoget(unsigned long long curtime) { unsigned int i = (unsigned int)variables.stats_mysql_connections; diff --git a/test/tap/tests/mysql_query_logging_memory-t.cpp b/test/tap/tests/mysql_query_logging_memory-t.cpp new file mode 100644 index 000000000..9d160d9eb --- /dev/null +++ b/test/tap/tests/mysql_query_logging_memory-t.cpp @@ -0,0 +1,316 @@ +/** + * @file mysql_query_logging_memory-t.cpp + * @brief This file contains a TAP test for testing query memory logging + */ + +// TODO: we also need to add checks for stats_mysql_errors + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mysql.h" +#include "mysqld_error.h" + +#include "tap.h" +#include "command_line.h" +#include "utils.h" + +using std::string; + +/** @brief Expected DDL for the stats_mysql_query_events table. */ +const std::string expected_stats_mysql_query_events = R"(CREATE TABLE stats_mysql_query_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + thread_id INTEGER, + username TEXT, + schemaname TEXT, + start_time INTEGER, + end_time INTEGER, + query_digest TEXT, + query TEXT, + server TEXT, + client TEXT, + event_type INTEGER, + hid INTEGER, + extra_info TEXT, + affected_rows INTEGER, + last_insert_id INTEGER, + rows_sent INTEGER, + client_stmt_id INTEGER, + gtid TEXT, + errno INT, + error TEXT))"; + +/** @brief Expected DDL for the history_mysql_query_events table. */ +const std::string expected_history_mysql_query_events = R"(CREATE TABLE history_mysql_query_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + thread_id INTEGER, + username TEXT, + schemaname TEXT, + start_time INTEGER, + end_time INTEGER, + query_digest TEXT, + query TEXT, + server TEXT, + client TEXT, + event_type INTEGER, + hid INTEGER, + extra_info TEXT, + affected_rows INTEGER, + last_insert_id INTEGER, + rows_sent INTEGER, + client_stmt_id INTEGER, + gtid TEXT, + errno INT, + error TEXT))"; + +/** + * @brief Removes multiple spaces from a string, replacing them with a single space. + * + * @param str The input string. + * @return The string with multiple spaces replaced by single spaces. + */ +std::string removeMultipleSpacesRegex(std::string str) { + std::replace(str.begin(), str.end(), '\n', ' '); //Replace newlines first + std::regex multipleSpaces("\\s+"); // Matches one or more whitespace characters + std::string result = std::regex_replace(str, multipleSpaces, " "); + return result; +} + +/** + * @brief Checks if the structure of a table matches the expected DDL. + * + * @param conn The MySQL connection. + * @param schemaname The name of the schema. + * @param table_name The name of the table. + * @param expected_ddl The expected DDL for the table. + * @return True if the table structure matches, false otherwise. + */ +bool runAndCheckTable(MYSQL* conn, const std::string& schemaname, const std::string& table_name, std::string expected_ddl) { + std::string query = "SHOW CREATE TABLE " + schemaname + "." + table_name; + if (mysql_query(conn, query.c_str())) { + diag("Error querying table '%s': %s", table_name.c_str(), mysql_error(conn)); + return false; + } + + MYSQL_RES* result = mysql_store_result(conn); + if (!result) { + diag("Error storing result for table '%s': %s", table_name.c_str(), mysql_error(conn)); + return false; + } + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || row[1] == nullptr) { + diag("Unexpected result for table '%s'", table_name.c_str()); + mysql_free_result(result); + return false; + } + + std::string actual_ddl(row[1]); + mysql_free_result(result); + size_t pos = actual_ddl.find('\n'); + while (pos != std::string::npos) { + actual_ddl.replace(pos, 1, " "); + pos = actual_ddl.find('\n', pos + 1); + } + + actual_ddl = removeMultipleSpacesRegex(actual_ddl); + expected_ddl = removeMultipleSpacesRegex(expected_ddl); + + bool success = (actual_ddl == expected_ddl); + ok(success, "Table '%s' structure %s match expectation", table_name.c_str(), (success ? "matches" : "does not match")); + if (success == false) { + diag("Table structure actual : %s", actual_ddl.c_str()); + diag("Table structure expected: %s", expected_ddl.c_str()); + } + return success; +} + + +/** + * @brief Checks the result of a query against expected results. + * + * @param conn The MySQL connection. + * @param query The query to execute. + * @param expectedResults The expected results as a map of errno to count. + * @return True if the query results match the expected results, false otherwise. + */ +bool checkQueryResult(MYSQL* conn, const std::string& query, const std::map& expectedResults) { + if (mysql_query(conn, query.c_str())) { + diag("Error executing query '%s': %s", query.c_str(), mysql_error(conn)); + return false; + } + + MYSQL_RES* result = mysql_store_result(conn); + if (!result) { + diag("Error storing result for query '%s': %s", query.c_str(), mysql_error(conn)); + return false; + } + + std::map actualResults; + MYSQL_ROW row; + while ((row = mysql_fetch_row(result))) { + actualResults[std::stoi(row[0])] = std::stoi(row[1]); + } + mysql_free_result(result); + + return actualResults == expectedResults; +} + + +int main() { + + + CommandLine cl; + if (cl.getEnv()) { + diag("Failed to get the required environmental variables."); + return -1; + } + + const unsigned int num_selects = 200; // Number of "SELECT 1" queries to run + unsigned int p = 2; // Number of tests for table structure checks + p += num_selects/10; // Number of tests for SELECT 1 queries (one every 10 iterations) + p += 1; // Number of tests for syntax error + p += 1; // Number of tests for empty hostgroup error + p += 1; // Number of tests for non-existing schema error + p += 2; // Number of tests for checking query results in stats and history tables + plan(p); + + MYSQL* admin_conn = mysql_init(nullptr); + if (!admin_conn) { + diag("Failed to initialize MySQL connection."); + return -1; + } + + if (!mysql_real_connect(admin_conn, cl.host, cl.admin_username, cl.admin_password, nullptr, cl.admin_port, nullptr, 0)) { + diag("Failed to connect to ProxySQL admin: %s", mysql_error(admin_conn)); + mysql_close(admin_conn); + return -1; + } + + // Check table structures + runAndCheckTable(admin_conn, "stats", "stats_mysql_query_events", expected_stats_mysql_query_events); + runAndCheckTable(admin_conn, "stats_history", "history_mysql_query_events", expected_history_mysql_query_events); + + // Prepare for testing + MYSQL_QUERY(admin_conn, "SET mysql-eventslog_buffer_history_size=1000000"); + MYSQL_QUERY(admin_conn, "SET mysql-eventslog_default_log=1"); + MYSQL_QUERY(admin_conn, "LOAD MYSQL VARIABLES TO RUNTIME"); + MYSQL_QUERY(admin_conn, "DUMP EVENTSLOG FROM BUFFER TO BOTH"); + MYSQL_QUERY(admin_conn, "DELETE FROM stats_mysql_query_events"); + MYSQL_QUERY(admin_conn, "DELETE FROM history_mysql_query_events"); + + + + MYSQL* proxy = mysql_init(NULL); + if (!proxy) { + diag("Failed to initialize MySQL connection to ProxySQL."); + mysql_close(admin_conn); //Close admin connection before exiting. + return -1; + } + + if (!mysql_real_connect(proxy, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + diag("Failed to connect to ProxySQL: %s", mysql_error(proxy)); + mysql_close(admin_conn); + mysql_close(proxy); + return -1; + } + + + // Run 200 "SELECT 1" queries + for (int i = 0; i < 200; ++i) { + if (mysql_query(proxy, "SELECT 1")) { + diag("Error executing 'SELECT 1' query (iteration %d): %s", i, mysql_error(proxy)); + mysql_close(admin_conn); + mysql_close(proxy); + return -1; + } + MYSQL_RES *result = mysql_store_result(proxy); + if (result) mysql_free_result(result); //Clean up result if it exists + if ((i+1)%10 == 0) { // avoid too much logging + ok(1, "SELECT 1 query successful (iteration %d)", i+1); + } + } + + // Test syntax error + if (mysql_query(proxy, "SELEEEEECT 1")) { + //Check if we received a syntax error (adjust error code as needed for your MySQL version). + int error_code = mysql_errno(proxy); + ok(error_code == 1064, "Syntax error detected correctly (error code: %d)", error_code); //1064 is a common syntax error code + } else { + diag("Expected syntax error, but query succeeded."); + mysql_close(admin_conn); + mysql_close(proxy); + return -1; + } + + // Test hostgroup error + if (mysql_query(proxy, "SELECT /* hostgroup=1234 */ 1")) { + int error_code = mysql_errno(proxy); + ok(error_code == 9001, "Hostgroup error detected correctly (error code: %d)", error_code); + } else { + diag("Expected hostgroup error (error code 9001), but query succeeded."); + mysql_close(admin_conn); + mysql_close(proxy); + return -1; + } + + // Test connection to non-existent schema with query + MYSQL* nonExistentSchemaConn = mysql_init(NULL); + if (!nonExistentSchemaConn) { + diag("Failed to initialize MySQL connection for non-existent schema test."); + mysql_close(admin_conn); + mysql_close(proxy); + return -1; + } + + // Replace 'nonexistent_schema' with the actual name of a non-existent schema. + if (!mysql_real_connect(nonExistentSchemaConn, cl.host, cl.username, cl.password, "nonexistent_schema", cl.port, NULL, 0)) { + diag("Failed to connect to non-existent schema 'nonexistent_schema': %s", mysql_error(nonExistentSchemaConn)); + mysql_close(nonExistentSchemaConn); + mysql_close(admin_conn); + mysql_close(proxy); + return -1; + } + + if (mysql_query(nonExistentSchemaConn, "SELECT 1")) { + int error_code = mysql_errno(nonExistentSchemaConn); + ok(error_code == 1044, "Query on non-existent schema returned expected error (1044): %d", error_code); + } else { + diag("Query on non-existent schema succeeded unexpectedly."); + mysql_close(nonExistentSchemaConn); + mysql_close(admin_conn); + mysql_close(proxy); + return -1; + } + + // dump eventslog + MYSQL_QUERY(admin_conn, "DUMP EVENTSLOG FROM BUFFER TO BOTH"); + + + // Expected results for both queries + std::map expectedResults = { + {0, 200}, + {1064, 1}, + {9001, 1}, + {9002, 1} + }; + + // Test history_mysql_query_events + bool historyCheck = checkQueryResult(admin_conn, "SELECT errno, COUNT(*) FROM history_mysql_query_events GROUP BY errno ORDER BY errno", expectedResults); + ok(historyCheck, "history_mysql_query_events query results match expectation"); + + // Test stats_mysql_query_events + bool statsCheck = checkQueryResult(admin_conn, "SELECT errno, COUNT(*) FROM stats_mysql_query_events GROUP BY errno ORDER BY errno", expectedResults); + ok(statsCheck, "stats_mysql_query_events query results match expectation"); + + mysql_close(nonExistentSchemaConn); + mysql_close(proxy); + mysql_close(admin_conn); + return exit_status(); +}