Skip to content

Commit

Permalink
Optimize: bytes.Index and humanize.IBytes in SyncStandaloneReader (#909)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyningsun authored Jan 14, 2025
1 parent dbe3d7e commit 0046b84
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 55 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ require (
github.com/mcuadros/go-defaults v1.2.0
github.com/rs/zerolog v1.28.0
github.com/spf13/viper v1.18.1
github.com/stretchr/testify v1.8.4
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
135 changes: 80 additions & 55 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package reader

import (
"RedisShake/internal/client/proto"
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
Expand All @@ -14,6 +14,8 @@ import (
"strings"
"time"

"RedisShake/internal/client/proto"

"RedisShake/internal/client"
"RedisShake/internal/config"
"RedisShake/internal/entry"
Expand All @@ -38,6 +40,8 @@ type SyncReaderOptions struct {
Sentinel client.SentinelOptions `mapstructure:"sentinel"`
}

const RDB_EOF_MARKER_LEN = 40

type State string

const (
Expand All @@ -48,6 +52,47 @@ const (
kSyncAof State = "syncing aof"
)

type syncStandaloneReaderStat struct {
Name string `json:"name"`
Address string `json:"address"`
Dir string `json:"dir"`

// status
Status State `json:"status"`

// rdb info
RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
RdbFileSizeHuman string `json:"rdb_file_size_human"`
RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbReceivedHuman string `json:"rdb_received_human"`
RdbSentBytes uint64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan
RdbSentHuman string `json:"rdb_sent_human"`

// aof info
AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
AofReceivedBytes uint64 `json:"aof_received_bytes"` // bytes of AOF received from master
AofReceivedHuman string `json:"aof_received_human"`
}

func (s syncStandaloneReaderStat) MarshalJSON() ([]byte, error) {
if s.RdbFileSizeBytes != 0 {
s.RdbFileSizeHuman = humanize.IBytes(s.RdbFileSizeBytes)
}
if s.RdbReceivedBytes != 0 {
s.RdbReceivedHuman = humanize.IBytes(s.RdbReceivedBytes)
}
if s.RdbSentBytes != 0 {
s.RdbSentHuman = humanize.IBytes(s.RdbSentBytes)
}
if s.AofReceivedBytes != 0 {
s.AofReceivedHuman = humanize.IBytes(s.AofReceivedBytes)
}

type aliasStat syncStandaloneReaderStat // alias to avoid infinite recursion
return json.Marshal(aliasStat(s))
}

type syncStandaloneReader struct {
ctx context.Context
opts *SyncReaderOptions
Expand All @@ -56,28 +101,7 @@ type syncStandaloneReader struct {
ch chan *entry.Entry
DbId int

stat struct {
Name string `json:"name"`
Address string `json:"address"`
Dir string `json:"dir"`

// status
Status State `json:"status"`

// rdb info
RdbFileSizeBytes int64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
RdbFileSizeHuman string `json:"rdb_file_size_human"`
RdbReceivedBytes int64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbReceivedHuman string `json:"rdb_received_human"`
RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan
RdbSentHuman string `json:"rdb_sent_human"`

// aof info
AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master
AofReceivedHuman string `json:"aof_received_human"`
}
stat syncStandaloneReaderStat

// version info
isDiskless bool
Expand Down Expand Up @@ -319,7 +343,7 @@ func (r *syncStandaloneReader) receiveRDB() string {
}
timeStart = time.Now()
log.Debugf("[%s] start receiving RDB. path=[%s]", r.stat.Name, rdbFilePath)
rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o666)
if err != nil {
log.Panicf(err.Error())
}
Expand All @@ -345,39 +369,44 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write
buf := make([]byte, bufSize)

marker = strings.Split(marker, ":")[1]
if len(marker) != 40 {
if len(marker) != RDB_EOF_MARKER_LEN {
log.Panicf("[%s] invalid len of EOF marker. value=[%s]", r.stat.Name, marker)
}
log.Infof("meet EOF begin marker: %s", marker)
bMarker := []byte(marker)
goon := true
for goon {
n, err := r.client.Read(buf[:bufSize])
var lastBytes []byte
for {
copy(buf, lastBytes) // copy previous tail bytes to head of buf

nread, err := r.client.Read(buf[len(lastBytes):])
if err != nil {
log.Panicf(err.Error())
}
buffer := buf[:n]
if bytes.Contains(buffer, bMarker) {

bufLen := len(lastBytes) + nread
nwrite := 0
if bufLen >= RDB_EOF_MARKER_LEN && bytes.Equal(buf[bufLen-RDB_EOF_MARKER_LEN:bufLen], bMarker) {
log.Infof("meet EOF end marker.")
// replace it
fi := bytes.Index(buffer, bMarker)
if len(buffer[fi+40:]) > 0 {
log.Warnf("data after end marker will be discarded: %s", string(buffer[fi+40:]))
// Write all buf without EOF marker and break
if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil {
log.Panicf(err.Error())
}
buffer = buffer[:fi]

goon = false
break
}

_, err = wt.Write(buffer)
if err != nil {
log.Panicf(err.Error())
if bufLen >= RDB_EOF_MARKER_LEN {
// left RDB_EOF_MARKER_LEN bytes to next round
if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil {
log.Panicf(err.Error())
}
lastBytes = buf[bufLen-RDB_EOF_MARKER_LEN : bufLen] // save last RDB_EOF_MARKER_LEN bytes into lastBytes for next round
} else {
// save all bytes into lastBytes for next round if less than RDB_EOF_MARKER_LEN
lastBytes = buf[:bufLen]
}

r.stat.RdbFileSizeBytes += int64(n)
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(r.stat.RdbFileSizeBytes))
r.stat.RdbReceivedBytes += int64(n)
r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes))
r.stat.RdbFileSizeBytes += uint64(nwrite)
r.stat.RdbReceivedBytes += uint64(nwrite)
}
}

Expand All @@ -387,8 +416,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
log.Panicf(err.Error())
}
log.Debugf("[%s] rdb file size: [%v]", r.stat.Name, humanize.IBytes(uint64(length)))
r.stat.RdbFileSizeBytes = length
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(length))
r.stat.RdbFileSizeBytes = uint64(length)

remainder := length
const bufSize int64 = 32 * 1024 * 1024 // 32MB
Expand All @@ -408,8 +436,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
log.Panicf(err.Error())
}

r.stat.RdbReceivedBytes += int64(n)
r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes))
r.stat.RdbReceivedBytes += uint64(n)
}
}

