Skip to content

Commit

Permalink
Update AMQP pool with NotifyClose based reconnect.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed Mar 23, 2020
1 parent 052325c commit 600d6f3
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 38 deletions.
19 changes: 7 additions & 12 deletions internal/backend/gateway/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ var gatewayIDRegexp = regexp.MustCompile(`([0-9a-fA-F]{16})`)

// Backend implements an AMQP backend.
type Backend struct {
conn *amqp.Connection
chPool *pool

eventQueueName string
Expand Down Expand Up @@ -54,12 +53,7 @@ func NewBackend(c config.Config) (gateway.Gateway, error) {
}

log.Info("gateway/amqp: connecting to AMQP server")
b.conn, err = amqp.Dial(config.URL)
if err != nil {
return nil, errors.Wrap(err, "dial amqp url error")
}

b.chPool, err = newPool(10, b.conn)
b.chPool, err = newPool(10, config.URL)
if err != nil {
return nil, errors.Wrap(err, "new amqp channel pool error")
}
Expand Down Expand Up @@ -126,7 +120,7 @@ func (b *Backend) Close() error {
close(b.uplinkFrameChan)
close(b.gatewayStatsChan)
close(b.downlinkTXAckChan)
return b.conn.Close()
return b.chPool.close()
}

func (b *Backend) publishCommand(fields log.Fields, gatewayID lorawan.EUI64, command string, t marshaler.Type, data []byte) error {
Expand Down Expand Up @@ -177,13 +171,13 @@ func (b *Backend) publishCommand(fields log.Fields, gatewayID lorawan.EUI64, com
}

func (b *Backend) setupQueue() error {
ch, err := b.conn.Channel()
ch, err := b.chPool.get()
if err != nil {
return errors.Wrap(err, "open channel error")
}
defer ch.Close()
defer ch.close()

_, err = ch.QueueDeclare(
_, err = ch.ch.QueueDeclare(
b.eventQueueName,
true,
false,
Expand All @@ -195,7 +189,7 @@ func (b *Backend) setupQueue() error {
return errors.Wrap(err, "declare queue error")
}

err = ch.QueueBind(
err = ch.ch.QueueBind(
b.eventQueueName,
b.eventRoutingKey,
"amq.topic",
Expand Down Expand Up @@ -232,6 +226,7 @@ func (b *Backend) eventLoop() {
nil,
)
if err != nil {
ch.markUnusable()
return errors.Wrap(err, "register consumer error")
}

Expand Down
57 changes: 47 additions & 10 deletions internal/backend/gateway/amqp/channel_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package amqp

import (
"sync"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)

Expand All @@ -18,18 +20,21 @@ type poolChannel struct {

type pool struct {
mu sync.RWMutex
url string
chans chan *amqp.Channel
conn *amqp.Connection
}

func newPool(size int, conn *amqp.Connection) (*pool, error) {
func newPool(size int, url string) (*pool, error) {
p := &pool{
chans: make(chan *amqp.Channel, size),
conn: conn,
url: url,
}

p.connect()

for i := 0; i < size; i++ {
ch, err := conn.Channel()
ch, err := p.conn.Channel()
if err != nil {
p.close()
return nil, errors.Wrap(err, "create channel error")
Expand All @@ -41,6 +46,33 @@ func newPool(size int, conn *amqp.Connection) (*pool, error) {
return p, nil
}

func (p *pool) connect() {
p.mu.Lock()
defer p.mu.Unlock()

for {
conn, err := amqp.Dial(p.url)
if err != nil {
log.WithError(err).Error("gateway/amqp: dial amqp url error")
time.Sleep(time.Second)
continue
}

p.conn = conn

closeChan := make(chan *amqp.Error)
p.conn.NotifyClose(closeChan)

go func() {
for _ = range closeChan {
p.connect()
}
}()

break
}
}

func (p *pool) getChansAndConn() (chan *amqp.Channel, *amqp.Connection) {
p.mu.RLock()
chans := p.chans
Expand Down Expand Up @@ -99,21 +131,26 @@ func (p *pool) put(ch *amqp.Channel) error {
}
}

func (p *pool) close() {
func (p *pool) close() error {
p.mu.Lock()
chans := p.chans
conn := p.conn
p.chans = nil
p.conn = nil
p.mu.Unlock()

if chans == nil {
return
if chans != nil {
close(chans)
for ch := range chans {
ch.Close()
}
}

close(chans)
for ch := range chans {
ch.Close()
if conn != nil {
return conn.Close()
}

return nil
}

func (pc *poolChannel) close() error {
Expand All @@ -122,7 +159,7 @@ func (pc *poolChannel) close() error {

if pc.unusable {
if pc.ch != nil {
return pc.ch.Close()
pc.ch.Close()
}
return nil
}
Expand Down
22 changes: 6 additions & 16 deletions internal/backend/gateway/amqp/channel_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package amqp
import (
"testing"

"github.com/streadway/amqp"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

Expand All @@ -13,22 +12,18 @@ import (
type ChannelPoolTestSuite struct {
suite.Suite

conn *amqp.Connection
url string
}

func (ts *ChannelPoolTestSuite) SetupSuite() {
assert := require.New(ts.T())
conf := test.GetConfig()

var err error
ts.conn, err = amqp.Dial(conf.NetworkServer.Gateway.Backend.AMQP.URL)
assert.NoError(err)
ts.url = conf.NetworkServer.Gateway.Backend.AMQP.URL
}

func (ts *ChannelPoolTestSuite) TestNew() {
assert := require.New(ts.T())

p, err := newPool(10, ts.conn)
p, err := newPool(10, ts.url)
assert.NoError(err)
defer p.close()
assert.Len(p.chans, 10)
Expand All @@ -37,7 +32,7 @@ func (ts *ChannelPoolTestSuite) TestNew() {
func (ts *ChannelPoolTestSuite) TestGet() {
assert := require.New(ts.T())

p, err := newPool(10, ts.conn)
p, err := newPool(10, ts.url)
assert.NoError(err)
defer p.close()
assert.Len(p.chans, 10)
Expand All @@ -60,7 +55,7 @@ func (ts *ChannelPoolTestSuite) TestGet() {
func (ts *ChannelPoolTestSuite) TestPut() {
assert := require.New(ts.T())

p, err := newPool(10, ts.conn)
p, err := newPool(10, ts.url)
assert.NoError(err)

chans := make([]*poolChannel, 10)
Expand All @@ -78,19 +73,14 @@ func (ts *ChannelPoolTestSuite) TestPut() {

assert.Len(p.chans, 10)

pc, err := p.get()
assert.NoError(err)
p.close()

assert.Len(p.chans, 0)
assert.NoError(pc.close())
assert.Len(p.chans, 0)
}

func (ts *ChannelPoolTestSuite) TestPutUnusable() {
assert := require.New(ts.T())

p, err := newPool(10, ts.conn)
p, err := newPool(10, ts.url)
assert.NoError(err)
defer p.close()

Expand Down

0 comments on commit 600d6f3

Please sign in to comment.