diff --git a/CHANGELOG.md b/CHANGELOG.md index f77615ddb6..f325aa6c29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,8 +27,13 @@ Main (unreleased) ### Bugfixes - Fixed an issue with `loki.source.kubernetes_events` not starting in large clusters due to short informer sync timeout. (@nrwiersma) + - Updated [ckit](https://github.com/grafana/ckit) to fix an issue with armv7 panic on startup when forming a cluster. (@imavroukakis) +- Fixed a clustering mode issue where a failure to perform static peers + discovery did not result in a fatal failure at startup and could lead to + potential split-brain issues. (@thampiotr) + v1.2.0 ----------------- diff --git a/internal/alloycli/cluster_builder.go b/internal/alloycli/cluster_builder.go index 8f32447aaa..3a73feaa44 100644 --- a/internal/alloycli/cluster_builder.go +++ b/internal/alloycli/cluster_builder.go @@ -1,6 +1,7 @@ package alloycli import ( + "errors" "fmt" stdlog "log" "net" @@ -9,13 +10,14 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/runtime/logging/level" - "github.com/grafana/alloy/internal/service/cluster" "github.com/grafana/ckit/advertise" "github.com/hashicorp/go-discover" "github.com/hashicorp/go-discover/provider/k8s" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" + + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/service/cluster" ) type clusterOptions struct { @@ -69,7 +71,7 @@ func buildClusterService(opts clusterOptions) (*cluster.Service, error) { return nil, fmt.Errorf("at most one of join peers and discover peers may be set") case len(opts.JoinPeers) > 0: - config.DiscoverPeers = newStaticDiscovery(opts.JoinPeers, listenPort) + config.DiscoverPeers = newStaticDiscovery(opts.JoinPeers, listenPort, opts.Log) case opts.DiscoverPeers != "": discoverFunc, err := newDynamicDiscovery(config.Log, opts.DiscoverPeers, listenPort) @@ -137,46 +139,66 @@ func appendDefaultPort(addr string, port int) string { type discoverFunc func() ([]string, error) -func newStaticDiscovery(peers []string, defaultPort int) discoverFunc { +func newStaticDiscovery(providedAddr []string, defaultPort int, log log.Logger) discoverFunc { return func() ([]string, error) { - var addrs []string - - for _, addr := range peers { - addrs = appendJoinAddr(addrs, addr) + addresses, err := buildJoinAddresses(providedAddr, log) + if err != nil { + return nil, fmt.Errorf("static peer discovery: %w", err) } - - for i := range addrs { + for i := range addresses { // Default to using the same advertise port as the local node. This may // break in some cases, so the user should make sure the port numbers // align on as many nodes as possible. - addrs[i] = appendDefaultPort(addrs[i], defaultPort) + addresses[i] = appendDefaultPort(addresses[i], defaultPort) } - - return addrs, nil + return addresses, nil } } -func appendJoinAddr(addrs []string, in string) []string { - _, _, err := net.SplitHostPort(in) - if err == nil { - addrs = append(addrs, in) - return addrs +func buildJoinAddresses(providedAddr []string, log log.Logger) ([]string, error) { + // Currently we don't consider it an error to not have any join addresses. + if len(providedAddr) == 0 { + return nil, nil } + var ( + result []string + deferredErr error + ) + for _, addr := range providedAddr { + // If it's a host:port, use it as is. + _, _, err := net.SplitHostPort(addr) + if err != nil { + deferredErr = errors.Join(deferredErr, err) + } else { + level.Debug(log).Log("msg", "found a host:port cluster join address", "addr", addr) + result = append(result, addr) + break + } - ip := net.ParseIP(in) - if ip != nil { - addrs = append(addrs, ip.String()) - return addrs - } + // If it's an IP address, use it. + ip := net.ParseIP(addr) + if ip != nil { + level.Debug(log).Log("msg", "found an IP cluster join address", "addr", addr) + result = append(result, ip.String()) + break + } - _, srvs, err := net.LookupSRV("", "", in) - if err == nil { - for _, srv := range srvs { - addrs = append(addrs, srv.Target) + // Otherwise, do a DNS lookup and return all the records found. + _, srvs, err := net.LookupSRV("", "", addr) + if err != nil { + level.Warn(log).Log("msg", "failed to resolve SRV records", "addr", addr, "err", err) + deferredErr = errors.Join(deferredErr, err) + } else { + level.Debug(log).Log("msg", "found cluster join addresses via SRV records", "addr", addr, "count", len(srvs)) + for _, srv := range srvs { + result = append(result, srv.Target) + } } } - - return addrs + if len(result) == 0 { + return nil, fmt.Errorf("failed to find any valid join addresses: %w", deferredErr) + } + return result, nil } func newDynamicDiscovery(l log.Logger, config string, defaultPort int) (discoverFunc, error) { diff --git a/internal/alloycli/cluster_builder_test.go b/internal/alloycli/cluster_builder_test.go index 9d9622e68e..ff020aa7b8 100644 --- a/internal/alloycli/cluster_builder_test.go +++ b/internal/alloycli/cluster_builder_test.go @@ -1,6 +1,7 @@ package alloycli import ( + "os" "testing" "github.com/go-kit/log" @@ -54,3 +55,52 @@ func TestGetAdvertiseAddress(t *testing.T) { require.Equal(t, "127.0.0.1:80", addr) }) } + +func TestStaticDiscovery(t *testing.T) { + t.Run("no addresses provided", func(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdout) + sd := newStaticDiscovery([]string{}, 12345, logger) + actual, err := sd() + require.NoError(t, err) + require.Nil(t, actual) + }) + t.Run("host and port provided", func(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdout) + sd := newStaticDiscovery([]string{"host:8080"}, 12345, logger) + actual, err := sd() + require.NoError(t, err) + require.Equal(t, []string{"host:8080"}, actual) + }) + t.Run("IP provided and default port added", func(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdout) + sd := newStaticDiscovery([]string{"192.168.0.1"}, 12345, logger) + actual, err := sd() + require.NoError(t, err) + require.Equal(t, []string{"192.168.0.1:12345"}, actual) + }) + t.Run("fallback to next host and port provided", func(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdout) + sd := newStaticDiscovery([]string{"this | wont | work", "host2:8080"}, 12345, logger) + actual, err := sd() + require.NoError(t, err) + require.Equal(t, []string{"host2:8080"}, actual) + }) + t.Run("fallback to next host and port provided", func(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdout) + sd := newStaticDiscovery([]string{"this | wont | work", "host2:8080"}, 12345, logger) + actual, err := sd() + require.NoError(t, err) + require.Equal(t, []string{"host2:8080"}, actual) + }) + t.Run("nothing found", func(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdout) + sd := newStaticDiscovery([]string{"this | wont | work", "and/this/won't/either"}, 12345, logger) + actual, err := sd() + require.Nil(t, actual) + require.ErrorContains(t, err, "failed to find any valid join addresses") + require.ErrorContains(t, err, "this | wont | work: missing port in address") + require.ErrorContains(t, err, "lookup this | wont | work: no such host") + require.ErrorContains(t, err, "and/this/won't/either: missing port in address") + require.ErrorContains(t, err, "lookup and/this/won't/either: no such host") + }) +} diff --git a/internal/alloycli/cmd_run.go b/internal/alloycli/cmd_run.go index 63f40c94af..52d2edc736 100644 --- a/internal/alloycli/cmd_run.go +++ b/internal/alloycli/cmd_run.go @@ -19,6 +19,13 @@ import ( "github.com/KimMachineGun/automemlimit/memlimit" "github.com/fatih/color" "github.com/go-kit/log" + "github.com/grafana/ckit/advertise" + "github.com/grafana/ckit/peer" + "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/cobra" + "go.opentelemetry.io/otel" + "golang.org/x/exp/maps" + "github.com/grafana/alloy/internal/alloyseed" "github.com/grafana/alloy/internal/boringcrypto" "github.com/grafana/alloy/internal/component" @@ -39,12 +46,6 @@ import ( "github.com/grafana/alloy/internal/static/config/instrumentation" "github.com/grafana/alloy/internal/usagestats" "github.com/grafana/alloy/syntax/diag" - "github.com/grafana/ckit/advertise" - "github.com/grafana/ckit/peer" - "github.com/prometheus/client_golang/prometheus" - "github.com/spf13/cobra" - "go.opentelemetry.io/otel" - "golang.org/x/exp/maps" // Install Components _ "github.com/grafana/alloy/internal/component/all" @@ -233,7 +234,7 @@ func (fr *alloyRun) Run(configPath string) error { ) clusterService, err := buildClusterService(clusterOptions{ - Log: l, + Log: log.With(l, "service", "cluster"), Tracer: t, Metrics: reg, diff --git a/internal/service/cluster/cluster.go b/internal/service/cluster/cluster.go index d534dd6c34..fb060bc9e7 100644 --- a/internal/service/cluster/cluster.go +++ b/internal/service/cluster/cluster.go @@ -14,11 +14,6 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/featuregate" - "github.com/grafana/alloy/internal/runtime/logging/level" - "github.com/grafana/alloy/internal/service" - http_service "github.com/grafana/alloy/internal/service/http" "github.com/grafana/ckit" "github.com/grafana/ckit/peer" "github.com/grafana/ckit/shard" @@ -27,25 +22,37 @@ import ( "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "golang.org/x/net/http2" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/service" + http_service "github.com/grafana/alloy/internal/service/http" + "github.com/grafana/alloy/internal/util" ) -// tokensPerNode is used to decide how many tokens each node should be given in -// the hash ring. All nodes must use the same value, otherwise they will have -// different views of the ring and assign work differently. -// -// Using 512 tokens strikes a good balance between distribution accuracy and -// memory consumption. A cluster of 1,000 nodes with 512 tokens per node -// requires 12MB for the hash ring. -// -// Distribution accuracy measures how close a node was to being responsible for -// exactly 1/N keys during simulation. Simulation tests used a cluster of 10 -// nodes and hashing 100,000 random keys: -// -// 512 tokens per node: min 96.1%, median 99.9%, max 103.2% (stddev: 197.9 hashes) -const tokensPerNode = 512 +const ( + // ServiceName defines the name used for the cluster service. + ServiceName = "cluster" -// ServiceName defines the name used for the cluster service. -const ServiceName = "cluster" + // tokensPerNode is used to decide how many tokens each node should be given in + // the hash ring. All nodes must use the same value, otherwise they will have + // different views of the ring and assign work differently. + // + // Using 512 tokens strikes a good balance between distribution accuracy and + // memory consumption. A cluster of 1,000 nodes with 512 tokens per node + // requires 12MB for the hash ring. + // + // Distribution accuracy measures how close a node was to being responsible for + // exactly 1/N keys during simulation. Simulation tests used a cluster of 10 + // nodes and hashing 100,000 random keys: + // + // 512 tokens per node: min 96.1%, median 99.9%, max 103.2% (stddev: 197.9 hashes) + tokensPerNode = 512 + + // maxPeersToLog is the maximum number of peers to log on info level. All peers are logged on debug level. + maxPeersToLog = 10 +) // Options are used to configure the cluster service. Options are constant for // the lifetime of the cluster service. @@ -213,11 +220,7 @@ func (s *Service) Run(ctx context.Context, host service.Host) error { spanCtx, span := tracer.Start(ctx, "NotifyClusterChange", trace.WithSpanKind(trace.SpanKindInternal)) defer span.End() - names := make([]string, len(peers)) - for i, p := range peers { - names[i] = p.Name - } - level.Info(s.log).Log("msg", "peers changed", "new_peers", strings.Join(names, ",")) + s.logPeers("peers changed", toStringSlice(peers)) // Notify all components about the clustering change. components := component.GetAllComponents(host, component.InfoOptions{}) @@ -246,11 +249,18 @@ func (s *Service) Run(ctx context.Context, host service.Host) error { peers, err := s.getPeers() if err != nil { - return fmt.Errorf("failed to get peers to join: %w", err) + // Fail fast on startup if we can't discover peers to prevent a split brain and give a clear signal to the user. + return fmt.Errorf("failed to get peers to join at startup: %w", err) } - level.Info(s.log).Log("msg", "starting cluster node", "peers", strings.Join(peers, ","), - "advertise_addr", s.opts.AdvertiseAddress) + // We log on info level including all the peers (without any abbreviation), as it's happening only on startup and + // won't spam too much in most cases. In other cases we should either abbreviate the list or log on debug level. + level.Info(s.log).Log( + "msg", "starting cluster node", + "peers_count", len(peers), + "peers", strings.Join(peers, ","), + "advertise_addr", s.opts.AdvertiseAddress, + ) if err := s.node.Start(peers); err != nil { level.Warn(s.log).Log("msg", "failed to connect to peers; bootstrapping a new cluster", "err", err) @@ -281,8 +291,8 @@ func (s *Service) Run(ctx context.Context, host service.Host) error { level.Warn(s.log).Log("msg", "failed to refresh list of peers", "err", err) continue } + s.logPeers("rejoining peers", peers) - level.Info(s.log).Log("msg", "rejoining peers", "peers", strings.Join(peers, ",")) if err := s.node.Start(peers); err != nil { level.Error(s.log).Log("msg", "failed to rejoin list of peers", "err", err) continue @@ -306,6 +316,13 @@ func (s *Service) getPeers() ([]string, error) { return nil, err } + // Debug level log all the peers for troubleshooting. + level.Debug(s.log).Log( + "msg", "discovered peers", + "peers_count", len(peers), + "peers", strings.Join(peers, ","), + ) + // Here we return the entire list because we can't take a subset. if s.opts.ClusterMaxJoinPeers == 0 || len(peers) < s.opts.ClusterMaxJoinPeers { return peers, nil @@ -347,6 +364,15 @@ func (s *Service) Data() any { return &sharderCluster{sharder: s.sharder} } +func (s *Service) logPeers(msg string, peers []string) { + // Truncate peers list on info level. + level.Info(s.log).Log( + "msg", msg, + "peers_count", len(peers), + "peers", util.JoinWithTruncation(peers, ",", maxPeersToLog, "..."), + ) +} + // Component is a component which subscribes to clustering updates. type Component interface { component.Component @@ -394,3 +420,11 @@ func (sc *sharderCluster) Lookup(key shard.Key, replicationFactor int, op shard. func (sc *sharderCluster) Peers() []peer.Peer { return sc.sharder.Peers() } + +func toStringSlice[T any](slice []T) []string { + s := make([]string, 0, len(slice)) + for _, p := range slice { + s = append(s, fmt.Sprintf("%v", p)) + } + return s +} diff --git a/internal/service/cluster/cluster_test.go b/internal/service/cluster/cluster_test.go index 02fba7b9be..d11a791a8e 100644 --- a/internal/service/cluster/cluster_test.go +++ b/internal/service/cluster/cluster_test.go @@ -2,8 +2,10 @@ package cluster import ( "math/rand" + "os" "testing" + "github.com/go-kit/log" "github.com/stretchr/testify/require" ) @@ -46,6 +48,7 @@ func TestGetPeers(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { s := &Service{ + log: log.NewLogfmtLogger(os.Stdout), opts: test.opts, randGen: rand.New(rand.NewSource(1)), // Seeded random generator to have consistent results in tests. } diff --git a/internal/util/strings.go b/internal/util/strings.go index 0caaadcecb..cea793251e 100644 --- a/internal/util/strings.go +++ b/internal/util/strings.go @@ -3,6 +3,8 @@ package util import ( "regexp" "strings" + + "k8s.io/utils/strings/slices" ) // CamelToSnake is a helper function for converting CamelCase to Snake Case @@ -13,3 +15,25 @@ func CamelToSnake(str string) string { snake = matchAllCap.ReplaceAllString(snake, "${1}_${2}") return strings.ToLower(snake) } + +// JoinWithTruncation joins a slice of string elements with a separator sep, truncating the middle if the slice is longer +// than maxElements, using abbreviation as a placeholder for the truncated part. The last element of the slice is always +// included in the result. For example: ["1", "2", "3", "4"] with sep=",", maxLength=3 and abbreviation="..." will +// return "1, 2, ..., 4". +func JoinWithTruncation(elements []string, sep string, maxElements int, abbreviation string) string { + if maxElements <= 0 { + return "" + } + if len(elements) <= maxElements { + return strings.Join(elements, sep) + } + // We know now that len(elements) > maxElements >= 1, so we need to truncate something. + // Handle the special case of maxElements == 1. + if maxElements == 1 { + return elements[0] + sep + abbreviation + } + // We know now that len(elements) > maxElements >= 2, can safely truncate the middle. + result := slices.Clone(elements[:maxElements-1]) + result = append(result, abbreviation, elements[len(elements)-1]) + return strings.Join(result, sep) +} diff --git a/internal/util/strings_test.go b/internal/util/strings_test.go new file mode 100644 index 0000000000..cebbc5ad39 --- /dev/null +++ b/internal/util/strings_test.go @@ -0,0 +1,130 @@ +package util + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestJoinWithTruncation(t *testing.T) { + type args struct { + str []string + sep string + maxLength int + abbreviation string + } + tests := []struct { + name string + args args + expected string + }{ + { + name: "empty slice", + args: args{str: []string{}, sep: ", ", maxLength: 0, abbreviation: "..."}, + expected: "", + }, + { + name: "empty slice 2", + args: args{str: []string{}, sep: ", ", maxLength: 10, abbreviation: "..."}, + expected: "", + }, + { + name: "smaller slice", + args: args{str: []string{"one", "two", "three"}, sep: ", ", maxLength: 10}, + expected: "one, two, three", + }, + { + name: "truncate slice", + args: args{ + str: []string{"one", "two", "three", "four", "five", "six"}, + sep: ", ", + maxLength: 4, + abbreviation: "[...]", + }, + expected: "one, two, three, [...], six", + }, + { + name: "truncate to 0", + args: args{ + str: []string{"one", "two", "three", "four", "five", "six"}, + sep: ", ", + maxLength: 0, + abbreviation: "[...]", + }, + expected: "", + }, + { + name: "truncate to 1", + args: args{ + str: []string{"one", "two", "three", "four", "five", "six"}, + sep: ", ", + maxLength: 1, + abbreviation: "[...]", + }, + expected: "one, [...]", + }, + { + name: "truncate to 2", + args: args{ + str: []string{"one", "two", "three", "four", "five", "six"}, + sep: ", ", + maxLength: 2, + abbreviation: "[...]", + }, + expected: "one, [...], six", + }, + { + name: "single element to 0", + args: args{ + str: []string{"one"}, + sep: ", ", + maxLength: 0, + abbreviation: "[...]", + }, + expected: "", + }, + { + name: "single element to 1", + args: args{ + str: []string{"one"}, + sep: ", ", + maxLength: 1, + abbreviation: "[...]", + }, + expected: "one", + }, + { + name: "single element to 2", + args: args{ + str: []string{"one"}, + sep: ", ", + maxLength: 2, + abbreviation: "[...]", + }, + expected: "one", + }, + { + name: "cluster peers example", + args: args{ + str: []string{ + "grafana-agent-helm-15.grafana-agent-helm.grafana-agent.svc.cluster.local.:3090", + "grafana-agent-helm-6.grafana-agent-helm.grafana-agent.svc.cluster.local.:3090", + "grafana-agent-helm-16.grafana-agent-helm.grafana-agent.svc.cluster.local.:3090", + "grafana-agent-helm-2.grafana-agent-helm.grafana-agent.svc.cluster.local.:3090", + }, + sep: ",", + maxLength: 2, + abbreviation: "...", + }, + expected: "grafana-agent-helm-15.grafana-agent-helm.grafana-agent.svc.cluster.local.:3090," + + "...," + + "grafana-agent-helm-2.grafana-agent-helm.grafana-agent.svc.cluster.local.:3090", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := JoinWithTruncation(tt.args.str, tt.args.sep, tt.args.maxLength, tt.args.abbreviation) + require.Equal(t, tt.expected, actual) + }) + } +}