diff --git a/internal/client/redis.go b/internal/client/redis.go index 5c12bfa1..9e411844 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -49,7 +49,7 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo // Increase the size of the underlying TCP send cache to avoid short-write errors SetWriteConnBuff(r.conn, 128*1024) r.reader = bufio.NewReader(conn) - r.writer = bufio.NewWriterSize(conn, 16*1024*1024) // size is 16MB + r.writer = bufio.NewWriterSize(conn, 32*1024) // size is 32KiB r.protoReader = proto.NewReader(r.reader) r.protoWriter = proto.NewWriter(r.writer) diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index d763bae9..518452b8 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -56,7 +56,7 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri rw.offReply = true rw.client.Send("CLIENT", "REPLY", "OFF") } else { - rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit) + rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit*2) rw.chWaitWg.Add(1) go rw.processReply() } @@ -75,7 +75,7 @@ func (w *redisStandaloneWriter) Close() { func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entry { w.chWg = sync.WaitGroup{} w.chWg.Add(1) - go w.ProcessWrite(ctx) + go w.processWrite(ctx) return w.ch } @@ -95,41 +95,19 @@ func (w *redisStandaloneWriter) switchDbTo(newDbId int) { } } -func (w *redisStandaloneWriter) ProcessWrite(ctx context.Context) { +func (w *redisStandaloneWriter) processWrite(ctx context.Context) { ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() - - var ( - mu sync.Mutex - sig = make(chan struct{}) - sendBytes uint64 - ) - - go func() { - for { - select { - case <-sig: - return - case <-ticker.C: - if atomic.LoadUint64(&sendBytes) > 0 { - mu.Lock() - w.client.Flush() - mu.Unlock() - } - } - } - }() for { select { case <-ctx.Done(): // do nothing until w.ch is closed + case <-ticker.C: + w.client.Flush() case e, ok := <-w.ch: if !ok { - // Clean up and exit - mu.Lock() + // clean up and exit w.client.Flush() - mu.Unlock() - close(sig) w.chWg.Done() return } @@ -144,18 +122,16 @@ func (w *redisStandaloneWriter) ProcessWrite(ctx context.Context) { } log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String()) if !w.offReply { - w.chWaitReply <- e - atomic.AddUint64(&sendBytes, uint64(e.SerializedSize)) + select { + case w.chWaitReply <- e: + default: + w.client.Flush() + w.chWaitReply <- e + } atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize) atomic.AddInt64(&w.stat.UnansweredEntries, 1) } - mu.Lock() w.client.SendBytesBuff(bytes) - if atomic.LoadUint64(&sendBytes) >= 32*1024 { - w.client.Flush() - atomic.StoreUint64(&sendBytes, 0) - } - mu.Unlock() } } }