Skip to content

Commit

Permalink
fix humanize missing in sync_standalone_reader and add testcase to co…
Browse files Browse the repository at this point in the history
…ver it
  • Loading branch information
cyningsun committed Dec 26, 2024
1 parent dd86fbe commit 064fd9e
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 24 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ require (
github.com/mcuadros/go-defaults v1.2.0
github.com/rs/zerolog v1.28.0
github.com/spf13/viper v1.18.1
github.com/stretchr/testify v1.8.4
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
)

require (
github.com/a8m/envsubst v1.4.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/a8m/envsubst v1.4.2 h1:4yWIHXOLEJHQEFd4UjrWDrYeYlV7ncFWJOCBRLOZHQg=
github.com/a8m/envsubst v1.4.2/go.mod h1:MVUTQNGQ3tsjOOtKCNd+fl8RzhsXcDvvAEzkhGtlsbY=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
76 changes: 55 additions & 21 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -51,6 +52,56 @@ const (
kSyncAof State = "syncing aof"
)

type syncStandaloneReaderStat struct {
Name string `json:"name"`
Address string `json:"address"`
Dir string `json:"dir"`

// status
Status State `json:"status"`

// rdb info
RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbSentBytes uint64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan

// aof info
AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
AofReceivedBytes uint64 `json:"aof_received_bytes"` // bytes of AOF received from master
}

func (s syncStandaloneReaderStat) MarshalJSON() ([]byte, error) {
rdbFileSizeHuman, rdbReceivedHuman, rdbSentHuman, aofReceivedHuman := "", "", "", ""
if s.RdbFileSizeBytes != 0 {
rdbFileSizeHuman = humanize.IBytes(s.RdbFileSizeBytes)
}
if s.RdbReceivedBytes != 0 {
rdbReceivedHuman = humanize.IBytes(s.RdbReceivedBytes)
}
if s.RdbSentBytes != 0 {
rdbSentHuman = humanize.IBytes(s.RdbSentBytes)
}
if s.AofReceivedBytes != 0 {
aofReceivedHuman = humanize.IBytes(s.AofReceivedBytes)
}

type aliasStat syncStandaloneReaderStat // alias to avoid infinite recursion
return json.Marshal(struct {
aliasStat
RdbFileSizeHuman string `json:"rdb_file_size_human"`
RdbReceivedHuman string `json:"rdb_received_human"`
RdbSentHuman string `json:"rdb_sent_human"`
AofReceivedHuman string `json:"aof_received_human"`
}{
aliasStat: aliasStat(s),
RdbFileSizeHuman: rdbFileSizeHuman,
RdbReceivedHuman: rdbReceivedHuman,
RdbSentHuman: rdbSentHuman,
AofReceivedHuman: aofReceivedHuman,
})
}

type syncStandaloneReader struct {
ctx context.Context
opts *SyncReaderOptions
Expand All @@ -59,24 +110,7 @@ type syncStandaloneReader struct {
ch chan *entry.Entry
DbId int

stat struct {
Name string `json:"name"`
Address string `json:"address"`
Dir string `json:"dir"`

// status
Status State `json:"status"`

// rdb info
RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan

// aof info
AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master
}
stat syncStandaloneReaderStat

// version info
isDiskless bool
Expand Down Expand Up @@ -431,7 +465,7 @@ func (r *syncStandaloneReader) receiveAOF() {
if err != nil {
log.Panicf(err.Error())
}
r.stat.AofReceivedBytes += int64(n)
r.stat.AofReceivedBytes += uint64(n)
aofWriter.Write(buf[:n])
r.stat.AofReceivedOffset += int64(n)
}
Expand All @@ -443,7 +477,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) {
log.Debugf("[%s] start sending RDB to target", r.stat.Name)
r.stat.Status = kSyncRdb
updateFunc := func(offset int64) {
r.stat.RdbSentBytes = offset
r.stat.RdbSentBytes = uint64(offset)
}
rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, rdbFilePath, r.ch)
r.DbId = rdbLoader.ParseRDB(r.ctx)
Expand Down Expand Up @@ -534,7 +568,7 @@ func (r *syncStandaloneReader) Status() interface{} {

func (r *syncStandaloneReader) StatusString() string {
if r.stat.Status == kSyncRdb {
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(uint64(r.stat.RdbSentBytes)), humanize.IBytes(r.stat.RdbFileSizeBytes))
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbSentBytes), humanize.IBytes(r.stat.RdbFileSizeBytes))
}
if r.stat.Status == kSyncAof {
return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset)
Expand Down
78 changes: 78 additions & 0 deletions internal/reader/sync_standalone_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package reader

import (
"context"
"encoding/json"
"testing"

"RedisShake/internal/log"
"github.com/stretchr/testify/require"
)

func Test_syncStandaloneReader_Status(t *testing.T) {
type fields struct {
ctx context.Context
opts *SyncReaderOptions
}
tests := []struct {
name string
fields fields
want interface{}
}{
{
name: "syncStandaloneReader_Status_Marshal",
fields: fields{
ctx: context.Background(),
opts: &SyncReaderOptions{
Cluster: false,
Address: "127.0.0.1:6379",
Username: "username",
Password: "password",
Tls: false,
SyncRdb: false,
SyncAof: false,
PreferReplica: false,
TryDiskless: false,
},
},
want: map[string]interface{}{
"name": "",
"address": "",
"dir": "",
"status": "",
"rdb_file_size_bytes": 0,
"rdb_file_size_human": "",
"rdb_received_bytes": 0,
"rdb_received_human": "",
"rdb_sent_bytes": 0,
"rdb_sent_human": "",
"aof_received_offset": 0,
"aof_sent_offset": 0,
"aof_received_bytes": 0,
"aof_received_human": "",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &syncStandaloneReader{
ctx: tt.fields.ctx,
opts: tt.fields.opts,
}

want, err := json.Marshal(tt.want)
if err != nil {
log.Warnf("marshal status failed, err=[%v]", err)
return
}

got, err := json.Marshal(r.Status())
if err != nil {
log.Warnf("marshal status failed, err=[%v]", err)
return
}

require.JSONEq(t, string(want), string(got))
})
}
}

0 comments on commit 064fd9e

Please sign in to comment.