diff --git a/actions/actions.go b/actions/actions.go index a1c2cc6b2..bfb240d4e 100644 --- a/actions/actions.go +++ b/actions/actions.go @@ -43,6 +43,8 @@ type IActions interface { StopTunnel(ctx context.Context, tunnelID string) (*types.Tunnel, error) UpdateTunnel(ctx context.Context, tunnelID string, tunnelOpts *opts.TunnelOptions) (*types.Tunnel, error) DeleteTunnel(ctx context.Context, tunnelID string) error + + UpdateConnection(ctx context.Context, connectionID string, connOpts *opts.ConnectionOptions) (*types.Connection, error) } func New(cfg *Config) (IActions, error) { diff --git a/actions/actionsfakes/fake_iactions.go b/actions/actionsfakes/fake_iactions.go index dd44fa7ce..1973f4f9e 100644 --- a/actions/actionsfakes/fake_iactions.go +++ b/actions/actionsfakes/fake_iactions.go @@ -121,6 +121,21 @@ type FakeIActions struct { result1 *types.Tunnel result2 error } + UpdateConnectionStub func(context.Context, string, *opts.ConnectionOptions) (*types.Connection, error) + updateConnectionMutex sync.RWMutex + updateConnectionArgsForCall []struct { + arg1 context.Context + arg2 string + arg3 *opts.ConnectionOptions + } + updateConnectionReturns struct { + result1 *types.Connection + result2 error + } + updateConnectionReturnsOnCall map[int]struct { + result1 *types.Connection + result2 error + } UpdateRelayStub func(context.Context, string, *opts.RelayOptions) (*types.Relay, error) updateRelayMutex sync.RWMutex updateRelayArgsForCall []struct { @@ -672,6 +687,72 @@ func (fake *FakeIActions) StopTunnelReturnsOnCall(i int, result1 *types.Tunnel, }{result1, result2} } +func (fake *FakeIActions) UpdateConnection(arg1 context.Context, arg2 string, arg3 *opts.ConnectionOptions) (*types.Connection, error) { + fake.updateConnectionMutex.Lock() + ret, specificReturn := fake.updateConnectionReturnsOnCall[len(fake.updateConnectionArgsForCall)] + fake.updateConnectionArgsForCall = append(fake.updateConnectionArgsForCall, struct { + arg1 context.Context + arg2 string + arg3 *opts.ConnectionOptions + }{arg1, arg2, arg3}) + stub := fake.UpdateConnectionStub + fakeReturns := fake.updateConnectionReturns + fake.recordInvocation("UpdateConnection", []interface{}{arg1, arg2, arg3}) + fake.updateConnectionMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeIActions) UpdateConnectionCallCount() int { + fake.updateConnectionMutex.RLock() + defer fake.updateConnectionMutex.RUnlock() + return len(fake.updateConnectionArgsForCall) +} + +func (fake *FakeIActions) UpdateConnectionCalls(stub func(context.Context, string, *opts.ConnectionOptions) (*types.Connection, error)) { + fake.updateConnectionMutex.Lock() + defer fake.updateConnectionMutex.Unlock() + fake.UpdateConnectionStub = stub +} + +func (fake *FakeIActions) UpdateConnectionArgsForCall(i int) (context.Context, string, *opts.ConnectionOptions) { + fake.updateConnectionMutex.RLock() + defer fake.updateConnectionMutex.RUnlock() + argsForCall := fake.updateConnectionArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeIActions) UpdateConnectionReturns(result1 *types.Connection, result2 error) { + fake.updateConnectionMutex.Lock() + defer fake.updateConnectionMutex.Unlock() + fake.UpdateConnectionStub = nil + fake.updateConnectionReturns = struct { + result1 *types.Connection + result2 error + }{result1, result2} +} + +func (fake *FakeIActions) UpdateConnectionReturnsOnCall(i int, result1 *types.Connection, result2 error) { + fake.updateConnectionMutex.Lock() + defer fake.updateConnectionMutex.Unlock() + fake.UpdateConnectionStub = nil + if fake.updateConnectionReturnsOnCall == nil { + fake.updateConnectionReturnsOnCall = make(map[int]struct { + result1 *types.Connection + result2 error + }) + } + fake.updateConnectionReturnsOnCall[i] = struct { + result1 *types.Connection + result2 error + }{result1, result2} +} + func (fake *FakeIActions) UpdateRelay(arg1 context.Context, arg2 string, arg3 *opts.RelayOptions) (*types.Relay, error) { fake.updateRelayMutex.Lock() ret, specificReturn := fake.updateRelayReturnsOnCall[len(fake.updateRelayArgsForCall)] @@ -823,6 +904,8 @@ func (fake *FakeIActions) Invocations() map[string][][]interface{} { defer fake.stopRelayMutex.RUnlock() fake.stopTunnelMutex.RLock() defer fake.stopTunnelMutex.RUnlock() + fake.updateConnectionMutex.RLock() + defer fake.updateConnectionMutex.RUnlock() fake.updateRelayMutex.RLock() defer fake.updateRelayMutex.RUnlock() fake.updateTunnelMutex.RLock() diff --git a/actions/connection.go b/actions/connection.go new file mode 100644 index 000000000..712bb9754 --- /dev/null +++ b/actions/connection.go @@ -0,0 +1,46 @@ +package actions + +import ( + "context" + + "github.com/batchcorp/plumber-schemas/build/go/protos/opts" + + "github.com/batchcorp/plumber/server/types" +) + +func (a *Actions) UpdateConnection(_ context.Context, connectionID string, connOpts *opts.ConnectionOptions) (*types.Connection, error) { + + conn := &types.Connection{Connection: connOpts} + + // Update connection in persistent config + a.cfg.PersistentConfig.SetConnection(connectionID, conn) + _ = a.cfg.PersistentConfig.Save() + + // Starting/stopping needs to lock this mutex, so copy it for access + a.cfg.PersistentConfig.RelaysMutex.RLock() + relays := make(map[string]*types.Relay) + for k, v := range a.cfg.PersistentConfig.Relays { + relays[k] = v + } + a.cfg.PersistentConfig.RelaysMutex.RUnlock() + + // Restart all relays that use this connection and are active + // Inactive relays will pick up the new connection details whenever they get resumed + for _, relay := range relays { + if relay.Options.ConnectionId == connectionID && relay.Active { + // Don't use the request context, use a fresh one + if _, err := a.StopRelay(context.Background(), relay.Options.XRelayId); err != nil { + a.log.Errorf("unable to stop relay '%s': %s", relay.Options.XRelayId, err) + continue + } + + // Don't use the request context, use a fresh one + if _, err := a.ResumeRelay(context.Background(), relay.Options.XRelayId); err != nil { + a.log.Errorf("unable to resume relay '%s': %s", relay.Options.XRelayId, err) + continue + } + } + } + + return conn, nil +} diff --git a/actions/relay.go b/actions/relay.go index 8ae3e3c72..7aca5c6db 100644 --- a/actions/relay.go +++ b/actions/relay.go @@ -79,6 +79,8 @@ func (a *Actions) StopRelay(ctx context.Context, relayID string) (*types.Relay, relay.Active = false relay.Options.XActive = false + relay.CancelCtx = nil + relay.CancelFunc = nil // Update persistent storage a.cfg.PersistentConfig.SetRelay(relay.Id, relay) @@ -104,6 +106,22 @@ func (a *Actions) ResumeRelay(ctx context.Context, relayID string) (*types.Relay return nil, validate.ErrRelayAlreadyActive } + conn := a.cfg.PersistentConfig.GetConnection(relay.Options.ConnectionId) + if conn == nil { + return nil, validate.ErrConnectionNotFound + } + + // Try to create a backend from given connection options + be, err := backends.New(conn.Connection) + if err != nil { + return nil, errors.Wrap(err, "unable to create backend") + } + + relay.Backend = be + + shutdownCtx, shutdownFunc := context.WithCancel(context.Background()) + relay.CancelFunc = shutdownFunc + relay.CancelCtx = shutdownCtx if err := relay.StartRelay(time.Millisecond * 100); err != nil { return nil, errors.Wrap(err, "unable to start relay") } @@ -168,21 +186,13 @@ func (a *Actions) UpdateRelay(ctx context.Context, relayID string, relayOpts *op time.Sleep(time.Second) prometheus.DecrPromGauge(prometheus.PlumberRelayWorkers) - } - // Get stored connection information - conn := a.cfg.PersistentConfig.GetConnection(relayOpts.ConnectionId) - if conn == nil { - return nil, validate.ErrConnectionNotFound - } + relay.CancelCtx = nil + relay.CancelFunc = nil + _ = relay.Backend.Close(context.Background()) - // Try to create a backend from given connection options - be, err := backends.New(conn.Connection) - if err != nil { - return nil, errors.Wrap(err, "unable to create backend") } - relay.Backend = be relay.Options = relayOpts // New contexts @@ -191,6 +201,20 @@ func (a *Actions) UpdateRelay(ctx context.Context, relayID string, relayOpts *op relay.CancelFunc = cancelFunc if relayOpts.XActive { + // Get stored connection information + conn := a.cfg.PersistentConfig.GetConnection(relayOpts.ConnectionId) + if conn == nil { + return nil, validate.ErrConnectionNotFound + } + + // Try to create a backend from given connection options + be, err := backends.New(conn.Connection) + if err != nil { + return nil, errors.Wrap(err, "unable to create backend") + } + + relay.Backend = be + if err := relay.StartRelay(5 * time.Second); err != nil { relay.Options.XActive = false return nil, errors.Wrap(err, "unable to start relay") diff --git a/actions/tunnel.go b/actions/tunnel.go index 11449ce14..2a1387691 100644 --- a/actions/tunnel.go +++ b/actions/tunnel.go @@ -111,7 +111,7 @@ func (a *Actions) ResumeTunnel(ctx context.Context, tunnelID string) (*types.Tun return d, nil } -func (a Actions) StopTunnel(ctx context.Context, tunnelID string) (*types.Tunnel, error) { +func (a *Actions) StopTunnel(ctx context.Context, tunnelID string) (*types.Tunnel, error) { d := a.cfg.PersistentConfig.GetTunnel(tunnelID) if d == nil { return nil, errors.New("Tunnel replay does not exist") diff --git a/backends/kafka/kafka.go b/backends/kafka/kafka.go index b06bb3825..481959bbe 100644 --- a/backends/kafka/kafka.go +++ b/backends/kafka/kafka.go @@ -299,6 +299,10 @@ func newDialer(connArgs *args.KafkaConn) (*skafka.Dialer, error) { Timeout: time.Duration(connArgs.TimeoutSeconds) * time.Second, } + if connArgs.UseTls { + dialer.TLS = &tls.Config{} + } + if connArgs.TlsSkipVerify { dialer.TLS = &tls.Config{ InsecureSkipVerify: true, diff --git a/backends/kafka/relay.go b/backends/kafka/relay.go index 4d3dded1b..700128044 100644 --- a/backends/kafka/relay.go +++ b/backends/kafka/relay.go @@ -33,12 +33,14 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh defer reader.Close() + llog := k.log.WithField("relay-id", relayOpts.XRelayId) + for { msg, err := reader.ReadMessage(ctx) if err != nil { // Shutdown cancelled, exit so we don't spam logs with context cancelled errors if err == context.Canceled { - k.log.Debug("Received shutdown signal, exiting relayer") + llog.Debug("Received shutdown signal, exiting relayer") break } @@ -48,7 +50,7 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh prometheus.IncrPromCounter("plumber_read_errors", 1) wrappedErr := fmt.Errorf("unable to read kafka message: %s; retrying in %s", err, RetryReadInterval) - util.WriteError(k.log, errorCh, wrappedErr) + util.WriteError(llog, errorCh, wrappedErr) time.Sleep(RetryReadInterval) diff --git a/bus/broadcast_consumer_connection.go b/bus/broadcast_consumer_connection.go index cc1a1ab76..908dd5b8d 100644 --- a/bus/broadcast_consumer_connection.go +++ b/bus/broadcast_consumer_connection.go @@ -30,25 +30,20 @@ func (b *Bus) doCreateConnection(_ context.Context, msg *Message) error { return nil } -func (b *Bus) doUpdateConnection(_ context.Context, msg *Message) error { - b.log.Debugf("running doCreateConnection handler for msg emitted by %s", msg.EmittedBy) +func (b *Bus) doUpdateConnection(ctx context.Context, msg *Message) error { + b.log.Debugf("running doUpdateonnection handler for msg emitted by %s", msg.EmittedBy) connOpts := &opts.ConnectionOptions{} if err := proto.Unmarshal(msg.Data, connOpts); err != nil { return errors.Wrap(err, "unable to unmarshal message into opts.ConnectionOptions") } - // Update connection in in-memory map - b.config.PersistentConfig.SetConnection(connOpts.XId, &types.Connection{ - Connection: connOpts, - }) + if _, err := b.config.Actions.UpdateConnection(ctx, connOpts.XId, connOpts); err != nil { + return errors.Wrap(err, "unable to update connection") + } b.log.Debugf("updated connection '%s'", connOpts.Name) - // TODO: some way to signal reads/relays to restart? How will GRPC streams handle this? - - // TODO: Some more work here - return nil } @@ -74,7 +69,7 @@ func (b *Bus) doDeleteConnection(ctx context.Context, msg *Message) error { } } - b.log.Debugf("running doCreateConnection handler for msg emitted by %s", msg.EmittedBy) + b.log.Debugf("running doDeleteConnection handler for msg emitted by %s", msg.EmittedBy) connOpts := &opts.ConnectionOptions{} if err := proto.Unmarshal(msg.Data, connOpts); err != nil { diff --git a/bus/broadcast_consumer_relay.go b/bus/broadcast_consumer_relay.go index 6bbd548fe..3e074bb3e 100644 --- a/bus/broadcast_consumer_relay.go +++ b/bus/broadcast_consumer_relay.go @@ -68,7 +68,7 @@ func (b *Bus) doResumeRelay(ctx context.Context, msg *Message) error { return fmt.Errorf("unable to resume relay '%s': %s", relayOptions.XRelayId, err) } - b.log.Infof("stopped relay '%s' (from broadcast msg)", relayOptions.XRelayId) + b.log.Infof("resumed relay '%s' (from broadcast msg)", relayOptions.XRelayId) return nil } diff --git a/server/connections_handlers.go b/server/connections_handlers.go index 313af94dc..856e3ab94 100644 --- a/server/connections_handlers.go +++ b/server/connections_handlers.go @@ -154,15 +154,13 @@ func (s *Server) UpdateConnection(ctx context.Context, req *protos.UpdateConnect return nil, CustomError(common.Code_INVALID_ARGUMENT, err.Error()) } - // Re-assign connection options so we can update in-mem + etcd - conn.Connection = req.Options - - // Update conf - s.PersistentConfig.SetConnection(conn.Connection.XId, conn) - s.PersistentConfig.Save() + if _, err := s.Actions.UpdateConnection(ctx, req.ConnectionId, req.Options); err != nil { + return nil, CustomError(common.Code_INTERNAL, fmt.Sprintf("unable to update connection: %s", err)) + } //Publish UpdateConnection event - if err := s.Bus.PublishUpdateConnection(ctx, conn.Connection); err != nil { + req.Options.XId = req.ConnectionId + if err := s.Bus.PublishUpdateConnection(context.Background(), req.Options); err != nil { s.Log.Error(err) } diff --git a/server/connections_test.go b/server/connections_test.go index acdaf1d4e..4978ec5ed 100644 --- a/server/connections_test.go +++ b/server/connections_test.go @@ -15,6 +15,7 @@ import ( "github.com/batchcorp/plumber-schemas/build/go/protos/common" "github.com/batchcorp/plumber-schemas/build/go/protos/opts" + "github.com/batchcorp/plumber/actions" "github.com/batchcorp/plumber/bus/busfakes" "github.com/batchcorp/plumber/config" stypes "github.com/batchcorp/plumber/server/types" @@ -169,6 +170,10 @@ var _ = Describe("Connection", func() { fakeBus := &busfakes.FakeIBus{} p.Bus = fakeBus + a, err := actions.New(&actions.Config{PersistentConfig: p.PersistentConfig}) + Expect(err).ToNot(HaveOccurred()) + p.Actions = a + conn := &opts.ConnectionOptions{ XId: connID, Name: "testing", @@ -189,11 +194,12 @@ var _ = Describe("Connection", func() { }}, } - _, err := p.UpdateConnection(context.Background(), &protos.UpdateConnectionRequest{ + _, err = p.UpdateConnection(context.Background(), &protos.UpdateConnectionRequest{ Auth: &common.Auth{Token: "streamdal"}, ConnectionId: connID, Options: newConn, }) + Expect(err).ToNot(HaveOccurred()) updateConn := p.PersistentConfig.GetConnection(connID) diff --git a/server/relay_handlers.go b/server/relay_handlers.go index fb7d3f9b0..c67688e53 100644 --- a/server/relay_handlers.go +++ b/server/relay_handlers.go @@ -118,14 +118,6 @@ func (s *Server) UpdateRelay(ctx context.Context, req *protos.UpdateRelayRequest return nil, CustomError(common.Code_UNAUTHENTICATED, fmt.Sprintf("invalid auth: %s", err)) } - currentRelay := s.PersistentConfig.GetRelay(req.RelayId) - if currentRelay.Active { - // Publish StopRelay event - if err := s.Bus.PublishStopRelay(ctx, currentRelay.Options); err != nil { - return nil, fmt.Errorf("unable to publish stop relay event: %s", err) - } - } - if _, err := s.Actions.UpdateRelay(ctx, req.RelayId, req.Opts); err != nil { // No need to roll back here since we haven't updated anything yet return nil, CustomError(common.Code_ABORTED, err.Error()) diff --git a/server/types/relay.go b/server/types/relay.go index e2a169db9..4f7d6a456 100644 --- a/server/types/relay.go +++ b/server/types/relay.go @@ -23,7 +23,6 @@ type Relay struct { Id string `json:"-"` CancelCtx context.Context `json:"-"` CancelFunc context.CancelFunc `json:"-"` - RelayCh chan interface{} `json:"-"` Backend backends.Backend `json:"-"` Options *opts.RelayOptions `json:"config"`