From 6ecab4eab90b653635da9edd42331eb03b593f0d Mon Sep 17 00:00:00 2001 From: garenchan Date: Tue, 16 Aug 2022 20:25:59 +0800 Subject: [PATCH] Optimize the code framework --- deploy/kubernetes/csi-s3.yaml | 7 +++ pkg/common/utils.go | 45 +++++++++++++++++++ pkg/driver/driver.go | 3 ++ pkg/driver/nodeserver.go | 81 +++++++++++++++++++++++++---------- pkg/driver/volume.go | 81 +++++++++++++++++++++++++++++++++++ pkg/mounter/goofys.go | 4 ++ pkg/mounter/mounter.go | 1 + pkg/mounter/rclone.go | 4 ++ pkg/mounter/s3backer.go | 5 +++ pkg/mounter/s3fs.go | 43 ++++++++++++++----- 10 files changed, 242 insertions(+), 32 deletions(-) create mode 100644 pkg/common/utils.go create mode 100644 pkg/driver/volume.go diff --git a/deploy/kubernetes/csi-s3.yaml b/deploy/kubernetes/csi-s3.yaml index 7eec63b..b602a5a 100644 --- a/deploy/kubernetes/csi-s3.yaml +++ b/deploy/kubernetes/csi-s3.yaml @@ -95,6 +95,9 @@ spec: fieldRef: fieldPath: spec.nodeName volumeMounts: + - name: plugins-dir + mountPath: /var/lib/kubelet/plugins + mountPropagation: "Bidirectional" - name: plugin-dir mountPath: /csi - name: pods-mount-dir @@ -103,6 +106,10 @@ spec: - name: fuse-device mountPath: /dev/fuse volumes: + - name: plugins-dir + hostPath: + path: /var/lib/kubelet/plugins + type: Directory - name: registration-dir hostPath: path: /var/lib/kubelet/plugins_registry/ diff --git a/pkg/common/utils.go b/pkg/common/utils.go new file mode 100644 index 0000000..ba66976 --- /dev/null +++ b/pkg/common/utils.go @@ -0,0 +1,45 @@ +package common + +import ( + "hash/fnv" + "sync" + + "k8s.io/utils/mount" +) + +type KeyMutex struct { + mutexes []sync.RWMutex + size int32 +} + +func HashToUint32(data []byte) uint32 { + h := fnv.New32a() + h.Write(data) + + return h.Sum32() +} + +func NewKeyMutex(size int32) *KeyMutex { + return &KeyMutex{ + mutexes: make([]sync.RWMutex, size), + size: size, + } +} + +func (km *KeyMutex) GetMutex(key string) *sync.RWMutex { + hashed := HashToUint32([]byte(key)) + index := hashed % uint32(km.size) + + return &km.mutexes[index] +} + +// CleanupMountPoint unmounts the given path and deletes the remaining directory +func CleanupMountPoint(mountPath string) error { + mounter := mount.New("") + + if err := mount.CleanupMountPoint(mountPath, mounter, true); err != nil { + return err + } + + return nil +} diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index c28079b..1b1d5b3 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -21,6 +21,8 @@ import ( "github.com/golang/glog" csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" + + "github.com/ctrox/csi-s3/pkg/common" ) type driver struct { @@ -66,6 +68,7 @@ func (s3 *driver) newControllerServer(d *csicommon.CSIDriver) *controllerServer func (s3 *driver) newNodeServer(d *csicommon.CSIDriver) *nodeServer { return &nodeServer{ DefaultNodeServer: csicommon.NewDefaultNodeServer(d), + volumeMutexes: common.NewKeyMutex(32), } } diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 366c4a3..795e18c 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -19,7 +19,9 @@ package driver import ( "fmt" "os" + "sync" + "github.com/ctrox/csi-s3/pkg/common" "github.com/ctrox/csi-s3/pkg/mounter" "github.com/ctrox/csi-s3/pkg/s3" "github.com/golang/glog" @@ -35,13 +37,15 @@ import ( type nodeServer struct { *csicommon.DefaultNodeServer + + // information about the managed volumes + volumes sync.Map + volumeMutexes *common.KeyMutex } func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() - stagingTargetPath := req.GetStagingTargetPath() - bucketName, prefix := volumeIDToBucketPrefix(volumeID) // Check arguments if req.GetVolumeCapability() == nil { @@ -50,9 +54,6 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(stagingTargetPath) == 0 { - return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request") - } if len(targetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } @@ -79,21 +80,17 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", targetPath, deviceID, readOnly, volumeID, attrib, mountFlags) - s3, err := s3.NewClientFromSecret(req.GetSecrets()) - if err != nil { - return nil, fmt.Errorf("failed to initialize S3 client: %s", err) - } - meta, err := s3.GetFSMeta(bucketName, prefix) - if err != nil { - return nil, err - } + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() - mounter, err := mounter.New(meta, s3.Config) - if err != nil { - return nil, err + volume, ok := ns.volumes.Load(volumeID) + if !ok { + return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet") } - if err := mounter.Mount(stagingTargetPath, targetPath); err != nil { - return nil, err + + if err := volume.(*Volume).Publish(targetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) } glog.V(4).Infof("s3: volume %s successfuly mounted to %s", volumeID, targetPath) @@ -113,10 +110,21 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - if err := mounter.FuseUnmount(targetPath); err != nil { + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + + volume, ok := ns.volumes.Load(volumeID) + if !ok { + glog.Warningf("volume %s hasn't been published", volumeID) + return &csi.NodeUnpublishVolumeResponse{}, nil + } + + if err := volume.(*Volume).Unpublish(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } - glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID) + + glog.V(4).Infof("s3: volume %s has been unpublished from %s.", volumeID, targetPath) return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -139,6 +147,10 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided") } + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + notMnt, err := checkMount(stagingTargetPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) @@ -158,10 +170,15 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if err != nil { return nil, err } - if err := mounter.Stage(stagingTargetPath); err != nil { - return nil, err + + volume := NewVolume(volumeID, mounter) + if err := volume.Stage(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) } + ns.volumes.Store(volumeID, volume) + glog.V(4).Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil } @@ -177,6 +194,22 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + + volume, ok := ns.volumes.Load(volumeID) + if !ok { + glog.Warningf("volume %s hasn't been staged", volumeID) + return &csi.NodeUnstageVolumeResponse{}, nil + } + + if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } else { + ns.volumes.Delete(volumeID) + } + return &csi.NodeUnstageVolumeResponse{}, nil } @@ -202,6 +235,10 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV return &csi.NodeExpandVolumeResponse{}, status.Error(codes.Unimplemented, "NodeExpandVolume is not implemented") } +func (ns *nodeServer) getVolumeMutex(volumeID string) *sync.RWMutex { + return ns.volumeMutexes.GetMutex(volumeID) +} + func checkMount(targetPath string) (bool, error) { notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath) if err != nil { diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go new file mode 100644 index 0000000..bb675ca --- /dev/null +++ b/pkg/driver/volume.go @@ -0,0 +1,81 @@ +package driver + +import ( + "github.com/ctrox/csi-s3/pkg/mounter" + "github.com/golang/glog" +) + +type Volume struct { + VolumeId string + + // volume's real mount point + stagingTargetPath string + + // Target paths to which the volume has been published. + // These paths are symbolic links to the real mount point. + // So multiple pods using the same volume can share a mount. + targetPaths map[string]bool + + mounter mounter.Mounter +} + +func NewVolume(volumeID string, mounter mounter.Mounter) *Volume { + return &Volume{ + VolumeId: volumeID, + mounter: mounter, + targetPaths: make(map[string]bool), + } +} + +func (vol *Volume) Stage(stagingTargetPath string) error { + if vol.isStaged() { + return nil + } + + if err := vol.mounter.Stage(stagingTargetPath); err != nil { + return err + } + + vol.stagingTargetPath = stagingTargetPath + return nil +} + +func (vol *Volume) Publish(targetPath string) error { + if err := vol.mounter.Mount(vol.stagingTargetPath, targetPath); err != nil { + return err + } + + vol.targetPaths[targetPath] = true + return nil +} + +func (vol *Volume) Unpublish(targetPath string) error { + // Check whether the volume is published to the target path. + if _, ok := vol.targetPaths[targetPath]; !ok { + glog.Warningf("volume %s hasn't been published to %s", vol.VolumeId, targetPath) + return nil + } + + if err := vol.mounter.Unmount(targetPath); err != nil { + return err + } + + delete(vol.targetPaths, targetPath) + return nil +} + +func (vol *Volume) Unstage(_ string) error { + if !vol.isStaged() { + return nil + } + + if err := vol.mounter.Unstage(vol.stagingTargetPath); err != nil { + return err + } + + return nil +} + +func (vol *Volume) isStaged() bool { + return vol.stagingTargetPath != "" +} diff --git a/pkg/mounter/goofys.go b/pkg/mounter/goofys.go index 0400b2f..b0774ac 100644 --- a/pkg/mounter/goofys.go +++ b/pkg/mounter/goofys.go @@ -74,3 +74,7 @@ func (goofys *goofysMounter) Mount(source string, target string) error { } return nil } + +func (goofys *goofysMounter) Unmount(target string) error { + return FuseUnmount(target) +} diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go index d441fe1..997b42c 100644 --- a/pkg/mounter/mounter.go +++ b/pkg/mounter/mounter.go @@ -22,6 +22,7 @@ type Mounter interface { Stage(stagePath string) error Unstage(stagePath string) error Mount(source string, target string) error + Unmount(target string) error } const ( diff --git a/pkg/mounter/rclone.go b/pkg/mounter/rclone.go index 038c6aa..8d47c57 100644 --- a/pkg/mounter/rclone.go +++ b/pkg/mounter/rclone.go @@ -57,3 +57,7 @@ func (rclone *rcloneMounter) Mount(source string, target string) error { os.Setenv("AWS_SECRET_ACCESS_KEY", rclone.secretAccessKey) return fuseMount(target, rcloneCmd, args) } + +func (rclone *rcloneMounter) Unmount(target string) error { + return FuseUnmount(target) +} diff --git a/pkg/mounter/s3backer.go b/pkg/mounter/s3backer.go index 74cb3e2..ec716fb 100644 --- a/pkg/mounter/s3backer.go +++ b/pkg/mounter/s3backer.go @@ -8,6 +8,7 @@ import ( osexec "os/exec" + "github.com/ctrox/csi-s3/pkg/common" "github.com/ctrox/csi-s3/pkg/s3" "github.com/golang/glog" "k8s.io/mount-utils" @@ -96,6 +97,10 @@ func (s3backer *s3backerMounter) Mount(source string, target string) error { return nil } +func (s3backer *s3backerMounter) Unmount(target string) error { + return common.CleanupMountPoint(target) +} + func (s3backer *s3backerMounter) mountInit(p string) error { args := []string{ fmt.Sprintf("--blockSize=%s", s3backerBlockSize), diff --git a/pkg/mounter/s3fs.go b/pkg/mounter/s3fs.go index 9649388..b58f866 100644 --- a/pkg/mounter/s3fs.go +++ b/pkg/mounter/s3fs.go @@ -5,6 +5,9 @@ import ( "os" "path" + "k8s.io/utils/mount" + + "github.com/ctrox/csi-s3/pkg/common" "github.com/ctrox/csi-s3/pkg/s3" ) @@ -30,27 +33,47 @@ func newS3fsMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) { } func (s3fs *s3fsMounter) Stage(stageTarget string) error { - return nil -} - -func (s3fs *s3fsMounter) Unstage(stageTarget string) error { - return nil -} - -func (s3fs *s3fsMounter) Mount(source string, target string) error { if err := writes3fsPass(s3fs.pwFileContent); err != nil { return err } args := []string{ fmt.Sprintf("%s:/%s", s3fs.meta.BucketName, path.Join(s3fs.meta.Prefix, s3fs.meta.FSPath)), - target, + stageTarget, "-o", "use_path_request_style", "-o", fmt.Sprintf("url=%s", s3fs.url), "-o", fmt.Sprintf("endpoint=%s", s3fs.region), "-o", "allow_other", "-o", "mp_umask=000", } - return fuseMount(target, s3fsCmd, args) + return fuseMount(stageTarget, s3fsCmd, args) +} + +func (s3fs *s3fsMounter) Unstage(stageTarget string) error { + if err := FuseUnmount(stageTarget); err != nil { + return err + } + + if err := os.Remove(stageTarget); err != nil && !os.IsNotExist(err) { + return err + } + + return nil +} + +func (s3fs *s3fsMounter) Mount(source string, target string) error { + mounter := mount.New("") + // Use bind mount to create an alias of the real mount point. + mountOptions := []string{"bind"} + + if err := mounter.Mount(source, target, "", mountOptions); err != nil { + return err + } + + return nil +} + +func (s3fs *s3fsMounter) Unmount(target string) error { + return common.CleanupMountPoint(target) } func writes3fsPass(pwFileContent string) error {