From 9a40f0ad7dc282f3bfe670abbdd5ed93983d1b17 Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Wed, 15 Jan 2025 00:28:17 -0800 Subject: [PATCH 1/2] Minor auto refresh cleanup Signed-off-by: Jason Parraga --- flytestdlib/cache/auto_refresh.go | 365 --------------- .../cache/auto_refresh_example_test.go | 2 +- flytestdlib/cache/in_memory_auto_refresh.go | 424 ++++++++++++++++++ ...test.go => in_memory_auto_refresh_test.go} | 69 ++- 4 files changed, 472 insertions(+), 388 deletions(-) create mode 100644 flytestdlib/cache/in_memory_auto_refresh.go rename flytestdlib/cache/{auto_refresh_test.go => in_memory_auto_refresh_test.go} (69%) diff --git a/flytestdlib/cache/auto_refresh.go b/flytestdlib/cache/auto_refresh.go index a77da48c08..444ffd100a 100644 --- a/flytestdlib/cache/auto_refresh.go +++ b/flytestdlib/cache/auto_refresh.go @@ -2,20 +2,8 @@ package cache import ( "context" - "fmt" - "runtime/debug" - "sync" - "time" - lru "github.com/hashicorp/golang-lru" - "github.com/prometheus/client_golang/prometheus" - "k8s.io/client-go/util/workqueue" - "k8s.io/utils/clock" - - "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/errors" - "github.com/flyteorg/flyte/flytestdlib/logger" - "github.com/flyteorg/flyte/flytestdlib/promutils" ) type ItemID = string @@ -45,16 +33,6 @@ type AutoRefresh interface { DeleteDelayed(id ItemID) error } -type metrics struct { - SyncErrors prometheus.Counter - Evictions prometheus.Counter - SyncLatency promutils.StopWatch - CacheHit prometheus.Counter - CacheMiss prometheus.Counter - Size prometheus.Gauge - scope promutils.Scope -} - type Item interface { IsTerminal() bool } @@ -91,346 +69,3 @@ type SyncFunc func(ctx context.Context, batch Batch) ( // CreateBatchesFunc is a func type. Your implementation of this function for your cache instance is responsible for // subdividing the list of cache items into batches. type CreateBatchesFunc func(ctx context.Context, snapshot []ItemWrapper) (batches []Batch, err error) - -type itemWrapper struct { - id ItemID - item Item -} - -func (i itemWrapper) GetID() ItemID { - return i.id -} - -func (i itemWrapper) GetItem() Item { - return i.item -} - -// Thread-safe general purpose auto-refresh cache that watches for updates asynchronously for the keys after they are added to -// the cache. An item can be inserted only once. -// -// Get reads from sync.map while refresh is invoked on a snapshot of keys. Cache eventually catches up on deleted items. -// -// Sync is run as a fixed-interval-scheduled-task, and is skipped if sync from previous cycle is still running. -type autoRefresh struct { - name string - metrics metrics - syncCb SyncFunc - createBatchesCb CreateBatchesFunc - lruMap *lru.Cache - // Items that are currently being processed are in the processing set. - // It will prevent the same item from being processed multiple times by different workers. 
- processing *sync.Map - toDelete *syncSet - syncPeriod time.Duration - workqueue workqueue.RateLimitingInterface - parallelism uint - lock sync.RWMutex - clock clock.Clock -} - -func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value interface{}) { - return func(_ interface{}, _ interface{}) { - counter.Inc() - } -} - -func SingleItemBatches(_ context.Context, snapshot []ItemWrapper) (batches []Batch, err error) { - res := make([]Batch, 0, len(snapshot)) - for _, item := range snapshot { - res = append(res, Batch{item}) - } - - return res, nil -} - -func newMetrics(scope promutils.Scope) metrics { - return metrics{ - SyncErrors: scope.MustNewCounter("sync_errors", "Counter for sync errors."), - Evictions: scope.MustNewCounter("lru_evictions", "Counter for evictions from LRU."), - SyncLatency: scope.MustNewStopWatch("latency", "Latency for sync operations.", time.Millisecond), - CacheHit: scope.MustNewCounter("cache_hit", "Counter for cache hits."), - CacheMiss: scope.MustNewCounter("cache_miss", "Counter for cache misses."), - Size: scope.MustNewGauge("size", "Current size of the cache"), - scope: scope, - } -} - -func (w *autoRefresh) Start(ctx context.Context) error { - for i := uint(0); i < w.parallelism; i++ { - go func(ctx context.Context) { - err := w.sync(ctx) - if err != nil { - logger.Errorf(ctx, "Failed to sync. Error: %v", err) - } - }(contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("%v-worker-%v", w.name, i))) - } - - enqueueCtx := contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("%v-enqueue", w.name)) - go w.enqueueLoop(enqueueCtx) - - return nil -} - -func (w *autoRefresh) enqueueLoop(ctx context.Context) { - timer := w.clock.NewTimer(w.syncPeriod) - defer timer.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-timer.C(): - err := w.enqueueBatches(ctx) - if err != nil { - logger.Errorf(ctx, "Failed to enqueue. Error: %v", err) - } - timer.Reset(w.syncPeriod) - } - } -} - -// Update updates the item only if it exists in the cache, return true if we updated the item. -func (w *autoRefresh) Update(id ItemID, item Item) (ok bool) { - w.lock.Lock() - defer w.lock.Unlock() - ok = w.lruMap.Contains(id) - if ok { - w.lruMap.Add(id, item) - } - return ok -} - -// Delete deletes the item from the cache if it exists. -func (w *autoRefresh) Delete(key interface{}) { - w.lock.Lock() - defer w.lock.Unlock() - w.toDelete.Remove(key) - w.lruMap.Remove(key) -} - -func (w *autoRefresh) Get(id ItemID) (Item, error) { - if val, ok := w.lruMap.Get(id); ok { - w.metrics.CacheHit.Inc() - return val.(Item), nil - } - - w.metrics.CacheMiss.Inc() - return nil, errors.Errorf(ErrNotFound, "Item with id [%v] not found.", id) -} - -// Return the item if exists else create it. -// Create should be invoked only once. recreating the object is not supported. -func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) { - if val, ok := w.lruMap.Get(id); ok { - w.metrics.CacheHit.Inc() - return val.(Item), nil - } - - w.lruMap.Add(id, item) - w.metrics.CacheMiss.Inc() - - // It fixes cold start issue in the AutoRefreshCache by adding the item to the workqueue when it is created. - // This way, the item will be processed without waiting for the next sync cycle (30s by default). - batch := make([]ItemWrapper, 0, 1) - batch = append(batch, itemWrapper{id: id, item: item}) - w.workqueue.AddRateLimited(&batch) - w.processing.Store(id, w.clock.Now()) - return item, nil -} - -// DeleteDelayed queues an item for deletion. 
It Will get deleted as part of the next Sync cycle. Until the next sync -// cycle runs, Get and GetOrCreate will continue to return the Item in its previous state. -func (w *autoRefresh) DeleteDelayed(id ItemID) error { - w.toDelete.Insert(id) - return nil -} - -// This function is called internally by its own timer. Roughly, it will list keys, create batches of keys based on -// createBatchesCb and, enqueue all the batches into the workqueue. -func (w *autoRefresh) enqueueBatches(ctx context.Context) error { - keys := w.lruMap.Keys() - w.metrics.Size.Set(float64(len(keys))) - - snapshot := make([]ItemWrapper, 0, len(keys)) - for _, k := range keys { - if w.toDelete.Contains(k) { - w.Delete(k) - continue - } - // If not ok, it means evicted between the item was evicted between getting the keys and this update loop - // which is fine, we can just ignore. - if value, ok := w.lruMap.Peek(k); ok { - if item, ok := value.(Item); !ok || (ok && !item.IsTerminal() && !w.inProcessing(k)) { - snapshot = append(snapshot, itemWrapper{ - id: k.(ItemID), - item: value.(Item), - }) - } - } - } - - batches, err := w.createBatchesCb(ctx, snapshot) - if err != nil { - return err - } - - for _, batch := range batches { - b := batch - w.workqueue.AddRateLimited(&b) - for i := 1; i < len(b); i++ { - w.processing.Store(b[i].GetID(), w.clock.Now()) - } - } - - return nil -} - -// There are w.parallelism instances of this function running all the time, each one will: -// - Retrieve an item from the workqueue -// - For each batch of the keys, call syncCb, which tells us if the items have been updated -// -- If any has, then overwrite the item in the cache. -// -// What happens when the number of things that a user is trying to keep track of exceeds the size -// of the cache? Trivial case where the cache is size 1 and we're trying to keep track of two things. -// * Plugin asks for update on item 1 - cache evicts item 2, stores 1 and returns it unchanged -// * Plugin asks for update on item 2 - cache evicts item 1, stores 2 and returns it unchanged -// * Sync loop updates item 2, repeat -func (w *autoRefresh) sync(ctx context.Context) (err error) { - defer func() { - var isErr bool - rVal := recover() - if rVal == nil { - return - } - - if err, isErr = rVal.(error); isErr { - err = fmt.Errorf("worker panic'd and is shutting down. Error: %w with Stack: %v", err, string(debug.Stack())) - } else { - err = fmt.Errorf("worker panic'd and is shutting down. Panic value: %v with Stack: %v", rVal, string(debug.Stack())) - } - - logger.Error(ctx, err) - }() - - for { - select { - case <-ctx.Done(): - return nil - default: - batch, shutdown := w.workqueue.Get() - if shutdown { - logger.Debugf(ctx, "Shutting down worker") - return nil - } - // Since we create batches every time we sync, we will just remove the item from the queue here - // regardless of whether it succeeded the sync or not. 
- w.workqueue.Forget(batch) - w.workqueue.Done(batch) - - newBatch := make(Batch, 0, len(*batch.(*Batch))) - for _, b := range *batch.(*Batch) { - itemID := b.GetID() - w.processing.Delete(itemID) - item, ok := w.lruMap.Get(itemID) - if !ok { - logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) - continue - } - if item.(Item).IsTerminal() { - logger.Debugf(ctx, "item with id [%v] is terminal", itemID) - continue - } - newBatch = append(newBatch, b) - } - if len(newBatch) == 0 { - continue - } - - t := w.metrics.SyncLatency.Start() - updatedBatch, err := w.syncCb(ctx, newBatch) - - if err != nil { - w.metrics.SyncErrors.Inc() - logger.Errorf(ctx, "failed to get latest copy of a batch. Error: %v", err) - t.Stop() - continue - } - - for _, item := range updatedBatch { - if item.Action == Update { - // Updates an existing item. - w.Update(item.ID, item.Item) - } - } - - w.toDelete.Range(func(key interface{}) bool { - w.Delete(key) - return true - }) - - t.Stop() - } - } -} - -// Checks if the item is currently being processed and returns false if the item has been in processing for too long -func (w *autoRefresh) inProcessing(key interface{}) bool { - item, found := w.processing.Load(key) - if found { - // handle potential race conditions where the item is in processing but not in the workqueue - if timeItem, ok := item.(time.Time); ok && w.clock.Since(timeItem) > (w.syncPeriod*5) { - w.processing.Delete(key) - return false - } - return true - } - return false -} - -// Instantiates a new AutoRefresh Cache that syncs items in batches. -func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, - resyncPeriod time.Duration, parallelism, size uint, scope promutils.Scope) (AutoRefresh, error) { - return newAutoRefreshBatchedCacheWithClock(name, createBatches, syncCb, syncRateLimiter, resyncPeriod, parallelism, size, scope, clock.RealClock{}) -} - -func newAutoRefreshBatchedCacheWithClock(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, - resyncPeriod time.Duration, parallelism, size uint, scope promutils.Scope, clock clock.WithTicker) (AutoRefresh, error) { - - metrics := newMetrics(scope) - // #nosec G115 - lruCache, err := lru.NewWithEvict(int(size), getEvictionFunction(metrics.Evictions)) - if err != nil { - return nil, err - } - - cache := &autoRefresh{ - name: name, - metrics: metrics, - parallelism: parallelism, - createBatchesCb: createBatches, - syncCb: syncCb, - lruMap: lruCache, - processing: &sync.Map{}, - toDelete: newSyncSet(), - syncPeriod: resyncPeriod, - workqueue: workqueue.NewRateLimitingQueueWithConfig(syncRateLimiter, workqueue.RateLimitingQueueConfig{ - Name: scope.CurrentScope(), - Clock: clock, - }), - clock: clock, - } - - return cache, nil -} - -// Instantiates a new AutoRefresh Cache that syncs items periodically. 
-func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration,
-	parallelism, size uint, scope promutils.Scope) (AutoRefresh, error) {
-
-	return NewAutoRefreshBatchedCache(name, SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelism, size, scope)
-}
-
-func newAutoRefreshCacheWithClock(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration,
-	parallelism, size uint, scope promutils.Scope, clock clock.WithTicker) (AutoRefresh, error) {
-	return newAutoRefreshBatchedCacheWithClock(name, SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelism, size, scope, clock)
-}
diff --git a/flytestdlib/cache/auto_refresh_example_test.go b/flytestdlib/cache/auto_refresh_example_test.go
index a7087f1e3a..952b582c0f 100644
--- a/flytestdlib/cache/auto_refresh_example_test.go
+++ b/flytestdlib/cache/auto_refresh_example_test.go
@@ -15,7 +15,7 @@ import (
 type ExampleItemStatus string
 
 const (
-	ExampleStatusNotStarted ExampleItemStatus = "Not-started"
+	ExampleStatusNotStarted ExampleItemStatus = "Not-started"
 	ExampleStatusStarted    ExampleItemStatus = "Started"
 	ExampleStatusSucceeded  ExampleItemStatus = "Completed"
 )
diff --git a/flytestdlib/cache/in_memory_auto_refresh.go b/flytestdlib/cache/in_memory_auto_refresh.go
new file mode 100644
index 0000000000..08fb858123
--- /dev/null
+++ b/flytestdlib/cache/in_memory_auto_refresh.go
@@ -0,0 +1,424 @@
+package cache
+
+import (
+	"context"
+	"fmt"
+	"runtime/debug"
+	"sync"
+	"time"
+
+	lru "github.com/hashicorp/golang-lru"
+	"github.com/prometheus/client_golang/prometheus"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/utils/clock"
+
+	"github.com/flyteorg/flyte/flytestdlib/atomic"
+	"github.com/flyteorg/flyte/flytestdlib/contextutils"
+	"github.com/flyteorg/flyte/flytestdlib/errors"
+	"github.com/flyteorg/flyte/flytestdlib/logger"
+	"github.com/flyteorg/flyte/flytestdlib/promutils"
+)
+
+type metrics struct {
+	SyncErrors  prometheus.Counter
+	Evictions   prometheus.Counter
+	SyncLatency promutils.StopWatch
+	CacheHit    prometheus.Counter
+	CacheMiss   prometheus.Counter
+	Size        prometheus.Gauge
+	scope       promutils.Scope
+}
+
+func newMetrics(scope promutils.Scope) metrics {
+	return metrics{
+		SyncErrors:  scope.MustNewCounter("sync_errors", "Counter for sync errors."),
+		Evictions:   scope.MustNewCounter("lru_evictions", "Counter for evictions from LRU."),
+		SyncLatency: scope.MustNewStopWatch("latency", "Latency for sync operations.", time.Millisecond),
+		CacheHit:    scope.MustNewCounter("cache_hit", "Counter for cache hits."),
+		CacheMiss:   scope.MustNewCounter("cache_miss", "Counter for cache misses."),
+		Size:        scope.MustNewGauge("size", "Current size of the cache"),
+		scope:       scope,
+	}
+}
+
+func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value interface{}) {
+	return func(_ interface{}, _ interface{}) {
+		counter.Inc()
+	}
+}
+
+// Options are configurable options for the InMemoryAutoRefresh.
+type Options struct {
+	clock           clock.WithTicker
+	createBatchesCb CreateBatchesFunc
+}
+
+// WithClock configures the clock to use for time related operations. Mainly used for unit testing.
+func WithClock(clock clock.WithTicker) Option {
+	return func(mo *Options) {
+		mo.clock = clock
+	}
+}
+
+// WithCreateBatchesFunc configures how cache items should be batched for refresh. Defaults to single item batching.
+func WithCreateBatchesFunc(createBatchesCb CreateBatchesFunc) Option {
+	return func(mo *Options) {
+		mo.createBatchesCb = createBatchesCb
+	}
+}
+
+func defaultOptions() *Options {
+	opts := &Options{}
+	WithClock(clock.RealClock{})(opts)
+	WithCreateBatchesFunc(SingleItemBatches)(opts)
+	return opts
+}
+
+// Option configures the InMemoryAutoRefresh.
+type Option func(*Options)
+
+// InMemoryAutoRefresh is an in-memory implementation of the AutoRefresh interface. It is a thread-safe general
+// purpose auto-refresh cache that watches for updates asynchronously for the keys after they are added to
+// the cache. An item can be inserted only once.
+//
+// Get reads from the underlying LRU cache while refresh is invoked on a snapshot of keys. The cache eventually catches up on deleted items.
+//
+// Sync runs as a fixed-interval scheduled task and is skipped if the sync from the previous cycle is still running.
+type InMemoryAutoRefresh struct {
+	name            string
+	metrics         metrics
+	syncCb          SyncFunc
+	createBatchesCb CreateBatchesFunc
+	lruMap          *lru.Cache
+	// Items that are currently being processed are in the processing set.
+	// This prevents the same item from being processed multiple times by different workers.
+	processing  *sync.Map
+	toDelete    *syncSet
+	syncPeriod  time.Duration
+	workqueue   workqueue.RateLimitingInterface
+	parallelizm int
+	lock        sync.RWMutex
+	clock              clock.Clock  // pluggable clock for unit testing
+	syncCount          atomic.Int32 // internal sync counter for unit testing
+	enqueueCount       atomic.Int32 // internal enqueue counter for unit testing
+	enqueueLoopRunning atomic.Bool  // internal bool to ensure goroutines are running
+}
+
+// NewInMemoryAutoRefresh creates a new InMemoryAutoRefresh.
+func NewInMemoryAutoRefresh(
+	name string,
+	syncCb SyncFunc,
+	syncRateLimiter workqueue.RateLimiter,
+	resyncPeriod time.Duration,
+	parallelizm int,
+	size int,
+	scope promutils.Scope,
+	options ...Option,
+) (*InMemoryAutoRefresh, error) {
+	opts := defaultOptions()
+	for _, option := range options {
+		option(opts)
+	}
+
+	metrics := newMetrics(scope)
+	lruCache, err := lru.NewWithEvict(size, getEvictionFunction(metrics.Evictions))
+	if err != nil {
+		return nil, fmt.Errorf("creating LRU cache: %w", err)
+	}
+
+	cache := &InMemoryAutoRefresh{
+		name:            name,
+		metrics:         metrics,
+		parallelizm:     parallelizm,
+		createBatchesCb: opts.createBatchesCb,
+		syncCb:          syncCb,
+		lruMap:          lruCache,
+		processing:      &sync.Map{},
+		toDelete:        newSyncSet(),
+		syncPeriod:      resyncPeriod,
+		workqueue: workqueue.NewRateLimitingQueueWithConfig(syncRateLimiter, workqueue.RateLimitingQueueConfig{
+			Name:  scope.CurrentScope(),
+			Clock: opts.clock,
+		}),
+		clock:              opts.clock,
+		syncCount:          atomic.NewInt32(0),
+		enqueueCount:       atomic.NewInt32(0),
+		enqueueLoopRunning: atomic.NewBool(false),
+	}
+
+	return cache, nil
+}
+
+func (w *InMemoryAutoRefresh) Start(ctx context.Context) error {
+	for i := 0; i < w.parallelizm; i++ {
+		go func(ctx context.Context) {
+			err := w.sync(ctx)
+			if err != nil {
+				logger.Errorf(ctx, "Failed to sync. 
Error: %v", err) + } + }(contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("%v-worker-%v", w.name, i))) + } + + enqueueCtx := contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("%v-enqueue", w.name)) + go w.enqueueLoop(enqueueCtx) + + return nil +} + +func (w *InMemoryAutoRefresh) enqueueLoop(ctx context.Context) { + timer := w.clock.NewTimer(w.syncPeriod) + defer timer.Stop() + + w.enqueueLoopRunning.Store(true) + + for { + select { + case <-ctx.Done(): + return + case <-timer.C(): + err := w.enqueueBatches(ctx) + if err != nil { + logger.Errorf(ctx, "Failed to enqueue. Error: %v", err) + } + timer.Reset(w.syncPeriod) + } + } +} + +// Update updates the item only if it exists in the cache, return true if we updated the item. +func (w *InMemoryAutoRefresh) Update(id ItemID, item Item) (ok bool) { + w.lock.Lock() + defer w.lock.Unlock() + ok = w.lruMap.Contains(id) + if ok { + w.lruMap.Add(id, item) + } + return ok +} + +// Delete deletes the item from the cache if it exists. +func (w *InMemoryAutoRefresh) Delete(key interface{}) { + w.lock.Lock() + defer w.lock.Unlock() + w.toDelete.Remove(key) + w.lruMap.Remove(key) +} + +func (w *InMemoryAutoRefresh) Get(id ItemID) (Item, error) { + if val, ok := w.lruMap.Get(id); ok { + w.metrics.CacheHit.Inc() + return val.(Item), nil + } + + w.metrics.CacheMiss.Inc() + return nil, errors.Errorf(ErrNotFound, "Item with id [%v] not found.", id) +} + +// Return the item if exists else create it. +// Create should be invoked only once. recreating the object is not supported. +func (w *InMemoryAutoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) { + if val, ok := w.lruMap.Get(id); ok { + w.metrics.CacheHit.Inc() + return val.(Item), nil + } + + w.lruMap.Add(id, item) + w.metrics.CacheMiss.Inc() + + // It fixes cold start issue in the AutoRefreshCache by adding the item to the workqueue when it is created. + // This way, the item will be processed without waiting for the next sync cycle (30s by default). + batch := make([]ItemWrapper, 0, 1) + batch = append(batch, itemWrapper{id: id, item: item}) + w.workqueue.AddRateLimited(&batch) + w.processing.Store(id, w.clock.Now()) + return item, nil +} + +// DeleteDelayed queues an item for deletion. It Will get deleted as part of the next Sync cycle. Until the next sync +// cycle runs, Get and GetOrCreate will continue to return the Item in its previous state. +func (w *InMemoryAutoRefresh) DeleteDelayed(id ItemID) error { + w.toDelete.Insert(id) + return nil +} + +// This function is called internally by its own timer. Roughly, it will list keys, create batches of keys based on +// createBatchesCb and, enqueue all the batches into the workqueue. +func (w *InMemoryAutoRefresh) enqueueBatches(ctx context.Context) error { + defer w.enqueueCount.Inc() + + keys := w.lruMap.Keys() + w.metrics.Size.Set(float64(len(keys))) + + snapshot := make([]ItemWrapper, 0, len(keys)) + for _, k := range keys { + if w.toDelete.Contains(k) { + w.Delete(k) + continue + } + // If not ok, it means evicted between the item was evicted between getting the keys and this update loop + // which is fine, we can just ignore. 
+		if value, ok := w.lruMap.Peek(k); ok {
+			if item, ok := value.(Item); !ok || (ok && !item.IsTerminal() && !w.inProcessing(k)) {
+				snapshot = append(snapshot, itemWrapper{
+					id:   k.(ItemID),
+					item: value.(Item),
+				})
+			}
+		}
+	}
+
+	batches, err := w.createBatchesCb(ctx, snapshot)
+	if err != nil {
+		return err
+	}
+
+	for _, batch := range batches {
+		b := batch
+		w.workqueue.AddRateLimited(&b)
+		for i := 1; i < len(b); i++ {
+			w.processing.Store(b[i].GetID(), w.clock.Now())
+		}
+	}
+
+	return nil
+}
+
+// There are w.parallelizm instances of this function running all the time, each one will:
+// - Retrieve an item from the workqueue
+// - For each batch of the keys, call syncCb, which tells us if the items have been updated
+// -- If any have been, then overwrite the item in the cache.
+//
+// What happens when the number of things that a user is trying to keep track of exceeds the size
+// of the cache? Trivial case where the cache is size 1 and we're trying to keep track of two things.
+// * Plugin asks for update on item 1 - cache evicts item 2, stores 1 and returns it unchanged
+// * Plugin asks for update on item 2 - cache evicts item 1, stores 2 and returns it unchanged
+// * Sync loop updates item 2, repeat
+func (w *InMemoryAutoRefresh) sync(ctx context.Context) (err error) {
+	defer func() {
+		var isErr bool
+		rVal := recover()
+		if rVal == nil {
+			return
+		}
+
+		if err, isErr = rVal.(error); isErr {
+			err = fmt.Errorf("worker panic'd and is shutting down. Error: %w with Stack: %v", err, string(debug.Stack()))
+		} else {
+			err = fmt.Errorf("worker panic'd and is shutting down. Panic value: %v with Stack: %v", rVal, string(debug.Stack()))
+		}
+
+		logger.Error(ctx, err)
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+			batch, shutdown := w.workqueue.Get()
+			if shutdown {
+				logger.Debugf(ctx, "Shutting down worker")
+				return nil
+			}
+			// Since we create batches every time we sync, we will just remove the item from the queue here
+			// regardless of whether the sync succeeded or not.
+			w.workqueue.Forget(batch)
+			w.workqueue.Done(batch)
+
+			newBatch := make(Batch, 0, len(*batch.(*Batch)))
+			for _, b := range *batch.(*Batch) {
+				itemID := b.GetID()
+				w.processing.Delete(itemID)
+				item, ok := w.lruMap.Get(itemID)
+				if !ok {
+					logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
+					continue
+				}
+				if item.(Item).IsTerminal() {
+					logger.Debugf(ctx, "item with id [%v] is terminal", itemID)
+					continue
+				}
+				newBatch = append(newBatch, b)
+			}
+			if len(newBatch) == 0 {
+				continue
+			}
+
+			t := w.metrics.SyncLatency.Start()
+			updatedBatch, err := w.syncCb(ctx, newBatch)
+
+			if err != nil {
+				w.metrics.SyncErrors.Inc()
+				logger.Errorf(ctx, "failed to get latest copy of a batch. Error: %v", err)
+				t.Stop()
+				continue
+			}
+
+			for _, item := range updatedBatch {
+				if item.Action == Update {
+					// Updates an existing item.
+ w.Update(item.ID, item.Item) + } + } + + w.toDelete.Range(func(key interface{}) bool { + w.Delete(key) + return true + }) + + t.Stop() + } + + w.syncCount.Inc() + } +} + +// Checks if the item is currently being processed and returns false if the item has been in processing for too long +func (w *InMemoryAutoRefresh) inProcessing(key interface{}) bool { + item, found := w.processing.Load(key) + if found { + // handle potential race conditions where the item is in processing but not in the workqueue + if timeItem, ok := item.(time.Time); ok && w.clock.Since(timeItem) > (w.syncPeriod*5) { + w.processing.Delete(key) + return false + } + return true + } + return false +} + +// Instantiates a new AutoRefresh Cache that syncs items in batches. +func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, + resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) { + return NewInMemoryAutoRefresh(name, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope, WithCreateBatchesFunc(createBatches)) +} + +// Instantiates a new AutoRefresh Cache that syncs items periodically. +func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, + parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) { + return NewAutoRefreshBatchedCache(name, SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope) +} + +// SingleItemBatches is a function that creates n batches of items, each with size 1 +func SingleItemBatches(_ context.Context, snapshot []ItemWrapper) (batches []Batch, err error) { + res := make([]Batch, 0, len(snapshot)) + for _, item := range snapshot { + res = append(res, Batch{item}) + } + + return res, nil +} + +// itemWrapper is an implementation of ItemWrapper +type itemWrapper struct { + id ItemID + item Item +} + +func (i itemWrapper) GetID() ItemID { + return i.id +} + +func (i itemWrapper) GetItem() Item { + return i.item +} diff --git a/flytestdlib/cache/auto_refresh_test.go b/flytestdlib/cache/in_memory_auto_refresh_test.go similarity index 69% rename from flytestdlib/cache/auto_refresh_test.go rename to flytestdlib/cache/in_memory_auto_refresh_test.go index 66f3e11e7e..6fb87c8255 100644 --- a/flytestdlib/cache/auto_refresh_test.go +++ b/flytestdlib/cache/in_memory_auto_refresh_test.go @@ -81,7 +81,7 @@ func TestCacheFour(t *testing.T) { t.Run("normal operation", func(t *testing.T) { // the size of the cache is at least as large as the number of items we're storing - cache, err := newAutoRefreshCacheWithClock("fake1", syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope(), fakeClock) + cache, err := NewInMemoryAutoRefresh("fake1", syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope(), WithClock(fakeClock)) assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -110,7 +110,7 @@ func TestCacheFour(t *testing.T) { t.Run("Not Found", func(t *testing.T) { // the size of the cache is at least as large as the number of items we're storing - cache, err := newAutoRefreshCacheWithClock("fake2", syncFakeItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), fakeClock) + cache, err := NewInMemoryAutoRefresh("fake2", syncFakeItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), WithClock(fakeClock)) assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ 
-138,12 +138,15 @@ func TestCacheFour(t *testing.T) {
 	})
 
 	t.Run("Enqueue nothing", func(t *testing.T) {
-		cache, err := newAutoRefreshCacheWithClock("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), fakeClock)
+		cache, err := NewInMemoryAutoRefresh("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), WithClock(fakeClock))
 		assert.NoError(t, err)
 
 		ctx, cancel := context.WithCancel(context.Background())
 		assert.NoError(t, cache.Start(ctx))
 
+		// Wait for goroutines to run
+		assert.Eventually(t, func() bool { return cache.enqueueLoopRunning.Load() }, time.Second, time.Millisecond)
+
 		// Create ten items in the cache
 		for i := 1; i <= 10; i++ {
 			_, err := cache.GetOrCreate(fmt.Sprintf("%d", i), terminalCacheItem{
@@ -152,29 +155,51 @@ func TestCacheFour(t *testing.T) {
 			assert.NoError(t, err)
 		}
 
-		// Enqueue first batch
+		syncCount := cache.syncCount.Load()
+		enqueueCount := cache.enqueueCount.Load()
+		// Move time forwards and trigger the first batch
 		fakeClock.Step(testResyncPeriod)
 
 		// If the cache tries to enqueue the item, a panic will be thrown.
+		assert.Eventually(t, func() bool { return cache.enqueueCount.Load() > enqueueCount }, time.Second, time.Millisecond)
+		// Nothing should have been enqueued, so no sync should have run
+		assert.Equal(t, syncCount, cache.syncCount.Load())
+
+		syncCount = cache.syncCount.Load()
+		enqueueCount = cache.enqueueCount.Load()
+		// Move time forwards and trigger the second batch
 		fakeClock.Step(testResyncPeriod)
+		// If the cache tries to enqueue the item, a panic will be thrown.
+		assert.Eventually(t, func() bool { return cache.enqueueCount.Load() > enqueueCount }, time.Second, time.Millisecond)
+		// Nothing should have been enqueued, so no sync should have run
+		assert.Equal(t, syncCount, cache.syncCount.Load())
 
 		cancel()
 	})
 
 	t.Run("Test update and delete cache", func(t *testing.T) {
-		cache, err := newAutoRefreshCacheWithClock("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), fakeClock)
+		cache, err := NewInMemoryAutoRefresh("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), WithClock(fakeClock))
 		assert.NoError(t, err)
 
 		ctx, cancel := context.WithCancel(context.Background())
 		assert.NoError(t, cache.Start(ctx))
 
+		// Wait for goroutines to run
+		assert.Eventually(t, func() bool { return cache.enqueueLoopRunning.Load() }, time.Second, time.Millisecond)
+
 		itemID := "dummy_id"
 		_, err = cache.GetOrCreate(itemID, terminalCacheItem{
 			val: 0,
 		})
 		assert.NoError(t, err)
 
-		// If the cache tries to enqueue the item, a panic will be thrown.
+		syncCount := cache.syncCount.Load()
+		enqueueCount := cache.enqueueCount.Load()
+		// Move time forwards and trigger the enqueue loop
 		fakeClock.Step(testResyncPeriod)
+		// If the cache tries to enqueue the item, a panic will be thrown.
+		assert.Eventually(t, func() bool { return cache.enqueueCount.Load() > enqueueCount }, time.Second, time.Millisecond)
+		// Nothing should have been enqueued, so no sync should have run
+		assert.Equal(t, syncCount, cache.syncCount.Load())
 
 		err = cache.DeleteDelayed(itemID)
 		assert.NoError(t, err)
@@ -193,7 +218,7 @@ func TestCacheFour(t *testing.T) {
 
 	t.Run("Test panic on sync and shutdown", func(t *testing.T) {
 		syncer := &panickingSyncer{}
-		cache, err := newAutoRefreshCacheWithClock("fake3", syncer.sync, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), fakeClock)
+		cache, err := NewInMemoryAutoRefresh("fake3", syncer.sync, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), WithClock(fakeClock))
 		assert.NoError(t, err)
 
 		ctx, cancel := context.WithCancel(context.Background())
@@ -237,22 +262,20 @@ func TestQueueBuildUp(t *testing.T) {
 		return nil, fmt.Errorf("expected error")
 	}
 
-	size := uint(100)
-	cache, err := newAutoRefreshCacheWithClock("fake2", alwaysFailing, rateLimiter, testResyncPeriod, 10, size, promutils.NewTestScope(), fakeClock)
+	size := 100
+	cache, err := NewInMemoryAutoRefresh("fake2", alwaysFailing, rateLimiter, testResyncPeriod, 10, size, promutils.NewTestScope(), WithClock(fakeClock))
 	assert.NoError(t, err)
 
-	ctx := context.Background()
-	ctx, cancelNow := context.WithCancel(ctx)
-	defer cancelNow()
+	ctx, cancel := context.WithCancel(context.Background())
+	assert.NoError(t, cache.Start(ctx))
+	defer cancel()
 
-	for i := uint(0); i < size; i++ {
+	for i := 0; i < size; i++ {
 		// #nosec G115
-		_, err := cache.GetOrCreate(strconv.Itoa(int(i)), fakeCacheItem{val: 3})
+		_, err := cache.GetOrCreate(strconv.Itoa(i), fakeCacheItem{val: 3})
 		assert.NoError(t, err)
 	}
 
-	assert.NoError(t, cache.Start(ctx))
-
 	// wait for all workers to run
 	assert.Eventually(t, func() bool {
 		// trigger a sync and unlock the work queue
@@ -264,14 +287,16 @@ func TestQueueBuildUp(t *testing.T) {
 }
 
 func TestInProcessing(t *testing.T) {
-
+	syncer := &panickingSyncer{}
 	syncPeriod := time.Millisecond
+	rateLimiter := workqueue.DefaultControllerRateLimiter()
 	fakeClock := testingclock.NewFakeClock(time.Now())
-	cache := &autoRefresh{
-		processing: &sync.Map{},
-		syncPeriod: syncPeriod,
-		clock:      fakeClock,
-	}
+	cache, err := NewInMemoryAutoRefresh("fake3", syncer.sync, rateLimiter, syncPeriod, 10, 2, promutils.NewTestScope(), WithClock(fakeClock))
+	assert.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	assert.NoError(t, cache.Start(ctx))
+	defer cancel()
 
 	assert.False(t, cache.inProcessing("test"))
 

From 857af1b6b8b32eb2be611c9e79d474213051fbde Mon Sep 17 00:00:00 2001
From: Jason Parraga
Date: Wed, 15 Jan 2025 00:42:23 -0800
Subject: [PATCH 2/2] lint-fix

Signed-off-by: Jason Parraga
---
 flytestdlib/cache/in_memory_auto_refresh.go      | 15 ++++++++-------
 flytestdlib/cache/in_memory_auto_refresh_test.go |  6 +++---
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/flytestdlib/cache/in_memory_auto_refresh.go b/flytestdlib/cache/in_memory_auto_refresh.go
index 08fb858123..a566c7928c 100644
--- a/flytestdlib/cache/in_memory_auto_refresh.go
+++ b/flytestdlib/cache/in_memory_auto_refresh.go
@@ -96,7 +96,7 @@ type InMemoryAutoRefresh struct {
 	toDelete    *syncSet
 	syncPeriod  time.Duration
 	workqueue   workqueue.RateLimitingInterface
-	parallelizm int
+	parallelizm uint
 	lock        sync.RWMutex
 	clock              clock.Clock  // pluggable clock for unit testing
 	syncCount          atomic.Int32 // internal sync counter for unit testing
@@ -110,8 +110,8 @@ func NewInMemoryAutoRefresh(
 	name string,
 	syncCb SyncFunc,
 	syncRateLimiter 
workqueue.RateLimiter, resyncPeriod time.Duration, - parallelizm int, - size int, + parallelizm uint, + size uint, scope promutils.Scope, options ...Option, ) (*InMemoryAutoRefresh, error) { @@ -121,7 +121,8 @@ func NewInMemoryAutoRefresh( } metrics := newMetrics(scope) - lruCache, err := lru.NewWithEvict(size, getEvictionFunction(metrics.Evictions)) + // #nosec G115 + lruCache, err := lru.NewWithEvict(int(size), getEvictionFunction(metrics.Evictions)) if err != nil { return nil, fmt.Errorf("creating LRU cache: %w", err) } @@ -150,7 +151,7 @@ func NewInMemoryAutoRefresh( } func (w *InMemoryAutoRefresh) Start(ctx context.Context) error { - for i := 0; i < w.parallelizm; i++ { + for i := uint(0); i < w.parallelizm; i++ { go func(ctx context.Context) { err := w.sync(ctx) if err != nil { @@ -389,13 +390,13 @@ func (w *InMemoryAutoRefresh) inProcessing(key interface{}) bool { // Instantiates a new AutoRefresh Cache that syncs items in batches. func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, - resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) { + resyncPeriod time.Duration, parallelizm, size uint, scope promutils.Scope) (AutoRefresh, error) { return NewInMemoryAutoRefresh(name, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope, WithCreateBatchesFunc(createBatches)) } // Instantiates a new AutoRefresh Cache that syncs items periodically. func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, - parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) { + parallelizm, size uint, scope promutils.Scope) (AutoRefresh, error) { return NewAutoRefreshBatchedCache(name, SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope) } diff --git a/flytestdlib/cache/in_memory_auto_refresh_test.go b/flytestdlib/cache/in_memory_auto_refresh_test.go index 6fb87c8255..229664c720 100644 --- a/flytestdlib/cache/in_memory_auto_refresh_test.go +++ b/flytestdlib/cache/in_memory_auto_refresh_test.go @@ -262,7 +262,7 @@ func TestQueueBuildUp(t *testing.T) { return nil, fmt.Errorf("expected error") } - size := 100 + size := uint(100) cache, err := NewInMemoryAutoRefresh("fake2", alwaysFailing, rateLimiter, testResyncPeriod, 10, size, promutils.NewTestScope(), WithClock(fakeClock)) assert.NoError(t, err) @@ -270,9 +270,9 @@ func TestQueueBuildUp(t *testing.T) { assert.NoError(t, cache.Start(ctx)) defer cancel() - for i := 0; i < size; i++ { + for i := uint(0); i < size; i++ { // #nosec G115 - _, err := cache.GetOrCreate(strconv.Itoa(i), fakeCacheItem{val: 3}) + _, err := cache.GetOrCreate(strconv.Itoa(int(i)), fakeCacheItem{val: 3}) assert.NoError(t, err) }
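
Example usage of the refactored constructor and its functional options (a minimal sketch, not part of the patch above: the callback, scope, and numeric values are placeholders, and it assumes flytestdlib's []cache.ItemSyncResponse return type for SyncFunc, which is elided in the hunks shown):

    package main

    import (
    	"context"
    	"time"

    	"k8s.io/client-go/util/workqueue"

    	"github.com/flyteorg/flyte/flytestdlib/cache"
    	"github.com/flyteorg/flyte/flytestdlib/promutils"
    )

    func main() {
    	// noopSync is a placeholder SyncFunc. A real callback would fetch fresh
    	// state for each item in the batch and return entries whose Action is
    	// cache.Update for anything that changed.
    	noopSync := func(_ context.Context, _ cache.Batch) ([]cache.ItemSyncResponse, error) {
    		return nil, nil
    	}

    	autoRefresh, err := cache.NewInMemoryAutoRefresh(
    		"example-cache",                          // name, used for goroutine labels
    		noopSync,                                 // SyncFunc invoked by the workers
    		workqueue.DefaultControllerRateLimiter(), // rate limiter for the workqueue
    		30*time.Second,                           // resyncPeriod driving enqueueLoop
    		10,                                       // parallelizm: number of sync workers
    		1000,                                     // size of the LRU map
    		promutils.NewTestScope(),                 // metrics scope
    		// This option restates the default (single-item batches) and could be
    		// omitted; WithClock likewise defaults to a real clock.
    		cache.WithCreateBatchesFunc(cache.SingleItemBatches),
    	)
    	if err != nil {
    		panic(err)
    	}

    	// Start launches the sync workers and the enqueue loop, then returns
    	// immediately; cancelling the context shuts them down.
    	if err := autoRefresh.Start(context.Background()); err != nil {
    		panic(err)
    	}
    }

With the options pattern, the clock and batching strategy that previously required the unexported newAutoRefreshBatchedCacheWithClock constructor are now injectable through the exported API, which is what lets the tests above swap in a fake clock via WithClock.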