Skip to content

Commit

Permalink
Fix regression: not respecting interval in read_with_poll (#343)
Browse files Browse the repository at this point in the history
* Fix regression: not respecting interval in read_with_poll

The regression happened in the port to PL/pgSQL.

Division of integers on postgres yields integer results. Our unit conversion from
milliseconds to seconds didn't consider that, then returning wrong results.
This caused `pgmq.read_with_poll` to sleep 0s instead of `poll_interval_ms`
if `poll_interval_ms` was under 1000, using 100% of CPU.

The fix is simply converting `poll_interval_ms` to `numeric` before doing the
division, which produces a `numeric` result as originally intended.

* Fix tests
  • Loading branch information
v0idpwn authored and ChuckHend committed Nov 15, 2024
1 parent 9787166 commit bb598b2
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 3 deletions.
55 changes: 55 additions & 0 deletions pgmq-extension/sql/pgmq--1.4.4--1.4.5.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
CREATE OR REPLACE FUNCTION pgmq.read_with_poll(
queue_name TEXT,
vt INTEGER,
qty INTEGER,
max_poll_seconds INTEGER DEFAULT 5,
poll_interval_ms INTEGER DEFAULT 100
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
r pgmq.message_record;
stop_at TIMESTAMP;
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);
LOOP
IF (SELECT clock_timestamp() >= stop_at) THEN
RETURN;
END IF;

sql := FORMAT(
$QUERY$
WITH cte AS
(
SELECT msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp()
ORDER BY msg_id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE pgmq.%I m
SET
vt = clock_timestamp() + %L,
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
$QUERY$,
qtable, qtable, make_interval(secs => vt)
);

FOR r IN
EXECUTE sql USING qty
LOOP
RETURN NEXT r;
END LOOP;
IF FOUND THEN
RETURN;
ELSE
PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
END IF;
END LOOP;
END;
$$ LANGUAGE plpgsql;
237 changes: 237 additions & 0 deletions pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
-- read
-- reads a number of messages from a queue, setting a visibility timeout on them
DROP FUNCTION IF EXISTS pgmq.read(TEXT, INTEGER, INTEGER);
CREATE FUNCTION pgmq.read(
queue_name TEXT,
vt INTEGER,
qty INTEGER,
conditional JSONB DEFAULT '{}'
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
WITH cte AS
(
SELECT msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp() AND CASE
WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
ELSE 1
END = 1
ORDER BY msg_id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE pgmq.%I m
SET
vt = clock_timestamp() + %L,
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);
RETURN QUERY EXECUTE sql USING qty;
END;
$$ LANGUAGE plpgsql;

---- read_with_poll
---- reads a number of messages from a queue, setting a visibility timeout on them
DROP FUNCTION IF EXISTS pgmq.read_with_poll(TEXT, INTEGER, INTEGER, INTEGER, INTEGER);
CREATE FUNCTION pgmq.read_with_poll(
queue_name TEXT,
vt INTEGER,
qty INTEGER,
max_poll_seconds INTEGER DEFAULT 5,
poll_interval_ms INTEGER DEFAULT 100,
conditional JSONB DEFAULT '{}'
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
r pgmq.message_record;
stop_at TIMESTAMP;
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);
LOOP
IF (SELECT clock_timestamp() >= stop_at) THEN
RETURN;
END IF;

sql := FORMAT(
$QUERY$
WITH cte AS
(
SELECT msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp() AND CASE
WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
ELSE 1
END = 1
ORDER BY msg_id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE pgmq.%I m
SET
vt = clock_timestamp() + %L,
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);

FOR r IN
EXECUTE sql USING qty
LOOP
RETURN NEXT r;
END LOOP;
IF FOUND THEN
RETURN;
ELSE
PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
END IF;
END LOOP;
END;
$$ LANGUAGE plpgsql;

DROP FUNCTION IF EXISTS pgmq.drop_queue(TEXT, BOOLEAN);

CREATE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN)
RETURNS BOOLEAN AS $$
DECLARE
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
fq_qtable TEXT := 'pgmq.' || qtable;
atable TEXT := pgmq.format_table_name(queue_name, 'a');
fq_atable TEXT := 'pgmq.' || atable;
BEGIN
RAISE WARNING "drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead.";

PERFORM pgmq.drop_queue(queue_name);

RETURN TRUE;
END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION pgmq.drop_queue(queue_name TEXT)
RETURNS BOOLEAN AS $$
DECLARE
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
fq_qtable TEXT := 'pgmq.' || qtable;
atable TEXT := pgmq.format_table_name(queue_name, 'a');
fq_atable TEXT := 'pgmq.' || atable;
partitioned BOOLEAN;
BEGIN
EXECUTE FORMAT(
$QUERY$
SELECT is_partitioned FROM pgmq.meta WHERE queue_name = %L
$QUERY$,
queue_name
) INTO partitioned;

EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
qtable
);

EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
atable
);

EXECUTE FORMAT(
$QUERY$
DROP TABLE IF EXISTS pgmq.%I
$QUERY$,
qtable
);

EXECUTE FORMAT(
$QUERY$
DROP TABLE IF EXISTS pgmq.%I
$QUERY$,
atable
);

IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = 'meta' and table_schema = 'pgmq'
) THEN
EXECUTE FORMAT(
$QUERY$
DELETE FROM pgmq.meta WHERE queue_name = %L
$QUERY$,
queue_name
);
END IF;

