From d2ca8b7e605b04df94e9371b7ff447b1bfc6b154 Mon Sep 17 00:00:00 2001 From: suxb201 Date: Tue, 14 Jan 2025 15:39:44 +0800 Subject: [PATCH] fix: fix "read aof file failed" error --- internal/reader/sync_standalone_reader.go | 11 +++++----- internal/utils/file_rotate/aof_reader.go | 25 +++++++++++++++++------ internal/writer/file_writer.go | 2 +- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index c8d16a9e..2b69f929 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "errors" "encoding/json" "fmt" "io" @@ -477,7 +478,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) { } func (r *syncStandaloneReader) sendAOF(offset int64) { - aofReader := rotate.NewAOFReader(r.stat.Name, r.stat.Dir, offset) + aofReader := rotate.NewAOFReader(r.ctx, r.stat.Name, r.stat.Dir, offset) defer aofReader.Close() protoReader := proto.NewReader(bufio.NewReader(aofReader)) for { @@ -488,12 +489,10 @@ func (r *syncStandaloneReader) sendAOF(offset int64) { iArgv, err := protoReader.ReadReply() if err != nil { - if err == io.EOF { - time.Sleep(10 * time.Millisecond) - continue - } else { - log.Panicf("[%s] read aof file failed. error=[%v]", r.stat.Name, err) + if errors.Is(err, context.Canceled) { + return } + log.Panicf("[%s] read aof file failed. error=[%v]", r.stat.Name, err) } argv := client.ArrayString(iArgv, nil) diff --git a/internal/utils/file_rotate/aof_reader.go b/internal/utils/file_rotate/aof_reader.go index 657d06a9..b4cf87a9 100644 --- a/internal/utils/file_rotate/aof_reader.go +++ b/internal/utils/file_rotate/aof_reader.go @@ -3,6 +3,7 @@ package rotate import ( "RedisShake/internal/log" "RedisShake/internal/utils" + "context" "fmt" "io" "os" @@ -10,6 +11,7 @@ import ( ) type AOFReader struct { + ctx context.Context name string dir string file *os.File @@ -18,8 +20,9 @@ type AOFReader struct { filepath string } -func NewAOFReader(name string, dir string, offset int64) *AOFReader { +func NewAOFReader(ctx context.Context, name string, dir string, offset int64) *AOFReader { r := new(AOFReader) + r.ctx = ctx r.name = name r.dir = dir @@ -69,17 +72,27 @@ func (r *AOFReader) readNextFile(offset int64) bool { func (r *AOFReader) Read(buf []byte) (n int, err error) { n, err = r.file.Read(buf) - if err == io.EOF { - if !r.readNextFile(r.offset) { - return n, io.EOF + for err == io.EOF { + // sleep or context + timer := time.NewTimer(1 * time.Millisecond) + select { + case <-r.ctx.Done(): + return n, r.ctx.Err() + case <-timer.C: } + r.readNextFile(r.offset) // try to read next file _, err = r.file.Seek(0, 1) if err != nil { log.Panicf(err.Error()) } n, err = r.file.Read(buf) - if err != nil { - return n, err + + if err == nil { + break + } else if err == io.EOF { + continue + } else { + log.Panicf("[%s] read file failed. filename=[%s], err=[%v]", r.name, r.filepath, err) } } if err != nil { diff --git a/internal/writer/file_writer.go b/internal/writer/file_writer.go index 7efbe706..ef4dacbc 100644 --- a/internal/writer/file_writer.go +++ b/internal/writer/file_writer.go @@ -90,7 +90,7 @@ func (w *fileWriter) processWrite(ctx context.Context) { defer ticker.Stop() file, err := os.Create(w.path) if err != nil { - log.Panicf("create file failed:", err) + log.Panicf("create file failed: %v", err) return } defer file.Close()