Skip to content

Commit

Permalink
Add listing api to stow storage (#5741)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugra Gedik <bgedik@gmail.com>
  • Loading branch information
bgedik authored Sep 12, 2024
1 parent 86c63f7 commit e67aae0
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 3 deletions.
4 changes: 4 additions & 0 deletions flyteadmin/pkg/common/mocks/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (t *TestDataStore) Head(ctx context.Context, reference storage.DataReferenc
return t.HeadCb(ctx, reference)
}

// List is a stub: it always returns no items, a cursor at the end position, and a
// "Not implemented yet" error, regardless of its arguments.
func (t *TestDataStore) List(ctx context.Context, reference storage.DataReference, maxItems int, cursor storage.Cursor) ([]storage.DataReference, storage.Cursor, error) {
	notImplemented := fmt.Errorf("Not implemented yet")
	return nil, storage.NewCursorAtEnd(), notImplemented
}

// ReadProtobuf forwards the call, with all of its arguments, to the ReadProtobufCb
// callback held by the test store and returns whatever error that callback yields.
func (t *TestDataStore) ReadProtobuf(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
	readErr := t.ReadProtobufCb(ctx, reference, msg)
	return readErr
}
Expand Down
4 changes: 4 additions & 0 deletions flytepropeller/pkg/utils/failing_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (FailingRawStore) Head(ctx context.Context, reference storage.DataReference
return nil, fmt.Errorf("failed metadata fetch")
}

// List is a stub: it always returns no items, a cursor at the end position, and a
// "Not implemented yet" error, regardless of its arguments.
func (FailingRawStore) List(ctx context.Context, reference storage.DataReference, maxItems int, cursor storage.Cursor) ([]storage.DataReference, storage.Cursor, error) {
	notImplemented := fmt.Errorf("Not implemented yet")
	return nil, storage.NewCursorAtEnd(), notImplemented
}

// ReadRaw always fails: it returns a nil reader together with a "failed read raw" error.
func (FailingRawStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) {
	readErr := fmt.Errorf("failed read raw")
	return nil, readErr
}
Expand Down
4 changes: 4 additions & 0 deletions flytestdlib/storage/cached_rawstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (d *dummyStore) Head(ctx context.Context, reference DataReference) (Metadat
return d.HeadCb(ctx, reference)
}

// List is a stub: it always returns no items, a cursor at the end position, and a
// "Not implemented yet" error, regardless of its arguments.
func (d *dummyStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) {
	notImplemented := fmt.Errorf("Not implemented yet")
	return nil, NewCursorAtEnd(), notImplemented
}

// ReadRaw forwards the call to the ReadRawCb callback held by the dummy store and
// returns that callback's reader and error unchanged.
func (d *dummyStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
	reader, readErr := d.ReadRawCb(ctx, reference)
	return reader, readErr
}
Expand Down
4 changes: 4 additions & 0 deletions flytestdlib/storage/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Meta
}, nil
}

// List is a stub: it always returns no items, a cursor at the end position, and a
// "Not implemented yet" error, regardless of its arguments.
func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) {
	notImplemented := fmt.Errorf("Not implemented yet")
	return nil, NewCursorAtEnd(), notImplemented
}

