Improve clustering logging and error handling. (#1242)
* Improve clustering logging and error handling.

* Address feedback, ty

* Clustering: fail fast on startup when cannot discover peers

* Add comment as suggested

* Log a warning for all DNS lookup failures

* Add missing logger to test fixture
thampiotr authored Jul 10, 2024
1 parent d014908 commit 266e5e0
Showing 8 changed files with 335 additions and 66 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -27,8 +27,13 @@ Main (unreleased)
### Bugfixes

- Fixed an issue with `loki.source.kubernetes_events` not starting in large clusters due to short informer sync timeout. (@nrwiersma)

- Updated [ckit](https://github.com/grafana/ckit) to fix an issue with armv7 panic on startup when forming a cluster. (@imavroukakis)

- Fixed a clustering mode issue where a failure to perform static peers
discovery did not result in a fatal failure at startup and could lead to
potential split-brain issues. (@thampiotr)

v1.2.0
-----------------

80 changes: 51 additions & 29 deletions internal/alloycli/cluster_builder.go
@@ -1,6 +1,7 @@
package alloycli

import (
"errors"
"fmt"
stdlog "log"
"net"
@@ -9,13 +10,14 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/service/cluster"
"github.com/grafana/ckit/advertise"
"github.com/hashicorp/go-discover"
"github.com/hashicorp/go-discover/provider/k8s"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"

"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/service/cluster"
)

type clusterOptions struct {
@@ -69,7 +71,7 @@ func buildClusterService(opts clusterOptions) (*cluster.Service, error) {
return nil, fmt.Errorf("at most one of join peers and discover peers may be set")

case len(opts.JoinPeers) > 0:
config.DiscoverPeers = newStaticDiscovery(opts.JoinPeers, listenPort)
config.DiscoverPeers = newStaticDiscovery(opts.JoinPeers, listenPort, opts.Log)

case opts.DiscoverPeers != "":
discoverFunc, err := newDynamicDiscovery(config.Log, opts.DiscoverPeers, listenPort)
@@ -137,46 +139,66 @@ func appendDefaultPort(addr string, port int) string {

type discoverFunc func() ([]string, error)

func newStaticDiscovery(peers []string, defaultPort int) discoverFunc {
func newStaticDiscovery(providedAddr []string, defaultPort int, log log.Logger) discoverFunc {
return func() ([]string, error) {
var addrs []string

for _, addr := range peers {
addrs = appendJoinAddr(addrs, addr)
addresses, err := buildJoinAddresses(providedAddr, log)
if err != nil {
return nil, fmt.Errorf("static peer discovery: %w", err)
}

for i := range addrs {
for i := range addresses {
// Default to using the same advertise port as the local node. This may
// break in some cases, so the user should make sure the port numbers
// align on as many nodes as possible.
addrs[i] = appendDefaultPort(addrs[i], defaultPort)
addresses[i] = appendDefaultPort(addresses[i], defaultPort)
}

return addrs, nil
return addresses, nil
}
}

func appendJoinAddr(addrs []string, in string) []string {
_, _, err := net.SplitHostPort(in)
if err == nil {
addrs = append(addrs, in)
return addrs
func buildJoinAddresses(providedAddr []string, log log.Logger) ([]string, error) {
// Currently we don't consider it an error to not have any join addresses.
if len(providedAddr) == 0 {
return nil, nil
}
var (
result []string
deferredErr error
)
for _, addr := range providedAddr {
// If it's a host:port, use it as is.
_, _, err := net.SplitHostPort(addr)
if err != nil {
deferredErr = errors.Join(deferredErr, err)
} else {
level.Debug(log).Log("msg", "found a host:port cluster join address", "addr", addr)
result = append(result, addr)
break
}

ip := net.ParseIP(in)
if ip != nil {
addrs = append(addrs, ip.String())
return addrs
}
// If it's an IP address, use it.
ip := net.ParseIP(addr)
if ip != nil {
level.Debug(log).Log("msg", "found an IP cluster join address", "addr", addr)
result = append(result, ip.String())
break
}

_, srvs, err := net.LookupSRV("", "", in)
if err == nil {
for _, srv := range srvs {
addrs = append(addrs, srv.Target)
// Otherwise, do a DNS lookup and return all the records found.
_, srvs, err := net.LookupSRV("", "", addr)
if err != nil {
level.Warn(log).Log("msg", "failed to resolve SRV records", "addr", addr, "err", err)
deferredErr = errors.Join(deferredErr, err)
} else {
level.Debug(log).Log("msg", "found cluster join addresses via SRV records", "addr", addr, "count", len(srvs))
for _, srv := range srvs {
result = append(result, srv.Target)
}
}
}

return addrs
if len(result) == 0 {
return nil, fmt.Errorf("failed to find any valid join addresses: %w", deferredErr)
}
return result, nil
}

func newDynamicDiscovery(l log.Logger, config string, defaultPort int) (discoverFunc, error) {
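Note: the hunk context above mentions `appendDefaultPort`, but its body is not part of this diff. Based on the in-code comment about defaulting to the local advertise port and on the tests below, a plausible sketch (an assumption, not the commit's actual code) looks like this:

```go
package alloycli

import (
	"fmt"
	"net"
)

// appendDefaultPort is a sketch of the helper referenced above: addresses that
// already carry a port are returned unchanged, everything else gets the local
// advertise port appended.
func appendDefaultPort(addr string, port int) string {
	if _, _, err := net.SplitHostPort(addr); err == nil {
		// Already host:port — keep as is.
		return addr
	}
	return fmt.Sprintf("%s:%d", addr, port)
}
```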
50 changes: 50 additions & 0 deletions internal/alloycli/cluster_builder_test.go
@@ -1,6 +1,7 @@
package alloycli

import (
"os"
"testing"

"github.com/go-kit/log"
@@ -54,3 +55,52 @@ func TestGetAdvertiseAddress(t *testing.T) {
require.Equal(t, "127.0.0.1:80", addr)
})
}

func TestStaticDiscovery(t *testing.T) {
t.Run("no addresses provided", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{}, 12345, logger)
actual, err := sd()
require.NoError(t, err)
require.Nil(t, actual)
})
t.Run("host and port provided", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{"host:8080"}, 12345, logger)
actual, err := sd()
require.NoError(t, err)
require.Equal(t, []string{"host:8080"}, actual)
})
t.Run("IP provided and default port added", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{"192.168.0.1"}, 12345, logger)
actual, err := sd()
require.NoError(t, err)
require.Equal(t, []string{"192.168.0.1:12345"}, actual)
})
t.Run("fallback to next host and port provided", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{"this | wont | work", "host2:8080"}, 12345, logger)
actual, err := sd()
require.NoError(t, err)
require.Equal(t, []string{"host2:8080"}, actual)
})
t.Run("fallback to next host and port provided", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{"this | wont | work", "host2:8080"}, 12345, logger)
actual, err := sd()
require.NoError(t, err)
require.Equal(t, []string{"host2:8080"}, actual)
})
t.Run("nothing found", func(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
sd := newStaticDiscovery([]string{"this | wont | work", "and/this/won't/either"}, 12345, logger)
actual, err := sd()
require.Nil(t, actual)
require.ErrorContains(t, err, "failed to find any valid join addresses")
require.ErrorContains(t, err, "this | wont | work: missing port in address")
require.ErrorContains(t, err, "lookup this | wont | work: no such host")
require.ErrorContains(t, err, "and/this/won't/either: missing port in address")
require.ErrorContains(t, err, "lookup and/this/won't/either: no such host")
})
}
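The "nothing found" assertions pass because `buildJoinAddresses` accumulates every per-address failure with `errors.Join` and only wraps the combined error once nothing resolves. A minimal standalone illustration of that aggregation (the messages below are illustrative, not taken from the commit):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	var deferred error
	// A single candidate address can contribute several failures
	// (host:port parsing, IP parsing, SRV lookup).
	deferred = errors.Join(deferred, fmt.Errorf("address %q: missing port in address", "this | wont | work"))
	deferred = errors.Join(deferred, fmt.Errorf("lookup %s: no such host", "this | wont | work"))

	// Wrapping the joined error keeps every individual message reachable,
	// which is what the require.ErrorContains checks above rely on.
	err := fmt.Errorf("failed to find any valid join addresses: %w", deferred)
	fmt.Println(err) // prints the wrapper plus each joined failure on its own line
}
```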
15 changes: 8 additions & 7 deletions internal/alloycli/cmd_run.go
@@ -19,6 +19,13 @@ import (
"github.com/KimMachineGun/automemlimit/memlimit"
"github.com/fatih/color"
"github.com/go-kit/log"
"github.com/grafana/ckit/advertise"
"github.com/grafana/ckit/peer"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"golang.org/x/exp/maps"

"github.com/grafana/alloy/internal/alloyseed"
"github.com/grafana/alloy/internal/boringcrypto"
"github.com/grafana/alloy/internal/component"
@@ -39,12 +46,6 @@ import (
"github.com/grafana/alloy/internal/static/config/instrumentation"
"github.com/grafana/alloy/internal/usagestats"
"github.com/grafana/alloy/syntax/diag"
"github.com/grafana/ckit/advertise"
"github.com/grafana/ckit/peer"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"golang.org/x/exp/maps"

// Install Components
_ "github.com/grafana/alloy/internal/component/all"
@@ -233,7 +234,7 @@ func (fr *alloyRun) Run(configPath string) error {
)

clusterService, err := buildClusterService(clusterOptions{
Log: l,
Log: log.With(l, "service", "cluster"),
Tracer: t,
Metrics: reg,

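The functional change in this file is wrapping the logger with `log.With(l, "service", "cluster")`, so every log line emitted by the cluster service carries a `service=cluster` field. A small go-kit sketch of the effect (standalone example, using go-kit's own level package rather than Alloy's internal one):

```go
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	l := log.NewLogfmtLogger(os.Stdout)

	// log.With returns a Logger that adds the key/value pair to every record,
	// which makes the cluster service's output easy to filter.
	clusterLog := log.With(l, "service", "cluster")

	level.Info(clusterLog).Log("msg", "starting cluster node")
	// Output (key order may differ): service=cluster level=info msg="starting cluster node"
}
```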
94 changes: 64 additions & 30 deletions internal/service/cluster/cluster.go
@@ -14,11 +14,6 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/service"
http_service "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/ckit"
"github.com/grafana/ckit/peer"
"github.com/grafana/ckit/shard"
@@ -27,25 +22,37 @@ import (
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/net/http2"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/service"
http_service "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/internal/util"
)

// tokensPerNode is used to decide how many tokens each node should be given in
// the hash ring. All nodes must use the same value, otherwise they will have
// different views of the ring and assign work differently.
//
// Using 512 tokens strikes a good balance between distribution accuracy and
// memory consumption. A cluster of 1,000 nodes with 512 tokens per node
// requires 12MB for the hash ring.
//
// Distribution accuracy measures how close a node was to being responsible for
// exactly 1/N keys during simulation. Simulation tests used a cluster of 10
// nodes and hashing 100,000 random keys:
//
// 512 tokens per node: min 96.1%, median 99.9%, max 103.2% (stddev: 197.9 hashes)
const tokensPerNode = 512
const (
// ServiceName defines the name used for the cluster service.
ServiceName = "cluster"

// ServiceName defines the name used for the cluster service.
const ServiceName = "cluster"
// tokensPerNode is used to decide how many tokens each node should be given in
// the hash ring. All nodes must use the same value, otherwise they will have
// different views of the ring and assign work differently.
//
// Using 512 tokens strikes a good balance between distribution accuracy and
// memory consumption. A cluster of 1,000 nodes with 512 tokens per node
// requires 12MB for the hash ring.
//
// Distribution accuracy measures how close a node was to being responsible for
// exactly 1/N keys during simulation. Simulation tests used a cluster of 10
// nodes and hashing 100,000 random keys:
//
// 512 tokens per node: min 96.1%, median 99.9%, max 103.2% (stddev: 197.9 hashes)
tokensPerNode = 512

// maxPeersToLog is the maximum number of peers to log on info level. All peers are logged on debug level.
maxPeersToLog = 10
)

// Options are used to configure the cluster service. Options are constant for
// the lifetime of the cluster service.
@@ -213,11 +220,7 @@ func (s *Service) Run(ctx context.Context, host service.Host) error {
spanCtx, span := tracer.Start(ctx, "NotifyClusterChange", trace.WithSpanKind(trace.SpanKindInternal))
defer span.End()

names := make([]string, len(peers))
for i, p := range peers {
names[i] = p.Name
}
level.Info(s.log).Log("msg", "peers changed", "new_peers", strings.Join(names, ","))
s.logPeers("peers changed", toStringSlice(peers))

// Notify all components about the clustering change.
components := component.GetAllComponents(host, component.InfoOptions{})
@@ -246,11 +249,18 @@ func (s *Service) Run(ctx context.Context, host service.Host) error {

peers, err := s.getPeers()
if err != nil {
return fmt.Errorf("failed to get peers to join: %w", err)
// Fail fast on startup if we can't discover peers to prevent a split brain and give a clear signal to the user.
return fmt.Errorf("failed to get peers to join at startup: %w", err)
}

level.Info(s.log).Log("msg", "starting cluster node", "peers", strings.Join(peers, ","),
"advertise_addr", s.opts.AdvertiseAddress)
// We log on info level including all the peers (without any abbreviation), as it's happening only on startup and
// won't spam too much in most cases. In other cases we should either abbreviate the list or log on debug level.
level.Info(s.log).Log(
"msg", "starting cluster node",
"peers_count", len(peers),
"peers", strings.Join(peers, ","),
"advertise_addr", s.opts.AdvertiseAddress,
)

if err := s.node.Start(peers); err != nil {
level.Warn(s.log).Log("msg", "failed to connect to peers; bootstrapping a new cluster", "err", err)
@@ -281,8 +291,8 @@ func (s *Service) Run(ctx context.Context, host service.Host) error {
level.Warn(s.log).Log("msg", "failed to refresh list of peers", "err", err)
continue
}
s.logPeers("rejoining peers", peers)

level.Info(s.log).Log("msg", "rejoining peers", "peers", strings.Join(peers, ","))
if err := s.node.Start(peers); err != nil {
level.Error(s.log).Log("msg", "failed to rejoin list of peers", "err", err)
continue
@@ -306,6 +316,13 @@ func (s *Service) getPeers() ([]string, error) {
return nil, err
}

// Debug level log all the peers for troubleshooting.
level.Debug(s.log).Log(
"msg", "discovered peers",
"peers_count", len(peers),
"peers", strings.Join(peers, ","),
)

// Here we return the entire list because we can't take a subset.
if s.opts.ClusterMaxJoinPeers == 0 || len(peers) < s.opts.ClusterMaxJoinPeers {
return peers, nil
@@ -347,6 +364,15 @@ func (s *Service) Data() any {
return &sharderCluster{sharder: s.sharder}
}

func (s *Service) logPeers(msg string, peers []string) {
// Truncate peers list on info level.
level.Info(s.log).Log(
"msg", msg,
"peers_count", len(peers),
"peers", util.JoinWithTruncation(peers, ",", maxPeersToLog, "..."),
)
}

// Component is a component which subscribes to clustering updates.
type Component interface {
component.Component
@@ -394,3 +420,11 @@ func (sc *sharderCluster) Lookup(key shard.Key, replicationFactor int, op shard.
func (sc *sharderCluster) Peers() []peer.Peer {
return sc.sharder.Peers()
}

func toStringSlice[T any](slice []T) []string {
s := make([]string, 0, len(slice))
for _, p := range slice {
s = append(s, fmt.Sprintf("%v", p))
}
return s
}
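`logPeers` relies on `util.JoinWithTruncation` from `internal/util`, whose implementation isn't visible in this view. A plausible sketch of such a helper (an assumption; the real helper may format the remainder differently):

```go
package util

import (
	"fmt"
	"strings"
)

// JoinWithTruncation joins up to maxItems elements with sep and appends the
// abbreviation marker when the slice is longer. Sketch only: the real helper
// in internal/util may differ in details.
func JoinWithTruncation(items []string, sep string, maxItems int, abbreviation string) string {
	if len(items) <= maxItems {
		return strings.Join(items, sep)
	}
	return fmt.Sprintf("%s%s%s (%d total)",
		strings.Join(items[:maxItems], sep), sep, abbreviation, len(items))
}
```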
