diff --git a/common/lock_map.go b/common/lock_map.go
index eba0098d6..fa93cb135 100644
--- a/common/lock_map.go
+++ b/common/lock_map.go
@@ -44,6 +44,8 @@ type LockMapItem struct {
     exLocked     bool
     mtx          sync.Mutex
     downloadTime time.Time
+    // track if file is in lazy open state
+    LazyOpen bool
 }

 // Map holding locks for all the files
diff --git a/component/azstorage/block_blob.go b/component/azstorage/block_blob.go
index bc2bed319..30fea7f70 100644
--- a/component/azstorage/block_blob.go
+++ b/component/azstorage/block_blob.go
@@ -569,7 +569,7 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
     var dirList = make(map[string]bool)
     for _, blobInfo := range listBlob.Segment.BlobItems {
         blobInfo.Name = bb.getFileName(*blobInfo.Name)
-        attr := &internal.ObjAttr{}
+        var attr *internal.ObjAttr
         if blobInfo.Properties.CustomerProvidedKeySHA256 != nil && *blobInfo.Properties.CustomerProvidedKeySHA256 != "" {
             log.Trace("BlockBlob::List : blob is encrypted with customer provided key so fetching metadata explicitly using REST")
             attr, err = bb.getAttrUsingRest(*blobInfo.Name)
diff --git a/component/file_cache/cache_policy.go b/component/file_cache/cache_policy.go
index 53a2b09d1..1536ba2ea 100644
--- a/component/file_cache/cache_policy.go
+++ b/component/file_cache/cache_policy.go
@@ -56,8 +56,8 @@ type cachePolicy interface {

     UpdateConfig(cachePolicyConfig) error

-    CacheValid(name string) // Mark the file as hit
-    CachePurge(name string) // Schedule the file for deletion
+    CacheValid(name string)                            // Mark the file as hit
+    CachePurge(name string, flock *common.LockMapItem) // Delete the file from cache

     IsCached(name string) bool // Whether or not the cache policy considers this file cached
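Editor's note: the CachePurge signature change above threads the caller's LockMapItem into the policy, so purging now happens synchronously under the file lock instead of being scheduled for later. A minimal sketch of the two calling conventions, both taken from hunks later in this diff (directories have no lock map entry, so they pass nil):

    // purging a file: the caller already holds the file lock
    flock := fc.fileLocks.Get(options.Name)
    flock.Lock()
    defer flock.Unlock()
    fc.policy.CachePurge(localPath, flock)

    // purging a directory: no LockMapItem exists for it, so pass nil
    fc.policy.CachePurge(filepath.Join(fc.tmpPath, options.Name), nil)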
diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go
index 4b09a0ab7..a44f27d5f 100644
--- a/component/file_cache/file_cache.go
+++ b/component/file_cache/file_cache.go
@@ -34,6 +34,7 @@ import (
     "os"
     "path/filepath"
     "runtime"
+    "sort"
     "strings"
     "sync"
     "syscall"
@@ -432,47 +433,6 @@ func isLocalDirEmpty(path string) bool {
     return err == io.EOF
 }

-// invalidateDirectory: Recursively invalidates a directory in the file cache.
-func (fc *FileCache) invalidateDirectory(name string) {
-    log.Trace("FileCache::invalidateDirectory : %s", name)
-
-    localPath := filepath.Join(fc.tmpPath, name)
-    // TODO : wouldn't this cause a race condition? a thread might get the lock before we purge - and the file would be non-existent
-    // WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file'
-    var directoriesToPurge []string
-    err := filepath.WalkDir(localPath, func(path string, d fs.DirEntry, err error) error {
-        if err == nil && d != nil {
-            if !d.IsDir() {
-                log.Debug("FileCache::invalidateDirectory : removing file %s from cache", path)
-                fc.policy.CachePurge(path)
-            } else {
-                // remember to delete the directory later (after its children)
-                directoriesToPurge = append(directoriesToPurge, path)
-            }
-        } else {
-            // stat(localPath) failed. err is the one returned by stat
-            // documentation: https://pkg.go.dev/io/fs#WalkDirFunc
-            if os.IsNotExist(err) {
-                log.Info("FileCache::invalidateDirectory : %s does not exist in local cache.", name)
-            } else if err != nil {
-                log.Warn("FileCache::invalidateDirectory : %s stat err [%s].", name, err.Error())
-            }
-        }
-        return nil
-    })
-
-    // clean up leftover source directories in reverse order
-    for i := len(directoriesToPurge) - 1; i >= 0; i-- {
-        log.Debug("FileCache::invalidateDirectory : removing dir %s from cache", directoriesToPurge[i])
-        fc.policy.CachePurge(directoriesToPurge[i])
-    }
-
-    if err != nil {
-        log.Debug("FileCache::invalidateDirectory : Failed to walk directory %s. Here's why: %v", localPath, err)
-        return
-    }
-}
-
 // Note: The primary purpose of the file cache is to keep track of files that are opened by the user.
 // So we do not need to support some APIs like Create Directory since the file cache will manage
 // creating local directories as needed.
@@ -481,14 +441,16 @@ func (fc *FileCache) invalidateDirectory(name string) {
 func (fc *FileCache) DeleteDir(options internal.DeleteDirOptions) error {
     log.Trace("FileCache::DeleteDir : %s", options.Name)

+    // The libfuse component only calls DeleteDir on empty directories, so this directory must be empty
     err := fc.NextComponent().DeleteDir(options)
     if err != nil {
         log.Err("FileCache::DeleteDir : %s failed", options.Name)
         // There is a chance that meta file for directory was not created in which case
         // rest api delete will fail while we still need to cleanup the local cache for the same
+    } else {
+        fc.policy.CachePurge(filepath.Join(fc.tmpPath, options.Name), nil)
     }

-    fc.invalidateDirectory(options.Name)
     return err
 }
@@ -620,7 +582,31 @@ func (fc *FileCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool {
 func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
     log.Trace("FileCache::RenameDir : src=%s, dst=%s", options.Src, options.Dst)

-    err := fc.NextComponent().RenameDir(options)
+    // get a list of source objects from both cloud and cache
+    objectNames, err := fc.listAllObjects(options.Src)
+    if err != nil {
+        log.Err("FileCache::RenameDir : %s listAllObjects failed. Here's why: %v", options.Src, err)
+        return err
+    }
+
+    // add object destinations, and sort the result
+    // (range evaluates objectNames once, so appending inside the loop is safe)
+    for _, srcName := range objectNames {
+        dstName := strings.Replace(srcName, options.Src, options.Dst, 1)
+        objectNames = append(objectNames, dstName)
+    }
+    sort.Strings(objectNames)
+
+    // acquire a file lock on each entry (and defer unlock)
+    var flocks []*common.LockMapItem
+    for _, objectName := range objectNames {
+        flock := fc.fileLocks.Get(objectName)
+        flocks = append(flocks, flock)
+        flock.Lock()
+    }
+    defer unlockAll(flocks)
+
+    // rename the directory in the cloud
+    err = fc.NextComponent().RenameDir(options)
     if err != nil {
         log.Err("FileCache::RenameDir : error %s [%s]", options.Src, err.Error())
         return err
@@ -639,21 +625,12 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
             // get locks
             sflock := fc.fileLocks.Get(fc.getObjectName(path))
             dflock := fc.fileLocks.Get(fc.getObjectName(newPath))
-            if path < newPath {
-                sflock.Lock()
-                dflock.Lock()
-            } else {
-                dflock.Lock()
-                sflock.Lock()
-            }
             // complete local rename
             err := fc.renameCachedFile(path, newPath, sflock, dflock)
             if err != nil {
                 // there's really not much we can do to handle the error, so just log it
                 log.Err("FileCache::RenameDir : %s file rename failed. Directory state is inconsistent!", path)
             }
-            sflock.Unlock()
-            dflock.Unlock()
         } else {
             log.Debug("FileCache::RenameDir : Creating local destination directory %s", newPath)
             // create the new directory
@@ -681,12 +658,114 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
     // clean up leftover source directories in reverse order
     for i := len(directoriesToPurge) - 1; i >= 0; i-- {
         log.Debug("FileCache::RenameDir : Removing local directory %s", directoriesToPurge[i])
-        fc.policy.CachePurge(directoriesToPurge[i])
+        fc.policy.CachePurge(directoriesToPurge[i], nil)
     }

     return nil
 }

+func (fc *FileCache) listAllObjects(prefix string) (objectNames []string, err error) {
+    // get cloud objects
+    var cloudObjects []string
+    cloudObjects, err = fc.listCloudObjects(prefix)
+    if err != nil {
+        return
+    }
+    // get local / cached objects
+    var localObjects []string
+    localObjects, err = fc.listCachedObjects(prefix)
+    if err != nil {
+        return
+    }
+    // combine the lists
+    objectNames = combineLists(cloudObjects, localObjects)
+
+    return
+}
+
+// recursively list all objects in the container at the given prefix / directory
+func (fc *FileCache) listCloudObjects(prefix string) (objectNames []string, err error) {
+    var done bool
+    var token string
+    for !done {
+        var attrSlice []*internal.ObjAttr
+        attrSlice, token, err = fc.NextComponent().StreamDir(internal.StreamDirOptions{Name: prefix, Token: token})
+        if err != nil {
+            return
+        }
+        for i := len(attrSlice) - 1; i >= 0; i-- {
+            attr := attrSlice[i]
+            if !attr.IsDir() {
+                objectNames = append(objectNames, attr.Path)
+            } else {
+                // recurse!
+                var subdirObjectNames []string
+                subdirObjectNames, err = fc.listCloudObjects(attr.Path)
+                if err != nil {
+                    return
+                }
+                objectNames = append(objectNames, subdirObjectNames...)
+            }
+        }
+        done = token == ""
+    }
+    sort.Strings(objectNames)
+    return
+}
+
+// recursively list all files in the directory
+func (fc *FileCache) listCachedObjects(directory string) (objectNames []string, err error) {
+    localDirPath := filepath.Join(fc.tmpPath, directory)
+    walkDirErr := filepath.WalkDir(localDirPath, func(path string, d fs.DirEntry, err error) error {
+        if err == nil && d != nil {
+            if !d.IsDir() {
+                objectName := fc.getObjectName(path)
+                objectNames = append(objectNames, objectName)
+            }
+        } else {
+            // stat(localPath) failed. err is the one returned by stat
+            // documentation: https://pkg.go.dev/io/fs#WalkDirFunc
+            if os.IsNotExist(err) {
+                // none of the files that were moved actually exist in local storage
+                log.Info("FileCache::listObjects : %s does not exist in local cache.", directory)
+            } else if err != nil {
+                log.Warn("FileCache::listObjects : %s stat err [%v].", directory, err)
+            }
+        }
+        return nil
+    })
+    if walkDirErr != nil && !os.IsNotExist(walkDirErr) {
+        err = walkDirErr
+    }
+    sort.Strings(objectNames)
+    return
+}
+
+func combineLists(listA, listB []string) []string {
+    // since both lists are sorted, we can combine the two lists using a double-indexed for loop
+    combinedList := listA
+    i := 0 // Index for listA
+    j := 0 // Index for listB
+    // Iterate through both lists, adding entries from B that are missing from A
+    for i < len(listA) && j < len(listB) {
+        itemA := listA[i]
+        itemB := listB[j]
+        if itemA < itemB {
+            i++
+        } else if itemA > itemB {
+            // we could insert here, but it's probably better to just sort later
+            combinedList = append(combinedList, itemB)
+            j++
+        } else {
+            i++
+            j++
+        }
+    }
+    // editor's note: append any entries of listB beyond the end of listA;
+    // without this, local-only names sorting after the last cloud name are dropped
+    combinedList = append(combinedList, listB[j:]...)
+    // sort and return
+    sort.Strings(combinedList)
+    return combinedList
+}
+
 func (fc *FileCache) getObjectName(localPath string) string {
     relPath, err := filepath.Rel(fc.tmpPath, localPath)
     if err != nil {
@@ -696,6 +775,12 @@ func (fc *FileCache) getObjectName(localPath string) string {
     return common.NormalizeObjectName(relPath)
 }

+func unlockAll(flocks []*common.LockMapItem) {
+    for _, flock := range flocks {
+        flock.Unlock()
+    }
+}
+
 // CreateFile: Create the file in local cache.
 func (fc *FileCache) CreateFile(options internal.CreateFileOptions) (*handlemap.Handle, error) {
     //defer exectime.StatTimeCurrentBlock("FileCache::CreateFile")()
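Editor's note: combineLists relies on both inputs already being sorted (listCloudObjects and listCachedObjects each sort before returning) and de-duplicates names present in both. A quick illustration with hypothetical values, not taken from the PR's tests:

    cloud := []string{"dir/a", "dir/c"}
    local := []string{"dir/b", "dir/c", "dir/d"}
    combined := combineLists(cloud, local)
    // combined == []string{"dir/a", "dir/b", "dir/c", "dir/d"}
    // "dir/c" appears once even though it is in both lists;
    // "dir/d" survives via the trailing append of listB's remainder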
@@ -707,9 +792,6 @@ func (fc *FileCache) CreateFile(options internal.CreateFileOptions) (*handlemap.

     // createEmptyFile was added to optionally support immutable containers. If customers do not care about immutability they can set this to true.
     if fc.createEmptyFile {
-        // We tried moving CreateFile to a separate thread for better perf.
-        // However, before it is created in cloud storage, if GetAttr is called, the call will fail since the file
-        // does not exist in cloud storage yet, failing the whole CreateFile sequence in FUSE.
         newF, err := fc.NextComponent().CreateFile(options)
         if err != nil {
             log.Err("FileCache::CreateFile : Failed to create file %s", options.Name)
@@ -757,6 +839,9 @@ func (fc *FileCache) CreateFile(options internal.CreateFileOptions) (*handlemap.
         handle.Flags.Set(handlemap.HandleFlagDirty)
     }

+    // update state
+    flock.LazyOpen = false
+
     return handle, nil
 }

@@ -796,7 +881,6 @@ func (fc *FileCache) validateStorageError(path string, err error, method string,
     return nil
 }

-// DeleteFile: Invalidate the file in local cache.
 func (fc *FileCache) DeleteFile(options internal.DeleteFileOptions) error {
     log.Trace("FileCache::DeleteFile : name=%s", options.Name)

@@ -812,18 +896,24 @@ func (fc *FileCache) DeleteFile(options internal.DeleteFileOptions) error {
     }

     localPath := filepath.Join(fc.tmpPath, options.Name)
-    err = deleteFile(localPath)
-    if err != nil && !os.IsNotExist(err) {
-        log.Err("FileCache::DeleteFile : failed to delete local file %s [%s]", localPath, err.Error())
-    }
+    fc.policy.CachePurge(localPath, flock)

-    fc.policy.CachePurge(localPath)
+    // update file state
+    flock.LazyOpen = false

     return nil
 }

-func (fc *FileCache) downloadFile(handle *handlemap.Handle) error {
-    log.Trace("FileCache::downloadFile : name=%s", handle.Path)
+func openCompleted(handle *handlemap.Handle) bool {
+    handle.Lock()
+    defer handle.Unlock()
+    _, found := handle.GetValue("openFileOptions")
+    return !found
+}
+
+// the file lock must be acquired before calling this
+func (fc *FileCache) openFileInternal(handle *handlemap.Handle, flock *common.LockMapItem) error {
+    log.Trace("FileCache::openFileInternal : name=%s", handle.Path)

     handle.Lock()
     defer handle.Unlock()
@@ -842,19 +932,15 @@ func (fc *FileCache) downloadFile(handle *handlemap.Handle) error {
     localPath := filepath.Join(fc.tmpPath, handle.Path)
     var f *os.File

-    flock := fc.fileLocks.Get(handle.Path)
-    flock.Lock()
-    defer flock.Unlock()
-
     fc.policy.CacheValid(localPath)
     downloadRequired, fileExists, attr, err := fc.isDownloadRequired(localPath, handle.Path, flock)
-    if err != nil {
-        log.Err("FileCache::downloadFile : Failed to check if download is required for %s [%s]", handle.Path, err.Error())
+    if err != nil && !os.IsNotExist(err) {
+        log.Err("FileCache::openFileInternal : Failed to check if download is required for %s [%s]", handle.Path, err.Error())
     }

     fileMode := fc.defaultPermission
     if downloadRequired {
-        log.Debug("FileCache::downloadFile : Need to download %s", handle.Path)
+        log.Debug("FileCache::openFileInternal : Need to download %s", handle.Path)

         fileSize := int64(0)
         if attr != nil {
@@ -862,17 +948,17 @@
         }

         if fileExists {
-            log.Debug("FileCache::downloadFile : Delete cached file %s", handle.Path)
+            log.Debug("FileCache::openFileInternal : Delete cached file %s", handle.Path)

             err := deleteFile(localPath)
             if err != nil && !os.IsNotExist(err) {
-                log.Err("FileCache::downloadFile : Failed to delete old file %s", handle.Path)
+                log.Err("FileCache::openFileInternal : Failed to delete old file %s", handle.Path)
             }
         } else {
             // Create the file if if doesn't already exist.
             err := os.MkdirAll(filepath.Dir(localPath), fc.defaultPermission)
             if err != nil {
-                log.Err("FileCache::downloadFile : error creating directory structure for file %s [%s]", handle.Path, err.Error())
+                log.Err("FileCache::openFileInternal : error creating directory structure for file %s [%s]", handle.Path, err.Error())
                 return err
             }
         }

         // Open the file in write mode.
         f, err = common.OpenFile(localPath, os.O_CREATE|os.O_RDWR, fMode)
         if err != nil {
-            log.Err("FileCache::downloadFile : error creating new file %s [%s]", handle.Path, err.Error())
+            log.Err("FileCache::openFileInternal : error creating new file %s [%s]", handle.Path, err.Error())
             return err
         }
@@ -900,7 +986,7 @@ func (fc *FileCache) downloadFile(handle *handlemap.Handle) error {
         })
         if err != nil {
             // File was created locally and now download has failed so we need to delete it back from local cache
-            log.Err("FileCache::downloadFile : error downloading file from storage %s [%s]", handle.Path, err.Error())
+            log.Err("FileCache::openFileInternal : error downloading file from storage %s [%s]", handle.Path, err.Error())
             _ = f.Close()
             _ = os.Remove(localPath)
             return err
         }
@@ -909,8 +995,10 @@
         // Update the last download time of this file
         flock.SetDownloadTime()
+        // update file state
+        flock.LazyOpen = false

-        log.Debug("FileCache::downloadFile : Download of %s is complete", handle.Path)
+        log.Debug("FileCache::openFileInternal : Download of %s is complete", handle.Path)
         f.Close()

         // After downloading the file, update the modified times and mode of the file.
@@ -922,7 +1010,7 @@
         // If user has selected some non default mode in config then every local file shall be created with that mode only
         err = os.Chmod(localPath, fileMode)
         if err != nil {
-            log.Err("FileCache::downloadFile : Failed to change mode of file %s [%s]", handle.Path, err.Error())
+            log.Err("FileCache::openFileInternal : Failed to change mode of file %s [%s]", handle.Path, err.Error())
         }

         // TODO: When chown is supported should we update that?
@@ -930,7 +1018,7 @@
         // chtimes shall be the last api otherwise calling chmod/chown will update the last change time
         err = os.Chtimes(localPath, attr.Atime, attr.Mtime)
         if err != nil {
-            log.Err("FileCache::downloadFile : Failed to change times of file %s [%s]", handle.Path, err.Error())
+            log.Err("FileCache::openFileInternal : Failed to change times of file %s [%s]", handle.Path, err.Error())
         }
     }

@@ -939,7 +1027,7 @@
     // Open the file and grab a shared lock to prevent deletion by the cache policy.
     f, err = common.OpenFile(localPath, flags, fMode)
     if err != nil {
-        log.Err("FileCache::downloadFile : error opening cached file %s [%s]", handle.Path, err.Error())
+        log.Err("FileCache::openFileInternal : error opening cached file %s [%s]", handle.Path, err.Error())
         return err
     }
@@ -953,11 +1041,13 @@
         handle.Flags.Set(handlemap.HandleFlagCached)
     }

-    log.Info("FileCache::downloadFile : file=%s, fd=%d", handle.Path, f.Fd())
+    log.Info("FileCache::openFileInternal : file=%s, fd=%d", handle.Path, f.Fd())
     handle.SetFileObject(f)

     //set boolean in isDownloadNeeded value to signal that the file has been downloaded
     handle.RemoveValue("openFileOptions")
+    // update file state
+    flock.LazyOpen = false

     return nil
 }
@@ -971,19 +1061,17 @@ func (fc *FileCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Hand
     flock.Lock()
     defer flock.Unlock()

-    attr, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: options.Name})
+    localPath := filepath.Join(fc.tmpPath, options.Name)
+    downloadRequired, _, cloudAttr, err := fc.isDownloadRequired(localPath, options.Name, flock)

     // return err in case of authorization permission mismatch
     if err != nil && err == syscall.EACCES {
         return nil, err
     }

-    fileSize := int64(0)
-    if attr != nil {
-        fileSize = int64(attr.Size)
-    }
-
-    if fileSize > 0 {
+    // check if we are running out of space
+    if downloadRequired && cloudAttr != nil {
+        fileSize := int64(cloudAttr.Size)
         if fc.diskHighWaterMark != 0 {
             currSize, err := common.GetUsage(fc.tmpPath)
             if err != nil {
@@ -1000,15 +1088,25 @@ func (fc *FileCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Hand
     // create handle and record openFileOptions for later
     handle := handlemap.NewHandle(options.Name)
     handle.SetValue("openFileOptions", openFileOptions{flags: options.Flags, fMode: options.Mode})
-
     if options.Flags&os.O_APPEND != 0 {
         handle.Flags.Set(handlemap.HandleOpenedAppend)
     }

-    // Increment the handle count in this lock item as there is one handle open for this now
+    // Increment the handle count
     flock.Inc()

-    return handle, nil
+    // will opening the file require downloading it?
+    var openErr error
+    if !downloadRequired {
+        // use the local file to complete the open operation now
+        openErr = fc.openFileInternal(handle, flock)
+    } else {
+        // use a lazy open algorithm to avoid downloading unnecessarily (do nothing for now)
+        // update file state
+        flock.LazyOpen = true
+    }
+
+    return handle, openErr
 }

 // CloseFile: Flush the file and invalidate it from the cache.
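Editor's note: with the lazy-open path above, OpenFile can return a handle whose backing file has not been downloaded yet; the download is deferred to the first operation that needs the data. A rough sketch of the resulting lifecycle, using only names from this diff:

    h, err := fc.OpenFile(opts)      // downloadRequired: records openFileOptions, sets flock.LazyOpen
    // ... later, on the first read/write:
    if !openCompleted(h) {           // "openFileOptions" is still recorded on the handle
        flock := fc.fileLocks.Get(h.Path)
        flock.Lock()
        err = fc.openFileInternal(h, flock) // downloads, opens the fd, clears LazyOpen
        flock.Unlock()
    }

ReadInBuffer, WriteFile, and TruncateFile below all follow this pattern.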
@@ -1041,13 +1139,13 @@ func (fc *FileCache) closeFileInternal(options internal.CloseFileOptions, flock
     // if file has not been interactively read or written to by end user, then there is no cached file to close.
     _, noCachedHandle := options.Handle.GetValue("openFileOptions")

-    err := fc.FlushFile(internal.FlushFileOptions{Handle: options.Handle, CloseInProgress: true}) //nolint
-    if err != nil {
-        log.Err("FileCache::closeFileInternal : failed to flush file %s", options.Handle.Path)
-        return err
-    }
-
     if !noCachedHandle {
+        err := fc.flushFileInternal(internal.FlushFileOptions{Handle: options.Handle, CloseInProgress: true}, flock) //nolint
+        if err != nil {
+            log.Err("FileCache::closeFileInternal : failed to flush file %s", options.Handle.Path)
+            return err
+        }
+
         f := options.Handle.GetFileObject()
         if f == nil {
             log.Err("FileCache::closeFileInternal : error [missing fd in handle object] %s", options.Handle.Path)
@@ -1063,17 +1161,22 @@

     flock.Dec()

+    // unnecessary but tidy bookkeeping
+    if noCachedHandle {
+        // clear openFileOptions so the handle no longer reads as lazy-open
+        options.Handle.RemoveValue("openFileOptions")
+        // was this the only handle?
+        if flock.Count() == 0 {
+            // update file state
+            flock.LazyOpen = false
+        }
+    }
+
     // If it is an fsync op then purge the file
     if options.Handle.Fsynced() {
         log.Trace("FileCache::closeFileInternal : fsync/sync op, purging %s", options.Handle.Path)
         localPath := filepath.Join(fc.tmpPath, options.Handle.Path)
-
-        err = deleteFile(localPath)
-        if err != nil && !os.IsNotExist(err) {
-            log.Err("FileCache::closeFileInternal : failed to delete local file %s [%s]", localPath, err.Error())
-        }
-
-        fc.policy.CachePurge(localPath)
+        fc.policy.CachePurge(localPath, flock)
         return nil
     }
@@ -1086,9 +1189,14 @@ func (fc *FileCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, er
     // The file should already be in the cache since CreateFile/OpenFile was called before and a shared lock was acquired.
     // log.Debug("FileCache::ReadInBuffer : Reading %v bytes from %s", len(options.Data), options.Handle.Path)

-    err := fc.downloadFile(options.Handle)
-    if err != nil {
-        return 0, fmt.Errorf("error downloading file %s [%s]", options.Handle.Path, err)
+    if !openCompleted(options.Handle) {
+        flock := fc.fileLocks.Get(options.Handle.Path)
+        flock.Lock()
+        err := fc.openFileInternal(options.Handle, flock)
+        flock.Unlock()
+        if err != nil {
+            return 0, fmt.Errorf("error downloading file %s [%s]", options.Handle.Path, err)
+        }
     }

     f := options.Handle.GetFileObject()
@@ -1123,11 +1231,18 @@ func (fc *FileCache) WriteFile(options internal.WriteFileOptions) (int, error) {
     // The file should already be in the cache since CreateFile/OpenFile was called before and a shared lock was acquired.
     //log.Debug("FileCache::WriteFile : Writing %v bytes from %s", len(options.Data), options.Handle.Path)

-    err := fc.downloadFile(options.Handle)
-    if err != nil {
-        return 0, fmt.Errorf("error downloading file for %s [%s]", options.Handle.Path, err)
+    if !openCompleted(options.Handle) {
+        flock := fc.fileLocks.Get(options.Handle.Path)
+        flock.Lock()
+        err := fc.openFileInternal(options.Handle, flock)
+        flock.Unlock()
+        if err != nil {
+            return 0, fmt.Errorf("error downloading file for %s [%s]", options.Handle.Path, err)
+        }
     }

+    var err error
+
     f := options.Handle.GetFileObject()
     if f == nil {
         log.Err("FileCache::WriteFile : error [couldn't find fd in handle] %s", options.Handle.Path)
@@ -1215,6 +1330,19 @@ func (fc *FileCache) SyncFile(options internal.SyncFileOptions) error {

 // FlushFile: Flush the local file to storage
 func (fc *FileCache) FlushFile(options internal.FlushFileOptions) error {
+    var flock *common.LockMapItem
+
+    // if flush will upload the file, then acquire the file lock
+    if options.Handle.Dirty() && (!fc.lazyWrite || options.CloseInProgress) {
+        flock = fc.fileLocks.Get(options.Handle.Path)
+        flock.Lock()
+        defer flock.Unlock()
+    }
+
+    return fc.flushFileInternal(options, flock)
+}
+
+func (fc *FileCache) flushFileInternal(options internal.FlushFileOptions, flock *common.LockMapItem) error {
     //defer exectime.StatTimeCurrentBlock("FileCache::FlushFile")()
     log.Trace("FileCache::FlushFile : handle=%d, path=%s", options.Handle.ID, options.Handle.Path)
@@ -1317,7 +1445,7 @@
     localPath := filepath.Join(fc.tmpPath, options.Handle.Path)
     info, err := os.Stat(localPath)
     if err == nil {
-        err = fc.Chmod(internal.ChmodOptions{Name: options.Handle.Path, Mode: info.Mode()})
+        err = fc.chmodInternal(internal.ChmodOptions{Name: options.Handle.Path, Mode: info.Mode()}, flock)
         if err != nil {
             // chmod was missed earlier for this file and doing it now also
             // resulted in error so ignore this one and proceed for flush handling
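Editor's note: the FlushFile split above exists because callers like closeFileInternal (and Chmod below) already hold the file lock when they need to flush. LockMapItem wraps a plain sync.Mutex (see the lock_map.go hunk), which is not reentrant, so re-locking from the same goroutine would self-deadlock; lock holders therefore call flushFileInternal directly. A two-line reminder of the hazard:

    var mu sync.Mutex
    mu.Lock()
    mu.Lock() // deadlock: sync.Mutex is not reentrant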
@@ -1340,6 +1468,13 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr
     // 2. Path not in cloud storage but in local cache (this could happen if we recently created the file [and are currently writing to it]) (also supports immutable containers)
     // 3. Path in cloud storage and in local cache (this could result in dirty properties on the service if we recently wrote to the file)

+    // If the file is being downloaded or deleted, the size and mod time will be incorrect
+    // wait for download or deletion to complete before getting local file info
+    flock := fc.fileLocks.Get(options.Name)
+    // TODO: should we add RLock and RUnlock to the lock map for GetAttr?
+    flock.Lock()
+    defer flock.Unlock()
+
     // To cover case 1, get attributes from storage
     var exists bool
     attrs, err := fc.NextComponent().GetAttr(options)
@@ -1357,13 +1492,7 @@
     // To cover cases 2 and 3, grab the attributes from the local cache
     localPath := filepath.Join(fc.tmpPath, options.Name)

-    // If the file is being downloaded or deleted, the size and mod time will be incorrect
-    // wait for download or deletion to complete before getting local file info
-    flock := fc.fileLocks.Get(options.Name)
-    flock.Lock()
-    // TODO: Do we need to call NextComponent().GetAttr in this same critical section to avoid a data race with the cloud?
     info, err := os.Stat(localPath)
-    flock.Unlock()

     // All directory operations are guaranteed to be synced with storage so they cannot be in a case 2 or 3 state.
     if err == nil && !info.IsDir() {
         if exists { // Case 3 (file in cloud storage and in local cache) so update the relevant attributes
@@ -1396,7 +1525,7 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error {
     // acquire file locks
     sflock := fc.fileLocks.Get(options.Src)
     dflock := fc.fileLocks.Get(options.Dst)
-    // prevent deadlock
+    // always lock files in lexical order to prevent deadlock
     if options.Src < options.Dst {
         sflock.Lock()
         dflock.Lock()
@@ -1440,8 +1569,8 @@
         return localRenameErr
     }

-    // update any open handles to the file with its new name
     if sflock.Count() > 0 {
+        // update any open handles to the file with its new name
         handlemap.GetHandles().Range(func(key, value any) bool {
             handle := value.(*handlemap.Handle)
             if handle.Path == options.Src {
@@ -1449,6 +1578,11 @@
             }
             return true
         })
+        // transfer the open handle count from the old name to the new one
+        for sflock.Count() > 0 {
+            sflock.Dec()
+            dflock.Inc()
+        }
     }

     return nil
@@ -1475,7 +1609,7 @@
     }
     // delete the source from our cache policy
     // this will also delete the source file from local storage (if rename failed)
-    fc.policy.CachePurge(localSrcPath)
+    fc.policy.CachePurge(localSrcPath, sflock)

     return nil
 }
@@ -1510,16 +1644,20 @@ func (fc *FileCache) TruncateFile(options internal.TruncateFileOptions) error {
         }
     } else {
         // If size is not 0 then we need to open the file and then truncate it
-        // downloadFile will download if file was not present in local system
         h, err = fc.OpenFile(internal.OpenFileOptions{Name: options.Name, Flags: os.O_RDWR, Mode: fc.defaultPermission})
         if err != nil {
             log.Err("FileCache::TruncateFile : Error calling OpenFile with %s [%s]", options.Name, err.Error())
         }
-
-        err = fc.downloadFile(h)
-        if err != nil {
-            log.Err("FileCache::TruncateFile : Error calling downloadFile with %s [%s]", options.Name, err.Error())
-            return err
+        // openFileInternal will download if file was not present in local system
+        if !openCompleted(h) {
+            flock := fc.fileLocks.Get(options.Name)
+            flock.Lock()
+            err = fc.openFileInternal(h, flock)
+            flock.Unlock()
+            if err != nil {
+                log.Err("FileCache::TruncateFile : Error calling openFileInternal with %s [%s]", options.Name, err.Error())
+                return err
+            }
         }
     }
@@ -1545,6 +1683,17 @@ func (fc *FileCache) Chmod(options internal.ChmodOptions) error {
     log.Trace("FileCache::Chmod : Change mode of path %s", options.Name)

+    flock := fc.fileLocks.Get(options.Name)
+    flock.Lock()
+    defer flock.Unlock()
+
+    return fc.chmodInternal(options, flock)
+}
+
+// chmodInternal : Update the file with its new permissions; the caller must hold flock
+func (fc *FileCache) chmodInternal(options internal.ChmodOptions, flock *common.LockMapItem) error {
+    log.Trace("FileCache::Chmod : Change mode of path %s", options.Name)
+
     // Update the file in cloud storage
     err := fc.NextComponent().Chmod(options)
     err = fc.validateStorageError(options.Name, err, "Chmod", false)
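Editor's note: RenameFile above and RenameDir both acquire locks in lexical order. Taking locks in one global order is the standard guard against the ABBA deadlock; a minimal illustration with hypothetical names:

    // goroutine 1: rename "a" -> "b"     goroutine 2: rename "b" -> "a"
    // Without an ordering rule, 1 can hold lock(a) while waiting on lock(b)
    // while 2 holds lock(b) waiting on lock(a): neither ever proceeds.
    // With lexical ordering, both goroutines contend on lock(a) first, so
    // whichever wins takes lock(b) unopposed and then releases both.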
@@ -1579,6 +1728,10 @@ func (fc *FileCache) Chown(options internal.ChownOptions) error {
     log.Trace("FileCache::Chown : Change owner of path %s", options.Name)

+    flock := fc.fileLocks.Get(options.Name)
+    flock.Lock()
+    defer flock.Unlock()
+
     // Update the file in cloud storage
     err := fc.NextComponent().Chown(options)
     err = fc.validateStorageError(options.Name, err, "Chown", false)
diff --git a/component/file_cache/file_cache_linux.go b/component/file_cache/file_cache_linux.go
index 4c84cb958..5a89e004f 100644
--- a/component/file_cache/file_cache_linux.go
+++ b/component/file_cache/file_cache_linux.go
@@ -62,84 +62,69 @@ func newObjAttr(path string, info fs.FileInfo) *internal.ObjAttr {
 }

 // isDownloadRequired: Whether or not the file needs to be downloaded to local cache.
-func (fc *FileCache) isDownloadRequired(localPath string, blobPath string, flock *common.LockMapItem) (bool, bool, *internal.ObjAttr, error) {
-    fileExists := false
+func (fc *FileCache) isDownloadRequired(localPath string, objectPath string, flock *common.LockMapItem) (bool, bool, *internal.ObjAttr, error) {
+    cached := false
     downloadRequired := false
     lmt := time.Time{}
     var stat *syscall.Stat_t = nil

-    // The file is not cached then we need to download
-    if !fc.policy.IsCached(localPath) {
-        log.Debug("FileCache::isDownloadRequired : %s not present in local cache policy", localPath)
-        downloadRequired = true
-    }
-
-    finfo, err := os.Stat(localPath)
-    if err == nil {
-        // The file exists in local cache
-        // The file needs to be downloaded if the cacheTimeout elapsed (check last change time and last modified time)
-        fileExists = true
+    // check if the file exists locally
+    finfo, statErr := os.Stat(localPath)
+    if statErr == nil {
+        // The file does not need to be downloaded as long as it is in the cache policy
+        fileInPolicyCache := fc.policy.IsCached(localPath)
+        if fileInPolicyCache {
+            cached = true
+        } else {
+            log.Warn("FileCache::isDownloadRequired : %s exists but is not present in local cache policy", localPath)
+        }
+        // gather stat details
         stat = finfo.Sys().(*syscall.Stat_t)
-
-        // Deciding based on last modified time is not correct. Last modified time is based on the file was last written
-        // so if file was last written back to container 2 days back then even downloading it now shall represent the same date
-        // hence immediately after download it will become invalid. It shall be based on when the file was last downloaded.
-        // We can rely on last change time because once file is downloaded we reset its last mod time (represent same time as
-        // container on the local disk by resetting last mod time of local disk with utimens)
-        // and hence last change time on local disk will then represent the download time.
-        lmt = finfo.ModTime()
-        if time.Since(finfo.ModTime()).Seconds() > fc.cacheTimeout &&
-            time.Since(time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec)).Seconds() > fc.cacheTimeout {
-            log.Debug("FileCache::isDownloadRequired : %s not valid as per time checks", localPath)
-            downloadRequired = true
-        }
+        // editor's note: assignment restored here; without it lmt stays at its zero
+        // value and the cloudHasLatestData comparison below would always see the cloud as newer
+        lmt = finfo.ModTime()
-    } else if os.IsNotExist(err) {
+    } else if os.IsNotExist(statErr) {
         // The file does not exist in the local cache so it needs to be downloaded
         log.Debug("FileCache::isDownloadRequired : %s not present in local cache", localPath)
-        downloadRequired = true
     } else {
         // Catch all, the file needs to be downloaded
-        log.Debug("FileCache::isDownloadRequired : error calling stat %s [%s]", localPath, err.Error())
-        downloadRequired = true
+        log.Debug("FileCache::isDownloadRequired : error calling stat %s [%s]", localPath, statErr.Error())
     }

-    if fileExists && flock.Count() > 0 {
-        // file exists in local cache and there is already an handle open for it
-        // In this case we can not redownload the file from container
-        log.Info("FileCache::isDownloadRequired : Need to re-download %s, but skipping as handle is already open", blobPath)
-        downloadRequired = false
-    }
+    // check if the file is due for a refresh from cloud storage
+    refreshTimerExpired := fc.refreshSec != 0 && time.Since(flock.DownloadTime()).Seconds() > float64(fc.refreshSec)

-    err = nil // reset err variable
-    var attr *internal.ObjAttr = nil
-    if downloadRequired ||
-        (fc.refreshSec != 0 && time.Since(flock.DownloadTime()).Seconds() > float64(fc.refreshSec)) {
-        attr, err = fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: blobPath})
-        if err != nil {
-            log.Err("FileCache::isDownloadRequired : Failed to get attr of %s [%s]", blobPath, err.Error())
-        }
+    // get cloud attributes
+    cloudAttr, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: objectPath})
+    if err != nil && !os.IsNotExist(err) {
+        log.Err("FileCache::isDownloadRequired : Failed to get attr of %s [%s]", objectPath, err.Error())
     }

-    if fc.refreshSec != 0 && !downloadRequired && attr != nil && stat != nil {
-        // We decided that based on lmt of file file-cache-timeout has not expired
-        // However, user has configured refresh time then check time has elapsed since last download time of file or not
-        // If so, compare the lmt of file in local cache and once in container and redownload only if lmt of container is latest.
-        // If time matches but size does not then still we need to redownlaod the file.
-        if attr.Mtime.After(lmt) || stat.Size != attr.Size {
-            // File has not been modified at storage yet so no point in redownloading the file
+    if !cached && cloudAttr != nil {
+        downloadRequired = true
+    }
+
+    if cached && refreshTimerExpired && cloudAttr != nil {
+        // The local copy is still usable, but the user-configured refresh timer has expired.
+        // Does the cloud have a newer copy?
+        cloudHasLatestData := cloudAttr.Mtime.After(lmt) || stat.Size != cloudAttr.Size
+        // Is the local file open?
+        fileIsOpen := flock.Count() > 0 && !flock.LazyOpen
+        if cloudHasLatestData && !fileIsOpen {
             log.Info("FileCache::isDownloadRequired : File is modified in container, so forcing redownload %s [A-%v : L-%v] [A-%v : L-%v]",
-                blobPath, attr.Mtime, lmt, attr.Size, stat.Size)
+                objectPath, cloudAttr.Mtime, lmt, cloudAttr.Size, stat.Size)
             downloadRequired = true
-
+        } else {
+            // log why we decided not to refresh
+            if !cloudHasLatestData {
+                log.Info("FileCache::isDownloadRequired : File in container is not latest, skip redownload %s [A-%v : L-%v]", objectPath, cloudAttr.Mtime, lmt)
+            } else if fileIsOpen {
+                log.Info("FileCache::isDownloadRequired : Need to re-download %s, but skipping as handle is already open", objectPath)
+            }
             // As we have decided to continue using old file, we reset the timer to check again after refresh time interval
             flock.SetDownloadTime()
-        } else {
-            log.Info("FileCache::isDownloadRequired : File in container is not latest, skip redownload %s [A-%v : L-%v]", blobPath, attr.Mtime, lmt)
         }
     }

-    return downloadRequired, fileExists, attr, err
+    return downloadRequired, cached, cloudAttr, err
 }

 func (fc *FileCache) getAvailableSize() (uint64, error) {
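Editor's note: the rewritten isDownloadRequired boils down to a single predicate. A condensed restatement of the logic above, using the same variable names (this is a summary sketch, not code from the PR):

    cloudExists := cloudAttr != nil
    downloadRequired := (!cached && cloudExists) ||
        (cached && refreshTimerExpired && cloudExists &&
            cloudHasLatestData && !fileIsOpen)

Either there is no usable local copy and the object exists in the cloud, or the refresh timer has lapsed and the cloud holds newer data for a file nobody currently has open.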
diff --git a/component/file_cache/file_cache_linux_test.go b/component/file_cache/file_cache_linux_test.go
index 1cf9669a1..e486bd528 100644
--- a/component/file_cache/file_cache_linux_test.go
+++ b/component/file_cache/file_cache_linux_test.go
@@ -62,7 +62,7 @@ func (suite *fileCacheLinuxTestSuite) SetupTest() {
     rand := randomString(8)
     suite.cache_path = common.JoinUnixFilepath(home_dir, "file_cache"+rand)
     suite.fake_storage_path = common.JoinUnixFilepath(home_dir, "fake_storage"+rand)
-    defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 0\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path)
+    defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 1\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path)
     log.Debug(defaultConfig)

     // Delete the temp directories created
@@ -111,23 +111,16 @@ func (suite *fileCacheLinuxTestSuite) cleanupTest() {
 func (suite *fileCacheLinuxTestSuite) TestChmodNotInCache() {
     defer suite.cleanupTest()
-    // Setup
+    // Setup - create file directly in fake storage
     path := "file33"
-    handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777})
-    suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
-
-    _, err := os.Stat(suite.cache_path + "/" + path)
-    for i := 0; i < 1000 && !os.IsNotExist(err); i++ {
-        time.Sleep(10 * time.Millisecond)
-        _, err = os.Stat(suite.cache_path + "/" + path)
-    }
-    suite.assert.True(os.IsNotExist(err))
+    handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})

     // Path should be in fake storage
     suite.assert.FileExists(suite.fake_storage_path + "/" + path)

     // Chmod
-    err = suite.fileCache.Chmod(internal.ChmodOptions{Name: path, Mode: os.FileMode(0666)})
+    err := suite.fileCache.Chmod(internal.ChmodOptions{Name: path, Mode: os.FileMode(0666)})
     suite.assert.NoError(err)

     // Path in fake storage should be updated
@@ -178,7 +171,6 @@ func (suite *fileCacheLinuxTestSuite) TestChmodCase2() {
     err = suite.fileCache.FlushFile(internal.FlushFileOptions{Handle: createHandle})
     suite.assert.NoError(err)

-    // Path should be in the file cache with old mode (since we failed the operation)
     info, err := os.Stat(suite.cache_path + "/" + path)
     suite.assert.NoError(err)
     suite.assert.EqualValues(info.Mode(), newMode)
@@ -187,6 +179,7 @@ func (suite *fileCacheLinuxTestSuite) TestChmodCase2() {
     suite.assert.NoError(err)

     // loop until file does not exist - done due to async nature of eviction
+    time.Sleep(time.Second)
     _, err = os.Stat(suite.cache_path + "/" + path)
     for i := 0; i < 1000 && !os.IsNotExist(err); i++ {
         time.Sleep(10 * time.Millisecond)
@@ -206,15 +199,8 @@ func (suite *fileCacheLinuxTestSuite) TestChownNotInCache() {
     defer suite.cleanupTest()
     // Setup
     path := "file36"
-    handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777})
-    suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
-
-    _, err := os.Stat(suite.cache_path + "/" + path)
-    for i := 0; i < 1000 && !os.IsNotExist(err); i++ {
-        time.Sleep(10 * time.Millisecond)
-        _, err = os.Stat(suite.cache_path + "/" + path)
-    }
-    suite.assert.True(os.IsNotExist(err))
+    handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})

     // Path should be in fake storage
     suite.assert.FileExists(suite.fake_storage_path + "/" + path)
@@ -222,7 +208,7 @@
     // Chown
     owner := os.Getuid()
     group := os.Getgid()
-    err = suite.fileCache.Chown(internal.ChownOptions{Name: path, Owner: owner, Group: group})
+    err := suite.fileCache.Chown(internal.ChownOptions{Name: path, Owner: owner, Group: group})
     suite.assert.NoError(err)

     // Path in fake storage should be updated
diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go
index db83d409d..312df64db 100644
--- a/component/file_cache/file_cache_test.go
+++ b/component/file_cache/file_cache_test.go
@@ -99,7 +99,7 @@ func (suite *fileCacheTestSuite) SetupTest() {
     rand := randomString(8)
     suite.cache_path = filepath.Join(home_dir, "file_cache"+rand)
    suite.fake_storage_path = filepath.Join(home_dir, "fake_storage"+rand)
-    defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 0\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path)
+    defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path)
     log.Debug(defaultConfig)

     // Delete the temp directories created
@@ -355,11 +355,6 @@ func (suite *fileCacheTestSuite) TestCreateDir() {
 func (suite *fileCacheTestSuite) TestDeleteDir() {
     defer suite.cleanupTest()
     // Setup
-    // Configure to create empty files so we create the file in cloud storage
-    createEmptyFile := true
-    config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n\nloopbackfs:\n path: %s",
-        suite.cache_path, createEmptyFile, suite.fake_storage_path)
-    suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual)

     dir := "dir"
     path := dir + "/file"
@@ -377,8 +372,6 @@
     // Delete the directory
     err = suite.fileCache.DeleteDir(internal.DeleteDirOptions{Name: dir})
     suite.assert.NoError(err)
-    // wait for asynchronous deletion
-    time.Sleep(100 * time.Millisecond)
     // Directory should not be cached
     suite.assert.NoDirExists(filepath.Join(suite.cache_path, dir))
 }
@@ -404,9 +397,12 @@ func (suite *fileCacheTestSuite) TestStreamDirCase1() {
     // Create files directly in "fake_storage"
     suite.loopback.CreateDir(internal.CreateDirOptions{Name: name, Mode: 0777})
     suite.loopback.CreateDir(internal.CreateDirOptions{Name: subdir, Mode: 0777})
-    suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1})
-    suite.loopback.CreateFile(internal.CreateFileOptions{Name: file2})
-    suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3})
+    handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})
+    handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file2})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})
+    handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})

     // Read the Directory
     dir, _, err := suite.fileCache.StreamDir(internal.StreamDirOptions{Name: name})
@@ -463,9 +459,12 @@ func (suite *fileCacheTestSuite) TestStreamDirCase3() {
     suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777})
     suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file3, Size: 1024})
     // Create the files in fake_storage and simulate different sizes
-    suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0
-    suite.loopback.CreateFile(internal.CreateFileOptions{Name: file2, Mode: 0777})
-    suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777})
+    handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})
+    handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file2, Mode: 0777})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})
+    handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})

     // Read the Directory
     dir, _, err := suite.fileCache.StreamDir(internal.StreamDirOptions{Name: name})
@@ -510,10 +509,12 @@ func (suite *fileCacheTestSuite) TestStreamDirMixed() {
     suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file3, Size: 1024})

     // Create the files in fake_storage and simulate different sizes
-    suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0
-    suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777})
-
-    suite.loopback.CreateFile(internal.CreateFileOptions{Name: file4, Mode: 0777})
+    handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})
+    handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})
+    handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file4, Mode: 0777})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})
     suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file4, Size: 1024})
     suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file4, Size: 0})
@@ -580,13 +581,8 @@ func (suite *fileCacheTestSuite) TestIsDirEmptyFalseInCache() {

 func (suite *fileCacheTestSuite) TestRenameDir() {
     defer suite.cleanupTest()
-    // Setup
-    // Configure to create empty files so we create the file in cloud storage
-    createEmptyFile := true
-    config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n\nloopbackfs:\n path: %s",
-        suite.cache_path, createEmptyFile, suite.fake_storage_path)
-    suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual)

+    // Setup
     src := "src"
     dst := "dst"
     err := suite.fileCache.CreateDir(internal.CreateDirOptions{Name: src, Mode: 0777})
@@ -603,8 +599,6 @@
     // Rename the directory
     err = suite.fileCache.RenameDir(internal.RenameDirOptions{Src: src, Dst: dst})
     suite.assert.NoError(err)
-    // wait for asynchronous deletion
-    time.Sleep(100 * time.Millisecond)
     // src directory should not exist in local filesystem
     suite.assert.NoDirExists(filepath.Join(suite.cache_path, src))
     // dst directory should exist and have contents from src
@@ -860,30 +854,19 @@ func (suite *fileCacheTestSuite) TestDeleteFileError() {

 func (suite *fileCacheTestSuite) TestOpenFileNotInCache() {
     defer suite.cleanupTest()
     path := "file7"
-    handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777})
+    handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777})
     testData := "test data"
     data := []byte(testData)
-    suite.fileCache.WriteFile(internal.WriteFileOptions{Handle: handle, Offset: 0, Data: data})
-    suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
-
-    // loop until file does not exist - done due to async nature of eviction
-    _, err := os.Stat(filepath.Join(suite.cache_path, path))
-    for i := 0; i < 1000 && !os.IsNotExist(err); i++ {
-        time.Sleep(10 * time.Millisecond)
-        _, err = os.Stat(filepath.Join(suite.cache_path, path))
-    }
-    suite.assert.True(os.IsNotExist(err))
+    suite.loopback.WriteFile(internal.WriteFileOptions{Handle: handle, Offset: 0, Data: data})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})

-    handle, err = suite.fileCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR, Mode: suite.fileCache.defaultPermission})
-    suite.assert.NoError(err)
-    // Download is required
-    err = suite.fileCache.downloadFile(handle)
+    handle, err := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR, Mode: suite.fileCache.defaultPermission})
     suite.assert.NoError(err)
     suite.assert.EqualValues(path, handle.Path)
     suite.assert.False(handle.Dirty())

-    // File should exist in cache
-    suite.assert.FileExists(filepath.Join(suite.cache_path, path))
+    // File should not exist in cache
+    suite.assert.NoFileExists(filepath.Join(suite.cache_path, path))
 }

 func (suite *fileCacheTestSuite) TestOpenFileInCache() {
@@ -905,44 +888,35 @@
     suite.assert.FileExists(filepath.Join(suite.cache_path, path))
 }

-// Tests for GetProperties in OpenFile should be done in E2E tests
-// - there is no good way to test it here with a loopback FS without a mock component.
-
-func (suite *fileCacheTestSuite) TestCloseFile() {
+func (suite *fileCacheTestSuite) TestOpenCreateGetAttr() {
     defer suite.cleanupTest()
-    // Default is to not create empty files on create file to support immutable storage.
- path := "file9" + path := "file8a" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) - // The file is in the cache but not in cloud storage (see TestCreateFileInDirCreateEmptyFile) - - // CloseFile - err := suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) + // we report file does not exist before it is created + attr, err := suite.fileCache.GetAttr(internal.GetAttrOptions{Name: path}) + suite.assert.Nil(attr) + suite.assert.ErrorIs(err, os.ErrNotExist) + // since it does not exist, we allow the file to be created using OpenFile + handle, err := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_CREATE, Mode: 0777}) suite.assert.NoError(err) - - // loop until file does not exist - done due to async nature of eviction - _, err = os.Stat(filepath.Join(suite.cache_path, path)) - for i := 0; i < 1000 && !os.IsNotExist(err); i++ { - time.Sleep(10 * time.Millisecond) - _, err = os.Stat(filepath.Join(suite.cache_path, path)) - } - suite.assert.True(os.IsNotExist(err)) - - // File should not be in cache - suite.assert.NoFileExists(filepath.Join(suite.cache_path, path)) - // File should be in cloud storage - suite.assert.FileExists(filepath.Join(suite.fake_storage_path, path)) + suite.assert.EqualValues(path, handle.Path) + // we should report that the file exists now + attr, err = suite.fileCache.GetAttr(internal.GetAttrOptions{Name: path}) + suite.assert.NoError(err) + suite.NotNil(attr) } -func (suite *fileCacheTestSuite) TestCloseFileTimeout() { +// Tests for GetProperties in OpenFile should be done in E2E tests +// - there is no good way to test it here with a loopback FS without a mock component. + +func (suite *fileCacheTestSuite) TestCloseFileAndEvict() { defer suite.cleanupTest() - suite.cleanupTest() // teardown the default file cache generated - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: %d\n\nloopbackfs:\n path: %s", + suite.cleanupTest() + configuration := fmt.Sprintf("file_cache:\n path: %s\n timeout-sec: %d\n\nloopbackfs:\n path: %s", suite.cache_path, minimumFileCacheTimeout, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(configuration) path := "file10" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) // The file is in the cache but not in cloud storage (see TestCreateFileInDirCreateEmptyFile) @@ -955,10 +929,11 @@ func (suite *fileCacheTestSuite) TestCloseFileTimeout() { // File should be in cloud storage suite.assert.FileExists(filepath.Join(suite.fake_storage_path, path)) + time.Sleep(minimumFileCacheTimeout * time.Second) // loop until file does not exist - done due to async nature of eviction _, err = os.Stat(filepath.Join(suite.cache_path, path)) - for i := 0; i < (minimumFileCacheTimeout*300) && !os.IsNotExist(err); i++ { - time.Sleep(10 * time.Millisecond) + for i := 0; i < 30*minimumFileCacheTimeout && !os.IsNotExist(err); i++ { + time.Sleep(100 * time.Millisecond) _, err = os.Stat(filepath.Join(suite.cache_path, path)) } @@ -991,14 +966,14 @@ func (suite *fileCacheTestSuite) TestOpenCloseHandleCount() { func (suite *fileCacheTestSuite) TestOpenPreventsEviction() { defer suite.cleanupTest() - // Setup - suite.cleanupTest() // teardown the default file cache generated - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: %d\n\nloopbackfs:\n path: %s", + + 
+    suite.cleanupTest()
+    configuration := fmt.Sprintf("file_cache:\n path: %s\n timeout-sec: %d\n\nloopbackfs:\n path: %s",
         suite.cache_path, minimumFileCacheTimeout, suite.fake_storage_path)
-    suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual)
+    suite.setupTestHelper(configuration)

+    // Setup
     path := "file12"
-
     handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777})
     suite.assert.NoError(err)
     err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
@@ -1012,7 +987,7 @@
     suite.assert.NoError(err)

     // wait until file would be evicted (if not for being opened)
-    time.Sleep(time.Second * time.Duration(minimumFileCacheTimeout*3))
+    time.Sleep(3 * minimumFileCacheTimeout * time.Second)

     // File should still be in cache
     suite.assert.FileExists(filepath.Join(suite.cache_path, path))
@@ -1172,7 +1147,8 @@ func (suite *fileCacheTestSuite) TestGetAttrCase1() {
     // Setup
     file := "file24"
     // Create files directly in "fake_storage"
-    suite.loopback.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777})
+    handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})

     // Read the Directory
     attr, err := suite.fileCache.GetAttr(internal.GetAttrOptions{Name: file})
@@ -1215,30 +1191,37 @@ func (suite *fileCacheTestSuite) TestGetAttrCase3() {

 func (suite *fileCacheTestSuite) TestGetAttrCase4() {
     defer suite.cleanupTest()
+
+    suite.cleanupTest()
+    configuration := fmt.Sprintf("file_cache:\n path: %s\n timeout-sec: %d\n\nloopbackfs:\n path: %s",
+        suite.cache_path, minimumFileCacheTimeout, suite.fake_storage_path)
+    suite.setupTestHelper(configuration)
+
     // Setup
     file := "file27"
     // By default createEmptyFile is false, so we will not create these files in cloud storage until they are closed.
-    createHandle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777})
+    handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777})
     suite.assert.NoError(err)
-    suite.assert.NotNil(createHandle)
+    suite.assert.NotNil(handle)

     size := (100 * 1024 * 1024)
     data := make([]byte, size)

-    written, err := suite.fileCache.WriteFile(internal.WriteFileOptions{Handle: createHandle, Offset: 0, Data: data})
+    written, err := suite.fileCache.WriteFile(internal.WriteFileOptions{Handle: handle, Offset: 0, Data: data})
     suite.assert.NoError(err)
     suite.assert.EqualValues(size, written)

-    err = suite.fileCache.FlushFile(internal.FlushFileOptions{Handle: createHandle})
+    err = suite.fileCache.FlushFile(internal.FlushFileOptions{Handle: handle})
     suite.assert.NoError(err)

-    err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: createHandle})
+    err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
     suite.assert.NoError(err)

     // Wait file is evicted
+    time.Sleep(minimumFileCacheTimeout * time.Second)
     _, err = os.Stat(filepath.Join(suite.cache_path, file))
-    for i := 0; i < 2000 && !os.IsNotExist(err); i++ {
-        time.Sleep(10 * time.Millisecond)
+    for i := 0; i < 20*minimumFileCacheTimeout && !os.IsNotExist(err); i++ {
+        time.Sleep(100 * time.Millisecond)
         _, err = os.Stat(filepath.Join(suite.cache_path, file))
     }
     suite.assert.True(os.IsNotExist(err))
@@ -1269,23 +1252,14 @@ func (suite *fileCacheTestSuite) TestRenameFileNotInCache() {
     // Setup
     src := "source1"
     dst := "destination1"
-    handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0777})
-    suite.assert.NoError(err)
-    err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
-    suite.assert.NoError(err)
-
-    _, err = os.Stat(filepath.Join(suite.cache_path, src))
-    for i := 0; i < 1000 && !os.IsNotExist(err); i++ {
-        time.Sleep(10 * time.Millisecond)
-        _, err = os.Stat(filepath.Join(suite.cache_path, src))
-    }
-    suite.assert.True(os.IsNotExist(err))
+    handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0777})
+    suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle})

     // Path should be in fake storage
     suite.assert.FileExists(filepath.Join(suite.fake_storage_path, src))

     // RenameFile
-    err = suite.fileCache.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst})
+    err := suite.fileCache.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst})
     suite.assert.NoError(err)

     // Path in fake storage should be updated
@@ -1298,11 +1272,9 @@ func (suite *fileCacheTestSuite) TestRenameFileInCache() {
     // Setup
     src := "source2"
     dst := "destination2"
-    createHandle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666})
-    suite.assert.NoError(err)
-    openHandle, err := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: src, Mode: 0666})
+    handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666})
     suite.assert.NoError(err)
-    err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: createHandle})
+    err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
     suite.assert.NoError(err)

     // Path should be in the file cache
@@ -1318,22 +1290,20 @@
     suite.assert.FileExists(filepath.Join(suite.cache_path, dst))          // Dst shall exists in cache
     suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, src)) // Src does not exist
suite.assert.FileExists(filepath.Join(suite.fake_storage_path, dst)) // Dst does exist - - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: openHandle}) } func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() { defer suite.cleanupTest() - suite.cleanupTest() - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 1\n\nloopbackfs:\n path: %s", - suite.cache_path, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.cleanupTest() + configuration := fmt.Sprintf("file_cache:\n path: %s\n timeout-sec: %d\n\nloopbackfs:\n path: %s", + suite.cache_path, minimumFileCacheTimeout, suite.fake_storage_path) + suite.setupTestHelper(configuration) src := "source4" dst := "destination4" - createHandle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666}) - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: createHandle}) + handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666}) + suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) // Path should be in the file cache suite.assert.FileExists(suite.cache_path + "/" + src) @@ -1350,11 +1320,16 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() { suite.assert.NoFileExists(suite.fake_storage_path + "/" + src) // Src does not exist suite.assert.FileExists(suite.fake_storage_path + "/" + dst) // Dst does exist - time.Sleep(500 * time.Millisecond) // Check once before the cache cleanup that file exists suite.assert.FileExists(suite.cache_path + "/" + dst) // Dst shall exists in cache - time.Sleep(1 * time.Second) // Wait for the cache cleanup to occur - suite.assert.FileExists(suite.cache_path + "/" + dst) // Dst shall not exists in cache + // Wait for the cache cleanup to occur + time.Sleep(minimumFileCacheTimeout * time.Second) + _, err = os.Stat(filepath.Join(suite.cache_path, dst)) + for i := 0; i < 20*minimumFileCacheTimeout && !os.IsNotExist(err); i++ { + time.Sleep(100 * time.Millisecond) + _, err = os.Stat(filepath.Join(suite.cache_path, dst)) + } + suite.assert.NoFileExists(filepath.Join(suite.cache_path, dst)) // Dst shall not exists in cache } func (suite *fileCacheTestSuite) TestRenameOpenFileCase1() { @@ -1518,22 +1493,15 @@ func (suite *fileCacheTestSuite) TestTruncateFileNotInCache() { defer suite.cleanupTest() // Setup path := "file30" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) - - _, err := os.Stat(filepath.Join(suite.cache_path, path)) - for i := 0; i < 1000 && !os.IsNotExist(err); i++ { - time.Sleep(10 * time.Millisecond) - _, err = os.Stat(filepath.Join(suite.cache_path, path)) - } - suite.assert.True(os.IsNotExist(err)) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) // Path should be in fake storage suite.assert.FileExists(filepath.Join(suite.fake_storage_path, path)) // Chmod size := 1024 - err = suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: path, Size: int64(size)}) + err := suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: path, Size: int64(size)}) suite.assert.NoError(err) // Path in fake storage should be updated @@ -1545,9 +1513,8 @@ func (suite *fileCacheTestSuite) TestTruncateFileCase3() { defer 
suite.cleanupTest() // Setup path := "file31" - createHandle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0666}) - openHandle, _ := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: path, Mode: 0666}) - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: createHandle}) + handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0666}) + suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) // Path should be in the file cache suite.assert.FileExists(filepath.Join(suite.cache_path, path)) @@ -1563,8 +1530,6 @@ func (suite *fileCacheTestSuite) TestTruncateFileCase3() { suite.assert.EqualValues(info.Size(), size) info, _ = os.Stat(filepath.Join(suite.fake_storage_path, path)) suite.assert.EqualValues(info.Size(), size) - - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: openHandle}) } func (suite *fileCacheTestSuite) TestTruncateFileCase2() { @@ -1616,7 +1581,7 @@ func (suite *fileCacheTestSuite) TestCachePathSymlink() { defer os.RemoveAll(suite.cache_path) suite.assert.NoError(err) symlinkPath := suite.cache_path + ".lnk" - err = os.Symlink(common.NormalizeObjectName(suite.cache_path), symlinkPath) + err = os.Symlink(suite.cache_path, symlinkPath) defer os.Remove(symlinkPath) suite.assert.NoError(err) configuration := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n\nloopbackfs:\n path: %s", @@ -1641,8 +1606,8 @@ func (suite *fileCacheTestSuite) TestCachePathSymlink() { func (suite *fileCacheTestSuite) TestZZOffloadIO() { defer suite.cleanupTest() - configuration := fmt.Sprintf("file_cache:\n path: %s\n timeout-sec: 0\n\nloopbackfs:\n path: %s", - suite.cache_path, suite.fake_storage_path) + configuration := fmt.Sprintf("file_cache:\n path: %s\n timeout-sec: %d\n\nloopbackfs:\n path: %s", + suite.cache_path, minimumFileCacheTimeout, suite.fake_storage_path) suite.setupTestHelper(configuration) @@ -1657,10 +1622,6 @@ func (suite *fileCacheTestSuite) TestZZOffloadIO() { func (suite *fileCacheTestSuite) TestZZZZLazyWrite() { defer suite.cleanupTest() - configuration := fmt.Sprintf("file_cache:\n path: %s\n timeout-sec: 0\n\nloopbackfs:\n path: %s", - suite.cache_path, suite.fake_storage_path) - - suite.setupTestHelper(configuration) suite.fileCache.lazyWrite = true file := "file101" @@ -1672,12 +1633,17 @@ func (suite *fileCacheTestSuite) TestZZZZLazyWrite() { // As lazy write is enabled flush shall not upload the file suite.assert.True(handle.Dirty()) + // File is uploaded async on close _ = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) - time.Sleep(5 * time.Second) - suite.fileCache.lazyWrite = false + // Wait for the upload + for i := 0; i < 50 && handle.Dirty(); i++ { + time.Sleep(100 * time.Millisecond) + } - // As lazy write is enabled flush shall not upload the file suite.assert.False(handle.Dirty()) + + // cleanup + suite.fileCache.lazyWrite = false } func (suite *fileCacheTestSuite) TestStatFS() { @@ -1709,9 +1675,8 @@ func (suite *fileCacheTestSuite) TestStatFS() { func (suite *fileCacheTestSuite) TestReadFileWithRefresh() { defer suite.cleanupTest() // Configure to create empty files so we create the file in cloud storage - createEmptyFile := true - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n timeout-sec: 1000\n refresh-sec: 1\n\nloopbackfs:\n path: %s", - suite.cache_path, createEmptyFile, suite.fake_storage_path) + config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n refresh-sec: 
1\n\nloopbackfs:\n path: %s", + suite.cache_path, suite.fake_storage_path) suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) path := "file42" @@ -1724,26 +1689,26 @@ func (suite *fileCacheTestSuite) TestReadFileWithRefresh() { options := internal.OpenFileOptions{Name: path, Mode: 0777} // Read file once and we shall get the same data - f, err := suite.fileCache.OpenFile(options) + handle, err := suite.fileCache.OpenFile(options) suite.assert.NoError(err) - suite.assert.False(f.Dirty()) - n, err := suite.fileCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: f, Offset: 0, Data: data}) + suite.assert.False(handle.Dirty()) + n, err := suite.fileCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: data}) suite.assert.NoError(err) suite.assert.Equal(9, n) - err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: f}) + err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) suite.assert.NoError(err) // Modify the file in background but we shall still get the old data byteArr = []byte("test data1") err = os.WriteFile(suite.fake_storage_path+"/"+path, byteArr, 0777) suite.assert.NoError(err) - f, err = suite.fileCache.OpenFile(options) + handle, err = suite.fileCache.OpenFile(options) suite.assert.NoError(err) - suite.assert.False(f.Dirty()) - n, err = suite.fileCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: f, Offset: 0, Data: data}) + suite.assert.False(handle.Dirty()) + n, err = suite.fileCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: data}) suite.assert.NoError(err) suite.assert.Equal(9, n) - err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: f}) + err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) suite.assert.NoError(err) // Now wait for refresh timeout and we shall get the updated content on next read @@ -1751,13 +1716,13 @@ func (suite *fileCacheTestSuite) TestReadFileWithRefresh() { err = os.WriteFile(suite.fake_storage_path+"/"+path, []byte("test data123456"), 0777) suite.assert.NoError(err) time.Sleep(2 * time.Second) - f, err = suite.fileCache.OpenFile(options) + handle, err = suite.fileCache.OpenFile(options) suite.assert.NoError(err) - suite.assert.False(f.Dirty()) - n, err = suite.fileCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: f, Offset: 0, Data: data}) + suite.assert.False(handle.Dirty()) + n, err = suite.fileCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: data}) suite.assert.NoError(err) suite.assert.Equal(15, n) - err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: f}) + err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) suite.assert.NoError(err) } @@ -1824,11 +1789,6 @@ func (suite *fileCacheTestSuite) TestHardLimitOnSize() { func (suite *fileCacheTestSuite) TestHandleDataChange() { defer suite.cleanupTest() - // Configure to create empty files so we create the file in cloud storage - createEmptyFile := true - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n timeout-sec: 1000\n refresh-sec: 10\n\nloopbackfs:\n path: %s", - suite.cache_path, createEmptyFile, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) path := "file43" err := os.WriteFile(suite.fake_storage_path+"/"+path, []byte("test data"), 0777) @@ -1838,18 +1798,17 @@ func (suite 
*fileCacheTestSuite) TestHandleDataChange() { options := internal.OpenFileOptions{Name: path, Flags: os.O_RDONLY, Mode: 0777} // Read file once and we shall get the same data - f, err := suite.fileCache.OpenFile(options) - handlemap.Add(f) + handle, err := suite.fileCache.OpenFile(options) + handlemap.Add(handle) suite.assert.NoError(err) - suite.assert.False(f.Dirty()) - n, err := suite.fileCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: f, Offset: 0, Data: data}) - f, loaded := handlemap.Load(f.ID) + suite.assert.False(handle.Dirty()) + n, err := suite.fileCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: data}) + handle, loaded := handlemap.Load(handle.ID) suite.assert.True(loaded) suite.assert.NoError(err) suite.assert.Equal(9, n) - err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: f}) + err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) suite.assert.NoError(err) - } // In order for 'go test' to run this suite, we need to create diff --git a/component/file_cache/file_cache_windows.go b/component/file_cache/file_cache_windows.go index f2b45cd79..c3ceff7ef 100644 --- a/component/file_cache/file_cache_windows.go +++ b/component/file_cache/file_cache_windows.go @@ -63,84 +63,67 @@ func newObjAttr(path string, info fs.FileInfo) *internal.ObjAttr { } // isDownloadRequired: Whether or not the file needs to be downloaded to local cache. -func (fc *FileCache) isDownloadRequired(localPath string, blobPath string, flock *common.LockMapItem) (bool, bool, *internal.ObjAttr, error) { - fileExists := false +func (fc *FileCache) isDownloadRequired(localPath string, objectPath string, flock *common.LockMapItem) (bool, bool, *internal.ObjAttr, error) { + cached := false downloadRequired := false lmt := time.Time{} - var stat *syscall.Win32FileAttributeData = nil - // The file is not cached then we need to download - if !fc.policy.IsCached(localPath) { - log.Debug("FileCache::isDownloadRequired : %s not present in local cache policy", localPath) - downloadRequired = true - } - - finfo, err := os.Stat(localPath) - if err == nil { - // The file exists in local cache - // The file needs to be downloaded if the cacheTimeout elapsed (check last change time and last modified time) - fileExists = true - stat = finfo.Sys().(*syscall.Win32FileAttributeData) - - // Deciding based on last modified time is not correct. Last modified time is based on the file was last written - // so if file was last written back to container 2 days back then even downloading it now shall represent the same date - // hence immediately after download it will become invalid. It shall be based on when the file was last downloaded. - // We can rely on last change time because once file is downloaded we reset its last mod time (represent same time as - // container on the local disk by resetting last mod time of local disk with utimens) - // and hence last change time on local disk will then represent the download time. 
- - lmt = finfo.ModTime() - if time.Since(finfo.ModTime()).Seconds() > fc.cacheTimeout && - time.Since(time.Unix(0, stat.CreationTime.Nanoseconds())).Seconds() > fc.cacheTimeout { - log.Debug("FileCache::isDownloadRequired : %s not valid as per time checks", localPath) - downloadRequired = true + // check if the file exists locally + finfo, statErr := os.Stat(localPath) + if statErr == nil { + // The file does not need to be downloaded as long as it is in the cache policy + fileInPolicyCache := fc.policy.IsCached(localPath) + if fileInPolicyCache { + cached = true + } else { + log.Warn("FileCache::isDownloadRequired : %s exists but is not present in local cache policy", localPath) } - } else if os.IsNotExist(err) { + // gather stat details + lmt = finfo.ModTime() + } else if os.IsNotExist(statErr) { // The file does not exist in the local cache so it needs to be downloaded log.Debug("FileCache::isDownloadRequired : %s not present in local cache", localPath) - downloadRequired = true } else { // Catch all, the file needs to be downloaded - log.Debug("FileCache::isDownloadRequired : error calling stat %s [%s]", localPath, err.Error()) - downloadRequired = true + log.Debug("FileCache::isDownloadRequired : error calling stat %s [%s]", localPath, statErr.Error()) } - if fileExists && flock.Count() > 0 { - // file exists in local cache and there is already an handle open for it - // In this case we can not redownload the file from container - log.Info("FileCache::isDownloadRequired : Need to re-download %s, but skipping as handle is already open", blobPath) - downloadRequired = false + // check if the file is due for a refresh from cloud storage + refreshTimerExpired := fc.refreshSec != 0 && time.Since(flock.DownloadTime()).Seconds() > float64(fc.refreshSec) + + // get cloud attributes + cloudAttr, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: objectPath}) + if err != nil && !os.IsNotExist(err) { + log.Err("FileCache::isDownloadRequired : Failed to get attr of %s [%s]", objectPath, err.Error()) } - err = nil // reset err variable - var attr *internal.ObjAttr = nil - if downloadRequired || - (fc.refreshSec != 0 && time.Since(flock.DownloadTime()).Seconds() > float64(fc.refreshSec)) { - attr, err = fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: blobPath}) - if err != nil { - log.Err("FileCache::isDownloadRequired : Failed to get attr of %s [%s]", blobPath, err.Error()) - } + if !cached && cloudAttr != nil { + downloadRequired = true } - if fc.refreshSec != 0 && !downloadRequired && attr != nil && stat != nil { - // We decided that based on lmt of file file-cache-timeout has not expired - // However, user has configured refresh time then check time has elapsed since last download time of file or not - // If so, compare the lmt of file in local cache and once in container and redownload only if lmt of container is latest. - // If time matches but size does not then still we need to redownlaod the file. - if attr.Mtime.After(lmt) || finfo.Size() != attr.Size { - // File has not been modified at storage yet so no point in redownloading the file + if cached && refreshTimerExpired && cloudAttr != nil { + // File is not expired, but the user has configured a refresh timer, which has expired. + // Does the cloud have a newer copy? + cloudHasLatestData := cloudAttr.Mtime.After(lmt) || finfo.Size() != cloudAttr.Size + // Is the local file open? 
+ fileIsOpen := flock.Count() > 0 && !flock.LazyOpen + if cloudHasLatestData && !fileIsOpen { log.Info("FileCache::isDownloadRequired : File is modified in container, so forcing redownload %s [A-%v : L-%v] [A-%v : L-%v]", - blobPath, attr.Mtime, lmt, attr.Size, finfo.Size()) + objectPath, cloudAttr.Mtime, lmt, cloudAttr.Size, finfo.Size()) downloadRequired = true - + } else { + // log why we decided not to refresh + if !cloudHasLatestData { + log.Info("FileCache::isDownloadRequired : File in container is not latest, skip redownload %s [A-%v : L-%v]", objectPath, cloudAttr.Mtime, lmt) + } else if fileIsOpen { + log.Info("FileCache::isDownloadRequired : Need to re-download %s, but skipping as handle is already open", objectPath) + } // As we have decided to continue using old file, we reset the timer to check again after refresh time interval flock.SetDownloadTime() - } else { - log.Info("FileCache::isDownloadRequired : File in container is not latest, skip redownload %s [A-%v : L-%v]", blobPath, attr.Mtime, lmt) } } - return downloadRequired, fileExists, attr, err + return downloadRequired, cached, cloudAttr, err } func (fc *FileCache) getAvailableSize() (uint64, error) { diff --git a/component/file_cache/file_cache_windows_test.go b/component/file_cache/file_cache_windows_test.go index 0d7dc6a13..f1548aee1 100644 --- a/component/file_cache/file_cache_windows_test.go +++ b/component/file_cache/file_cache_windows_test.go @@ -33,7 +33,6 @@ import ( "os" "strings" "testing" - "time" "github.com/Seagate/cloudfuse/common" "github.com/Seagate/cloudfuse/common/config" @@ -61,7 +60,7 @@ func (suite *fileCacheWindowsTestSuite) SetupTest() { rand := randomString(8) suite.cache_path = common.JoinUnixFilepath(home_dir, "file_cache"+rand) suite.fake_storage_path = common.JoinUnixFilepath(home_dir, "fake_storage"+rand) - defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 0\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) + defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 1\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) log.Debug(defaultConfig) // Delete the temp directories created @@ -112,15 +111,8 @@ func (suite *fileCacheWindowsTestSuite) TestChownNotInCache() { defer suite.cleanupTest() // Setup path := "file" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) - - _, err := os.Stat(suite.cache_path + "/" + path) - for i := 0; i < 1000 && !os.IsNotExist(err); i++ { - time.Sleep(10 * time.Millisecond) - _, err = os.Stat(suite.cache_path + "/" + path) - } - suite.assert.True(os.IsNotExist(err)) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) // Path should be in fake storage suite.assert.FileExists(suite.fake_storage_path + "/" + path) @@ -128,7 +120,7 @@ func (suite *fileCacheWindowsTestSuite) TestChownNotInCache() { // Checking that nothing changed with existing files owner := os.Getuid() group := os.Getgid() - err = suite.fileCache.Chown(internal.ChownOptions{Name: path, Owner: owner, Group: group}) + err := suite.fileCache.Chown(internal.ChownOptions{Name: path, Owner: owner, Group: group}) suite.assert.Nil(err) // Path in fake storage should be updated diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index 
adba1f867..8373654e6 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -58,9 +58,6 @@ type lruPolicy struct { closeSignal chan int closeSignalValidate chan int - // Channel to contain files that needs to be deleted immediately - deleteEvent chan string - // Channel to contain files that are in use so push them up in lru list validateChan chan string @@ -109,8 +106,6 @@ func (p *lruPolicy) StartPolicy() error { p.closeSignal = make(chan int) p.closeSignalValidate = make(chan int) - - p.deleteEvent = make(chan string, 1000) p.validateChan = make(chan string, 10000) _, err := common.GetUsage(p.tmpPath) @@ -162,11 +157,14 @@ func (p *lruPolicy) CacheValid(name string) { } } -func (p *lruPolicy) CachePurge(name string) { +func (p *lruPolicy) CachePurge(name string, flock *common.LockMapItem) { log.Trace("lruPolicy::CachePurge : %s", name) p.removeNode(name) - p.deleteEvent <- name + err := deleteFile(name) + if err != nil && !os.IsNotExist(err) { + log.Err("lruPolicy::CachePurge : failed to delete local file %s. Here's why: %v", name, err) + } } func (p *lruPolicy) IsCached(name string) bool { @@ -246,11 +244,6 @@ func (p *lruPolicy) clearCache() { for { select { - case name := <-p.deleteEvent: - log.Trace("lruPolicy::Clear-delete") - // we are asked to delete file explicitly - p.deleteItem(name) - case <-p.cacheTimeoutMonitor: log.Trace("lruPolicy::Clear-timeout monitor") // File cache timeout has hit so delete all unused files for past N seconds diff --git a/component/file_cache/lru_policy_test.go b/component/file_cache/lru_policy_test.go index df574719f..a487bb2f4 100644 --- a/component/file_cache/lru_policy_test.go +++ b/component/file_cache/lru_policy_test.go @@ -59,7 +59,7 @@ func (suite *lruPolicyTestSuite) SetupTest() { config := cachePolicyConfig{ tmpPath: cache_path, - cacheTimeout: 0, + cacheTimeout: 1, maxEviction: defaultMaxEviction, maxSizeMB: 0, highThreshold: defaultMaxThreshold, @@ -149,7 +149,7 @@ func (suite *lruPolicyTestSuite) generateNestedDirectory(aPath string) ([]string func (suite *lruPolicyTestSuite) TestDefault() { defer suite.cleanupTest() suite.assert.EqualValues("lru", suite.policy.Name()) - suite.assert.EqualValues(0, suite.policy.cacheTimeout) // cacheTimeout does not change + suite.assert.EqualValues(1, suite.policy.cacheTimeout) // cacheTimeout does not change suite.assert.EqualValues(defaultMaxEviction, suite.policy.maxEviction) suite.assert.EqualValues(0, suite.policy.maxSizeMB) suite.assert.EqualValues(defaultMaxThreshold, suite.policy.highThreshold) @@ -170,7 +170,7 @@ func (suite *lruPolicyTestSuite) TestUpdateConfig() { suite.policy.UpdateConfig(config) suite.assert.NotEqualValues(120, suite.policy.cacheTimeout) // cacheTimeout does not change - suite.assert.EqualValues(0, suite.policy.cacheTimeout) // cacheTimeout does not change + suite.assert.EqualValues(1, suite.policy.cacheTimeout) // cacheTimeout does not change suite.assert.EqualValues(100, suite.policy.maxEviction) suite.assert.EqualValues(10, suite.policy.maxSizeMB) suite.assert.EqualValues(70, suite.policy.highThreshold) @@ -205,22 +205,19 @@ func (suite *lruPolicyTestSuite) TestCachePurge() { // test policy cache data suite.policy.CacheValid("temp") - suite.policy.CachePurge("temp") + suite.policy.CachePurge("temp", nil) n, ok := suite.policy.nodeMap.Load("temp") suite.assert.False(ok) suite.assert.Nil(n) - // test asynchronous file and folder deletion + // test synchronous file and folder deletion // purge all aPaths, in reverse order aPaths, abPaths, 
acPaths := suite.generateNestedDirectory("temp") for i := len(aPaths) - 1; i >= 0; i-- { - suite.policy.CachePurge(aPaths[i]) + suite.policy.CachePurge(aPaths[i], nil) } - // wait for asynchronous deletions - // in local testing, 1ms was enough - time.Sleep(100 * time.Millisecond) // validate all aPaths were deleted for _, path := range aPaths { suite.assert.NoFileExists(path) @@ -250,42 +247,19 @@ func (suite *lruPolicyTestSuite) TestIsCachedFalse() { func (suite *lruPolicyTestSuite) TestTimeout() { defer suite.cleanupTest() - suite.cleanupTest() - - config := cachePolicyConfig{ - tmpPath: cache_path, - cacheTimeout: 1, - maxEviction: defaultMaxEviction, - maxSizeMB: 0, - highThreshold: defaultMaxThreshold, - lowThreshold: defaultMinThreshold, - fileLocks: &common.LockMap{}, - } - - suite.setupTestHelper(config) suite.policy.CacheValid("temp") - time.Sleep(3 * time.Second) // Wait for time > cacheTimeout, the file should no longer be cached + // Wait for time > cacheTimeout, the file should no longer be cached + for i := 0; i < 300 && suite.policy.IsCached("temp"); i++ { + time.Sleep(10 * time.Millisecond) + } suite.assert.False(suite.policy.IsCached("temp")) } func (suite *lruPolicyTestSuite) TestMaxEvictionDefault() { defer suite.cleanupTest() - suite.cleanupTest() - - config := cachePolicyConfig{ - tmpPath: cache_path, - cacheTimeout: 1, - maxEviction: defaultMaxEviction, - maxSizeMB: 0, - highThreshold: defaultMaxThreshold, - lowThreshold: defaultMinThreshold, - fileLocks: &common.LockMap{}, - } - - suite.setupTestHelper(config) for i := 1; i < 5000; i++ { suite.policy.CacheValid("temp" + fmt.Sprint(i)) diff --git a/component/loopback/loopback_fs.go b/component/loopback/loopback_fs.go index 180b25c07..2b1b8f6c7 100644 --- a/component/loopback/loopback_fs.go +++ b/component/loopback/loopback_fs.go @@ -354,7 +354,7 @@ func (lfs *LoopbackFS) GetAttr(options internal.GetAttrOptions) (*internal.ObjAt info, err := os.Lstat(path) if err != nil { log.Err("LoopbackFS::GetAttr : error [%s]", err) - return &internal.ObjAttr{}, err + return nil, err } attr := &internal.ObjAttr{ Path: options.Name,