Skip to content

Commit

Permalink
pgmq-python: explicitly cast types in send_batch (#365)
Browse files Browse the repository at this point in the history
* pgmq-python: explicitly cast types in send_batch

* type for all .send*

* types for read()

* run py tests when extension has changes
  • Loading branch information
ChuckHend authored Dec 19, 2024
1 parent 8e6df4b commit 05f27a7
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 12 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/pgmq_python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:
- "tembo-pgmq-python/tembo_pgmq_python/**"
- "tembo-pgmq-python/tests/**"
- "tembo-pgmq-python/pyproject.toml"
- "pgmq-extension/**"
push:
branches:
- main
Expand All @@ -22,7 +23,7 @@ on:
- "tembo-pgmq-python/tembo_pgmq_python/**"
- "tembo-pgmq-python/tests/**"
- "tembo-pgmq-python/pyproject.toml"

- "pgmq-extension/**"
jobs:
lints:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion tembo-pgmq-python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tembo-pgmq-python"
version = "0.9.1"
version = "0.9.2"
description = "Python client for the PGMQ Postgres extension."
authors = ["Adam Hendel <adam@tembo.io>"]
license = "PostgreSQL"
Expand Down
10 changes: 5 additions & 5 deletions tembo-pgmq-python/tembo_pgmq_python/async_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> in
async def _send_internal(self, queue, message, delay, conn):
self.logger.debug(f"Sending message to queue '{queue}' with delay={delay}")
result = await conn.fetchrow(
"SELECT * FROM pgmq.send($1, $2::jsonb, $3);",
"SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::integer);",
queue,
dumps(message).decode("utf-8"),
delay,
Expand All @@ -186,11 +186,11 @@ async def send_batch(self, queue: str, messages: List[dict], delay: int = 0, con
else:
return await self._send_batch_internal(queue, messages, delay, conn)

async def _send_batch_internal(self, queue, messages, delay, conn):
async def _send_batch_internal(self, queue: str, messages: List[dict], delay: int, conn):
self.logger.debug(f"Sending batch of messages to queue '{queue}' with delay={delay}")
jsonb_array = [dumps(message).decode("utf-8") for message in messages]
result = await conn.fetch(
"SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3);",
"SELECT * FROM pgmq.send_batch($1::text, $2::jsonb[], $3::integer);",
queue,
jsonb_array,
delay,
Expand All @@ -213,7 +213,7 @@ async def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optiona
async def _read_internal(self, queue, vt, batch_size, conn):
self.logger.debug(f"Reading message from queue '{queue}' with vt={vt}")
rows = await conn.fetch(
"SELECT * FROM pgmq.read($1, $2, $3);",
"SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);",
queue,
vt or self.vt,
batch_size,
Expand Down Expand Up @@ -246,7 +246,7 @@ async def read_batch(
async def _read_batch_internal(self, queue, vt, batch_size, conn):
self.logger.debug(f"Reading batch of messages from queue '{queue}' with vt={vt}")
rows = await conn.fetch(
"SELECT * FROM pgmq.read($1, $2, $3);",
"SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);",
queue,
vt or self.vt,
batch_size,
Expand Down
10 changes: 5 additions & 5 deletions tembo-pgmq-python/tembo_pgmq_python/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,15 @@ def list_queues(self, conn=None) -> List[str]:
def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> int:
"""Send a message to a queue."""
self.logger.debug(f"send called with conn: {conn}")
query = "select * from pgmq.send(%s, %s, %s);"
query = "select * from pgmq.send(%s::text, %s::jsonb, %s::integer);"
result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn)
return result[0][0]

@transaction
def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]:
"""Send a batch of messages to a queue."""
self.logger.debug(f"send_batch called with conn: {conn}")
query = "select * from pgmq.send_batch(%s, %s, %s);"
query = "select * from pgmq.send_batch(%s::text, %s::jsonb[], %s::integer);"
params = [queue, [Jsonb(message) for message in messages], delay]
result = self._execute_query_with_result(query, params, conn=conn)
return [message[0] for message in result]
Expand All @@ -133,7 +133,7 @@ def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None
def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Message]:
"""Read a message from a queue."""
self.logger.debug(f"read called with conn: {conn}")
query = "select * from pgmq.read(%s, %s, %s);"
query = "select * from pgmq.read(%s::text, %s::integer, %s::integer);"
rows = self._execute_query_with_result(query, [queue, vt or self.vt, 1], conn=conn)
messages = [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]
return messages[0] if messages else None
Expand All @@ -142,7 +142,7 @@ def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Mess
def read_batch(self, queue: str, vt: Optional[int] = None, batch_size=1, conn=None) -> Optional[List[Message]]:
"""Read a batch of messages from a queue."""
self.logger.debug(f"read_batch called with conn: {conn}")
query = "select * from pgmq.read(%s, %s, %s);"
query = "select * from pgmq.read(%s::text, %s::integer, %s::integer);"
rows = self._execute_query_with_result(query, [queue, vt or self.vt, batch_size], conn=conn)
return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]

Expand All @@ -158,7 +158,7 @@ def read_with_poll(
) -> Optional[List[Message]]:
"""Read messages from a queue with polling."""
self.logger.debug(f"read_with_poll called with conn: {conn}")
query = "select * from pgmq.read_with_poll(%s, %s, %s, %s, %s);"
query = "select * from pgmq.read_with_poll(%s::text, %s::integer, %s::integer, %s::integer, %s::integer);"
params = [queue, vt or self.vt, qty, max_poll_seconds, poll_interval_ms]
rows = self._execute_query_with_result(query, params, conn=conn)
return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]
Expand Down

0 comments on commit 05f27a7

Please sign in to comment.