Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4370 create a cancel context inside BlipSyncContext #7201

Merged
merged 11 commits into from
Jan 17, 2025
7 changes: 6 additions & 1 deletion base/leaky_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,12 @@ type LeakyBucketConfig struct {

// Returns a partial error the first time ViewCustom is called
FirstTimeViewCustomPartialError bool
PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called

// QueryCallback allows tests to set a callback that will be issued prior to issuing a view query
QueryCallback func(ddoc, viewName string, params map[string]any) error
PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called

N1QLQueryCallback func(ctx context.Context, statement string, params map[string]any, consistency ConsistencyMode, adhoc bool) error

PostN1QLQueryCallback func()

Expand Down
18 changes: 17 additions & 1 deletion base/leaky_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ func (lds *LeakyDataStore) ViewQuery(ctx context.Context, ddoc, name string, par
if !ok {
return nil, errors.New("bucket does not support views")
}
if lds.config.QueryCallback != nil {
err := lds.config.QueryCallback(ddoc, name, params)
if err != nil {
return nil, err
}
}
iterator, err := vs.ViewQuery(ctx, ddoc, name, params)

if lds.config.FirstTimeViewCustomPartialError {
Expand Down Expand Up @@ -324,10 +330,14 @@ func (lds *LeakyDataStore) SetFirstTimeViewCustomPartialError(val bool) {
lds.config.FirstTimeViewCustomPartialError = val
}

func (lds *LeakyDataStore) SetPostQueryCallback(callback func(ddoc, viewName string, params map[string]interface{})) {
func (lds *LeakyDataStore) SetPostQueryCallback(callback func(ddoc, viewName string, params map[string]any)) {
lds.config.PostQueryCallback = callback
}

// SetQueryCallback sets the callback that ViewQuery runs before issuing a view query.
// If the callback returns an error, ViewQuery returns that error without issuing the query.
func (lds *LeakyDataStore) SetQueryCallback(fn func(ddoc, viewName string, params map[string]any) error) {
lds.config.QueryCallback = fn
}

// SetPostN1QLQueryCallback sets the PostN1QLQueryCallback on the leaky datastore config.
// Query checks for this callback after the underlying N1QL query has been issued.
func (lds *LeakyDataStore) SetPostN1QLQueryCallback(callback func()) {
lds.config.PostN1QLQueryCallback = callback
}
Expand Down Expand Up @@ -447,6 +457,12 @@ func (lds *LeakyDataStore) Query(ctx context.Context, statement string, params m
if err != nil {
return nil, err
}
if lds.config.N1QLQueryCallback != nil {
err := lds.config.N1QLQueryCallback(ctx, statement, params, consistency, adhoc)
if err != nil {
return nil, err
}
}
iterator, err := n1qlStore.Query(ctx, statement, params, consistency, adhoc)

if lds.config.PostN1QLQueryCallback != nil {
Expand Down
10 changes: 7 additions & 3 deletions db/active_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,10 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen
arc.replicationStats.NumConnectAttempts.Add(1)

var originPatterns []string // no origin headers for ISGR
// NewSGBlipContext doesn't set cancellation context - active replication cancellation on db close is handled independently
blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, nil)
cancelCtx, cancelFunc := context.WithCancel(context.Background())
blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, cancelCtx)
if err != nil {
cancelFunc()
adamcfraser marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil, err
}
blipContext.WebsocketPingInterval = arc.config.WebsocketPingInterval
Expand All @@ -221,7 +222,10 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen
}
}

bsc = NewBlipSyncContext(arc.ctx, blipContext, arc.config.ActiveDB, blipContext.ID, arc.replicationStats)
bsc, err = NewBlipSyncContext(arc.ctx, blipContext, arc.config.ActiveDB, blipContext.ID, arc.replicationStats, cancelFunc)
if err != nil {
return nil, nil, err
}

bsc.loggingCtx = base.CorrelationIDLogCtx(
arc.config.ActiveDB.AddDatabaseLogContext(base.NewNonCancelCtx().Ctx),
Expand Down
5 changes: 4 additions & 1 deletion db/active_replicator_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (apr *ActivePushReplicator) _startPushNonCollection() error {
apr.activeSendChanges.Set(true)
go func(s *blip.Sender) {
defer apr.activeSendChanges.Set(false)
isComplete := bh.sendChanges(s, &sendChangesOptions{
isComplete, err := bh.sendChanges(s, &sendChangesOptions{
docIDs: apr.config.DocIDs,
since: apr.defaultCollection.Checkpointer.lastCheckpointSeq,
continuous: apr.config.Continuous,
Expand All @@ -340,6 +340,9 @@ func (apr *ActivePushReplicator) _startPushNonCollection() error {
ignoreNoConflicts: true, // force the passive side to accept a "changes" message, even in no conflicts mode.
changesCtx: collectionCtx.changesCtx,
})
if err != nil {
base.InfofCtx(apr.ctx, base.KeyReplicate, "Error sending changes: %v", err)
}
// On a normal completion, call complete for the replication
if isComplete {
apr.Complete()
Expand Down
5 changes: 4 additions & 1 deletion db/active_replicator_push_collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (apr *ActivePushReplicator) _startPushWithCollections() error {
apr.activeSendChanges.Set(true)
go func(s *blip.Sender) {
defer apr.activeSendChanges.Set(false)
isComplete := bh.sendChanges(s, &sendChangesOptions{
isComplete, err := bh.sendChanges(s, &sendChangesOptions{
docIDs: apr.config.DocIDs,
since: replicationCollection.Checkpointer.lastCheckpointSeq,
continuous: apr.config.Continuous,
Expand All @@ -88,6 +88,9 @@ func (apr *ActivePushReplicator) _startPushWithCollections() error {
ignoreNoConflicts: true, // force the passive side to accept a "changes" message, even in no conflicts mode.
changesCtx: c.changesCtx,
})
if err != nil {
base.InfofCtx(apr.ctx, base.KeyReplicate, "Error sending changes: %v", err)
}
// On a normal completion, call complete for the replication
if isComplete {
apr.Complete()
Expand Down
16 changes: 9 additions & 7 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
}()
// sendChanges runs until blip context closes, or fails due to error
startTime := time.Now()
_ = bh.sendChanges(rq.Sender, &sendChangesOptions{
_, err = bh.sendChanges(rq.Sender, &sendChangesOptions{
docIDs: subChangesParams.docIDs(),
since: subChangesParams.Since(),
continuous: continuous,
Expand All @@ -361,6 +361,10 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
changesCtx: collectionCtx.changesCtx,
requestPlusSeq: requestPlusSeq,
})
if err != nil {
base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "Closing blip connection due to changes feed error %+v\n", err)
bh.ctxCancelFunc()
}
base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime))
}()

Expand Down Expand Up @@ -429,7 +433,7 @@ func (flag changesDeletedFlag) HasFlag(deletedFlag changesDeletedFlag) bool {
}

// Sends all changes since the given sequence
func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions) (isComplete bool) {
func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions) (bool, error) {
adamcfraser marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
if panicked := recover(); panicked != nil {
bh.replicationStats.NumHandlersPanicked.Add(1)
Expand Down Expand Up @@ -472,11 +476,10 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions
changesDb, err := bh.copyDatabaseCollectionWithUser(bh.collectionIdx)
if err != nil {
base.WarnfCtx(bh.loggingCtx, "[%s] error sending changes: %v", bh.blipContext.ID, err)
return false

return false, err
}

forceClose := generateBlipSyncChanges(bh.loggingCtx, changesDb, channelSet, options, opts.docIDs, func(changes []*ChangeEntry) error {
forceClose, err := generateBlipSyncChanges(bh.loggingCtx, changesDb, channelSet, options, opts.docIDs, func(changes []*ChangeEntry) error {
base.DebugfCtx(bh.loggingCtx, base.KeySync, " Sending %d changes", len(changes))
for _, change := range changes {
if !strings.HasPrefix(change.ID, "_") {
Expand Down Expand Up @@ -539,8 +542,7 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions
}
bh.db.DatabaseContext.NotifyTerminatedChanges(bh.loggingCtx, user)
}

return !forceClose
return !forceClose, err
}

func (bh *blipHandler) buildChangesRow(change *ChangeEntry, revID string) []interface{} {
Expand Down
11 changes: 9 additions & 2 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ const (

var ErrClosedBLIPSender = errors.New("use of closed BLIP sender")

func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, contextID string, replicationStats *BlipSyncStats) *BlipSyncContext {
func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, contextID string, replicationStats *BlipSyncStats, ctxCancelFunc context.CancelFunc) (*BlipSyncContext, error) {
if ctxCancelFunc == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required for cases where the BlipSyncContext is for an active peer (ISGR)? Closing the connection in that case feels like a different problem than the passive peer case.

return nil, errors.New("cancelCtxFunc is required")
}
maxInFlightChangesBatches := DefaultMaxConcurrentChangesBatches
if db.Options.MaxConcurrentChangesBatches != nil {
maxInFlightChangesBatches = *db.Options.MaxConcurrentChangesBatches
Expand All @@ -55,6 +58,7 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con
inFlightChangesThrottle: make(chan struct{}, maxInFlightChangesBatches),
inFlightRevsThrottle: make(chan struct{}, maxInFlightRevs),
collections: &blipCollections{},
ctxCancelFunc: ctxCancelFunc,
}
if bsc.replicationStats == nil {
bsc.replicationStats = NewBlipSyncStats()
Expand Down Expand Up @@ -86,7 +90,7 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con
bsc.register(profile, handlerFn)
}
}
return bsc
return bsc, nil
}

// BlipSyncContext represents one BLIP connection (socket) opened by a client.
Expand Down Expand Up @@ -133,6 +137,8 @@ type BlipSyncContext struct {
collections *blipCollections // all collections handled by blipSyncContext, implicit or via GetCollections

stats blipSyncStats // internal structure to store stats

ctxCancelFunc context.CancelFunc // function to cancel a blip replication
}

// blipSyncStats has support structures to support reporting stats at regular interval
Expand Down Expand Up @@ -248,6 +254,7 @@ func (bsc *BlipSyncContext) Close() {
}
bsc.reportStats(true)
close(bsc.terminator)
bsc.ctxCancelFunc()
})
}

Expand Down
16 changes: 9 additions & 7 deletions db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,24 +1362,24 @@ func (options ChangesOptions) String() string {
)
}

// Used by BLIP connections for changes. Supports both one-shot and continuous changes.
func generateBlipSyncChanges(ctx context.Context, database *DatabaseCollectionWithUser, inChannels base.Set, options ChangesOptions, docIDFilter []string, send func([]*ChangeEntry) error) (forceClose bool) {
// Used by BLIP connections for changes. Supports both one-shot and continuous changes. Returns an error if the feed does not start up or if there is a fatal error in the feed; in that case no more changes will be generated and the caller is responsible for closing the connection. forceClose will be true if the connection was terminated underneath the changes feed.
func generateBlipSyncChanges(ctx context.Context, database *DatabaseCollectionWithUser, inChannels base.Set, options ChangesOptions, docIDFilter []string, send func([]*ChangeEntry) error) (forceClose bool, err error) {

// Store one-shot here to protect
isOneShot := !options.Continuous
err, forceClose := GenerateChanges(ctx, database, inChannels, options, docIDFilter, send)
err, forceClose = GenerateChanges(ctx, database, inChannels, options, docIDFilter, send)

if _, ok := err.(*ChangesSendErr); ok {
// If there was already an error in a send function, do not send last one shot changes message, since it probably will not work anyway.
return forceClose // error is probably because the client closed the connection
return forceClose, err // error is probably because the client closed the connection
}

// For one-shot changes, invoke the callback w/ nil to trigger the 'caught up' changes message. (For continuous changes, this
// is done by MultiChangesFeed prior to going into Wait mode)
if isOneShot {
_ = send(nil)
}
return forceClose
return forceClose, err
}

type ChangesSendErr struct{ error }
Expand Down Expand Up @@ -1416,6 +1416,7 @@ func GenerateChanges(ctx context.Context, database *DatabaseCollectionWithUser,
var lastSeq SequenceID
var feed <-chan *ChangeEntry
var timeout <-chan time.Time
var feedErr error
adamcfraser marked this conversation as resolved.
Show resolved Hide resolved

// feedStarted identifies whether at least one MultiChangesFeed has been started. Used to identify when a one-shot changes is done.
feedStarted := false
Expand Down Expand Up @@ -1456,7 +1457,6 @@ loop:
}

var sendErr error

// Wait for either a new change, a heartbeat, or a timeout:
select {
case entry, ok := <-feed:
Expand All @@ -1465,6 +1465,7 @@ loop:
} else if entry == nil {
sendErr = send(nil)
} else if entry.Err != nil {
feedErr = entry.Err
break loop // error returned by feed - end changes
} else {
entries := []*ChangeEntry{entry}
Expand All @@ -1481,6 +1482,7 @@ loop:
waiting = true
break collect
} else if entry.Err != nil {
feedErr = entry.Err
break loop // error returned by feed - end changes
}
entries = append(entries, entry)
Expand Down Expand Up @@ -1533,5 +1535,5 @@ loop:
forceClose = true
}

return nil, forceClose
return feedErr, forceClose
}
42 changes: 42 additions & 0 deletions rest/blip_api_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ licenses/APL2.txt.
package rest

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -3185,3 +3186,44 @@ func TestBlipDatabaseClose(t *testing.T) {
}, time.Second*10, time.Millisecond*100)
})
}

// Starts a continuous pull replication while the channel query is forced to fail, and verifies that the blip context is closed as a result.
func TestChangesFeedExitDisconnect(t *testing.T) {

base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges, base.KeyCache)
btcRunner := NewBlipTesterClientRunner(t)
var shouldChanenlQueryError atomic.Bool
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var shouldChanenlQueryError atomic.Bool
var shouldChannelQueryError atomic.Bool

btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
rt := NewRestTester(t, &RestTesterConfig{
leakyBucketConfig: &base.LeakyBucketConfig{
QueryCallback: func(ddoc, viewname string, params map[string]any) error {
if viewname == "channels" && shouldChanenlQueryError.Load() {
return gocb.ErrTimeout
}
return nil
},
N1QLQueryCallback: func(_ context.Context, statement string, params map[string]any, consistency base.ConsistencyMode, adhoc bool) error {
if strings.Contains(statement, "sg_channels") && shouldChanenlQueryError.Load() {
return gocb.ErrTimeout
}
return nil
},
},
})
defer rt.Close()
const username = "alice"
rt.CreateUser(username, []string{"*"})
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &BlipTesterClientOpts{Username: username})
var blipContextClosed atomic.Bool
btcRunner.clients[btc.id].pullReplication.bt.blipContext.OnExitCallback = func() {
blipContextClosed.Store(true)
}

shouldChanenlQueryError.Store(true)
btcRunner.StartPull(btc.id)

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, blipContextClosed.Load())
}, time.Second*10, time.Millisecond*100)
})
}
9 changes: 7 additions & 2 deletions rest/blip_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,22 @@ func (h *handler) handleBLIPSync() error {
// error is checked at the time of database load, and ignored at this time
originPatterns, _ := hostOnlyCORS(h.db.CORS.Origin)

cancelCtx, cancelCtxFunc := context.WithCancel(h.db.DatabaseContext.CancelContext)
// Create a BLIP context:
blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns, h.db.DatabaseContext.CancelContext)
blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns, cancelCtx)
if err != nil {
cancelCtxFunc()
adamcfraser marked this conversation as resolved.
Show resolved Hide resolved
return err
}

// Overwrite the existing logging context with the blip context ID
h.rqCtx = base.CorrelationIDLogCtx(h.ctx(), base.FormatBlipContextID(blipContext.ID))
h.response.Header().Set(db.BLIPCorrelationIDResponseHeader, blipContext.ID)
// Create a new BlipSyncContext attached to the given blipContext.
ctx := db.NewBlipSyncContext(h.rqCtx, blipContext, h.db, h.formatSerialNumber(), db.BlipSyncStatsForCBL(h.db.DbStats))
ctx, err := db.NewBlipSyncContext(h.rqCtx, blipContext, h.db, h.formatSerialNumber(), db.BlipSyncStatsForCBL(h.db.DbStats), cancelCtxFunc)
if err != nil {
return err
}
defer ctx.Close()

auditFields := base.AuditFields{base.AuditFieldReplicationID: base.FormatBlipContextID(blipContext.ID)}
Expand Down