diff --git a/pkg/filestore/blockstore.go b/pkg/filestore/blockstore.go
index d67dea93a..1e0f8e83c 100644
--- a/pkg/filestore/blockstore.go
+++ b/pkg/filestore/blockstore.go
@@ -12,6 +12,7 @@ import (
 	"fmt"
 	"io/fs"
 	"log"
+	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -42,7 +43,7 @@ const NoPartIdx = -1
 
 var warningCount = &atomic.Int32{}
 var flushErrorCount = &atomic.Int32{}
-var partDataSize int64 = DefaultPartDataSize // overridden in tests
+var partDataSize int = DefaultPartDataSize // overridden in tests
 var stopFlush = &atomic.Bool{}
 
 var WFS *FileStore = &FileStore{
@@ -67,7 +68,7 @@ type WaveFile struct {
 // for circular files this is min(Size, MaxSize)
 func (f WaveFile) DataLength() int64 {
 	if f.Opts.Circular {
-		return minInt64(f.Size, f.Opts.MaxSize)
+		return min(f.Size, f.Opts.MaxSize)
 	}
 	return f.Size
 }
@@ -122,8 +123,9 @@ func (s *FileStore) MakeFile(ctx context.Context, zoneId string, name string, me
 		return fmt.Errorf("circular file cannot be ijson")
 	}
 	if opts.Circular {
-		if opts.MaxSize%partDataSize != 0 {
-			opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize
+		partDataSizeInt64 := int64(partDataSize)
+		if opts.MaxSize%partDataSizeInt64 != 0 {
+			opts.MaxSize = (opts.MaxSize/partDataSizeInt64 + 1) * partDataSizeInt64
 		}
 	}
 	if opts.IJsonBudget > 0 && !opts.IJson {
@@ -249,7 +251,7 @@ func (s *FileStore) WriteAt(ctx context.Context, zoneId string, name string, off
 	if offset > file.Size {
 		return fmt.Errorf("offset is past the end of the file")
 	}
-	partMap := file.computePartMap(offset, int64(len(data)))
+	partMap := file.computePartMap(offset, len(data))
 	incompleteParts := incompletePartsFromMap(partMap)
 	err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
 	if err != nil {
@@ -266,7 +268,7 @@ func (s *FileStore) AppendData(ctx context.Context, zoneId string, name string,
 	if err != nil {
 		return err
 	}
-	partMap := entry.File.computePartMap(entry.File.Size, int64(len(data)))
+	partMap := entry.File.computePartMap(entry.File.Size, len(data))
 	incompleteParts := incompletePartsFromMap(partMap)
 	if len(incompleteParts) > 0 {
 		err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
@@ -332,7 +334,7 @@ func (s *FileStore) AppendIJson(ctx context.Context, zoneId string, name string,
 	if !entry.File.Opts.IJson {
 		return fmt.Errorf("file %s:%s is not an ijson file", zoneId, name)
 	}
-	partMap := entry.File.computePartMap(entry.File.Size, int64(len(data)))
+	partMap := entry.File.computePartMap(entry.File.Size, len(data))
 	incompleteParts := incompletePartsFromMap(partMap)
 	if len(incompleteParts) > 0 {
 		err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
@@ -366,7 +368,7 @@ func (s *FileStore) GetAllZoneIds(ctx context.Context) ([]string, error) {
 
 // returns (offset, data, error)
 // we return the offset because the offset may have been adjusted if the size was too big (for circular files)
-func (s *FileStore) ReadAt(ctx context.Context, zoneId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) {
+func (s *FileStore) ReadAt(ctx context.Context, zoneId string, name string, offset int64, size int) (rtnOffset int64, rtnData []byte, rtnErr error) {
 	withLock(s, zoneId, name, func(entry *CacheEntry) error {
 		rtnOffset, rtnData, rtnErr = entry.readAt(ctx, offset, size, false)
 		return nil
@@ -422,9 +424,9 @@ func (s *FileStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr er
 ///////////////////////////////////
 func (f *WaveFile) partIdxAtOffset(offset int64) int {
-	partIdx := int(offset / partDataSize)
+	partIdx := int(min(offset/int64(partDataSize), math.MaxInt))
 	if f.Opts.Circular {
-		maxPart := int(f.Opts.MaxSize / partDataSize)
+		maxPart := int(min(f.Opts.MaxSize/int64(partDataSize), math.MaxInt))
 		partIdx = partIdx % maxPart
 	}
 	return partIdx
 }
@@ -449,16 +451,17 @@ func getPartIdxsFromMap(partMap map[int]int) []int {
 }
 
 // returns a map of partIdx to amount of data to write to that part
-func (file *WaveFile) computePartMap(startOffset int64, size int64) map[int]int {
+func (file *WaveFile) computePartMap(startOffset int64, size int) map[int]int {
 	partMap := make(map[int]int)
-	endOffset := startOffset + size
-	startFileOffset := startOffset - (startOffset % partDataSize)
-	for testOffset := startFileOffset; testOffset < endOffset; testOffset += partDataSize {
+	endOffset := startOffset + int64(size)
+	partDataSizeInt64 := int64(partDataSize)
+	startFileOffset := startOffset - (startOffset % partDataSizeInt64)
+	for testOffset := startFileOffset; testOffset < endOffset; testOffset += partDataSizeInt64 {
 		partIdx := file.partIdxAtOffset(testOffset)
 		partStartOffset := testOffset
-		partEndOffset := testOffset + partDataSize
+		partEndOffset := testOffset + partDataSizeInt64
 		partWriteStartOffset := 0
-		partWriteEndOffset := int(partDataSize)
+		partWriteEndOffset := partDataSize
 		if startOffset > partStartOffset && startOffset < partEndOffset {
 			partWriteStartOffset = int(startOffset - partStartOffset)
 		}
diff --git a/pkg/filestore/blockstore_cache.go b/pkg/filestore/blockstore_cache.go
index af8632022..dbb00628d 100644
--- a/pkg/filestore/blockstore_cache.go
+++ b/pkg/filestore/blockstore_cache.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"fmt"
 	"io/fs"
+	"math"
 	"sync"
 	"time"
 )
@@ -144,7 +145,7 @@ func withLockRtn[T any](s *FileStore, zoneId string, name string, fn func(*Cache
 }
 
 func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCacheEntry) {
-	leftInPart := partDataSize - offset
+	leftInPart := int64(partDataSize) - offset
 	toWrite := int64(len(data))
 	if toWrite > leftInPart {
 		toWrite = leftInPart
@@ -161,7 +162,7 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
 		entry.File.Size = 0
 	}
 	if entry.File.Opts.Circular {
-		startCirFileOffset := entry.File.Size - entry.File.Opts.MaxSize
+		startCirFileOffset := entry.File.Size - int64(entry.File.Opts.MaxSize)
 		if offset+int64(len(data)) <= startCirFileOffset {
 			// write is before the start of the circular file
 			return
@@ -172,11 +173,12 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
 			data = data[truncateAmt:]
 			offset += truncateAmt
 		}
-		if int64(len(data)) > entry.File.Opts.MaxSize {
+		dataLen := int64(len(data))
+		if dataLen > entry.File.Opts.MaxSize {
 			// truncate data (from the front), update offset
-			truncateAmt := int64(len(data)) - entry.File.Opts.MaxSize
+			truncateAmt := int(max(dataLen-entry.File.Opts.MaxSize, 0))
 			data = data[truncateAmt:]
-			offset += truncateAmt
+			offset += int64(truncateAmt)
 		}
 	}
 	endWriteOffset := offset + int64(len(data))
@@ -184,14 +186,19 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
 		entry.DataEntries = make(map[int]*DataCacheEntry)
 	}
 	for len(data) > 0 {
-		partIdx := int(offset / partDataSize)
+		partIdxI64 := offset / int64(partDataSize)
+		if partIdxI64 > math.MaxInt {
+			// too big
+			return
+		}
+		partIdx := int(partIdxI64)
 		if entry.File.Opts.Circular {
-			maxPart := int(entry.File.Opts.MaxSize / partDataSize)
+			maxPart := int(min(entry.File.Opts.MaxSize/int64(partDataSize), math.MaxInt))
 			partIdx = partIdx % maxPart
 		}
-		partOffset := offset % partDataSize
+		partOffset := int(offset % int64(partDataSize))
 		partData := entry.getOrCreateDataCacheEntry(partIdx)
-		nw, newDce := partData.writeToPart(partOffset, data)
+		nw, newDce := partData.writeToPart(int64(partOffset), data)
 		entry.DataEntries[partIdx] = newDce
 		data = data[nw:]
 		offset += nw
@@ -203,7 +210,7 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
 }
 
 // returns (realOffset, data, error)
-func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, readFull bool) (int64, []byte, error) {
+func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int, readFull bool) (int64, []byte, error) {
 	if offset < 0 {
 		return 0, nil, fmt.Errorf("offset cannot be negative")
 	}
@@ -212,20 +219,20 @@ func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, r
 		return 0, nil, err
 	}
 	if readFull {
-		size = file.Size - offset
+		size = int(min(file.Size-offset, math.MaxInt))
 	}
-	if offset+size > file.Size {
-		size = file.Size - offset
+	if offset+int64(size) > file.Size {
+		size = int(min(file.Size-offset, math.MaxInt))
 	}
 	if file.Opts.Circular {
 		realDataOffset := int64(0)
-		if file.Size > file.Opts.MaxSize {
-			realDataOffset = file.Size - file.Opts.MaxSize
+		if file.Size > int64(file.Opts.MaxSize) {
+			realDataOffset = file.Size - int64(file.Opts.MaxSize)
 		}
 		if offset < realDataOffset {
 			truncateAmt := realDataOffset - offset
 			offset += truncateAmt
-			size -= truncateAmt
+			size = int(max(int64(size)-truncateAmt, 0))
 		}
 		if size <= 0 {
 			return realDataOffset, nil, nil
@@ -250,11 +257,11 @@ func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, r
 		} else {
 			partData = partDataEntry.Data[0:partDataSize]
 		}
-		partOffset := curReadOffset % partDataSize
-		amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead)
+		partOffset := int(curReadOffset % int64(partDataSize))
+		amtToRead := min(partDataSize-partOffset, amtLeftToRead)
 		rtnData = append(rtnData, partData[partOffset:partOffset+amtToRead]...)
 		amtLeftToRead -= amtToRead
-		curReadOffset += amtToRead
+		curReadOffset += int64(amtToRead)
 	}
 	return offset, rtnData, nil
 }
diff --git a/pkg/filestore/blockstore_test.go b/pkg/filestore/blockstore_test.go
index 42fc7a343..2cda20647 100644
--- a/pkg/filestore/blockstore_test.go
+++ b/pkg/filestore/blockstore_test.go
@@ -306,7 +306,7 @@ func checkFileByteCount(t *testing.T, ctx context.Context, zoneId string, name s
 }
 
 func checkFileDataAt(t *testing.T, ctx context.Context, zoneId string, name string, offset int64, data string) {
-	_, rdata, err := WFS.ReadAt(ctx, zoneId, name, offset, int64(len(data)))
+	_, rdata, err := WFS.ReadAt(ctx, zoneId, name, offset, len(data))
 	if err != nil {
 		t.Errorf("error reading data for file %q: %v", name, err)
 		return
diff --git a/pkg/remote/fileshare/wavefs/wavefs.go b/pkg/remote/fileshare/wavefs/wavefs.go
index 1aaca37a1..5382d7bbf 100644
--- a/pkg/remote/fileshare/wavefs/wavefs.go
+++ b/pkg/remote/fileshare/wavefs/wavefs.go
@@ -69,7 +69,7 @@ func (c WaveClient) Read(ctx context.Context, conn *connparse.Connection, data w
 		return nil, fmt.Errorf("error cleaning path: %w", err)
 	}
 	if data.At != nil {
-		_, dataBuf, err := filestore.WFS.ReadAt(ctx, zoneId, fileName, data.At.Offset, int64(data.At.Size))
+		_, dataBuf, err := filestore.WFS.ReadAt(ctx, zoneId, fileName, data.At.Offset, data.At.Size)
 		if err == nil {
 			return &wshrpc.FileData{Info: data.Info, Data64: base64.StdEncoding.EncodeToString(dataBuf)}, nil
 		} else if errors.Is(err, fs.ErrNotExist) {
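
Note (not part of the patch): a minimal standalone Go sketch of the splitting rule that computePartMap applies now that size is an int. The 64-byte part size, the helper name partWriteAmounts, and the sample write are hypothetical, chosen only so the per-part byte counts are easy to check by hand; the sketch also ignores the circular wrap-around that partIdxAtOffset handles in the real code.

package main

import "fmt"

// Illustrative only: mirrors the part-splitting rule from the patched
// computePartMap with a hypothetical 64-byte part size.
const partSize = 64

// partWriteAmounts reports, for each part index touched by a write of
// size bytes starting at startOffset, how many bytes land in that part.
func partWriteAmounts(startOffset int64, size int) map[int]int {
	amounts := make(map[int]int)
	endOffset := startOffset + int64(size)
	partSize64 := int64(partSize)
	startFileOffset := startOffset - (startOffset % partSize64)
	for testOffset := startFileOffset; testOffset < endOffset; testOffset += partSize64 {
		partIdx := int(testOffset / partSize64) // no circular wrap in this sketch
		partStart, partEnd := testOffset, testOffset+partSize64
		writeStart, writeEnd := 0, partSize
		if startOffset > partStart && startOffset < partEnd {
			writeStart = int(startOffset - partStart)
		}
		if endOffset > partStart && endOffset < partEnd {
			writeEnd = int(endOffset - partStart)
		}
		amounts[partIdx] = writeEnd - writeStart
	}
	return amounts
}

func main() {
	// A 200-byte write at offset 100 touches parts 1 through 4:
	// 28 bytes into part 1 (offsets 100-127), 64 bytes each into
	// parts 2 and 3, and 44 bytes into part 4 (offsets 256-299).
	fmt.Println(partWriteAmounts(100, 200)) // map[1:28 2:64 3:64 4:44]
}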