Skip to content

Commit

Permalink
Delete faster file cache (#219)
Browse files Browse the repository at this point in the history
* Added check on Atime on item to condition

* WIP nested if to check Atime

* Revert "WIP nested if to check Atime"

This reverts commit 6391dd5.

* Revert "Added check on Atime on item to condition"

This reverts commit ee3d7ad.

* added debug lines in Read() and Write()

* WIP Moved file_cache : Openfile call to Read() and Write()

* WIP Removed open() call in Write(). Troubleshooting file interface lag and write issues.

* Revert "WIP Removed open() call in Write(). Troubleshooting file interface lag and write issues."

This reverts commit bd0b50b.

* Revert "WIP Moved file_cache : Openfile call to Read() and Write()"

This reverts commit 8483de9.

* modified ReadInBuffer() to recieve handleID and load handleID

* Revert "modified ReadInBuffer() to recieve handleID and load handleID"

This reverts commit dc3640d.

* set up download() and getHandleData()

* added call to getHandleData() from ReadInBuffer()

* added call to getHandleData() in WriteFile()

* removed redundant fileObj nil check

* added additional handlemap.Handle var to replace options.Handle for ReadInBuffer()

* added additional handlemap.Handle var to replace options.Handle for WriteFile()

* updated println() into log.Err and log.Debug

* -changed test to call DownloadFile()
-exported download() to DownlaodFile() for tests
-added isDownloadRequired logic in TruncateFile()

* added DownloadFileOptions struct and adjusted function parameters and calls respectively

* chanced condition to be more specific for new handle produced by OpenFile()

* get handle out of download in TruncateFile() to fix test

* adjusted TestSyncFile(), WriteFile(), and ReadInBuffer() to hold flag sync state

* Added handle storing and retrieval in TestReadFileWithRefresh()

* added sync.mutex thread safe logic into getHandleData()

* added download logic into ReadFile()

* removed download portion of ReadFile()

* -removed extra handle being used to swap into handlemap
-added handle option in DownloadFileOptions
-fixed all references to DownloadFile() to include handle in options.

* used sync.RWMutex from handle to lock and unlock handle instead of file cache.

* -moved getHandleData() logic into DownloadFile()
-removed DownloadFile() flag and mode parameters

* -moved download logic into a downloadRequired conditional
-removed syscall.EACCES error check

* -used OpenFile() instead of creating a new handle direclty
-changed conditional to check if handle has value "flag" signifying that it is new and empty.

* added error handle to OpenFile() call in TruncateFile()

* fixed parameters calling DownloadFile()

* -set default values for flag and mode for DownloadFile()
-removed handlemap loading from test

* added another 12 second wait for file to expire in TestReadFileWithRefresh()

* put os.O_RDWR back into OpenFile() call in TruncateFile() to allow FlushFile()'s f.Sync() call to succeed

* moved fileExists assigned from isDownloadRequired() in DownlaodFile()

* added error handle in WriteFile()

* spell correction on error string

* Added isDownloadNeeded value in handle and adjusted download conditionals respectively

* removed redundant code from OpenFile().

* condensed values in handle into a single value. changed conditions checking for this one value.

* fix typo in DownlaodFile() log

* renamed rename flagModeStruct to openFileOptions in DownloadFile()

* renamed flag to flags in DownloadFile()

* removed DownloadFileOptions struct. refactored all function calls

* adjusted function call parameters in tests for DownloadFile()

* commented out debug log for Read() and Write()

* replaced value adjustment of handle with removing single fileFlagMode value at the end of DownloadFile()

* renamed flag to flags for value struct set in OpenFile()

* renamed flag to flags in type assertion in DownloadFile()

* moved openFileOptions to it's own defined struct and refactored code respecively.

* unexported DownloadFile() and is now downloadFile()

* WIP add downloadFile() call after openFile In tests to resolve pipeline

* removed var redefine for handle in TruncateFile()

* removed handle redefine assignment for downloadFile() call in ReadInBuffer() and WriteFile()

* add downlaodFile() after OpenFile() in TestRenameFileInCache()

* added error check to downloadFile() call in TestRenameFileAndCacheCleanupWithNoTimeout()

* Added downloadFile() call to test with error handle.

* cleaned up empty lines

* removed handlemap.Handle return from downloadFile

* adjusted type assertion to maintain consistent style

* changed "fileFlagMode" key to "openFileOptions" for value in handle

* deleted fileCacheStatsCollector from OpenFile

* corrected comment in TruncateFile

* corrected return type var assignment in TestChownInCache()

* added value check for handle in CloseFile()

* fixed var assignemnts when calling downloadFile

* adjsuted var assignements when calling downloadFile

* changed commment and removed akward wording

* moved handle lock into downloadFile

* move handle value check further up CloseFile

* remove var err error declarations from ReadInBuffer and WriteFile

* changed "FileCache::download" in erro stiring to "FileCache::downloadFile"

* removed "for' from error message in ReadInBuffer

* adjusted Error log to reflect function called

* put the TestFileCacheTestSuite back in file_cache_test.go

* put ReadFile() call back in place for TestReadFile()

* adjusted assertions for handle not being nil in TestHardLimitOnSize

* removed comments in test code

* Moved hard limit check from downloadFile to OpenFile

* Added ReadFile calls and respecitve asserts in tests.

* removed extra ReadFile and download calls in tests and added download to ReadFile function

* added error check for ReadFile
  • Loading branch information
Dabnsky authored Jun 12, 2024
1 parent 47ae7d8 commit 3fcfe20
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 85 deletions.
194 changes: 129 additions & 65 deletions component/file_cache/file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ type FileCacheOptions struct {
HardLimit bool `config:"hard-limit" yaml:"hard-limit,omitempty"`
}

type openFileOptions struct {
flags int
fMode fs.FileMode
}

const (
compName = "file_cache"
defaultMaxEviction = 5000
Expand Down Expand Up @@ -691,136 +696,130 @@ func (fc *FileCache) DeleteFile(options internal.DeleteFileOptions) error {
return nil
}

// OpenFile: Makes the file available in the local cache for further file operations.
func (fc *FileCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Handle, error) {
log.Trace("FileCache::OpenFile : name=%s, flags=%d, mode=%s", options.Name, options.Flags, options.Mode)
func (fc *FileCache) downloadFile(handle *handlemap.Handle) error {
log.Trace("FileCache::downloadFile : name=%s", handle.Path)

localPath := common.JoinUnixFilepath(fc.tmpPath, options.Name)
handle.Lock()
defer handle.Unlock()

//extract flags and mode out of the value from handle
var flags int
var fMode fs.FileMode
val, found := handle.GetValue("openFileOptions")
if !found {
return nil
}
fileOptions := val.(openFileOptions)
flags = fileOptions.flags
fMode = fileOptions.fMode

localPath := common.JoinUnixFilepath(fc.tmpPath, handle.Path)
var f *os.File
var err error

flock := fc.fileLocks.Get(options.Name)
flock := fc.fileLocks.Get(handle.Path)
flock.Lock()
defer flock.Unlock()

fc.policy.CacheValid(localPath)
downloadRequired, fileExists, attr, err := fc.isDownloadRequired(localPath, options.Name, flock)

// return err in case of authorization permission mismatch
if err != nil && err == syscall.EACCES {
return nil, err
downloadRequired, fileExists, attr, err := fc.isDownloadRequired(localPath, handle.Path, flock)
if err != nil {
log.Err("FileCache::downloadFile : Failed to check if download is required for %s [%s]", handle.Path, err.Error())
}

fileMode := fc.defaultPermission
if downloadRequired {
log.Debug("FileCache::OpenFile : Need to download %s", options.Name)
log.Debug("FileCache::downloadFile : Need to download %s", handle.Path)

fileSize := int64(0)
if attr != nil {
fileSize = int64(attr.Size)
}

if fileExists {
log.Debug("FileCache::OpenFile : Delete cached file %s", options.Name)
log.Debug("FileCache::downloadFile : Delete cached file %s", handle.Path)

err := deleteFile(localPath)
if err != nil && !os.IsNotExist(err) {
log.Err("FileCache::OpenFile : Failed to delete old file %s", options.Name)
log.Err("FileCache::downloadFile : Failed to delete old file %s", handle.Path)
}
} else {
// Create the file if if doesn't already exist.
err := os.MkdirAll(filepath.Dir(localPath), fc.defaultPermission)
if err != nil {
log.Err("FileCache::OpenFile : error creating directory structure for file %s [%s]", options.Name, err.Error())
return nil, err
log.Err("FileCache::downloadFile : error creating directory structure for file %s [%s]", handle.Path, err.Error())
return err
}
}

// Open the file in write mode.
f, err = common.OpenFile(localPath, os.O_CREATE|os.O_RDWR, options.Mode)
f, err = common.OpenFile(localPath, os.O_CREATE|os.O_RDWR, fMode)
if err != nil {
log.Err("FileCache::OpenFile : error creating new file %s [%s]", options.Name, err.Error())
return nil, err
log.Err("FileCache::downloadFile : error creating new file %s [%s]", handle.Path, err.Error())
return err
}

if options.Flags&os.O_TRUNC != 0 {
if flags&os.O_TRUNC != 0 {
fileSize = 0
}

if fileSize > 0 {
if fc.diskHighWaterMark != 0 {
currSize, err := common.GetUsage(fc.tmpPath)
if err != nil {
log.Err("FileCache::OpenFile : error getting current usage of cache [%s]", err.Error())
} else {
if (currSize + float64(fileSize)) > fc.diskHighWaterMark {
log.Err("FileCache::OpenFile : cache size limit reached [%f] failed to open %s", fc.maxCacheSize, options.Name)
return nil, syscall.ENOSPC
}
}

}
// Download/Copy the file from storage to the local file.
// We pass a count of 0 to get the entire object
err = fc.NextComponent().CopyToFile(
internal.CopyToFileOptions{
Name: options.Name,
Name: handle.Path,
Offset: 0,
Count: 0,
File: f,
})
if err != nil {
// File was created locally and now download has failed so we need to delete it back from local cache
log.Err("FileCache::OpenFile : error downloading file from storage %s [%s]", options.Name, err.Error())
log.Err("FileCache::downloadFile : error downloading file from storage %s [%s]", handle.Path, err.Error())
_ = f.Close()
_ = os.Remove(localPath)
return nil, err
return err
}
}

// Update the last download time of this file
flock.SetDownloadTime()

log.Debug("FileCache::OpenFile : Download of %s is complete", options.Name)
log.Debug("FileCache::downloadFile : Download of %s is complete", handle.Path)
f.Close()

// After downloading the file, update the modified times and mode of the file.
fileMode := fc.defaultPermission
if attr != nil && !attr.IsModeDefault() {
fileMode = attr.Mode
}
}

// If user has selected some non default mode in config then every local file shall be created with that mode only
err = os.Chmod(localPath, fileMode)
if err != nil {
log.Err("FileCache::OpenFile : Failed to change mode of file %s [%s]", options.Name, err.Error())
}
// TODO: When chown is supported should we update that?
// If user has selected some non default mode in config then every local file shall be created with that mode only
err = os.Chmod(localPath, fileMode)
if err != nil {
log.Err("FileCache::downloadFile : Failed to change mode of file %s [%s]", handle.Path, err.Error())
}
// TODO: When chown is supported should we update that?

if attr != nil {
// chtimes shall be the last api otherwise calling chmod/chown will update the last change time
err = os.Chtimes(localPath, attr.Atime, attr.Mtime)
if err != nil {
log.Err("FileCache::OpenFile : Failed to change times of file %s [%s]", options.Name, err.Error())
}
if attr != nil {
// chtimes shall be the last api otherwise calling chmod/chown will update the last change time
err = os.Chtimes(localPath, attr.Atime, attr.Mtime)
if err != nil {
log.Err("FileCache::downloadFile : Failed to change times of file %s [%s]", handle.Path, err.Error())
}

fileCacheStatsCollector.UpdateStats(stats_manager.Increment, dlFiles, (int64)(1))
} else {
log.Debug("FileCache::OpenFile : %s will be served from cache", options.Name)
fileCacheStatsCollector.UpdateStats(stats_manager.Increment, cacheServed, (int64)(1))
}

fileCacheStatsCollector.UpdateStats(stats_manager.Increment, dlFiles, (int64)(1))

// Open the file and grab a shared lock to prevent deletion by the cache policy.
f, err = common.OpenFile(localPath, options.Flags, options.Mode)
f, err = common.OpenFile(localPath, flags, fMode)
if err != nil {
log.Err("FileCache::OpenFile : error opening cached file %s [%s]", options.Name, err.Error())
return nil, err
log.Err("FileCache::downloadFile : error opening cached file %s [%s]", handle.Path, err.Error())
return err
}

// Increment the handle count in this lock item as there is one handle open for this now
flock.Inc()

handle := handlemap.NewHandle(options.Name)
inf, err := f.Stat()
if err == nil {
handle.Size = inf.Size()
Expand All @@ -831,16 +830,62 @@ func (fc *FileCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Hand
handle.Flags.Set(handlemap.HandleFlagCached)
}

log.Info("FileCache::OpenFile : file=%s, fd=%d", options.Name, f.Fd())
log.Info("FileCache::downloadFile : file=%s, fd=%d", handle.Path, f.Fd())
handle.SetFileObject(f)

//set boolean in isDownloadNeeded value to signal that the file has been downloaded
handle.RemoveValue("openFileOptions")

return nil
}

// OpenFile: Makes the file available in the local cache for further file operations.
func (fc *FileCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Handle, error) {
log.Trace("FileCache::OpenFile : name=%s, flags=%d, mode=%s", options.Name, options.Flags, options.Mode)

attr, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: options.Name})

// return err in case of authorization permission mismatch
if err != nil && err == syscall.EACCES {
return nil, err
}

fileSize := int64(0)
if attr != nil {
fileSize = int64(attr.Size)
}

if fileSize > 0 {
if fc.diskHighWaterMark != 0 {
currSize, err := common.GetUsage(fc.tmpPath)
if err != nil {
log.Err("FileCache::OpenFile : error getting current usage of cache [%s]", err.Error())
} else {
if (currSize + float64(fileSize)) > fc.diskHighWaterMark {
log.Err("FileCache::OpenFile : cache size limit reached [%f] failed to open %s", fc.maxCacheSize, options.Name)
return nil, syscall.ENOSPC
}
}
}
}

// create handle and set value
handle := handlemap.NewHandle(options.Name)
handle.SetValue("openFileOptions", openFileOptions{flags: options.Flags, fMode: options.Mode})

return handle, nil
}

// CloseFile: Flush the file and invalidate it from the cache.
func (fc *FileCache) CloseFile(options internal.CloseFileOptions) error {
log.Trace("FileCache::CloseFile : name=%s, handle=%d", options.Handle.Path, options.Handle.ID)

// if file has not been interactively read or written to by end user, then there is no cached file to close.
_, found := options.Handle.GetValue("openFileOptions")
if found {
return nil
}

localPath := common.JoinUnixFilepath(fc.tmpPath, options.Handle.Path)

err := fc.FlushFile(internal.FlushFileOptions{Handle: options.Handle}) //nolint
Expand Down Expand Up @@ -891,6 +936,12 @@ func (fc *FileCache) ReadFile(options internal.ReadFileOptions) ([]byte, error)
localPath := common.JoinUnixFilepath(fc.tmpPath, options.Handle.Path)
fc.policy.CacheValid(localPath)

err := fc.downloadFile(options.Handle)
if err != nil {
log.Err("FileCache::ReadFile : error from calling downloadFile for %s", options.Handle.Path)
return nil, err
}

f := options.Handle.GetFileObject()
if f == nil {
log.Err("FileCache::ReadFile : error [couldn't find fd in handle] %s", options.Handle.Path)
Expand Down Expand Up @@ -920,6 +971,11 @@ func (fc *FileCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, er
// The file should already be in the cache since CreateFile/OpenFile was called before and a shared lock was acquired.
// log.Debug("FileCache::ReadInBuffer : Reading %v bytes from %s", len(options.Data), options.Handle.Path)

err := fc.downloadFile(options.Handle)
if err != nil {
return 0, fmt.Errorf("error downloading file %s [%s]", options.Handle.Path, err)
}

f := options.Handle.GetFileObject()
if f == nil {
log.Err("FileCache::ReadInBuffer : error [couldn't find fd in handle] %s", options.Handle.Path)
Expand Down Expand Up @@ -950,6 +1006,11 @@ func (fc *FileCache) WriteFile(options internal.WriteFileOptions) (int, error) {
// The file should already be in the cache since CreateFile/OpenFile was called before and a shared lock was acquired.
//log.Debug("FileCache::WriteFile : Writing %v bytes from %s", len(options.Data), options.Handle.Path)

err := fc.downloadFile(options.Handle)
if err != nil {
return 0, fmt.Errorf("error downloading file for %s [%s]", options.Handle.Path, err)
}

f := options.Handle.GetFileObject()
if f == nil {
log.Err("FileCache::WriteFile : error [couldn't find fd in handle] %s", options.Handle.Path)
Expand Down Expand Up @@ -983,7 +1044,6 @@ func (fc *FileCache) WriteFile(options internal.WriteFileOptions) (int, error) {
if err == nil {
// Mark the handle dirty so the file is written back to storage on FlushFile.
options.Handle.Flags.Set(handlemap.HandleFlagDirty)

} else {
log.Err("FileCache::WriteFile : failed to write %s [%s]", options.Handle.Path, err.Error())
}
Expand Down Expand Up @@ -1255,9 +1315,8 @@ func (fc *FileCache) TruncateFile(options internal.TruncateFileOptions) error {
}
}

var h *handlemap.Handle = nil
var err error = nil

var h *handlemap.Handle
var err error
if options.Size == 0 {
// If size is 0 then no need to download any file we can just create an empty file
h, err = fc.CreateFile(internal.CreateFileOptions{Name: options.Name, Mode: fc.defaultPermission})
Expand All @@ -1267,10 +1326,15 @@ func (fc *FileCache) TruncateFile(options internal.TruncateFileOptions) error {
}
} else {
// If size is not 0 then we need to open the file and then truncate it
// Open will force download if file was not present in local system
// downloadFile will download if file was not present in local system
h, err = fc.OpenFile(internal.OpenFileOptions{Name: options.Name, Flags: os.O_RDWR, Mode: fc.defaultPermission})
if err != nil {
log.Err("FileCache::TruncateFile : Error opening file %s [%s]", options.Name, err.Error())
log.Err("FileCache::TruncateFile : Error calling OpenFile with %s [%s]", options.Name, err.Error())
}

err = fc.downloadFile(h)
if err != nil {
log.Err("FileCache::TruncateFile : Error calling downloadFile with %s [%s]", options.Name, err.Error())
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion component/file_cache/file_cache_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,11 @@ func (suite *fileCacheLinuxTestSuite) TestChownInCache() {
createHandle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777})
suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: createHandle})
openHandle, _ := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: path, Mode: 0777})
err := suite.fileCache.downloadFile(openHandle)
suite.assert.NoError(err)

// Path should be in the file cache
_, err := os.Stat(suite.cache_path + "/" + path)
_, err = os.Stat(suite.cache_path + "/" + path)
suite.assert.True(err == nil || os.IsExist(err))
// Path should be in fake storage
_, err = os.Stat(suite.fake_storage_path + "/" + path)
Expand Down
Loading

0 comments on commit 3fcfe20

Please sign in to comment.