Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
partially resolve #100 (#113)
Browse files Browse the repository at this point in the history
* custom msgpackv5 decoders
* use StorageCallVShardError instead of bucketStatError
  • Loading branch information
nurzhan-saktaganov authored Dec 17, 2024
1 parent a486c03 commit 0a8677f
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 38 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ REFACTOR:
* Use constants for vshard error names and codes.
* Reduce SLOC by using CallAsync method.
* BucketForceCreate optimization: don't decode tnt response.
* Remove bucketStatError type, use StorageCallVShardError type instead.
* Add custom msgpackv5 decoder for 'vshard.storage.bucket_stat' response (partially #100).
* Add custom msgpackv5 decoder for 'BucketStatInfo', since msgpackv5 library has an issue (see commit content).

TESTS:
* Rename bootstrap_test.go -> tarantool_test.go and new test in this file.
Expand Down
4 changes: 2 additions & 2 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl

for _, rsFuture := range rsFutures {
if _, err := bucketStatWait(rsFuture.future); err != nil {
var bsError bucketStatError
if !errors.As(err, &bsError) {
var vshardError StorageCallVShardError
if !errors.As(err, &vshardError) {
r.log().Errorf(ctx, "bucketSearchLegacy: bucketStatWait call error for %v: %v", rsFuture.rsID, err)
}
// just skip, bucket may not belong to this replicaset
Expand Down
14 changes: 0 additions & 14 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,6 @@ const (
VShardErrNameInstanceNameMismatch = "INSTANCE_NAME_MISMATCH"
)

type bucketStatError struct {
BucketID uint64 `msgpack:"bucket_id"`
Reason string `msgpack:"reason"`
Code int `msgpack:"code"`
Type string `msgpack:"type"`
Message string `msgpack:"message"`
Name string `msgpack:"name"`
}

func (bse bucketStatError) Error() string {
type alias bucketStatError
return fmt.Sprintf("%+v", alias(bse))
}

func newVShardErrorNoRouteToBucket(bucketID uint64) error {
return &StorageCallVShardError{
Name: VShardErrNameNoRouteToBucket,
Expand Down
71 changes: 51 additions & 20 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"time"

"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/pool"
"github.com/vmihailenco/msgpack/v5"
"github.com/vmihailenco/msgpack/v5/msgpcode"
)

type ReplicasetInfo struct {
Expand Down Expand Up @@ -51,41 +52,71 @@ func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) *tar
return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RO}, bucketStatFnc, []interface{}{bucketID})
}

func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
var bsInfo BucketStatInfo
type vshardStorageBucketStatResponseProto struct {
ok bool
info BucketStatInfo
err StorageCallVShardError
}

respData, err := future.Get()
func (r *vshardStorageBucketStatResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
// bucket_stat returns pair: stat, err
// https://github.com/tarantool/vshard/blob/e1c806e1d3d2ce8a4e6b4d498c09051bf34ab92a/vshard/storage/init.lua#L1413

respArrayLen, err := d.DecodeArrayLen()
if err != nil {
return bsInfo, err
return err
}

if len(respData) == 0 {
return bsInfo, fmt.Errorf("protocol violation bucketStatWait: empty response")
if respArrayLen == 0 {
return fmt.Errorf("protocol violation bucketStatWait: empty response")
}

if respData[0] == nil {
if len(respData) != 2 {
return bsInfo, fmt.Errorf("protocol violation bucketStatWait: invalid response length %d when respData[0] is nil", len(respData))
code, err := d.PeekCode()
if err != nil {
return err
}

if code == msgpcode.Nil {
err = d.DecodeNil()
if err != nil {
return err
}

if respArrayLen != 2 {
return fmt.Errorf("protocol violation bucketStatWait: length is %d on vshard error case", respArrayLen)
}

var bsError bucketStatError
err = mapstructure.Decode(respData[1], &bsError)
err = d.Decode(&r.err)
if err != nil {
// We could not decode respData[1] as bsError, so return respData[1] as is, add info why we could not decode.
return bsInfo, fmt.Errorf("bucketStatWait error: %v (can't decode into bsError: %v)", respData[1], err)
return fmt.Errorf("failed to decode storage vshard error: %w", err)
}

return bsInfo, bsError
return nil
}

// A problem with key-code 1
// todo: fix after https://github.com/tarantool/go-tarantool/issues/368
err = mapstructure.Decode(respData[0], &bsInfo)
err = d.Decode(&r.info)
if err != nil {
return bsInfo, fmt.Errorf("can't decode bsInfo: %w", err)
return fmt.Errorf("failed to decode bucket stat info: %w", err)
}

r.ok = true

return nil
}

func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
var bucketStatResponse vshardStorageBucketStatResponseProto

err := future.GetTyped(&bucketStatResponse)
if err != nil {
return BucketStatInfo{}, err
}

if !bucketStatResponse.ok {
return BucketStatInfo{}, bucketStatResponse.err
}

return bsInfo, nil
return bucketStatResponse.info, nil
}

// ReplicaCall perform function on remote storage
Expand Down
45 changes: 43 additions & 2 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/google/uuid"
"github.com/snksoft/crc"
"github.com/vmihailenco/msgpack/v5"

tarantool "github.com/tarantool/go-tarantool/v2"
)
Expand Down Expand Up @@ -122,8 +123,48 @@ type Config struct {
}

type BucketStatInfo struct {
BucketID uint64 `mapstructure:"id"`
Status string `mapstructure:"status"`
BucketID uint64 `msgpack:"id"`
Status string `msgpack:"status"`
}

// tnt vshard storage returns map with 'int' keys for bucketStatInfo,
// example: map[id:48 status:active 1:48 2:active].
// But msgpackv5 supports only string keys when decoding maps into structs,
// see issue: https://github.com/vmihailenco/msgpack/issues/372
// To workaround this we decode BucketStatInfo manually.
// When the issue above will be resolved, this code can be (and should be) deleted.
func (bsi *BucketStatInfo) DecodeMsgpack(d *msgpack.Decoder) error {
nKeys, err := d.DecodeMapLen()
if err != nil {
return err
}

for i := 0; i < nKeys; i++ {
key, err := d.DecodeInterface()
if err != nil {
return err
}

keyName, _ := key.(string)
switch keyName {
case "id":
if err := d.Decode(&bsi.BucketID); err != nil {
return err
}
case "status":
if err := d.Decode(&bsi.Status); err != nil {
return err
}
default:
// skip unused value
if err := d.Skip(); err != nil {
return err
}
}

}

return nil
}

type InstanceInfo struct {
Expand Down

0 comments on commit 0a8677f

Please sign in to comment.