diff --git a/service/message_queue_service.py b/service/message_queue_service.py index 70dc2c8..b503cbf 100644 --- a/service/message_queue_service.py +++ b/service/message_queue_service.py @@ -131,11 +131,11 @@ async def listen( if exchange_name not in self._exchanges: await self.declare_exchange(exchange_name, exchange_type) - queue = await self._channel.declare_queue(config.QUEUE_NAME, exclusive=True, durable=True) + queue = await self._channel.declare_queue(config.QUEUE_NAME, durable=True) await queue.bind(exchange=exchange_name, routing_key=routing_key) - await queue.consume(callback) + await queue.consume(callback, exclusive=True) async def reconnect(self) -> None: await self.shutdown() diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index c9071e7..982b7af 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -22,12 +22,13 @@ async def initialize(self): exchange = await channel.declare_exchange( config.EXCHANGE_NAME, aio_pika.ExchangeType.TOPIC, durable=True ) - self.queue = await channel.declare_queue("test_queue", exclusive=True) + self.queue = await channel.declare_queue("test_queue", durable=True) await self.queue.bind(exchange, routing_key="#") - self.consumer_tag = await self.queue.consume(self.callback) + self.consumer_tag = await self.queue.consume(self.callback, exclusive=True) def callback(self, message): + message.ack() self.received_messages.append(message) self._logger.debug("Received message %r", message) self._callback()