Skip to content

Commit

Permalink
fix: fix "read aof file failed" error
Browse files Browse the repository at this point in the history
  • Loading branch information
suxb201 committed Jan 14, 2025
1 parent 0046b84 commit d2ca8b7
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 13 deletions.
11 changes: 5 additions & 6 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -477,7 +478,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) {
}

func (r *syncStandaloneReader) sendAOF(offset int64) {
aofReader := rotate.NewAOFReader(r.stat.Name, r.stat.Dir, offset)
aofReader := rotate.NewAOFReader(r.ctx, r.stat.Name, r.stat.Dir, offset)
defer aofReader.Close()
protoReader := proto.NewReader(bufio.NewReader(aofReader))
for {
Expand All @@ -488,12 +489,10 @@ func (r *syncStandaloneReader) sendAOF(offset int64) {

iArgv, err := protoReader.ReadReply()
if err != nil {
if err == io.EOF {
time.Sleep(10 * time.Millisecond)
continue
} else {
log.Panicf("[%s] read aof file failed. error=[%v]", r.stat.Name, err)
if errors.Is(err, context.Canceled) {
return
}
log.Panicf("[%s] read aof file failed. error=[%v]", r.stat.Name, err)
}

argv := client.ArrayString(iArgv, nil)
Expand Down
25 changes: 19 additions & 6 deletions internal/utils/file_rotate/aof_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package rotate
import (
"RedisShake/internal/log"
"RedisShake/internal/utils"
"context"
"fmt"
"io"
"os"
"time"
)

type AOFReader struct {
ctx context.Context
name string
dir string
file *os.File
Expand All @@ -18,8 +20,9 @@ type AOFReader struct {
filepath string
}

func NewAOFReader(name string, dir string, offset int64) *AOFReader {
func NewAOFReader(ctx context.Context, name string, dir string, offset int64) *AOFReader {
r := new(AOFReader)
r.ctx = ctx
r.name = name
r.dir = dir

Expand Down Expand Up @@ -69,17 +72,27 @@ func (r *AOFReader) readNextFile(offset int64) bool {

func (r *AOFReader) Read(buf []byte) (n int, err error) {
n, err = r.file.Read(buf)
if err == io.EOF {
if !r.readNextFile(r.offset) {
return n, io.EOF
for err == io.EOF {
// sleep or context
timer := time.NewTimer(1 * time.Millisecond)
select {
case <-r.ctx.Done():
return n, r.ctx.Err()
case <-timer.C:
}
r.readNextFile(r.offset) // try to read next file
_, err = r.file.Seek(0, 1)
if err != nil {
log.Panicf(err.Error())
}
n, err = r.file.Read(buf)
if err != nil {
return n, err

if err == nil {
break
} else if err == io.EOF {
continue
} else {
log.Panicf("[%s] read file failed. filename=[%s], err=[%v]", r.name, r.filepath, err)
}
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/writer/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (w *fileWriter) processWrite(ctx context.Context) {
defer ticker.Stop()
file, err := os.Create(w.path)
if err != nil {
log.Panicf("create file failed:", err)
log.Panicf("create file failed: %v", err)
return
}
defer file.Close()
Expand Down

0 comments on commit d2ca8b7

Please sign in to comment.