From fff39bf11f1a619a8ae44870fb1f812f563a0a3a Mon Sep 17 00:00:00 2001 From: adamcfraser Date: Wed, 3 Jul 2024 16:16:10 -0700 Subject: [PATCH] CBG-4015 Refresh _sync:seq when releasing sequences in nextSequenceGreaterThan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Accounts for expected jumps in sequence values under variable allocator throughput before triggering “greater than” handling. --- db/sequence_allocator.go | 82 +++++++++------ db/sequence_allocator_test.go | 184 ++++++++++++++++++++++++++++++---- go.mod | 2 +- 3 files changed, 218 insertions(+), 50 deletions(-) diff --git a/db/sequence_allocator.go b/db/sequence_allocator.go index b2256c230d..8e112f8762 100644 --- a/db/sequence_allocator.go +++ b/db/sequence_allocator.go @@ -187,6 +187,19 @@ func (s *sequenceAllocator) nextSequence(ctx context.Context) (sequence uint64, return sequence, nil } +// _releaseCurrentBatch releases any unused sequences currently held by the allocator. +// Writes unused sequence document while holding s.mutex, shouldn't be used by high-throughput operations. 
+func (s *sequenceAllocator) _releaseCurrentBatch(ctx context.Context) (numReleased uint64, err error) { + if s.max > s.last { + numReleased, err = s.releaseSequenceRange(ctx, s.last+1, s.max) + if err != nil { + return 0, err + } + s.last = s.max + } + return numReleased, nil +} + // nextSequenceGreaterThan increments _sync:seq such that it's greater than existingSequence + s.sequenceBatchSize // In the case where our local s.max < _sync:seq (another node has incremented _sync:seq), we may be releasing // sequences greater than existingSequence, but we will only ever release sequences allocated by this node's incr operation @@ -226,39 +239,58 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin } - // If the target sequence is greater than the highest in our batch (s.max), we want to: - // (a) Reserve n sequences past _sync:seq, where n = existingSequence - s.max. It's ok if the resulting sequence exceeds targetSequence (if other nodes have allocated sequences and - // updated _sync:seq since we last updated s.max.), then - // (b) Allocate a standard batch of sequences, and assign a sequence from that batch in the usual way. - // (c) Release any previously allocated sequences (s.last to s.max) - // (d) Release the reserved sequences from part (a) - // We can perform (a) and (b) as a single increment operation, but (c) and (d) aren't necessarily contiguous blocks and must be released - // separately - - prevAllocReleaseFrom := s.last + 1 - prevAllocReleaseTo := s.max + // At this point we need to allocate a sequence that's larger than what's in our current batch, so we first need to release the current batch. + var numReleasedBatch uint64 + numReleasedBatch, err = s._releaseCurrentBatch(ctx) + if err != nil { + base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] for existing batch. Will be handled by skipped sequence handling. 
Error:%v", err) + } + releasedSequenceCount += numReleasedBatch - numberToRelease := existingSequence - s.max - numberToAllocate := s.sequenceBatchSize - incrVal := numberToRelease + numberToAllocate - allocatedToSeq, err := s.incrementSequence(incrVal) + syncSeq, err := s.getSequence() if err != nil { - base.WarnfCtx(ctx, "Error from incrementSequence in nextSequenceGreaterThan(%d): %v", existingSequence, err) - s.mutex.Unlock() - return 0, 0, err + base.WarnfCtx(ctx, "Error returned when fetching current sequence during nextSequenceGreaterThan. Will be handled by skipped sequence handling. Error:%v", err) } // check for rollback of _sync:seq before continuing - minimumExpectedValue := incrVal + prevAllocReleaseTo - if allocatedToSeq < minimumExpectedValue { + if syncSeq < s.last { // rollback of _sync:seq detected - allocatedToSeq, err = s._fixSyncSeqRollback(ctx, allocatedToSeq, minimumExpectedValue) + syncSeq, err = s._fixSyncSeqRollback(ctx, syncSeq, s.last) if err != nil { s.mutex.Unlock() return 0, 0, err } } + // If the target sequence is less than or equal to the current syncSeq, allocate as normal using _nextSequence + if syncSeq >= targetSequence { + sequence, sequencesReserved, err := s._nextSequence(ctx) + s.mutex.Unlock() + if err != nil { + return 0, 0, err + } + if sequencesReserved { + s.reserveNotify <- struct{}{} + } + s.dbStats.SequenceAssignedCount.Add(1) + return sequence, releasedSequenceCount, nil + } + + // If the target sequence is greater than the current _sync:seq, we want to: + // (a) Reserve n sequences past _sync:seq, where n = existingSequence - syncSeq. It's ok if the resulting sequence exceeds targetSequence (if other nodes have allocated sequences and + // updated _sync:seq since we last updated s.max.) + // (b) Allocate a standard batch of sequences, and assign a sequence from that batch in the usual way. 
+ // (c) Release the reserved sequences from part (a) + numberToRelease := existingSequence - syncSeq + numberToAllocate := s.sequenceBatchSize + incrVal := numberToRelease + numberToAllocate + allocatedToSeq, err := s.incrementSequence(incrVal) + if err != nil { + base.WarnfCtx(ctx, "Error from incrementSequence in nextSequenceGreaterThan(%d): %v", existingSequence, err) + s.mutex.Unlock() + return 0, 0, err + } + s.max = allocatedToSeq s.last = allocatedToSeq - numberToAllocate + 1 sequence = s.last @@ -267,15 +299,9 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin // Perform standard batch handling and stats updates s.lastSequenceReserveTime = time.Now() s.reserveNotify <- struct{}{} - s.dbStats.SequenceReservedCount.Add(int64(numberToRelease + numberToAllocate)) + s.dbStats.SequenceReservedCount.Add(int64(incrVal)) s.dbStats.SequenceAssignedCount.Add(1) - // Release previously allocated sequences (c), if any - released, err := s.releaseSequenceRange(ctx, prevAllocReleaseFrom, prevAllocReleaseTo) - if err != nil { - base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] for previously allocated sequences. Will be handled by skipped sequence handling. Error:%v", prevAllocReleaseFrom, prevAllocReleaseTo, err) - } - releasedSequenceCount += released // Release the newly allocated sequences that were used to catch up to existingSequence (d) if numberToRelease > 0 { releaseTo := allocatedToSeq - numberToAllocate diff --git a/db/sequence_allocator_test.go b/db/sequence_allocator_test.go index 0e3631d15e..f647ae4741 100644 --- a/db/sequence_allocator_test.go +++ b/db/sequence_allocator_test.go @@ -11,8 +11,13 @@ licenses/APL2.txt. 
package db import ( + "context" + "fmt" + "log" "math" + "math/rand" "sync" + "sync/atomic" "testing" "time" @@ -379,15 +384,14 @@ func TestNextSequenceGreaterThanMultiNode(t *testing.T) { assertNewAllocatorStats(t, dbStatsB, 1, 10, 2, 4) // calling nextSequenceGreaterThan(15) on A will increment _sync:seq by 5 on it's previously allocated sequence (10). - // Since node B has already updated _sync:seq to 20, will result in: + // Since node B has already updated _sync:seq to 20, calling nextSequenceGreaterThan(15) on A will result in: // node A releasing sequences 2-10 from it's existing buffer - // node A allocating and releasing sequences 21-24 - // node A adding sequences 25-35 to its buffer, and assigning 25 to the current request + // node A adding sequences 21-30 to its buffer, and assigning 21 to the current request nextSequence, releasedSequenceCount, err = a.nextSequenceGreaterThan(ctx, 15) assert.NoError(t, err) - assert.Equal(t, uint64(26), nextSequence) - assert.Equal(t, 14, int(releasedSequenceCount)) - assertNewAllocatorStats(t, dbStatsA, 2, 25, 2, 14) + assert.Equal(t, uint64(21), nextSequence) + assert.Equal(t, 9, int(releasedSequenceCount)) + assertNewAllocatorStats(t, dbStatsA, 2, 20, 2, 9) } @@ -511,32 +515,36 @@ func TestSingleNodeNextSeqGreaterThanRollbackHandling(t *testing.T) { // alter s.last to mock sequences being allocated a.last = 10 - // triggers correction value increase to 10 (sequence batch value) thus nextSeq is higher than you would expect + // nextSequenceGreaterThan fetches current _sync:seq, detects rollback and adjusts by + // delta from last allocated (5) + syncSeqCorrectionValue (500) nxtSeq, _, err = a.nextSequenceGreaterThan(ctx, 15) require.NoError(t, err) - assert.Equal(t, uint64(526), nxtSeq) - assert.Equal(t, uint64(526), a.last) - assert.Equal(t, uint64(535), a.max) + + assert.Equal(t, 521, int(nxtSeq)) + assert.Equal(t, 521, int(a.last)) + assert.Equal(t, 530, int(a.max)) // alter _sync:seq in bucket to end seq in prev 
batch err = ds.Set(a.metaKeys.SyncSeqKey(), 0, nil, 10) require.NoError(t, err) + // nextSequenceGreaterThan fetches current _sync:seq, detects rollback and adjusts by + // delta from last allocated (530-10=520) + syncSeqCorrectionValue (500) + batch size (10) nxtSeq, _, err = a.nextSequenceGreaterThan(ctx, 535) require.NoError(t, err) - assert.Equal(t, uint64(1046), nxtSeq) - assert.Equal(t, uint64(1046), a.last) - assert.Equal(t, uint64(1055), a.max) + assert.Equal(t, uint64(1041), nxtSeq) + assert.Equal(t, uint64(1041), a.last) + assert.Equal(t, uint64(1050), a.max) - // alter _sync:seq in bucket to end seq in prev batch start seq - err = ds.Set(a.metaKeys.SyncSeqKey(), 0, nil, 526) + // alter _sync:seq in bucket to start seq in prev batch + err = ds.Set(a.metaKeys.SyncSeqKey(), 0, nil, 1041) require.NoError(t, err) nxtSeq, _, err = a.nextSequenceGreaterThan(ctx, 1055) require.NoError(t, err) - assert.Equal(t, uint64(1566), nxtSeq) - assert.Equal(t, uint64(1566), a.last) - assert.Equal(t, uint64(1575), a.max) + assert.Equal(t, 1561, int(nxtSeq)) + assert.Equal(t, 1561, int(a.last)) + assert.Equal(t, 1570, int(a.max)) // alter _sync:seq in bucket to prev batch value err = ds.Set(a.metaKeys.SyncSeqKey(), 0, nil, 5) @@ -544,9 +552,9 @@ func TestSingleNodeNextSeqGreaterThanRollbackHandling(t *testing.T) { nxtSeq, _, err = a.nextSequenceGreaterThan(ctx, 1575) require.NoError(t, err) - assert.Equal(t, uint64(2086), nxtSeq) - assert.Equal(t, uint64(2086), a.last) - assert.Equal(t, uint64(2095), a.max) + assert.Equal(t, uint64(2081), nxtSeq) + assert.Equal(t, uint64(2081), a.last) + assert.Equal(t, uint64(2090), a.max) } // TestSyncSeqRollbackMultiNode: @@ -746,3 +754,137 @@ func TestFiveNodeRollbackMiddleNodesDetects(t *testing.T) { assert.Equal(t, uint64(551), a.last) assert.Equal(t, uint64(560), a.max) } + +// TestVariableRateAllocators simulates the following scenario: +// - import nodes have high sequence allocation rate +// - client-facing nodes have low sequence 
allocation rate +// - documents are imported, then the same documents are immediately updated by clients +// (including sequence validation triggering nextSequenceGreaterThan) +// +// Ensures we don't release more sequences than would be expected based on allocator batch size +func TestVariableRateAllocators(t *testing.T) { + ctx := base.TestCtx(t) + bucket := base.GetTestBucket(t) + defer bucket.Close(ctx) + var expectedAllocations uint64 + + dataStore := bucket.GetSingleDataStore() + stats, err := base.NewSyncGatewayStats() + require.NoError(t, err) + + importStats, err := stats.NewDBStats("import", false, false, false, nil, nil) + require.NoError(t, err) + + importFeedAllocator, err := newSequenceAllocator(ctx, dataStore, importStats.DatabaseStats, base.DefaultMetadataKeys) + require.NoError(t, err) + + // All test allocators are stopped when allocatorCtx is closed + allocatorCtx, cancelFunc := context.WithCancel(ctx) + + // Start import node allocator, performing 10000 allocations/second. 
+ var allocatorWg sync.WaitGroup + allocatorWg.Add(1) + go func() { + count := runAllocator(allocatorCtx, importFeedAllocator, 100*time.Microsecond) // 10000 writes/second + atomic.AddUint64(&expectedAllocations, count) + allocatorWg.Done() + }() + + // Start multiple client node allocators, performing 100 allocations/second + clientAllocators := make([]*sequenceAllocator, 0) + clientAllocatorCount := 10 + for i := 0; i <= clientAllocatorCount; i++ { + clientStats, err := stats.NewDBStats(fmt.Sprintf("client%d", i), false, false, false, nil, nil) + require.NoError(t, err) + clientAllocator, err := newSequenceAllocator(ctx, dataStore, clientStats.DatabaseStats, base.DefaultMetadataKeys) + require.NoError(t, err) + clientAllocators = append(clientAllocators, clientAllocator) + allocatorWg.Add(1) + go func() { + count := runAllocator(allocatorCtx, clientAllocator, 10*time.Millisecond) // 100 writes/second + atomic.AddUint64(&expectedAllocations, count) + allocatorWg.Done() + }() + } + + // Wait for allocators to get up to maximum batch size + time.Sleep(500 * time.Millisecond) + documentCount := 10 + var updateWg sync.WaitGroup + updateWg.Add(documentCount) + for i := 0; i < documentCount; i++ { + go func() { + _ = multiNodeUpdate(t, ctx, importFeedAllocator, clientAllocators, 5, 10*time.Millisecond) + updateWg.Done() + atomic.AddUint64(&expectedAllocations, 6) + }() + } + + updateWg.Wait() + + // Stop background allocation goroutines, wait for them to close + cancelFunc() + allocatorWg.Wait() + + log.Printf("expectedSequence (num allocations):%v", atomic.LoadUint64(&expectedAllocations)) + + importFeedAllocator.Stop(ctx) + numAssigned := importFeedAllocator.dbStats.SequenceAssignedCount.Value() + numReleased := importFeedAllocator.dbStats.SequenceReleasedCount.Value() + for _, allocator := range clientAllocators { + allocator.Stop(ctx) + numAssigned += allocator.dbStats.SequenceAssignedCount.Value() + clientSequencesReleased := 
allocator.dbStats.SequenceReleasedCount.Value() + numReleased += clientSequencesReleased + + } + + log.Printf("Total sequences released + assigned: %v", numReleased+numAssigned) + actualSequence, err := importFeedAllocator.getSequence() + log.Printf("actual sequence (getSequence): %v", actualSequence) + require.NoError(t, err) +} + +// multiNodeUpdate obtains an initial sequence from an import allocator (import node), then performs repeated updates to the doc, each using an allocator picked at random from the pool of client allocators (random SG node). +// Calls nextSequenceGreaterThan when the client's next sequence is lower than the doc's current sequence, then ensures that allocator doesn't release more than the sequence batch size +func multiNodeUpdate(t *testing.T, ctx context.Context, importAllocator *sequenceAllocator, clientAllocators []*sequenceAllocator, updateCount int, interval time.Duration) (releasedCount uint64) { + currentSequence, _ := importAllocator.nextSequence(ctx) + + for i := 0; i < updateCount; i++ { + allocatorIndex := rand.Intn(len(clientAllocators)) + clientAllocator := clientAllocators[allocatorIndex] + nextSequence, err := clientAllocator.nextSequence(ctx) + require.NoError(t, err, "nextSequence error: %v", err) + if nextSequence < currentSequence { + prevNext := nextSequence + var numReleased uint64 + nextSequence, numReleased, err = clientAllocator.nextSequenceGreaterThan(ctx, currentSequence) + require.NoError(t, err, "nextSequenceGreaterThan error: %v", err) + log.Printf("allocator %d released %d sequences because next < current (%d < %d)", numReleased, allocatorIndex, prevNext, currentSequence) + // At most clientAllocator should only need to release the current batch + assert.LessOrEqual(t, numReleased, clientAllocator.sequenceBatchSize) + releasedCount += numReleased + } + currentSequence = nextSequence + time.Sleep(interval) + } + + return releasedCount +} + +func runAllocator(ctx context.Context, a *sequenceAllocator, frequency time.Duration) (allocationCount uint64) { + + allocationCount = 0 + ticker := time.NewTicker(frequency) + for { + 
select { + case <-ticker.C: + _, _ = a.nextSequence(ctx) + allocationCount++ + case <-ctx.Done(): + ticker.Stop() + log.Printf("allocator count: %v", allocationCount) + return allocationCount + } + } +} diff --git a/go.mod b/go.mod index 191a5b26d6..7a9ad9941e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/couchbase/sync_gateway -go 1.22 +go 1.22.0 require ( dario.cat/mergo v1.0.0