func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
if raw, found := s.cache[reference]; found {
return ioutil.NopCloser(bytes.NewReader(raw)), nil
Expand Down
48 changes: 48 additions & 0 deletions flytestdlib/storage/mocks/composed_protobuf_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions flytestdlib/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,41 @@ type Metadata interface {
ContentMD5() string
}

// CursorState enumerates the positions a listing Cursor can be in.
type CursorState int

const (
	// AtStartCursorState marks a cursor at the start of a listing.
	AtStartCursorState CursorState = iota
	// AtEndCursorState marks a cursor at the end of a listing.
	AtEndCursorState
	// AtCustomPosCursorState marks a cursor that carries an explicit position string.
	AtCustomPosCursorState
)

// Cursor records a position within a paginated listing. Its fields are unexported;
// values are built with NewCursorAtStart, NewCursorAtEnd or NewCursorFromCustomPosition.
type Cursor struct {
	cursorState    CursorState
	customPosition string
}

// NewCursorAtStart returns a Cursor in the AtStartCursorState with an empty custom position.
func NewCursorAtStart() Cursor {
	return Cursor{cursorState: AtStartCursorState}
}

// NewCursorAtEnd returns a Cursor in the AtEndCursorState with an empty custom position.
func NewCursorAtEnd() Cursor {
	return Cursor{cursorState: AtEndCursorState}
}

// NewCursorFromCustomPosition returns a Cursor in the AtCustomPosCursorState that
// carries customPosition verbatim.
func NewCursorFromCustomPosition(customPosition string) Cursor {
	var positioned Cursor
	positioned.cursorState = AtCustomPosCursorState
	positioned.customPosition = customPosition
	return positioned
}

// DataStore is a simplified interface for accessing and storing data in one of the Cloud stores.
// Today we rely on Stow for multi-cloud support, but this interface abstracts that part
type DataStore struct {
Expand Down Expand Up @@ -78,6 +113,9 @@ type RawStore interface {
// Head gets metadata about the reference. This should generally be a light weight operation.
Head(ctx context.Context, reference DataReference) (Metadata, error)

// List gets a list of items given a prefix, using a paginated API
List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)

// ReadRaw retrieves a byte array from the Blob store or an error
ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)

Expand Down
46 changes: 46 additions & 0 deletions flytestdlib/storage/stow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type stowMetrics struct {
HeadFailure labeled.Counter
HeadLatency labeled.StopWatch

ListFailure labeled.Counter
ListLatency labeled.StopWatch

ReadFailure labeled.Counter
ReadOpenLatency labeled.StopWatch

Expand Down Expand Up @@ -251,6 +254,46 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata
return StowMetadata{exists: false}, errs.Wrapf(err, "path:%v", k)
}

// List returns one page of at most maxItems references from the container named by
// reference, using the key part of reference as the prefix passed to the container's
// Items call. Paging starts from cursor: a start cursor maps to stow.CursorStart, a
// custom cursor passes its stored position through, and an end cursor is rejected
// with an error.
//
// On success it returns the URL of every listed item converted to a DataReference,
// plus the cursor for the next page (an end cursor once stow reports the listing is
// exhausted). On any failure it returns nil items and an end cursor. The list latency
// stopwatch is stopped only on the success path.
func (s *StowStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) {
	_, containerName, prefix, err := reference.Split()
	if err != nil {
		s.metrics.BadReference.Inc(ctx)
		return nil, NewCursorAtEnd(), err
	}

	container, err := s.getContainer(ctx, locationIDMain, containerName)
	if err != nil {
		return nil, NewCursorAtEnd(), err
	}

	timer := s.metrics.ListLatency.Start(ctx)

	// Translate the storage-level cursor into the string form stow expects.
	var position string
	switch cursor.cursorState {
	case AtStartCursorState:
		position = stow.CursorStart
	case AtEndCursorState:
		return nil, NewCursorAtEnd(), fmt.Errorf("Cursor cannot be at end for the List call")
	default:
		position = cursor.customPosition
	}

	items, next, err := container.Items(prefix, position, maxItems)
	if err != nil {
		incFailureCounterForError(ctx, s.metrics.ListFailure, err)
		return nil, NewCursorAtEnd(), errs.Wrapf(err, "path:%v", prefix)
	}

	results := make([]DataReference, 0, len(items))
	for _, item := range items {
		results = append(results, DataReference(item.URL().String()))
	}

	// Translate stow's cursor string back into a storage-level cursor.
	nextCursor := NewCursorFromCustomPosition(next)
	if stow.IsCursorEnd(next) {
		nextCursor = NewCursorAtEnd()
	}

	timer.Stop()
	return results, nextCursor, nil
}

func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
_, c, k, err := reference.Split()
if err != nil {
Expand Down Expand Up @@ -434,6 +477,9 @@ func newStowMetrics(scope promutils.Scope) *stowMetrics {
HeadFailure: labeled.NewCounter("head_failure", "Indicates failure in HEAD for a given reference", scope, labeled.EmitUnlabeledMetric),
HeadLatency: labeled.NewStopWatch("head", "Indicates time to fetch metadata using the Head API", time.Millisecond, scope, labeled.EmitUnlabeledMetric),

ListFailure: labeled.NewCounter("list_failure", "Indicates failure in item listing for a given reference", scope, labeled.EmitUnlabeledMetric),
ListLatency: labeled.NewStopWatch("list", "Indicates time to fetch item listing using the List API", time.Millisecond, scope, labeled.EmitUnlabeledMetric),

ReadFailure: labeled.NewCounter("read_failure", "Indicates failure in GET for a given reference", scope, labeled.EmitUnlabeledMetric, failureTypeOption),
ReadOpenLatency: labeled.NewStopWatch("read_open", "Indicates time to first byte when reading", time.Millisecond, scope, labeled.EmitUnlabeledMetric),

Expand Down
96 changes: 94 additions & 2 deletions flytestdlib/storage/stow_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -73,8 +75,37 @@ func (m mockStowContainer) Item(id string) (stow.Item, error) {
return nil, stow.ErrNotFound
}

func (mockStowContainer) Items(prefix, cursor string, count int) ([]stow.Item, string, error) {
return []stow.Item{}, "", nil
// Items returns one page of the container's items, ordered by sorted item key.
//
// cursor is either empty (start from the first item) or the decimal index of the
// first item to return; count is the maximum page size. The returned cursor is the
// index of the next unread item, or "" once the last item has been returned.
// The prefix argument is currently ignored: every item in the container is listed.
//
// A cursor that is not a non-negative integer yields an "Invalid cursor" error. A
// cursor past the end of the listing, or a non-positive count, yields an empty page
// instead of panicking.
func (m mockStowContainer) Items(prefix, cursor string, count int) ([]stow.Item, string, error) {
	start := 0
	if cursor != "" {
		parsed, err := strconv.Atoi(cursor)
		if err != nil || parsed < 0 {
			return nil, "", fmt.Errorf("Invalid cursor '%s'", cursor)
		}
		start = parsed
	}

	// Map iteration order is random, so sort the keys to make paging deterministic.
	keys := make([]string, 0, len(m.items))
	for key := range m.items {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	// Clamp the page window to the available keys so that an out-of-range cursor or a
	// negative count can never produce an invalid slice expression.
	start = min(start, len(keys))
	end := min(len(keys), start+max(count, 0))

	page := make([]stow.Item, 0, end-start)
	for _, key := range keys[start:end] {
		page = append(page, m.items[key])
	}

	if end == len(keys) {
		return page, "", nil
	}
	return page, strconv.Itoa(end), nil
}

func (m mockStowContainer) RemoveItem(id string) error {
Expand Down Expand Up @@ -361,6 +392,67 @@ func TestStowStore_ReadRaw(t *testing.T) {
})
}

func TestStowStore_List(t *testing.T) {
const container = "container"
t.Run("Listing", func(t *testing.T) {
ctx := context.Background()
fn := fQNFn["s3"]
s, err := NewStowRawStore(fn(container), &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
}, nil, false, metrics)
assert.NoError(t, err)
writeTestFile(ctx, t, s, "s3://container/a/1")
writeTestFile(ctx, t, s, "s3://container/a/2")
var maxResults = 10
var dataReference DataReference = "s3://container/a"
items, cursor, err := s.List(ctx, dataReference, maxResults, NewCursorAtStart())
assert.NoError(t, err)
assert.Equal(t, NewCursorAtEnd(), cursor)
assert.Equal(t, []DataReference{"a/1", "a/2"}, items)
})

t.Run("Listing with pagination", func(t *testing.T) {
ctx := context.Background()
fn := fQNFn["s3"]
s, err := NewStowRawStore(fn(container), &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
}, nil, false, metrics)
assert.NoError(t, err)
writeTestFile(ctx, t, s, "s3://container/a/1")
writeTestFile(ctx, t, s, "s3://container/a/2")
var maxResults = 1
var dataReference DataReference = "s3://container/a"
items, cursor, err := s.List(ctx, dataReference, maxResults, NewCursorAtStart())
assert.NoError(t, err)
assert.Equal(t, []DataReference{"a/1"}, items)
items, _, err = s.List(ctx, dataReference, maxResults, cursor)
assert.NoError(t, err)
assert.Equal(t, []DataReference{"a/2"}, items)
})
}

func TestNewLocalStore(t *testing.T) {
labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey)
t.Run("Valid config", func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion script/generate_helm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ echo "Generating Helm"
HELM_SKIP_INSTALL=${HELM_SKIP_INSTALL:-false}

if [ "${HELM_SKIP_INSTALL}" != "true" ]; then
curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
# See https://github.com/helm/helm/issues/13324 for a breaking change in latest version of helm
curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | DESIRED_VERSION=v3.15.4 bash
fi

helm version
Expand Down

0 comments on commit e67aae0

Please sign in to comment.