From bb598b2006c2ff5498583357222edd29ab374995 Mon Sep 17 00:00:00 2001
From: felipe stival <14948182+v0idpwn@users.noreply.github.com>
Date: Fri, 15 Nov 2024 01:37:56 -0300
Subject: [PATCH] Fix regression: not respecting interval in read_with_poll (#343)

* Fix regression: not respecting interval in read_with_poll

The regression happened in the port to PL/pgSQL. Division of integers on
postgres yields integer results. Our unit conversion from milliseconds
to seconds didn't consider that, then returning wrong results.

This caused `pgmq.read_with_poll` to sleep 0s instead of
`poll_interval_ms` if `poll_interval_ms` was under 1000, using 100% of
CPU.

The fix is simply converting `poll_interval_ms` to `numeric` before
doing the division, which produces a `numeric` result as originally
intended.

* Fix tests

* Fix drop_queue deprecation warning: use a single-quoted message in
  RAISE WARNING (a double-quoted token is an identifier, not a string)
---
 pgmq-extension/sql/pgmq--1.4.4--1.4.5.sql |  59 +++++
 pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql | 245 ++++++++++++++++++++++
 pgmq-extension/sql/pgmq.sql               |   2 +-
 pgmq-extension/test/expected/base.out     |   2 +-
 pgmq-extension/test/sql/base.sql          |   2 +-
 5 files changed, 307 insertions(+), 3 deletions(-)
 create mode 100644 pgmq-extension/sql/pgmq--1.4.4--1.4.5.sql
 create mode 100644 pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql

diff --git a/pgmq-extension/sql/pgmq--1.4.4--1.4.5.sql b/pgmq-extension/sql/pgmq--1.4.4--1.4.5.sql
new file mode 100644
index 00000000..c177dfe5
--- /dev/null
+++ b/pgmq-extension/sql/pgmq--1.4.4--1.4.5.sql
@@ -0,0 +1,59 @@
+-- read_with_poll
+-- Reads up to qty messages, polling until at least one is read or max_poll_seconds
+-- have elapsed. Sleeps poll_interval_ms between attempts; the value is cast to
+-- numeric before dividing by 1000 so sub-second intervals are not truncated to 0.
+CREATE OR REPLACE FUNCTION pgmq.read_with_poll(
+    queue_name TEXT,
+    vt INTEGER,
+    qty INTEGER,
+    max_poll_seconds INTEGER DEFAULT 5,
+    poll_interval_ms INTEGER DEFAULT 100
+)
+RETURNS SETOF pgmq.message_record AS $$
+DECLARE
+    r pgmq.message_record;
+    stop_at TIMESTAMP;
+    sql TEXT;
+    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
+BEGIN
+    stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);
+    LOOP
+      IF (SELECT clock_timestamp() >= stop_at) THEN
+        RETURN;
+      END IF;
+
+      sql := FORMAT(
+          $QUERY$
+          WITH cte AS
+          (
+              SELECT msg_id
+              FROM pgmq.%I
+              WHERE vt <= clock_timestamp()
+              ORDER BY msg_id ASC
+              LIMIT $1
+              FOR UPDATE SKIP LOCKED
+          )
+          UPDATE pgmq.%I m
+          SET
+              vt = clock_timestamp() + %L,
+              read_ct = read_ct + 1
+          FROM cte
+          WHERE m.msg_id = cte.msg_id
+          RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
+          $QUERY$,
+          qtable, qtable, make_interval(secs => vt)
+      );
+
+      FOR r IN
+        EXECUTE sql USING qty
+      LOOP
+        RETURN NEXT r;
+      END LOOP;
+      IF FOUND THEN
+        RETURN;
+      ELSE
+        PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
+      END IF;
+    END LOOP;
+END;
+$$ LANGUAGE plpgsql;
diff --git a/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql b/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql
new file mode 100644
index 00000000..7ca56249
--- /dev/null
+++ b/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql
@@ -0,0 +1,245 @@
+-- read
+-- reads a number of messages from a queue, setting a visibility timeout on them
+-- when conditional is not '{}', only messages whose body contains it (message @> conditional) are read
+DROP FUNCTION IF EXISTS pgmq.read(TEXT, INTEGER, INTEGER);
+CREATE FUNCTION pgmq.read(
+    queue_name TEXT,
+    vt INTEGER,
+    qty INTEGER,
+    conditional JSONB DEFAULT '{}'
+)
+RETURNS SETOF pgmq.message_record AS $$
+DECLARE
+    sql TEXT;
+    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
+BEGIN
+    sql := FORMAT(
+        $QUERY$
+        WITH cte AS
+        (
+            SELECT msg_id
+            FROM pgmq.%I
+            WHERE vt <= clock_timestamp() AND CASE
+                WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
+                ELSE 1
+            END = 1
+            ORDER BY msg_id ASC
+            LIMIT $1
+            FOR UPDATE SKIP LOCKED
+        )
+        UPDATE pgmq.%I m
+        SET
+            vt = clock_timestamp() + %L,
+            read_ct = read_ct + 1
+        FROM cte
+        WHERE m.msg_id = cte.msg_id
+        RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
+        $QUERY$,
+        qtable, conditional, qtable, make_interval(secs => vt)
+    );
+    RETURN QUERY EXECUTE sql USING qty;
+END;
+$$ LANGUAGE plpgsql;
+
+---- read_with_poll
+---- reads a number of messages from a queue, setting a visibility timeout on them
+---- polls until at least one message is read or max_poll_seconds have elapsed, sleeping poll_interval_ms between attempts
+DROP FUNCTION IF EXISTS pgmq.read_with_poll(TEXT, INTEGER, INTEGER, INTEGER, INTEGER);
+CREATE FUNCTION pgmq.read_with_poll(
+    queue_name TEXT,
+    vt INTEGER,
+    qty INTEGER,
+    max_poll_seconds INTEGER DEFAULT 5,
+    poll_interval_ms INTEGER DEFAULT 100,
+    conditional JSONB DEFAULT '{}'
+)
+RETURNS SETOF pgmq.message_record AS $$
+DECLARE
+    r pgmq.message_record;
+    stop_at TIMESTAMP;
+    sql TEXT;
+    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
+BEGIN
+    stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);
+    LOOP
+      IF (SELECT clock_timestamp() >= stop_at) THEN
+        RETURN;
+      END IF;
+
+      sql := FORMAT(
+          $QUERY$
+          WITH cte AS
+          (
+              SELECT msg_id
+              FROM pgmq.%I
+              WHERE vt <= clock_timestamp() AND CASE
+                  WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
+                  ELSE 1
+              END = 1
+              ORDER BY msg_id ASC
+              LIMIT $1
+              FOR UPDATE SKIP LOCKED
+          )
+          UPDATE pgmq.%I m
+          SET
+              vt = clock_timestamp() + %L,
+              read_ct = read_ct + 1
+          FROM cte
+          WHERE m.msg_id = cte.msg_id
+          RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
+          $QUERY$,
+          qtable, conditional, qtable, make_interval(secs => vt)
+      );
+
+      FOR r IN
+        EXECUTE sql USING qty
+      LOOP
+        RETURN NEXT r;
+      END LOOP;
+      IF FOUND THEN
+        RETURN;
+      ELSE
+        PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
+      END IF;
+    END LOOP;
+END;
+$$ LANGUAGE plpgsql;
+
+-- drop_queue
+-- the two-argument form is deprecated: it ignores partitioned and delegates to drop_queue(queue_name)
+DROP FUNCTION IF EXISTS pgmq.drop_queue(TEXT, BOOLEAN);
+
+CREATE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN)
+RETURNS BOOLEAN AS $$
+DECLARE
+    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
+    fq_qtable TEXT := 'pgmq.' || qtable;
+    atable TEXT := pgmq.format_table_name(queue_name, 'a');
+    fq_atable TEXT := 'pgmq.' || atable;
+BEGIN
+    RAISE WARNING 'drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead.';
+
+    PERFORM pgmq.drop_queue(queue_name);
+
+    RETURN TRUE;
+END;
+$$ LANGUAGE plpgsql;
+
+-- removes the queue and archive tables from the extension and drops them, then deletes the queue's
+-- pgmq.meta row and, if it was partitioned, its part_config rows in the schema from pgmq._get_pg_partman_schema()
+CREATE FUNCTION pgmq.drop_queue(queue_name TEXT)
+RETURNS BOOLEAN AS $$
+DECLARE
+    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
+    fq_qtable TEXT := 'pgmq.' || qtable;
+    atable TEXT := pgmq.format_table_name(queue_name, 'a');
+    fq_atable TEXT := 'pgmq.' || atable;
+    partitioned BOOLEAN;
+BEGIN
+    EXECUTE FORMAT(
+        $QUERY$
+        SELECT is_partitioned FROM pgmq.meta WHERE queue_name = %L
+        $QUERY$,
+        queue_name
+    ) INTO partitioned;
+
+    EXECUTE FORMAT(
+        $QUERY$
+        ALTER EXTENSION pgmq DROP TABLE pgmq.%I
+        $QUERY$,
+        qtable
+    );
+
+    EXECUTE FORMAT(
+        $QUERY$
+        ALTER EXTENSION pgmq DROP TABLE pgmq.%I
+        $QUERY$,
+        atable
+    );
+
+    EXECUTE FORMAT(
+        $QUERY$
+        DROP TABLE IF EXISTS pgmq.%I
+        $QUERY$,
+        qtable
+    );
+
+    EXECUTE FORMAT(
+        $QUERY$
+        DROP TABLE IF EXISTS pgmq.%I
+        $QUERY$,
+        atable
+    );
+
+    IF EXISTS (
+        SELECT 1
+        FROM information_schema.tables
+        WHERE table_name = 'meta' and table_schema = 'pgmq'
+    ) THEN
+        EXECUTE FORMAT(
+            $QUERY$
+            DELETE FROM pgmq.meta WHERE queue_name = %L
+            $QUERY$,
+            queue_name
+        );
+    END IF;
+
+    IF partitioned THEN
+        EXECUTE FORMAT(
+            $QUERY$
+            DELETE FROM %I.part_config where parent_table in (%L, %L)
+            $QUERY$,
+            pgmq._get_pg_partman_schema(), fq_qtable, fq_atable
+        );
+    END IF;
+
+    RETURN TRUE;
+END;
+$$ LANGUAGE plpgsql;
+
+-- metrics
+-- metrics_result gains queue_visible_length (messages with vt <= NOW()); metrics() is dropped and recreated to fill it
+ALTER TYPE pgmq.metrics_result ADD ATTRIBUTE queue_visible_length bigint;
+
+DROP FUNCTION pgmq.metrics(queue_name TEXT);
+
+CREATE FUNCTION pgmq.metrics(queue_name TEXT)
+RETURNS pgmq.metrics_result AS $$
+DECLARE
+    result_row pgmq.metrics_result;
+    query TEXT;
+    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
+BEGIN
+    query := FORMAT(
+        $QUERY$
+        WITH q_summary AS (
+            SELECT
+                count(*) as queue_length,
+                count(CASE WHEN vt <= NOW() THEN 1 END) as queue_visible_length,
+                EXTRACT(epoch FROM (NOW() - max(enqueued_at)))::int as newest_msg_age_sec,
+                EXTRACT(epoch FROM (NOW() - min(enqueued_at)))::int as oldest_msg_age_sec,
+                NOW() as scrape_time
+            FROM pgmq.%I
+        ),
+        all_metrics AS (
+            SELECT CASE
+                WHEN is_called THEN last_value ELSE 0
+                END as total_messages
+            FROM pgmq.%I
+        )
+        SELECT
+            %L as queue_name,
+            q_summary.queue_length,
+            q_summary.newest_msg_age_sec,
+            q_summary.oldest_msg_age_sec,
+            all_metrics.total_messages,
+            q_summary.scrape_time,
+            q_summary.queue_visible_length
+        FROM q_summary, all_metrics
+        $QUERY$,
+        qtable, qtable || '_msg_id_seq', queue_name
+    );
+    EXECUTE query INTO result_row;
+    RETURN result_row;
+END;
+$$ LANGUAGE plpgsql;
diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql
index 751a22f8..e2dcabb8 100644
--- a/pgmq-extension/sql/pgmq.sql
+++ b/pgmq-extension/sql/pgmq.sql
@@ -141,7 +141,7 @@ BEGIN
       IF FOUND THEN
         RETURN;
       ELSE
