Skip to content

Commit

Permalink
Drop default {node,identity}server implementations, code cleanup
Browse files Browse the repository at this point in the history
These default generic implementations were inherited from deprecated
kubernetes-csi/drivers/csi-common package, which is not needed anymore. Merging
them with their derived implementations. This simplifies the code and leads to
cleaning up lot of unused code.

Left controller server default implementation, as it can be shared by both node
controller and controller servers.

Signed-off-by: Amarnath Valluri <amarnath.valluri@intel.com>
  • Loading branch information
avalluri committed Mar 1, 2019
1 parent 61851e4 commit 6c9e6a0
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 337 deletions.
35 changes: 33 additions & 2 deletions pkg/pmem-csi-driver/controllerserver-default.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,25 @@ import (
)

type DefaultControllerServer struct {
Driver *CSIDriver
serviceCaps []*csi.ControllerServiceCapability
}

func NewDefaultControllerServer(caps []csi.ControllerServiceCapability_RPC_Type) *DefaultControllerServer {

serviceCaps := []*csi.ControllerServiceCapability{}
for _, cap := range caps {
serviceCaps = append(serviceCaps, &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: cap,
},
},
})
}

return &DefaultControllerServer{
serviceCaps: serviceCaps,
}
}

func (cs *DefaultControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
Expand Down Expand Up @@ -45,7 +63,7 @@ func (cs *DefaultControllerServer) GetCapacity(ctx context.Context, req *csi.Get
// Default supports all capabilities
func (cs *DefaultControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: cs.Driver.cap,
Capabilities: cs.serviceCaps,
}, nil
}

Expand All @@ -60,3 +78,16 @@ func (cs *DefaultControllerServer) DeleteSnapshot(ctx context.Context, req *csi.
func (cs *DefaultControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

func (cs *DefaultControllerServer) ValidateControllerServiceRequest(c csi.ControllerServiceCapability_RPC_Type) error {
if c == csi.ControllerServiceCapability_RPC_UNKNOWN {
return nil
}

for _, cap := range cs.serviceCaps {
if c == cap.GetRpc().GetType() {
return nil
}
}
return status.Error(codes.InvalidArgument, string(c))
}
42 changes: 25 additions & 17 deletions pkg/pmem-csi-driver/controllerserver-master.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,15 @@ var _ csi.ControllerServer = &masterController{}
var _ PmemService = &masterController{}
var volumeMutex = keymutex.NewHashed(-1)

func NewMasterControllerServer(driver *CSIDriver, rs *registryServer) *masterController {
serverCaps := []csi.ControllerServiceCapability_RPC_Type{}
serverCaps = append(serverCaps, csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME)
serverCaps = append(serverCaps, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME)
serverCaps = append(serverCaps, csi.ControllerServiceCapability_RPC_LIST_VOLUMES)
serverCaps = append(serverCaps, csi.ControllerServiceCapability_RPC_GET_CAPACITY)
driver.AddControllerServiceCapabilities(serverCaps)

func NewMasterControllerServer(rs *registryServer) *masterController {
serverCaps := []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
csi.ControllerServiceCapability_RPC_GET_CAPACITY,
}
return &masterController{
DefaultControllerServer: NewDefaultControllerServer(driver),
DefaultControllerServer: NewDefaultControllerServer(serverCaps),
rs: rs,
pmemVolumes: map[string]*pmemVolume{},
}
Expand All @@ -80,7 +79,7 @@ func (cs *masterController) RegisterService(rpcServer *grpc.Server) {
func (cs *masterController) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
var vol *pmemVolume

if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
pmemcommon.Infof(3, ctx, "invalid create volume req: %v", req)
return nil, err
}
Expand Down Expand Up @@ -158,17 +157,16 @@ func (cs *masterController) CreateVolume(ctx context.Context, req *csi.CreateVol
}

func (cs *masterController) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
pmemcommon.Infof(3, ctx, "invalid delete volume req: %v", req)
return nil, err
}

// Check arguments
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}

if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
pmemcommon.Infof(3, ctx, "invalid delete volume req: %v", req)
return nil, err
}

// Serialize by VolumeId
volumeMutex.LockKey(req.VolumeId)
defer volumeMutex.UnlockKey(req.VolumeId)
Expand Down Expand Up @@ -238,7 +236,7 @@ func (cs *masterController) ValidateVolumeCapabilities(ctx context.Context, req

func (cs *masterController) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
pmemcommon.Infof(3, ctx, "ListVolumes")
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_VOLUMES); err != nil {
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_VOLUMES); err != nil {
pmemcommon.Infof(3, ctx, "invalid list volumes req: %v", req)
return nil, err
}
Expand All @@ -264,6 +262,9 @@ func (cs *masterController) ListVolumes(ctx context.Context, req *csi.ListVolume
}

func (cs *masterController) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME); err != nil {
return nil, err
}
if req.GetVolumeId() == "" {
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided")
}
Expand All @@ -280,7 +281,7 @@ func (cs *masterController) ControllerPublishVolume(ctx context.Context, req *cs
volumeMutex.LockKey(req.VolumeId)
defer volumeMutex.UnlockKey(req.VolumeId) //nolint: errcheck

glog.Infof("ControllerPublishVolume: cs.Node: %s req.volume_id: %s, req.node_id: %s ", cs.Driver.nodeID, req.VolumeId, req.NodeId)
glog.Infof("ControllerPublishVolume: req.volume_id: %s, req.node_id: %s ", req.VolumeId, req.NodeId)

vol := cs.getVolumeByID(req.VolumeId)
if vol == nil {
Expand Down Expand Up @@ -342,6 +343,10 @@ func (cs *masterController) ControllerPublishVolume(ctx context.Context, req *cs
}

func (cs *masterController) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME); err != nil {
return nil, err
}

if req.GetVolumeId() == "" {
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided")
}
Expand All @@ -361,6 +366,9 @@ func (cs *masterController) ControllerUnpublishVolume(ctx context.Context, req *

func (cs *masterController) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
var capacity int64
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_GET_CAPACITY); err != nil {
return nil, err
}

for _, node := range cs.rs.nodeClients {
cap, err := cs.getNodeCapacity(ctx, node, req)
Expand Down
25 changes: 13 additions & 12 deletions pkg/pmem-csi-driver/controllerserver-node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type nodeVolume struct {

type nodeControllerServer struct {
*DefaultControllerServer
nodeID string
dm pmdmanager.PmemDeviceManager
pmemVolumes map[string]*nodeVolume // map of reqID:nodeVolume
}
Expand All @@ -42,15 +43,15 @@ var _ PmemService = &nodeControllerServer{}

var nodeVolumeMutex = keymutex.NewHashed(-1)

func NewNodeControllerServer(driver *CSIDriver, dm pmdmanager.PmemDeviceManager) *nodeControllerServer {
serverCaps := []csi.ControllerServiceCapability_RPC_Type{}
serverCaps = append(serverCaps, csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME)
serverCaps = append(serverCaps, csi.ControllerServiceCapability_RPC_LIST_VOLUMES)
serverCaps = append(serverCaps, csi.ControllerServiceCapability_RPC_GET_CAPACITY)
driver.AddControllerServiceCapabilities(serverCaps)

func NewNodeControllerServer(nodeID string, dm pmdmanager.PmemDeviceManager) *nodeControllerServer {
serverCaps := []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
csi.ControllerServiceCapability_RPC_GET_CAPACITY,
}
return &nodeControllerServer{
DefaultControllerServer: NewDefaultControllerServer(driver),
DefaultControllerServer: NewDefaultControllerServer(serverCaps),
nodeID: nodeID,
dm: dm,
pmemVolumes: map[string]*nodeVolume{},
}
Expand All @@ -67,7 +68,7 @@ func (cs *nodeControllerServer) CreateVolume(ctx context.Context, req *csi.Creat
eraseafter := true
nsmode := "fsdax"

if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
pmemcommon.Infof(3, ctx, "invalid create volume req: %v", req)
return nil, err
}
Expand Down Expand Up @@ -138,7 +139,7 @@ func (cs *nodeControllerServer) CreateVolume(ctx context.Context, req *csi.Creat

topology = append(topology, &csi.Topology{
Segments: map[string]string{
"kubernetes.io/hostname": cs.Driver.nodeID,
"kubernetes.io/hostname": cs.nodeID,
},
})

Expand All @@ -158,7 +159,7 @@ func (cs *nodeControllerServer) DeleteVolume(ctx context.Context, req *csi.Delet
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}

if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
pmemcommon.Infof(3, ctx, "invalid delete volume req: %v", req)
return nil, err
}
Expand Down Expand Up @@ -212,7 +213,7 @@ func (cs *nodeControllerServer) ValidateVolumeCapabilities(ctx context.Context,

func (cs *nodeControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
pmemcommon.Infof(3, ctx, "ListVolumes")
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_VOLUMES); err != nil {
if err := cs.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_VOLUMES); err != nil {
pmemcommon.Infof(3, ctx, "invalid list volumes req: %v", req)
return nil, err
}
Expand Down
101 changes: 0 additions & 101 deletions pkg/pmem-csi-driver/driver.go

This file was deleted.

62 changes: 0 additions & 62 deletions pkg/pmem-csi-driver/identityserver-default.go

This file was deleted.

Loading

0 comments on commit 6c9e6a0

Please sign in to comment.