Expand All @@ -427,8 +454,7 @@ func (r *syncStandaloneReader) receiveAOF() {
if err != nil {
log.Panicf(err.Error())
}
r.stat.AofReceivedBytes += int64(n)
r.stat.AofReceivedHuman = humanize.IBytes(uint64(r.stat.AofReceivedBytes))
r.stat.AofReceivedBytes += uint64(n)
aofWriter.Write(buf[:n])
r.stat.AofReceivedOffset += int64(n)
}
Expand All @@ -440,8 +466,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) {
log.Debugf("[%s] start sending RDB to target", r.stat.Name)
r.stat.Status = kSyncRdb
updateFunc := func(offset int64) {
r.stat.RdbSentBytes = offset
r.stat.RdbSentHuman = humanize.IBytes(uint64(offset))
r.stat.RdbSentBytes = uint64(offset)
}
rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, rdbFilePath, r.ch)
r.DbId = rdbLoader.ParseRDB(r.ctx)
Expand Down Expand Up @@ -532,16 +557,16 @@ func (r *syncStandaloneReader) Status() interface{} {

func (r *syncStandaloneReader) StatusString() string {
if r.stat.Status == kSyncRdb {
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbSentHuman, r.stat.RdbFileSizeHuman)
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbSentBytes), humanize.IBytes(r.stat.RdbFileSizeBytes))
}
if r.stat.Status == kSyncAof {
return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset)
}
if r.stat.Status == kReceiveRdb {
if r.isDiskless {
return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, r.stat.RdbReceivedHuman)
return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes))
}
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbReceivedHuman, r.stat.RdbFileSizeHuman)
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes), humanize.IBytes(r.stat.RdbFileSizeBytes))
}
return string(r.stat.Status)
}
Expand Down
78 changes: 78 additions & 0 deletions internal/reader/sync_standalone_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package reader

import (
"context"
"encoding/json"
"testing"

"RedisShake/internal/log"
"github.com/stretchr/testify/require"
)

func Test_syncStandaloneReader_Status(t *testing.T) {
type fields struct {
ctx context.Context
opts *SyncReaderOptions
}
tests := []struct {
name string
fields fields
want interface{}
}{
{
name: "syncStandaloneReader_Status_Marshal",
fields: fields{
ctx: context.Background(),
opts: &SyncReaderOptions{
Cluster: false,
Address: "127.0.0.1:6379",
Username: "username",
Password: "password",
Tls: false,
SyncRdb: false,
SyncAof: false,
PreferReplica: false,
TryDiskless: false,
},
},
want: map[string]interface{}{
"name": "",
"address": "",
"dir": "",
"status": "",
"rdb_file_size_bytes": 0,
"rdb_file_size_human": "",
"rdb_received_bytes": 0,
"rdb_received_human": "",
"rdb_sent_bytes": 0,
"rdb_sent_human": "",
"aof_received_offset": 0,
"aof_sent_offset": 0,
"aof_received_bytes": 0,
"aof_received_human": "",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &syncStandaloneReader{
ctx: tt.fields.ctx,
opts: tt.fields.opts,
}

want, err := json.Marshal(tt.want)
if err != nil {
log.Warnf("marshal want failed, err=[%v]", err)
return
}

got, err := json.Marshal(r.Status())
if err != nil {
log.Warnf("marshal status failed, err=[%v]", err)
return
}

require.JSONEq(t, string(want), string(got))
})
}
}

0 comments on commit 0046b84

Please sign in to comment.