Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
review fixes for #99
Browse files Browse the repository at this point in the history
  • Loading branch information
nurzhan-saktaganov committed Dec 3, 2024
1 parent 0a67baf commit 54070c7
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 67 deletions.
184 changes: 118 additions & 66 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,49 @@ func (c VshardMode) String() string {
return string(c)
}

type VShardResponse struct {
assertError *assertError // not nil if there is assert error
vshardError *vshardError // not nil if there is vshard response
data []interface{} // raw response data
type vshardStorageCallResponseProto struct {
assertError *assertError // not nil if there is assert error
vshardError *StorageCallVShardError // not nil if there is vshard response
respArrayLen int
data []interface{} // raw response data
dataFirstElement msgpack.RawMessage // the first element of data array as a raw message (if applicable)
}

func (s *VShardResponse) DecodeMsgpack(d *msgpack.Decoder) error {
// get array len for protocol violation check
arrayLen, err := d.DecodeArrayLen()
func (r *vshardStorageCallResponseProto) decodeDataFirstElementInto(v interface{}) error {
if r.respArrayLen < 2 {
return nil
}

return msgpack.Unmarshal(r.dataFirstElement, v)
}

func (r *vshardStorageCallResponseProto) decodeDataFirstElement() (interface{}, error) {
var resp interface{}
err := r.decodeDataFirstElementInto(&resp)
return resp, err
}

func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
/* vshard.storage.call(func) response has the next 4 possbile formats:
1. vshard error has occured:

Check failure on line 56 in api.go

View workflow job for this annotation

GitHub Actions / golangci-lint

`occured` is a misspelling of `occurred` (misspell)

Check failure on line 56 in api.go

View workflow job for this annotation

GitHub Actions / golangci-lint

`occured` is a misspelling of `occurred` (misspell)
array[nil, vshard_error]
2. User method has finished with some error:
array[false, assert error]
3. User mehod has finished succesfully, but has not returned anything

Check failure on line 60 in api.go

View workflow job for this annotation

GitHub Actions / golangci-lint

`succesfully` is a misspelling of `successfully` (misspell)

Check failure on line 60 in api.go

View workflow job for this annotation

GitHub Actions / golangci-lint

`succesfully` is a misspelling of `successfully` (misspell)
array[true]
4. User mehod has finished succesfully and has returned something

Check failure on line 62 in api.go

View workflow job for this annotation

GitHub Actions / golangci-lint

`succesfully` is a misspelling of `successfully` (misspell)

Check failure on line 62 in api.go

View workflow job for this annotation

GitHub Actions / golangci-lint

`succesfully` is a misspelling of `successfully` (misspell)
array[true, data]
*/

// Ensure it is an array and get array len for protocol violation check
var err error
r.respArrayLen, err = d.DecodeArrayLen()
if err != nil {
return err
}

if arrayLen == 0 {
// vshard.storage.call(func) returns up to two values:
// - true/false/nil
// - func result, omitted if func does not return anything
return fmt.Errorf("invalid array length: %d; protocol violation", arrayLen)
if r.respArrayLen == 0 {
return fmt.Errorf("protocol violation: invalid array length: %d", r.respArrayLen)
}

// we need peek code to make our check faster than decode interface
Expand All @@ -63,52 +88,85 @@ func (s *VShardResponse) DecodeMsgpack(d *msgpack.Decoder) error {
return err
}

ve := &vshardError{}
err = d.Decode(ve)
if r.respArrayLen != 2 {
return fmt.Errorf("protocol violation: length is %d on vshard error case", r.respArrayLen)
}

var vshardError StorageCallVShardError

err = d.Decode(&vshardError)
if err != nil {
return fmt.Errorf("failed to decode storage assert error: %w", err)
return fmt.Errorf("failed to decode storage vshard error: %w", err)
}

s.vshardError = ve
r.vshardError = &vshardError

return nil
}

assertBoolOk, err := d.DecodeBool()
isVShardRespOk, err := d.DecodeBool()
if err != nil {
return err
}

// that means we have no assert errors and response ok
if assertBoolOk {
data := make([]interface{}, 0, arrayLen-1)
for i := 1; i < arrayLen; i++ {
var face interface{}
face, err = d.DecodeInterface()

data = append(data, face)
if !isVShardRespOk {
// that means we have an assert errors and response is not ok
if r.respArrayLen != 2 {
return fmt.Errorf("protocol violation: length is %d on assert error case", r.respArrayLen)
}

s.data = data

return err
} else {
ae := &assertError{}
err = d.Decode(ae)
var assertError assertError
err = d.Decode(&assertError)
if err != nil {
return fmt.Errorf("failed to decode storage assert error: %w", err)
}

s.assertError = ae
r.assertError = &assertError

return nil
}

// isVShardRespOk is true

r.data = make([]interface{}, 0, r.respArrayLen-1)

if r.respArrayLen == 1 {
return nil
}

// The first element is a special case, because we have to return it in two places:
// - as the first element of r.data
// - decode it into user's type

r.dataFirstElement, err = d.DecodeRaw()
if err != nil {
return fmt.Errorf("failed to DecodeRaw element #2 of response array")
}

elem, err := r.decodeDataFirstElement()
if err != nil {
return fmt.Errorf("failed to decode into interface element #2 of response array")
}
r.data = append(r.data, elem)

// Handle other elements
for i := 2; i < r.respArrayLen; i++ {
elem, err := d.DecodeInterface()
if err != nil {
return fmt.Errorf("failed to decode into interface element #%d of response array", i+1)
}
r.data = append(r.data, elem)
}

return nil
}

type assertError struct {
Code int `msgpack:"code"`
BaseType string `msgpack:"base_type"`
Type string `msgpack:"type"`
Message string `msgpack:"message"`
Trace []map[string]interface{} `msgpack:"trace"`
Code int `msgpack:"code"`
BaseType string `msgpack:"base_type"`
Type string `msgpack:"type"`
Message string `msgpack:"message"`
Trace interface{} `msgpack:"trace"`
}

func (s assertError) Error() string {
Expand All @@ -119,25 +177,25 @@ func (s assertError) Error() string {
return fmt.Sprintf("%+v", alias(s))
}

type vshardError struct {
BucketID uint64 `msgpack:"bucket_id" mapstructure:"bucket_id"`
type StorageCallVShardError struct {
BucketID uint64 `msgpack:"bucket_id"`
Reason string `msgpack:"reason"`
Code int `msgpack:"code"`
Type string `msgpack:"type"`
Message string `msgpack:"message"`
Name string `msgpack:"name"`
// These 3 fields below are send as string by vshard storage, so we decode them into string, not uuid.UUID type
// Example: 00000000-0000-0002-0002-000000000000
MasterUUID string `msgpack:"master" mapstructure:"master"`
ReplicasetUUID string `msgpack:"replicaset" mapstructure:"replicaset"`
ReplicaUUID string `msgpack:"replica" mapstructure:"replica"`
MasterUUID string `msgpack:"master"`
ReplicasetUUID string `msgpack:"replicaset"`
ReplicaUUID string `msgpack:"replica"`
}

func (s vshardError) Error() string {
func (s StorageCallVShardError) Error() string {
// Just print struct as is, use hack with alias type to avoid recursion:
// %v attempts to call Error() method for s, which is recursion.
// This alias doesn't have method Error().
type alias vshardError
type alias StorageCallVShardError
return fmt.Sprintf("%+v", alias(s))
}

Expand Down Expand Up @@ -218,19 +276,19 @@ func (r *Router) RouterCallImpl(ctx context.Context,

future := rs.conn.Do(req, opts.PoolMode)

resp := &VShardResponse{}
err = future.GetTyped(resp)
var storageCallResponse vshardStorageCallResponseProto
err = future.GetTyped(&storageCallResponse)
if err != nil {
return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
}

r.log().Debugf(ctx, "Got call result response data %v", resp.data)
r.log().Debugf(ctx, "Got call result response data %v", storageCallResponse.data)

if resp.vshardError != nil {
vshardErr := resp.vshardError
if storageCallResponse.vshardError != nil {
vshardError := storageCallResponse.vshardError

switch vshardErr.Name {
case "WRONG_BUCKET", "BUCKET_IS_LOCKED":
switch vshardError.Name {
case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked:
r.BucketReset(bucketID)

// TODO we should inspect here err.destination like lua vshard router does,
Expand All @@ -239,42 +297,36 @@ func (r *Router) RouterCallImpl(ctx context.Context,
// So we just retry here as a temporary solution.
r.metrics().RetryOnCall("bucket_migrate")

r.log().Debugf(ctx, "Retrying fnc '%s' cause got vshard error: %v", fnc, resp.vshardError)
r.log().Debugf(ctx, "Retrying fnc '%s' cause got vshard error: %v", fnc, vshardError)

// this vshardError will be returned to a caller in case of timeout
err = vshardErr
err = vshardError
continue
case VShardErrNameTransferIsInProgress:
// Since lua vshard router doesn't retry here, we don't retry too.
// There is a comment why lua vshard router doesn't retry:
// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
r.BucketReset(bucketID)
return nil, nil, vshardErr
return nil, nil, vshardError
case VShardErrNameNonMaster:
// vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
// See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
// Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
// we just return this error as is.
return nil, nil, vshardErr
return nil, nil, vshardError
default:
return nil, nil, vshardErr
return nil, nil, vshardError
}
}

if resp.assertError != nil {
return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, resp.assertError)
if storageCallResponse.assertError != nil {
return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.assertError)
}

r.metrics().RequestDuration(time.Since(timeStart), true, false)

return resp.data, func(result interface{}) error {
if len(resp.data) == 0 {
return nil
}

var stub bool

return future.GetTyped(&[]interface{}{&stub, result})
return storageCallResponse.data, func(result interface{}) error {

Check failure on line 328 in api.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unlambda: replace `func(result interface{}) error {

Check failure on line 328 in api.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unlambda: replace `func(result interface{}) error {
return storageCallResponse.decodeDataFirstElementInto(result)
}, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion error.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (bse bucketStatError) Error() string {
}

func newVShardErrorNoRouteToBucket(bucketID uint64) error {
return &vshardError{
return &StorageCallVShardError{
Name: VShardErrNameNoRouteToBucket,
Code: VShardErrCodeNoRouteToBucket,
Type: "ShardingError",
Expand Down

0 comments on commit 54070c7

Please sign in to comment.