diff --git a/docs/src/en/reader/aof_reader.md b/docs/src/en/reader/aof_reader.md index 96d0b7e7..1faab38c 100644 --- a/docs/src/en/reader/aof_reader.md +++ b/docs/src/en/reader/aof_reader.md @@ -9,7 +9,7 @@ It is commonly used to recover data from backup files and also supports data fla ```toml [aof_reader] -aoffilepath="/tmp/appendonly.aof.manifest" "or single-aof: /tmp/appendonly.aof "" +aoffilepath="/tmp/appendonly.aof.manifest" "or single-aof: /tmp/appendonly.aof" aoftimestamp="0" ``` diff --git a/docs/src/zh/reader/aof_reader.md b/docs/src/zh/reader/aof_reader.md index 0853f410..e479556b 100644 --- a/docs/src/zh/reader/aof_reader.md +++ b/docs/src/zh/reader/aof_reader.md @@ -8,7 +8,7 @@ ```toml [aof_reader] -aoffilepath="/tmp/appendonly.aof.manifest" "或者单aof文件 "/tmp/appendonly.aof"" +aoffilepath="/tmp/appendonly.aof.manifest" "或者单aof文件 "/tmp/appendonly.aof" aoftimestamp="0" ``` diff --git a/internal/aof/aof.go b/internal/aof/aof.go index abc6e590..78625cde 100644 --- a/internal/aof/aof.go +++ b/internal/aof/aof.go @@ -105,7 +105,7 @@ func (ld *Loader) LoadSingleAppendOnlyFile(AOFTimeStamp int64) int { if ts > AOFTimeStamp { ret = AOFTruncated - log.Infof("AOFTruncated %s", line) + log.Infof("Reached recovery timestamp: %s, subsequent data will no longer be read.", line) return ret } } diff --git a/internal/reader/aof_reader.go b/internal/reader/aof_reader.go index e5299989..b4631034 100644 --- a/internal/reader/aof_reader.go +++ b/internal/reader/aof_reader.go @@ -101,4 +101,4 @@ func (r *aofReader) StartRead() chan *entry.Entry { }() return r.ch -} +} \ No newline at end of file diff --git a/internal/reader/parsing_aof.go b/internal/reader/parsing_aof.go index a3b2f2de..e6ca00fd 100644 --- a/internal/reader/parsing_aof.go +++ b/internal/reader/parsing_aof.go @@ -153,7 +153,7 @@ func HexDigitToInt(c byte) int { } func SplitArgs(line string) ([]string, int) { - var p string = line + var p = line var Current string var vector []string argc := 0 @@ -175,7 +175,7 @@ func SplitArgs(line string) ([]string, int) { _, err1 := strconv.ParseInt(string(p[i+2]), 16, 64) _, err2 := strconv.ParseInt(string(p[i+3]), 16, 64) if err1 == nil && err2 == nil { - hexadecimal := (HexDigitToInt((p[i+2])) * 16) + HexDigitToInt(p[i+3]) + hexadecimal := (HexDigitToInt(p[i+2]) * 16) + HexDigitToInt(p[i+3]) Current = Current + fmt.Sprint(hexadecimal) i += 3 } @@ -301,17 +301,6 @@ func AOFInfoCreate() *AOFInfo { return new(AOFInfo) } -func AOFInfoDup(orig *AOFInfo) *AOFInfo { - if orig == nil { - log.Panicf("Assertion failed: orig != nil") - } - ai := AOFInfoCreate() - ai.FileName = orig.FileName - ai.FileSeq = orig.FileSeq - ai.AOFFileType = orig.AOFFileType - return ai -} - func AOFInfoFormat(buf string, ai *AOFInfo) string { var aofManifestTostring string if StringNeedsRepr(ai.FileName) == 1 { @@ -554,6 +543,17 @@ func GetBaseAndIncrAppendOnlyFilesNum(am *AOFManifest) int { return num } +func GetHistoryAndIncrAppendOnlyFilesNum(am *AOFManifest) int { + num := 0 + if am.HistoryList != nil { + num += am.HistoryList.Len() + } + if am.incrAOFList != nil { + num += am.incrAOFList.Len() + } + return num +} + func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int { if am == nil { log.Panicf("AOFManifest is null") @@ -566,15 +566,6 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int var AOFName string var totalNum, AOFNum int - if aofInfo.AOFFileExist(aofInfo.AOFFileName) == 1 { - if DirExists(aofInfo.AOFDirName) == 0 || - (am.BaseAOFInfo == nil && am.incrAOFList.Len() == 0) || - (am.BaseAOFInfo != nil && am.incrAOFList.Len() == 0 && - strings.Compare(am.BaseAOFInfo.FileName, aofInfo.AOFFileName) == 0 && aofInfo.AOFFileExist(aofInfo.AOFFileName) == 0) { - log.Panicf("This is an old version of the AOF File") - } - } - if am.BaseAOFInfo == nil && am.incrAOFList == nil { return AOFNotExist } @@ -622,6 +613,39 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int } } totalNum-- + } else { + totalNum = GetHistoryAndIncrAppendOnlyFilesNum(am) + log.Infof("The BaseAOF file does not exist. Start loading the HistoryAOF and IncrAOF files.") + if am.HistoryList.Len() > 0 { + for ln := am.HistoryList.Front(); ln != nil; ln = ln.Next() { + ai := ln.Value.(*AOFInfo) + if ai.AOFFileType != AOFManifestTypeHist { + log.Panicf("The manifestType must be Hist") + } + AOFName = ai.FileName + aofInfo.UpdateLoadingFileName(AOFName) + AOFNum++ + start = Ustime() + ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, AOFTimeStamp) + if ret == AOFOk || (ret == AOFTruncated) { + log.Infof("DB loaded from History File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000) + return ret + } + if ret == AOFEmpty { + ret = AOFOk + } + if ret == AOFOpenErr || ret == AOFFailed { + if ret == AOFOpenErr { + log.Panicf("There was an error opening the AOF File.") + } else { + log.Infof("Failed to open AOF File.") + } + return ret + } + totalNum-- + } + } + } if am.incrAOFList.Len() > 0 { @@ -637,6 +661,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, AOFTimeStamp) if ret == AOFOk || (ret == AOFTruncated) { log.Infof("DB loaded from incr File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000) + return ret } if ret == AOFEmpty { ret = AOFOk diff --git a/shake.toml b/shake.toml index c57fb7f1..e670dc82 100644 --- a/shake.toml +++ b/shake.toml @@ -23,7 +23,7 @@ sync_aof = true # set to false if you don't want to sync aof # [aof_reader] # filepath = "/tmp/.aof" -# timestamp = 0 # Unix timestamp (millisecond level) +# timestamp = 0 # subsecond [redis_writer] cluster = false # set to true if target is a redis cluster diff --git a/tests/cases/aof.py b/tests/cases/aof.py index 91b50612..b601a4f2 100644 --- a/tests/cases/aof.py +++ b/tests/cases/aof.py @@ -1,7 +1,6 @@ import pybbt as p import helpers as h - import os def get_aof_file_relative_path(): @@ -22,7 +21,6 @@ def test(src, dst): opts = h.ShakeOpts.create_aof_opts(f"{src.dir}{get_aof_file_relative_path()}", dst) h.Shake.run_once(opts) # check data - inserter.check_data(src, cross_slots_cmd=cross_slots_cmd) inserter.check_data(dst, cross_slots_cmd=cross_slots_cmd) p.ASSERT_EQ(src.dbsize(), dst.dbsize()) @@ -80,6 +78,20 @@ def test_timestamp(dst): p.ASSERT_EQ(ret, [b"string", b"0", b"-1", b"123456789"]) p.ASSERT_EQ(dst.dbsize(), 4) +def test_history_file(src, dst): + + cross_slots_cmd = not (src.is_cluster() or dst.is_cluster()) + inserter = h.DataInserter() + for i in range(1000): + inserter.add_data(src, cross_slots_cmd=cross_slots_cmd) + p.ASSERT_TRUE(src.do("BGREWRITEAOF")) + + opts = h.ShakeOpts.create_aof_opts(f"{src.dir}{get_aof_file_relative_path()}", dst) + h.Shake.run_once(opts) + # check data + inserter.check_data(dst, cross_slots_cmd=cross_slots_cmd) + p.ASSERT_EQ(src.dbsize(), dst.dbsize()) + @p.subcase() def aof_to_standalone(): if h.REDIS_SERVER_VERSION < 7.0: @@ -146,7 +158,25 @@ def aof_to_standalone_timestamp(): return dst = h.Redis() test_timestamp(dst) - + +def aof_to_standalone_hsitory_file(): + if h.REDIS_SERVER_VERSION < 7.0: + return + src = h.Redis() + #set aof + #set hist + ret = src.do("CONFIG SET", "aof-disable-auto-gc", "yes") + p.log(f"aof_ret: {ret}") + + ret = src.do("CONFIG SET", "appendonly", "yes") + p.log(f"aof_ret: {ret}") + + ret = src.do("CONFIG SET", "aof-timestamp-enabled", "yes") + p.log(f"aof_ret: {ret}") + + dst = h.Redis() + test_history_file(src, dst) + @p.case(tags=["sync"]) def main(): aof_to_standalone() @@ -155,5 +185,6 @@ def main(): aof_to_standalone_rm_file() aof_to_cluster() aof_to_standalone_timestamp() + aof_to_standalone_hsitory_file() if __name__ == '__main__': main()