Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the code framework #79

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions deploy/kubernetes/csi-s3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ spec:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: plugins-dir
mountPath: /var/lib/kubelet/plugins
mountPropagation: "Bidirectional"
- name: plugin-dir
mountPath: /csi
- name: pods-mount-dir
Expand All @@ -103,6 +106,10 @@ spec:
- name: fuse-device
mountPath: /dev/fuse
volumes:
- name: plugins-dir
hostPath:
path: /var/lib/kubelet/plugins
type: Directory
- name: registration-dir
hostPath:
path: /var/lib/kubelet/plugins_registry/
Expand Down
45 changes: 45 additions & 0 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package common

import (
"hash/fnv"
"sync"

"k8s.io/utils/mount"
)

type KeyMutex struct {
mutexes []sync.RWMutex
size int32
}

func HashToUint32(data []byte) uint32 {
h := fnv.New32a()
h.Write(data)

return h.Sum32()
}

func NewKeyMutex(size int32) *KeyMutex {
return &KeyMutex{
mutexes: make([]sync.RWMutex, size),
size: size,
}
}

func (km *KeyMutex) GetMutex(key string) *sync.RWMutex {
hashed := HashToUint32([]byte(key))
index := hashed % uint32(km.size)

return &km.mutexes[index]
}

// CleanupMountPoint unmounts the given path and deletes the remaining directory
func CleanupMountPoint(mountPath string) error {
mounter := mount.New("")

if err := mount.CleanupMountPoint(mountPath, mounter, true); err != nil {
return err
}

return nil
}
3 changes: 3 additions & 0 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/golang/glog"

csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"

"github.com/ctrox/csi-s3/pkg/common"
)

type driver struct {
Expand Down Expand Up @@ -66,6 +68,7 @@ func (s3 *driver) newControllerServer(d *csicommon.CSIDriver) *controllerServer
func (s3 *driver) newNodeServer(d *csicommon.CSIDriver) *nodeServer {
return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
volumeMutexes: common.NewKeyMutex(32),
}
}

