diff --git a/README.md b/README.md
index c9ee02bfd1..d75178c462 100644
--- a/README.md
+++ b/README.md
@@ -125,16 +125,48 @@ The [`test/setup-ca-kubernetes.sh`](test/setup-ca-kubernetes.sh) script shows ho
A production deployment can improve upon that by using some other key delivery mechanism, like for example [Vault](https://www.vaultproject.io/).
-### Dynamic provisioning
-
-The following diagram illustrates how the PMEM-CSI driver performs dynamic volume provisioning in Kubernetes:
-![sequence diagram](/docs/images/sequence/pmem-csi-sequence-diagram.png)
+### Volume Persistency
+
+In a typical CSI deployment, volumes are provided by a storage backend that is independent of a particular node. When a node goes offline, the volume can be mounted elsewhere. But PMEM volumes are *local* to node and thus can only be used on the node where they were created. This means the applications using PMEM volume cannot freely move between nodes. This limitation needs to be considered when designing and deploying applications that are to use *local storage*.
+
+Below are the volume persistency models considered for implementation in PMEM-CSI to serve different application use cases:
+
+* Persistent Volumes
+A volume gets created independently of the application, on some node where there is enough free space. Applications using such a volume are then forced to run on that node and cannot run when the node is down. Data is retained until the volume gets deleted.
+
+* Ephemeral Volumes
+Each time an application starts to run on a node, a new volume is created for it on that node. When the application stops, the volume is deleted. The volume cannot be shared with other applications. Data on this volume is retained only while the application runs.
+
+* Cache Volumes
+Volumes are pre-created on a certain set of nodes, each with its own local data. Applications are started on those nodes and then get to use the volume on their node. Data persists across application restarts. This is useful when the data is only cached information that can be discarded and reconstructed at any time *and* the application can reuse existing local data when restarting.
+
+Volume | Kubernetes | PMEM-CSI | Limitations
+--- | --- | --- | ---
+Persistent | supported | supported | topology aware scheduling1
+Ephemeral | [in design](https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/20190122-csi-inline-volumes.md#proposal) | in design | topology aware scheduling1, resource constraints2
+Cache | supported | supported | topology aware scheduling1
+
+1 [Topology aware scheduling](https://github.com/kubernetes/enhancements/issues/490)
+ensures that an application runs on a node where the volume was created. For CSI-based drivers like PMEM-CSI, Kubernetes >= 1.13 is needed. On older Kubernetes releases, pods must be scheduled manually onto the right node(s).
+
+2 The upstream design for ephemeral volumes currently does not take [resource constraints](https://github.com/kubernetes/enhancements/pull/716#discussion_r250536632) into account. If an application gets scheduled onto a node and then creating the ephemeral volume on that node fails, the application on the node cannot start until resources become available.
+
+#### Usage on Kubernetes
+
+Kubernetes cluster administrators can expose above mentioned [volume persistency types](#volume-persistency) to applications using [`StorageClass Parameters`](https://kubernetes.io/docs/concepts/storage/storage-classes/#parameters). An optional `persistencyModel` parameter differentiates how the provisioned volume can be used.
+
+* if no `persistencyModel` parameter specified in `StorageClass` then it is treated as normal Kubernetes persistent volume. In this case PMEM-CSI creates PMEM volume on a node and the application that claims to use this volume is supposed to be scheduled onto this node by Kubernetes. Choosing of node is depend on StorageClass `volumeBindingMode`. In case of `volumeBindingMode: Immediate` PMEM-CSI chooses a node randomly, and in case of `volumeBindingMode: WaitForFirstConsumer` Kubernetes first chooses a node for scheduling the application, and PMEM-CSI creates the volume on that node. Applications which claim a normal persistent volume has to use `ReadOnlyOnce` access mode in its `accessModes` list. This [diagram](/docs/images/sequence/pmem-csi-persistent-sequence-diagram.png) illustrates how a normal persistent volume gets provisioned in Kubernetes using PMEM-CSI driver.
+
+* `persistencyModel: cache`
+Volumes of this type shall be used in combination with `volumeBindingMode: Immediate`. In this case, PMEM-CSI creates a set of PMEM volumes each volume on different node. The number of PMEM volumes to create can be specified by `cacheSize` StorageClass parameter. Applications which claim a `cache` volume can use `ReadWriteMany` in its `accessModes` list. Check with provided [cache StorageClass](deploy/kubernetes-1.13/pmem-storageclass-cache.yaml) example. This [diagram](/docs/images/sequence/pmem-csi-cache-sequence-diagram.png) illustrates how a cache volume gets provisioned in Kubernetes using PMEM-CSI driver.
+**NOTE**: Cache volumes are local to node not Pod. If two Pods using the same cache volume runs on the same node, will not get their own local volume, instead they endup sharing the same PMEM volume. Applications has to consider this and use available Kubernetes mechanisms like [node aniti-affinity](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity) while deploying. Check with provided [cache application](deploy/kubernetes-1.13/pmem-app-cache.yaml) example.
+
## Prerequisites
### Software required
@@ -160,7 +192,7 @@ The driver does not create persistent memory Regions, but expects Regions to exi
PMEM-CSI driver implements CSI specification version 1.0.0, which only supported by Kubernetes versions >= v1.13. The driver deployment in Kubernetes cluster has been verified on:
-| Branch | Kubernetes branch/version | Required Alfa feature-gates |
+| Branch | Kubernetes branch/version | Required alfa feature-gates |
|-------------------|--------------------------------|---------------------------- |
| devel | Kubernetes 1.13 | CSINodeInfo, CSIDriverRegistry |
diff --git a/deploy/kubernetes-1.13/pmem-app-cache.yaml b/deploy/kubernetes-1.13/pmem-app-cache.yaml
new file mode 100644
index 0000000000..db2607052f
--- /dev/null
+++ b/deploy/kubernetes-1.13/pmem-app-cache.yaml
@@ -0,0 +1,42 @@
+apiVersion: apps/v1beta2
+kind: ReplicaSet
+metadata:
+ name: my-csi-app
+spec:
+ selector:
+ matchLabels:
+ app: my-csi-app
+ replicas: 2
+ template:
+ metadata:
+ labels:
+ app: my-csi-app
+ spec:
+ # make sure that no two Pods run on same node
+ affinity:
+ podAntiAffinity:
+ requiredDuringSchedulingIgnoredDuringExecution:
+ - labelSelector:
+ matchExpressions:
+ - key: app
+ operator: In
+ values: [ my-csi-app ]
+ topologyKey: "kubernetes.io/hostname"
+ containers:
+ - name: my-frontend
+ image: busybox
+ command: [ "/bin/sh" ]
+ args: [ "-c", "touch /data/$(POD_NAME); sleep 100000" ]
+ env:
+ - name: POD_NAME
+ valueFrom:
+ fieldRef:
+ apiVersion: v1
+ fieldPath: metadata.name
+ volumeMounts:
+ - mountPath: "/data"
+ name: my-csi-volume
+ volumes:
+ - name: my-csi-volume
+ persistentVolumeClaim:
+ claimName: pmem-csi-pvc-cache
diff --git a/deploy/kubernetes-1.13/pmem-pvc-cache.yaml b/deploy/kubernetes-1.13/pmem-pvc-cache.yaml
new file mode 100644
index 0000000000..2dfd2eeda6
--- /dev/null
+++ b/deploy/kubernetes-1.13/pmem-pvc-cache.yaml
@@ -0,0 +1,11 @@
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ name: pmem-csi-pvc-cache
+spec:
+ accessModes:
+ - ReadWriteMany # cache volumes are multi-node volumes
+ resources:
+ requests:
+ storage: 8Gi
+ storageClassName: pmem-csi-sc-cache # defined in pmem-storageclass-cache.yaml
diff --git a/deploy/kubernetes-1.13/pmem-storageclass-cache.yaml b/deploy/kubernetes-1.13/pmem-storageclass-cache.yaml
new file mode 100644
index 0000000000..58b3a4b208
--- /dev/null
+++ b/deploy/kubernetes-1.13/pmem-storageclass-cache.yaml
@@ -0,0 +1,10 @@
+apiVersion: storage.k8s.io/v1
+kind: StorageClass
+metadata:
+ name: pmem-csi-sc-cache
+provisioner: pmem-csi
+reclaimPolicy: Delete
+volumeBindingMode: Immediate
+parameters:
+ persistencyModel: cache
+ cacheSize: "2"
diff --git a/docs/diagrams/sequence-cache.wsd b/docs/diagrams/sequence-cache.wsd
new file mode 100644
index 0000000000..29b941251b
--- /dev/null
+++ b/docs/diagrams/sequence-cache.wsd
@@ -0,0 +1,170 @@
+@startuml "pmem-csi-cache-sequence-diagram"
+
+title \nDynamic provisioning of pmem-csi "cache" volume\n
+
+skinparam BoxPadding 40
+
+actor Admin as admin #red
+actor User as user
+entity Kubernetes as k8s
+box "Master node"
+entity kubelet as masterkubelet
+participant "external-provisioner" as provisioner
+participant "external-attacher" as attacher
+participant "pmem-csi-driver" as masterdriver
+endbox
+
+box "Compute node X"
+entity kubelet as nodekubeletX
+participant "pmem-csi-driver" as nodedriverX
+endbox
+
+box "Compute node Y"
+entity kubelet as nodekubeletY
+participant "pmem-csi-driver" as nodedriverY
+endbox
+
+== Driver setup ==
+admin->k8s:Label nvdimm nodes: storage=nvdimm
+k8s->admin
+
+' deploy driver
+admin->k8s:deploy driver\nkubectl create -f pmem-csi.yaml
+k8s->admin
+k8s->masterkubelet:start driver pod
+masterkubelet-->provisioner:start container
+masterkubelet-->attacher:start container
+masterkubelet-->masterdriver:start container
+note right of masterdriver
+ listen on tcp port 10000
+end note
+k8s-->nodekubeletX:start driver pod
+nodekubeletX-->nodedriverX:start container
+note left of nodedriverX
+ * prepare logical volume groups
+ * listen on port 10001
+ * listen on unix socket:
+ /var/lib/kubelet/plugins/pmem-csi/csi.sock
+end note
+nodedriverX->masterdriver:RegistryServer.RegisterNodeController(\n{nodeId:"node-x", endpoint:"http://ip:10001"})
+
+k8s-->nodekubeletY:start driver pod
+nodekubeletY-->nodedriverY:start container
+note left of nodedriverY
+ * prepare logical volume groups
+ * listen on port 10001
+ * listen on unix socket:
+ /var/lib/kubelet/plugins/pmem-csi/csi.sock
+end note
+nodedriverY->masterdriver:RegistryServer.RegisterNodeController(\n{nodeId:"node-y", endpoint:"http://ip:10001"})
+
+' install a storage class
+admin->k8s:create StorageClass\nkubectl create -f pmem-storageclass-cache.yaml
+note left of k8s
+ metadata:
+ name: pmem-csi-sc-cache
+ volumeBindingMode: Immediate
+ paramters:
+ persistencyModel: cache
+ cacheSize: "2"
+end note
+k8s->admin
+
+' provision a cache volume
+== Volume provisioning ==
+admin->k8s:create PVC object\nkubectl create -f pmem-pvc-cache.yaml
+note left of k8s
+ metatdata:
+ name: pmem-csi-pvc-cache
+ spec:
+ storageClassName: pmem-csi-sc-cache
+end note
+k8s->admin
+k8s-->provisioner:<>\nPersistentVolumeClaim created
+activate provisioner
+provisioner->masterdriver:CSI.Controller.CreateVolume()
+masterdriver->nodedriverX:csi.Controller.CreateVolume()
+nodedriverX->nodedriverX:create pmem volume
+nodedriverX->masterdriver:success
+masterdriver->nodedriverY:csi.Controller.CreateVolume()
+nodedriverY->nodedriverY:create pmem volume
+nodedriverY->masterdriver:success
+masterdriver->provisioner:success
+note left of masterdriver
+ prepare Topology information:
+ Volume{
+ accessible_topology: [
+ segments:{ "pmem-csi.intel.com/node":"node-x"},
+ segments:{ "pmem-csi.intel.com/node":"node-y"} ]
+ }
+end note
+provisioner->k8s:Create PV object
+deactivate provisioner
+
+== Volume usage ==
+' Start an application
+user->k8s:Create application pod
+note left of k8s
+ volumes:
+ - name: my-csi-volume
+ persistentVolumeClaim:
+ claimName: pmem-csi-pvc-cache
+end note
+
+k8s->user:success
+
+k8s->nodekubeletX:schedules pod on node-x
+note right of k8s
+ Kubernetes is might choose node-x or node-y.
+end note
+
+k8s-->nodekubeletX:make available volume to pod
+nodekubeletX->nodedriverX:csi.Node.StageVolume()
+activate nodedriverX
+nodedriverX->nodedriverX:mount pmem device
+nodedriverX->nodekubeletX:success
+deactivate nodedriverX
+
+nodekubeletX->nodedriverX:csi.Node.PublishVolume()
+activate nodedriverX
+nodedriverX->nodedriverX:bind mount pmem device
+nodedriverX->nodekubeletX:success
+deactivate nodedriverX
+
+' deprovision a cache volume
+== Volume Deletion ==
+' stop pod
+user->k8s:stop applicaiton pod
+k8s->user:success
+k8s->nodekubeletX:stop pod containers
+
+nodekubeletX->nodedriverX:csi.Node.UnPublishVolume()
+activate nodedriverX
+nodedriverX->nodedriverX:unmout pod's bind mount
+nodedriverX->nodekubeletX:success
+deactivate nodedriverX
+
+nodekubeletX->nodedriverX:csi.Node.UnstageVolume()
+activate nodedriverX
+nodedriverX->nodedriverX:unmount pmem device
+nodedriverX->nodekubeletX:success
+deactivate nodedriverX
+
+'''''''''''''''''''''''''''
+admin->k8s:Delete PVC object\nkubectl delete pvc pmem-pvc-cache
+k8s->admin
+k8s-->provisioner:<>\nPersistentVolumeClaim deleted
+activate provisioner
+provisioner->masterdriver:CSI.Controller.DeleteVolume()
+masterdriver->nodedriverX:csi.Controller.DeleteVolume()
+nodedriverX->nodedriverX:delete pmem volume
+nodedriverX->masterdriver:success
+masterdriver->nodedriverY:csi.Controller.DeleteVolume()
+nodedriverY->nodedriverY:delete pmem volume
+nodedriverY->masterdriver:success
+masterdriver->provisioner:success
+provisioner->k8s:Delete PV object
+deactivate provisioner
+
+
+@enduml
diff --git a/docs/diagrams/sequence.wsd b/docs/diagrams/sequence.wsd
index c89bbc219c..5f8e9dc705 100644
--- a/docs/diagrams/sequence.wsd
+++ b/docs/diagrams/sequence.wsd
@@ -1,6 +1,6 @@
-@startuml "pmem-csi-sequence-diagram"
+@startuml "pmem-csi-persistent-sequence-diagram"
-title \nDynamic volume provisioning with pmem-csi driver\n
+title \nDynamic provisioning of a pmem-csi persistent volume\n
skinparam BoxPadding 40
@@ -42,39 +42,45 @@ note left of nodedriver
end note
nodedriver->masterdriver:RegistryServer.RegisterNodeController(\n{nodeId:"node-xyz", endpoint:"http://ip:10001"})
+' install a storage class
+admin->k8s:create StorageClass\nkubectl create -f pmem-storageclass.yaml
+note left of k8s
+ metadata:
+ name: pmem-csi-sc
+ volumeBindingMode: Immediate
+end note
+k8s->admin
+
== Volume provisioning ==
admin->k8s:create PVC object\nkubectl create -f pmem-pvc.yaml
k8s->admin
k8s-->provisioner:<>\nPersistentVolumeClaim created
activate provisioner
provisioner->masterdriver:CSI.Controller.CreateVolume()
-note right of masterdriver
- * gets available pmem capacity at each worker node
- * does the sanity check if volume can be created with requested volume size
- * prepare Topology information with node ids which have enough capacity
- {accessible_topology: {segments:{key:"kubernetes.io/hostname" value:"node-x"}}
-end note
-masterdriver->nodedriver:csi.Controller.GetCapacity()
-nodedriver->masterdriver:{capacity: xxx}
+masterdriver->nodedriver:csi.Controller.CreateVolume()
+nodedriver->nodedriver:create pmem volume
+nodedriver->masterdriver:success
masterdriver->provisioner:success
+note left of masterdriver
+ prepare Topology information:
+ Volume{
+ accessible_topology: [ segments:{ "pmem-csi.intel.com/node":"node-xyz"} ]
+ }
+end note
provisioner->k8s:Create PV object
deactivate provisioner
== Volume usage ==
user->k8s:Create application pod
-k8s->user
-
-k8s-->attacher:<>\nVolumeAttachment object create
-activate attacher
-attacher-->masterdriver:csi.Controller.ControllerPublishVolume()
-activate masterdriver
-masterdriver-->nodedriver:csi.Controller.ControllerPublishVolume()
-nodedriver->nodedriver:create pmem device
-nodedriver->masterdriver:success
-masterdriver->attacher:success
-attacher->k8s:volume attached
-deactivate masterdriver
-deactivate attacher
+note left of k8s
+ volumes:
+ - name: my-csi-volume
+ persistentVolumeClaim:
+ claimName: pmem-csi-pvc
+end note
+k8s->user:success
+
+k8s->nodekubelet:schedules pod on node-xyz
k8s-->nodekubelet:make available volume to pod
nodekubelet->nodedriver:csi.Node.StageVolume()
@@ -89,4 +95,39 @@ nodedriver->nodedriver:bind mount pmem device
nodedriver->nodekubelet:success
deactivate nodedriver
+' deprovision a cache volume
+== Volume Deletion ==
+' stop pod
+user->k8s:stop applicaiton pod
+k8s->user:success
+k8s->nodekubelet:stop pod containers
+
+nodekubelet->nodedriver:csi.Node.UnPublishVolume()
+activate nodedriver
+nodedriver->nodedriver:unmout pod's bind mount
+nodedriver->nodekubelet:success
+deactivate nodedriver
+
+nodekubelet->nodedriver:csi.Node.UnstageVolume()
+activate nodedriver
+nodedriver->nodedriver:unmount pmem device
+nodedriver->nodekubelet:success
+deactivate nodedriver
+
+'''''''''''''''''''''''''''
+admin->k8s:Delete PVC object\nkubectl delete pvc pmem-pvc-cache
+k8s->admin
+k8s-->provisioner:<>\nPersistentVolumeClaim deleted
+activate provisioner
+provisioner->masterdriver:CSI.Controller.DeleteVolume()
+masterdriver->nodedriver:csi.Controller.DeleteVolume()
+nodedriver->nodedriver:delete pmem volume
+nodedriver->masterdriver:success
+masterdriver->nodedriver:csi.Controller.DeleteVolume()
+nodedriver->nodedriver:delete pmem volume
+nodedriver->masterdriver:success
+masterdriver->provisioner:success
+provisioner->k8s:Delete PV object
+deactivate provisioner
+
@enduml
diff --git a/docs/images/sequence/pmem-csi-cache-sequence-diagram.png b/docs/images/sequence/pmem-csi-cache-sequence-diagram.png
new file mode 100644
index 0000000000..ad91407149
Binary files /dev/null and b/docs/images/sequence/pmem-csi-cache-sequence-diagram.png differ
diff --git a/docs/images/sequence/pmem-csi-persistent-sequence-diagram.png b/docs/images/sequence/pmem-csi-persistent-sequence-diagram.png
new file mode 100644
index 0000000000..e4d215911d
Binary files /dev/null and b/docs/images/sequence/pmem-csi-persistent-sequence-diagram.png differ
diff --git a/docs/images/sequence/pmem-csi-sequence-diagram.png b/docs/images/sequence/pmem-csi-sequence-diagram.png
deleted file mode 100644
index 5cbdf50e75..0000000000
Binary files a/docs/images/sequence/pmem-csi-sequence-diagram.png and /dev/null differ
diff --git a/pkg/pmem-csi-driver/controllerserver-master.go b/pkg/pmem-csi-driver/controllerserver-master.go
index ff27897804..f1b4fe0513 100644
--- a/pkg/pmem-csi-driver/controllerserver-master.go
+++ b/pkg/pmem-csi-driver/controllerserver-master.go
@@ -8,9 +8,9 @@ package pmemcsidriver
import (
"fmt"
+ "strconv"
"github.com/google/uuid"
- "github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -35,17 +35,30 @@ const (
Unattached
//Deleted volume deleted
Deleted
+
+ // FIXME(avalluri): Choose better naming
+ pmemParameterKeyPersistencyModel = "persistencyModel"
+ pmemParameterKeyCacheSize = "cacheSize"
+
+ pmemPersistencyModelNone PmemPersistencyModel = "none"
+ pmemPersistencyModelCache PmemPersistencyModel = "cache"
+ //pmemPersistencyModelEphemeral PmemPersistencyModel = "ephemeral"
)
+type PmemPersistencyModel string
+
type pmemVolume struct {
// VolumeID published to outside world
id string
- // Cached PV creation request to be used in volume attach phase
- req *csi.CreateVolumeRequest
- // current volume status
- status VolumeStatus
- // node id where the volume provisioned/attached
- nodeID string
+ // Name of volume
+ name string
+ // Size of the volume
+ size int64
+ // ID of nodes where the volume provisioned/attached
+ // It would be one if simple volume, else would be more than one for "cached" volume
+ nodeIDs map[string]VolumeStatus
+ // VolumeType
+ volumeType PmemPersistencyModel
}
type masterController struct {
@@ -78,6 +91,7 @@ func (cs *masterController) RegisterService(rpcServer *grpc.Server) {
func (cs *masterController) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
var vol *pmemVolume
+ chosenNodes := map[string]VolumeStatus{}
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
pmemcommon.Infof(3, ctx, "invalid create volume req: %v", req)
@@ -94,63 +108,117 @@ func (cs *masterController) CreateVolume(ctx context.Context, req *csi.CreateVol
asked := req.GetCapacityRange().GetRequiredBytes()
- topology := []*csi.Topology{}
+ outTopology := []*csi.Topology{}
glog.Infof("CreateVolume: Name: %v req.Required: %v req.Limit: %v", req.Name, asked, req.GetCapacityRange().GetLimitBytes())
if vol = cs.getVolumeByName(req.Name); vol != nil {
// Check if the size of existing volume can cover the new request
- glog.Infof("CreateVolume: Vol %s exists, Size: %v", req.Name, vol.req.GetCapacityRange().GetRequiredBytes())
- if vol.req.GetCapacityRange().GetRequiredBytes() < asked {
+ glog.Infof("CreateVolume: Vol %s exists, Size: %v", req.Name, vol.size)
+ if vol.size < asked {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with different size already exist", req.Name))
}
+
+ chosenNodes = vol.nodeIDs
} else {
id, _ := uuid.NewUUID() //nolint: gosec
volumeID := id.String()
- capReq := &csi.GetCapacityRequest{
- Parameters: req.GetParameters(),
- VolumeCapabilities: req.GetVolumeCapabilities(),
+ inTopology := []*csi.Topology{}
+ volumeType := pmemPersistencyModelNone
+ cacheCount := uint64(1)
+
+ if req.Parameters == nil {
+ req.Parameters = map[string]string{}
+ } else {
+ if val, ok := req.Parameters[pmemParameterKeyPersistencyModel]; ok {
+ volumeType = PmemPersistencyModel(val)
+ if volumeType == pmemPersistencyModelCache {
+ if val, ok := req.Parameters[pmemParameterKeyCacheSize]; ok {
+ c, err := strconv.ParseUint(val, 10, 64)
+ if err != nil {
+ glog.Warning("failed to parse '" + pmemParameterKeyCacheSize + "' parameter")
+ } else {
+ cacheCount = c
+ }
+ }
+ }
+ }
+ }
+
+ if reqTop := req.GetAccessibilityRequirements(); reqTop != nil {
+ inTopology = reqTop.Preferred
+ if inTopology == nil {
+ inTopology = reqTop.Requisite
+ }
+ }
+
+ if len(inTopology) == 0 {
+ // No topology provided, so we are free to choose from all available
+ // nodes
+ for node := range cs.rs.nodeClients {
+ inTopology = append(inTopology, &csi.Topology{
+ Segments: map[string]string{
+ PmemDriverTopologyKey: node,
+ },
+ })
+ }
}
- foundNodes := []string{}
- for _, node := range cs.rs.nodeClients {
- cap, err := cs.getNodeCapacity(ctx, node, capReq)
+
+ // Ask all nodes to use existing volume id
+ req.Parameters["_id"] = volumeID
+ for _, top := range inTopology {
+ if cacheCount == 0 {
+ break
+ }
+ node := top.Segments[PmemDriverTopologyKey]
+ conn, err := cs.rs.ConnectToNodeController(node, connectionTimeout)
if err != nil {
- glog.Warningf("Error while fetching '%s' node capacity: %s", node.NodeID, err.Error())
+ glog.Warningf("failed to connect to %s: %s", node, err.Error())
continue
}
- glog.Infof("Node: %s - Capacity: %v", node, cap)
- if cap >= asked {
- glog.Infof("node %s has requested capacity", node.NodeID)
- foundNodes = append(foundNodes, node.NodeID)
+
+ defer conn.Close()
+
+ csiClient := csi.NewControllerClient(conn)
+
+ if _, err := csiClient.CreateVolume(ctx, req); err != nil {
+ glog.Warningf("failed to create volume on %s: %s", node, err.Error())
+ continue
}
+ cacheCount = cacheCount - 1
+ chosenNodes[node] = Created
}
- if len(foundNodes) == 0 {
+ delete(req.Parameters, "_id")
+
+ if len(chosenNodes) == 0 {
return nil, status.Error(codes.Unavailable, fmt.Sprintf("No node found with %v capacity", asked))
}
- glog.Infof("Found nodes: %v", foundNodes)
-
- for _, id := range foundNodes {
- topology = append(topology, &csi.Topology{
- Segments: map[string]string{
- "kubernetes.io/hostname": id,
- },
- })
- }
+ glog.Infof("Chosen nodes: %v", chosenNodes)
vol = &pmemVolume{
- id: volumeID,
- req: req,
- status: Created,
+ id: volumeID,
+ name: req.Name,
+ size: asked,
+ nodeIDs: chosenNodes,
+ volumeType: volumeType,
}
cs.pmemVolumes[volumeID] = vol
glog.Infof("CreateVolume: Record new volume as %v", *vol)
}
+ for node := range chosenNodes {
+ outTopology = append(outTopology, &csi.Topology{
+ Segments: map[string]string{
+ PmemDriverTopologyKey: node,
+ },
+ })
+ }
+
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: vol.id,
CapacityBytes: asked,
- AccessibleTopology: topology,
+ AccessibleTopology: outTopology,
VolumeContext: req.Parameters,
},
}, nil
@@ -169,96 +237,28 @@ func (cs *masterController) DeleteVolume(ctx context.Context, req *csi.DeleteVol
// Serialize by VolumeId
volumeMutex.LockKey(req.VolumeId)
- defer volumeMutex.UnlockKey(req.VolumeId)
+ defer volumeMutex.UnlockKey(req.VolumeId) //nolint: errcheck
pmemcommon.Infof(4, ctx, "DeleteVolume: volumeID: %v", req.GetVolumeId())
if vol := cs.getVolumeByID(req.GetVolumeId()); vol != nil {
- if vol.status != Unattached {
- pmemcommon.Infof(3, ctx, "Volume %s is attached to %s but not detached", vol.req.Name, vol.nodeID)
- }
-
- conn, err := cs.rs.ConnectToNodeController(vol.nodeID, connectionTimeout)
- if err != nil {
- return nil, status.Error(codes.Internal, err.Error())
- }
- defer conn.Close()
+ for node := range vol.nodeIDs {
+ conn, err := cs.rs.ConnectToNodeController(node, connectionTimeout)
+ if err != nil {
+ glog.Warningf("Failed to connect to node controller:%s, stale volume(%s) on %s should be cleaned manually", err.Error(), vol.id, node)
+ }
- csiClient := csi.NewControllerClient(conn)
- clientReq := &csi.DeleteVolumeRequest{
- VolumeId: vol.id,
- }
- if res, err := csiClient.DeleteVolume(ctx, clientReq); err != nil {
- return res, err
+ if _, err := csi.NewControllerClient(conn).DeleteVolume(ctx, req); err != nil {
+ glog.Warningf("Failed to delete volume %s on %s: %s", vol.id, node, err.Error())
+ }
+ conn.Close() // nolint:gosec
}
delete(cs.pmemVolumes, vol.id)
pmemcommon.Infof(4, ctx, "DeleteVolume: volume %s deleted", req.GetVolumeId())
} else {
pmemcommon.Infof(3, ctx, "Volume %s not created by this controller", req.GetVolumeId())
}
- return &csi.DeleteVolumeResponse{}, nil
-}
-
-func (cs *masterController) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
- // Check arguments
- if len(req.GetVolumeId()) == 0 {
- return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
- }
-
- _, found := cs.pmemVolumes[req.VolumeId]
- if !found {
- return nil, status.Error(codes.NotFound, "No volume found with id %s"+req.VolumeId)
- }
-
- if req.GetVolumeCapabilities() == nil {
- return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
- }
-
- for _, cap := range req.VolumeCapabilities {
- if cap.GetAccessMode().GetMode() != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
- return &csi.ValidateVolumeCapabilitiesResponse{
- Confirmed: nil,
- Message: "Driver does not support '" + cap.AccessMode.Mode.String() + "' mode",
- }, nil
- }
- }
-
- /*
- * FIXME(avalluri): Need to validate other capabilities against the existing volume
- */
- return &csi.ValidateVolumeCapabilitiesResponse{
- Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
- VolumeCapabilities: req.VolumeCapabilities,
- VolumeContext: req.GetVolumeContext(),
- },
- }, nil
-}
-
-func (cs *masterController) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
- pmemcommon.Infof(3, ctx, "ListVolumes")
- if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_VOLUMES); err != nil {
- pmemcommon.Infof(3, ctx, "invalid list volumes req: %v", req)
- return nil, err
- }
- // List namespaces
- var entries []*csi.ListVolumesResponse_Entry
- for _, vol := range cs.pmemVolumes {
- info := &csi.Volume{
- VolumeId: vol.id,
- CapacityBytes: int64(vol.req.CapacityRange.RequiredBytes),
- }
- entries = append(entries, &csi.ListVolumesResponse_Entry{
- Volume: info,
- XXX_NoUnkeyedLiteral: *new(struct{}),
- })
- }
-
- response := &csi.ListVolumesResponse{
- Entries: entries,
- NextToken: "",
- XXX_NoUnkeyedLiteral: *new(struct{}),
- }
- return response, nil
+ return &csi.DeleteVolumeResponse{}, nil
}
func (cs *masterController) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
@@ -289,56 +289,16 @@ func (cs *masterController) ControllerPublishVolume(ctx context.Context, req *cs
}
glog.Infof("Current volume Info: %+v", vol)
-
- if vol.status == Attached {
- if req.NodeId == vol.nodeID {
- return &csi.ControllerPublishVolumeResponse{}, nil
- } else {
- return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("Volume already attached on %s", vol.nodeID))
- }
- } else if vol.status == Unattached {
- if req.NodeId == vol.nodeID {
- // Do we need to clear data on reattachment on same node??
- return &csi.ControllerPublishVolumeResponse{}, nil
- } else {
- // Delete stale volume on previously attached node
- conn, err := cs.rs.ConnectToNodeController(vol.nodeID, connectionTimeout)
- if err != nil {
- return nil, status.Error(codes.Internal, err.Error())
- }
- defer conn.Close()
-
- csiClient := csi.NewControllerClient(conn)
- clientReq := &csi.DeleteVolumeRequest{
- VolumeId: vol.id,
- }
- if _, err := csiClient.DeleteVolume(ctx, clientReq); err != nil {
- return nil, err
- }
- }
- }
-
- // Its new attachment, create pmem volume on node
- conn, err := cs.rs.ConnectToNodeController(req.NodeId, connectionTimeout)
- if err != nil {
- return nil, status.Error(codes.Internal, err.Error())
- }
- defer conn.Close()
-
- csiClient := csi.NewControllerClient(conn)
-
- if vol.req.Parameters == nil {
- vol.req.Parameters = map[string]string{}
- }
- // Ask node to use existing volume id
- vol.req.Parameters["_id"] = vol.id
- if _, err = csiClient.CreateVolume(ctx, vol.req); err != nil {
- return nil, status.Error(codes.Internal, errors.Wrap(err, "Attach volume failure").Error())
+ state, ok := vol.nodeIDs[req.NodeId]
+ if !ok {
+ // This should not happen as we locked the topology while volume creation
+ return nil, status.Error(codes.FailedPrecondition, "Volume cannot be published on requested node "+req.NodeId)
+ } else if state == Attached {
+ return nil, status.Error(codes.AlreadyExists, "Volume already published on requested node "+req.NodeId)
+ } else {
+ vol.nodeIDs[req.NodeId] = Attached
}
- vol.status = Attached
- vol.nodeID = req.NodeId
-
return &csi.ControllerPublishVolumeResponse{}, nil
}
@@ -346,7 +306,6 @@ func (cs *masterController) ControllerUnpublishVolume(ctx context.Context, req *
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME); err != nil {
return nil, err
}
-
if req.GetVolumeId() == "" {
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided")
}
@@ -358,12 +317,72 @@ func (cs *masterController) ControllerUnpublishVolume(ctx context.Context, req *
glog.Infof("ControllerUnpublishVolume : volume_id: %s, node_id: %s ", req.VolumeId, req.NodeId)
if vol := cs.getVolumeByID(req.VolumeId); vol != nil {
- vol.status = Unattached
+ vol.nodeIDs[req.NodeId] = Unattached
+ } else {
+ return nil, status.Error(codes.NotFound, fmt.Sprintf("No volume with id '%s' found", req.VolumeId))
}
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
+func (cs *masterController) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
+
+ // Check arguments
+ if len(req.GetVolumeId()) == 0 {
+ return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
+ }
+
+ _, found := cs.pmemVolumes[req.VolumeId]
+ if !found {
+ return nil, status.Error(codes.NotFound, "No volume found with id %s"+req.VolumeId)
+ }
+
+ if req.GetVolumeCapabilities() == nil {
+ return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
+ }
+
+ for _, cap := range req.VolumeCapabilities {
+ if cap.GetAccessMode().GetMode() != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
+ return &csi.ValidateVolumeCapabilitiesResponse{
+ Confirmed: nil,
+ Message: "Driver does not support '" + cap.AccessMode.Mode.String() + "' mode",
+ }, nil
+ }
+ }
+
+ /*
+ * FIXME(avalluri): Need to validate other capabilities against the existing volume
+ */
+ return &csi.ValidateVolumeCapabilitiesResponse{
+ Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
+ VolumeCapabilities: req.VolumeCapabilities,
+ VolumeContext: req.GetVolumeContext(),
+ },
+ }, nil
+}
+
+func (cs *masterController) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
+ pmemcommon.Infof(3, ctx, "ListVolumes")
+ if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_VOLUMES); err != nil {
+ pmemcommon.Infof(3, ctx, "invalid list volumes req: %v", req)
+ return nil, err
+ }
+ // List namespaces
+ var entries []*csi.ListVolumesResponse_Entry
+ for _, vol := range cs.pmemVolumes {
+ entries = append(entries, &csi.ListVolumesResponse_Entry{
+ Volume: &csi.Volume{
+ VolumeId: vol.id,
+ CapacityBytes: vol.size,
+ },
+ })
+ }
+
+ return &csi.ListVolumesResponse{
+ Entries: entries,
+ }, nil
+}
+
func (cs *masterController) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
var capacity int64
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_GET_CAPACITY); err != nil {
@@ -410,7 +429,7 @@ func (cs *masterController) getVolumeByID(volumeID string) *pmemVolume {
func (cs *masterController) getVolumeByName(Name string) *pmemVolume {
for _, pmemVol := range cs.pmemVolumes {
- if pmemVol.req.Name == Name {
+ if pmemVol.name == Name {
return pmemVol
}
}
diff --git a/pkg/pmem-csi-driver/controllerserver-node.go b/pkg/pmem-csi-driver/controllerserver-node.go
index dd12fd89c2..c4b4c4375a 100644
--- a/pkg/pmem-csi-driver/controllerserver-node.go
+++ b/pkg/pmem-csi-driver/controllerserver-node.go
@@ -8,6 +8,7 @@ package pmemcsidriver
import (
"fmt"
+ "strconv"
"github.com/google/uuid"
"golang.org/x/net/context"
@@ -23,6 +24,14 @@ import (
"k8s.io/utils/keymutex"
)
+const (
+ pmemParameterKeyNamespaceMode = "nsmode"
+ pmemParameterKeyEraseAfter = "eraseafter"
+
+ pmemNamespaceModeFsdax = "fsdax"
+ pmemNamespaceModeSector = "sector"
+)
+
type nodeVolume struct {
ID string
Name string
@@ -66,7 +75,7 @@ func (cs *nodeControllerServer) CreateVolume(ctx context.Context, req *csi.Creat
topology := []*csi.Topology{}
volumeID := ""
eraseafter := true
- nsmode := "fsdax"
+ nsmode := pmemNamespaceModeFsdax
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
pmemcommon.Infof(3, ctx, "invalid create volume req: %v", req)
@@ -82,19 +91,18 @@ func (cs *nodeControllerServer) CreateVolume(ctx context.Context, req *csi.Creat
}
// We recognize eraseafter=false/true, defaulting to true
- for key, val := range req.GetParameters() {
- glog.Infof("CreateVolume: parameter: [%v] [%v]", key, val)
- if key == "eraseafter" {
- if val == "true" {
- eraseafter = true
- } else if val == "false" {
- eraseafter = false
+ if params := req.GetParameters(); params != nil {
+ if val, ok := params[pmemParameterKeyEraseAfter]; ok {
+ if bVal, err := strconv.ParseBool(val); err != nil {
+ eraseafter = bVal
}
- } else if key == "nsmode" {
- if val == "fsdax" || val == "sector" {
+ }
+ if val, ok := params[pmemParameterKeyNamespaceMode]; ok {
+ if val == pmemNamespaceModeFsdax || val == pmemNamespaceModeSector {
nsmode = val
}
- } else if key == "_id" {
+ }
+ if val, ok := params["_id"]; ok {
/* use master controller provided volume uid */
volumeID = val
}
@@ -139,7 +147,7 @@ func (cs *nodeControllerServer) CreateVolume(ctx context.Context, req *csi.Creat
topology = append(topology, &csi.Topology{
Segments: map[string]string{
- "kubernetes.io/hostname": cs.nodeID,
+ PmemDriverTopologyKey: cs.nodeID,
},
})
@@ -244,7 +252,7 @@ func (cs *nodeControllerServer) GetCapacity(ctx context.Context, req *csi.GetCap
nsmode := "fsdax"
params := req.GetParameters()
if params != nil {
- if mode, ok := params["nsmode"]; ok {
+ if mode, ok := params[pmemParameterKeyNamespaceMode]; ok {
nsmode = mode
}
}
diff --git a/pkg/pmem-csi-driver/nodeserver.go b/pkg/pmem-csi-driver/nodeserver.go
index a857d176c2..822c950414 100644
--- a/pkg/pmem-csi-driver/nodeserver.go
+++ b/pkg/pmem-csi-driver/nodeserver.go
@@ -60,7 +60,7 @@ func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoReque
NodeId: ns.nodeID,
AccessibleTopology: &csi.Topology{
Segments: map[string]string{
- "kubernetes.io/hostname": ns.nodeID,
+ PmemDriverTopologyKey: ns.nodeID,
},
},
}, nil
@@ -238,7 +238,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
if params := req.GetVolumeContext(); params != nil {
// Add dax option if namespacemode == fsdax
- if params["nsmode"] == "fsdax" {
+ if params[pmemParameterKeyNamespaceMode] == pmemNamespaceModeFsdax {
glog.Infof("NodeStageVolume: namespacemode FSDAX, add dax mount option")
args = append(args, "-o", "dax")
}
diff --git a/pkg/pmem-csi-driver/pmem-csi-driver.go b/pkg/pmem-csi-driver/pmem-csi-driver.go
index 08d9ff766e..eceee980fd 100644
--- a/pkg/pmem-csi-driver/pmem-csi-driver.go
+++ b/pkg/pmem-csi-driver/pmem-csi-driver.go
@@ -36,6 +36,8 @@ const (
Node DriverMode = "node"
//Unified defintion for unified driver mode
Unified DriverMode = "unified"
+ //PmemDriverTopologyKey key to use for topology constraint
+ PmemDriverTopologyKey = "pmem-csi.intel.com/node"
)
//Config type for driver configuration
diff --git a/pkg/pmem-csi-driver/registryserver.go b/pkg/pmem-csi-driver/registryserver.go
index 954b784790..27b070ff75 100644
--- a/pkg/pmem-csi-driver/registryserver.go
+++ b/pkg/pmem-csi-driver/registryserver.go
@@ -6,7 +6,7 @@ import (
"time"
pmemgrpc "github.com/intel/pmem-csi/pkg/pmem-grpc"
- "github.com/intel/pmem-csi/pkg/pmem-registry"
+ registry "github.com/intel/pmem-csi/pkg/pmem-registry"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/klog/glog"
diff --git a/test/e2e/storage/csi_volumes.go b/test/e2e/storage/csi_volumes.go
index 7e990de4cd..4552e6bffe 100644
--- a/test/e2e/storage/csi_volumes.go
+++ b/test/e2e/storage/csi_volumes.go
@@ -95,11 +95,9 @@ var _ = Describe("PMEM Volumes", func() {
},
Config: testsuites.TestConfig{
- Framework: f,
- Prefix: "pmem",
- // Ensure that all pods land on the same node. Works around
- // https://github.com/intel/pmem-csi/issues/132.
- ClientNodeName: "host-1",
+ Framework: f,
+ Prefix: "pmem",
+ TopologyEnabled: true,
},
},
scManifest: "deploy/kubernetes-1.13/pmem-storageclass.yaml",
diff --git a/test/start-stop.make b/test/start-stop.make
index 4e31267751..7df8732be8 100644
--- a/test/start-stop.make
+++ b/test/start-stop.make
@@ -46,12 +46,18 @@ start: _work/clear-kvm.img _work/kube-clear-kvm _work/start-clear-kvm _work/ssh-
if ! _work/ssh-clear-kvm kubectl get storageclass/pmem-csi-sc >/dev/null 2>&1; then \
_work/ssh-clear-kvm kubectl create -f - /dev/null 2>&1; then \
+ _work/ssh-clear-kvm kubectl create -f -