-        PERFORM pg_sleep(poll_interval_ms / 1000);
+        PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
       END IF;
     END LOOP;
 END;
diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out
index c527f5da..35d7bb27 100644
--- a/pgmq-extension/test/expected/base.out
+++ b/pgmq-extension/test/expected/base.out
@@ -319,7 +319,7 @@ SELECT ARRAY(
 
 -- Read with poll will poll until the first message is available
 SELECT clock_timestamp() AS start \gset
-SELECT msg_id = :msg_id1 FROM pgmq.read_with_poll('test_read_queue', 10, 5, 5, 100);
+SELECT msg_id = :msg_id1 FROM pgmq.read_with_poll('test_read_queue', 10, 5, 6, 100);
   ?column? 
 ----------
   t
diff --git a/pgmq-extension/test/sql/base.sql b/pgmq-extension/test/sql/base.sql
index fe546038..56a9e37e 100644
--- a/pgmq-extension/test/sql/base.sql
+++ b/pgmq-extension/test/sql/base.sql
@@ -132,7 +132,7 @@ SELECT ARRAY(
 
 -- Read with poll will poll until the first message is available
 SELECT clock_timestamp() AS start \gset
-SELECT msg_id = :msg_id1 FROM pgmq.read_with_poll('test_read_queue', 10, 5, 5, 100);
+SELECT msg_id = :msg_id1 FROM pgmq.read_with_poll('test_read_queue', 10, 5, 6, 100);
 SELECT clock_timestamp() - :'start' > '3 second'::interval;
 
 -- test_purge_queue