Skip to content

Commit

Permalink
feat: add convenience implementation for golang slog package (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
MaxBreida authored Sep 16, 2024
1 parent 98161b8 commit 696e213
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 38 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
*.out
.vscode
.vscode
*.DS_Store
83 changes: 49 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
## INFO
# ClariMQ

This library is a wrapper around the [Go AMQP Client Library](https://github.com/rabbitmq/amqp091-go).

This library includes support for:

- structured logging to multiple writers
- automatic recovery
- retry functionality
Expand All @@ -27,10 +28,12 @@ The connection can be configured by passing needed connection options.
Also there is the possibility to fully customize the configuration by passing a **ConnectionOptions** struct with the corresponding option.
To ensure correct escaping of the URI, the **SettingsToURI** function can be used to convert a **ConnectionSettings** struct to a valid URI.

#### Note:
#### Note

_Although it is possible to publish and consume with one connection, it is best practice to use two separate connections for publisher and consumer activities._

##### Example Connection with some options:
#### Example Connection with some options

```Go
conn, err := clarimq.NewConnection("amqp://user:password@localhost:5672/",
clarimq.WithConnectionOptionConnectionName("app-name-connection"),
Expand All @@ -41,7 +44,8 @@ if err != nil {
}
```

##### Example Connection with custom options:
#### Example Connection with custom options

```Go
connectionSettings := &clarimq.ConnectionSettings{
UserName: "username",
Expand Down Expand Up @@ -73,7 +77,8 @@ if err != nil {

When the connection is no longer needed, it should be closed to conserve resources.

##### Example
#### Example

```Go
if err := conn.Close(); err != nil {
// handle error
Expand All @@ -85,7 +90,8 @@ if err := conn.Close(); err != nil {
The "NotifyErrors()" method provides a channel that returns any errors that may happen concurrently. Mainly custom errors of types **clarimq.AMQPError** and **clarimq.RecoveryFailedError** are returned.


##### Example
#### Example

```Go
handleErrors := func(errChan <-chan error) {
for err := range errChan {
Expand Down Expand Up @@ -124,7 +130,8 @@ To publish messages a publisher instance needs to be created. A previously creat
The publisher can be configured by passing needed connector options.
Also there is the possibility to fully customize the configuration by passing a **PublishOptions** struct with the corresponding option.

##### Example
#### Example

```Go
publisher, err := clarimq.NewPublisher(conn,
clarimq.WithPublishOptionAppID("my-application"),
Expand All @@ -135,10 +142,12 @@ if err != nil {
// handle error
}
```

The publisher can then be used to publish messages.
The target can be a queue name, or a topic if the publisher is configured to publish messages to an exchange.

##### Example Simple publish:
#### Example Simple publish

```Go
if err := publisher.Publish(context.Background(), "my-target", "my-message"); err != nil {
// handle error
Expand All @@ -148,7 +157,8 @@ if err := publisher.Publish(context.Background(), "my-target", "my-message"); er
Optionally the **PublishWithOptions** method can be used to configure the publish options just for this specific publish.
The Method also gives the possibility to publish to multiple targets at once.

##### Example Publish with options:
#### Example Publish with options

```Go
if err := publisher.PublishWithOptions(context.Background(), []string{"my-target-1","my-target-2"}, "my-message",
clarimq.WithPublishOptionMessageID("99819a3a-388f-4199-b7e6-cc580d85a2e5"),
Expand All @@ -165,7 +175,8 @@ To consume messages a consumer instance needs to be created. A previously create
The consumer can be configured by passing needed consume options.
Also there is the possibility to fully customize the configuration by passing a **ConsumeOptions** struct with the corresponding option.

##### Example
#### Example

```Go
consumer, err := clarimq.NewConsumer(conn, "my-queue", handler(),
clarimq.WithConsumerOptionConsumerName("my-consumer"),
Expand All @@ -183,7 +194,9 @@ if err != nil {

The consumer can be set up to immediately start consuming messages from the broker by using the **WithConsumerOptionConsumeAfterCreation** option.
The consumer then does not need to be started with the **Start** method. An error will be returned when trying to start an already started/running consumer.
##### Example

#### Example

```Go
consumer, err := clarimq.NewConsumer(conn, "my-queue", handler(),
clarimq.WithConsumerOptionConsumeAfterCreation(true),
Expand All @@ -197,7 +210,8 @@ if err != nil {

The consumer can be used to declare exchanges, queues and queue-bindings:

##### Example
#### Example

```Go
consumer, err := clarimq.NewConsumer(conn, "my-queue", handler(),
clarimq.WithConsumerOptionConsumerName("my-consumer"),
Expand All @@ -219,41 +233,40 @@ if err != nil {

The consumer can be closed to stop consuming if needed. The consumer does not need to be explicitly closed for a graceful shutdown if its connection is closed afterwards. However when using the retry functionality without providing a connection, the consumer must be closed for a graceful shutdown of the retry connection to conserve resources.

##### Example
#### Example

```Go
if err := consumer.Close(); err != nil {
// handle error
}
```

### Logging:
Structured logging is supported with the golang "log/slog" package.
A text- or json-logger can be specified with the desired log level.
The logs are written to a io.Writer that also can be specified.
### Logging

Structured logging is supported either with the golang "log/slog" package or by passing a custom logger that implements the clarimq.Logger interface.

Note: Multiple loggers can be specified!

##### Example
#### Example
```Go
jsonBuff := new(bytes.Buffer)
textBuff := new(bytes.Buffer)

conn, err := clarimq.NewConnection(connectionSettings,
clarimq.WithConnectionOptionTextLogging(os.Stdout, slog.LevelInfo),
clarimq.WithConnectionOptionTextLogging(textBuff, slog.LevelWarn),
clarimq.WithConnectionOptionJSONLogging(jsonBuff, slog.LevelDebug),
clarimq.WithConnectionOptionLoggers(
myCustomLogger,
clarimq.NewSlogLogger(mySlogLogger),
)
)
if err != nil {
// handle error
}
```

### Return Handler:
### Return Handler
When publishing mandatory messages, they will be returned if it is not possible to route the message to the given destination. A return handler can be specified to handle the the return. The return contains the original message together with some information such as an error code and an error code description.

If no return handler is specified a log will be written to the logger at warn level.

##### Example
#### Example

```Go
returnHandler := func(r clarimq.Return) {
// handle the return
Expand All @@ -269,11 +282,12 @@ if err != nil {
}
```

### Recovery:
### Recovery

This library provides an automatic recovery with build-in exponential back-off functionality. When the connection to the broker is lost, the recovery will automatically try to reconnect. You can adjust the parameters of the back-off algorithm:

##### Example
#### Example

```Go
conn, err := clarimq.NewConnection(settings,
clarimq.WithConnectionOptionRecoveryInterval(2), // default is 1 second
Expand All @@ -287,8 +301,7 @@ if err != nil {

For the case the maximum number of retries is reached, a custom error of type **RecoveryFailedError** will be send to the error channel.


### Publishing Cache:
### Publishing Cache

To prevent loosing messages from being published while the broker has downtime / the client is recovering, the Publishing Cache can be used to cache the messages and publish them as soon as the client is fully recovered. The cache itself is an interface that can be implemented to the users needs. For example it could be implemented to use a redis store or any other storage of choice.

Expand All @@ -304,7 +317,8 @@ When implementing the publishing cache, it must be properly protected from concu

_Hint: The "cache" sub-package provides a simple "in-memory-cache" implementation, that can be used for testing, but could also be used in production._

##### Example
#### Example

```Go
publisher, err := clarimq.NewPublisher(publishConn,
clarimq.WithPublisherOptionPublishingCache(cache.NewBasicMemoryCache()),
Expand All @@ -331,11 +345,12 @@ if err = b.publisher.PublishWithOptions(context.Background(), "my-target", "my-m
}
```

### Retry:
### Retry

This library includes a retry functionality with a dead letter exchange and dead letter queues. To use the retry, some parameters have to be set:

##### Example
#### Example

```Go
consumeConn, err := clarimq.NewConnection(clarimq.SettingsToURI(settings))
if err != nil {
Expand Down Expand Up @@ -363,7 +378,7 @@ if err != nil {
}
```

It is recommended to provide a separate publish connection for the retry functionality. If no connection is specified, a separate connection is established internally.
It is recommended to provide a separate publish connection for the retry functionality. If no connection is specified, a separate connection is established internally.

For each given delay a separate dead letter queue is declared. When a delivery is nacked by the consumer, it is republished via the delay queues one after another until it is acknowledged or the specified maximum number of retry attempts is reached.

Expand Down
6 changes: 3 additions & 3 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (c *Connection) closeForRenewal() error {
//
// When ifEmpty is true, the queue will not be deleted if there are any messages remaining on the queue.
// If there are messages, an error will be returned and the channel will be closed.
func (c *Connection) RemoveQueue(name string, ifUnused bool, ifEmpty bool, noWait bool) (int, error) {
func (c *Connection) RemoveQueue(name string, ifUnused, ifEmpty, noWait bool) (int, error) {
const errMessage = "failed to remove queue: %w"

purgedMessages, err := c.amqpChannel.QueueDelete(name, ifUnused, ifEmpty, noWait)
Expand All @@ -208,7 +208,7 @@ func (c *Connection) RemoveQueue(name string, ifUnused bool, ifEmpty bool, noWai
// RemoveBinding removes a binding between an exchange and queue matching the key and arguments.
//
// It is possible to send and empty string for the exchange name which means to unbind the queue from the default exchange.
func (c *Connection) RemoveBinding(queueName string, routingKey string, exchangeName string, args Table) error {
func (c *Connection) RemoveBinding(queueName, routingKey, exchangeName string, args Table) error {
const errMessage = "failed to remove binding: %w"

if err := c.amqpChannel.QueueUnbind(queueName, routingKey, exchangeName, amqp.Table(args)); err != nil {
Expand All @@ -227,7 +227,7 @@ func (c *Connection) RemoveBinding(queueName string, routingKey string, exchange
//
// When noWait is true, do not wait for a broker confirmation that the exchange has been deleted.
// Failing to delete the channel could close the channel. Add a NotifyClose listener to respond to these channel exceptions.
func (c *Connection) RemoveExchange(name string, ifUnused bool, noWait bool) error {
func (c *Connection) RemoveExchange(name string, ifUnused, noWait bool) error {
const errMessage = "failed to remove exchange: %w"

if err := c.amqpChannel.ExchangeDelete(name, ifUnused, noWait); err != nil {
Expand Down
46 changes: 46 additions & 0 deletions logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clarimq

import (
"context"
"log/slog"
)

// Logger is an interface that is be used for log messages.
Expand Down Expand Up @@ -43,3 +44,48 @@ func (l *logger) logWarn(ctx context.Context, msg string, args ...any) {
l.loggers[i].Warn(ctx, msg, args...)
}
}

// SlogLogger is a clarimq.Logger implementation that uses slog.Logger.
type SlogLogger struct {
logger *slog.Logger
}

// Debug logs a debug message with the provided attributes.
func (s *SlogLogger) Debug(ctx context.Context, msg string, attrs ...any) {
s.logger.DebugContext(ctx, msg, attrs...)
}

// Info logs an info message with the provided attributes.
func (s *SlogLogger) Info(ctx context.Context, msg string, attrs ...any) {
s.logger.InfoContext(ctx, msg, attrs...)
}

// Warn logs a warning message with the provided attributes.
func (s *SlogLogger) Warn(ctx context.Context, msg string, attrs ...any) {
s.logger.WarnContext(ctx, msg, attrs...)
}

// Error logs an error message with the provided attributes and error.
func (s *SlogLogger) Error(ctx context.Context, msg string, err error, attrs ...any) {
if err != nil {
attrs = append(attrs, "error", err.Error())
}

s.logger.ErrorContext(ctx, msg, attrs...)
}

// NewSlogLogger creates a new instance of SlogLogger.
// If a logger is not provided, it will use the default slog.Logger.
//
// Parameters:
// - logger: A pointer to a slog.Logger instance. If nil, it will use the default logger.
//
// Returns:
// - A new SlogLogger instance that implements the clarimq.Logger.
func NewSlogLogger(logger *slog.Logger) *SlogLogger {
if logger == nil {
logger = slog.Default()
}

return &SlogLogger{logger}
}

0 comments on commit 696e213

Please sign in to comment.