Skip to content

Commit

Permalink
Merge pull request #20 from Clarilab/feat/add-getting-queue-info
Browse files Browse the repository at this point in the history
feat: add getting queue info
  • Loading branch information
nicoandrewss authored Dec 19, 2024
2 parents 696e213 + b0a20f2 commit 53413b8
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 2 deletions.
39 changes: 39 additions & 0 deletions clarimq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"os/exec"
"reflect"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -990,6 +991,44 @@ func Test_Integration_ManualRemoveExchangeQueueAndBindings(t *testing.T) {
}
}

func Test_Integration_InspectQueue(t *testing.T) {
t.Parallel()

var queueName = stringGen()

conn := getConnection(t)

consumer, err := clarimq.NewConsumer(
conn,
queueName,
nil,
clarimq.WithQueueOptionDurable(true),
clarimq.WithQueueOptionAutoDelete(true),
)
requireNoError(t, err)

t.Cleanup(func() { requireNoError(t, consumer.Close()) })

publisher, err := clarimq.NewPublisher(conn)
requireNoError(t, err)

t.Cleanup(func() { requireNoError(t, publisher.Close()) })

for i := range 10 {
err = publisher.Publish(context.Background(), queueName, "test-message-"+strconv.Itoa(i+1))
requireNoError(t, err)
}

time.Sleep(time.Second * 2)

queueInfo, err := conn.InspectQueue(queueName)
requireNoError(t, err)

requireEqual(t, queueName, queueInfo.Name)
requireEqual(t, 10, queueInfo.Messages)
requireEqual(t, 0, queueInfo.Consumers)
}

func Test_Integration_ReturnHandler(t *testing.T) {
t.Parallel()

Expand Down
11 changes: 11 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,17 @@ func (c *Connection) RemoveQueue(name string, ifUnused, ifEmpty, noWait bool) (i
return purgedMessages, nil
}

func (c *Connection) InspectQueue(name string) (*QueueInfo, error) {
const errMessage = "failed to inspect queue: %w"

result, err := getQueueInfo(c.channelExec, name)
if err != nil {
return nil, fmt.Errorf(errMessage, err)
}

return result, nil
}

// RemoveBinding removes a binding between an exchange and queue matching the key and arguments.
//
// It is possible to send and empty string for the exchange name which means to unbind the queue from the default exchange.
Expand Down
26 changes: 26 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
)

// QueueInfo represents the current server state of a queue on the server.
type QueueInfo amqp.Queue

// QueueOptions are used to configure a queue.
// A passive queue is assumed by RabbitMQ to already exist, and attempting to connect
// to a non-existent queue will cause RabbitMQ to throw an exception.
Expand Down Expand Up @@ -80,3 +83,26 @@ func declareQueue(channelExec channelExec, options *QueueOptions) error {

return nil
}

func getQueueInfo(channelExec channelExec, name string) (*QueueInfo, error) {
const errMessage = "failed to get queue info: %w"

result := new(QueueInfo)

exec := func(channel *amqp.Channel) error {
q, err := channel.QueueDeclarePassive(name, false, false, false, false, nil)
if err != nil {
return fmt.Errorf(errMessage, err)
}

*result = QueueInfo(q)

return nil
}

if err := channelExec(exec); err != nil {
return nil, fmt.Errorf(errMessage, err)
}

return result, nil
}
4 changes: 2 additions & 2 deletions run_integration_tests.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/sh

docker-compose down
docker-compose up -d
docker compose down
docker compose up -d


CONTAINER=rabbitmq
Expand Down

0 comments on commit 53413b8

Please sign in to comment.