Skip to content

Commit

Permalink
✨ feat: added redis keyspace notification #4
Browse files Browse the repository at this point in the history
  • Loading branch information
pnguyen215 committed Jan 7, 2024
1 parent 072280f commit 388b89f
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go.work
.DS_Store

# Main
main/
main.go

# Logs
Expand Down
28 changes: 14 additions & 14 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/main.go"
}
]
}
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/main/main.go"
}
]
}
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
# ==============================================================================
# Start Rest
run:
go run main.go
go run main/main.go

build:
go build main.go
go build main/main.go

# ==============================================================================
# Modules support
Expand Down
20 changes: 20 additions & 0 deletions example/redisconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,23 @@ func TestConsumeCallback(t *testing.T) {
return
}
}

func TestKeySpaceNotification(t *testing.T) {
r, _ := createConn()
svc := redisconn.NewRedisService(r.GetConn())
err := svc.Set("_redis_keyspace_key_", 123, time.Second*5)
if err != nil {
logger.Errorf("Setting key on redis got an error", err)
return
}
svc.SyncKeySpace().AddCallback(func(msg *redis.Message, err error) {
if err != nil {
logger.Errorf("Redis KeySpace Notification got an error", err)
return
}
logger.Infof(msg.String())
})
svc.SyncKeySpace().Register()
time.Sleep(15 * time.Second) // waiting to done
svc.SyncKeySpace().Unregister() // stop watching
}
76 changes: 76 additions & 0 deletions redisconn_keyspace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package redisconn

import (
"sync"

"github.com/go-redis/redis"
)

type RedisKeySpaceService interface {
AddCallback(c func(message *redis.Message, err error)) RedisKeySpaceService
Register()
Unregister()
PatternKeySpace() string
PatternKeyEvent() string
}

type redisKeySpaceServiceImpl struct {
client *redis.Client
callback []func(message *redis.Message, err error)
stop chan struct{}
mutex sync.Mutex
}

func NewRedisKeySpaceService(client *redis.Client) RedisKeySpaceService {
return &redisKeySpaceServiceImpl{
client: client,
callback: make([]func(event *redis.Message, err error), 0),
stop: make(chan struct{}),
}
}

func (s *redisKeySpaceServiceImpl) AddCallback(c func(message *redis.Message, err error)) RedisKeySpaceService {
s.mutex.Lock()
defer s.mutex.Unlock()
s.callback = append(s.callback, c)
return s
}

func (s *redisKeySpaceServiceImpl) Register() {
go s.run()
}

func (s *redisKeySpaceServiceImpl) Unregister() {
close(s.stop)
}

func (s *redisKeySpaceServiceImpl) PatternKeySpace() string {
return "__keyspace@*__:*"
}

func (s *redisKeySpaceServiceImpl) PatternKeyEvent() string {
return "__keyevent@*__:*"
}

func (s *redisKeySpaceServiceImpl) run() {
pattern := []string{s.PatternKeySpace(), s.PatternKeyEvent()}
pubsub := s.client.PSubscribe(pattern...)
for {
select {
case <-s.stop:
_ = pubsub.Close()
return
default:
msg, err := pubsub.ReceiveMessage()
s.hook(msg, err)
}
}
}

func (s *redisKeySpaceServiceImpl) hook(message *redis.Message, err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
for _, callback := range s.callback {
callback(message, err)
}
}
8 changes: 8 additions & 0 deletions redisconn_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ type RedisService interface {

// sync pubsub hook
SyncPubSub() RedisPubSubService
// sync keyspace notification hook
SyncKeySpace() RedisKeySpaceService
}

type redisServiceImpl struct {
redisConn *redis.Client
mutex *RedisMutex
pubsub RedisPubSubService
keyspace RedisKeySpaceService
}

func newRedisMutex() *RedisMutex {
Expand All @@ -41,6 +44,7 @@ func NewRedisService(redisConn *redis.Client) RedisService {
redisConn: redisConn,
mutex: newRedisMutex(),
pubsub: NewRedisPubSub(redisConn),
keyspace: NewRedisKeySpaceService(redisConn),
}
return s
}
Expand Down Expand Up @@ -221,3 +225,7 @@ func (r *redisServiceImpl) Handler() *redis.Client {
func (r *redisServiceImpl) SyncPubSub() RedisPubSubService {
return r.pubsub
}

func (r *redisServiceImpl) SyncKeySpace() RedisKeySpaceService {
return r.keyspace
}

0 comments on commit 388b89f

Please sign in to comment.