Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Manage all redis client into context.Context #745

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func main() {
utils.SetPprofPort()
luaRuntime := function.New(config.Opt.Function)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create reader
var theReader reader.Reader
if v.IsSet("sync_reader") {
Expand All @@ -38,10 +41,10 @@ func main() {
log.Panicf("failed to read the SyncReader config entry. err: %v", err)
}
if opts.Cluster {
theReader = reader.NewSyncClusterReader(opts)
theReader = reader.NewSyncClusterReader(ctx, opts)
log.Infof("create SyncClusterReader: %v", opts.Address)
} else {
theReader = reader.NewSyncStandaloneReader(opts)
theReader = reader.NewSyncStandaloneReader(ctx, opts)
log.Infof("create SyncStandaloneReader: %v", opts.Address)
}
} else if v.IsSet("scan_reader") {
Expand All @@ -52,10 +55,10 @@ func main() {
log.Panicf("failed to read the ScanReader config entry. err: %v", err)
}
if opts.Cluster {
theReader = reader.NewScanClusterReader(opts)
theReader = reader.NewScanClusterReader(ctx, opts)
log.Infof("create ScanClusterReader: %v", opts.Address)
} else {
theReader = reader.NewScanStandaloneReader(opts)
theReader = reader.NewScanStandaloneReader(ctx, opts)
log.Infof("create ScanStandaloneReader: %v", opts.Address)
}
} else if v.IsSet("rdb_reader") {
Expand Down Expand Up @@ -93,10 +96,10 @@ func main() {
log.Panicf("the RDBRestoreCommandBehavior can't be 'panic' when the server not reply to commands")
}
if opts.Cluster {
theWriter = writer.NewRedisClusterWriter(opts)
theWriter = writer.NewRedisClusterWriter(ctx, opts)
log.Infof("create RedisClusterWriter: %v", opts.Address)
} else {
theWriter = writer.NewRedisStandaloneWriter(opts)
theWriter = writer.NewRedisStandaloneWriter(ctx, opts)
log.Infof("create RedisStandaloneWriter: %v", opts.Address)
}
if config.Opt.Advanced.EmptyDBBeforeSync {
Expand All @@ -114,7 +117,6 @@ func main() {

log.Infof("start syncing...")

ctx, cancel := context.WithCancel(context.Background())
ch := theReader.StartRead(ctx)
go waitShutdown(cancel)

Expand Down
17 changes: 12 additions & 5 deletions internal/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"bufio"
"context"
"crypto/tls"
"net"
"strconv"
Expand All @@ -19,16 +20,22 @@ type Redis struct {
protoWriter *proto.Writer
}

func NewRedisClient(address string, username string, password string, Tls bool) *Redis {
func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis {
r := new(Redis)
var conn net.Conn
var dialer net.Dialer
var dialer = &net.Dialer{
Timeout: 5 * time.Minute,
KeepAlive: 5 * time.Minute,
}
var err error
dialer.Timeout = 3 * time.Second
if Tls {
conn, err = tls.DialWithDialer(&dialer, "tcp", address, &tls.Config{InsecureSkipVerify: true})
tlsDialer := &tls.Dialer{
NetDialer: dialer,
Config: &tls.Config{InsecureSkipVerify: true},
}
conn, err = tlsDialer.DialContext(ctx, "tcp", address)
} else {
conn, err = dialer.Dial("tcp", address)
conn, err = dialer.DialContext(ctx, "tcp", address)
}
if err != nil {
log.Panicf("dial failed. address=[%s], tls=[%v], err=[%v]", address, Tls, err)
Expand Down
6 changes: 3 additions & 3 deletions internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ type scanClusterReader struct {
statusId int
}

func NewScanClusterReader(opts *ScanReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
func NewScanClusterReader(ctx context.Context, opts *ScanReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)

rd := &scanClusterReader{}
for _, address := range addresses {
theOpts := *opts
theOpts.Address = address
rd.readers = append(rd.readers, NewScanStandaloneReader(&theOpts))
rd.readers = append(rd.readers, NewScanStandaloneReader(ctx, &theOpts))
}
return rd
}
Expand Down
10 changes: 5 additions & 5 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type scanStandaloneReader struct {
}
}

func NewScanStandaloneReader(opts *ScanReaderOptions) Reader {
func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reader {
r := new(scanStandaloneReader)
// dbs
c := client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls)
if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node
r.dbs = []int{0}
} else {
Expand Down Expand Up @@ -82,7 +82,7 @@ func (r *scanStandaloneReader) subscript() {
if !r.opts.KSN {
return
}
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c.Send("psubscribe", "__keyevent@*__:*")

go func() {
Expand Down Expand Up @@ -114,7 +114,7 @@ func (r *scanStandaloneReader) subscript() {
}

func (r *scanStandaloneReader) scan() {
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
defer c.Close()
for _, dbId := range r.dbs {
if dbId != 0 {
Expand Down Expand Up @@ -150,7 +150,7 @@ func (r *scanStandaloneReader) scan() {

func (r *scanStandaloneReader) fetch() {
nowDbId := 0
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
defer c.Close()
for item := range r.keyQueue.Ch {
r.stat.NeedUpdateCount = int64(r.keyQueue.Len())
Expand Down
6 changes: 3 additions & 3 deletions internal/reader/sync_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type syncClusterReader struct {
statusId int
}

func NewSyncClusterReader(opts *SyncReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
func NewSyncClusterReader(ctx context.Context, opts *SyncReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
log.Debugf("get redis cluster nodes:")
for _, address := range addresses {
log.Debugf("%s", address)
Expand All @@ -25,7 +25,7 @@ func NewSyncClusterReader(opts *SyncReaderOptions) Reader {
for _, address := range addresses {
theOpts := *opts
theOpts.Address = address
rd.readers = append(rd.readers, NewSyncStandaloneReader(&theOpts))
rd.readers = append(rd.readers, NewSyncStandaloneReader(ctx, &theOpts))
}
return rd
}
Expand Down
6 changes: 3 additions & 3 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package reader

import (
"context"
"bufio"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -77,10 +77,10 @@ type syncStandaloneReader struct {
}
}

func NewSyncStandaloneReader(opts *SyncReaderOptions) Reader {
func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader {
r := new(syncStandaloneReader)
r.opts = opts
r.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls)
r.rd = r.client.BufioReader()
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1)
r.stat.Address = opts.Address
Expand Down
5 changes: 3 additions & 2 deletions internal/utils/cluster_nodes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils

import (
"context"
"fmt"
"strconv"
"strings"
Expand All @@ -9,8 +10,8 @@ import (
"RedisShake/internal/log"
)

func GetRedisClusterNodes(address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) {
c := client.NewRedisClient(address, username, password, Tls)
func GetRedisClusterNodes(ctx context.Context, address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) {
c := client.NewRedisClient(ctx, address, username, password, Tls)
reply := c.DoWithStringReply("cluster", "nodes")
reply = strings.TrimSpace(reply)
slotsCount := 0
Expand Down
12 changes: 7 additions & 5 deletions internal/writer/redis_cluster_writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package writer

import (
"context"

"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/utils"
Expand All @@ -16,9 +18,9 @@ type RedisClusterWriter struct {
stat []interface{}
}

func NewRedisClusterWriter(opts *RedisWriterOptions) Writer {
func NewRedisClusterWriter(ctx context.Context, opts *RedisWriterOptions) Writer {
rw := new(RedisClusterWriter)
rw.loadClusterNodes(opts)
rw.loadClusterNodes(ctx, opts)
log.Infof("redisClusterWriter connected to redis cluster successful. addresses=%v", rw.addresses)
return rw
}
Expand All @@ -29,13 +31,13 @@ func (r *RedisClusterWriter) Close() {
}
}

func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) {
addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, false)
func (r *RedisClusterWriter) loadClusterNodes(ctx context.Context, opts *RedisWriterOptions) {
addresses, slots := utils.GetRedisClusterNodes(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false)
r.addresses = addresses
for i, address := range addresses {
theOpts := *opts
theOpts.Address = address
redisWriter := NewRedisStandaloneWriter(&theOpts)
redisWriter := NewRedisStandaloneWriter(ctx, &theOpts)
r.writers = append(r.writers, redisWriter)
for _, s := range slots[i] {
if r.router[s] != nil {
Expand Down
5 changes: 3 additions & 2 deletions internal/writer/redis_standalone_writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package writer

import (
"context"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -40,11 +41,11 @@ type redisStandaloneWriter struct {
}
}

func NewRedisStandaloneWriter(opts *RedisWriterOptions) Writer {
func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Writer {
rw := new(redisStandaloneWriter)
rw.address = opts.Address
rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
rw.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls)
if opts.OffReply {
log.Infof("turn off the reply of write")
rw.offReply = true
Expand Down
Loading