Skip to content

Commit

Permalink
Create send_at and send_batch_at functions receiving a timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
Neptune650 committed Oct 17, 2024
1 parent e2740d0 commit 9e48c61
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 0 deletions.
67 changes: 67 additions & 0 deletions docs/api/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,39 @@ select * from pgmq.send('my_queue', '{"hello": "world"}');

---

### send_at

Send a single message to a queue with delay as a timestamp.

```text
pgmq.send_at(
queue_name text,
msg jsonb,
delay timestamp
)
RETURNS SETOF bigint
```

**Parameters:**

| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msg | jsonb | The message to send to the queue |
| delay | timestamp | Timestamp until the message becomes visible. |

Example:

```sql
select * from pgmq.send_at('my_queue', '{"hello": "world"}', CURRENT_TIMESTAMP);
send_at
---------
4
```

---

### send_batch

Send 1 or more messages to a queue.
Expand Down Expand Up @@ -68,6 +101,40 @@ select * from pgmq.send_batch('my_queue', ARRAY[

---

### send_batch_at

Send 1 or more messages to a queue with delay as a timestamp.

```text
pgmq.send_batch(
queue_name text,
msgs jsonb[],
delay timestamp
)
RETURNS SETOF bigint
```
**Parameters:**

| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msgs | jsonb[] | Array of messages to send to the queue |
| delay | timestamp | Timestamp until the messages become visible. |

```sql
select * from pgmq.send_batch_at('my_queue', ARRAY[
'{"hello": "world_0"}'::jsonb,
'{"hello": "world_1"}'::jsonb],
CURRENT_TIMESTAMP
);
send_batch_at
---------------
1
2
```

---

## Reading Messages

### read
Expand Down
46 changes: 46 additions & 0 deletions pgmq-extension/sql/pgmq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,29 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
-- send_at
-- sends a message to a queue, with a delay as a timestamp
CREATE FUNCTION pgmq.send_at(
queue_name TEXT,
msg JSONB,
delay TIMESTAMP
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message)
VALUES ((clock_timestamp() + %L), $1)
RETURNING msg_id;
$QUERY$,
qtable, AGE(date_trunc('second', delay), date_trunc('second', CURRENT_TIMESTAMP))
);
RETURN QUERY EXECUTE sql USING msg;
END;
$$ LANGUAGE plpgsql;
-- send_batch
-- sends an array of list of messages to a queue, optionally with a delay
CREATE FUNCTION pgmq.send_batch(
Expand All @@ -305,6 +328,29 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
-- send_batch_at
-- sends an array of list of messages to a queue, with a delay as a timestamp
CREATE FUNCTION pgmq.send_batch_at(
queue_name TEXT,
msgs JSONB[],
delay TIMESTAMP
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message)
SELECT clock_timestamp() + %L, unnest($1)
RETURNING msg_id;
$QUERY$,
qtable, AGE(date_trunc('second', delay), date_trunc('second', CURRENT_TIMESTAMP))
);
RETURN QUERY EXECUTE sql USING msgs;
END;
$$ LANGUAGE plpgsql;
-- returned by pgmq.metrics() and pgmq.metrics_all
CREATE TYPE pgmq.metrics_result AS (
queue_name text,
Expand Down
44 changes: 44 additions & 0 deletions tembo-pgmq-python/tembo_pgmq_python/async_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,27 @@ async def _send_internal(self, queue, message, delay, conn):
self.logger.debug(f"Message sent with msg_id={result[0]}")
return result[0]

@transaction
async def send_at(self, queue: str, message: dict, delay: str, conn=None) -> int:
"""Send a message to a queue with timestamp."""
self.logger.debug(f"send_at called with queue='{queue}', message={message}, delay={delay}, conn={conn}")
if conn is None:
async with self.pool.acquire() as conn:
return await self._send_at_internal(queue, message, delay, conn)
else:
return await self._send_at_internal(queue, message, delay, conn)

async def _send_at_internal(self, queue, message, delay, conn):
self.logger.debug(f"Sending message to queue '{queue}' with delay={delay}")
result = await conn.fetchrow(
"SELECT * FROM pgmq.send_at($1, $2::jsonb, $3);",
queue,
dumps(message).decode("utf-8"),
delay,
)
self.logger.debug(f"Message sent with msg_id={result[0]}")
return result[0]

@transaction
async def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]:
"""Send a batch of messages to a queue."""
Expand All @@ -198,6 +219,29 @@ async def _send_batch_internal(self, queue, messages, delay, conn):
self.logger.debug(f"Batch messages sent with msg_ids={msg_ids}")
return msg_ids

@transaction
async def send_batch_at(self, queue: str, messages: List[dict], delay: str, conn=None) -> List[int]:
"""Send a batch of messages to a queue with timestamp."""
self.logger.debug(f"send_batch_at called with queue='{queue}', messages={messages}, delay={delay}, conn={conn}")
if conn is None:
async with self.pool.acquire() as conn:
return await self._send_batch_at_internal(queue, messages, delay, conn)
else:
return await self._send_batch_at_internal(queue, messages, delay, conn)

async def _send_batch_at_internal(self, queue, messages, delay, conn):
self.logger.debug(f"Sending batch of messages to queue '{queue}' with delay={delay}")
jsonb_array = [dumps(message).decode("utf-8") for message in messages]
result = await conn.fetch(
"SELECT * FROM pgmq.send_batch_at($1, $2::jsonb[], $3);",
queue,
jsonb_array,
delay,
)
msg_ids = [message[0] for message in result]
self.logger.debug(f"Batch messages sent with msg_ids={msg_ids}")
return msg_ids

@transaction
async def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Message]:
"""Read a message from a queue."""
Expand Down
17 changes: 17 additions & 0 deletions tembo-pgmq-python/tembo_pgmq_python/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> int:
result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn)
return result[0][0]

@transaction
def send_at(self, queue: str, message: dict, delay: str, conn=None) -> int:
"""Send a message to a queue with timestamp."""
self.logger.debug(f"send called with conn: {conn}")
query = "select * from pgmq.send_at(%s, %s, %s);"
result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn)
return result[0][0]

@transaction
def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]:
"""Send a batch of messages to a queue."""
Expand All @@ -128,6 +136,15 @@ def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None
result = self._execute_query_with_result(query, params, conn=conn)
return [message[0] for message in result]

@transaction
def send_batch_at(self, queue: str, messages: List[dict], delay: str, conn=None) -> List[int]:
"""Send a batch of messages to a queue with timestamp."""
self.logger.debug(f"send_batch called with conn: {conn}")
query = "select * from pgmq.send_batch_at(%s, %s, %s);"
params = [queue, [Jsonb(message) for message in messages], delay]
result = self._execute_query_with_result(query, params, conn=conn)
return [message[0] for message in result]

@transaction
def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Message]:
"""Read a message from a queue."""
Expand Down

0 comments on commit 9e48c61

Please sign in to comment.