Expand Down
81 changes: 59 additions & 22 deletions pkg/driver/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package driver
import (
"fmt"
"os"
"sync"

"github.com/ctrox/csi-s3/pkg/common"
"github.com/ctrox/csi-s3/pkg/mounter"
"github.com/ctrox/csi-s3/pkg/s3"
"github.com/golang/glog"
Expand All @@ -35,13 +37,15 @@ import (

type nodeServer struct {
*csicommon.DefaultNodeServer

// information about the managed volumes
volumes sync.Map
volumeMutexes *common.KeyMutex
}

func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
stagingTargetPath := req.GetStagingTargetPath()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)

// Check arguments
if req.GetVolumeCapability() == nil {
Expand All @@ -50,9 +54,6 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(stagingTargetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Staging Target path missing in request")
}
if len(targetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
Expand All @@ -79,21 +80,17 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
targetPath, deviceID, readOnly, volumeID, attrib, mountFlags)

s3, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
meta, err := s3.GetFSMeta(bucketName, prefix)
if err != nil {
return nil, err
}
volumeMutex := ns.getVolumeMutex(volumeID)
volumeMutex.Lock()
defer volumeMutex.Unlock()

mounter, err := mounter.New(meta, s3.Config)
if err != nil {
return nil, err
volume, ok := ns.volumes.Load(volumeID)
if !ok {
return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet")
}
if err := mounter.Mount(stagingTargetPath, targetPath); err != nil {
return nil, err

if err := volume.(*Volume).Publish(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

glog.V(4).Infof("s3: volume %s successfuly mounted to %s", volumeID, targetPath)
Expand All @@ -113,10 +110,21 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}

if err := mounter.FuseUnmount(targetPath); err != nil {
volumeMutex := ns.getVolumeMutex(volumeID)
volumeMutex.Lock()
defer volumeMutex.Unlock()

volume, ok := ns.volumes.Load(volumeID)
if !ok {
glog.Warningf("volume %s hasn't been published", volumeID)
return &csi.NodeUnpublishVolumeResponse{}, nil
}

if err := volume.(*Volume).Unpublish(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID)

glog.V(4).Infof("s3: volume %s has been unpublished from %s.", volumeID, targetPath)

return &csi.NodeUnpublishVolumeResponse{}, nil
}
Expand All @@ -139,6 +147,10 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
}

volumeMutex := ns.getVolumeMutex(volumeID)
volumeMutex.Lock()
defer volumeMutex.Unlock()

notMnt, err := checkMount(stagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -158,10 +170,15 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
if err != nil {
return nil, err
}
if err := mounter.Stage(stagingTargetPath); err != nil {
return nil, err

volume := NewVolume(volumeID, mounter)
if err := volume.Stage(stagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

ns.volumes.Store(volumeID, volume)
glog.V(4).Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath)

return &csi.NodeStageVolumeResponse{}, nil
}

Expand All @@ -177,6 +194,22 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}

volumeMutex := ns.getVolumeMutex(volumeID)
volumeMutex.Lock()
defer volumeMutex.Unlock()

volume, ok := ns.volumes.Load(volumeID)
if !ok {
glog.Warningf("volume %s hasn't been staged", volumeID)
return &csi.NodeUnstageVolumeResponse{}, nil
}

if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else {
ns.volumes.Delete(volumeID)
}

return &csi.NodeUnstageVolumeResponse{}, nil
}

Expand All @@ -202,6 +235,10 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
return &csi.NodeExpandVolumeResponse{}, status.Error(codes.Unimplemented, "NodeExpandVolume is not implemented")
}

func (ns *nodeServer) getVolumeMutex(volumeID string) *sync.RWMutex {
return ns.volumeMutexes.GetMutex(volumeID)
}

func checkMount(targetPath string) (bool, error) {
notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
if err != nil {
Expand Down
81 changes: 81 additions & 0 deletions pkg/driver/volume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package driver

import (
"github.com/ctrox/csi-s3/pkg/mounter"
"github.com/golang/glog"
)

type Volume struct {
VolumeId string

// volume's real mount point
stagingTargetPath string

// Target paths to which the volume has been published.
// These paths are symbolic links to the real mount point.
// So multiple pods using the same volume can share a mount.
targetPaths map[string]bool

mounter mounter.Mounter
}

func NewVolume(volumeID string, mounter mounter.Mounter) *Volume {
return &Volume{
VolumeId: volumeID,
mounter: mounter,
targetPaths: make(map[string]bool),
}
}

func (vol *Volume) Stage(stagingTargetPath string) error {
if vol.isStaged() {
return nil
}

if err := vol.mounter.Stage(stagingTargetPath); err != nil {
return err
}

vol.stagingTargetPath = stagingTargetPath
return nil
}

func (vol *Volume) Publish(targetPath string) error {
if err := vol.mounter.Mount(vol.stagingTargetPath, targetPath); err != nil {
return err
}

vol.targetPaths[targetPath] = true
return nil
}

func (vol *Volume) Unpublish(targetPath string) error {
// Check whether the volume is published to the target path.
if _, ok := vol.targetPaths[targetPath]; !ok {
glog.Warningf("volume %s hasn't been published to %s", vol.VolumeId, targetPath)
return nil
}

if err := vol.mounter.Unmount(targetPath); err != nil {
return err
}

delete(vol.targetPaths, targetPath)
return nil
}

func (vol *Volume) Unstage(_ string) error {
if !vol.isStaged() {
return nil
}

if err := vol.mounter.Unstage(vol.stagingTargetPath); err != nil {
return err
}

return nil
}

func (vol *Volume) isStaged() bool {
return vol.stagingTargetPath != ""
}
4 changes: 4 additions & 0 deletions pkg/mounter/goofys.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,7 @@ func (goofys *goofysMounter) Mount(source string, target string) error {
}
return nil
}

func (goofys *goofysMounter) Unmount(target string) error {
return FuseUnmount(target)
}
1 change: 1 addition & 0 deletions pkg/mounter/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Mounter interface {
Stage(stagePath string) error
Unstage(stagePath string) error
Mount(source string, target string) error
Unmount(target string) error
}

const (
Expand Down
4 changes: 4 additions & 0 deletions pkg/mounter/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,7 @@ func (rclone *rcloneMounter) Mount(source string, target string) error {
os.Setenv("AWS_SECRET_ACCESS_KEY", rclone.secretAccessKey)
return fuseMount(target, rcloneCmd, args)
}

func (rclone *rcloneMounter) Unmount(target string) error {
return FuseUnmount(target)
}
5 changes: 5 additions & 0 deletions pkg/mounter/s3backer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

osexec "os/exec"

"github.com/ctrox/csi-s3/pkg/common"
"github.com/ctrox/csi-s3/pkg/s3"
"github.com/golang/glog"
"k8s.io/mount-utils"
Expand Down Expand Up @@ -96,6 +97,10 @@ func (s3backer *s3backerMounter) Mount(source string, target string) error {
return nil
}

func (s3backer *s3backerMounter) Unmount(target string) error {
return common.CleanupMountPoint(target)
}

func (s3backer *s3backerMounter) mountInit(p string) error {
args := []string{
fmt.Sprintf("--blockSize=%s", s3backerBlockSize),
Expand Down
Loading