From 64dda15141d36bdb4306caf987b6d96bfd5ef4c0 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 29 Jan 2024 15:58:39 -0500 Subject: [PATCH] Setup defined (and configurable) behavior if a ZedToken from an older datastore is used All ZedTokens are now minted with the datastore's unique ID included in the ZedToken and that ID is checked when the ZedToken is decoded. In scenarios where the datastore ID does not match, either an error is raised (watch, at_exact_snapshot) or configurable behavior is used (at_least_as_fresh) Fixes #1541 --- e2e/newenemy/newenemy_test.go | 4 +- internal/datastore/proxy/proxy_test/mock.go | 8 +- .../datastore/revisions/commonrevision.go | 9 +- .../middleware/consistency/consistency.go | 81 ++++++-- .../consistency/consistency_test.go | 190 +++++++++++++++++- .../services/integrationtesting/cert_test.go | 8 +- .../consistencytestutil/servicetester.go | 12 +- .../services/integrationtesting/perf_test.go | 2 +- internal/services/v1/debug_test.go | 2 +- internal/services/v1/metadata_test.go | 8 +- internal/services/v1/permissions_test.go | 32 +-- internal/services/v1/relationships.go | 14 +- internal/services/v1/relationships_test.go | 12 +- internal/services/v1/schema.go | 14 +- internal/services/v1/watch.go | 13 +- internal/services/v1/watch_test.go | 2 +- internal/testserver/server.go | 4 +- pkg/cmd/serve.go | 1 + pkg/cmd/server/defaults.go | 19 +- pkg/cmd/server/server.go | 21 ++ pkg/cmd/server/server_test.go | 5 +- pkg/cmd/server/zz_generated.options.go | 9 + pkg/cmd/testserver/testserver.go | 8 +- pkg/cursor/cursor.go | 26 ++- pkg/cursor/cursor_test.go | 5 +- pkg/development/devcontext.go | 4 +- pkg/proto/impl/v1/impl.pb.go | 36 +++- pkg/proto/impl/v1/impl.pb.validate.go | 4 + pkg/proto/impl/v1/impl_vtproto.pb.go | 94 +++++++++ pkg/zedtoken/zedtoken.go | 73 +++++-- pkg/zedtoken/zedtoken_test.go | 101 ++++++++-- proto/internal/impl/v1/impl.proto | 8 + 32 files changed, 691 insertions(+), 138 deletions(-) diff --git a/e2e/newenemy/newenemy_test.go b/e2e/newenemy/newenemy_test.go index f4bfd3389d..7208cddaf4 100644 --- a/e2e/newenemy/newenemy_test.go +++ b/e2e/newenemy/newenemy_test.go @@ -396,8 +396,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, slowNodeID int, crdb ns2AllowlistLeader := getLeaderNodeForNamespace(ctx, crdb[2].Conn(), allowlists[i].Relationship.Subject.Object.ObjectType) r1leader, r2leader := getLeaderNode(ctx, crdb[2].Conn(), blockusers[i].Relationship), getLeaderNode(ctx, crdb[2].Conn(), allowlists[i].Relationship) - z1, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) - z2, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z1, _, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z2, _, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) t.Log(sleep, z1, z2, z1.GreaterThan(z2), r1leader, r2leader, ns1BlocklistLeader, ns1UserLeader, ns2ResourceLeader, ns2AllowlistLeader) if z1.GreaterThan(z2) { diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index b6b8e90968..a0d6da1da9 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -13,10 +13,16 @@ import ( type MockDatastore struct { mock.Mock + + CurrentUniqueID string } func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) { - return "mockds", nil + if dm.CurrentUniqueID == "" { + return "mockds", nil + } + + return dm.CurrentUniqueID, nil } func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { diff --git a/internal/datastore/revisions/commonrevision.go b/internal/datastore/revisions/commonrevision.go index de85496d9b..c1adc51fca 100644 --- a/internal/datastore/revisions/commonrevision.go +++ b/internal/datastore/revisions/commonrevision.go @@ -1,6 +1,8 @@ package revisions import ( + "context" + "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -43,7 +45,12 @@ func RevisionParser(kind RevisionKind) ParsingFunc { // CommonDecoder is a revision decoder that can decode revisions of a given kind. type CommonDecoder struct { - Kind RevisionKind + Kind RevisionKind + DatastoreUniqueID string +} + +func (cd CommonDecoder) UniqueID(_ context.Context) (string, error) { + return cd.DatastoreUniqueID, nil } func (cd CommonDecoder) RevisionFromString(s string) (datastore.Revision, error) { diff --git a/internal/middleware/consistency/consistency.go b/internal/middleware/consistency/consistency.go index c928e6b88a..ac6e222870 100644 --- a/internal/middleware/consistency/consistency.go +++ b/internal/middleware/consistency/consistency.go @@ -18,6 +18,7 @@ import ( "github.com/authzed/spicedb/internal/services/shared" "github.com/authzed/spicedb/pkg/cursor" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/zedtoken" ) @@ -55,19 +56,39 @@ func RevisionFromContext(ctx context.Context) (datastore.Revision, *v1.ZedToken, handle := c.(*revisionHandle) rev := handle.revision if rev != nil { - return rev, zedtoken.MustNewFromRevision(rev), nil + ds := datastoremw.FromContext(ctx) + if ds == nil { + return nil, nil, spiceerrors.MustBugf("consistency middleware did not inject datastore") + } + + zedToken, err := zedtoken.NewFromRevision(ctx, rev, ds) + if err != nil { + return nil, nil, err + } + + return rev, zedToken, nil } } return nil, nil, fmt.Errorf("consistency middleware did not inject revision") } +type MismatchingTokenOption int + +const ( + TreatMismatchingTokensAsFullConsistency MismatchingTokenOption = iota + + TreatMismatchingTokensAsMinLatency + + TreatMismatchingTokensAsError +) + // AddRevisionToContext adds a revision to the given context, based on the consistency block found // in the given request (if applicable). -func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore) error { +func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, option MismatchingTokenOption) error { switch req := req.(type) { case hasConsistency: - return addRevisionToContextFromConsistency(ctx, req, ds) + return addRevisionToContextFromConsistency(ctx, req, ds, option) default: return nil } @@ -75,7 +96,7 @@ func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Dat // addRevisionToContextFromConsistency adds a revision to the given context, based on the consistency block found // in the given request (if applicable). -func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore) error { +func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore, option MismatchingTokenOption) error { handle := ctx.Value(revisionKey) if handle == nil { return nil @@ -91,7 +112,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency // Always use the revision encoded in the cursor. ConsistentyCounter.WithLabelValues("snapshot", "cursor").Inc() - requestedRev, err := cursor.DecodeToDispatchRevision(withOptionalCursor.GetOptionalCursor(), ds) + requestedRev, _, err := cursor.DecodeToDispatchRevision(ctx, withOptionalCursor.GetOptionalCursor(), ds) if err != nil { return rewriteDatastoreError(ctx, err) } @@ -130,7 +151,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency case consistency.GetAtLeastAsFresh() != nil: // At least as fresh as: Pick one of the datastore's revision and that specified, which // ever is later. - picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds) + picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds, option) if err != nil { return rewriteDatastoreError(ctx, err) } @@ -147,11 +168,16 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency // Exact snapshot: Use the revision as encoded in the zed token. ConsistentyCounter.WithLabelValues("snapshot", "request").Inc() - requestedRev, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds) + requestedRev, status, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds) if err != nil { return errInvalidZedToken } + if status == zedtoken.StatusMismatchedDatastoreID { + log.Error().Str("zedtoken", consistency.GetAtExactSnapshot().Token).Msg("ZedToken specified references an older datastore but at-exact-snapshot was requested") + return fmt.Errorf("ZedToken specified references an older datastore but at-exact-snapshot was requested") + } + err = ds.CheckRevision(ctx, requestedRev) if err != nil { return rewriteDatastoreError(ctx, err) @@ -175,7 +201,7 @@ var bypassServiceWhitelist = map[string]struct{}{ // UnaryServerInterceptor returns a new unary server interceptor that performs per-request exchange of // the specified consistency configuration for the revision at which to perform the request. -func UnaryServerInterceptor() grpc.UnaryServerInterceptor { +func UnaryServerInterceptor(option MismatchingTokenOption) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { for bypass := range bypassServiceWhitelist { if strings.HasPrefix(info.FullMethod, bypass) { @@ -184,7 +210,7 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor { } ds := datastoremw.MustFromContext(ctx) newCtx := ContextWithHandle(ctx) - if err := AddRevisionToContext(newCtx, req, ds); err != nil { + if err := AddRevisionToContext(newCtx, req, ds, option); err != nil { return nil, err } @@ -194,21 +220,22 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor { // StreamServerInterceptor returns a new stream server interceptor that performs per-request exchange of // the specified consistency configuration for the revision at which to perform the request. -func StreamServerInterceptor() grpc.StreamServerInterceptor { +func StreamServerInterceptor(option MismatchingTokenOption) grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { for bypass := range bypassServiceWhitelist { if strings.HasPrefix(info.FullMethod, bypass) { return handler(srv, stream) } } - wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context())} + wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), option} return handler(srv, wrapper) } } type recvWrapper struct { grpc.ServerStream - ctx context.Context + ctx context.Context + option MismatchingTokenOption } func (s *recvWrapper) Context() context.Context { return s.ctx } @@ -219,12 +246,12 @@ func (s *recvWrapper) RecvMsg(m interface{}) error { } ds := datastoremw.MustFromContext(s.ctx) - return AddRevisionToContext(s.ctx, m, ds) + return AddRevisionToContext(s.ctx, m, ds, s.option) } // pickBestRevision compares the provided ZedToken with the optimized revision of the datastore, and returns the most // recent one. The boolean return value will be true if the provided ZedToken is the most recent, false otherwise. -func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore) (datastore.Revision, bool, error) { +func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore, option MismatchingTokenOption) (datastore.Revision, bool, error) { // Calculate a revision as we see fit databaseRev, err := ds.OptimizedRevision(ctx) if err != nil { @@ -232,11 +259,35 @@ func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore. } if requested != nil { - requestedRev, err := zedtoken.DecodeRevision(requested, ds) + requestedRev, status, err := zedtoken.DecodeRevision(requested, ds) if err != nil { return datastore.NoRevision, false, errInvalidZedToken } + if status == zedtoken.StatusMismatchedDatastoreID { + switch option { + case TreatMismatchingTokensAsFullConsistency: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to treat this as a full consistency request") + headRev, err := ds.HeadRevision(ctx) + if err != nil { + return datastore.NoRevision, false, err + } + + return headRev, false, nil + + case TreatMismatchingTokensAsMinLatency: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to treat this as a min latency request") + return databaseRev, false, nil + + case TreatMismatchingTokensAsError: + log.Error().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") + return datastore.NoRevision, false, fmt.Errorf("ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") + + default: + return datastore.NoRevision, false, spiceerrors.MustBugf("unknown mismatching token option: %v", option) + } + } + if databaseRev.GreaterThan(requestedRev) { return databaseRev, false, nil } diff --git a/internal/middleware/consistency/consistency_test.go b/internal/middleware/consistency/consistency_test.go index 58a2555246..e28a816805 100644 --- a/internal/middleware/consistency/consistency_test.go +++ b/internal/middleware/consistency/consistency_test.go @@ -10,6 +10,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/proxy/proxy_test" "github.com/authzed/spicedb/internal/datastore/revisions" + datastoremw "github.com/authzed/spicedb/internal/middleware/datastore" "github.com/authzed/spicedb/pkg/cursor" dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" "github.com/authzed/spicedb/pkg/zedtoken" @@ -29,7 +30,9 @@ func TestAddRevisionToContextNoneSupplied(t *testing.T) { ds.On("OptimizedRevision").Return(optimized, nil).Once() updated := ContextWithHandle(context.Background()) - err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{}, ds) + updated = datastoremw.ContextWithDatastore(updated, ds) + + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{}, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -46,13 +49,15 @@ func TestAddRevisionToContextMinimizeLatency(t *testing.T) { ds.On("OptimizedRevision").Return(optimized, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_MinimizeLatency{ MinimizeLatency: true, }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -69,13 +74,15 @@ func TestAddRevisionToContextFullyConsistent(t *testing.T) { ds.On("HeadRevision").Return(head, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_FullyConsistent{ FullyConsistent: true, }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -93,13 +100,15 @@ func TestAddRevisionToContextAtLeastAsFresh(t *testing.T) { ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(exact), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(exact), }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -117,13 +126,15 @@ func TestAddRevisionToContextAtValidExactSnapshot(t *testing.T) { ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(exact), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(exact), }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -141,13 +152,15 @@ func TestAddRevisionToContextAtInvalidExactSnapshot(t *testing.T) { ds.On("RevisionFromString", zero.String()).Return(zero, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(zero), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(zero), }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.Error(err) ds.AssertExpectations(t) } @@ -155,7 +168,10 @@ func TestAddRevisionToContextAtInvalidExactSnapshot(t *testing.T) { func TestAddRevisionToContextNoConsistencyAPI(t *testing.T) { require := require.New(t) + ds := &proxy_test.MockDatastore{} + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) _, _, err := RevisionFromContext(updated) require.Error(err) @@ -174,14 +190,16 @@ func TestAddRevisionToContextWithCursor(t *testing.T) { // revision in context is at `exact` updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(exact), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(exact), }, }, OptionalCursor: cursor, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) // ensure we get back `optimized` from the cursor @@ -191,3 +209,153 @@ func TestAddRevisionToContextWithCursor(t *testing.T) { require.True(optimized.Equal(rev)) ds.AssertExpectations(t) } + +func TestAtExactSnapshotWithMismatchedToken(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtExactSnapshot{ + AtExactSnapshot: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsError) + require.Error(err) + require.ErrorContains(err, "ZedToken specified references an older datastore but at-exact-snapshot") +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectError(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsError) + require.Error(err) + require.ErrorContains(err, "ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectMinLatency(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsMinLatency) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(optimized.Equal(rev)) + ds.AssertExpectations(t) +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectFullConsistency(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("HeadRevision").Return(head, nil).Once() + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsFullConsistency) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(head.Equal(rev)) + ds.AssertExpectations(t) +} + +func TestAddRevisionToContextAtLeastAsFreshMatchingIDs(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() + + ds.CurrentUniqueID = "foo" + + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(exact), + }, + }, + }, ds, TreatMismatchingTokensAsError) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(exact.Equal(rev)) + ds.AssertExpectations(t) +} diff --git a/internal/services/integrationtesting/cert_test.go b/internal/services/integrationtesting/cert_test.go index 516aa6de19..4ccdf27271 100644 --- a/internal/services/integrationtesting/cert_test.go +++ b/internal/services/integrationtesting/cert_test.go @@ -146,7 +146,7 @@ func TestCertRotation(t *testing.T) { }, { Name: "consistency", - Middleware: consistency.UnaryServerInterceptor(), + Middleware: consistency.UnaryServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -165,7 +165,7 @@ func TestCertRotation(t *testing.T) { }, { Name: "consistency", - Middleware: consistency.StreamServerInterceptor(), + Middleware: consistency.StreamServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -209,7 +209,7 @@ func TestCertRotation(t *testing.T) { _, err = client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, @@ -262,7 +262,7 @@ func TestCertRotation(t *testing.T) { _, err = client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, diff --git a/internal/services/integrationtesting/consistencytestutil/servicetester.go b/internal/services/integrationtesting/consistencytestutil/servicetester.go index c26c012c44..12ebb9f3ba 100644 --- a/internal/services/integrationtesting/consistencytestutil/servicetester.go +++ b/internal/services/integrationtesting/consistencytestutil/servicetester.go @@ -76,7 +76,7 @@ func (v1st v1ServiceTester) Check(ctx context.Context, resource *core.ObjectAndR }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, Context: context, @@ -96,7 +96,7 @@ func (v1st v1ServiceTester) Expand(ctx context.Context, resource *core.ObjectAnd Permission: resource.Relation, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -126,7 +126,7 @@ func (v1st v1ServiceTester) Read(_ context.Context, namespaceName string, atRevi }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -164,7 +164,7 @@ func (v1st v1ServiceTester) LookupResources(_ context.Context, resourceRelation }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, OptionalLimit: limit, @@ -212,7 +212,7 @@ func (v1st v1ServiceTester) LookupSubjects(_ context.Context, resource *core.Obj OptionalSubjectRelation: optionalizeRelation(subjectRelation.Relation), Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, Context: builtContext, @@ -242,7 +242,7 @@ func (v1st v1ServiceTester) BulkCheck(ctx context.Context, items []*v1.BulkCheck Items: items, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) diff --git a/internal/services/integrationtesting/perf_test.go b/internal/services/integrationtesting/perf_test.go index f411bce80e..d2ab1786b9 100644 --- a/internal/services/integrationtesting/perf_test.go +++ b/internal/services/integrationtesting/perf_test.go @@ -58,7 +58,7 @@ func TestBurst(t *testing.T) { _, err := client.CheckPermission(context.Background(), &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, diff --git a/internal/services/v1/debug_test.go b/internal/services/v1/debug_test.go index b925a33609..0fc8c65915 100644 --- a/internal/services/v1/debug_test.go +++ b/internal/services/v1/debug_test.go @@ -468,7 +468,7 @@ func TestCheckPermissionWithDebug(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: stc.checkRequest.resource, diff --git a/internal/services/v1/metadata_test.go b/internal/services/v1/metadata_test.go index 7759431dcd..02fd741593 100644 --- a/internal/services/v1/metadata_test.go +++ b/internal/services/v1/metadata_test.go @@ -38,7 +38,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -77,7 +77,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.ExpandPermissionTree(ctx, &v1.ExpandPermissionTreeRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -111,7 +111,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.LookupResources(ctx, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -136,7 +136,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.LookupSubjects(ctx, &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), diff --git a/internal/services/v1/permissions_test.go b/internal/services/v1/permissions_test.go index 95dbe2b23b..7fb5ca64b0 100644 --- a/internal/services/v1/permissions_test.go +++ b/internal/services/v1/permissions_test.go @@ -270,7 +270,7 @@ func TestCheckPermissions(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: tc.resource, @@ -327,7 +327,7 @@ func TestCheckPermissionWithDebugInfo(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -544,7 +544,7 @@ func TestLookupResources(t *testing.T) { Subject: tc.subject, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -619,7 +619,7 @@ func TestExpand(t *testing.T) { Permission: tc.startPermission, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -863,7 +863,7 @@ func TestLookupSubjects(t *testing.T) { OptionalSubjectRelation: tc.subjectRelation, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -911,7 +911,7 @@ func TestCheckWithCaveats(t *testing.T) { request := &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "companyplan"), @@ -1023,7 +1023,7 @@ func TestCheckWithCaveatErrors(t *testing.T) { request := &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "firstdoc"), @@ -1076,7 +1076,7 @@ func TestLookupResourcesWithCaveats(t *testing.T) { request := &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -1122,7 +1122,7 @@ func TestLookupResourcesWithCaveats(t *testing.T) { request = &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -1195,7 +1195,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request := &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1240,7 +1240,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1285,7 +1285,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1359,7 +1359,7 @@ func TestLookupSubjectsWithCaveatedWildcards(t *testing.T) { request := &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1398,7 +1398,7 @@ func TestLookupSubjectsWithCaveatedWildcards(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1542,7 +1542,7 @@ func TestLookupResourcesWithCursors(t *testing.T) { Subject: tc.subject, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, OptionalLimit: uint32(limit), @@ -1611,7 +1611,7 @@ func TestLookupResourcesDeduplication(t *testing.T) { Subject: sub("user", "tom", ""), Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }) diff --git a/internal/services/v1/relationships.go b/internal/services/v1/relationships.go index f75f220f14..e43e0063b8 100644 --- a/internal/services/v1/relationships.go +++ b/internal/services/v1/relationships.go @@ -330,8 +330,13 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ writeUpdateCounter.WithLabelValues(v1.RelationshipUpdate_Operation_name[int32(kind)]).Observe(float64(count)) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ps.rewriteError(ctx, err) + } + return &v1.WriteRelationshipsResponse{ - WrittenAt: zedtoken.MustNewFromRevision(revision), + WrittenAt: zedToken, }, nil } @@ -404,8 +409,13 @@ func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.Del return nil, ps.rewriteError(ctx, err) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ps.rewriteError(ctx, err) + } + return &v1.DeleteRelationshipsResponse{ - DeletedAt: zedtoken.MustNewFromRevision(revision), + DeletedAt: zedToken, DeletionProgress: deletionProgress, }, nil } diff --git a/internal/services/v1/relationships_test.go b/internal/services/v1/relationships_test.go index 2683fdfbf4..b95385a6d4 100644 --- a/internal/services/v1/relationships_test.go +++ b/internal/services/v1/relationships_test.go @@ -230,7 +230,7 @@ func TestReadRelationships(t *testing.T) { stream, err := client.ReadRelationships(context.Background(), &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, RelationshipFilter: tc.filter, @@ -1035,7 +1035,7 @@ func TestDeleteRelationships(t *testing.T) { } require.NoError(err) require.NotNil(resp.DeletedAt) - rev, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) + rev, _, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) require.NoError(err) require.True(rev.GreaterThan(revision)) require.EqualValues(standardTuplesWithout(tc.deleted), readAll(require, client, resp.DeletedAt)) @@ -1111,7 +1111,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { headRev, err := ds.HeadRevision(context.Background()) require.NoError(err) - beforeDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevision(headRev)) + beforeDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevisionForTesting(headRev)) resp, err := client.DeleteRelationships(context.Background(), &v1.DeleteRelationshipsRequest{ RelationshipFilter: &v1.RelationshipFilter{ @@ -1125,7 +1125,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { headRev, err = ds.HeadRevision(context.Background()) require.NoError(err) - afterDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevision(headRev)) + afterDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevisionForTesting(headRev)) require.LessOrEqual(len(beforeDelete)-len(afterDelete), batchSize) if i == 0 { @@ -1136,7 +1136,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { require.NoError(err) require.NotNil(resp.DeletedAt) - rev, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) + rev, _, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) require.NoError(err) require.True(rev.GreaterThan(revision)) require.EqualValues(standardTuplesWithout(expected), readAll(require, client, resp.DeletedAt)) @@ -1393,7 +1393,7 @@ func TestReadRelationshipsInvalidCursor(t *testing.T) { stream, err := client.ReadRelationships(context.Background(), &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, RelationshipFilter: &v1.RelationshipFilter{ diff --git a/internal/services/v1/schema.go b/internal/services/v1/schema.go index 4d0805aed9..805e77100b 100644 --- a/internal/services/v1/schema.go +++ b/internal/services/v1/schema.go @@ -91,9 +91,14 @@ func (ss *schemaServer) ReadSchema(ctx context.Context, _ *v1.ReadSchemaRequest) DispatchCount: uint32(len(nsDefs) + len(caveatDefs)), }) + zedToken, err := zedtoken.NewFromRevision(ctx, headRevision, ds) + if err != nil { + return nil, ss.rewriteError(ctx, err) + } + return &v1.ReadSchemaResponse{ SchemaText: schemaText, - ReadAt: zedtoken.MustNewFromRevision(headRevision), + ReadAt: zedToken, }, nil } @@ -133,7 +138,12 @@ func (ss *schemaServer) WriteSchema(ctx context.Context, in *v1.WriteSchemaReque return nil, ss.rewriteError(ctx, err) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ss.rewriteError(ctx, err) + } + return &v1.WriteSchemaResponse{ - WrittenAt: zedtoken.MustNewFromRevision(revision), + WrittenAt: zedToken, }, nil } diff --git a/internal/services/v1/watch.go b/internal/services/v1/watch.go index 66d10df788..6f1b5ba554 100644 --- a/internal/services/v1/watch.go +++ b/internal/services/v1/watch.go @@ -48,11 +48,15 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS var afterRevision datastore.Revision if req.OptionalStartCursor != nil && req.OptionalStartCursor.Token != "" { - decodedRevision, err := zedtoken.DecodeRevision(req.OptionalStartCursor, ds) + decodedRevision, tokenStatus, err := zedtoken.DecodeRevision(req.OptionalStartCursor, ds) if err != nil { return status.Errorf(codes.InvalidArgument, "failed to decode start revision: %s", err) } + if tokenStatus == zedtoken.StatusMismatchedDatastoreID { + return status.Errorf(codes.InvalidArgument, "start revision was generated by a different datastore") + } + afterRevision = decodedRevision } else { var err error @@ -76,9 +80,14 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS if ok { filtered := filterUpdates(objectTypesMap, update.RelationshipChanges) if len(filtered) > 0 { + zedToken, err := zedtoken.NewFromRevision(ctx, update.Revision, ds) + if err != nil { + return err + } + if err := stream.Send(&v1.WatchResponse{ Updates: filtered, - ChangesThrough: zedtoken.MustNewFromRevision(update.Revision), + ChangesThrough: zedToken, }); err != nil { return status.Errorf(codes.Canceled, "watch canceled by user: %s", err) } diff --git a/internal/services/v1/watch_test.go b/internal/services/v1/watch_test.go index 33e2218a39..c3b205bb47 100644 --- a/internal/services/v1/watch_test.go +++ b/internal/services/v1/watch_test.go @@ -107,7 +107,7 @@ func TestWatch(t *testing.T) { t.Cleanup(cleanup) client := v1.NewWatchServiceClient(conn) - cursor := zedtoken.MustNewFromRevision(revision) + cursor := zedtoken.MustNewFromRevisionForTesting(revision) if tc.startCursor != nil { cursor = tc.startCursor } diff --git a/internal/testserver/server.go b/internal/testserver/server.go index 108d3156c3..e4dbc25919 100644 --- a/internal/testserver/server.go +++ b/internal/testserver/server.go @@ -90,7 +90,7 @@ func NewTestServerWithConfig(require *require.Assertions, }, { Name: "consistency", - Middleware: consistency.UnaryServerInterceptor(), + Middleware: consistency.UnaryServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -113,7 +113,7 @@ func NewTestServerWithConfig(require *require.Assertions, }, { Name: "consistency", - Middleware: consistency.StreamServerInterceptor(), + Middleware: consistency.StreamServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", diff --git a/pkg/cmd/serve.go b/pkg/cmd/serve.go index ffdcc7dbf5..54ee2da3eb 100644 --- a/pkg/cmd/serve.go +++ b/pkg/cmd/serve.go @@ -132,6 +132,7 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error { cmd.Flags().IntVar(&config.MaxRelationshipContextSize, "max-relationship-context-size", 25000, "maximum allowed size of the context to be stored in a relationship") cmd.Flags().DurationVar(&config.StreamingAPITimeout, "streaming-api-response-delay-timeout", 30*time.Second, "max duration time elapsed between messages sent by the server-side to the client (responses) before the stream times out") cmd.Flags().DurationVar(&config.WatchHeartbeat, "watch-api-heartbeat", 1*time.Second, "heartbeat time on the watch in the API. 0 means to default to the datastore's minimum.") + cmd.Flags().StringVar(&config.MismatchZedTokenBehavior, "mismatch-zed-token-behavior", "full-consistency", "behavior when a mismatched zedtoken is encountered. One of: full-consistency (treat as a full-consistency call), min-latency (treat as a min-latency call), error (return an error). defaults to full-consistency for safety.") cmd.Flags().BoolVar(&config.V1SchemaAdditiveOnly, "testing-only-schema-additive-writes", false, "append new definitions to the existing schema, rather than overwriting it") if err := cmd.Flags().MarkHidden("testing-only-schema-additive-writes"); err != nil { diff --git a/pkg/cmd/server/defaults.go b/pkg/cmd/server/defaults.go index 398217467a..d243ee77c3 100644 --- a/pkg/cmd/server/defaults.go +++ b/pkg/cmd/server/defaults.go @@ -139,13 +139,14 @@ const ( ) type MiddlewareOption struct { - logger zerolog.Logger - authFunc grpcauth.AuthFunc - enableVersionResponse bool - dispatcher dispatch.Dispatcher - ds datastore.Datastore - enableRequestLog bool - enableResponseLog bool + logger zerolog.Logger + authFunc grpcauth.AuthFunc + enableVersionResponse bool + dispatcher dispatch.Dispatcher + ds datastore.Datastore + enableRequestLog bool + enableResponseLog bool + mismatchingZedTokenOption consistencymw.MismatchingTokenOption } // DefaultUnaryMiddleware generates the default middleware chain used for the public SpiceDB Unary gRPC methods @@ -202,7 +203,7 @@ func DefaultUnaryMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.UnaryS NewUnaryMiddleware(). WithName(DefaultInternalMiddlewareConsistency). WithInternal(true). - WithInterceptor(consistencymw.UnaryServerInterceptor()). + WithInterceptor(consistencymw.UnaryServerInterceptor(opts.mismatchingZedTokenOption)). Done(), NewUnaryMiddleware(). @@ -268,7 +269,7 @@ func DefaultStreamingMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.St NewStreamMiddleware(). WithName(DefaultInternalMiddlewareConsistency). WithInternal(true). - WithInterceptor(consistencymw.StreamServerInterceptor()). + WithInterceptor(consistencymw.StreamServerInterceptor(opts.mismatchingZedTokenOption)). Done(), NewStreamMiddleware(). diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 563e5ce208..6da045572c 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -35,6 +35,7 @@ import ( "github.com/authzed/spicedb/internal/dispatch/graph" "github.com/authzed/spicedb/internal/gateway" log "github.com/authzed/spicedb/internal/logging" + "github.com/authzed/spicedb/internal/middleware/consistency" "github.com/authzed/spicedb/internal/services" dispatchSvc "github.com/authzed/spicedb/internal/services/dispatch" "github.com/authzed/spicedb/internal/services/health" @@ -112,6 +113,7 @@ type Config struct { MaxDatastoreReadPageSize uint64 `debugmap:"visible"` StreamingAPITimeout time.Duration `debugmap:"visible"` WatchHeartbeat time.Duration `debugmap:"visible"` + MismatchZedTokenBehavior string `debugmap:"visible"` // Additional Services MetricsAPI util.HTTPServerConfig `debugmap:"visible"` @@ -356,6 +358,24 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { watchServiceOption = services.WatchServiceDisabled } + var mismatchZedTokenOption consistency.MismatchingTokenOption + switch c.MismatchZedTokenBehavior { + case "": + fallthrough + + case "full-consistency": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsFullConsistency + + case "min-latency": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsMinLatency + + case "error": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsError + + default: + return nil, fmt.Errorf("unknown mismatched zedtoken behavior: %s", c.MismatchZedTokenBehavior) + } + opts := MiddlewareOption{ log.Logger, c.GRPCAuthFunc, @@ -364,6 +384,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { ds, c.EnableRequestLogs, c.EnableResponseLogs, + mismatchZedTokenOption, } defaultUnaryMiddlewareChain, err := DefaultUnaryMiddleware(opts) if err != nil { diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index f87f3f3847..2e30207bfe 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -9,6 +9,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/logging" + "github.com/authzed/spicedb/internal/middleware/consistency" "github.com/authzed/spicedb/pkg/cmd/datastore" "github.com/authzed/spicedb/pkg/cmd/util" @@ -230,7 +231,7 @@ func TestModifyUnaryMiddleware(t *testing.T) { }, }} - opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false} + opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false, consistency.TreatMismatchingTokensAsFullConsistency} defaultMw, err := DefaultUnaryMiddleware(opt) require.NoError(t, err) @@ -256,7 +257,7 @@ func TestModifyStreamingMiddleware(t *testing.T) { }, }} - opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false} + opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false, consistency.TreatMismatchingTokensAsFullConsistency} defaultMw, err := DefaultStreamingMiddleware(opt) require.NoError(t, err) diff --git a/pkg/cmd/server/zz_generated.options.go b/pkg/cmd/server/zz_generated.options.go index 95b0d9f439..96cfd39684 100644 --- a/pkg/cmd/server/zz_generated.options.go +++ b/pkg/cmd/server/zz_generated.options.go @@ -81,6 +81,7 @@ func (c *Config) ToOption() ConfigOption { to.MaxDatastoreReadPageSize = c.MaxDatastoreReadPageSize to.StreamingAPITimeout = c.StreamingAPITimeout to.WatchHeartbeat = c.WatchHeartbeat + to.MismatchZedTokenBehavior = c.MismatchZedTokenBehavior to.MetricsAPI = c.MetricsAPI to.UnaryMiddlewareModification = c.UnaryMiddlewareModification to.StreamingMiddlewareModification = c.StreamingMiddlewareModification @@ -141,6 +142,7 @@ func (c Config) DebugMap() map[string]any { debugMap["MaxDatastoreReadPageSize"] = helpers.DebugValue(c.MaxDatastoreReadPageSize, false) debugMap["StreamingAPITimeout"] = helpers.DebugValue(c.StreamingAPITimeout, false) debugMap["WatchHeartbeat"] = helpers.DebugValue(c.WatchHeartbeat, false) + debugMap["MismatchZedTokenBehavior"] = helpers.DebugValue(c.MismatchZedTokenBehavior, false) debugMap["MetricsAPI"] = helpers.DebugValue(c.MetricsAPI, false) debugMap["SilentlyDisableTelemetry"] = helpers.DebugValue(c.SilentlyDisableTelemetry, false) debugMap["TelemetryCAOverridePath"] = helpers.DebugValue(c.TelemetryCAOverridePath, false) @@ -496,6 +498,13 @@ func WithWatchHeartbeat(watchHeartbeat time.Duration) ConfigOption { } } +// WithMismatchZedTokenBehavior returns an option that can set MismatchZedTokenBehavior on a Config +func WithMismatchZedTokenBehavior(mismatchZedTokenBehavior string) ConfigOption { + return func(c *Config) { + c.MismatchZedTokenBehavior = mismatchZedTokenBehavior + } +} + // WithMetricsAPI returns an option that can set MetricsAPI on a Config func WithMetricsAPI(metricsAPI util.HTTPServerConfig) ConfigOption { return func(c *Config) { diff --git a/pkg/cmd/testserver/testserver.go b/pkg/cmd/testserver/testserver.go index 47b3f22877..c90a87b263 100644 --- a/pkg/cmd/testserver/testserver.go +++ b/pkg/cmd/testserver/testserver.go @@ -78,13 +78,13 @@ func (c *Config) Complete() (RunnableTestServer, error) { grpc.ChainUnaryInterceptor( datastoreMiddleware.UnaryServerInterceptor(), dispatchmw.UnaryServerInterceptor(dispatcher), - consistencymw.UnaryServerInterceptor(), + consistencymw.UnaryServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.UnaryServerInterceptor, ), grpc.ChainStreamInterceptor( datastoreMiddleware.StreamServerInterceptor(), dispatchmw.StreamServerInterceptor(dispatcher), - consistencymw.StreamServerInterceptor(), + consistencymw.StreamServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.StreamServerInterceptor, ), ) @@ -97,14 +97,14 @@ func (c *Config) Complete() (RunnableTestServer, error) { datastoreMiddleware.UnaryServerInterceptor(), readonly.UnaryServerInterceptor(), dispatchmw.UnaryServerInterceptor(dispatcher), - consistencymw.UnaryServerInterceptor(), + consistencymw.UnaryServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.UnaryServerInterceptor, ), grpc.ChainStreamInterceptor( datastoreMiddleware.StreamServerInterceptor(), readonly.StreamServerInterceptor(), dispatchmw.StreamServerInterceptor(dispatcher), - consistencymw.StreamServerInterceptor(), + consistencymw.StreamServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.StreamServerInterceptor, ), ) diff --git a/pkg/cursor/cursor.go b/pkg/cursor/cursor.go index 38df2353ee..cc81b31be9 100644 --- a/pkg/cursor/cursor.go +++ b/pkg/cursor/cursor.go @@ -1,6 +1,7 @@ package cursor import ( + "context" "encoding/base64" "errors" "fmt" @@ -11,6 +12,7 @@ import ( dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" impl "github.com/authzed/spicedb/pkg/proto/impl/v1" "github.com/authzed/spicedb/pkg/spiceerrors" + "github.com/authzed/spicedb/pkg/zedtoken" ) // Encode converts a decoded cursor to its opaque version. @@ -92,25 +94,39 @@ func DecodeToDispatchCursor(encoded *v1.Cursor, callAndParameterHash string) (*d // DecodeToDispatchRevision decodes an encoded API cursor into an internal dispatch revision. // NOTE: this method does *not* verify the caller's method signature. -func DecodeToDispatchRevision(encoded *v1.Cursor, ds revisionDecoder) (datastore.Revision, error) { +func DecodeToDispatchRevision(ctx context.Context, encoded *v1.Cursor, ds revisionDecoder) (datastore.Revision, zedtoken.TokenStatus, error) { decoded, err := Decode(encoded) if err != nil { - return nil, err + return nil, zedtoken.StatusUnknown, err } v1decoded := decoded.GetV1() if v1decoded == nil { - return nil, ErrNilCursor + return nil, zedtoken.StatusUnknown, ErrNilCursor + } + + datastoreUniqueID, err := ds.UniqueID(ctx) + if err != nil { + return nil, zedtoken.StatusUnknown, fmt.Errorf(errEncodeError, err) } parsed, err := ds.RevisionFromString(v1decoded.Revision) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, zedtoken.StatusUnknown, fmt.Errorf(errDecodeError, err) + } + + if v1decoded.DatastoreUniqueId == "" { + return parsed, zedtoken.StatusLegacyEmptyDatastoreID, nil + } + + if v1decoded.DatastoreUniqueId != datastoreUniqueID { + return parsed, zedtoken.StatusMismatchedDatastoreID, nil } - return parsed, nil + return parsed, zedtoken.StatusValid, nil } type revisionDecoder interface { + UniqueID(_ context.Context) (string, error) RevisionFromString(string) (datastore.Revision, error) } diff --git a/pkg/cursor/cursor_test.go b/pkg/cursor/cursor_test.go index 9013fb51df..1c66863024 100644 --- a/pkg/cursor/cursor_test.go +++ b/pkg/cursor/cursor_test.go @@ -1,6 +1,7 @@ package cursor import ( + "context" "fmt" "testing" @@ -58,7 +59,7 @@ func TestEncodeDecode(t *testing.T) { require.Equal(tc.sections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(encoded, revisions.CommonDecoder{ + decodedRev, _, err := DecodeToDispatchRevision(context.Background(), encoded, revisions.CommonDecoder{ Kind: revisions.TransactionID, }) require.NoError(err) @@ -136,7 +137,7 @@ func TestDecode(t *testing.T) { require.NotNil(decoded) require.Equal(testCase.expectedSections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(&v1.Cursor{ + decodedRev, _, err := DecodeToDispatchRevision(context.Background(), &v1.Cursor{ Token: testCase.token, }, revisions.CommonDecoder{ Kind: revisions.TransactionID, diff --git a/pkg/development/devcontext.go b/pkg/development/devcontext.go index 4a93fa1445..c7ff66ed5c 100644 --- a/pkg/development/devcontext.go +++ b/pkg/development/devcontext.go @@ -124,11 +124,11 @@ func (dc *DevContext) RunV1InMemoryService() (*grpc.ClientConn, func(), error) { s := grpc.NewServer( grpc.ChainUnaryInterceptor( datastoremw.UnaryServerInterceptor(dc.Datastore), - consistency.UnaryServerInterceptor(), + consistency.UnaryServerInterceptor(consistency.TreatMismatchingTokensAsError), ), grpc.ChainStreamInterceptor( datastoremw.StreamServerInterceptor(dc.Datastore), - consistency.StreamServerInterceptor(), + consistency.StreamServerInterceptor(consistency.TreatMismatchingTokensAsError), ), ) ps := v1svc.NewPermissionsServer(dc.Dispatcher, v1svc.PermissionsServerConfig{ diff --git a/pkg/proto/impl/v1/impl.pb.go b/pkg/proto/impl/v1/impl.pb.go index f802925911..44e9ef927a 100644 --- a/pkg/proto/impl/v1/impl.pb.go +++ b/pkg/proto/impl/v1/impl.pb.go @@ -400,6 +400,9 @@ type V1Cursor struct { CallAndParametersHash string `protobuf:"bytes,3,opt,name=call_and_parameters_hash,json=callAndParametersHash,proto3" json:"call_and_parameters_hash,omitempty"` // dispatch_version is the version of the dispatcher which created the cursor. DispatchVersion uint32 `protobuf:"varint,4,opt,name=dispatch_version,json=dispatchVersion,proto3" json:"dispatch_version,omitempty"` + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // cursors. + DatastoreUniqueId string `protobuf:"bytes,5,opt,name=datastore_unique_id,json=datastoreUniqueId,proto3" json:"datastore_unique_id,omitempty"` } func (x *V1Cursor) Reset() { @@ -462,6 +465,13 @@ func (x *V1Cursor) GetDispatchVersion() uint32 { return 0 } +func (x *V1Cursor) GetDatastoreUniqueId() string { + if x != nil { + return x.DatastoreUniqueId + } + return "" +} + type DocComment struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -805,6 +815,9 @@ type DecodedZedToken_V1ZedToken struct { unknownFields protoimpl.UnknownFields Revision string `protobuf:"bytes,1,opt,name=revision,proto3" json:"revision,omitempty"` + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // tokens. + DatastoreUniqueId string `protobuf:"bytes,2,opt,name=datastore_unique_id,json=datastoreUniqueId,proto3" json:"datastore_unique_id,omitempty"` } func (x *DecodedZedToken_V1ZedToken) Reset() { @@ -846,6 +859,13 @@ func (x *DecodedZedToken_V1ZedToken) GetRevision() string { return "" } +func (x *DecodedZedToken_V1ZedToken) GetDatastoreUniqueId() string { + if x != nil { + return x.DatastoreUniqueId + } + return "" +} + var File_impl_v1_impl_proto protoreflect.FileDescriptor var file_impl_v1_impl_proto_rawDesc = []byte{ @@ -875,7 +895,7 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x08, 0x56, 0x32, 0x5a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0x82, 0x02, 0x0a, 0x0f, 0x44, 0x65, 0x63, 0x6f, 0x64, + 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0xb2, 0x02, 0x0a, 0x0f, 0x44, 0x65, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x55, 0x0a, 0x14, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x76, 0x31, 0x5f, 0x7a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, @@ -888,15 +908,18 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x65, 0x6e, 0x48, 0x00, 0x52, 0x02, 0x76, 0x31, 0x1a, 0x26, 0x0a, 0x08, 0x56, 0x31, 0x5a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, - 0x1a, 0x28, 0x0a, 0x0a, 0x56, 0x31, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1a, + 0x1a, 0x58, 0x0a, 0x0a, 0x56, 0x31, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, + 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x13, 0x64, 0x61, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, + 0x72, 0x65, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x49, 0x64, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0x45, 0x0a, 0x0d, 0x44, 0x65, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x23, 0x0a, 0x02, 0x76, 0x31, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x48, 0x00, 0x52, 0x02, 0x76, 0x31, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x6e, 0x65, - 0x6f, 0x66, 0x22, 0xa6, 0x01, 0x0a, 0x08, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, + 0x6f, 0x66, 0x22, 0xd6, 0x01, 0x0a, 0x08, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x73, @@ -906,7 +929,10 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x6e, 0x64, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x48, 0x61, 0x73, 0x68, 0x12, 0x29, 0x0a, 0x10, 0x64, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x64, 0x69, 0x73, 0x70, - 0x61, 0x74, 0x63, 0x68, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x26, 0x0a, 0x0a, 0x44, + 0x61, 0x74, 0x63, 0x68, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x13, 0x64, + 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, + 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x49, 0x64, 0x22, 0x26, 0x0a, 0x0a, 0x44, 0x6f, 0x63, 0x43, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x8e, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, diff --git a/pkg/proto/impl/v1/impl.pb.validate.go b/pkg/proto/impl/v1/impl.pb.validate.go index 43a753a435..d46c680943 100644 --- a/pkg/proto/impl/v1/impl.pb.validate.go +++ b/pkg/proto/impl/v1/impl.pb.validate.go @@ -733,6 +733,8 @@ func (m *V1Cursor) validate(all bool) error { // no validation rules for DispatchVersion + // no validation rules for DatastoreUniqueId + if len(errors) > 0 { return V1CursorMultiError(errors) } @@ -1589,6 +1591,8 @@ func (m *DecodedZedToken_V1ZedToken) validate(all bool) error { // no validation rules for Revision + // no validation rules for DatastoreUniqueId + if len(errors) > 0 { return DecodedZedToken_V1ZedTokenMultiError(errors) } diff --git a/pkg/proto/impl/v1/impl_vtproto.pb.go b/pkg/proto/impl/v1/impl_vtproto.pb.go index b33b124c04..e39a6cddaa 100644 --- a/pkg/proto/impl/v1/impl_vtproto.pb.go +++ b/pkg/proto/impl/v1/impl_vtproto.pb.go @@ -154,6 +154,7 @@ func (m *DecodedZedToken_V1ZedToken) CloneVT() *DecodedZedToken_V1ZedToken { } r := new(DecodedZedToken_V1ZedToken) r.Revision = m.Revision + r.DatastoreUniqueId = m.DatastoreUniqueId if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -242,6 +243,7 @@ func (m *V1Cursor) CloneVT() *V1Cursor { r.Revision = m.Revision r.CallAndParametersHash = m.CallAndParametersHash r.DispatchVersion = m.DispatchVersion + r.DatastoreUniqueId = m.DatastoreUniqueId if rhs := m.Sections; rhs != nil { tmpContainer := make([]string, len(rhs)) copy(tmpContainer, rhs) @@ -542,6 +544,9 @@ func (this *DecodedZedToken_V1ZedToken) EqualVT(that *DecodedZedToken_V1ZedToken if this.Revision != that.Revision { return false } + if this.DatastoreUniqueId != that.DatastoreUniqueId { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -707,6 +712,9 @@ func (this *V1Cursor) EqualVT(that *V1Cursor) bool { if this.DispatchVersion != that.DispatchVersion { return false } + if this.DatastoreUniqueId != that.DatastoreUniqueId { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -1121,6 +1129,13 @@ func (m *DecodedZedToken_V1ZedToken) MarshalToSizedBufferVT(dAtA []byte) (int, e i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.DatastoreUniqueId) > 0 { + i -= len(m.DatastoreUniqueId) + copy(dAtA[i:], m.DatastoreUniqueId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.DatastoreUniqueId))) + i-- + dAtA[i] = 0x12 + } if len(m.Revision) > 0 { i -= len(m.Revision) copy(dAtA[i:], m.Revision) @@ -1302,6 +1317,13 @@ func (m *V1Cursor) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.DatastoreUniqueId) > 0 { + i -= len(m.DatastoreUniqueId) + copy(dAtA[i:], m.DatastoreUniqueId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.DatastoreUniqueId))) + i-- + dAtA[i] = 0x2a + } if m.DispatchVersion != 0 { i = protohelpers.EncodeVarint(dAtA, i, uint64(m.DispatchVersion)) i-- @@ -1628,6 +1650,10 @@ func (m *DecodedZedToken_V1ZedToken) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + l = len(m.DatastoreUniqueId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -1717,6 +1743,10 @@ func (m *V1Cursor) SizeVT() (n int) { if m.DispatchVersion != 0 { n += 1 + protohelpers.SizeOfVarint(uint64(m.DispatchVersion)) } + l = len(m.DatastoreUniqueId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -2358,6 +2388,38 @@ func (m *DecodedZedToken_V1ZedToken) UnmarshalVT(dAtA []byte) error { } m.Revision = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatastoreUniqueId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatastoreUniqueId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -2749,6 +2811,38 @@ func (m *V1Cursor) UnmarshalVT(dAtA []byte) error { break } } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatastoreUniqueId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatastoreUniqueId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/zedtoken/zedtoken.go b/pkg/zedtoken/zedtoken.go index 797b3e184b..f655b74565 100644 --- a/pkg/zedtoken/zedtoken.go +++ b/pkg/zedtoken/zedtoken.go @@ -2,6 +2,7 @@ package zedtoken import ( + "context" "encoding/base64" "errors" "fmt" @@ -22,9 +23,32 @@ const ( // zedtoken argument to Decode var ErrNilZedToken = errors.New("zedtoken pointer was nil") -// MustNewFromRevision generates an encoded zedtoken from an integral revision. -func MustNewFromRevision(revision datastore.Revision) *v1.ZedToken { - encoded, err := NewFromRevision(revision) +// legacyEmptyDatastoreID is the empty datastore ID for legacy tokens and cursors. +const legacyEmptyDatastoreID = "" + +// TokenStatus is the status of a zedtoken. +type TokenStatus int + +const ( + // StatusUnknown indicates that the status of the zedtoken is unknown. + StatusUnknown TokenStatus = iota + + // StatusLegacyEmptyDatastoreID indicates that the zedtoken is a legacy token + // with an empty datastore ID. + StatusLegacyEmptyDatastoreID + + // StatusValid indicates that the zedtoken is valid. + StatusValid + + // StatusMismatchedDatastoreID indicates that the zedtoken is valid, but the + // datastore ID does not match the current datastore, indicating that the + // token was generated by a different datastore. + StatusMismatchedDatastoreID +) + +// MustNewFromRevisionForTesting generates an encoded zedtoken from an integral revision. +func MustNewFromRevisionForTesting(revision datastore.Revision) *v1.ZedToken { + encoded, err := newFromRevision(revision, legacyEmptyDatastoreID) if err != nil { panic(err) } @@ -32,11 +56,21 @@ func MustNewFromRevision(revision datastore.Revision) *v1.ZedToken { } // NewFromRevision generates an encoded zedtoken from an integral revision. -func NewFromRevision(revision datastore.Revision) (*v1.ZedToken, error) { +func NewFromRevision(ctx context.Context, revision datastore.Revision, ds datastore.Datastore) (*v1.ZedToken, error) { + datastoreUniqueID, err := ds.UniqueID(ctx) + if err != nil { + return nil, fmt.Errorf(errEncodeError, err) + } + + return newFromRevision(revision, datastoreUniqueID) +} + +func newFromRevision(revision datastore.Revision, datastoreUniqueID string) (*v1.ZedToken, error) { toEncode := &zedtoken.DecodedZedToken{ VersionOneof: &zedtoken.DecodedZedToken_V1{ V1: &zedtoken.DecodedZedToken_V1ZedToken{ - Revision: revision.String(), + Revision: revision.String(), + DatastoreUniqueId: datastoreUniqueID, }, }, } @@ -77,10 +111,10 @@ func Decode(encoded *v1.ZedToken) (*zedtoken.DecodedZedToken, error) { } // DecodeRevision converts and extracts the revision from a zedtoken or legacy zookie. -func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revision, error) { +func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revision, TokenStatus, error) { decoded, err := Decode(encoded) if err != nil { - return datastore.NoRevision, err + return datastore.NoRevision, StatusUnknown, err } switch ver := decoded.VersionOneof.(type) { @@ -88,21 +122,36 @@ func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revisio revString := fmt.Sprintf("%d", ver.DeprecatedV1Zookie.Revision) parsed, err := ds.RevisionFromString(revString) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) } - return parsed, nil + return parsed, StatusLegacyEmptyDatastoreID, nil case *zedtoken.DecodedZedToken_V1: parsed, err := ds.RevisionFromString(ver.V1.Revision) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) } - return parsed, nil + + if ver.V1.DatastoreUniqueId == legacyEmptyDatastoreID { + return parsed, StatusLegacyEmptyDatastoreID, nil + } + + datastoreUniqueID, err := ds.UniqueID(context.Background()) + if err != nil { + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) + } + + if ver.V1.DatastoreUniqueId != datastoreUniqueID { + return parsed, StatusMismatchedDatastoreID, nil + } + + return parsed, StatusValid, nil default: - return datastore.NoRevision, fmt.Errorf(errDecodeError, fmt.Errorf("unknown zookie version: %T", decoded.VersionOneof)) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, fmt.Errorf("unknown zookie version: %T", decoded.VersionOneof)) } } type revisionDecoder interface { + UniqueID(context.Context) (string, error) RevisionFromString(string) (datastore.Revision, error) } diff --git a/pkg/zedtoken/zedtoken_test.go b/pkg/zedtoken/zedtoken_test.go index 2fdf22a659..39b7b16d04 100644 --- a/pkg/zedtoken/zedtoken_test.go +++ b/pkg/zedtoken/zedtoken_test.go @@ -41,10 +41,9 @@ func TestZedTokenEncode(t *testing.T) { rev := rev t.Run(rev.String(), func(t *testing.T) { require := require.New(t) - encoded, err := NewFromRevision(rev) - require.NoError(err) + encoded := MustNewFromRevisionForTesting(rev) - decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + decoded, _, err := DecodeRevision(encoded, revisions.CommonDecoder{ Kind: revisions.TransactionID, }) require.NoError(err) @@ -58,10 +57,9 @@ func TestZedTokenEncodeHLC(t *testing.T) { rev := rev t.Run(rev.String(), func(t *testing.T) { require := require.New(t) - encoded, err := NewFromRevision(rev) - require.NoError(err) + encoded := MustNewFromRevisionForTesting(rev) - decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + decoded, _, err := DecodeRevision(encoded, revisions.CommonDecoder{ Kind: revisions.HybridLogicalClock, }) require.NoError(err) @@ -71,65 +69,92 @@ func TestZedTokenEncodeHLC(t *testing.T) { } var decodeTests = []struct { - format string - token string - expectedRevision datastore.Revision - expectError bool + format string + token string + datastoreUniqueID string + expectedRevision datastore.Revision + expectedStatus TokenStatus + expectError bool }{ { format: "invalid", token: "abc", expectedRevision: datastore.NoRevision, + expectedStatus: StatusUnknown, expectError: true, }, { format: "V1 Zookie", token: "CAESAA==", expectedRevision: revisions.NewForTransactionID(0), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAggB", expectedRevision: revisions.NewForTransactionID(1), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAggC", expectedRevision: revisions.NewForTransactionID(2), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAwiAAg==", expectedRevision: revisions.NewForTransactionID(256), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAIaAwoBMA==", expectedRevision: revisions.NewForTransactionID(0), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMQ==", expectedRevision: revisions.NewForTransactionID(1), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMg==", expectedRevision: revisions.NewForTransactionID(2), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBNA==", expectedRevision: revisions.NewForTransactionID(4), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, + { + format: "V1 ZedToken with matching datastore unique ID", + token: "GhIKAjQyEgxzb21ldW5pcXVlaWQ=", + datastoreUniqueID: "someuniqueid", + expectedRevision: revisions.NewForTransactionID(42), + expectedStatus: StatusValid, + expectError: false, + }, + { + format: "V1 ZedToken with mismatched datastore unique ID", + token: "GhIKAjQyEgxzb21ldW5pcXVlaWQ=", + datastoreUniqueID: "anotheruniqueid", + expectedRevision: revisions.NewForTransactionID(42), + expectedStatus: StatusMismatchedDatastoreID, + expectError: false, + }, } func TestDecode(t *testing.T) { @@ -139,15 +164,17 @@ func TestDecode(t *testing.T) { t.Run(testName, func(t *testing.T) { require := require.New(t) - decoded, err := DecodeRevision(&v1.ZedToken{ + decoded, status, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, }, revisions.CommonDecoder{ - Kind: revisions.TransactionID, + DatastoreUniqueID: testCase.datastoreUniqueID, + Kind: revisions.TransactionID, }) if testCase.expectError { require.Error(err) } else { require.NoError(err) + require.Equal(testCase.expectedStatus, status) require.True( testCase.expectedRevision.Equal(decoded), "%s != %s", @@ -160,20 +187,52 @@ func TestDecode(t *testing.T) { } var hlcDecodeTests = []struct { - format string - token string - expectedRevision datastore.Revision - expectError bool + format string + token string + datastoreUniqueID string + expectedRevision datastore.Revision + expectedStatus TokenStatus + expectError bool }{ { format: "V1 ZedToken", token: "CAIaFQoTMTYyMTUzODE4OTAyODkyODAwMA==", + expectedStatus: StatusLegacyEmptyDatastoreID, expectedRevision: revisions.NewForHLC(decimal.NewFromInt(1621538189028928000)), expectError: false, }, { - format: "V1 ZedToken", - token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", + format: "V1 ZedToken", + token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", + expectedStatus: StatusLegacyEmptyDatastoreID, + expectedRevision: (func() datastore.Revision { + v, err := decimal.NewFromString("1693540940373045727.0000000001") + if err != nil { + panic(err) + } + return revisions.NewForHLC(v) + })(), + expectError: false, + }, + { + format: "V1 ZedToken with matching datastore unique ID", + token: "GkYKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMRIkNjM0OWFhZjItMzdjZC00N2I5LTg0ZTgtZmU1ZmE2ZTJkZWFk", + datastoreUniqueID: "6349aaf2-37cd-47b9-84e8-fe5fa6e2dead", + expectedStatus: StatusValid, + expectedRevision: (func() datastore.Revision { + v, err := decimal.NewFromString("1693540940373045727.0000000001") + if err != nil { + panic(err) + } + return revisions.NewForHLC(v) + })(), + expectError: false, + }, + { + format: "V1 ZedToken with mismatched datastore unique ID", + token: "GkYKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMRIkNjM0OWFhZjItMzdjZC00N2I5LTg0ZTgtZmU1ZmE2ZTJkZWFk", + datastoreUniqueID: "arrrg-6349aaf2-37cd-47b9-84e8-fe5fa6e2dead", + expectedStatus: StatusMismatchedDatastoreID, expectedRevision: (func() datastore.Revision { v, err := decimal.NewFromString("1693540940373045727.0000000001") if err != nil { @@ -192,15 +251,17 @@ func TestHLCDecode(t *testing.T) { t.Run(testName, func(t *testing.T) { require := require.New(t) - decoded, err := DecodeRevision(&v1.ZedToken{ + decoded, status, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, }, revisions.CommonDecoder{ - Kind: revisions.HybridLogicalClock, + DatastoreUniqueID: testCase.datastoreUniqueID, + Kind: revisions.HybridLogicalClock, }) if testCase.expectError { require.Error(err) } else { require.NoError(err) + require.Equal(testCase.expectedStatus, status) require.True( testCase.expectedRevision.Equal(decoded), "%s != %s", diff --git a/proto/internal/impl/v1/impl.proto b/proto/internal/impl/v1/impl.proto index c6e871a1ed..0e3f1356e6 100644 --- a/proto/internal/impl/v1/impl.proto +++ b/proto/internal/impl/v1/impl.proto @@ -33,6 +33,10 @@ message DecodedZedToken { } message V1ZedToken { string revision = 1; + + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // tokens. + string datastore_unique_id = 2; } oneof version_oneof { V1Zookie deprecated_v1_zookie = 2; @@ -60,6 +64,10 @@ message V1Cursor { // dispatch_version is the version of the dispatcher which created the cursor. uint32 dispatch_version = 4; + + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // cursors. + string datastore_unique_id = 5; } message DocComment {