update filestore to use ints for size
esimkowitz committed Jan 22, 2025
1 parent 0fb579c commit d37e7de
Showing 4 changed files with 47 additions and 37 deletions.
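
The recurring pattern in this commit is narrowing int64 byte sizes to int while guarding each conversion against overflow with min(..., math.MaxInt). Below is a minimal sketch of that saturating conversion, assuming Go 1.21+ for the built-in min; clampToInt is a hypothetical helper name, not part of the commit:

package main

import (
	"fmt"
	"math"
)

// clampToInt narrows an int64 to int, saturating at math.MaxInt instead of
// overflowing (hypothetical helper; mirrors the min guards in this diff).
func clampToInt(v int64) int {
	return int(min(v, math.MaxInt))
}

func main() {
	fmt.Println(clampToInt(42))            // 42
	fmt.Println(clampToInt(math.MaxInt64)) // saturates to math.MaxInt
}

On 64-bit platforms int and int64 have the same width, so the clamp is a no-op there; it keeps the conversion well-defined on 32-bit builds.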
35 changes: 19 additions & 16 deletions pkg/filestore/blockstore.go
@@ -12,6 +12,7 @@ import (
 	"fmt"
 	"io/fs"
 	"log"
+	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -42,7 +43,7 @@ const NoPartIdx = -1
 var warningCount = &atomic.Int32{}
 var flushErrorCount = &atomic.Int32{}
 
-var partDataSize int64 = DefaultPartDataSize // overridden in tests
+var partDataSize int = DefaultPartDataSize // overridden in tests
 var stopFlush = &atomic.Bool{}
 
 var WFS *FileStore = &FileStore{
@@ -67,7 +68,7 @@ type WaveFile struct {
 // for circular files this is min(Size, MaxSize)
 func (f WaveFile) DataLength() int64 {
 	if f.Opts.Circular {
-		return minInt64(f.Size, f.Opts.MaxSize)
+		return min(f.Size, f.Opts.MaxSize)
 	}
 	return f.Size
 }
@@ -122,8 +123,9 @@ func (s *FileStore) MakeFile(ctx context.Context, zoneId string, name string, me
 		return fmt.Errorf("circular file cannot be ijson")
 	}
 	if opts.Circular {
-		if opts.MaxSize%partDataSize != 0 {
-			opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize
+		partDataSizeInt64 := int64(partDataSize)
+		if opts.MaxSize%partDataSizeInt64 != 0 {
+			opts.MaxSize = (opts.MaxSize/partDataSizeInt64 + 1) * partDataSizeInt64
 		}
 	}
 	if opts.IJsonBudget > 0 && !opts.IJson {
@@ -249,7 +251,7 @@ func (s *FileStore) WriteAt(ctx context.Context, zoneId string, name string, off
 		if offset > file.Size {
 			return fmt.Errorf("offset is past the end of the file")
 		}
-		partMap := file.computePartMap(offset, int64(len(data)))
+		partMap := file.computePartMap(offset, len(data))
 		incompleteParts := incompletePartsFromMap(partMap)
 		err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
 		if err != nil {
@@ -266,7 +268,7 @@ func (s *FileStore) AppendData(ctx context.Context, zoneId string, name string,
 		if err != nil {
 			return err
 		}
-		partMap := entry.File.computePartMap(entry.File.Size, int64(len(data)))
+		partMap := entry.File.computePartMap(entry.File.Size, len(data))
 		incompleteParts := incompletePartsFromMap(partMap)
 		if len(incompleteParts) > 0 {
 			err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
@@ -332,7 +334,7 @@ func (s *FileStore) AppendIJson(ctx context.Context, zoneId string, name string,
 		if !entry.File.Opts.IJson {
 			return fmt.Errorf("file %s:%s is not an ijson file", zoneId, name)
 		}
-		partMap := entry.File.computePartMap(entry.File.Size, int64(len(data)))
+		partMap := entry.File.computePartMap(entry.File.Size, len(data))
 		incompleteParts := incompletePartsFromMap(partMap)
 		if len(incompleteParts) > 0 {
 			err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
@@ -366,7 +368,7 @@ func (s *FileStore) GetAllZoneIds(ctx context.Context) ([]string, error) {
 
 // returns (offset, data, error)
 // we return the offset because the offset may have been adjusted if the size was too big (for circular files)
-func (s *FileStore) ReadAt(ctx context.Context, zoneId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) {
+func (s *FileStore) ReadAt(ctx context.Context, zoneId string, name string, offset int64, size int) (rtnOffset int64, rtnData []byte, rtnErr error) {
 	withLock(s, zoneId, name, func(entry *CacheEntry) error {
 		rtnOffset, rtnData, rtnErr = entry.readAt(ctx, offset, size, false)
 		return nil
@@ -422,9 +424,9 @@ func (s *FileStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr er
 ///////////////////////////////////
 
 func (f *WaveFile) partIdxAtOffset(offset int64) int {
-	partIdx := int(offset / partDataSize)
+	partIdx := int(min(offset/int64(partDataSize), math.MaxInt))
 	if f.Opts.Circular {
-		maxPart := int(f.Opts.MaxSize / partDataSize)
+		maxPart := int(min(f.Opts.MaxSize/int64(partDataSize), math.MaxInt))
 		partIdx = partIdx % maxPart
 	}
 	return partIdx
@@ -449,16 +451,17 @@ func getPartIdxsFromMap(partMap map[int]int) []int {
 }
 
 // returns a map of partIdx to amount of data to write to that part
-func (file *WaveFile) computePartMap(startOffset int64, size int64) map[int]int {
+func (file *WaveFile) computePartMap(startOffset int64, size int) map[int]int {
 	partMap := make(map[int]int)
-	endOffset := startOffset + size
-	startFileOffset := startOffset - (startOffset % partDataSize)
-	for testOffset := startFileOffset; testOffset < endOffset; testOffset += partDataSize {
+	endOffset := startOffset + int64(size)
+	partDataSizeInt64 := int64(partDataSize)
+	startFileOffset := startOffset - (startOffset % partDataSizeInt64)
+	for testOffset := startFileOffset; testOffset < endOffset; testOffset += partDataSizeInt64 {
 		partIdx := file.partIdxAtOffset(testOffset)
 		partStartOffset := testOffset
-		partEndOffset := testOffset + partDataSize
+		partEndOffset := testOffset + partDataSizeInt64
 		partWriteStartOffset := 0
-		partWriteEndOffset := int(partDataSize)
+		partWriteEndOffset := partDataSize
 		if startOffset > partStartOffset && startOffset < partEndOffset {
 			partWriteStartOffset = int(startOffset - partStartOffset)
 		}
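For context on the computePartMap change above, here is a standalone sketch of the same part-map idea under an assumed tiny part size; computeParts and partSize are illustrative names, not from the repo:

package main

import "fmt"

const partSize int64 = 8 // bytes per part (the real code uses partDataSize)

// computeParts returns, for each part index a write touches, how many bytes
// of [startOffset, startOffset+size) land in that part.
func computeParts(startOffset int64, size int) map[int]int {
	parts := make(map[int]int)
	endOffset := startOffset + int64(size)
	// walk the part-aligned offsets that overlap the write range
	for off := startOffset - startOffset%partSize; off < endOffset; off += partSize {
		writeStart, writeEnd := int64(0), partSize
		if startOffset > off {
			writeStart = startOffset - off
		}
		if endOffset < off+partSize {
			writeEnd = endOffset - off
		}
		parts[int(off/partSize)] = int(writeEnd - writeStart)
	}
	return parts
}

func main() {
	// a 10-byte write at offset 5 spans part 0 (3 bytes) and part 1 (7 bytes)
	fmt.Println(computeParts(5, 10)) // map[0:3 1:7]
}

Unlike the real computePartMap, this sketch ignores circular wrapping (partIdxAtOffset); it only shows how a write is split across fixed-size parts.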
45 changes: 26 additions & 19 deletions pkg/filestore/blockstore_cache.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"fmt"
 	"io/fs"
+	"math"
 	"sync"
 	"time"
 )
@@ -144,7 +145,7 @@ func withLockRtn[T any](s *FileStore, zoneId string, name string, fn func(*Cache
 }
 
 func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCacheEntry) {
-	leftInPart := partDataSize - offset
+	leftInPart := int64(partDataSize) - offset
 	toWrite := int64(len(data))
 	if toWrite > leftInPart {
 		toWrite = leftInPart
@@ -161,7 +162,7 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
 		entry.File.Size = 0
 	}
 	if entry.File.Opts.Circular {
-		startCirFileOffset := entry.File.Size - entry.File.Opts.MaxSize
+		startCirFileOffset := entry.File.Size - int64(entry.File.Opts.MaxSize)
 		if offset+int64(len(data)) <= startCirFileOffset {
 			// write is before the start of the circular file
 			return
@@ -172,26 +173,32 @@
 			data = data[truncateAmt:]
 			offset += truncateAmt
 		}
-		if int64(len(data)) > entry.File.Opts.MaxSize {
+		dataLen := int64(len(data))
+		if dataLen > entry.File.Opts.MaxSize {
 			// truncate data (from the front), update offset
-			truncateAmt := int64(len(data)) - entry.File.Opts.MaxSize
+			truncateAmt := int(max(dataLen-entry.File.Opts.MaxSize, 0))
 			data = data[truncateAmt:]
-			offset += truncateAmt
+			offset += int64(truncateAmt)
 		}
 	}
 	endWriteOffset := offset + int64(len(data))
 	if replace {
 		entry.DataEntries = make(map[int]*DataCacheEntry)
 	}
 	for len(data) > 0 {
-		partIdx := int(offset / partDataSize)
+		partIdxI64 := offset / int64(partDataSize)
+		if partIdxI64 > math.MaxInt {
+			// too big
+			return
+		}
+		partIdx := int(partIdxI64)
 		if entry.File.Opts.Circular {
-			maxPart := int(entry.File.Opts.MaxSize / partDataSize)
+			maxPart := int(min(entry.File.Opts.MaxSize/int64(partDataSize), math.MaxInt))
 			partIdx = partIdx % maxPart
 		}
-		partOffset := offset % partDataSize
+		partOffset := int(offset % int64(partDataSize))
 		partData := entry.getOrCreateDataCacheEntry(partIdx)
-		nw, newDce := partData.writeToPart(partOffset, data)
+		nw, newDce := partData.writeToPart(int64(partOffset), data)
 		entry.DataEntries[partIdx] = newDce
 		data = data[nw:]
 		offset += nw
@@ -203,7 +210,7 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
 }
 
 // returns (realOffset, data, error)
-func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, readFull bool) (int64, []byte, error) {
+func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int, readFull bool) (int64, []byte, error) {
 	if offset < 0 {
 		return 0, nil, fmt.Errorf("offset cannot be negative")
 	}
@@ -212,20 +219,20 @@ func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, r
 		return 0, nil, err
 	}
 	if readFull {
-		size = file.Size - offset
+		size = int(min(file.Size-offset, math.MaxInt))
 	}
-	if offset+size > file.Size {
-		size = file.Size - offset
+	if offset+int64(size) > file.Size {
+		size = int(min(file.Size-offset, math.MaxInt))
 	}
 	if file.Opts.Circular {
 		realDataOffset := int64(0)
-		if file.Size > file.Opts.MaxSize {
-			realDataOffset = file.Size - file.Opts.MaxSize
+		if file.Size > int64(file.Opts.MaxSize) {
+			realDataOffset = file.Size - int64(file.Opts.MaxSize)
 		}
 		if offset < realDataOffset {
 			truncateAmt := realDataOffset - offset
 			offset += truncateAmt
-			size -= truncateAmt
+			size = int(max(int64(size)-truncateAmt, 0))
 		}
 		if size <= 0 {
 			return realDataOffset, nil, nil
@@ -250,11 +257,11 @@ func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, r
 		} else {
 			partData = partDataEntry.Data[0:partDataSize]
 		}
-		partOffset := curReadOffset % partDataSize
-		amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead)
+		partOffset := int(curReadOffset % int64(partDataSize))
+		amtToRead := min(partDataSize-partOffset, amtLeftToRead)
 		rtnData = append(rtnData, partData[partOffset:partOffset+amtToRead]...)
 		amtLeftToRead -= amtToRead
-		curReadOffset += amtToRead
+		curReadOffset += int64(amtToRead)
 	}
 	return offset, rtnData, nil
 }
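The circular-file arithmetic in writeAt and readAt above boils down to wrapping part indices modulo the number of parts that fit in MaxSize, while absolute offsets keep growing. A hedged standalone sketch with illustrative names and sizes:

package main

import "fmt"

const partSize int64 = 8

// circularPartIdx maps an ever-growing file offset onto a bounded set of
// parts, the way the cache reuses parts once a circular file exceeds MaxSize.
func circularPartIdx(offset, maxSize int64) int {
	partIdx := int(offset / partSize)
	maxPart := int(maxSize / partSize) // parts available before wrapping
	return partIdx % maxPart
}

func main() {
	// with maxSize = 32 (4 parts), offset 40 is part 5, which wraps to part 1
	fmt.Println(circularPartIdx(40, 32)) // 1
}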
2 changes: 1 addition & 1 deletion pkg/filestore/blockstore_test.go
@@ -306,7 +306,7 @@ func checkFileByteCount(t *testing.T, ctx context.Context, zoneId string, name s
 }
 
 func checkFileDataAt(t *testing.T, ctx context.Context, zoneId string, name string, offset int64, data string) {
-	_, rdata, err := WFS.ReadAt(ctx, zoneId, name, offset, int64(len(data)))
+	_, rdata, err := WFS.ReadAt(ctx, zoneId, name, offset, len(data))
 	if err != nil {
 		t.Errorf("error reading data for file %q: %v", name, err)
 		return
2 changes: 1 addition & 1 deletion pkg/remote/fileshare/wavefs/wavefs.go
@@ -69,7 +69,7 @@ func (c WaveClient) Read(ctx context.Context, conn *connparse.Connection, data w
 		return nil, fmt.Errorf("error cleaning path: %w", err)
 	}
 	if data.At != nil {
-		_, dataBuf, err := filestore.WFS.ReadAt(ctx, zoneId, fileName, data.At.Offset, int64(data.At.Size))
+		_, dataBuf, err := filestore.WFS.ReadAt(ctx, zoneId, fileName, data.At.Offset, data.At.Size)
 		if err == nil {
 			return &wshrpc.FileData{Info: data.Info, Data64: base64.StdEncoding.EncodeToString(dataBuf)}, nil
 		} else if errors.Is(err, fs.ErrNotExist) {
