diff --git a/docs/modules/components/pages/processors/couchbase.adoc b/docs/modules/components/pages/processors/couchbase.adoc index 649e87b659..703f5bac64 100644 --- a/docs/modules/components/pages/processors/couchbase.adoc +++ b/docs/modules/components/pages/processors/couchbase.adoc @@ -69,7 +69,7 @@ couchbase: -- ====== -When inserting, replacing or upserting documents, each must have the `content` property set. +When inserting, replacing or upserting documents, each must have the `content` property set. CAS value is stored in meta `couchbase_cas`. It prevent read/write conflict by only allowing write if not modified by other. You can clear the value with `meta couchbase_cas = deleted()` to disable this check. == Fields diff --git a/internal/impl/couchbase/cache_test.go b/internal/impl/couchbase/cache_test.go index 8a8d6cbeff..ed47d0c9c8 100644 --- a/internal/impl/couchbase/cache_test.go +++ b/internal/impl/couchbase/cache_test.go @@ -62,13 +62,17 @@ cache_resources: ) } -func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error { - cluster, err := gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{ +func getCluster(ctx context.Context, tb testing.TB, port string) (*gocb.Cluster, error) { + return gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{ Authenticator: gocb.PasswordAuthenticator{ Username: username, Password: password, }, }) +} + +func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error { + cluster, err := getCluster(ctx, tb, port) if err != nil { return err } @@ -79,12 +83,7 @@ func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error } func createBucket(ctx context.Context, tb testing.TB, port, bucket string) error { - cluster, err := gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{ - Authenticator: gocb.PasswordAuthenticator{ - Username: username, - Password: password, - }, - }) + cluster, err := getCluster(ctx, tb, port) if err != nil { return err } diff --git a/internal/impl/couchbase/couchbase.go b/internal/impl/couchbase/couchbase.go index 52e4b9373b..ab35c6136d 100644 --- a/internal/impl/couchbase/couchbase.go +++ b/internal/impl/couchbase/couchbase.go @@ -20,56 +20,72 @@ import ( "github.com/couchbase/gocb/v2" ) -func valueFromOp(op gocb.BulkOp) (out any, err error) { +func valueFromOp(op gocb.BulkOp) (out any, cas gocb.Cas, err error) { switch o := op.(type) { case *gocb.GetOp: if o.Err != nil { - return nil, o.Err + return nil, gocb.Cas(0), o.Err } err := o.Result.Content(&out) - return out, err + + return out, o.Result.Cas(), err case *gocb.InsertOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err case *gocb.RemoveOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err case *gocb.ReplaceOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err case *gocb.UpsertOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err } - return nil, errors.New("type not supported") + return nil, gocb.Cas(0), errors.New("type not supported") } -func get(key string, _ []byte) gocb.BulkOp { +func get(key string, _ []byte, _ gocb.Cas) gocb.BulkOp { return &gocb.GetOp{ ID: key, } } -func insert(key string, data []byte) gocb.BulkOp { +func insert(key string, data []byte, _ gocb.Cas) gocb.BulkOp { return &gocb.InsertOp{ ID: key, Value: data, } } -func remove(key string, _ []byte) gocb.BulkOp { +func remove(key string, _ []byte, cas gocb.Cas) gocb.BulkOp { return &gocb.RemoveOp{ - ID: key, + ID: key, + Cas: cas, } } -func replace(key string, data []byte) gocb.BulkOp { +func replace(key string, data []byte, cas gocb.Cas) gocb.BulkOp { return &gocb.ReplaceOp{ ID: key, Value: data, + Cas: cas, } } -func upsert(key string, data []byte) gocb.BulkOp { +func upsert(key string, data []byte, cas gocb.Cas) gocb.BulkOp { return &gocb.UpsertOp{ ID: key, Value: data, + Cas: cas, } } diff --git a/internal/impl/couchbase/processor.go b/internal/impl/couchbase/processor.go index 2e4e508e94..141f68296a 100644 --- a/internal/impl/couchbase/processor.go +++ b/internal/impl/couchbase/processor.go @@ -27,6 +27,11 @@ import ( "github.com/redpanda-data/connect/v4/internal/impl/couchbase/client" ) +const ( + // MetaCASKey hold CAS of entry. + MetaCASKey = "couchbase_cas" +) + var ( // ErrInvalidOperation specified operation is not supported. ErrInvalidOperation = errors.New("invalid operation") @@ -41,7 +46,7 @@ func ProcessorConfig() *service.ConfigSpec { Version("4.11.0"). Categories("Integration"). Summary("Performs operations against Couchbase for each message, allowing you to store or retrieve data within message payloads."). - Description("When inserting, replacing or upserting documents, each must have the `content` property set."). + Description("When inserting, replacing or upserting documents, each must have the `content` property set. CAS value is stored in meta `couchbase_cas`. It prevent read/write conflict by only allowing write if not modified by other. You can clear the value with `meta couchbase_cas = deleted()` to disable this check."). Field(service.NewInterpolatedStringField("id").Description("Document id.").Example(`${! json("id") }`)). Field(service.NewBloblangField("content").Description("Document content.").Optional()). Field(service.NewStringAnnotatedEnumField("operation", map[string]string{ @@ -73,7 +78,7 @@ type Processor struct { *couchbaseClient id *service.InterpolatedString content *bloblang.Executor - op func(key string, data []byte) gocb.BulkOp + op func(key string, data []byte, cas gocb.Cas) gocb.BulkOp } // NewProcessor returns a Couchbase processor. @@ -139,7 +144,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat } // generate query - for index := range newMsg { + for index, msg := range newMsg { // generate id k, err := inBatch.TryInterpolatedString(index, p.id) if err != nil { @@ -159,7 +164,14 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat } } - ops[index] = p.op(k, content) + var cas gocb.Cas // retrieve cas if set + if val, ok := msg.MetaGetMut(MetaCASKey); ok { + if v, ok := val.(gocb.Cas); ok { + cas = v + } + } + + ops[index] = p.op(k, content, cas) } // execute @@ -170,7 +182,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat // set results for index, part := range newMsg { - out, err := valueFromOp(ops[index]) + out, cas, err := valueFromOp(ops[index]) if err != nil { part.SetError(fmt.Errorf("couchbase operator failed: %w", err)) } @@ -180,6 +192,8 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat } else if out != nil { part.SetStructured(out) } + + part.MetaSetMut(MetaCASKey, cas) } return []service.MessageBatch{newMsg}, nil diff --git a/internal/impl/couchbase/processor_test.go b/internal/impl/couchbase/processor_test.go index 988bd14890..b19c7a0e17 100644 --- a/internal/impl/couchbase/processor_test.go +++ b/internal/impl/couchbase/processor_test.go @@ -17,6 +17,7 @@ package couchbase_test import ( "context" "fmt" + "sync" "testing" "time" @@ -28,6 +29,8 @@ import ( "github.com/redpanda-data/benthos/v4/public/service/integration" "github.com/redpanda-data/connect/v4/internal/impl/couchbase" + + _ "github.com/redpanda-data/benthos/v4/public/components/pure" ) func TestProcessorConfigLinting(t *testing.T) { @@ -196,6 +199,11 @@ operation: 'insert' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -222,6 +230,11 @@ operation: 'upsert' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -248,6 +261,11 @@ operation: 'replace' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -273,6 +291,11 @@ operation: 'get' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message should contain expected payload. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -298,6 +321,11 @@ operation: 'remove' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -324,10 +352,217 @@ operation: 'get' assert.Len(t, msgOut[0], 1) // message should contain an error. - assert.Error(t, msgOut[0][0].GetError(), "TODO") + assert.Error(t, msgOut[0][0].GetError()) // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) assert.Equal(t, uid, string(dataOut)) } + +func TestIntegrationCouchbaseStream(t *testing.T) { + ctx := context.Background() + + integration.CheckSkip(t) + + servicePort := requireCouchbase(t) + bucket := fmt.Sprintf("testing-stream-%d", time.Now().Unix()) + require.NoError(t, createBucket(context.Background(), t, servicePort, bucket)) + t.Cleanup(func() { + require.NoError(t, removeBucket(context.Background(), t, servicePort, bucket)) + }) + + for _, clearCAS := range []bool{true, false} { + t.Run(fmt.Sprintf("%t", clearCAS), func(t *testing.T) { + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`)) + + inFn, err := streamOutBuilder.AddBatchProducerFunc() + require.NoError(t, err) + + var outBatches []service.MessageBatch + var outBatchMut sync.Mutex + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + outBatchMut.Lock() + outBatches = append(outBatches, mb) + outBatchMut.Unlock() + return nil + })) + + // insert + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + id: '${! json("key") }' + content: 'root = this' + operation: 'insert' +`, servicePort, bucket, username, password))) + + if clearCAS { // ignore cas check + require.NoError(t, streamOutBuilder.AddProcessorYAML(` +mapping: | + meta couchbase_cas = deleted() +`)) + } + + // replace + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + id: '${! json("key") }' + content: 'root = this' + operation: 'replace' +`, servicePort, bucket, username, password))) + + if clearCAS { // ignore cas check + require.NoError(t, streamOutBuilder.AddProcessorYAML(` +mapping: | + meta couchbase_cas = deleted() +`)) + } + // remove + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + id: '${! json("key") }' + operation: 'remove' +`, servicePort, bucket, username, password))) + + streamOut, err := streamOutBuilder.Build() + require.NoError(t, err) + go func() { + err = streamOut.Run(context.Background()) + require.NoError(t, err) + }() + + require.NoError(t, inFn(ctx, service.MessageBatch{ + service.NewMessage([]byte(`{"key":"hello","value":"word"}`)), + })) + require.NoError(t, streamOut.StopWithin(time.Second*15)) + + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 1 + }, time.Second*5, time.Millisecond*100) + + // batch processing should be fine and contain one message. + assert.NoError(t, err) + assert.Len(t, outBatches, 1) + assert.Len(t, outBatches[0], 1) + + // message should contain an error. + assert.NoError(t, outBatches[0][0].GetError()) + }) + } +} + +func TestIntegrationCouchbaseStreamError(t *testing.T) { + ctx := context.Background() + + integration.CheckSkip(t) + + servicePort := requireCouchbase(t) + bucket := fmt.Sprintf("testing-stream-error-%d", time.Now().Unix()) + require.NoError(t, createBucket(context.Background(), t, servicePort, bucket)) + t.Cleanup(func() { + require.NoError(t, removeBucket(context.Background(), t, servicePort, bucket)) + }) + + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`)) + + inFn, err := streamOutBuilder.AddBatchProducerFunc() + require.NoError(t, err) + + var outBatches []service.MessageBatch + var outBatchMut sync.Mutex + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + outBatchMut.Lock() + outBatches = append(outBatches, mb) + outBatchMut.Unlock() + return nil + })) + + // insert + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + id: '${! json("key") }' + content: | + root = this + root.at = timestamp_unix_micro() + operation: 'insert' +`, servicePort, bucket, username, password))) + + // upsert adn remove in parallel + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +workflow: + meta_path: "" + branches: + write: + processors: + - couchbase: + url: 'couchbase://localhost:%[1]s' + bucket: %[2]s + username: %[3]s + password: %[4]s + id: '${! json("key") }' + content: | + root = this + root.at = timestamp_unix_micro() + operation: 'replace' + remove: + processors: + - sleep: + duration: "1s" + - couchbase: + url: 'couchbase://localhost:%[1]s' + bucket: %[2]s + username: %[3]s + password: %[4]s + id: '${! json("key") }' + content: | + root = this + root.at = timestamp_unix_micro() + operation: 'replace' +`, servicePort, bucket, username, password))) + + streamOut, err := streamOutBuilder.Build() + require.NoError(t, err) + go func() { + err = streamOut.Run(context.Background()) + require.NoError(t, err) + }() + + require.NoError(t, inFn(ctx, service.MessageBatch{ + service.NewMessage([]byte(`{"key":"hello","value":"word"}`)), + })) + require.NoError(t, streamOut.StopWithin(time.Second*15)) + + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 1 + }, time.Second*5, time.Millisecond*100) + + // batch contain one message. + assert.NoError(t, err) + assert.Len(t, outBatches, 1) + assert.Len(t, outBatches[0], 1) + + // message should contain an error. + assert.Error(t, outBatches[0][0].GetError()) +}