Skip to content

Commit

Permalink
Resolve kvs already closed before last saving (#2390) (#2394)
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
Co-authored-by: Yusuke Kato <kpango@vdaas.org>
Co-authored-by: Kiichiro YUKAWA <kyukawa315@gmail.com>
  • Loading branch information
3 people authored Feb 21, 2024
1 parent 92032a6 commit a275e54
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 146 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ clean-generated:
mv $(ROOTDIR)/apis/grpc/v1/vald/vald.go $(TEMP_DIR)/vald.go
mv $(ROOTDIR)/apis/grpc/v1/agent/core/agent.go $(TEMP_DIR)/agent.go
mv $(ROOTDIR)/apis/grpc/v1/payload/interface.go $(TEMP_DIR)/interface.go
mv $(ROOTDIR)/apis/grpc/v1/mirror/mirror.go $(TEMP_DIR)/mirror.go
rm -rf \
$(ROOTDIR)/*.log \
$(ROOTDIR)/*.svg \
Expand All @@ -384,6 +385,8 @@ clean-generated:
mv $(TEMP_DIR)/agent.go $(ROOTDIR)/apis/grpc/v1/agent/core/agent.go
mkdir -p $(ROOTDIR)/apis/grpc/v1/payload
mv $(TEMP_DIR)/interface.go $(ROOTDIR)/apis/grpc/v1/payload/interface.go
mkdir -p $(ROOTDIR)/apis/grpc/v1/mirror
mv $(TEMP_DIR)/mirror.go $(ROOTDIR)/apis/grpc/v1/mirror/mirror.go

.PHONY: license
## add license to files
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ replace (
github.com/akrylysov/pogreb => github.com/akrylysov/pogreb v0.10.2
github.com/antihax/optional => github.com/antihax/optional v1.0.0
github.com/armon/go-socks5 => github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.50.20
github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.50.21
github.com/aws/aws-sdk-go-v2 => github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream => github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0
github.com/aws/aws-sdk-go-v2/config => github.com/aws/aws-sdk-go-v2/config v1.27.0
github.com/aws/aws-sdk-go-v2/credentials => github.com/aws/aws-sdk-go-v2/credentials v1.17.0
github.com/aws/aws-sdk-go-v2/feature/ec2/imds => github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager => github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1
github.com/aws/aws-sdk-go-v2/feature/s3/manager => github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2
github.com/aws/aws-sdk-go-v2/internal/configsources => github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 => github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0
github.com/aws/aws-sdk-go-v2/internal/ini => github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0
Expand All @@ -59,7 +59,7 @@ replace (
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url => github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0
github.com/aws/aws-sdk-go-v2/service/internal/s3shared => github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0
github.com/aws/aws-sdk-go-v2/service/kms => github.com/aws/aws-sdk-go-v2/service/kms v1.28.1
github.com/aws/aws-sdk-go-v2/service/s3 => github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0
github.com/aws/aws-sdk-go-v2/service/s3 => github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1
github.com/aws/aws-sdk-go-v2/service/secretsmanager => github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.27.1
github.com/aws/aws-sdk-go-v2/service/sns => github.com/aws/aws-sdk-go-v2/service/sns v1.28.0
github.com/aws/aws-sdk-go-v2/service/sqs => github.com/aws/aws-sdk-go-v2/service/sqs v1.30.1
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybF
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go v1.50.20 h1:xfAnSDVf/azIWTVQXQODp89bubvCS85r70O3nuQ4dnE=
github.com/aws/aws-sdk-go v1.50.20/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go v1.50.21 h1:W8awpwiInOt4qHQE6JghRYQJhHcf/cDJS3mlZYqioSQ=
github.com/aws/aws-sdk-go v1.50.21/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ=
github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE=
Expand All @@ -197,8 +197,8 @@ github.com/aws/aws-sdk-go-v2/credentials v1.17.0 h1:lMW2x6sKBsiAJrpi1doOXqWFyEPo
github.com/aws/aws-sdk-go-v2/credentials v1.17.0/go.mod h1:uT41FIH8cCIxOdUYIL0PYyHlL1NoneDuDSCwg5VE/5o=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh9ZrXfcAXpflhOUUeXg1k=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2 h1:VEekE/fJWqAWYozxFQ07B+h8NdvTPAYhV13xIBenuO0=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2/go.mod h1:8vozqAHmDNmoD4YbuDKIfpnLbByzngczL4My1RELLVo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80=
Expand All @@ -215,8 +215,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 h1:SHN/umDLT
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0/go.mod h1:l8gPU5RYGOFHJqWEpPMoRTP0VoaWQSkJdKo+hwWnnDA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECuPMIuZG7UKOzAnF24v6t4l+Z5Moay4=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1 h1:bjpWJEXch7moIt3PX2r5XpGROsletl7enqG1Q3Te1Dc=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 h1:2k9KmFawS63euAkY4/ixVNsYYwrwnd5fIvgEKkfZFNM=
Expand Down
2 changes: 0 additions & 2 deletions internal/core/algorithm/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,7 @@ func (n *ngt) Remove(id uint) error {
return n.newGoError(ne)
}
n.PutErrorBuffer(ne)

n.cnt.Add(^uint64(0))

return nil
}

Expand Down
17 changes: 15 additions & 2 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1697,8 +1697,11 @@ func (n *ngt) IsIndexing() bool {

func (n *ngt) UUIDs(ctx context.Context) (uuids []string) {
uuids = make([]string, 0, n.kvs.Len())
var mu sync.Mutex
n.kvs.Range(ctx, func(uuid string, oid uint32, _ int64) bool {
mu.Lock()
uuids = append(uuids, uuid)
mu.Unlock()
return true
})
return uuids
Expand Down Expand Up @@ -1741,8 +1744,18 @@ func (n *ngt) GetDimensionSize() int {

func (n *ngt) Close(ctx context.Context) (err error) {
defer n.core.Close()

err = n.kvs.Close()
defer func() {
kerr := n.kvs.Close()
if kerr != nil &&
!errors.Is(err, context.Canceled) &&
!errors.Is(err, context.DeadlineExceeded) {
if err != nil {
err = errors.Join(kerr, err)
} else {
err = kerr
}
}
}()
if len(n.path) != 0 {
if n.isReadReplica {
log.Info("skip create and save index operation on close because this is read replica")
Expand Down
35 changes: 17 additions & 18 deletions pkg/agent/internal/kvs/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"sync/atomic"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/sync/errgroup"
Expand Down Expand Up @@ -53,7 +55,6 @@ type bidi struct {
l uint64
ou [slen]*sync.Map[uint32, valueStructOu]
uo [slen]*sync.Map[string, ValueStructUo]
eg errgroup.Group
}

const (
Expand All @@ -79,14 +80,6 @@ func New(opts ...Option) BidiMap {
b.uo[i] = new(sync.Map[string, ValueStructUo])
}

if b.eg == nil {
b.eg, _ = errgroup.New(context.Background())
}

if b.concurrency > 0 {
b.eg.SetLimit(b.concurrency)
}

return b
}

Expand Down Expand Up @@ -151,24 +144,33 @@ func (b *bidi) DeleteInverse(val uint32) (key string, ok bool) {

// Range retrieves all set keys and values and calls the callback function f.
func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) {
var wg sync.WaitGroup
eg, ctx := errgroup.New(ctx)
if b.concurrency > 0 {
eg.SetLimit(b.concurrency)
}
for i := range b.uo {
idx := i
wg.Add(1)
b.eg.Go(safety.RecoverFunc(func() (err error) {
eg.Go(safety.RecoverFunc(func() (err error) {
b.uo[idx].Range(func(uuid string, val ValueStructUo) bool {
select {
case <-ctx.Done():
err = ctx.Err()
return false
default:
return f(uuid, val.value, val.timestamp)
}
})
wg.Done()
if err != nil &&
(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
return err
}
return nil
}))
}
wg.Wait()
err := eg.Wait()
if err != nil {
log.Error(err)
}
}

// Len returns the length of the cache that is set in the bidi.
Expand All @@ -180,10 +182,7 @@ func (b *bidi) Len() uint64 {
}

func (b *bidi) Close() error {
if b == nil {
return nil
}
return b.eg.Wait()
return nil
}

func getShardID(key string) (id uint64) {
Expand Down
9 changes: 1 addition & 8 deletions pkg/agent/internal/kvs/kvs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func TestNew(t *testing.T) {
l: 0,
ou: wantOu,
uo: wantUo,
eg: errgroup.Get(),
},
},
}
Expand Down Expand Up @@ -1685,12 +1684,10 @@ func Test_bidi_Range(t *testing.T) {
defer goleak.VerifyNone(tt, goleak.IgnoreCurrent())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eg, egctx := errgroup.New(ctx)
b := &bidi{
ou: test.fields.ou,
uo: test.fields.uo,
l: test.fields.l,
eg: eg,
}
if test.beforeFunc != nil {
test.beforeFunc(tt, test.args, b)
Expand All @@ -1703,7 +1700,7 @@ func Test_bidi_Range(t *testing.T) {
checkFunc = defaultCheckFunc
}

b.Range(egctx, test.args.f)
b.Range(ctx, test.args.f)
if err := checkFunc(test.want, b); err != nil {
tt.Errorf("error = %v", err)
}
Expand Down Expand Up @@ -1800,7 +1797,6 @@ func Test_bidi_Len(t *testing.T) {
// l uint64
// ou [slen]*sync.Map[uint32, valueStructOu]
// uo [slen]*sync.Map[string, ValueStructUo]
// eg errgroup.Group
// }
// type want struct {
// err error
Expand Down Expand Up @@ -1829,7 +1825,6 @@ func Test_bidi_Len(t *testing.T) {
// l:0,
// ou:nil,
// uo:nil,
// eg:nil,
// },
// want: want{},
// checkFunc: defaultCheckFunc,
Expand All @@ -1852,7 +1847,6 @@ func Test_bidi_Len(t *testing.T) {
// l:0,
// ou:nil,
// uo:nil,
// eg:nil,
// },
// want: want{},
// checkFunc: defaultCheckFunc,
Expand Down Expand Up @@ -1887,7 +1881,6 @@ func Test_bidi_Len(t *testing.T) {
// l: test.fields.l,
// ou: test.fields.ou,
// uo: test.fields.uo,
// eg: test.fields.eg,
// }
//
// err := b.Close()
Expand Down
11 changes: 0 additions & 11 deletions pkg/agent/internal/kvs/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package kvs

import (
"runtime"

"github.com/vdaas/vald/internal/sync/errgroup"
)

// Option represents the functional option for bidi.
Expand All @@ -29,15 +27,6 @@ var defaultOptions = []Option{
WithConcurrency(runtime.GOMAXPROCS(-1) * 10),
}

// WithErrGroup returns the option to set the errgroup.
func WithErrGroup(eg errgroup.Group) Option {
return func(b *bidi) {
if eg != nil {
b.eg = eg
}
}
}

// WithConcurrency returns the option to set the concurrency.
func WithConcurrency(c int) Option {
return func(b *bidi) {
Expand Down
86 changes: 0 additions & 86 deletions pkg/agent/internal/kvs/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,92 +18,6 @@ package kvs

// NOT IMPLEMENTED BELOW
//
// func TestWithErrGroup(t *testing.T) {
// type args struct {
// eg errgroup.Group
// }
// type want struct {
// want Option
// }
// type test struct {
// name string
// args args
// want want
// checkFunc func(want, Option) error
// beforeFunc func(*testing.T, args)
// afterFunc func(*testing.T, args)
// }
// defaultCheckFunc := func(w want, got Option) error {
// if !reflect.DeepEqual(got, w.want) {
// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want)
// }
// return nil
// }
// tests := []test{
// // TODO test cases
// /*
// {
// name: "test_case_1",
// args: args {
// eg:nil,
// },
// want: want{},
// checkFunc: defaultCheckFunc,
// beforeFunc: func(t *testing.T, args args) {
// t.Helper()
// },
// afterFunc: func(t *testing.T, args args) {
// t.Helper()
// },
// },
// */
//
// // TODO test cases
// /*
// func() test {
// return test {
// name: "test_case_2",
// args: args {
// eg:nil,
// },
// want: want{},
// checkFunc: defaultCheckFunc,
// beforeFunc: func(t *testing.T, args args) {
// t.Helper()
// },
// afterFunc: func(t *testing.T, args args) {
// t.Helper()
// },
// }
// }(),
// */
// }
//
// for _, tc := range tests {
// test := tc
// t.Run(test.name, func(tt *testing.T) {
// tt.Parallel()
// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent())
// if test.beforeFunc != nil {
// test.beforeFunc(tt, test.args)
// }
// if test.afterFunc != nil {
// defer test.afterFunc(tt, test.args)
// }
// checkFunc := test.checkFunc
// if test.checkFunc == nil {
// checkFunc = defaultCheckFunc
// }
//
// got := WithErrGroup(test.args.eg)
// if err := checkFunc(test.want, got); err != nil {
// tt.Errorf("error = %v", err)
// }
//
// })
// }
// }
//
// func TestWithConcurrency(t *testing.T) {
// type args struct {
// c int
Expand Down
Loading

0 comments on commit a275e54

Please sign in to comment.