From 064fd9ee5452f8ec1ab311e8a7071518dc767c69 Mon Sep 17 00:00:00 2001 From: "yinhang.sun" Date: Thu, 26 Dec 2024 11:10:16 +0800 Subject: [PATCH] fix humanize missing in sync_standalone_reader and add testcase to cover it --- go.mod | 4 +- go.sum | 2 - internal/reader/sync_standalone_reader.go | 76 +++++++++++++----- .../reader/sync_standalone_reader_test.go | 78 +++++++++++++++++++ 4 files changed, 136 insertions(+), 24 deletions(-) create mode 100644 internal/reader/sync_standalone_reader_test.go diff --git a/go.mod b/go.mod index b9ca5c74..b783ef9e 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,12 @@ require ( github.com/mcuadros/go-defaults v1.2.0 github.com/rs/zerolog v1.28.0 github.com/spf13/viper v1.18.1 + github.com/stretchr/testify v1.8.4 github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 ) require ( - github.com/a8m/envsubst v1.4.2 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/magiconair/properties v1.8.7 // indirect @@ -21,6 +22,7 @@ require ( github.com/mattn/go-isatty v0.0.17 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect diff --git a/go.sum b/go.sum index 92ef3dc5..ecce9d75 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/a8m/envsubst v1.4.2 h1:4yWIHXOLEJHQEFd4UjrWDrYeYlV7ncFWJOCBRLOZHQg= -github.com/a8m/envsubst v1.4.2/go.mod h1:MVUTQNGQ3tsjOOtKCNd+fl8RzhsXcDvvAEzkhGtlsbY= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index cba2b17b..c6a37c06 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "encoding/json" "fmt" "io" "os" @@ -51,6 +52,56 @@ const ( kSyncAof State = "syncing aof" ) +type syncStandaloneReaderStat struct { + Name string `json:"name"` + Address string `json:"address"` + Dir string `json:"dir"` + + // status + Status State `json:"status"` + + // rdb info + RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file + RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master + RdbSentBytes uint64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan + + // aof info + AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master + AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan + AofReceivedBytes uint64 `json:"aof_received_bytes"` // bytes of AOF received from master +} + +func (s syncStandaloneReaderStat) MarshalJSON() ([]byte, error) { + rdbFileSizeHuman, rdbReceivedHuman, rdbSentHuman, aofReceivedHuman := "", "", "", "" + if s.RdbFileSizeBytes != 0 { + rdbFileSizeHuman = humanize.IBytes(s.RdbFileSizeBytes) + } + if s.RdbReceivedBytes != 0 { + rdbReceivedHuman = humanize.IBytes(s.RdbReceivedBytes) + } + if s.RdbSentBytes != 0 { + rdbSentHuman = humanize.IBytes(s.RdbSentBytes) + } + if s.AofReceivedBytes != 0 { + aofReceivedHuman = humanize.IBytes(s.AofReceivedBytes) + } + + type aliasStat syncStandaloneReaderStat // alias to avoid infinite recursion + return json.Marshal(struct { + aliasStat + RdbFileSizeHuman string `json:"rdb_file_size_human"` + RdbReceivedHuman string `json:"rdb_received_human"` + RdbSentHuman string `json:"rdb_sent_human"` + AofReceivedHuman string `json:"aof_received_human"` + }{ + aliasStat: aliasStat(s), + RdbFileSizeHuman: rdbFileSizeHuman, + RdbReceivedHuman: rdbReceivedHuman, + RdbSentHuman: rdbSentHuman, + AofReceivedHuman: aofReceivedHuman, + }) +} + type syncStandaloneReader struct { ctx context.Context opts *SyncReaderOptions @@ -59,24 +110,7 @@ type syncStandaloneReader struct { ch chan *entry.Entry DbId int - stat struct { - Name string `json:"name"` - Address string `json:"address"` - Dir string `json:"dir"` - - // status - Status State `json:"status"` - - // rdb info - RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file - RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master - RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan - - // aof info - AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master - AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan - AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master - } + stat syncStandaloneReaderStat // version info isDiskless bool @@ -431,7 +465,7 @@ func (r *syncStandaloneReader) receiveAOF() { if err != nil { log.Panicf(err.Error()) } - r.stat.AofReceivedBytes += int64(n) + r.stat.AofReceivedBytes += uint64(n) aofWriter.Write(buf[:n]) r.stat.AofReceivedOffset += int64(n) } @@ -443,7 +477,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) { log.Debugf("[%s] start sending RDB to target", r.stat.Name) r.stat.Status = kSyncRdb updateFunc := func(offset int64) { - r.stat.RdbSentBytes = offset + r.stat.RdbSentBytes = uint64(offset) } rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, rdbFilePath, r.ch) r.DbId = rdbLoader.ParseRDB(r.ctx) @@ -534,7 +568,7 @@ func (r *syncStandaloneReader) Status() interface{} { func (r *syncStandaloneReader) StatusString() string { if r.stat.Status == kSyncRdb { - return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(uint64(r.stat.RdbSentBytes)), humanize.IBytes(r.stat.RdbFileSizeBytes)) + return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbSentBytes), humanize.IBytes(r.stat.RdbFileSizeBytes)) } if r.stat.Status == kSyncAof { return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset) diff --git a/internal/reader/sync_standalone_reader_test.go b/internal/reader/sync_standalone_reader_test.go new file mode 100644 index 00000000..457d9cbb --- /dev/null +++ b/internal/reader/sync_standalone_reader_test.go @@ -0,0 +1,78 @@ +package reader + +import ( + "context" + "encoding/json" + "testing" + + "RedisShake/internal/log" + "github.com/stretchr/testify/require" +) + +func Test_syncStandaloneReader_Status(t *testing.T) { + type fields struct { + ctx context.Context + opts *SyncReaderOptions + } + tests := []struct { + name string + fields fields + want interface{} + }{ + { + name: "syncStandaloneReader_Status_Marshal", + fields: fields{ + ctx: context.Background(), + opts: &SyncReaderOptions{ + Cluster: false, + Address: "127.0.0.1:6379", + Username: "username", + Password: "password", + Tls: false, + SyncRdb: false, + SyncAof: false, + PreferReplica: false, + TryDiskless: false, + }, + }, + want: map[string]interface{}{ + "name": "", + "address": "", + "dir": "", + "status": "", + "rdb_file_size_bytes": 0, + "rdb_file_size_human": "", + "rdb_received_bytes": 0, + "rdb_received_human": "", + "rdb_sent_bytes": 0, + "rdb_sent_human": "", + "aof_received_offset": 0, + "aof_sent_offset": 0, + "aof_received_bytes": 0, + "aof_received_human": "", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &syncStandaloneReader{ + ctx: tt.fields.ctx, + opts: tt.fields.opts, + } + + want, err := json.Marshal(tt.want) + if err != nil { + log.Warnf("marshal status failed, err=[%v]", err) + return + } + + got, err := json.Marshal(r.Status()) + if err != nil { + log.Warnf("marshal status failed, err=[%v]", err) + return + } + + require.JSONEq(t, string(want), string(got)) + }) + } +}