Skip to content

Commit

Permalink
fix: remove locks and ticker goroutine during writes, and optimize bu…
Browse files Browse the repository at this point in the history
…fio parameters
  • Loading branch information
EquentR committed Dec 16, 2024
1 parent a46928d commit e706735
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 37 deletions.
2 changes: 1 addition & 1 deletion internal/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo
// Increase the size of the underlying TCP send cache to avoid short-write errors
SetWriteConnBuff(r.conn, 128*1024)
r.reader = bufio.NewReader(conn)
r.writer = bufio.NewWriterSize(conn, 16*1024*1024) // size is 16MB
r.writer = bufio.NewWriterSize(conn, 32*1024) // size is 32KiB
r.protoReader = proto.NewReader(r.reader)
r.protoWriter = proto.NewWriter(r.writer)

Expand Down
48 changes: 12 additions & 36 deletions internal/writer/redis_standalone_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri
rw.offReply = true
rw.client.Send("CLIENT", "REPLY", "OFF")
} else {
rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit)
rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit*2)
rw.chWaitWg.Add(1)
go rw.processReply()
}
Expand All @@ -75,7 +75,7 @@ func (w *redisStandaloneWriter) Close() {
func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entry {
w.chWg = sync.WaitGroup{}
w.chWg.Add(1)
go w.ProcessWrite(ctx)
go w.processWrite(ctx)
return w.ch
}

Expand All @@ -95,41 +95,19 @@ func (w *redisStandaloneWriter) switchDbTo(newDbId int) {
}
}

func (w *redisStandaloneWriter) ProcessWrite(ctx context.Context) {
func (w *redisStandaloneWriter) processWrite(ctx context.Context) {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()

var (
mu sync.Mutex
sig = make(chan struct{})
sendBytes uint64
)

go func() {
for {
select {
case <-sig:
return
case <-ticker.C:
if atomic.LoadUint64(&sendBytes) > 0 {
mu.Lock()
w.client.Flush()
mu.Unlock()
}
}
}
}()
for {
select {
case <-ctx.Done():
// do nothing until w.ch is closed
case <-ticker.C:
w.client.Flush()
case e, ok := <-w.ch:
if !ok {
// Clean up and exit
mu.Lock()
// clean up and exit
w.client.Flush()
mu.Unlock()
close(sig)
w.chWg.Done()
return
}
Expand All @@ -144,18 +122,16 @@ func (w *redisStandaloneWriter) ProcessWrite(ctx context.Context) {
}
log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
if !w.offReply {
w.chWaitReply <- e
atomic.AddUint64(&sendBytes, uint64(e.SerializedSize))
select {
case w.chWaitReply <- e:
default:
w.client.Flush()
w.chWaitReply <- e
}
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
}
mu.Lock()
w.client.SendBytesBuff(bytes)
if atomic.LoadUint64(&sendBytes) >= 32*1024 {
w.client.Flush()
atomic.StoreUint64(&sendBytes, 0)
}
mu.Unlock()
}
}
}
Expand Down

0 comments on commit e706735

Please sign in to comment.