CBG-4015 Refresh _sync:seq when releasing sequences in nextSequenceGreaterThan

Accounts for expected jumps in sequence values under variable allocator throughput before triggering “greater than” handling.
adamcfraser committed Jul 4, 2024
1 parent bf0f0f6 commit fff39bf
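
In other words, nextSequenceGreaterThan now releases its local batch and then re-reads _sync:seq before deciding how many catch-up sequences to reserve, so sequences allocated by other (possibly faster) nodes since the last batch fetch are no longer counted as a deficit to allocate and release. A condensed, illustrative model of the resulting flow (not the committed code) appears after the first file's diff below.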
Showing 3 changed files with 218 additions and 50 deletions.
82 changes: 54 additions & 28 deletions db/sequence_allocator.go
@@ -187,6 +187,19 @@ func (s *sequenceAllocator) nextSequence(ctx context.Context) (sequence uint64,
return sequence, nil
}

+// _releaseCurrentBatch releases any unused sequences currently held by the allocator.
+// Writes unused sequence document while holding s.lock, shouldn't be used by high-throughput operations.
+func (s *sequenceAllocator) _releaseCurrentBatch(ctx context.Context) (numReleased uint64, err error) {
+if s.max > s.last {
+numReleased, err = s.releaseSequenceRange(ctx, s.last+1, s.max)
+if err != nil {
+return 0, err
+}
+s.last = s.max
+}
+return numReleased, nil
+}
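
Worked example: with s.last = 5 and s.max = 10, the call releases sequences 6 through 10 (numReleased = 5) and advances s.last to s.max, leaving the allocator with no buffered, unassigned sequences.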

// nextSequenceGreaterThan increments _sync:seq such that it's greater than existingSequence + s.sequenceBatchSize
// In the case where our local s.max < _sync:seq (another node has incremented _sync:seq), we may be releasing
// sequences greater than existingSequence, but we will only ever release sequences allocated by this node's incr operation
@@ -226,39 +239,58 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin

}

-// If the target sequence is greater than the highest in our batch (s.max), we want to:
-//    (a) Reserve n sequences past _sync:seq, where n = existingSequence - s.max. It's ok if the resulting sequence exceeds targetSequence (if other nodes have allocated sequences and
-//    updated _sync:seq since we last updated s.max.), then
-//    (b) Allocate a standard batch of sequences, and assign a sequence from that batch in the usual way.
-//    (c) Release any previously allocated sequences (s.last to s.max)
-//    (d) Release the reserved sequences from part (a)
-// We can perform (a) and (b) as a single increment operation, but (c) and (d) aren't necessarily contiguous blocks and must be released
-// separately
-
-prevAllocReleaseFrom := s.last + 1
-prevAllocReleaseTo := s.max
+// At this point we need to allocate a sequence that's larger than what's in our current batch, so we first need to release the current batch.
+var numReleasedBatch uint64
+numReleasedBatch, err = s._releaseCurrentBatch(ctx)
+if err != nil {
+base.WarnfCtx(ctx, "Error returned when releasing current sequence batch. Will be handled by skipped sequence handling. Error:%v", err)
+}
+releasedSequenceCount += numReleasedBatch

-numberToRelease := existingSequence - s.max
-numberToAllocate := s.sequenceBatchSize
-incrVal := numberToRelease + numberToAllocate
-allocatedToSeq, err := s.incrementSequence(incrVal)
+syncSeq, err := s.getSequence()
if err != nil {
base.WarnfCtx(ctx, "Error from incrementSequence in nextSequenceGreaterThan(%d): %v", existingSequence, err)
s.mutex.Unlock()
return 0, 0, err
base.WarnfCtx(ctx, "Error returned when fetching current sequence during nextSequenceGreaterThan. Will be handled by skipped sequence handling. Error:%v", err)
}

// check for rollback of _sync:seq before continuing
-minimumExpectedValue := incrVal + prevAllocReleaseTo
-if allocatedToSeq < minimumExpectedValue {
+if syncSeq < s.last {
// rollback of _sync:seq detected
-allocatedToSeq, err = s._fixSyncSeqRollback(ctx, allocatedToSeq, minimumExpectedValue)
+syncSeq, err = s._fixSyncSeqRollback(ctx, syncSeq, s.last)
if err != nil {
s.mutex.Unlock()
return 0, 0, err
}
}

+// If the target sequence is less than or equal to the current syncSeq, allocate as normal using _nextSequence
+if syncSeq >= targetSequence {
+sequence, sequencesReserved, err := s._nextSequence(ctx)
+s.mutex.Unlock()
+if err != nil {
+return 0, 0, err
+}
+if sequencesReserved {
+s.reserveNotify <- struct{}{}
+}
+s.dbStats.SequenceAssignedCount.Add(1)
+return sequence, releasedSequenceCount, nil
+}
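
When another node has already advanced _sync:seq past the target, this fast path takes an ordinary batch from the current value and releases nothing beyond this node's own stale batch; that is the behaviour the commit description refers to as accounting for expected jumps in sequence values.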

+// If the target sequence is greater than the current _sync:seq, we want to:
+//    (a) Reserve n sequences past _sync:seq, where n = existingSequence - syncSeq. It's ok if the resulting sequence exceeds targetSequence (if other nodes have allocated sequences and
+//    updated _sync:seq since we last updated s.max.)
+//    (b) Allocate a standard batch of sequences, and assign a sequence from that batch in the usual way.
+//    (c) Release the reserved sequences from part (a)
+numberToRelease := existingSequence - syncSeq
+numberToAllocate := s.sequenceBatchSize
+incrVal := numberToRelease + numberToAllocate
+allocatedToSeq, err := s.incrementSequence(incrVal)
+if err != nil {
+base.WarnfCtx(ctx, "Error from incrementSequence in nextSequenceGreaterThan(%d): %v", existingSequence, err)
+s.mutex.Unlock()
+return 0, 0, err
+}

s.max = allocatedToSeq
s.last = allocatedToSeq - numberToAllocate + 1
sequence = s.last
@@ -267,15 +299,9 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin
// Perform standard batch handling and stats updates
s.lastSequenceReserveTime = time.Now()
s.reserveNotify <- struct{}{}
-s.dbStats.SequenceReservedCount.Add(int64(numberToRelease + numberToAllocate))
+s.dbStats.SequenceReservedCount.Add(int64(incrVal))
s.dbStats.SequenceAssignedCount.Add(1)

-// Release previously allocated sequences (c), if any
-released, err := s.releaseSequenceRange(ctx, prevAllocReleaseFrom, prevAllocReleaseTo)
-if err != nil {
-base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] for previously allocated sequences. Will be handled by skipped sequence handling. Error:%v", prevAllocReleaseFrom, prevAllocReleaseTo, err)
-}
-releasedSequenceCount += released
// Release the newly allocated sequences that were used to catch up to existingSequence (d)
if numberToRelease > 0 {
releaseTo := allocatedToSeq - numberToAllocate
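To make the reworked flow easier to follow than the interleaved hunks above, here is a condensed, self-contained model. It is an illustration only, not the committed code: the allocator struct, incr, and the simplified nextSequenceGreaterThan below are stand-ins invented for this sketch; locking, error handling, rollback correction, notifications, and stats are all elided; and an in-memory counter stands in for the _sync:seq document. The numbers in main mirror TestNextSequenceGreaterThanMultiNode from the test diff below.

	package main

	import "fmt"

	// allocator models only the sequenceAllocator fields this flow touches.
	type allocator struct {
		syncSeq   uint64 // stands in for the _sync:seq document
		last, max uint64 // local batch: sequences in (last, max] are still unassigned
		batchSize uint64
	}

	// incr models incrementSequence: reserve n sequences past _sync:seq.
	func (a *allocator) incr(n uint64) uint64 {
		a.syncSeq += n
		return a.syncSeq
	}

	// nextSequenceGreaterThan sketches the reworked flow, assuming existing >= a.max
	// (the within-batch fast path earlier in the real function is omitted).
	func (a *allocator) nextSequenceGreaterThan(existing uint64) (seq, released uint64) {
		target := existing + 1

		// Step 1: release the stale local batch before consulting _sync:seq.
		if a.max > a.last {
			released += a.max - a.last
			a.last = a.max
		}

		// Step 2: refresh from _sync:seq. If another node already advanced it past
		// the target, a normal batch allocation suffices and nothing more is released.
		if a.syncSeq >= target {
			allocatedTo := a.incr(a.batchSize)
			a.max = allocatedTo
			a.last = allocatedTo - a.batchSize + 1
			return a.last, released
		}

		// Step 3: otherwise reserve catch-up sequences plus a standard batch in a
		// single increment, then count the catch-up range as released.
		toRelease := existing - a.syncSeq
		allocatedTo := a.incr(toRelease + a.batchSize)
		a.max = allocatedTo
		a.last = allocatedTo - a.batchSize + 1
		released += toRelease
		return a.last, released
	}

	func main() {
		// Node A holds batch 1-10 (sequence 1 assigned); node B has pushed _sync:seq to 20.
		a := &allocator{syncSeq: 20, last: 1, max: 10, batchSize: 10}
		seq, rel := a.nextSequenceGreaterThan(15)
		fmt.Println(seq, rel) // 21 9: batch 21-30 allocated, sequences 2-10 released

		// Catch-up path: request a sequence beyond anything yet allocated.
		seq, rel = a.nextSequenceGreaterThan(34)
		fmt.Println(seq, rel) // 35 13: 22-30 released from the batch, 31-34 as catch-up
	}

Running it prints 21 9 and then 35 13, exercising both paths: a plain batch allocation when _sync:seq is already past the target, and a catch-up reservation whose surplus is immediately released when it is not.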
184 changes: 163 additions & 21 deletions db/sequence_allocator_test.go
@@ -11,8 +11,13 @@ licenses/APL2.txt.
package db

import (
"context"
"fmt"
"log"
"math"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

@@ -379,15 +384,14 @@ func TestNextSequenceGreaterThanMultiNode(t *testing.T) {
assertNewAllocatorStats(t, dbStatsB, 1, 10, 2, 4)

-// calling nextSequenceGreaterThan(15) on A will increment _sync:seq by 5 on it's previously allocated sequence (10).
-// Since node B has already updated _sync:seq to 20, will result in:
+// Since node B has already updated _sync:seq to 20, calling nextSequenceGreaterThan(15) on A will result in:
// node A releasing sequences 2-10 from its existing buffer
-// node A allocating and releasing sequences 21-24
-// node A adding sequences 25-35 to its buffer, and assigning 25 to the current request
+// node A adding sequences 21-30 to its buffer, and assigning 21 to the current request
nextSequence, releasedSequenceCount, err = a.nextSequenceGreaterThan(ctx, 15)
assert.NoError(t, err)
-assert.Equal(t, uint64(26), nextSequence)
-assert.Equal(t, 14, int(releasedSequenceCount))
-assertNewAllocatorStats(t, dbStatsA, 2, 25, 2, 14)
+assert.Equal(t, uint64(21), nextSequence)
+assert.Equal(t, 9, int(releasedSequenceCount))
+assertNewAllocatorStats(t, dbStatsA, 2, 20, 2, 9)

}

@@ -511,42 +515,46 @@ func TestSingleNodeNextSeqGreaterThanRollbackHandling(t *testing.T) {
// alter s.last to mock sequences being allocated
a.last = 10

-// triggers correction value increase to 10 (sequence batch value) thus nextSeq is higher than you would expect
+// nextSequenceGreaterThan fetches current _sync:seq, detects rollback and adjusts by
+// delta from last allocated (5) + syncSeqCorrectionValue (500)
nxtSeq, _, err = a.nextSequenceGreaterThan(ctx, 15)
require.NoError(t, err)
-assert.Equal(t, uint64(526), nxtSeq)
-assert.Equal(t, uint64(526), a.last)
-assert.Equal(t, uint64(535), a.max)
+assert.Equal(t, 521, int(nxtSeq))
+assert.Equal(t, 521, int(a.last))
+assert.Equal(t, 530, int(a.max))

// alter _sync:seq in bucket to end seq in prev batch
err = ds.Set(a.metaKeys.SyncSeqKey(), 0, nil, 10)
require.NoError(t, err)

+// nextSequenceGreaterThan fetches current _sync:seq, detects rollback and adjusts by
+// delta from last allocated (530-10=520) + syncSeqCorrectionValue (500) + batch size (10)
nxtSeq, _, err = a.nextSequenceGreaterThan(ctx, 535)
require.NoError(t, err)
-assert.Equal(t, uint64(1046), nxtSeq)
-assert.Equal(t, uint64(1046), a.last)
-assert.Equal(t, uint64(1055), a.max)
+assert.Equal(t, uint64(1041), nxtSeq)
+assert.Equal(t, uint64(1041), a.last)
+assert.Equal(t, uint64(1050), a.max)

-// alter _sync:seq in bucket to prev batch start seq
-err = ds.Set(a.metaKeys.SyncSeqKey(), 0, nil, 526)
+// alter _sync:seq in bucket to start seq in prev batch
+err = ds.Set(a.metaKeys.SyncSeqKey(), 0, nil, 1041)
require.NoError(t, err)

nxtSeq, _, err = a.nextSequenceGreaterThan(ctx, 1055)
require.NoError(t, err)
-assert.Equal(t, uint64(1566), nxtSeq)
-assert.Equal(t, uint64(1566), a.last)
-assert.Equal(t, uint64(1575), a.max)
+assert.Equal(t, 1561, int(nxtSeq))
+assert.Equal(t, 1561, int(a.last))
+assert.Equal(t, 1570, int(a.max))

// alter _sync:seq in bucket to prev batch value
err = ds.Set(a.metaKeys.SyncSeqKey(), 0, nil, 5)
require.NoError(t, err)

nxtSeq, _, err = a.nextSequenceGreaterThan(ctx, 1575)
require.NoError(t, err)
-assert.Equal(t, uint64(2086), nxtSeq)
-assert.Equal(t, uint64(2086), a.last)
-assert.Equal(t, uint64(2095), a.max)
+assert.Equal(t, uint64(2081), nxtSeq)
+assert.Equal(t, uint64(2081), a.last)
+assert.Equal(t, uint64(2090), a.max)
}

// TestSyncSeqRollbackMultiNode:
@@ -746,3 +754,137 @@ func TestFiveNodeRollbackMiddleNodesDetects(t *testing.T) {
assert.Equal(t, uint64(551), a.last)
assert.Equal(t, uint64(560), a.max)
}

+// TestVariableRateAllocators simulates the following scenario:
+// - import nodes have high sequence allocation rate
+// - client-facing nodes have low sequence allocation rate
+// - documents are imported, then the same documents are immediately updated by clients
+// (including sequence validation triggering nextSequenceGreaterThan)
+//
+// Ensures we don't release more sequences than would be expected based on allocator batch size
+func TestVariableRateAllocators(t *testing.T) {
+ctx := base.TestCtx(t)
+bucket := base.GetTestBucket(t)
+defer bucket.Close(ctx)
+var expectedAllocations uint64
+
+dataStore := bucket.GetSingleDataStore()
+stats, err := base.NewSyncGatewayStats()
+require.NoError(t, err)
+
+importStats, err := stats.NewDBStats("import", false, false, false, nil, nil)
+require.NoError(t, err)
+
+importFeedAllocator, err := newSequenceAllocator(ctx, dataStore, importStats.DatabaseStats, base.DefaultMetadataKeys)
+require.NoError(t, err)
+
+// All test allocators are stopped when allocatorCtx is closed
+allocatorCtx, cancelFunc := context.WithCancel(ctx)
+
+// Start import node allocator, performing 10000 allocations/second.
+var allocatorWg sync.WaitGroup
+allocatorWg.Add(1)
+go func() {
+count := runAllocator(allocatorCtx, importFeedAllocator, 100*time.Microsecond) // 10000 writes/second
+atomic.AddUint64(&expectedAllocations, count)
+allocatorWg.Done()
+}()
+
+// Start multiple client node allocators, performing 100 allocations/second
+clientAllocators := make([]*sequenceAllocator, 0)
+clientAllocatorCount := 10
for i := 0; i <= clientAllocatorCount; i++ {
+clientStats, err := stats.NewDBStats(fmt.Sprintf("client%d", i), false, false, false, nil, nil)
+require.NoError(t, err)
+clientAllocator, err := newSequenceAllocator(ctx, dataStore, clientStats.DatabaseStats, base.DefaultMetadataKeys)
+require.NoError(t, err)
+clientAllocators = append(clientAllocators, clientAllocator)
+allocatorWg.Add(1)
+go func() {
+count := runAllocator(allocatorCtx, clientAllocator, 10*time.Millisecond) // 100 writes/second
+atomic.AddUint64(&expectedAllocations, count)
+allocatorWg.Done()
+}()
+}
+
+// Wait for allocators to get up to maximum batch size
+time.Sleep(500 * time.Millisecond)
+documentCount := 10
+var updateWg sync.WaitGroup
+updateWg.Add(documentCount)
+for i := 0; i < documentCount; i++ {
+go func() {
+_ = multiNodeUpdate(t, ctx, importFeedAllocator, clientAllocators, 5, 10*time.Millisecond)
+updateWg.Done()
+atomic.AddUint64(&expectedAllocations, 6)
+}()
+}
+
+updateWg.Wait()
+
+// Stop background allocation goroutines, wait for them to close
+cancelFunc()
+allocatorWg.Wait()
+
+log.Printf("expectedSequence (num allocations):%v", atomic.LoadUint64(&expectedAllocations))
+
+importFeedAllocator.Stop(ctx)
+numAssigned := importFeedAllocator.dbStats.SequenceAssignedCount.Value()
+numReleased := importFeedAllocator.dbStats.SequenceReleasedCount.Value()
+for _, allocator := range clientAllocators {
+allocator.Stop(ctx)
+numAssigned += allocator.dbStats.SequenceAssignedCount.Value()
+clientSequencesReleased := allocator.dbStats.SequenceReleasedCount.Value()
+numReleased += clientSequencesReleased
+}
+
+log.Printf("Total sequences released + assigned: %v", numReleased+numAssigned)
+actualSequence, err := importFeedAllocator.getSequence()
+require.NoError(t, err)
+log.Printf("actual sequence (getSequence): %v", actualSequence)
+}

+// multiNodeUpdate obtains an initial sequence from an import allocator (import node), then performs repeated updates to the doc using a random pool of allocators (random SG node).
+// Performs sequenceGreaterThan, then ensures that the allocator doesn't release more than the sequence batch size
+func multiNodeUpdate(t *testing.T, ctx context.Context, importAllocator *sequenceAllocator, clientAllocators []*sequenceAllocator, updateCount int, interval time.Duration) (releasedCount uint64) {
+currentSequence, _ := importAllocator.nextSequence(ctx)
+
+for i := 0; i < updateCount; i++ {
+allocatorIndex := rand.Intn(len(clientAllocators))
+clientAllocator := clientAllocators[allocatorIndex]
+nextSequence, err := clientAllocator.nextSequence(ctx)
+require.NoError(t, err, "nextSequence error: %v", err)
+if nextSequence < currentSequence {
+prevNext := nextSequence
+var numReleased uint64
+nextSequence, numReleased, err = clientAllocator.nextSequenceGreaterThan(ctx, currentSequence)
+require.NoError(t, err, "nextSequenceGreaterThan error: %v", err)
+log.Printf("allocator %d released %d sequences because next < current (%d < %d)", allocatorIndex, numReleased, prevNext, currentSequence)
+// clientAllocator should at most need to release its current batch
+assert.LessOrEqual(t, numReleased, clientAllocator.sequenceBatchSize)
+releasedCount += numReleased
+}
+currentSequence = nextSequence
+time.Sleep(interval)
+}
+
+return releasedCount
+}
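
The LessOrEqual assertion above encodes the commit's contract: because nextSequenceGreaterThan now refreshes _sync:seq after releasing its local batch, a client allocator should never have to release more than one batch of sequences per correction, regardless of how far ahead a high-throughput import allocator has pushed _sync:seq.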

+func runAllocator(ctx context.Context, a *sequenceAllocator, frequency time.Duration) (allocationCount uint64) {
+
+allocationCount = 0
+ticker := time.NewTicker(frequency)
+for {
+select {
+case <-ticker.C:
+_, _ = a.nextSequence(ctx)
+allocationCount++
+case <-ctx.Done():
+ticker.Stop()
+log.Printf("allocator count: %v", allocationCount)
+return allocationCount
+}
+}
+}
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/couchbase/sync_gateway

-go 1.22
+go 1.22.0

require (
dario.cat/mergo v1.0.0
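
The go directive bump from go 1.22 to go 1.22.0 is a small housekeeping change: since Go 1.21 the directive names a minimum required Go version and may carry a full patch version, and the fully-qualified form makes that minimum explicit; presumably newer go tooling rewrote the directive into this form.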
