diff --git a/ring/replication_strategy.go b/ring/replication_strategy.go index db2b28354..8b9b501ca 100644 --- a/ring/replication_strategy.go +++ b/ring/replication_strategy.go @@ -7,10 +7,15 @@ import ( ) type ReplicationStrategy interface { - // Filter out unhealthy instances and checks if there're enough instances + // Filter out unhealthy instances and checks if there are enough instances // for an operation to succeed. Returns an error if there are not enough // instances. Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []InstanceDesc, maxFailures int, err error) + + // SupportsExpandedReplication returns true for replication strategies that + // support increasing the replication factor beyond a single instance per zone, + // false otherwise. + SupportsExpandedReplication() bool } type defaultReplicationStrategy struct{} @@ -70,6 +75,14 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati return instances, len(instances) - minSuccess, nil } +func (s *defaultReplicationStrategy) SupportsExpandedReplication() bool { + // defaultReplicationStrategy assumes that a single instance per zone is returned and that + // it can treat replication factor as equivalent to the number of zones. This doesn't work + // when a per-call replication factor increases it beyond the configured replication factor + // and the number of zones. + return false +} + type ignoreUnhealthyInstancesReplicationStrategy struct{} func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy { @@ -101,6 +114,10 @@ func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []Instanc return instances, len(instances) - 1, nil } +func (r *ignoreUnhealthyInstancesReplicationStrategy) SupportsExpandedReplication() bool { + return true +} + func (r *Ring) IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool { return instance.IsHealthy(op, r.cfg.HeartbeatTimeout, now) } diff --git a/ring/ring.go b/ring/ring.go index d47eb8fe2..5eb046862 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -34,6 +34,41 @@ const ( GetBufferSize = 5 ) +// Options are the result of Option instances that can be used to modify Ring.GetWithOptions behavior. +type Options struct { + ReplicationFactor int + BufDescs []InstanceDesc + BufHosts []string + BufZones []string +} + +// Option can be used to modify Ring behavior when calling Ring.GetWithOptions +type Option func(opts *Options) + +// WithBuffers creates an Option that will cause the given buffers to be used, avoiding allocations. +func WithBuffers(bufDescs []InstanceDesc, bufHosts, bufZones []string) Option { + return func(opts *Options) { + opts.BufDescs = bufDescs + opts.BufHosts = bufHosts + opts.BufZones = bufZones + } +} + +// WithReplicationFactor creates an Option that overrides the default replication factor for a single call. +func WithReplicationFactor(replication int) Option { + return func(opts *Options) { + opts.ReplicationFactor = replication + } +} + +func collectOptions(opts ...Option) Options { + final := Options{} + for _, opt := range opts { + opt(&final) + } + return final +} + // ReadRing represents the read interface to the ring. // Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet. type ReadRing interface { @@ -42,13 +77,17 @@ type ReadRing interface { // to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet(). Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) + // GetWithOptions returns n (or more) instances which form the replicas for the given key + // with 0 or more Option instances to change the behavior of the method call. + GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) + // GetAllHealthy returns all healthy instances in the ring, for the given operation. // This function doesn't check if the quorum is honored, so doesn't fail if the number // of unhealthy instances is greater than the tolerated max unavailable. GetAllHealthy(op Operation) (ReplicationSet, error) // GetReplicationSetForOperation returns all instances where the input operation should be executed. - // The resulting ReplicationSet doesn't necessarily contains all healthy instances + // The resulting ReplicationSet doesn't necessarily contain all healthy instances // in the ring, but could contain the minimum set of instances required to execute // the input operation. GetReplicationSetForOperation(op Operation) (ReplicationSet, error) @@ -421,19 +460,44 @@ func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegiste } // Get returns n (or more) instances which form the replicas for the given key. -func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) { +func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, _ []string) (ReplicationSet, error) { + // Note that we purposefully aren't calling GetWithOptions here since the closures it + // uses result in heap allocations which we specifically avoid in this method since it's + // called in hot loops. + return r.getReplicationSetForKey(key, op, bufDescs, bufHosts, r.cfg.ReplicationFactor) +} + +// GetWithOptions returns n (or more) instances which form the replicas for the given key +// with 0 or more options to change the behavior of the method call. +func (r *Ring) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) { + options := collectOptions(opts...) + return r.getReplicationSetForKey(key, op, options.BufDescs, options.BufHosts, options.ReplicationFactor) +} + +func (r *Ring) getReplicationSetForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, replicationFactor int) (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() if r.ringDesc == nil || len(r.ringTokens) == 0 { return ReplicationSet{}, ErrEmptyRing } - instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil) + if replicationFactor <= 0 || replicationFactor < r.cfg.ReplicationFactor { + replicationFactor = r.cfg.ReplicationFactor + } + + // Not all replication strategies support increasing the replication factor beyond + // the number of zones available. Return an error unless a ReplicationStrategy has + // explicitly opted into supporting this. + if replicationFactor > r.cfg.ReplicationFactor && !r.strategy.SupportsExpandedReplication() { + return ReplicationSet{}, fmt.Errorf("per-call replication factor %d cannot exceed the configured replication factor %d with this replication strategy", replicationFactor, r.cfg.ReplicationFactor) + } + + instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, replicationFactor, nil) if err != nil { return ReplicationSet{}, err } - healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) + healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, replicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) if err != nil { return ReplicationSet{}, err } @@ -447,9 +511,9 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, // Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy. // InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early. // This function needs to be called with read lock on the ring. -func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) { +func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, replicationFactor int, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) { var ( - n = r.cfg.ReplicationFactor + n = replicationFactor instances = bufDescs[:0] start = searchToken(r.ringTokens, key) iterations = 0 @@ -457,11 +521,21 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance maxInstances = len(r.ringDesc.Ingesters) // We use a slice instead of a map because it's faster to search within a - // slice than lookup a map for a very low number of items. + // slice than lookup a map for a very low number of items, we only expect + // to have low single-digit number of hosts. distinctHosts = bufHosts[:0] - distinctZones = bufZones[:0] + + // TODO: Do we need to pass this in to avoid allocations? + hostsPerZone = make(map[string]int) + targetHostsPerZone = max(1, replicationFactor/maxZones) ) - for i := start; len(distinctHosts) < min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ { + + for i := start; len(distinctHosts) < min(maxInstances, n) && iterations < len(r.ringTokens); i++ { + // If we have the target number of instances in all zones, stop looking. + if r.cfg.ZoneAwarenessEnabled && haveTargetHostsInAllZones(hostsPerZone, targetHostsPerZone, maxZones) { + break + } + iterations++ // Wrap i around in the ring. i %= len(r.ringTokens) @@ -478,9 +552,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance continue } - // Ignore if the instances don't have a zone set. + // If we already have the required number of instances for this zone, skip. if r.cfg.ZoneAwarenessEnabled && info.Zone != "" { - if slices.Contains(distinctZones, info.Zone) { + if hostsPerZone[info.Zone] >= targetHostsPerZone { continue } } @@ -493,9 +567,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance if op.ShouldExtendReplicaSetOnState(instance.State) { n++ } else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" { - // We should only add the zone if we are not going to extend, - // as we want to extend the instance in the same AZ. - distinctZones = append(distinctZones, info.Zone) + // We should only increment the count for this zone if we are not going to + // extend, as we want to extend the instance in the same AZ. + hostsPerZone[info.Zone]++ } include, keepGoing := true, true @@ -512,6 +586,20 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance return instances, nil } +func haveTargetHostsInAllZones(hostsByZone map[string]int, targetHostsPerZone int, maxZones int) bool { + if len(hostsByZone) != maxZones { + return false + } + + for _, count := range hostsByZone { + if count < targetHostsPerZone { + return false + } + } + + return true +} + // GetAllHealthy implements ReadRing. func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) { r.mtx.RLock() @@ -1332,36 +1420,3 @@ func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool { // All states are healthy, no states extend replica set. var allStatesRingOperation = Operation(0x0000ffff) - -// numberOfKeysOwnedByInstance returns how many of the supplied keys are owned by given instance. -func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instanceID string, bufDescs []InstanceDesc, bufHosts []string, bufZones []string) (int, error) { - r.mtx.RLock() - defer r.mtx.RUnlock() - - if r.ringDesc == nil || len(r.ringTokens) == 0 { - return 0, ErrEmptyRing - } - - // Instance is not in this ring, it can't own any key. - if _, ok := r.ringDesc.Ingesters[instanceID]; !ok { - return 0, nil - } - - owned := 0 - for _, tok := range keys { - i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) { - if foundInstanceID == instanceID { - // If we've found our instance, we can stop. - return true, false - } - return false, true - }) - if err != nil { - return 0, err - } - if len(i) > 0 { - owned++ - } - } - return owned, nil -} diff --git a/ring/ring_test.go b/ring/ring_test.go index 909c5d019..d44b04424 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -619,6 +619,7 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { tests := map[string]struct { numInstances int numZones int + totalZones int replicationFactor int zoneAwarenessEnabled bool expectedErr string @@ -627,6 +628,7 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { "should succeed if there are enough instances per zone on RF = 3": { numInstances: 16, numZones: 3, + totalZones: 3, replicationFactor: 3, zoneAwarenessEnabled: true, expectedInstances: 3, @@ -634,6 +636,7 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { "should fail if there are instances in 1 zone only on RF = 3": { numInstances: 16, numZones: 1, + totalZones: 3, replicationFactor: 3, zoneAwarenessEnabled: true, expectedErr: "at least 2 live replicas required across different availability zones, could only find 1", @@ -641,6 +644,7 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { "should succeed if there are instances in 2 zones on RF = 3": { numInstances: 16, numZones: 2, + totalZones: 3, replicationFactor: 3, zoneAwarenessEnabled: true, expectedInstances: 2, @@ -648,6 +652,7 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { "should succeed if there are instances in 1 zone only on RF = 3 but zone-awareness is disabled": { numInstances: 16, numZones: 1, + totalZones: 3, replicationFactor: 3, zoneAwarenessEnabled: false, expectedInstances: 3, @@ -670,6 +675,12 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { prevTokens = append(prevTokens, ingTokens...) } + // Add instances to the ring that don't own any tokens so that the ring is aware of all zones. + for i := testData.numInstances; i < testData.numInstances+testData.totalZones; i++ { + name := fmt.Sprintf("ing%v", i) + r.AddIngester(name, fmt.Sprintf("127.0.0.%d", i), fmt.Sprintf("zone-%v", i), nil, ACTIVE, time.Now(), false, time.Time{}) + } + // Create a ring with the instances ring := newRingForTesting(Config{ HeartbeatTimeout: time.Hour, @@ -721,6 +732,199 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { } } +func TestRing_GetWithOptions(t *testing.T) { + const testCount = 10_000 + + healthyHeartbeat := time.Now() + unhealthyHeartbeat := healthyHeartbeat.Add(-2 * time.Minute) + + type testCase struct { + name string + ringInstances map[string]InstanceDesc + strategy ReplicationStrategy + options []Option + expectedSetSize int + expectError bool + } + cases := []testCase{ + { + name: "no options, default strategy", + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + }, + strategy: NewDefaultReplicationStrategy(), + expectedSetSize: 3, + expectError: false, + }, + { + name: "invalid replication factor, default strategy", + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + }, + strategy: NewDefaultReplicationStrategy(), + options: []Option{WithReplicationFactor(1)}, + expectedSetSize: 3, + expectError: false, + }, + { + name: "higher replication factor, default strategy", + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + }, + strategy: NewDefaultReplicationStrategy(), + options: []Option{WithReplicationFactor(6)}, + expectedSetSize: 0, + expectError: true, + }, + { + name: "higher replication factor, default strategy, some unhealthy", + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + }, + strategy: NewDefaultReplicationStrategy(), + options: []Option{WithReplicationFactor(6)}, + expectedSetSize: 0, + expectError: true, + }, + { + name: "higher replication factor, default strategy, most unhealthy", + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + }, + strategy: NewDefaultReplicationStrategy(), + options: []Option{WithReplicationFactor(6)}, + expectedSetSize: 0, + expectError: true, + }, + { + name: "higher replication factor, ignore unhealthy strategy, some unhealthy", + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + }, + strategy: NewIgnoreUnhealthyInstancesReplicationStrategy(), + options: []Option{WithReplicationFactor(6)}, + expectedSetSize: 4, + expectError: false, + }, + { + name: "higher replication factor, ignore unhealthy strategy, most unhealthy", + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + }, + strategy: NewIgnoreUnhealthyInstancesReplicationStrategy(), + options: []Option{WithReplicationFactor(6)}, + expectedSetSize: 2, + expectError: false, + }, + { + name: "higher replication factor, ignore unhealthy strategy, single healthy", + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Timestamp: healthyHeartbeat.Unix()}, + }, + strategy: NewIgnoreUnhealthyInstancesReplicationStrategy(), + options: []Option{WithReplicationFactor(6)}, + expectedSetSize: 1, + expectError: false, + }, + { + name: "higher replication factor, ignore unhealthy strategy, all unhealthy", + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Timestamp: unhealthyHeartbeat.Unix()}, + }, + strategy: NewIgnoreUnhealthyInstancesReplicationStrategy(), + options: []Option{WithReplicationFactor(6)}, + expectedSetSize: 0, + expectError: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + for _, zoneAwarenessEnabled := range []bool{false, true} { + t.Run(fmt.Sprintf("zoneAwareness = %t", zoneAwarenessEnabled), func(t *testing.T) { + gen := initTokenGenerator(t) + var prevTokens []uint32 + + r := NewDesc() + for id, desc := range tc.ringInstances { + desc.Tokens = gen.GenerateTokens(128, prevTokens) + r.Ingesters[id] = desc + + prevTokens = append(prevTokens, desc.Tokens...) + } + + cfg := Config{ + HeartbeatTimeout: time.Minute, + ReplicationFactor: 3, + ZoneAwarenessEnabled: zoneAwarenessEnabled, + } + + ring := newRingForTesting(cfg, false) + ring.setRingStateFromDesc(r, false, false, false) + ring.strategy = tc.strategy + + // Use the GenerateTokens to get an array of random uint32 values. + testValues := gen.GenerateTokens(testCount, nil) + + for i := 0; i < testCount; i++ { + set, err := ring.GetWithOptions(testValues[i], Write, tc.options...) + if tc.expectError { + require.Error(t, err) + } else { + require.Equal(t, tc.expectedSetSize, len(set.Instances)) + } + } + }) + } + }) + } +} + func TestRing_GetAllHealthy(t *testing.T) { const heartbeatTimeout = time.Minute now := time.Now() @@ -3538,13 +3742,14 @@ func generateRingInstance(gen TokenGenerator, id, zone, numTokens int, usedToken return instanceID, generateRingInstanceWithInfo(instanceID, zoneID, newTokens, time.Now()), newTokens } -func generateRingInstanceWithInfo(addr, zone string, tokens []uint32, registeredAt time.Time) InstanceDesc { +func generateRingInstanceWithInfo(id, zone string, tokens []uint32, registeredAt time.Time) InstanceDesc { var regts int64 if !registeredAt.IsZero() { regts = registeredAt.Unix() } return InstanceDesc{ - Addr: addr, + Id: id, + Addr: id, Timestamp: time.Now().Unix(), RegisteredTimestamp: regts, State: ACTIVE, diff --git a/ring/token_range_test.go b/ring/token_range_test.go index db6fef28f..005e56ee4 100644 --- a/ring/token_range_test.go +++ b/ring/token_range_test.go @@ -112,10 +112,12 @@ func TestGetTokenRangesForInstance(t *testing.T) { } } + cfg := Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: numZones} + // Initialise the ring. ringDesc := &Desc{Ingesters: instances} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: numZones}, + cfg: cfg, ringDesc: ringDesc, ringTokens: ringDesc.GetTokens(), ringTokensByZone: ringDesc.getTokensByZone(), @@ -163,9 +165,10 @@ func benchmarkGetTokenRangesForInstance(b *testing.B, instancesPerZone int) { for n := 0; n < b.N; n++ { b.StopTimer() // Initialise the ring. + cfg := Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: numZones} ringDesc := &Desc{Ingesters: generateRingInstances(gen, instancesPerZone*numZones, numZones, numTokens)} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: numZones}, + cfg: cfg, ringDesc: ringDesc, ringTokens: ringDesc.GetTokens(), ringTokensByZone: ringDesc.getTokensByZone(), @@ -226,18 +229,21 @@ func testCheckingOfKeyOwnership(t *testing.T, randomizeInstanceStates bool) { } } + cfg := Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: false, ReplicationFactor: replicationFactor} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: false, ReplicationFactor: replicationFactor}, + cfg: cfg, ringDesc: ringDesc, ringTokens: ringDesc.GetTokens(), ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), shuffledSubringCache: map[subringCacheKey]*Ring{}, - strategy: NewDefaultReplicationStrategy(), + strategy: nopReplicationStrategy{}, lastTopologyChange: time.Now(), } + bufInstances, bufHosts, bufZones := MakeBuffersForGet() + for uid, tokens := range userTokens { shardSize := shardSizes[uid] @@ -257,11 +263,7 @@ func testCheckingOfKeyOwnership(t *testing.T, randomizeInstanceStates bool) { } // Compute owned tokens using numberOfKeysOwnedByInstance. - bufDescs := make([]InstanceDesc, 5) - bufHosts := make([]string, 5) - bufZones := make([]string, numZones) - - cntViaGet, err := sr.numberOfKeysOwnedByInstance(tokens, WriteNoExtend, instanceID, bufDescs, bufHosts, bufZones) + cntViaGet, err := numberOfKeysOwnedByInstance(sr, tokens, WriteNoExtend, instanceID, bufInstances, bufHosts, bufZones) require.NoError(t, err) assert.Equal(t, cntViaTokens, cntViaGet, "user=%s, instance=%s", uid, instanceID) @@ -269,6 +271,41 @@ func testCheckingOfKeyOwnership(t *testing.T, randomizeInstanceStates bool) { } } +// numberOfKeysOwnedByInstance returns how many of the supplied keys are owned by given instance. +func numberOfKeysOwnedByInstance(r *Ring, keys []uint32, op Operation, instanceID string, bufDescs []InstanceDesc, bufHosts, bufZones []string) (int, error) { + owned := 0 + if !r.HasInstance(instanceID) { + return 0, nil + } + + for _, tok := range keys { + s, err := r.Get(tok, op, bufDescs, bufHosts, bufZones) + if err != nil { + return 0, err + } + + for _, inst := range s.Instances { + if inst.Id == instanceID { + owned++ + break + } + } + } + + return owned, nil +} + +type nopReplicationStrategy struct{} + +func (n nopReplicationStrategy) Filter(instances []InstanceDesc, _ Operation, _ int, _ time.Duration, _ bool) (healthy []InstanceDesc, maxFailures int, err error) { + // Don't filter any instances because we don't care, we only need to check their token ownership + return instances, 0, nil +} + +func (n nopReplicationStrategy) SupportsExpandedReplication() bool { + return false +} + func BenchmarkCompareCountingOfSeriesViaRingAndTokenRanges(b *testing.B) { const instancesPerZone = 100 const numZones = 3 @@ -280,9 +317,10 @@ func BenchmarkCompareCountingOfSeriesViaRingAndTokenRanges(b *testing.B) { seriesTokens := gen.GenerateTokens(userTokens, nil) // Generate ring + cfg := Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: false, ReplicationFactor: numZones} ringDesc := &Desc{Ingesters: generateRingInstances(gen, instancesPerZone*numZones, numZones, numTokens)} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: false, ReplicationFactor: numZones}, + cfg: cfg, ringDesc: ringDesc, ringTokens: ringDesc.GetTokens(), ringTokensByZone: ringDesc.getTokensByZone(), @@ -321,19 +359,4 @@ func BenchmarkCompareCountingOfSeriesViaRingAndTokenRanges(b *testing.B) { } } }) - - b.Run("numberOfKeysOwnedByInstance", func(b *testing.B) { - bufDescs := make([]InstanceDesc, 5) - bufHosts := make([]string, 5) - bufZones := make([]string, numZones) - - for i := 0; i < b.N; i++ { - cntViaGet, err := sr.numberOfKeysOwnedByInstance(seriesTokens, WriteNoExtend, instanceID, bufDescs, bufHosts, bufZones) - require.NoError(b, err) - - if cntViaGet <= 0 { - b.Fatal("no owned tokens found!") - } - } - }) } diff --git a/ring/util_test.go b/ring/util_test.go index ec52c5077..87765e979 100644 --- a/ring/util_test.go +++ b/ring/util_test.go @@ -28,6 +28,11 @@ func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHos return args.Get(0).(ReplicationSet), args.Error(1) } +func (r *RingMock) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) { + args := r.Called(key, op, opts) + return args.Get(0).(ReplicationSet), args.Error(1) +} + func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) { args := r.Called(op) return args.Get(0).(ReplicationSet), args.Error(1) @@ -105,8 +110,9 @@ func createStartingRing() *Ring { "instance-5": {Id: "instance-5", Addr: "127.0.0.5", State: ACTIVE, Timestamp: time.Now().Unix()}, }} + cfg := Config{HeartbeatTimeout: time.Minute, ReplicationFactor: 3} ring := &Ring{ - cfg: Config{HeartbeatTimeout: time.Minute}, + cfg: cfg, ringDesc: ringDesc, ringTokens: ringDesc.GetTokens(), ringTokensByZone: ringDesc.getTokensByZone(),