IF partitioned THEN
EXECUTE FORMAT(
$QUERY$
DELETE FROM %I.part_config where parent_table in (%L, %L)
$QUERY$,
pgmq._get_pg_partman_schema(), fq_qtable, fq_atable
);
END IF;

RETURN TRUE;
END;
$$ LANGUAGE plpgsql;

ALTER TYPE pgmq.metrics_result ADD ATTRIBUTE queue_visible_length bigint;

DROP FUNCTION pgmq.metrics(queue_name TEXT);

CREATE FUNCTION pgmq.metrics(queue_name TEXT)
RETURNS pgmq.metrics_result AS $$
DECLARE
result_row pgmq.metrics_result;
query TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
query := FORMAT(
$QUERY$
WITH q_summary AS (
SELECT
count(*) as queue_length,
count(CASE WHEN vt <= NOW() THEN 1 END) as queue_visible_length,
EXTRACT(epoch FROM (NOW() - max(enqueued_at)))::int as newest_msg_age_sec,
EXTRACT(epoch FROM (NOW() - min(enqueued_at)))::int as oldest_msg_age_sec,
NOW() as scrape_time
FROM pgmq.%I
),
all_metrics AS (
SELECT CASE
WHEN is_called THEN last_value ELSE 0
END as total_messages
FROM pgmq.%I
)
SELECT
%L as queue_name,
q_summary.queue_length,
q_summary.newest_msg_age_sec,
q_summary.oldest_msg_age_sec,
all_metrics.total_messages,
q_summary.scrape_time,
q_summary.queue_visible_length
FROM q_summary, all_metrics
$QUERY$,
qtable, qtable || '_msg_id_seq', queue_name
);
EXECUTE query INTO result_row;
RETURN result_row;
END;
$$ LANGUAGE plpgsql;
2 changes: 1 addition & 1 deletion pgmq-extension/sql/pgmq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ BEGIN
IF FOUND THEN
RETURN;
ELSE
PERFORM pg_sleep(poll_interval_ms / 1000);
PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
END IF;
END LOOP;
END;
Expand Down
2 changes: 1 addition & 1 deletion pgmq-extension/test/expected/base.out
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ SELECT ARRAY(

-- Read with poll will poll until the first message is available
SELECT clock_timestamp() AS start \gset
SELECT msg_id = :msg_id1 FROM pgmq.read_with_poll('test_read_queue', 10, 5, 5, 100);
SELECT msg_id = :msg_id1 FROM pgmq.read_with_poll('test_read_queue', 10, 5, 6, 100);
?column?
----------
t
Expand Down
2 changes: 1 addition & 1 deletion pgmq-extension/test/sql/base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ SELECT ARRAY(

-- Read with poll will poll until the first message is available
SELECT clock_timestamp() AS start \gset
SELECT msg_id = :msg_id1 FROM pgmq.read_with_poll('test_read_queue', 10, 5, 5, 100);
SELECT msg_id = :msg_id1 FROM pgmq.read_with_poll('test_read_queue', 10, 5, 6, 100);
SELECT clock_timestamp() - :'start' > '3 second'::interval;

-- test_purge_queue
Expand Down

0 comments on commit bb598b2

Please sign in to comment.