From 4a25580d5db8b9864174318d88f572404e1eb287 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 18 Nov 2024 15:48:11 -0500 Subject: [PATCH 1/9] CBG-4370 create a cancel context inside BlipSyncContext This cancel context allows a forcible closure of the underlying blip connection. In the case where there is a continuous pull replication and there is an error on the changes feed, the only way to stop the pull replication is to shut down the connection. CBL clients do not listen to unsolicited error messages. --- base/leaky_bucket.go | 7 ++- base/leaky_datastore.go | 18 ++++++- db/active_replicator.go | 11 ++-- db/active_replicator_push.go | 5 +- db/active_replicator_push_collections.go | 5 +- db/blip_handler.go | 17 +++--- db/blip_sync_context.go | 67 +++++++++++++++++------- db/changes.go | 24 +++++---- rest/blip_api_crud_test.go | 42 +++++++++++++++ rest/blip_sync.go | 32 +++++------ rest/changes_api.go | 2 +- 11 files changed, 169 insertions(+), 61 deletions(-) diff --git a/base/leaky_bucket.go b/base/leaky_bucket.go index d5042b3b83..062ac8b9ec 100644 --- a/base/leaky_bucket.go +++ b/base/leaky_bucket.go @@ -125,7 +125,12 @@ type LeakyBucketConfig struct { // Returns a partial error the first time ViewCustom is called FirstTimeViewCustomPartialError bool - PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called + + // QueryCallback allows tests to set a callback that will be issued prior to issuing a view query + QueryCallback func(ddoc, viewName string, params map[string]any) error + PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called + + N1QLQueryCallback func(ctx context.Context, statement string, params map[string]any, consistency ConsistencyMode, adhoc bool) error PostN1QLQueryCallback func() diff --git a/base/leaky_datastore.go b/base/leaky_datastore.go index 
2e6f2952f1..ab1e7854ef 100644 --- a/base/leaky_datastore.go +++ b/base/leaky_datastore.go @@ -241,6 +241,12 @@ func (lds *LeakyDataStore) ViewQuery(ctx context.Context, ddoc, name string, par if !ok { return nil, errors.New("bucket does not support views") } + if lds.config.QueryCallback != nil { + err := lds.config.QueryCallback(ddoc, name, params) + if err != nil { + return nil, err + } + } iterator, err := vs.ViewQuery(ctx, ddoc, name, params) if lds.config.FirstTimeViewCustomPartialError { @@ -324,10 +330,14 @@ func (lds *LeakyDataStore) SetFirstTimeViewCustomPartialError(val bool) { lds.config.FirstTimeViewCustomPartialError = val } -func (lds *LeakyDataStore) SetPostQueryCallback(callback func(ddoc, viewName string, params map[string]interface{})) { +func (lds *LeakyDataStore) SetPostQueryCallback(callback func(ddoc, viewName string, params map[string]any)) { lds.config.PostQueryCallback = callback } +func (lds *LeakyDataStore) SetQueryCallback(fn func(ddoc, viewName string, params map[string]any) error) { + lds.config.QueryCallback = fn +} + func (lds *LeakyDataStore) SetPostN1QLQueryCallback(callback func()) { lds.config.PostN1QLQueryCallback = callback } @@ -447,6 +457,12 @@ func (lds *LeakyDataStore) Query(ctx context.Context, statement string, params m if err != nil { return nil, err } + if lds.config.N1QLQueryCallback != nil { + err := lds.config.N1QLQueryCallback(ctx, statement, params, consistency, adhoc) + if err != nil { + return nil, err + } + } iterator, err := n1qlStore.Query(ctx, statement, params, consistency, adhoc) if lds.config.PostN1QLQueryCallback != nil { diff --git a/db/active_replicator.go b/db/active_replicator.go index 7cae4bfa2c..5cea473b6d 100644 --- a/db/active_replicator.go +++ b/db/active_replicator.go @@ -207,12 +207,15 @@ func (ar *ActiveReplicator) GetStatus(ctx context.Context) *ReplicationStatus { func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sender, bsc *BlipSyncContext, err error) { 
arc.replicationStats.NumConnectAttempts.Add(1) - var originPatterns []string // no origin headers for ISGR - // NewSGBlipContext doesn't set cancellation context - active replication cancellation on db close is handled independently - blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, nil) + bsc, err = NewBlipSyncContext(arc.ctx, BlipSyncContextOptions{ + DB: arc.config.ActiveDB, + ReplicationStats: arc.replicationStats, + BlipID: arc.config.ID + idSuffix, + }) if err != nil { return nil, nil, err } + blipContext := bsc.BlipContext() blipContext.WebsocketPingInterval = arc.config.WebsocketPingInterval blipContext.OnExitCallback = func() { // fall into a reconnect loop only if the connection is unexpectedly closed. @@ -221,8 +224,6 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen } } - bsc = NewBlipSyncContext(arc.ctx, blipContext, arc.config.ActiveDB, blipContext.ID, arc.replicationStats) - bsc.loggingCtx = base.CorrelationIDLogCtx( arc.config.ActiveDB.AddDatabaseLogContext(base.NewNonCancelCtx().Ctx), arc.config.ID+idSuffix) diff --git a/db/active_replicator_push.go b/db/active_replicator_push.go index e1a295b798..38b3adf62e 100644 --- a/db/active_replicator_push.go +++ b/db/active_replicator_push.go @@ -328,7 +328,7 @@ func (apr *ActivePushReplicator) _startPushNonCollection() error { apr.activeSendChanges.Set(true) go func(s *blip.Sender) { defer apr.activeSendChanges.Set(false) - isComplete := bh.sendChanges(s, &sendChangesOptions{ + isComplete, err := bh.sendChanges(s, &sendChangesOptions{ docIDs: apr.config.DocIDs, since: apr.defaultCollection.Checkpointer.lastCheckpointSeq, continuous: apr.config.Continuous, @@ -340,6 +340,9 @@ func (apr *ActivePushReplicator) _startPushNonCollection() error { ignoreNoConflicts: true, // force the passive side to accept a "changes" message, even in no conflicts mode. 
changesCtx: collectionCtx.changesCtx, }) + if err != nil { + base.InfofCtx(apr.ctx, base.KeyReplicate, "Error sending changes: %v", err) + } // On a normal completion, call complete for the replication if isComplete { apr.Complete() diff --git a/db/active_replicator_push_collections.go b/db/active_replicator_push_collections.go index 05d110f40a..6e6a749327 100644 --- a/db/active_replicator_push_collections.go +++ b/db/active_replicator_push_collections.go @@ -76,7 +76,7 @@ func (apr *ActivePushReplicator) _startPushWithCollections() error { apr.activeSendChanges.Set(true) go func(s *blip.Sender) { defer apr.activeSendChanges.Set(false) - isComplete := bh.sendChanges(s, &sendChangesOptions{ + isComplete, err := bh.sendChanges(s, &sendChangesOptions{ docIDs: apr.config.DocIDs, since: replicationCollection.Checkpointer.lastCheckpointSeq, continuous: apr.config.Continuous, @@ -88,6 +88,9 @@ func (apr *ActivePushReplicator) _startPushWithCollections() error { ignoreNoConflicts: true, // force the passive side to accept a "changes" message, even in no conflicts mode. 
changesCtx: c.changesCtx, }) + if err != nil { + base.InfofCtx(apr.ctx, base.KeyReplicate, "Error sending changes: %v", err) + } // On a normal completion, call complete for the replication if isComplete { apr.Complete() diff --git a/db/blip_handler.go b/db/blip_handler.go index a192e2d762..09b829e972 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -348,7 +348,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { }() // sendChanges runs until blip context closes, or fails due to error startTime := time.Now() - _ = bh.sendChanges(rq.Sender, &sendChangesOptions{ + _, err = bh.sendChanges(rq.Sender, &sendChangesOptions{ docIDs: subChangesParams.docIDs(), since: subChangesParams.Since(), continuous: continuous, @@ -362,6 +362,11 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { requestPlusSeq: requestPlusSeq, }) base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime)) + if err != nil { + base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "Closing blip connection due to changes feed error %+v\n", err) + bh.ctxCancelFunc() + } + base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime)) }() auditFields := base.AuditFields{ @@ -429,7 +434,7 @@ func (flag changesDeletedFlag) HasFlag(deletedFlag changesDeletedFlag) bool { } // Sends all changes since the given sequence -func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions) (isComplete bool) { +func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions) (bool, error) { defer func() { if panicked := recover(); panicked != nil { bh.replicationStats.NumHandlersPanicked.Add(1) @@ -472,11 +477,10 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions changesDb, err := bh.copyDatabaseCollectionWithUser(bh.collectionIdx) if err != nil { base.WarnfCtx(bh.loggingCtx, "[%s] 
error sending changes: %v", bh.blipContext.ID, err) - return false - + return false, err } - forceClose := generateBlipSyncChanges(bh.loggingCtx, changesDb, channelSet, options, opts.docIDs, func(changes []*ChangeEntry) error { + forceClose, err := generateBlipSyncChanges(bh.loggingCtx, changesDb, channelSet, options, opts.docIDs, func(changes []*ChangeEntry) error { base.DebugfCtx(bh.loggingCtx, base.KeySync, " Sending %d changes", len(changes)) for _, change := range changes { if !strings.HasPrefix(change.ID, "_") { @@ -539,8 +543,7 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions } bh.db.DatabaseContext.NotifyTerminatedChanges(bh.loggingCtx, user) } - - return !forceClose + return !forceClose, err } func (bh *blipHandler) buildChangesRow(change *ChangeEntry, revID string) []interface{} { diff --git a/db/blip_sync_context.go b/db/blip_sync_context.go index cb39b0c34b..847f892262 100644 --- a/db/blip_sync_context.go +++ b/db/blip_sync_context.go @@ -34,37 +34,64 @@ const ( var ErrClosedBLIPSender = errors.New("use of closed BLIP sender") -func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, contextID string, replicationStats *BlipSyncStats) *BlipSyncContext { +// BlipSyncContextOptions provides options for creating a new BlipSyncContext +type BlipSyncContextOptions struct { + DB *Database // The backing database + ReplicationStats *BlipSyncStats + OriginPatterns []string // origin patterns for CORS checking + BlipID string // if specified, becomes blip.Context.ID, otherwise this is automatically assigned + CorrelationID string // correlation ID for logging + ClientType BLIPSyncContextClientType // name of the client + CancelContext context.Context // optional context for cancelation +} + +// NewBlipSyncContext creates a BlipSyncContext for listening to blip messages and an associated blip.Context. 
+func NewBlipSyncContext(ctx context.Context, opts BlipSyncContextOptions) (*BlipSyncContext, error) { + cancelCtx := opts.CancelContext + if cancelCtx == nil { + cancelCtx = context.Background() // create a local context for the BlipSyncContext + } + cancelCtx, cancelFunc := context.WithCancel(cancelCtx) + bc, err := NewSGBlipContext(ctx, opts.BlipID, opts.OriginPatterns, cancelCtx) + if err != nil { + cancelFunc() + return nil, err + } + maxInFlightChangesBatches := DefaultMaxConcurrentChangesBatches - if db.Options.MaxConcurrentChangesBatches != nil { - maxInFlightChangesBatches = *db.Options.MaxConcurrentChangesBatches + if opts.DB.Options.MaxConcurrentChangesBatches != nil { + maxInFlightChangesBatches = *opts.DB.Options.MaxConcurrentChangesBatches } maxInFlightRevs := DefaultMaxConcurrentRevs - if db.Options.MaxConcurrentRevs != nil { - maxInFlightRevs = *db.Options.MaxConcurrentRevs + if opts.DB.Options.MaxConcurrentRevs != nil { + maxInFlightRevs = *opts.DB.Options.MaxConcurrentRevs } bsc := &BlipSyncContext{ blipContext: bc, - blipContextDb: db, + blipContextDb: opts.DB, loggingCtx: ctx, terminator: make(chan bool), - userChangeWaiter: db.NewUserWaiter(), - sgCanUseDeltas: db.DeltaSyncEnabled(), - replicationStats: replicationStats, + userChangeWaiter: opts.DB.NewUserWaiter(), + sgCanUseDeltas: opts.DB.DeltaSyncEnabled(), + replicationStats: opts.ReplicationStats, inFlightChangesThrottle: make(chan struct{}, maxInFlightChangesBatches), inFlightRevsThrottle: make(chan struct{}, maxInFlightRevs), collections: &blipCollections{}, + ctxCancelFunc: cancelFunc, + } + if opts.ClientType != "" { + bsc.clientType = opts.ClientType } if bsc.replicationStats == nil { bsc.replicationStats = NewBlipSyncStats() } bsc.stats.lastReportTime.Store(time.Now().UnixMilli()) - if u := db.User(); u != nil { + if u := opts.DB.User(); u != nil { bsc.userName = u.Name() u.InitializeRoles() - if u.Name() == "" && db.IsGuestReadOnly() { + if u.Name() == "" && 
opts.DB.IsGuestReadOnly() { bsc.readOnly = true } } @@ -72,7 +99,7 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con // Register default handlers bc.DefaultHandler = bsc.NotFoundHandler bc.FatalErrorHandler = func(err error) { - base.InfofCtx(ctx, base.KeyHTTP, "%s: --> BLIP+WebSocket connection error: %v", contextID, err) + base.InfofCtx(ctx, base.KeyHTTP, "%s: --> BLIP+WebSocket connection error: %v", opts.CorrelationID, err) } // Register 2.x replicator handlers @@ -80,13 +107,13 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con bsc.register(profile, handlerFn) } - if db.Options.UnsupportedOptions.ConnectedClient { + if opts.DB.Options.UnsupportedOptions.ConnectedClient { // Register Connected Client handlers for profile, handlerFn := range kConnectedClientHandlersByProfile { bsc.register(profile, handlerFn) } } - return bsc + return bsc, nil } // BlipSyncContext represents one BLIP connection (socket) opened by a client. @@ -133,6 +160,8 @@ type BlipSyncContext struct { collections *blipCollections // all collections handled by blipSyncContext, implicit or via GetCollections stats blipSyncStats // internal structure to store stats + + ctxCancelFunc context.CancelFunc // function which will terminate the BlipSyncContext connections } // blipSyncStats has support structures to support reporting stats at regular interval @@ -160,10 +189,6 @@ func (bsc *BlipSyncContext) SetActiveCBMobileSubprotocol(subprotocol string) (er return err } -func (bsc *BlipSyncContext) SetClientType(clientType BLIPSyncContextClientType) { - bsc.clientType = clientType -} - // Registers a BLIP handler including the outer-level work of logging & error handling. // Includes the outer handler as a nested function. 
func (bsc *BlipSyncContext) register(profile string, handlerFn func(*blipHandler, *blip.Message) error) { @@ -246,6 +271,7 @@ func (bsc *BlipSyncContext) Close() { collection.changesCtxCancel() } + bsc.ctxCancelFunc() bsc.reportStats(true) close(bsc.terminator) }) @@ -773,3 +799,8 @@ func (bsc *BlipSyncContext) reportStats(updateImmediately bool) { bsc.stats.lastReportTime.Store(currentTime) } + +// BlipContext returns blip.Context for the BlipSyncContext +func (bsc *BlipSyncContext) BlipContext() *blip.Context { + return bsc.blipContext +} diff --git a/db/changes.go b/db/changes.go index 3cd7ab397e..8db0d317b4 100644 --- a/db/changes.go +++ b/db/changes.go @@ -1362,16 +1362,16 @@ func (options ChangesOptions) String() string { ) } -// Used by BLIP connections for changes. Supports both one-shot and continuous changes. -func generateBlipSyncChanges(ctx context.Context, database *DatabaseCollectionWithUser, inChannels base.Set, options ChangesOptions, docIDFilter []string, send func([]*ChangeEntry) error) (forceClose bool) { +// Used by BLIP connections for changes. Supports both one-shot and continuous changes. Returns an error in the case that the feed does not start up, or there is a fatal error in the feed. The caller is responsible for closing the connection, no more changes will be generated. forceClose will be true if connection was terminated underneath the changes feed. 
+func generateBlipSyncChanges(ctx context.Context, database *DatabaseCollectionWithUser, inChannels base.Set, options ChangesOptions, docIDFilter []string, send func([]*ChangeEntry) error) (forceClose bool, err error) { // Store one-shot here to protect isOneShot := !options.Continuous - err, forceClose := GenerateChanges(ctx, database, inChannels, options, docIDFilter, send) + forceClose, err = GenerateChanges(ctx, database, inChannels, options, docIDFilter, send) if _, ok := err.(*ChangesSendErr); ok { // If there was already an error in a send function, do not send last one shot changes message, since it probably will not work anyway. - return forceClose // error is probably because the client closed the connection + return forceClose, err // error is probably because the client closed the connection } // For one-shot changes, invoke the callback w/ nil to trigger the 'caught up' changes message. (For continuous changes, this @@ -1379,7 +1379,7 @@ func generateBlipSyncChanges(ctx context.Context, database *DatabaseCollectionWi if isOneShot { _ = send(nil) } - return forceClose + return forceClose, err } type ChangesSendErr struct{ error } @@ -1387,7 +1387,7 @@ type ChangesSendErr struct{ error } // Shell of the continuous changes feed -- calls out to a `send` function to deliver the change. // This is called from BLIP connections as well as HTTP handlers, which is why this is not a // method on `handler`. 
-func GenerateChanges(ctx context.Context, database *DatabaseCollectionWithUser, inChannels base.Set, options ChangesOptions, docIDFilter []string, send func([]*ChangeEntry) error) (err error, forceClose bool) { +func GenerateChanges(ctx context.Context, database *DatabaseCollectionWithUser, inChannels base.Set, options ChangesOptions, docIDFilter []string, send func([]*ChangeEntry) error) (forceClose bool, err error) { // Set up heartbeat/timeout var timeoutInterval time.Duration var timer *time.Timer @@ -1416,6 +1416,7 @@ func GenerateChanges(ctx context.Context, database *DatabaseCollectionWithUser, var lastSeq SequenceID var feed <-chan *ChangeEntry var timeout <-chan time.Time + var feedErr error // feedStarted identifies whether at least one MultiChangesFeed has been started. Used to identify when a one-shot changes is done. feedStarted := false @@ -1444,7 +1445,7 @@ loop: feed, feedErr = database.MultiChangesFeed(ctx, inChannels, options) } if feedErr != nil || feed == nil { - return feedErr, forceClose + return forceClose, feedErr } feedStarted = true } @@ -1456,7 +1457,6 @@ loop: } var sendErr error - // Wait for either a new change, a heartbeat, or a timeout: select { case entry, ok := <-feed: @@ -1465,6 +1465,7 @@ loop: } else if entry == nil { sendErr = send(nil) } else if entry.Err != nil { + feedErr = entry.Err break loop // error returned by feed - end changes } else { entries := []*ChangeEntry{entry} @@ -1481,6 +1482,7 @@ loop: waiting = true break collect } else if entry.Err != nil { + feedErr = entry.Err break loop // error returned by feed - end changes } entries = append(entries, entry) @@ -1524,14 +1526,14 @@ loop: } if sendErr != nil { forceClose = true - return &ChangesSendErr{sendErr}, forceClose + return forceClose, &ChangesSendErr{sendErr} } } // if the ChangesCtx is done, the connection was force closed. This could actually happen and send a ChangeEntry.Err. Instead of checking each place in this function, set the forceClose flag here. 
- if options.ChangesCtx.Err() != nil { + if options.ChangesCtx.Err() != nil { // && feedErr != nil { forceClose = true } - return nil, forceClose + return forceClose, feedErr } diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 3143bba4b8..7b7ffd6395 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -11,6 +11,7 @@ licenses/APL2.txt. package rest import ( + "context" "encoding/base64" "encoding/json" "fmt" @@ -3185,3 +3186,44 @@ func TestBlipDatabaseClose(t *testing.T) { }, time.Second*10, time.Millisecond*100) }) } + +// Starts a continuous pull replication then updates the db to trigger a close. +func TestChangesFeedExitDisconnect(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges, base.KeyCache) + btcRunner := NewBlipTesterClientRunner(t) + var shouldChanenlQueryError atomic.Bool + btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { + rt := NewRestTester(t, &RestTesterConfig{ + leakyBucketConfig: &base.LeakyBucketConfig{ + QueryCallback: func(ddoc, viewname string, params map[string]any) error { + if viewname == "channels" && shouldChanenlQueryError.Load() { + return gocb.ErrTimeout + } + return nil + }, + N1QLQueryCallback: func(_ context.Context, statement string, params map[string]any, consistency base.ConsistencyMode, adhoc bool) error { + if strings.Contains(statement, "sg_channels") && shouldChanenlQueryError.Load() { + return gocb.ErrTimeout + } + return nil + }, + }, + }) + defer rt.Close() + const username = "alice" + rt.CreateUser(username, []string{"*"}) + btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &BlipTesterClientOpts{Username: username}) + var blipContextClosed atomic.Bool + btcRunner.clients[btc.id].pullReplication.bt.blipContext.OnExitCallback = func() { + blipContextClosed.Store(true) + } + + shouldChanenlQueryError.Store(true) + btcRunner.StartPull(btc.id) + + require.EventuallyWithT(t, func(c 
*assert.CollectT) { + assert.True(c, blipContextClosed.Load()) + }, time.Second*10, time.Millisecond*100) + }) +} diff --git a/rest/blip_sync.go b/rest/blip_sync.go index cf4fb0a2a6..23a6f13c11 100644 --- a/rest/blip_sync.go +++ b/rest/blip_sync.go @@ -51,28 +51,30 @@ func (h *handler) handleBLIPSync() error { // error is checked at the time of database load, and ignored at this time originPatterns, _ := hostOnlyCORS(h.db.CORS.Origin) - // Create a BLIP context: - blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns, h.db.DatabaseContext.CancelContext) + // we could pull the exact CBL client and version from User-Agent + clientType := db.BLIPClientTypeCBL2 + if string(db.BLIPClientTypeSGR2) == h.getQuery(db.BLIPSyncClientTypeQueryParam) { + clientType = db.BLIPClientTypeSGR2 + } + bsc, err := db.NewBlipSyncContext(h.ctx(), db.BlipSyncContextOptions{ + DB: h.db, + ReplicationStats: db.BlipSyncStatsForCBL(h.db.DbStats), + OriginPatterns: originPatterns, + CorrelationID: h.formatSerialNumber(), + CancelContext: h.db.DatabaseContext.CancelContext, + ClientType: clientType, + }) if err != nil { return err } + defer bsc.Close() + blipContext := bsc.BlipContext() // Overwrite the existing logging context with the blip context ID h.rqCtx = base.CorrelationIDLogCtx(h.ctx(), base.FormatBlipContextID(blipContext.ID)) h.response.Header().Set(db.BLIPCorrelationIDResponseHeader, blipContext.ID) - // Create a new BlipSyncContext attached to the given blipContext. 
- ctx := db.NewBlipSyncContext(h.rqCtx, blipContext, h.db, h.formatSerialNumber(), db.BlipSyncStatsForCBL(h.db.DbStats)) - defer ctx.Close() - auditFields := base.AuditFields{base.AuditFieldReplicationID: base.FormatBlipContextID(blipContext.ID)} - if string(db.BLIPClientTypeSGR2) == h.getQuery(db.BLIPSyncClientTypeQueryParam) { - ctx.SetClientType(db.BLIPClientTypeSGR2) - auditFields["client_type"] = db.BLIPClientTypeSGR2 - } else { - // we could pull the exact CBL client and version from User-Agent - ctx.SetClientType(db.BLIPClientTypeCBL2) - auditFields["client_type"] = db.BLIPClientTypeCBL2 - } + auditFields := base.AuditFields{base.AuditFieldReplicationID: base.FormatBlipContextID(blipContext.ID), "client_type": clientType} base.Audit(h.rqCtx, base.AuditIDReplicationConnect, auditFields) defer func() { base.Audit(h.rqCtx, base.AuditIDReplicationDisconnect, auditFields) @@ -89,7 +91,7 @@ func (h *handler) handleBLIPSync() error { // ActiveSubprotocol only available after handshake via ServeHTTP(), so have to get go-blip to invoke callback between handshake and serving BLIP messages subprotocol := blipContext.ActiveSubprotocol() h.logStatus(http.StatusSwitchingProtocols, fmt.Sprintf("[%s] Upgraded to WebSocket protocol %s+%s%s", blipContext.ID, blip.WebSocketSubProtocolPrefix, subprotocol, h.formattedEffectiveUserName())) - err = ctx.SetActiveCBMobileSubprotocol(subprotocol) + err = bsc.SetActiveCBMobileSubprotocol(subprotocol) if err != nil { base.WarnfCtx(h.ctx(), "Couldn't set active CB Mobile Subprotocol: %v", err) } diff --git a/rest/changes_api.go b/rest/changes_api.go index 78ce4a03fb..0f69bcd002 100644 --- a/rest/changes_api.go +++ b/rest/changes_api.go @@ -464,7 +464,7 @@ func (h *handler) sendSimpleChanges(channels base.Set, options db.ChangesOptions func (h *handler) generateContinuousChanges(inChannels base.Set, options db.ChangesOptions, send func([]*db.ChangeEntry) error) (error, bool) { // Ensure continuous is set, since generateChanges now 
supports both continuous and one-shot options.Continuous = true - err, forceClose := db.GenerateChanges(h.ctx(), h.collection, inChannels, options, nil, send) + forceClose, err := db.GenerateChanges(h.ctx(), h.collection, inChannels, options, nil, send) if sendErr, ok := err.(*db.ChangesSendErr); ok { h.logStatus(http.StatusOK, fmt.Sprintf("Write error: %v", sendErr)) return nil, forceClose // error is probably because the client closed the connection From 58d929969ac581baf52d4232dbccc20d2182288c Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Tue, 19 Nov 2024 13:07:02 -0500 Subject: [PATCH 2/9] Avoid refactoring with the change --- db/active_replicator.go | 14 +++++---- db/blip_handler.go | 1 - db/blip_sync_context.go | 68 +++++++++++++---------------------------- db/changes.go | 12 ++++---- rest/blip_sync.go | 36 +++++++++++----------- rest/changes_api.go | 2 +- 6 files changed, 56 insertions(+), 77 deletions(-) diff --git a/db/active_replicator.go b/db/active_replicator.go index 5cea473b6d..64607a3e3f 100644 --- a/db/active_replicator.go +++ b/db/active_replicator.go @@ -207,15 +207,12 @@ func (ar *ActiveReplicator) GetStatus(ctx context.Context) *ReplicationStatus { func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sender, bsc *BlipSyncContext, err error) { arc.replicationStats.NumConnectAttempts.Add(1) - bsc, err = NewBlipSyncContext(arc.ctx, BlipSyncContextOptions{ - DB: arc.config.ActiveDB, - ReplicationStats: arc.replicationStats, - BlipID: arc.config.ID + idSuffix, - }) + var originPatterns []string // no origin headers for ISGR + cancelCtx, cancelFunc := context.WithCancel(context.Background()) + blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, cancelCtx) if err != nil { return nil, nil, err } - blipContext := bsc.BlipContext() blipContext.WebsocketPingInterval = arc.config.WebsocketPingInterval blipContext.OnExitCallback = func() { // fall into a reconnect loop only if the connection is 
unexpectedly closed. @@ -224,6 +221,11 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen } } + bsc, err = NewBlipSyncContext(arc.ctx, blipContext, arc.config.ActiveDB, blipContext.ID, arc.replicationStats, cancelFunc) + if err != nil { + return nil, nil, err + } + bsc.loggingCtx = base.CorrelationIDLogCtx( arc.config.ActiveDB.AddDatabaseLogContext(base.NewNonCancelCtx().Ctx), arc.config.ID+idSuffix) diff --git a/db/blip_handler.go b/db/blip_handler.go index 09b829e972..f785cd1a9c 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -361,7 +361,6 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { changesCtx: collectionCtx.changesCtx, requestPlusSeq: requestPlusSeq, }) - base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime)) if err != nil { base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "Closing blip connection due to changes feed error %+v\n", err) bh.ctxCancelFunc() diff --git a/db/blip_sync_context.go b/db/blip_sync_context.go index 847f892262..927c596c8d 100644 --- a/db/blip_sync_context.go +++ b/db/blip_sync_context.go @@ -34,64 +34,41 @@ const ( var ErrClosedBLIPSender = errors.New("use of closed BLIP sender") -// BlipSyncContextOptions provides options for creating a new BlipSyncContext -type BlipSyncContextOptions struct { - DB *Database // The backing database - ReplicationStats *BlipSyncStats - OriginPatterns []string // origin patterns for CORS checking - BlipID string // if specified, becomes blip.Context.ID, otherwise this is automatically assigned - CorrelationID string // correlation ID for logging - ClientType BLIPSyncContextClientType // name of the client - CancelContext context.Context // optional context for cancelation -} - -// NewBlipSyncContext creates a BlipSyncContext for listening to blip messages and an associated blip.Context. 
-func NewBlipSyncContext(ctx context.Context, opts BlipSyncContextOptions) (*BlipSyncContext, error) { - cancelCtx := opts.CancelContext - if cancelCtx == nil { - cancelCtx = context.Background() // create a local context for the BlipSyncContext +func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, contextID string, replicationStats *BlipSyncStats, ctxCancelFunc context.CancelFunc) (*BlipSyncContext, error) { + if ctxCancelFunc == nil { + return nil, errors.New("cancelCtxFunc is required") } - cancelCtx, cancelFunc := context.WithCancel(cancelCtx) - bc, err := NewSGBlipContext(ctx, opts.BlipID, opts.OriginPatterns, cancelCtx) - if err != nil { - cancelFunc() - return nil, err - } - maxInFlightChangesBatches := DefaultMaxConcurrentChangesBatches - if opts.DB.Options.MaxConcurrentChangesBatches != nil { - maxInFlightChangesBatches = *opts.DB.Options.MaxConcurrentChangesBatches + if db.Options.MaxConcurrentChangesBatches != nil { + maxInFlightChangesBatches = *db.Options.MaxConcurrentChangesBatches } maxInFlightRevs := DefaultMaxConcurrentRevs - if opts.DB.Options.MaxConcurrentRevs != nil { - maxInFlightRevs = *opts.DB.Options.MaxConcurrentRevs + if db.Options.MaxConcurrentRevs != nil { + maxInFlightRevs = *db.Options.MaxConcurrentRevs } bsc := &BlipSyncContext{ blipContext: bc, - blipContextDb: opts.DB, + blipContextDb: db, loggingCtx: ctx, terminator: make(chan bool), - userChangeWaiter: opts.DB.NewUserWaiter(), - sgCanUseDeltas: opts.DB.DeltaSyncEnabled(), - replicationStats: opts.ReplicationStats, + userChangeWaiter: db.NewUserWaiter(), + sgCanUseDeltas: db.DeltaSyncEnabled(), + replicationStats: replicationStats, inFlightChangesThrottle: make(chan struct{}, maxInFlightChangesBatches), inFlightRevsThrottle: make(chan struct{}, maxInFlightRevs), collections: &blipCollections{}, - ctxCancelFunc: cancelFunc, - } - if opts.ClientType != "" { - bsc.clientType = opts.ClientType + ctxCancelFunc: ctxCancelFunc, } if bsc.replicationStats == nil { 
bsc.replicationStats = NewBlipSyncStats() } bsc.stats.lastReportTime.Store(time.Now().UnixMilli()) - if u := opts.DB.User(); u != nil { + if u := db.User(); u != nil { bsc.userName = u.Name() u.InitializeRoles() - if u.Name() == "" && opts.DB.IsGuestReadOnly() { + if u.Name() == "" && db.IsGuestReadOnly() { bsc.readOnly = true } } @@ -99,7 +76,7 @@ func NewBlipSyncContext(ctx context.Context, opts BlipSyncContextOptions) (*Blip // Register default handlers bc.DefaultHandler = bsc.NotFoundHandler bc.FatalErrorHandler = func(err error) { - base.InfofCtx(ctx, base.KeyHTTP, "%s: --> BLIP+WebSocket connection error: %v", opts.CorrelationID, err) + base.InfofCtx(ctx, base.KeyHTTP, "%s: --> BLIP+WebSocket connection error: %v", contextID, err) } // Register 2.x replicator handlers @@ -107,7 +84,7 @@ func NewBlipSyncContext(ctx context.Context, opts BlipSyncContextOptions) (*Blip bsc.register(profile, handlerFn) } - if opts.DB.Options.UnsupportedOptions.ConnectedClient { + if db.Options.UnsupportedOptions.ConnectedClient { // Register Connected Client handlers for profile, handlerFn := range kConnectedClientHandlersByProfile { bsc.register(profile, handlerFn) @@ -161,7 +138,7 @@ type BlipSyncContext struct { stats blipSyncStats // internal structure to store stats - ctxCancelFunc context.CancelFunc // function which will terminate the BlipSyncContext connections + ctxCancelFunc context.CancelFunc // function to cancel a blip replication } // blipSyncStats has support structures to support reporting stats at regular interval @@ -189,6 +166,10 @@ func (bsc *BlipSyncContext) SetActiveCBMobileSubprotocol(subprotocol string) (er return err } +func (bsc *BlipSyncContext) SetClientType(clientType BLIPSyncContextClientType) { + bsc.clientType = clientType +} + // Registers a BLIP handler including the outer-level work of logging & error handling. // Includes the outer handler as a nested function. 
func (bsc *BlipSyncContext) register(profile string, handlerFn func(*blipHandler, *blip.Message) error) { @@ -271,9 +252,9 @@ func (bsc *BlipSyncContext) Close() { collection.changesCtxCancel() } - bsc.ctxCancelFunc() bsc.reportStats(true) close(bsc.terminator) + bsc.ctxCancelFunc() }) } @@ -799,8 +780,3 @@ func (bsc *BlipSyncContext) reportStats(updateImmediately bool) { bsc.stats.lastReportTime.Store(currentTime) } - -// BlipContext returns blip.Context for the BlipSyncContext -func (bsc *BlipSyncContext) BlipContext() *blip.Context { - return bsc.blipContext -} diff --git a/db/changes.go b/db/changes.go index 8db0d317b4..0dc19f9a51 100644 --- a/db/changes.go +++ b/db/changes.go @@ -1367,7 +1367,7 @@ func generateBlipSyncChanges(ctx context.Context, database *DatabaseCollectionWi // Store one-shot here to protect isOneShot := !options.Continuous - forceClose, err = GenerateChanges(ctx, database, inChannels, options, docIDFilter, send) + err, forceClose = GenerateChanges(ctx, database, inChannels, options, docIDFilter, send) if _, ok := err.(*ChangesSendErr); ok { // If there was already an error in a send function, do not send last one shot changes message, since it probably will not work anyway. @@ -1387,7 +1387,7 @@ type ChangesSendErr struct{ error } // Shell of the continuous changes feed -- calls out to a `send` function to deliver the change. // This is called from BLIP connections as well as HTTP handlers, which is why this is not a // method on `handler`. 
-func GenerateChanges(ctx context.Context, database *DatabaseCollectionWithUser, inChannels base.Set, options ChangesOptions, docIDFilter []string, send func([]*ChangeEntry) error) (forceClose bool, err error) { +func GenerateChanges(ctx context.Context, database *DatabaseCollectionWithUser, inChannels base.Set, options ChangesOptions, docIDFilter []string, send func([]*ChangeEntry) error) (err error, forceClose bool) { // Set up heartbeat/timeout var timeoutInterval time.Duration var timer *time.Timer @@ -1445,7 +1445,7 @@ loop: feed, feedErr = database.MultiChangesFeed(ctx, inChannels, options) } if feedErr != nil || feed == nil { - return forceClose, feedErr + return feedErr, forceClose } feedStarted = true } @@ -1526,14 +1526,14 @@ loop: } if sendErr != nil { forceClose = true - return forceClose, &ChangesSendErr{sendErr} + return &ChangesSendErr{sendErr}, forceClose } } // if the ChangesCtx is done, the connection was force closed. This could actually happen and send a ChangeEntry.Err. Instead of checking each place in this function, set the forceClose flag here. 
- if options.ChangesCtx.Err() != nil { // && feedErr != nil { + if options.ChangesCtx.Err() != nil { forceClose = true } - return forceClose, feedErr + return feedErr, forceClose } diff --git a/rest/blip_sync.go b/rest/blip_sync.go index 23a6f13c11..8c81cf1e6d 100644 --- a/rest/blip_sync.go +++ b/rest/blip_sync.go @@ -51,30 +51,32 @@ func (h *handler) handleBLIPSync() error { // error is checked at the time of database load, and ignored at this time originPatterns, _ := hostOnlyCORS(h.db.CORS.Origin) - // we could pull the exact CBL client and version from User-Agent - clientType := db.BLIPClientTypeCBL2 - if string(db.BLIPClientTypeSGR2) == h.getQuery(db.BLIPSyncClientTypeQueryParam) { - clientType = db.BLIPClientTypeSGR2 - } - bsc, err := db.NewBlipSyncContext(h.ctx(), db.BlipSyncContextOptions{ - DB: h.db, - ReplicationStats: db.BlipSyncStatsForCBL(h.db.DbStats), - OriginPatterns: originPatterns, - CorrelationID: h.formatSerialNumber(), - CancelContext: h.db.DatabaseContext.CancelContext, - ClientType: clientType, - }) + cancelCtx, cancelCtxFunc := context.WithCancel(h.db.DatabaseContext.CancelContext) + // Create a BLIP context: + blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns, cancelCtx) if err != nil { return err } - defer bsc.Close() - blipContext := bsc.BlipContext() // Overwrite the existing logging context with the blip context ID h.rqCtx = base.CorrelationIDLogCtx(h.ctx(), base.FormatBlipContextID(blipContext.ID)) h.response.Header().Set(db.BLIPCorrelationIDResponseHeader, blipContext.ID) + // Create a new BlipSyncContext attached to the given blipContext. 
+ ctx, err := db.NewBlipSyncContext(h.rqCtx, blipContext, h.db, h.formatSerialNumber(), db.BlipSyncStatsForCBL(h.db.DbStats), cancelCtxFunc) + if err != nil { + return err + } + defer ctx.Close() - auditFields := base.AuditFields{base.AuditFieldReplicationID: base.FormatBlipContextID(blipContext.ID), "client_type": clientType} + auditFields := base.AuditFields{base.AuditFieldReplicationID: base.FormatBlipContextID(blipContext.ID)} + if string(db.BLIPClientTypeSGR2) == h.getQuery(db.BLIPSyncClientTypeQueryParam) { + ctx.SetClientType(db.BLIPClientTypeSGR2) + auditFields["client_type"] = db.BLIPClientTypeSGR2 + } else { + // we could pull the exact CBL client and version from User-Agent + ctx.SetClientType(db.BLIPClientTypeCBL2) + auditFields["client_type"] = db.BLIPClientTypeCBL2 + } base.Audit(h.rqCtx, base.AuditIDReplicationConnect, auditFields) defer func() { base.Audit(h.rqCtx, base.AuditIDReplicationDisconnect, auditFields) @@ -91,7 +93,7 @@ func (h *handler) handleBLIPSync() error { // ActiveSubprotocol only available after handshake via ServeHTTP(), so have to get go-blip to invoke callback between handshake and serving BLIP messages subprotocol := blipContext.ActiveSubprotocol() h.logStatus(http.StatusSwitchingProtocols, fmt.Sprintf("[%s] Upgraded to WebSocket protocol %s+%s%s", blipContext.ID, blip.WebSocketSubProtocolPrefix, subprotocol, h.formattedEffectiveUserName())) - err = bsc.SetActiveCBMobileSubprotocol(subprotocol) + err = ctx.SetActiveCBMobileSubprotocol(subprotocol) if err != nil { base.WarnfCtx(h.ctx(), "Couldn't set active CB Mobile Subprotocol: %v", err) } diff --git a/rest/changes_api.go b/rest/changes_api.go index 0f69bcd002..78ce4a03fb 100644 --- a/rest/changes_api.go +++ b/rest/changes_api.go @@ -464,7 +464,7 @@ func (h *handler) sendSimpleChanges(channels base.Set, options db.ChangesOptions func (h *handler) generateContinuousChanges(inChannels base.Set, options db.ChangesOptions, send func([]*db.ChangeEntry) error) (error, bool) { // 
Ensure continuous is set, since generateChanges now supports both continuous and one-shot options.Continuous = true - forceClose, err := db.GenerateChanges(h.ctx(), h.collection, inChannels, options, nil, send) + err, forceClose := db.GenerateChanges(h.ctx(), h.collection, inChannels, options, nil, send) if sendErr, ok := err.(*db.ChangesSendErr); ok { h.logStatus(http.StatusOK, fmt.Sprintf("Write error: %v", sendErr)) return nil, forceClose // error is probably because the client closed the connection From 58d2d655fa0ff07dcf5daccd8c4028c06826ed86 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Tue, 19 Nov 2024 13:33:42 -0500 Subject: [PATCH 3/9] pass lint --- db/active_replicator.go | 1 + rest/blip_sync.go | 1 + 2 files changed, 2 insertions(+) diff --git a/db/active_replicator.go b/db/active_replicator.go index 64607a3e3f..6394364200 100644 --- a/db/active_replicator.go +++ b/db/active_replicator.go @@ -211,6 +211,7 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen cancelCtx, cancelFunc := context.WithCancel(context.Background()) blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, cancelCtx) if err != nil { + cancelFunc() return nil, nil, err } blipContext.WebsocketPingInterval = arc.config.WebsocketPingInterval diff --git a/rest/blip_sync.go b/rest/blip_sync.go index 8c81cf1e6d..7791f98041 100644 --- a/rest/blip_sync.go +++ b/rest/blip_sync.go @@ -55,6 +55,7 @@ func (h *handler) handleBLIPSync() error { // Create a BLIP context: blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns, cancelCtx) if err != nil { + cancelCtxFunc() return err } From 52ca63b77ee4e4d7ecdf065eee4560f795e40c34 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 27 Nov 2024 10:51:09 -0500 Subject: [PATCH 4/9] test active replicator reconnection --- db/active_replicator_common.go | 8 +++ db/active_replicator_push.go | 11 +++- db/blip_handler.go | 4 +- rest/replicatortest/replicator_test.go | 91 
+++++++++++++++++++++++++- rest/utilities_testing.go | 18 ++--- 5 files changed, 116 insertions(+), 16 deletions(-) diff --git a/db/active_replicator_common.go b/db/active_replicator_common.go index 7e26edb7cb..7830526f42 100644 --- a/db/active_replicator_common.go +++ b/db/active_replicator_common.go @@ -357,6 +357,14 @@ func (a *activeReplicatorCommon) getCheckpointHighSeq() string { return highSeqStr } +// publishStatus updates the replication status document in the metadata store. +func (a *activeReplicatorCommon) publishStatus() { + a.lock.Lock() + defer a.lock.Unlock() + a._publishStatus() +} + +// _publishStatus updates the replication status document in the metadata store. Requires holding a.lock before calling. func (a *activeReplicatorCommon) _publishStatus() { status := a._getStatusCallback() err := setLocalStatus(a.ctx, a.config.ActiveDB.MetadataStore, a.statusKey, status, int(a.config.ActiveDB.Options.LocalDocExpirySecs)) diff --git a/db/active_replicator_push.go b/db/active_replicator_push.go index 79f22ed3f1..6cb779ccbc 100644 --- a/db/active_replicator_push.go +++ b/db/active_replicator_push.go @@ -332,7 +332,7 @@ func (apr *ActivePushReplicator) _startSendingChanges(bh *blipHandler, since Seq apr.activeSendChanges.Add(1) go func(s *blip.Sender) { defer apr.activeSendChanges.Add(-1) - isComplete := bh.sendChanges(s, &sendChangesOptions{ + isComplete, err := bh.sendChanges(s, &sendChangesOptions{ docIDs: apr.config.DocIDs, since: since, continuous: apr.config.Continuous, @@ -344,8 +344,15 @@ func (apr *ActivePushReplicator) _startSendingChanges(bh *blipHandler, since Seq ignoreNoConflicts: true, // force the passive side to accept a "changes" message, even in no conflicts mode. 
changesCtx: collectionCtx.changesCtx, }) - // On a normal completion, call complete for the replication + if err != nil { + base.InfofCtx(apr.ctx, base.KeyReplicate, "Terminating blip connection due to changes feed error: %v", err) + bh.ctxCancelFunc() + apr.setError(err) + apr.publishStatus() + return + } if isComplete { + // On a normal completion, call complete for the replication apr.Complete() } }(apr.blipSender) diff --git a/db/blip_handler.go b/db/blip_handler.go index f785cd1a9c..cac1821bda 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -432,7 +432,7 @@ func (flag changesDeletedFlag) HasFlag(deletedFlag changesDeletedFlag) bool { return flag&deletedFlag != 0 } -// Sends all changes since the given sequence +// sendChanges will start a changes feed and send changes. Returns bool to indicate whether the changes feed finished and all changes were sent. The error value is only used to indicate a fatal error, where the blip connection should be terminated. If the blip connection is disconnected by the client, the error will be nil, but the boolean parameter will be false. 
func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions) (bool, error) { defer func() { if panicked := recover(); panicked != nil { @@ -542,7 +542,7 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions } bh.db.DatabaseContext.NotifyTerminatedChanges(bh.loggingCtx, user) } - return !forceClose, err + return (err == nil && !forceClose), err } func (bh *blipHandler) buildChangesRow(change *ChangeEntry, revID string) []interface{} { diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index ef0fd83f06..679081f5b5 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -9,6 +9,7 @@ package replicatortest import ( + "context" "encoding/json" "expvar" "fmt" @@ -25,14 +26,16 @@ import ( "testing" "time" + "github.com/couchbase/gocb/v2" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/couchbase/sync_gateway/auth" "github.com/couchbase/sync_gateway/base" "github.com/couchbase/sync_gateway/channels" "github.com/couchbase/sync_gateway/db" "github.com/couchbase/sync_gateway/rest" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestReplicationAPI(t *testing.T) { @@ -8573,8 +8576,90 @@ func TestDbConfigNoOverwriteReplications(t *testing.T) { require.Equal(t, startReplicationConfig.Direction, config.Direction) } +func TestActiveReplicatorChangesFeedExit(t *testing.T) { + base.RequireNumTestBuckets(t, 2) + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeyChanges, base.KeyCRUD, base.KeyBucket) + + var shouldChannelQueryError atomic.Bool + activeRT := rest.NewRestTester(t, &rest.RestTesterConfig{ + LeakyBucketConfig: &base.LeakyBucketConfig{ + QueryCallback: func(ddoc, viewname string, params map[string]any) error { + if viewname == "channels" && shouldChannelQueryError.Load() { + 
shouldChannelQueryError.Store(false) + return gocb.ErrTimeout + } + return nil + }, + N1QLQueryCallback: func(_ context.Context, statement string, params map[string]any, consistency base.ConsistencyMode, adhoc bool) error { + if strings.Contains(statement, "sg_channels") && shouldChannelQueryError.Load() { + shouldChannelQueryError.Store(false) + return gocb.ErrTimeout + } + return nil + }, + }, + }) + t.Cleanup(activeRT.Close) + _ = activeRT.Bucket() + + passiveRT := rest.NewRestTesterPersistentConfig(t) + t.Cleanup(passiveRT.Close) + + username := "alice" + passiveRT.CreateUser(username, []string{"*"}) + passiveDBURL := passiveDBURLForAlice(passiveRT, username) + stats := dbReplicatorStats(t) + ar, err := db.NewActiveReplicator(activeRT.Context(), &db.ActiveReplicatorConfig{ + ID: t.Name(), + Direction: db.ActiveReplicatorTypePush, + RemoteDBURL: passiveDBURL, + ActiveDB: &db.Database{ + DatabaseContext: activeRT.GetDatabase(), + }, + ChangesBatchSize: 200, + Continuous: false, + ReplicationStatsMap: stats, + CollectionsEnabled: !activeRT.GetDatabase().OnlyDefaultCollection(), + }) + require.NoError(t, err) + t.Cleanup(func() { assert.NoError(t, ar.Stop()) }) + + docID := "doc1" + _ = activeRT.CreateTestDoc(docID) + + shouldChannelQueryError.Store(false) + require.NoError(t, ar.Start(activeRT.Context())) + + changesResults, err := passiveRT.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0", "", true) + require.NoError(t, err) + require.Len(t, changesResults.Results, 1) + require.Equal(t, docID, changesResults.Results[0].ID) + require.Equal(t, int64(2), stats.NumConnectAttemptsPush.Value()) +} func requireBodyEqual(t *testing.T, expected string, doc *db.Document) { var expectedBody db.Body require.NoError(t, base.JSONUnmarshal([]byte(expected), &expectedBody)) require.Equal(t, expectedBody, doc.Body(base.TestCtx(t))) } + +func dbReplicatorStats(t *testing.T) *base.DbReplicatorStats { + stats, err := base.SyncGatewayStats.NewDBStats(t.Name(), false, false, false, 
nil, nil) + require.NoError(t, err) + dbstats, err := stats.DBReplicatorStats(t.Name()) + require.NoError(t, err) + return dbstats +} + +// passiveDBURLForAlice creates a public server for the passive RT and returns the URL for the alice user, e.g. http://alice:password@localhost:1234/dbname +func passiveDBURLForAlice(rt *rest.RestTester, username string) *url.URL { + srv := httptest.NewServer(rt.TestPublicHandler()) + rt.TB().Cleanup(srv.Close) + + passiveDBURL, err := url.Parse(srv.URL + "/" + rt.GetDatabase().Name) + require.NoError(rt.TB(), err) + + // Add basic auth creds to target db URL + passiveDBURL.User = url.UserPassword(username, rest.RestTesterDefaultUserPassword) + return passiveDBURL +} diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 6675831acd..9035ba96de 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -59,7 +59,7 @@ type RestTesterConfig struct { EnableNoConflictsMode bool // Enable no-conflicts mode. By default, conflicts will be allowed, which is the default behavior EnableUserQueries bool // Enable the feature-flag for user N1QL/etc queries CustomTestBucket *base.TestBucket // If set, use this bucket instead of requesting a new one. - leakyBucketConfig *base.LeakyBucketConfig // Set to create and use a leaky bucket on the RT and DB. A test bucket cannot be passed in if using this option. + LeakyBucketConfig *base.LeakyBucketConfig // Set to create and use a leaky bucket on the RT and DB. A test bucket cannot be passed in if using this option. adminInterface string // adminInterface overrides the default admin interface. 
SgReplicateEnabled bool // SgReplicateManager disabled by default for RestTester AutoImport *bool @@ -177,14 +177,14 @@ func (rt *RestTester) Bucket() base.Bucket { testBucket := rt.RestTesterConfig.CustomTestBucket if testBucket == nil { testBucket = base.GetTestBucket(rt.TB()) - if rt.leakyBucketConfig != nil { - leakyConfig := *rt.leakyBucketConfig + if rt.LeakyBucketConfig != nil { + leakyConfig := *rt.LeakyBucketConfig // Ignore closures to avoid double closing panics leakyConfig.IgnoreClose = true testBucket = testBucket.LeakyBucketClone(leakyConfig) } - } else if rt.leakyBucketConfig != nil { - rt.TB().Fatalf("A passed in TestBucket cannot be used on the RestTester when defining a leakyBucketConfig") + } else if rt.LeakyBucketConfig != nil { + rt.TB().Fatalf("A passed in TestBucket cannot be used on the RestTester when defining a LeakyBucketConfig") } rt.TestBucket = testBucket @@ -360,7 +360,7 @@ func (rt *RestTester) Bucket() base.Bucket { } _, isLeaky := base.AsLeakyBucket(rt.TestBucket) var err error - if rt.leakyBucketConfig != nil || isLeaky { + if rt.LeakyBucketConfig != nil || isLeaky { _, err = rt.RestTesterServerContext.AddDatabaseFromConfigWithBucket(ctx, rt.TB(), *rt.DatabaseConfig, testBucket.Bucket) } else { _, err = rt.RestTesterServerContext.AddDatabaseFromConfig(ctx, *rt.DatabaseConfig) @@ -448,11 +448,11 @@ func GetDataStoreNamesFromScopesConfig(config ScopesConfig) []sgbucket.DataStore } // LeakyBucket gets the bucket from the RestTester as a leaky bucket allowing for callbacks to be set on the fly. -// The RestTester must have been set up to create and use a leaky bucket by setting leakyBucketConfig in the RT +// The RestTester must have been set up to create and use a leaky bucket by setting LeakyBucketConfig in the RT // config when calling NewRestTester. 
func (rt *RestTester) LeakyBucket() *base.LeakyDataStore { - if rt.leakyBucketConfig == nil { - rt.TB().Fatalf("Cannot get leaky bucket when leakyBucketConfig was not set on RestTester initialisation") + if rt.LeakyBucketConfig == nil { + rt.TB().Fatalf("Cannot get leaky bucket when LeakyBucketConfig was not set on RestTester initialisation") } leakyDataStore, ok := base.AsLeakyDataStore(rt.Bucket().DefaultDataStore()) if !ok { From fe70069575ba16ad1aed3301622935995494739a Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 27 Nov 2024 13:50:18 -0500 Subject: [PATCH 5/9] test fixup --- rest/blip_api_crud_test.go | 4 ++-- rest/replicatortest/replicator_test.go | 2 +- rest/revocation_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 7b7ffd6395..73f4ea4c58 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -1206,7 +1206,7 @@ func TestBlipSendConcurrentRevs(t *testing.T) { concurrentSendRevNum = 50 ) rt := NewRestTester(t, &RestTesterConfig{ - leakyBucketConfig: &base.LeakyBucketConfig{ + LeakyBucketConfig: &base.LeakyBucketConfig{ UpdateCallback: func(_ string) { time.Sleep(time.Millisecond * 5) // slow down rosmar - it's too quick to be throttled }, @@ -3195,7 +3195,7 @@ func TestChangesFeedExitDisconnect(t *testing.T) { var shouldChanenlQueryError atomic.Bool btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, &RestTesterConfig{ - leakyBucketConfig: &base.LeakyBucketConfig{ + LeakyBucketConfig: &base.LeakyBucketConfig{ QueryCallback: func(ddoc, viewname string, params map[string]any) error { if viewname == "channels" && shouldChanenlQueryError.Load() { return gocb.ErrTimeout diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index 679081f5b5..868b38fa1c 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -8628,7 +8628,7 @@ 
func TestActiveReplicatorChangesFeedExit(t *testing.T) { docID := "doc1" _ = activeRT.CreateTestDoc(docID) - shouldChannelQueryError.Store(false) + shouldChannelQueryError.Store(true) require.NoError(t, ar.Start(activeRT.Context())) changesResults, err := passiveRT.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0", "", true) diff --git a/rest/revocation_test.go b/rest/revocation_test.go index d0bab3e9fa..25167cafeb 100644 --- a/rest/revocation_test.go +++ b/rest/revocation_test.go @@ -2413,7 +2413,7 @@ func TestRevocationGetSyncDataError(t *testing.T) { // Two callbacks to cover usage with CBS/Xattrs and without revocationTester, rt := InitScenario( t, &RestTesterConfig{ - leakyBucketConfig: &base.LeakyBucketConfig{ + LeakyBucketConfig: &base.LeakyBucketConfig{ GetWithXattrCallback: func(key string) error { return fmt.Errorf("Leaky Bucket GetWithXattrCallback Error") }, GetRawCallback: func(key string) error { From e1714a3115418c26466ebf163e8e702440be32dd Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 27 Nov 2024 13:54:23 -0500 Subject: [PATCH 6/9] lint correctly --- db/active_replicator_push.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/active_replicator_push.go b/db/active_replicator_push.go index 6cb779ccbc..db89a94ad9 100644 --- a/db/active_replicator_push.go +++ b/db/active_replicator_push.go @@ -347,7 +347,7 @@ func (apr *ActivePushReplicator) _startSendingChanges(bh *blipHandler, since Seq if err != nil { base.InfofCtx(apr.ctx, base.KeyReplicate, "Terminating blip connection due to changes feed error: %v", err) bh.ctxCancelFunc() - apr.setError(err) + _ = apr.setError(err) apr.publishStatus() return } From ac2d5d83ad3442b3c36c5de8f12a189ecd00ede6 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 27 Nov 2024 14:01:16 -0500 Subject: [PATCH 7/9] PR comments --- db/changes.go | 1 - rest/blip_api_crud_test.go | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/db/changes.go b/db/changes.go index 
0dc19f9a51..b288ccfd18 100644 --- a/db/changes.go +++ b/db/changes.go @@ -1438,7 +1438,6 @@ loop: forceClose = true break loop } - var feedErr error if len(docIDFilter) > 0 { feed, feedErr = database.DocIDChangesFeed(ctx, inChannels, docIDFilter, options) } else { diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 73f4ea4c58..d7cf2e3168 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -3192,18 +3192,18 @@ func TestChangesFeedExitDisconnect(t *testing.T) { base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges, base.KeyCache) btcRunner := NewBlipTesterClientRunner(t) - var shouldChanenlQueryError atomic.Bool + var shouldChannelQueryError atomic.Bool btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { rt := NewRestTester(t, &RestTesterConfig{ LeakyBucketConfig: &base.LeakyBucketConfig{ QueryCallback: func(ddoc, viewname string, params map[string]any) error { - if viewname == "channels" && shouldChanenlQueryError.Load() { + if viewname == "channels" && shouldChannelQueryError.Load() { return gocb.ErrTimeout } return nil }, N1QLQueryCallback: func(_ context.Context, statement string, params map[string]any, consistency base.ConsistencyMode, adhoc bool) error { - if strings.Contains(statement, "sg_channels") && shouldChanenlQueryError.Load() { + if strings.Contains(statement, "sg_channels") && shouldChannelQueryError.Load() { return gocb.ErrTimeout } return nil @@ -3219,7 +3219,7 @@ func TestChangesFeedExitDisconnect(t *testing.T) { blipContextClosed.Store(true) } - shouldChanenlQueryError.Store(true) + shouldChannelQueryError.Store(true) btcRunner.StartPull(btc.id) require.EventuallyWithT(t, func(c *assert.CollectT) { From a3199760301e357797685ae8bfd8ed312c6e3c20 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 27 Nov 2024 15:04:40 -0500 Subject: [PATCH 8/9] Acquire read lock to prevent data race --- db/active_replicator_common.go | 3 ++- 1 file changed,
2 insertions(+), 1 deletion(-) diff --git a/db/active_replicator_common.go b/db/active_replicator_common.go index 7830526f42..069c1084e1 100644 --- a/db/active_replicator_common.go +++ b/db/active_replicator_common.go @@ -313,8 +313,9 @@ func (a *activeReplicatorCommon) getState() string { return a.state } -// requires a.stateErrorLock func (a *activeReplicatorCommon) _getStateWithErrorMessage() (state string, lastErrorMessage string) { + a.stateErrorLock.RLock() + defer a.stateErrorLock.RUnlock() if a.lastError == nil { return a.state, "" } From 27ed759edf341e11d6a526e19ffe4bf2e15301e9 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Sun, 1 Dec 2024 14:05:50 -0500 Subject: [PATCH 9/9] switch query to use all docs index --- rest/replicatortest/replicator_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index 868b38fa1c..769d9ee390 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -8592,7 +8592,8 @@ func TestActiveReplicatorChangesFeedExit(t *testing.T) { return nil }, N1QLQueryCallback: func(_ context.Context, statement string, params map[string]any, consistency base.ConsistencyMode, adhoc bool) error { - if strings.Contains(statement, "sg_channels") && shouldChannelQueryError.Load() { + // * channel query uses all docs index + if strings.Contains(statement, "sg_allDocs") && shouldChannelQueryError.Load() { shouldChannelQueryError.Store(false) return gocb.ErrTimeout }