Skip to content

Commit

Permalink
[minor] added panic recovery
Browse files Browse the repository at this point in the history
[-] added tests to cover more scenarios
[-] updated README
  • Loading branch information
bnkamalesh committed Nov 8, 2024
1 parent 5e73983 commit 04fed3a
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 37 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,4 @@ func main() {

## The gopher

The gopher used here was created using [Gopherize.me](https://gopherize.me/). Incache helps you keep your application latency low and your database destressed.
The gopher used here was created using [Gopherize.me](https://gopherize.me/). Pocache helps you to stop the herd from thundering.
22 changes: 18 additions & 4 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

var (
ErrValidation = errors.New("invalid")
ErrPanic = errors.New("panicked")
)

type (
Expand Down Expand Up @@ -69,16 +70,15 @@ func (cfg *Config[K, T]) Sanitize() {
cfg.QLength = 1000
}

// there's no practical usecase of cache less than a second, as of now
if cfg.CacheAge == 0 {
if cfg.CacheAge <= 0 {
cfg.CacheAge = time.Minute
}

if cfg.Threshold == 0 {
if cfg.Threshold <= 0 {
cfg.Threshold = cfg.CacheAge - time.Second
}

if cfg.UpdaterTimeout == 0 {
if cfg.UpdaterTimeout <= 0 {
cfg.UpdaterTimeout = time.Second
}
}
Expand Down Expand Up @@ -211,6 +211,20 @@ func (ch *Cache[K, T]) updateListener(keys <-chan K) {
}

func (ch *Cache[K, T]) update(key K) {
defer func() {
rec := recover()
if rec == nil {
return
}
ch.updateInProgress.Delete(key)
err, isErr := rec.(error)
if isErr {
ch.errCallback(errors.Join(ErrPanic, err))
return
}
ch.errCallback(errors.Join(ErrPanic, fmt.Errorf("%+v", rec)))
}()

ctx, cancel := context.WithTimeout(context.Background(), ch.updaterTimeout)
defer cancel()

Expand Down
189 changes: 157 additions & 32 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pocache

import (
"context"
"errors"
"fmt"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -153,66 +154,51 @@ func TestCache(tt *testing.T) {
asserter.True(found)
})

tt.Run("err watcher", func(t *testing.T) {
forcedErr := fmt.Errorf("forced error")
ranUpdater := atomic.Bool{}
ranErrWatcher := atomic.Bool{}

tt.Run("disabled", func(t *testing.T) {
cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Threshold: time.Second * 59,
DisableCache: false,
DisableCache: true,
Updater: func(ctx context.Context, key string) (any, error) {
ranUpdater.Store(true)
return nil, forcedErr
},
ErrWatcher: func(watcherErr error) {
ranErrWatcher.Store(true)
asserter.ErrorIs(watcherErr, forcedErr)
return key, nil
},
})
requirer.NoError(err)

_ = cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
// wait for threshold window
time.Sleep(time.Second)
time.Sleep(time.Second * 2)

// trigger auto update within threshold window
_ = cache.Get(prefix)

// wait for the updater callback to be executed
time.Sleep(time.Second * 2)
asserter.True(ranUpdater.Load())
asserter.True(ranErrWatcher.Load())
// wait for updater to be executed
time.Sleep(time.Second * 1)
v := cache.Get(prefix)
asserter.False(v.Found)
})

tt.Run("no err watcher", func(t *testing.T) {
forcedErr := fmt.Errorf("forced error")
ranUpdater := atomic.Bool{}
ranErrWatcher := atomic.Bool{}

tt.Run("no updater", func(t *testing.T) {
cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Threshold: time.Second * 59,
DisableCache: false,
Updater: func(ctx context.Context, key string) (any, error) {
ranUpdater.Store(true)
return nil, forcedErr
},
Updater: nil,
})
requirer.NoError(err)

_ = cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
_ = cache.Add(prefix, value)
// wait for threshold window
time.Sleep(time.Second)
time.Sleep(time.Second * 2)
// trigger auto update within threshold window
_ = cache.Get(prefix)

// wait for the updater callback to be executed
// wait for updater to run
time.Sleep(time.Second * 2)
asserter.True(ranUpdater.Load())
asserter.False(ranErrWatcher.Load())

v := cache.Get(prefix)
asserter.EqualValues(value, v.V)
})

}
Expand Down Expand Up @@ -369,3 +355,142 @@ func TestPayload(tt *testing.T) {
asserter.EqualValues(time.Time{}, pyl.Expiry())
})
}

func TestErrWatcher(tt *testing.T) {
var (
prefix = "prefix"
value = "value"
requirer = require.New(tt)
asserter = require.New(tt)
)

tt.Run("err watcher", func(t *testing.T) {
forcedErr := fmt.Errorf("forced error")
ranUpdater := atomic.Bool{}
ranErrWatcher := atomic.Bool{}

cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Threshold: time.Second * 59,
DisableCache: false,
Updater: func(ctx context.Context, key string) (any, error) {
ranUpdater.Store(true)
return nil, forcedErr
},
ErrWatcher: func(watcherErr error) {
ranErrWatcher.Store(true)
asserter.ErrorIs(watcherErr, forcedErr)
},
})
requirer.NoError(err)

_ = cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
// wait for threshold window
time.Sleep(time.Second)
// trigger auto update within threshold window
_ = cache.Get(prefix)

// wait for the updater callback to be executed
time.Sleep(time.Second * 2)
asserter.True(ranUpdater.Load())
asserter.True(ranErrWatcher.Load())
})

tt.Run("no err watcher", func(t *testing.T) {
forcedErr := fmt.Errorf("forced error")
ranUpdater := atomic.Bool{}
ranErrWatcher := atomic.Bool{}

cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Threshold: time.Second * 59,
DisableCache: false,
Updater: func(ctx context.Context, key string) (any, error) {
ranUpdater.Store(true)
return nil, forcedErr
},
})
requirer.NoError(err)

_ = cache.BulkAdd([]Tuple[string, any]{{Key: prefix, Value: value}})
// wait for threshold window
time.Sleep(time.Second)
// trigger auto update within threshold window
_ = cache.Get(prefix)

// wait for the updater callback to be executed
time.Sleep(time.Second * 2)
asserter.True(ranUpdater.Load())
asserter.False(ranErrWatcher.Load())
})

tt.Run("err watcher: catch panic text", func(t *testing.T) {
ranUpdater := atomic.Bool{}
ranErrWatcher := atomic.Bool{}

cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Threshold: time.Second * 59,
DisableCache: false,
Updater: func(ctx context.Context, key string) (any, error) {
ranUpdater.Store(true)
panic("force panicked")
},
ErrWatcher: func(watcherErr error) {
ranErrWatcher.Store(true)
asserter.ErrorContains(watcherErr, "force panicked")
},
})
requirer.NoError(err)
cache.Add(prefix, value)

// wait for threshold window
time.Sleep(time.Second)
// trigger auto update within threshold window
_ = cache.Get(prefix)

// wait for the updater callback to be executed
time.Sleep(time.Second * 2)
asserter.True(ranUpdater.Load())
asserter.True(ranErrWatcher.Load())

})

tt.Run("err watcher: catch panic err", func(t *testing.T) {
ranUpdater := atomic.Bool{}
ranErrWatcher := atomic.Bool{}
ErrPanic := errors.New("panic err")

cache, err := New(Config[string, any]{
LRUCacheSize: 10000,
CacheAge: time.Minute,
Threshold: time.Second * 59,
DisableCache: false,
Updater: func(ctx context.Context, key string) (any, error) {
ranUpdater.Store(true)
panic(ErrPanic)
},
ErrWatcher: func(watcherErr error) {
ranErrWatcher.Store(true)
asserter.ErrorIs(watcherErr, ErrPanic)
},
})
requirer.NoError(err)
cache.Add(prefix, value)

// wait for threshold window
time.Sleep(time.Second)
// trigger auto update within threshold window
_ = cache.Get(prefix)

// wait for the updater callback to be executed
time.Sleep(time.Second * 2)
asserter.True(ranUpdater.Load())
asserter.True(ranErrWatcher.Load())

})

}

0 comments on commit 04fed3a

Please sign in to comment.