Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4203 pass RevSeqNo for on demand imports #7273

Open
wants to merge 10 commits into
base: release/anemone
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 48 additions & 14 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ func realDocID(docid string) string {
return docid
}

// getRevSeqNo fetches the revSeqNo for a document, using the virtual xattr if available, otherwise the document body. Returns the cas from this fetch
torcolvin marked this conversation as resolved.
Show resolved Hide resolved
func (c *DatabaseCollection) getRevSeqNo(ctx context.Context, docID string) (revSeqNo, cas uint64, err error) {
xattrs, cas, err := c.dataStore.GetXattrs(ctx, docID, []string{base.VirtualXattrRevSeqNo})
if err != nil {
return 0, 0, err
}
// CBG-4233: revSeqNo not implemented yet in rosmar
if c.dbCtx.BucketSpec.IsWalrusBucket() {
torcolvin marked this conversation as resolved.
Show resolved Hide resolved
return 0, cas, err
}
revSeqNo, err = unmarshalRevSeqNo(xattrs[base.VirtualXattrRevSeqNo])
return revSeqNo, cas, err
}

func (c *DatabaseCollection) GetDocument(ctx context.Context, docid string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
doc, _, err = c.GetDocumentWithRaw(ctx, docid, unmarshalLevel)
return doc, err
Expand All @@ -64,22 +78,29 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
if err != nil {
return nil, nil, err
}

isSgWrite, crc32Match, _ := doc.IsSGWrite(ctx, rawBucketDoc.Body)
if crc32Match {
c.dbStats().Database().Crc32MatchCount.Add(1)
}

// If existing doc wasn't an SG Write, import the doc.
if !isSgWrite {
var importErr error
doc, importErr = c.OnDemandImportForGet(ctx, docid, rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas)
if importErr != nil {
return nil, nil, importErr
// reload to get revseqno
torcolvin marked this conversation as resolved.
Show resolved Hide resolved
doc, rawBucketDoc, err = c.getDocWithXattrs(ctx, key, append(c.syncGlobalSyncAndUserXattrKeys(), base.VirtualXattrRevSeqNo), unmarshalLevel)
if err != nil {
return nil, nil, err
}
// nil, nil returned when ErrImportCancelled is swallowed by importDoc switch
if doc == nil {
return nil, nil, base.ErrNotFound
isSgWrite, _, _ := doc.IsSGWrite(ctx, rawBucketDoc.Body)
if !isSgWrite {
var importErr error
doc, importErr = c.OnDemandImportForGet(ctx, docid, rawBucketDoc.Body, doc.RevSeqNo, rawBucketDoc.Xattrs, rawBucketDoc.Cas)
if importErr != nil {
return nil, nil, importErr
}
// nil, nil returned when ErrImportCancelled is swallowed by importDoc switch
if doc == nil {
return nil, nil, base.ErrNotFound
}
}
}
if !doc.HasValidSyncData() {
Expand Down Expand Up @@ -114,10 +135,16 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
return doc, rawBucketDoc, nil
}

// GetDocWithXattrs retrieves a document from the bucket, including sync gateway metadta xattrs, and the user xattr, if specified.
func (c *DatabaseCollection) GetDocWithXattrs(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
return c.getDocWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys(), unmarshalLevel)
}

// GetDocWithXattrs retrieves a document from the bucket, including sync gateway metadta xattrs, and the user xattr, if specified. Arbitrary xattrs can be passed into this function to allow VirtualXattrRevSeqNo to be returned and set on Document.
func (c *DatabaseCollection) getDocWithXattrs(ctx context.Context, key string, xattrKeys []string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
rawBucketDoc = &sgbucket.BucketDocument{}
var getErr error
rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys())
rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, xattrKeys)
if getErr != nil {
return nil, nil, getErr
}
Expand Down Expand Up @@ -163,7 +190,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (
if !isSgWrite {
var importErr error

doc, importErr = c.OnDemandImportForGet(ctx, docid, rawDoc, xattrs, cas)
doc, importErr = c.OnDemandImportForGet(ctx, docid, rawDoc, doc.RevSeqNo, xattrs, cas)
if importErr != nil {
return emptySyncData, importErr
}
Expand Down Expand Up @@ -240,19 +267,18 @@ func (db *DatabaseCollection) GetDocSyncDataNoImport(ctx context.Context, docid

// OnDemandImportForGet. Attempts to import the doc based on the provided id, contents and cas. ImportDocRaw does cas retry handling
// if the document gets updated after the initial retrieval attempt that triggered this.
func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid string, rawDoc []byte, xattrs map[string][]byte, cas uint64) (docOut *Document, err error) {
func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid string, rawDoc []byte, revSeqNo uint64, xattrs map[string][]byte, cas uint64) (docOut *Document, err error) {
isDelete := rawDoc == nil
importDb := DatabaseCollectionWithUser{DatabaseCollection: c, user: nil}
var importErr error

importOpts := importDocOptions{
isDelete: isDelete,
mode: ImportOnDemand,
revSeqNo: 0, // pending work in CBG-4203
revSeqNo: revSeqNo,
expiry: nil,
}

// RevSeqNo is 0 here pending work in CBG-4203
docOut, importErr = importDb.ImportDocRaw(ctx, docid, rawDoc, xattrs, importOpts, cas)

if importErr == base.ErrImportCancelledFilter {
Expand Down Expand Up @@ -906,6 +932,13 @@ func (db *DatabaseCollectionWithUser) backupAncestorRevs(ctx context.Context, do
// ////// UPDATING DOCUMENTS:

func (db *DatabaseCollectionWithUser) OnDemandImportForWrite(ctx context.Context, docid string, doc *Document, deleted bool) error {
revSeqNo, cas, err := db.getRevSeqNo(ctx, docid)
if err != nil {
return err
}
if cas != doc.Cas {
return base.ErrCasFailureShouldRetry
}

// Check whether the doc requiring import is an SDK delete
isDelete := false
Expand All @@ -921,7 +954,7 @@ func (db *DatabaseCollectionWithUser) OnDemandImportForWrite(ctx context.Context
expiry: nil,
mode: ImportOnDemand,
isDelete: isDelete,
revSeqNo: 0, // pending work in CBG-4203
revSeqNo: revSeqNo,
}
importedDoc, importErr := importDb.ImportDoc(ctx, docid, doc, importOpts) // nolint:staticcheck

Expand Down Expand Up @@ -2405,6 +2438,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil {
return
}

prevCurrentRev = doc.CurrentRev

// Check whether Sync Data originated in body
Expand Down
2 changes: 1 addition & 1 deletion db/database_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (c *DatabaseCollection) syncGlobalSyncAndUserXattrKeys() []string {
func (c *DatabaseCollection) syncGlobalSyncMouRevSeqNoAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
if c.useMou() {
xattrKeys = append(xattrKeys, base.MouXattrName, base.VirtualXattrRevSeqNo, base.GlobalXattrName)
xattrKeys = append(xattrKeys, base.MouXattrName, base.GlobalXattrName)
}
userXattrKey := c.userXattrKey()
if userXattrKey != "" {
Expand Down
45 changes: 30 additions & 15 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,19 @@ func unmarshalDocument(docid string, data []byte) (*Document, error) {
return doc, nil
}

func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, virtualXattr []byte, globalSyncData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {

func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, revSeqNo []byte, globalSyncData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
if len(syncXattrData) == 0 && len(hlvXattrData) == 0 {
// If no xattr data, unmarshal as standard doc
doc, err = unmarshalDocument(docid, data)
if doc != nil {
doc.RevSeqNo, err = unmarshalRevSeqNo(revSeqNo)
if err != nil {
return nil, pkgerrors.WithStack(base.RedactErrorf("Failed convert rev seq number during UnmarshalWithXattrs() doc with id: %s. Error: %v", base.UD(doc.ID), err))
}
}
} else {
doc = NewDocument(docid)
err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, virtualXattr, globalSyncData, unmarshalLevel)
err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, revSeqNo, globalSyncData, unmarshalLevel)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -1117,7 +1122,7 @@ func (doc *Document) MarshalJSON() (data []byte, err error) {
// unmarshalLevel is anything less than the full document + metadata, the raw data is retained for subsequent
// lazy unmarshalling as needed.
// Must handle cases where document body and hlvXattrData are present without syncXattrData for all DocumentUnmarshalLevel
func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, virtualXattr []byte, globalSyncData []byte, unmarshalLevel DocumentUnmarshalLevel) error {
func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, revSeqNo []byte, globalSyncData []byte, unmarshalLevel DocumentUnmarshalLevel) error {
if doc.ID == "" {
base.WarnfCtx(ctx, "Attempted to unmarshal document without ID set")
return errors.New("Document was unmarshalled without ID set")
Expand All @@ -1140,18 +1145,11 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
}
if virtualXattr != nil {
var revSeqNo string
err := base.JSONUnmarshal(virtualXattr, &revSeqNo)
if revSeqNo != nil {
var err error
doc.RevSeqNo, err = unmarshalRevSeqNo(revSeqNo)
if err != nil {
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal doc virtual revSeqNo xattr during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
if revSeqNo != "" {
revNo, err := strconv.ParseUint(revSeqNo, 10, 64)
if err != nil {
return pkgerrors.WithStack(base.RedactErrorf("Failed convert rev seq number %q during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", revSeqNo, base.UD(doc.ID), err))
}
doc.RevSeqNo = revNo
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal RevSeqNo during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
}
if len(globalSyncData) > 0 {
Expand Down Expand Up @@ -1385,3 +1383,20 @@ func (s *SyncData) GetRevAndVersion() (rav channels.RevAndVersion) {
}
return rav
}

// unmarshalRevSeqNo unmarshals the rev seq number from the provided bytes, expects a string representation of the uint64.
func unmarshalRevSeqNo(revSeqNoBytes []byte) (uint64, error) {
if len(revSeqNoBytes) == 0 {
return 0, nil
}
var revSeqNoString string
err := base.JSONUnmarshal(revSeqNoBytes, &revSeqNoString)
if err != nil {
return 0, fmt.Errorf("Failed to unmarshal rev seq number %s", revSeqNoBytes)
}
revSeqNo, err := strconv.ParseUint(revSeqNoString, 10, 64)
if err != nil {
return 0, fmt.Errorf("Failed convert rev seq number %s", revSeqNoBytes)
}
return revSeqNo, nil
}
118 changes: 88 additions & 30 deletions db/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,20 @@ func TestFeedImport(t *testing.T) {

// fetch the xattrs directly doc to confirm import (to avoid triggering on-demand import)
var syncData SyncData
xattrs, importCas, err := collection.dataStore.GetXattrs(ctx, key, []string{base.SyncXattrName})
xattrs, importCas, err := collection.dataStore.GetXattrs(ctx, key, []string{base.SyncXattrName, base.VirtualXattrRevSeqNo})
require.NoError(t, err)
syncXattr, ok := xattrs[base.SyncXattrName]
require.True(t, ok)
require.NoError(t, base.JSONUnmarshal(syncXattr, &syncData))
require.NotZero(t, syncData.Sequence, "Sequence should not be zero for imported doc")
revSeqNo := RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo])
// CBG-4233: revSeqNo not implemented yet in rosmar
if !base.UnitTestUrlIsWalrus() {
require.NotZero(t, revSeqNo, "RevSeqNo should not be zero for imported doc")
}

// verify mou
xattrs, _, err = collection.dataStore.GetXattrs(ctx, key, []string{base.MouXattrName})
// verify mou and rev seqno
xattrs, _, err = collection.dataStore.GetXattrs(ctx, key, []string{base.MouXattrName, base.VirtualXattrRevSeqNo})
if db.UseMou() {
var mou *MetadataOnlyUpdate
require.NoError(t, err)
Expand All @@ -71,6 +76,11 @@ func TestFeedImport(t *testing.T) {
require.NoError(t, base.JSONUnmarshal(mouXattr, &mou))
require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS)
require.Equal(t, base.CasToString(importCas), mou.HexCAS)
// CBG-4233: revSeqNo not implemented yet in rosmar
if !base.UnitTestUrlIsWalrus() {
// curr revSeqNo should be 2, so prev revSeqNo is 1
require.Equal(t, revSeqNo-1, mou.PreviousRevSeqNo)
}
} else {
// Expect not found fetching mou xattr
require.Error(t, err)
Expand Down Expand Up @@ -107,6 +117,10 @@ func TestOnDemandImportMou(t *testing.T) {
require.NotNil(t, doc.MetadataOnlyUpdate)
require.Equal(t, base.CasToString(writeCas), doc.MetadataOnlyUpdate.PreviousHexCAS)
require.Equal(t, base.CasToString(doc.Cas), doc.MetadataOnlyUpdate.HexCAS)
// CBG-4233: revSeqNo not implemented yet in rosmar
if !base.UnitTestUrlIsWalrus() {
require.Equal(t, uint64(1), doc.MetadataOnlyUpdate.PreviousRevSeqNo)
}
} else {
require.Nil(t, doc.MetadataOnlyUpdate)
}
Expand All @@ -115,34 +129,78 @@ func TestOnDemandImportMou(t *testing.T) {
// On-demand write
// Create via the SDK
t.Run("on-demand write", func(t *testing.T) {
writeKey := baseKey + "write"
bodyBytes := []byte(`{"foo":"bar"}`)
body := Body{}
err := body.Unmarshal(bodyBytes)
assert.NoError(t, err, "Error unmarshalling body")
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
writeCas, err := collection.dataStore.WriteCas(writeKey, 0, 0, bodyBytes, 0)
require.NoError(t, err)

// Update the document to trigger on-demand import. Write will be a conflict, but import should be performed
_, doc, err := collection.Put(ctx, writeKey, Body{"foo": "baz"})
require.Nil(t, doc)
assertHTTPError(t, err, 409)
for _, funcName := range []string{"Put", "PutExistingRev", "PutExistingCurrentVersion", "PutExistingRevWithConflictResolution"} {
t.Run(funcName, func(t *testing.T) {
writeKey := baseKey + "_" + funcName
bodyBytes := []byte(`{"foo":"bar"}`)
body := Body{}
err := body.Unmarshal(bodyBytes)
assert.NoError(t, err, "Error unmarshalling body")
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
writeCas, err := collection.dataStore.WriteCas(writeKey, 0, 0, bodyBytes, 0)
require.NoError(t, err)

newDoc := &Document{
ID: writeKey,
}
newDoc.UpdateBodyBytes([]byte(`{"foo": "baz"}`))

_, rawBucketDoc, err := collection.GetDocumentWithRaw(ctx, writeKey, DocUnmarshalSync)
require.NoError(t, err)

switch funcName {
case "Put":
// Update the document to trigger on-demand import. Write will be a conflict, but import should be performed
_, doc, err := collection.Put(ctx, writeKey, Body{"foo": "baz"})
require.Nil(t, doc)
assertHTTPError(t, err, 409)
case "PutExistingRev":
fakeRevID := "1-abc"
docHistory := []string{fakeRevID}
noConflicts := true
forceAllowConflictingTombstone := false
_, _, err := collection.PutExistingRev(ctx, newDoc, docHistory, noConflicts, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV)
assertHTTPError(t, err, 409)
case "PutExistingCurrentVersion":
hlv := NewHybridLogicalVector()
var legacyRevList []string
_, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, hlv, rawBucketDoc, legacyRevList)
assertHTTPError(t, err, 409)
case "PutExistingRevWithConflictResolution":
fakeRevID := "1-abc"
docHistory := []string{fakeRevID}
noConflicts := true
forceAllowConflictingTombstone := false
conflictResolverFunc, err := NewConflictResolverFunc(ctx, ConflictResolverRemoteWins, "", time.Duration(base.DefaultJavascriptTimeoutSecs)*time.Second)
require.NoError(t, err)
conflictResolver := NewConflictResolver(conflictResolverFunc, nil)
_, _, err = collection.PutExistingRevWithConflictResolution(ctx, newDoc, docHistory, noConflicts, conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV)
require.NoError(t, err)
assertHTTPError(t, err, 409)
default:
require.FailNow(t, fmt.Sprintf("unexpected funcName: %s", funcName))
}

// fetch the mou xattr directly doc to confirm import (to avoid triggering on-demand get import)
// verify mou
xattrs, importCas, err := collection.dataStore.GetXattrs(ctx, writeKey, []string{base.MouXattrName})
if db.UseMou() {
require.NoError(t, err)
mouXattr, mouOk := xattrs[base.MouXattrName]
var mou *MetadataOnlyUpdate
require.True(t, mouOk)
require.NoError(t, base.JSONUnmarshal(mouXattr, &mou))
require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS)
require.Equal(t, base.CasToString(importCas), mou.HexCAS)
} else {
// expect not found fetching mou xattr
require.Error(t, err)
// fetch the mou xattr directly doc to confirm import (to avoid triggering on-demand get import)
// verify mou
xattrs, importCas, err := collection.dataStore.GetXattrs(ctx, writeKey, []string{base.MouXattrName})
if db.UseMou() {
require.NoError(t, err)
mouXattr, mouOk := xattrs[base.MouXattrName]
var mou *MetadataOnlyUpdate
require.True(t, mouOk)
require.NoError(t, base.JSONUnmarshal(mouXattr, &mou))
require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS)
require.Equal(t, base.CasToString(importCas), mou.HexCAS)
// CBG-4233: revSeqNo not implemented yet in rosmar
if !base.UnitTestUrlIsWalrus() {
require.Equal(t, uint64(1), mou.PreviousRevSeqNo)
}
} else {
// expect not found fetching mou xattr
require.Error(t, err)
}
})
}
})

Expand Down
Loading
Loading