Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial AVS 0.11.0 support #17

Merged
merged 14 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmd/flags/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ const (
HnswConstructionEf = "hnsw-ef-construction"
HnswEf = "hnsw-ef"
HnswMaxMemQueueSize = "hnsw-max-mem-queue-size"
BatchMaxRecords = "hnsw-batch-max-records"
BatchInterval = "hnsw-batch-interval"
BatchMaxIndexRecords = "hnsw-batch-max-index-records"
BatchIndexInterval = "hnsw-batch-index-interval"
BatchMaxReindexRecords = "hnsw-batch-max-reindex-records"
BatchReindexInterval = "hnsw-batch-reindex-interval"
HnswCacheMaxEntries = "hnsw-cache-max-entries"
HnswCacheExpiry = "hnsw-cache-expiry"
HnswHealerMaxScanRatePerNode = "hnsw-healer-max-scan-rate-per-node"
Expand All @@ -54,6 +56,7 @@ const (
HnswHealerParallelism = "hnsw-healer-parallelism"
HnswMergeParallelism = "hnsw-merge-index-parallelism"
HnswMergeReIndexParallelism = "hnsw-merge-reindex-parallelism"
HnswVectorIntegrityCheck = "hnsw-vector-integrity-check"
TLSProtocols = "tls-protocols"
TLSCaFile = "tls-cafile"
TLSCaPath = "tls-capath"
Expand Down
6 changes: 3 additions & 3 deletions cmd/flags/credentials_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func TestCredentialsFlag_Type(t *testing.T) {
func TestCredentialsFlag_String(t *testing.T) {
// Test string representation with user and password
flag := CredentialsFlag{
User: StringOptionalFlag{Val: tests.GetStrPtr("username")},
Password: StringOptionalFlag{Val: tests.GetStrPtr("password")},
User: StringOptionalFlag{Val: tests.Ptr("username")},
Password: StringOptionalFlag{Val: tests.Ptr("password")},
}
str := flag.String()
expected := "username:password"
Expand All @@ -58,7 +58,7 @@ func TestCredentialsFlag_String(t *testing.T) {

// Test string representation with user only
flag = CredentialsFlag{
User: StringOptionalFlag{Val: tests.GetStrPtr("username")},
User: StringOptionalFlag{Val: tests.Ptr("username")},
Password: StringOptionalFlag{},
}
str = flag.String()
Expand Down
29 changes: 19 additions & 10 deletions cmd/flags/hnsw.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,37 @@ import (
)

type BatchingFlags struct {
MaxRecords Uint32OptionalFlag
Interval DurationOptionalFlag
MaxIndexRecords Uint32OptionalFlag
IndexInterval DurationOptionalFlag
MaxReindexRecords Uint32OptionalFlag
ReindexInterval DurationOptionalFlag
}

func NewHnswBatchingFlags() *BatchingFlags {
return &BatchingFlags{
MaxRecords: Uint32OptionalFlag{},
Interval: DurationOptionalFlag{},
MaxIndexRecords: Uint32OptionalFlag{},
IndexInterval: DurationOptionalFlag{},
MaxReindexRecords: Uint32OptionalFlag{},
ReindexInterval: DurationOptionalFlag{},
}
}

func (cf *BatchingFlags) NewFlagSet() *pflag.FlagSet {
flagSet := &pflag.FlagSet{}
flagSet.Var(&cf.MaxRecords, BatchMaxRecords, "Maximum number of records to fit in a batch.") //nolint:lll // For readability
flagSet.Var(&cf.Interval, BatchInterval, "The maximum amount of time to wait before finalizing a batch.") //nolint:lll // For readability
flagSet.Var(&cf.MaxIndexRecords, BatchMaxIndexRecords, "Maximum number of records to fit in a batch.") //nolint:lll // For readability
dwelch-spike marked this conversation as resolved.
Show resolved Hide resolved
flagSet.Var(&cf.IndexInterval, BatchIndexInterval, "The maximum amount of time to wait before finalizing a batch.") //nolint:lll // For readability
flagSet.Var(&cf.MaxReindexRecords, BatchMaxReindexRecords, "Maximum number of re-index records to fit in a batch.") //nolint:lll // For readability
flagSet.Var(&cf.ReindexInterval, BatchReindexInterval, "The maximum amount of time to wait before finalizing a re-index batch.") //nolint:lll // For readability

return flagSet
}

func (cf *BatchingFlags) NewSLogAttr() []any {
return []any{
slog.Any(BatchMaxRecords, cf.MaxRecords.Val),
slog.Any(BatchInterval, cf.Interval.Val),
slog.Any(BatchMaxIndexRecords, cf.MaxIndexRecords.Val),
slog.Any(BatchIndexInterval, cf.IndexInterval.Val),
slog.Any(BatchMaxReindexRecords, cf.MaxIndexRecords.Val),
slog.Any(BatchReindexInterval, cf.IndexInterval.Val),
}
}

Expand All @@ -45,10 +53,11 @@ func NewHnswCachingFlags() *CachingFlags {
}
}

//nolint:lll // For readability
func (cf *CachingFlags) NewFlagSet() *pflag.FlagSet {
flagSet := &pflag.FlagSet{}
flagSet.Var(&cf.MaxEntries, HnswCacheMaxEntries, "Maximum number of entries to cache.") //nolint:lll // For readability
flagSet.Var(&cf.Expiry, HnswCacheExpiry, "A cache entry will expire after this amount of time has passed since the entry was added to cache") //nolint:lll // For readability
flagSet.Var(&cf.MaxEntries, HnswCacheMaxEntries, "Maximum number of entries to cache.")
flagSet.Var(&cf.Expiry, HnswCacheExpiry, "A cache entry will expire after this amount of time has passed since the entry was added to cache")

return flagSet
}
Expand Down
10 changes: 10 additions & 0 deletions cmd/flags/optionals.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,13 @@ func (f *DurationOptionalFlag) Uint32() *uint32 {

return &milli
}

func (f *DurationOptionalFlag) Int64() *int64 {
if f.Val == nil {
return nil
}

milli := f.Val.Milliseconds()

return &milli
}
19 changes: 19 additions & 0 deletions cmd/flags/optionals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package flags

import (
"testing"
"time"

"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -91,3 +92,21 @@ func (suite *OptionalFlagSuite) TestIntOptionalFlag() {
suite.T().Errorf("Expected error, got nil")
}
}

func (suite *OptionalFlagSuite) TestDurationOptionalFlag() {
f := &DurationOptionalFlag{}

err := f.Set("300ms")
if err != nil {
suite.T().Errorf("Unexpected error: %v", err)
}

if f.Val == nil || *f.Val != time.Duration(300)*time.Millisecond {
suite.T().Errorf("Expected 300ms, got %v", f.Val)
}

err = f.Set("not a time")
if err == nil {
suite.T().Errorf("Expected error, got nil")
}
}
81 changes: 44 additions & 37 deletions cmd/indexCreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,41 @@ import (

//nolint:govet // Padding not a concern for a CLI
var indexCreateFlags = &struct {
clientFlags *flags.ClientFlags
yes bool
inputFile string
namespace string
set flags.StringOptionalFlag
indexName string
vectorField string
dimensions uint32
distanceMetric flags.DistanceMetricFlag
indexLabels map[string]string
storageNamespace flags.StringOptionalFlag
storageSet flags.StringOptionalFlag
hnswMaxEdges flags.Uint32OptionalFlag
hnswEf flags.Uint32OptionalFlag
hnswConstructionEf flags.Uint32OptionalFlag
hnswMaxMemQueueSize flags.Uint32OptionalFlag
hnswBatch flags.BatchingFlags
hnswCache flags.CachingFlags
hnswHealer flags.HealerFlags
hnswMerge flags.MergeFlags
clientFlags *flags.ClientFlags
yes bool
inputFile string
namespace string
set flags.StringOptionalFlag
indexName string
vectorField string
dimensions uint32
distanceMetric flags.DistanceMetricFlag
indexLabels map[string]string
storageNamespace flags.StringOptionalFlag
storageSet flags.StringOptionalFlag
hnswMaxEdges flags.Uint32OptionalFlag
hnswEf flags.Uint32OptionalFlag
hnswConstructionEf flags.Uint32OptionalFlag
hnswMaxMemQueueSize flags.Uint32OptionalFlag
hnswBatch flags.BatchingFlags
hnswCache flags.CachingFlags
hnswHealer flags.HealerFlags
hnswMerge flags.MergeFlags
hnswVectorIntegrityCheck flags.BoolOptionalFlag
}{
clientFlags: rootFlags.clientFlags,
set: flags.StringOptionalFlag{},
storageNamespace: flags.StringOptionalFlag{},
storageSet: flags.StringOptionalFlag{},
hnswMaxEdges: flags.Uint32OptionalFlag{},
hnswEf: flags.Uint32OptionalFlag{},
hnswConstructionEf: flags.Uint32OptionalFlag{},
hnswMaxMemQueueSize: flags.Uint32OptionalFlag{},
hnswBatch: *flags.NewHnswBatchingFlags(),
hnswCache: *flags.NewHnswCachingFlags(),
hnswHealer: *flags.NewHnswHealerFlags(),
hnswMerge: *flags.NewHnswMergeFlags(),
clientFlags: rootFlags.clientFlags,
set: flags.StringOptionalFlag{},
storageNamespace: flags.StringOptionalFlag{},
storageSet: flags.StringOptionalFlag{},
hnswMaxEdges: flags.Uint32OptionalFlag{},
hnswEf: flags.Uint32OptionalFlag{},
hnswConstructionEf: flags.Uint32OptionalFlag{},
hnswMaxMemQueueSize: flags.Uint32OptionalFlag{},
hnswBatch: *flags.NewHnswBatchingFlags(),
hnswCache: *flags.NewHnswCachingFlags(),
hnswHealer: *flags.NewHnswHealerFlags(),
hnswMerge: *flags.NewHnswMergeFlags(),
hnswVectorIntegrityCheck: flags.BoolOptionalFlag{},
}

func newIndexCreateFlagSet() *pflag.FlagSet {
Expand All @@ -72,7 +74,8 @@ func newIndexCreateFlagSet() *pflag.FlagSet {
flagSet.Var(&indexCreateFlags.hnswMaxEdges, flags.HnswMaxEdges, "Maximum number bi-directional links per HNSW vertex. Greater values of 'm' in general provide better recall for data with high dimensionality, while lower values work well for data with lower dimensionality. The storage space required for the index increases proportionally with 'm'.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswConstructionEf, flags.HnswConstructionEf, "The number of candidate nearest neighbors shortlisted during index creation. Larger values provide better recall at the cost of longer index update times.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswEf, flags.HnswEf, "The default number of candidate nearest neighbors shortlisted during search. Larger values provide better recall at the cost of longer search times.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswMaxMemQueueSize, flags.HnswMaxMemQueueSize, "Maximum size of in-memory queue for inserted/updated vector records.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswMaxMemQueueSize, flags.HnswMaxMemQueueSize, "Maximum size of in-memory queue for inserted/updated vector records.") //nolint:lll // For readability //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswVectorIntegrityCheck, flags.HnswVectorIntegrityCheck, "Enable/disable vector integrity check. Defaults to enabled.") //nolint:lll // For readability
flagSet.AddFlagSet(indexCreateFlags.hnswBatch.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswCache.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswHealer.NewFlagSet())
Expand Down Expand Up @@ -228,6 +231,7 @@ asvec index create -i myindex -n test -s testset -d 256 -m COSINE --%s vector \
slog.Any(flags.HnswEf, indexCreateFlags.hnswEf.Val),
slog.Any(flags.HnswConstructionEf, indexCreateFlags.hnswConstructionEf.Val),
slog.Any(flags.HnswMaxMemQueueSize, indexCreateFlags.hnswMaxMemQueueSize.Val),
slog.Any(flags.HnswVectorIntegrityCheck, indexCreateFlags.hnswVectorIntegrityCheck.Val),
)...,
)

Expand Down Expand Up @@ -330,12 +334,14 @@ func runCreateIndexFromFlags(client *avs.Client) error {
EfConstruction: indexCreateFlags.hnswConstructionEf.Val,
MaxMemQueueSize: indexCreateFlags.hnswMaxMemQueueSize.Val,
BatchingParams: &protos.HnswBatchingParams{
MaxRecords: indexCreateFlags.hnswBatch.MaxRecords.Val,
Interval: indexCreateFlags.hnswBatch.Interval.Uint32(),
MaxIndexRecords: indexCreateFlags.hnswBatch.MaxIndexRecords.Val,
IndexInterval: indexCreateFlags.hnswBatch.IndexInterval.Uint32(),
MaxReindexRecords: indexCreateFlags.hnswBatch.MaxReindexRecords.Val,
ReindexInterval: indexCreateFlags.hnswBatch.ReindexInterval.Uint32(),
},
CachingParams: &protos.HnswCachingParams{
IndexCachingParams: &protos.HnswCachingParams{
MaxEntries: indexCreateFlags.hnswCache.MaxEntries.Val,
Expiry: indexCreateFlags.hnswCache.Expiry.Uint64(),
Expiry: indexCreateFlags.hnswCache.Expiry.Int64(),
},
HealerParams: &protos.HnswHealerParams{
MaxScanRatePerNode: indexCreateFlags.hnswHealer.MaxScanRatePerNode.Val,
Expand All @@ -348,6 +354,7 @@ func runCreateIndexFromFlags(client *avs.Client) error {
IndexParallelism: indexCreateFlags.hnswMerge.IndexParallelism.Val,
ReIndexParallelism: indexCreateFlags.hnswMerge.ReIndexParallelism.Val,
},
EnableVectorIntegrityCheck: indexCreateFlags.hnswVectorIntegrityCheck.Val,
},
}

Expand Down
Loading
Loading