Skip to content

Commit

Permalink
Merge pull request #158 from FZambia/max_flush_delay
Browse files Browse the repository at this point in the history
Add MaxFlushDelay option
  • Loading branch information
rueian authored Dec 10, 2022
2 parents 4c04568 + 12ba3be commit 717c1ad
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
18 changes: 15 additions & 3 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type pipe struct {
info map[string]RedisMessage
timeout time.Duration
pinggap time.Duration
maxFlushDelay time.Duration
once sync.Once
r2mu sync.Mutex
version int32
Expand Down Expand Up @@ -87,8 +88,9 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool)
ssubs: newSubs(),
close: make(chan struct{}),

timeout: option.ConnWriteTimeout,
pinggap: option.Dialer.KeepAlive,
timeout: option.ConnWriteTimeout,
pinggap: option.Dialer.KeepAlive,
maxFlushDelay: option.MaxFlushDelay,

r2ps: r2ps,
}
Expand Down Expand Up @@ -291,7 +293,17 @@ func (p *pipe) _backgroundWrite() (err error) {
if p.w.Buffered() == 0 {
err = p.Error()
} else {
err = p.w.Flush()
if p.maxFlushDelay == 0 {
err = p.w.Flush()
} else {
if atomic.LoadInt32(&p.waits) == 1 {
err = p.w.Flush()
} else {
ts := time.Now()
err = p.w.Flush()
time.Sleep(p.maxFlushDelay - time.Since(ts))
}
}
}
if err == nil {
if atomic.LoadInt32(&p.state) == 1 {
Expand Down
20 changes: 20 additions & 0 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,26 @@ func TestWriteSinglePipelineFlush(t *testing.T) {
}
}

func TestWriteWithMaxFlushDelay(t *testing.T) {
p, mock, cancel, _ := setup(t, ClientOption{
AlwaysPipelining: true,
MaxFlushDelay: 20 * time.Microsecond,
})
defer cancel()
times := 2000
wg := sync.WaitGroup{}
wg.Add(times)

for i := 0; i < times; i++ {
go func() {
ExpectOK(t, p.Do(context.Background(), cmds.NewCompleted([]string{"PING"})))
}()
}
for i := 0; i < times; i++ {
mock.Expect("PING").ReplyString("OK")
}
}

func TestWriteMultiFlush(t *testing.T) {
p, mock, cancel, _ := setup(t, ClientOption{})
defer cancel()
Expand Down
8 changes: 8 additions & 0 deletions rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ type ClientOption struct {
DisableCache bool
// AlwaysPipelining makes rueidis.Client always pipeline redis commands even if they are not issued concurrently.
AlwaysPipelining bool
// MaxFlushDelay when greater than zero pauses pipeline write loop for some time (not larger than MaxFlushDelay)
// after each flushing of data to the connection. This gives pipeline a chance to collect more commands to send
// to Redis. Adding this delay increases latency, reduces throughput – but in most cases may significantly reduce
// application and Redis CPU utilization due to less executed system calls. By default, Rueidis flushes data to the
// connection without extra delays. Depending on network latency and application-specific conditions the value
// of MaxFlushDelay may vary, sth like 20 microseconds should not affect latency/throughput a lot but still
// produce notable CPU usage reduction under load.
MaxFlushDelay time.Duration
}

// SentinelOption contains MasterSet,
Expand Down

0 comments on commit 717c1ad

Please sign in to comment.