diff --git a/base/leaky_bucket.go b/base/leaky_bucket.go index d5042b3b83..062ac8b9ec 100644 --- a/base/leaky_bucket.go +++ b/base/leaky_bucket.go @@ -125,7 +125,12 @@ type LeakyBucketConfig struct { // Returns a partial error the first time ViewCustom is called FirstTimeViewCustomPartialError bool - PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called + + // QueryCallback allows tests to set a callback that will be issued prior to issuing a view query + QueryCallback func(ddoc, viewName string, params map[string]any) error + PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called + + N1QLQueryCallback func(ctx context.Context, statement string, params map[string]any, consistency ConsistencyMode, adhoc bool) error PostN1QLQueryCallback func() diff --git a/base/leaky_datastore.go b/base/leaky_datastore.go index 2e6f2952f1..ab1e7854ef 100644 --- a/base/leaky_datastore.go +++ b/base/leaky_datastore.go @@ -241,6 +241,12 @@ func (lds *LeakyDataStore) ViewQuery(ctx context.Context, ddoc, name string, par if !ok { return nil, errors.New("bucket does not support views") } + if lds.config.QueryCallback != nil { + err := lds.config.QueryCallback(ddoc, name, params) + if err != nil { + return nil, err + } + } iterator, err := vs.ViewQuery(ctx, ddoc, name, params) if lds.config.FirstTimeViewCustomPartialError { @@ -324,10 +330,14 @@ func (lds *LeakyDataStore) SetFirstTimeViewCustomPartialError(val bool) { lds.config.FirstTimeViewCustomPartialError = val } -func (lds *LeakyDataStore) SetPostQueryCallback(callback func(ddoc, viewName string, params map[string]interface{})) { +func (lds *LeakyDataStore) SetPostQueryCallback(callback func(ddoc, viewName string, params map[string]any)) { lds.config.PostQueryCallback = callback } +func (lds *LeakyDataStore) SetQueryCallback(fn func(ddoc, viewName string, params map[string]any) error) { + lds.config.QueryCallback = fn +} + func (lds *LeakyDataStore) SetPostN1QLQueryCallback(callback func()) { lds.config.PostN1QLQueryCallback = callback } @@ -447,6 +457,12 @@ func (lds *LeakyDataStore) Query(ctx context.Context, statement string, params m if err != nil { return nil, err } + if lds.config.N1QLQueryCallback != nil { + err := lds.config.N1QLQueryCallback(ctx, statement, params, consistency, adhoc) + if err != nil { + return nil, err + } + } iterator, err := n1qlStore.Query(ctx, statement, params, consistency, adhoc) if lds.config.PostN1QLQueryCallback != nil { diff --git a/db/active_replicator.go b/db/active_replicator.go index 7cae4bfa2c..6394364200 100644 --- a/db/active_replicator.go +++ b/db/active_replicator.go @@ -208,9 +208,10 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen arc.replicationStats.NumConnectAttempts.Add(1) var originPatterns []string // no origin headers for ISGR - // NewSGBlipContext doesn't set cancellation context - active replication cancellation on db close is handled independently - blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, nil) + cancelCtx, cancelFunc := context.WithCancel(context.Background()) + blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, cancelCtx) if err != nil { + cancelFunc() return nil, nil, err } blipContext.WebsocketPingInterval = arc.config.WebsocketPingInterval @@ -221,7 +222,10 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen } } - bsc = NewBlipSyncContext(arc.ctx, blipContext, arc.config.ActiveDB, blipContext.ID, arc.replicationStats) + bsc, err = NewBlipSyncContext(arc.ctx, blipContext, arc.config.ActiveDB, blipContext.ID, arc.replicationStats, cancelFunc) + if err != nil { + return nil, nil, err + } bsc.loggingCtx = base.CorrelationIDLogCtx( arc.config.ActiveDB.AddDatabaseLogContext(base.NewNonCancelCtx().Ctx), diff --git a/db/active_replicator_common.go b/db/active_replicator_common.go index 7e26edb7cb..069c1084e1 100644 --- a/db/active_replicator_common.go +++ b/db/active_replicator_common.go @@ -313,8 +313,9 @@ func (a *activeReplicatorCommon) getState() string { return a.state } -// requires a.stateErrorLock func (a *activeReplicatorCommon) _getStateWithErrorMessage() (state string, lastErrorMessage string) { + a.stateErrorLock.RLock() + defer a.stateErrorLock.RUnlock() if a.lastError == nil { return a.state, "" } @@ -357,6 +358,14 @@ func (a *activeReplicatorCommon) getCheckpointHighSeq() string { return highSeqStr } +// publishStatus updates the replication status document in the metadata store. +func (a *activeReplicatorCommon) publishStatus() { + a.lock.Lock() + defer a.lock.Unlock() + a._publishStatus() +} + +// _publishStatus updates the replication status document in the metadata store. Requires holding a.lock before calling. func (a *activeReplicatorCommon) _publishStatus() { status := a._getStatusCallback() err := setLocalStatus(a.ctx, a.config.ActiveDB.MetadataStore, a.statusKey, status, int(a.config.ActiveDB.Options.LocalDocExpirySecs)) diff --git a/db/active_replicator_push.go b/db/active_replicator_push.go index 79f22ed3f1..db89a94ad9 100644 --- a/db/active_replicator_push.go +++ b/db/active_replicator_push.go @@ -332,7 +332,7 @@ func (apr *ActivePushReplicator) _startSendingChanges(bh *blipHandler, since Seq apr.activeSendChanges.Add(1) go func(s *blip.Sender) { defer apr.activeSendChanges.Add(-1) - isComplete := bh.sendChanges(s, &sendChangesOptions{ + isComplete, err := bh.sendChanges(s, &sendChangesOptions{ docIDs: apr.config.DocIDs, since: since, continuous: apr.config.Continuous, @@ -344,8 +344,15 @@ func (apr *ActivePushReplicator) _startSendingChanges(bh *blipHandler, since Seq ignoreNoConflicts: true, // force the passive side to accept a "changes" message, even in no conflicts mode. changesCtx: collectionCtx.changesCtx, }) - // On a normal completion, call complete for the replication + if err != nil { + base.InfofCtx(apr.ctx, base.KeyReplicate, "Terminating blip connection due to changes feed error: %v", err) + bh.ctxCancelFunc() + _ = apr.setError(err) + apr.publishStatus() + return + } if isComplete { + // On a normal completion, call complete for the replication apr.Complete() } }(apr.blipSender) diff --git a/db/blip_handler.go b/db/blip_handler.go index d2db9ea508..218e0ec981 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -348,7 +348,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { }() // sendChanges runs until blip context closes, or fails due to error startTime := time.Now() - _ = bh.sendChanges(rq.Sender, &sendChangesOptions{ + _, err = bh.sendChanges(rq.Sender, &sendChangesOptions{ docIDs: subChangesParams.docIDs(), since: subChangesParams.Since(), continuous: continuous, @@ -361,6 +361,10 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { changesCtx: collectionCtx.changesCtx, requestPlusSeq: requestPlusSeq, }) + if err != nil { + base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "Closing blip connection due to changes feed error %+v\n", err) + bh.ctxCancelFunc() + } base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime)) }() @@ -428,8 +432,8 @@ func (flag changesDeletedFlag) HasFlag(deletedFlag changesDeletedFlag) bool { return flag&deletedFlag != 0 } -// Sends all changes since the given sequence -func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions) (isComplete bool) { +// sendChanges will start a changes feed and send changes. Returns bool to indicate whether the changes feed finished and all changes were sent. The error value is only used to indicate a fatal error, where the blip connection should be terminated. If the blip connection is disconnected by the client, the error will be nil, but the boolean parameter will be false. +func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions) (bool, error) { defer func() { if panicked := recover(); panicked != nil { bh.replicationStats.NumHandlersPanicked.Add(1) @@ -472,11 +476,10 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions changesDb, err := bh.copyDatabaseCollectionWithUser(bh.collectionIdx) if err != nil { base.WarnfCtx(bh.loggingCtx, "[%s] error sending changes: %v", bh.blipContext.ID, err) - return false - + return false, err } - forceClose := generateBlipSyncChanges(bh.loggingCtx, changesDb, channelSet, options, opts.docIDs, func(changes []*ChangeEntry) error { + forceClose, err := generateBlipSyncChanges(bh.loggingCtx, changesDb, channelSet, options, opts.docIDs, func(changes []*ChangeEntry) error { base.DebugfCtx(bh.loggingCtx, base.KeySync, " Sending %d changes", len(changes)) for _, change := range changes { if !strings.HasPrefix(change.ID, "_") { @@ -538,8 +541,7 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions } bh.db.DatabaseContext.NotifyTerminatedChanges(bh.loggingCtx, user) } - - return !forceClose + return (err == nil && !forceClose), err } func (bh *blipHandler) buildChangesRow(change *ChangeEntry, revID string) []interface{} { diff --git a/db/blip_sync_context.go b/db/blip_sync_context.go index cb39b0c34b..927c596c8d 100644 --- a/db/blip_sync_context.go +++ b/db/blip_sync_context.go @@ -34,7 +34,10 @@ const ( var ErrClosedBLIPSender = errors.New("use of closed BLIP sender") -func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, contextID string, replicationStats *BlipSyncStats) *BlipSyncContext { +func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, contextID string, replicationStats *BlipSyncStats, ctxCancelFunc context.CancelFunc) (*BlipSyncContext, error) { + if ctxCancelFunc == nil { + return nil, errors.New("cancelCtxFunc is required") + } maxInFlightChangesBatches := DefaultMaxConcurrentChangesBatches if db.Options.MaxConcurrentChangesBatches != nil { maxInFlightChangesBatches = *db.Options.MaxConcurrentChangesBatches @@ -55,6 +58,7 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con inFlightChangesThrottle: make(chan struct{}, maxInFlightChangesBatches), inFlightRevsThrottle: make(chan struct{}, maxInFlightRevs), collections: &blipCollections{}, + ctxCancelFunc: ctxCancelFunc, } if bsc.replicationStats == nil { bsc.replicationStats = NewBlipSyncStats() @@ -86,7 +90,7 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con bsc.register(profile, handlerFn) } } - return bsc + return bsc, nil } // BlipSyncContext represents one BLIP connection (socket) opened by a client. @@ -133,6 +137,8 @@ type BlipSyncContext struct { collections *blipCollections // all collections handled by blipSyncContext, implicit or via GetCollections stats blipSyncStats // internal structure to store stats + + ctxCancelFunc context.CancelFunc // function to cancel a blip replication } // blipSyncStats has support structures to support reporting stats at regular interval @@ -248,6 +254,7 @@ func (bsc *BlipSyncContext) Close() { } bsc.reportStats(true) close(bsc.terminator) + bsc.ctxCancelFunc() }) } diff --git a/db/changes.go b/db/changes.go index 3cd7ab397e..b288ccfd18 100644 --- a/db/changes.go +++ b/db/changes.go @@ -1362,16 +1362,16 @@ func (options ChangesOptions) String() string { ) } -// Used by BLIP connections for changes. Supports both one-shot and continuous changes. -func generateBlipSyncChanges(ctx context.Context, database *DatabaseCollectionWithUser, inChannels base.Set, options ChangesOptions, docIDFilter []string, send func([]*ChangeEntry) error) (forceClose bool) { +// Used by BLIP connections for changes. Supports both one-shot and continuous changes. Returns an error in the case that the feed does not start up, or there is a fatal error in the feed. The caller is responsible for closing the connection, no more changes will be generated. forceClose will be true if connection was terminated underneath the changes feed. +func generateBlipSyncChanges(ctx context.Context, database *DatabaseCollectionWithUser, inChannels base.Set, options ChangesOptions, docIDFilter []string, send func([]*ChangeEntry) error) (forceClose bool, err error) { // Store one-shot here to protect isOneShot := !options.Continuous - err, forceClose := GenerateChanges(ctx, database, inChannels, options, docIDFilter, send) + err, forceClose = GenerateChanges(ctx, database, inChannels, options, docIDFilter, send) if _, ok := err.(*ChangesSendErr); ok { // If there was already an error in a send function, do not send last one shot changes message, since it probably will not work anyway. - return forceClose // error is probably because the client closed the connection + return forceClose, err // error is probably because the client closed the connection } // For one-shot changes, invoke the callback w/ nil to trigger the 'caught up' changes message. (For continuous changes, this @@ -1379,7 +1379,7 @@ func generateBlipSyncChanges(ctx context.Context, database *DatabaseCollectionWi if isOneShot { _ = send(nil) } - return forceClose + return forceClose, err } type ChangesSendErr struct{ error } @@ -1416,6 +1416,7 @@ func GenerateChanges(ctx context.Context, database *DatabaseCollectionWithUser, var lastSeq SequenceID var feed <-chan *ChangeEntry var timeout <-chan time.Time + var feedErr error // feedStarted identifies whether at least one MultiChangesFeed has been started. Used to identify when a one-shot changes is done. feedStarted := false @@ -1437,7 +1438,6 @@ loop: forceClose = true break loop } - var feedErr error if len(docIDFilter) > 0 { feed, feedErr = database.DocIDChangesFeed(ctx, inChannels, docIDFilter, options) } else { @@ -1456,7 +1456,6 @@ loop: } var sendErr error - // Wait for either a new change, a heartbeat, or a timeout: select { case entry, ok := <-feed: @@ -1465,6 +1464,7 @@ loop: } else if entry == nil { sendErr = send(nil) } else if entry.Err != nil { + feedErr = entry.Err break loop // error returned by feed - end changes } else { entries := []*ChangeEntry{entry} @@ -1481,6 +1481,7 @@ loop: waiting = true break collect } else if entry.Err != nil { + feedErr = entry.Err break loop // error returned by feed - end changes } entries = append(entries, entry) @@ -1533,5 +1534,5 @@ loop: forceClose = true } - return nil, forceClose + return feedErr, forceClose } diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 4aa78bde27..9afad13938 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -11,6 +11,7 @@ licenses/APL2.txt. package rest import ( + "context" "encoding/base64" "encoding/json" "fmt" @@ -1205,7 +1206,7 @@ func TestBlipSendConcurrentRevs(t *testing.T) { concurrentSendRevNum = 50 ) rt := NewRestTester(t, &RestTesterConfig{ - leakyBucketConfig: &base.LeakyBucketConfig{ + LeakyBucketConfig: &base.LeakyBucketConfig{ UpdateCallback: func(_ string) { time.Sleep(time.Millisecond * 5) // slow down rosmar - it's too quick to be throttled }, @@ -3187,3 +3188,44 @@ func TestBlipDatabaseClose(t *testing.T) { }, time.Second*10, time.Millisecond*100) }) } + +// Starts a continuous pull replication then updates the db to trigger a close. +func TestChangesFeedExitDisconnect(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges, base.KeyCache) + btcRunner := NewBlipTesterClientRunner(t) + var shouldChannelQueryError atomic.Bool + btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { + rt := NewRestTester(t, &RestTesterConfig{ + LeakyBucketConfig: &base.LeakyBucketConfig{ + QueryCallback: func(ddoc, viewname string, params map[string]any) error { + if viewname == "channels" && shouldChannelQueryError.Load() { + return gocb.ErrTimeout + } + return nil + }, + N1QLQueryCallback: func(_ context.Context, statement string, params map[string]any, consistency base.ConsistencyMode, adhoc bool) error { + if strings.Contains(statement, "sg_channels") && shouldChannelQueryError.Load() { + return gocb.ErrTimeout + } + return nil + }, + }, + }) + defer rt.Close() + const username = "alice" + rt.CreateUser(username, []string{"*"}) + btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &BlipTesterClientOpts{Username: username}) + var blipContextClosed atomic.Bool + btcRunner.clients[btc.id].pullReplication.bt.blipContext.OnExitCallback = func() { + blipContextClosed.Store(true) + } + + shouldChannelQueryError.Store(true) + btcRunner.StartPull(btc.id) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.True(c, blipContextClosed.Load()) + }, time.Second*10, time.Millisecond*100) + }) +} diff --git a/rest/blip_sync.go b/rest/blip_sync.go index cf4fb0a2a6..7791f98041 100644 --- a/rest/blip_sync.go +++ b/rest/blip_sync.go @@ -51,9 +51,11 @@ func (h *handler) handleBLIPSync() error { // error is checked at the time of database load, and ignored at this time originPatterns, _ := hostOnlyCORS(h.db.CORS.Origin) + cancelCtx, cancelCtxFunc := context.WithCancel(h.db.DatabaseContext.CancelContext) // Create a BLIP context: - blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns, h.db.DatabaseContext.CancelContext) + blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns, cancelCtx) if err != nil { + cancelCtxFunc() return err } @@ -61,7 +63,10 @@ func (h *handler) handleBLIPSync() error { h.rqCtx = base.CorrelationIDLogCtx(h.ctx(), base.FormatBlipContextID(blipContext.ID)) h.response.Header().Set(db.BLIPCorrelationIDResponseHeader, blipContext.ID) // Create a new BlipSyncContext attached to the given blipContext. - ctx := db.NewBlipSyncContext(h.rqCtx, blipContext, h.db, h.formatSerialNumber(), db.BlipSyncStatsForCBL(h.db.DbStats)) + ctx, err := db.NewBlipSyncContext(h.rqCtx, blipContext, h.db, h.formatSerialNumber(), db.BlipSyncStatsForCBL(h.db.DbStats), cancelCtxFunc) + if err != nil { + return err + } defer ctx.Close() auditFields := base.AuditFields{base.AuditFieldReplicationID: base.FormatBlipContextID(blipContext.ID)} diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index 19c8a1c644..ed87b4bc97 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -9,6 +9,7 @@ package replicatortest import ( + "context" "encoding/json" "expvar" "fmt" @@ -25,14 +26,16 @@ import ( "testing" "time" + "github.com/couchbase/gocb/v2" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/couchbase/sync_gateway/auth" "github.com/couchbase/sync_gateway/base" "github.com/couchbase/sync_gateway/channels" "github.com/couchbase/sync_gateway/db" "github.com/couchbase/sync_gateway/rest" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestReplicationAPI(t *testing.T) { @@ -8568,12 +8571,95 @@ func TestDbConfigNoOverwriteReplications(t *testing.T) { require.Equal(t, startReplicationConfig.Direction, config.Direction) } +func TestActiveReplicatorChangesFeedExit(t *testing.T) { + base.RequireNumTestBuckets(t, 2) + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeyChanges, base.KeyCRUD, base.KeyBucket) + + var shouldChannelQueryError atomic.Bool + activeRT := rest.NewRestTester(t, &rest.RestTesterConfig{ + LeakyBucketConfig: &base.LeakyBucketConfig{ + QueryCallback: func(ddoc, viewname string, params map[string]any) error { + if viewname == "channels" && shouldChannelQueryError.Load() { + shouldChannelQueryError.Store(false) + return gocb.ErrTimeout + } + return nil + }, + N1QLQueryCallback: func(_ context.Context, statement string, params map[string]any, consistency base.ConsistencyMode, adhoc bool) error { + // * channel query uses all docs index + if strings.Contains(statement, "sg_allDocs") && shouldChannelQueryError.Load() { + shouldChannelQueryError.Store(false) + return gocb.ErrTimeout + } + return nil + }, + }, + }) + t.Cleanup(activeRT.Close) + _ = activeRT.Bucket() + + passiveRT := rest.NewRestTesterPersistentConfig(t) + t.Cleanup(passiveRT.Close) + + username := "alice" + passiveRT.CreateUser(username, []string{"*"}) + passiveDBURL := passiveDBURLForAlice(passiveRT, username) + stats := dbReplicatorStats(t) + ar, err := db.NewActiveReplicator(activeRT.Context(), &db.ActiveReplicatorConfig{ + ID: t.Name(), + Direction: db.ActiveReplicatorTypePush, + RemoteDBURL: passiveDBURL, + ActiveDB: &db.Database{ + DatabaseContext: activeRT.GetDatabase(), + }, + ChangesBatchSize: 200, + Continuous: false, + ReplicationStatsMap: stats, + CollectionsEnabled: !activeRT.GetDatabase().OnlyDefaultCollection(), + }) + require.NoError(t, err) + t.Cleanup(func() { assert.NoError(t, ar.Stop()) }) + + docID := "doc1" + _ = activeRT.CreateTestDoc(docID) + + shouldChannelQueryError.Store(true) + require.NoError(t, ar.Start(activeRT.Context())) + + changesResults, err := passiveRT.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0", "", true) + require.NoError(t, err) + require.Len(t, changesResults.Results, 1) + require.Equal(t, docID, changesResults.Results[0].ID) + require.Equal(t, int64(2), stats.NumConnectAttemptsPush.Value()) +} func requireBodyEqual(t *testing.T, expected string, doc *db.Document) { var expectedBody db.Body require.NoError(t, base.JSONUnmarshal([]byte(expected), &expectedBody)) require.Equal(t, expectedBody, doc.Body(base.TestCtx(t))) } +func dbReplicatorStats(t *testing.T) *base.DbReplicatorStats { + stats, err := base.SyncGatewayStats.NewDBStats(t.Name(), false, false, false, nil, nil) + require.NoError(t, err) + dbstats, err := stats.DBReplicatorStats(t.Name()) + require.NoError(t, err) + return dbstats +} + +// passiveDBURLForAlice creates a public server for the passive RT and returns the URL for the alice user, e.g. http://alice:password@localhost:1234/dbname +func passiveDBURLForAlice(rt *rest.RestTester, username string) *url.URL { + srv := httptest.NewServer(rt.TestPublicHandler()) + rt.TB().Cleanup(srv.Close) + + passiveDBURL, err := url.Parse(srv.URL + "/" + rt.GetDatabase().Name) + require.NoError(rt.TB(), err) + + // Add basic auth creds to target db URL + passiveDBURL.User = url.UserPassword(username, rest.RestTesterDefaultUserPassword) + return passiveDBURL +} + func TestReplicationConfigUpdatedAt(t *testing.T) { base.RequireNumTestBuckets(t, 2) diff --git a/rest/revocation_test.go b/rest/revocation_test.go index d0bab3e9fa..25167cafeb 100644 --- a/rest/revocation_test.go +++ b/rest/revocation_test.go @@ -2413,7 +2413,7 @@ func TestRevocationGetSyncDataError(t *testing.T) { // Two callbacks to cover usage with CBS/Xattrs and without revocationTester, rt := InitScenario( t, &RestTesterConfig{ - leakyBucketConfig: &base.LeakyBucketConfig{ + LeakyBucketConfig: &base.LeakyBucketConfig{ GetWithXattrCallback: func(key string) error { return fmt.Errorf("Leaky Bucket GetWithXattrCallback Error") }, GetRawCallback: func(key string) error { diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index c2d4bbb907..520f607e4d 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -60,7 +60,7 @@ type RestTesterConfig struct { EnableNoConflictsMode bool // Enable no-conflicts mode. By default, conflicts will be allowed, which is the default behavior EnableUserQueries bool // Enable the feature-flag for user N1QL/etc queries CustomTestBucket *base.TestBucket // If set, use this bucket instead of requesting a new one. - leakyBucketConfig *base.LeakyBucketConfig // Set to create and use a leaky bucket on the RT and DB. A test bucket cannot be passed in if using this option. + LeakyBucketConfig *base.LeakyBucketConfig // Set to create and use a leaky bucket on the RT and DB. A test bucket cannot be passed in if using this option. adminInterface string // adminInterface overrides the default admin interface. SgReplicateEnabled bool // SgReplicateManager disabled by default for RestTester AutoImport *bool @@ -178,14 +178,14 @@ func (rt *RestTester) Bucket() base.Bucket { testBucket := rt.RestTesterConfig.CustomTestBucket if testBucket == nil { testBucket = base.GetTestBucket(rt.TB()) - if rt.leakyBucketConfig != nil { - leakyConfig := *rt.leakyBucketConfig + if rt.LeakyBucketConfig != nil { + leakyConfig := *rt.LeakyBucketConfig // Ignore closures to avoid double closing panics leakyConfig.IgnoreClose = true testBucket = testBucket.LeakyBucketClone(leakyConfig) } - } else if rt.leakyBucketConfig != nil { - rt.TB().Fatalf("A passed in TestBucket cannot be used on the RestTester when defining a leakyBucketConfig") + } else if rt.LeakyBucketConfig != nil { + rt.TB().Fatalf("A passed in TestBucket cannot be used on the RestTester when defining a LeakyBucketConfig") } rt.TestBucket = testBucket @@ -361,7 +361,7 @@ func (rt *RestTester) Bucket() base.Bucket { } _, isLeaky := base.AsLeakyBucket(rt.TestBucket) var err error - if rt.leakyBucketConfig != nil || isLeaky { + if rt.LeakyBucketConfig != nil || isLeaky { _, err = rt.RestTesterServerContext.AddDatabaseFromConfigWithBucket(ctx, rt.TB(), *rt.DatabaseConfig, testBucket.Bucket) } else { _, err = rt.RestTesterServerContext.AddDatabaseFromConfig(ctx, *rt.DatabaseConfig) @@ -449,11 +449,11 @@ func GetDataStoreNamesFromScopesConfig(config ScopesConfig) []sgbucket.DataStore } // LeakyBucket gets the bucket from the RestTester as a leaky bucket allowing for callbacks to be set on the fly. -// The RestTester must have been set up to create and use a leaky bucket by setting leakyBucketConfig in the RT +// The RestTester must have been set up to create and use a leaky bucket by setting LeakyBucketConfig in the RT // config when calling NewRestTester. func (rt *RestTester) LeakyBucket() *base.LeakyDataStore { - if rt.leakyBucketConfig == nil { - rt.TB().Fatalf("Cannot get leaky bucket when leakyBucketConfig was not set on RestTester initialisation") + if rt.LeakyBucketConfig == nil { + rt.TB().Fatalf("Cannot get leaky bucket when LeakyBucketConfig was not set on RestTester initialisation") } leakyDataStore, ok := base.AsLeakyDataStore(rt.Bucket().DefaultDataStore()) if !ok {