diff --git a/internal/controller/controller.go b/internal/controller/controller.go index af7f199..a3559bb 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -53,141 +53,52 @@ func NewController(svc service.Service, zone string, maxVolumesPerNode int, l *l } // CreateVolume provisions storage via UpCloud Storage service. -func (c *Controller) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (resp *csi.CreateVolumeResponse, err error) { //nolint: funlen // TODO: refactor +func (c *Controller) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (resp *csi.CreateVolumeResponse, err error) { log := logger.WithServerContext(ctx, c.log).WithField(logger.VolumeNameKey, req.GetName()) - if req.Name == "" { - return nil, status.Error(codes.InvalidArgument, "CreateVolume Name cannot be empty") - } - - if req.VolumeCapabilities == nil || len(req.VolumeCapabilities) == 0 { - return nil, status.Error(codes.InvalidArgument, "CreateVolume VolumeCapabilities cannot be empty") - } - - violations := validateCapabilities(req.VolumeCapabilities) - if len(violations) > 0 { - return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume failed with the following violations: %s", strings.Join(violations, ", "))) - } - - // determine the size of the storage - storageSize, err := getStorageRange(req.CapacityRange) - if err != nil { - return nil, status.Error(codes.OutOfRange, fmt.Sprintf("CreateVolume failed to extract storage size: %s", err.Error())) - } - - if req.AccessibilityRequirements != nil { - for _, t := range req.AccessibilityRequirements.Requisite { - region, ok := t.Segments["region"] - if !ok { - continue // nothing to do - } - - if region != c.zone { - return nil, status.Errorf(codes.ResourceExhausted, "volume can be only created in region: %q, got: %q", c.zone, region) - } - } + if err := validateCreateVolumeRequest(req, c.zone); err != nil { + return nil, err } - - volumeName := req.Name - // get volume first, and skip if exists - volumes, err := c.svc.GetStorageByName(ctx, volumeName) + volumes, err := c.svc.GetStorageByName(ctx, req.GetName()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - if len(volumes) > 0 { - if len(volumes) > 1 { - return nil, fmt.Errorf("fatal: duplicate volume %q exists", volumeName) - } - vol := volumes[0].Storage - - if vol.Size*giB != int(storageSize) { - return nil, status.Errorf(codes.AlreadyExists, "invalid storage size requested: %d", storageSize) - } - - log.WithField(logger.VolumeIDKey, vol.UUID).Info("volume already exists") - return &csi.CreateVolumeResponse{ - Volume: &csi.Volume{ - VolumeId: vol.UUID, - CapacityBytes: int64(vol.Size) * giB, - ContentSource: req.GetVolumeContentSource(), - }, - }, nil + return createVolumeExistsResponse(ctx, req, volumes, log) } - tierMapper := map[string]string{"maxiops": upcloud.StorageTierMaxIOPS, "hdd": upcloud.StorageTierHDD} - tier := tierMapper[req.Parameters["tier"]] - var vol *upcloud.StorageDetails + tier, err := createVolumeRequestTier(req) + if err != nil { + return nil, err + } + // determine the size of the storage + storageSize, err := getStorageRange(req.GetCapacityRange()) + if err != nil { + return nil, status.Error(codes.OutOfRange, fmt.Sprintf("CreateVolume failed to extract storage size: %s", err.Error())) + } storageSizeGB := int(storageSize / giB) - if volContentSrc := req.GetVolumeContentSource(); volContentSrc != nil { //nolint: nestif // TODO: refactor - var sourceID string - switch volContentSrc.Type.(type) { - case *csi.VolumeContentSource_Snapshot: - snapshot := volContentSrc.GetSnapshot() - if snapshot == nil { - return nil, status.Error(codes.Internal, "content source snapshot is not defined") - } - sourceID = snapshot.GetSnapshotId() - case *csi.VolumeContentSource_Volume: - srcVol := volContentSrc.GetVolume() - if srcVol == nil { - return nil, status.Error(codes.Internal, "content source volume is not defined") - } - sourceID = srcVol.GetVolumeId() - default: - return nil, status.Errorf(codes.InvalidArgument, "%v not a proper volume source", volContentSrc) - } - log := log.WithField(logger.VolumeSourceKey, sourceID) - log.Info("getting source storage by uuid") - src, err := c.svc.GetStorageByUUID(ctx, sourceID) - if err != nil { - if errors.Is(err, service.ErrStorageNotFound) { - return nil, status.Errorf(codes.NotFound, "could not retrieve source volume by ID: %s", err.Error()) - } - return nil, status.Errorf(codes.InvalidArgument, err.Error()) - } - log.Info("checking that source storage is online") - if err := c.svc.RequireStorageOnline(ctx, &src.Storage); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - volumeReq := &request.CloneStorageRequest{ - UUID: src.Storage.UUID, - Zone: c.zone, - Tier: tier, - Title: volumeName, - } - logger.WithServiceRequest(log, volumeReq).Info("cloning volume") - vol, err = c.svc.CloneStorage(ctx, volumeReq, c.storageLabels...) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - log = log.WithField(logger.VolumeIDKey, vol.Storage.UUID).WithField("size", vol.Storage.Size) - if storageSizeGB > vol.Storage.Size { - log.WithField("new_size", storageSizeGB).Info("resizing volume") - // resize cloned storage and delete backup taken during resize operation as this is newly created storage - if vol, err = c.svc.ResizeStorage(ctx, vol.Storage.UUID, storageSizeGB, true); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + var vol *upcloud.StorageDetails + if volContentSrc := req.GetVolumeContentSource(); volContentSrc != nil { + if vol, err = c.createVolumeFromSource(ctx, req, storageSizeGB, tier); err != nil { + return nil, err } } else { volumeReq := &request.CreateStorageRequest{ Zone: c.zone, - Title: volumeName, + Title: req.GetName(), Size: storageSizeGB, Tier: tier, Labels: c.storageLabels, } - logger.WithServiceRequest(log, volumeReq).Info("creating volume") if vol, err = c.svc.CreateStorage(ctx, volumeReq); err != nil { return nil, status.Error(codes.Internal, err.Error()) } } - volResp := &csi.CreateVolumeResponse{ + return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ VolumeId: vol.UUID, CapacityBytes: storageSize, @@ -200,9 +111,87 @@ func (c *Controller) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequ }, ContentSource: req.GetVolumeContentSource(), }, + }, nil +} + +func createVolumeExistsResponse(ctx context.Context, req *csi.CreateVolumeRequest, volumes []*upcloud.StorageDetails, log *logrus.Entry) (resp *csi.CreateVolumeResponse, err error) { + if len(volumes) > 1 { + return nil, fmt.Errorf("fatal: duplicate volume %q exists", req.GetName()) + } + vol := volumes[0].Storage + storageSize, err := getStorageRange(req.GetCapacityRange()) + if err != nil { + return nil, status.Error(codes.OutOfRange, fmt.Sprintf("CreateVolume failed to extract storage size: %s", err.Error())) + } + if vol.Size*giB != int(storageSize) { + return nil, status.Errorf(codes.AlreadyExists, "invalid storage size requested: %d", storageSize) + } + log.WithField(logger.VolumeIDKey, vol.UUID).Info("volume already exists") + return &csi.CreateVolumeResponse{ + Volume: &csi.Volume{ + VolumeId: vol.UUID, + CapacityBytes: int64(vol.Size) * giB, + ContentSource: req.GetVolumeContentSource(), + }, + }, nil +} + +func (c *Controller) createVolumeFromSource(ctx context.Context, req *csi.CreateVolumeRequest, storageSizeGB int, tier string) (*upcloud.StorageDetails, error) { + volContentSrc := req.GetVolumeContentSource() + if volContentSrc == nil { + return nil, status.Error(codes.Internal, "got empty volume content source") + } + var sourceID string + switch volContentSrc.Type.(type) { + case *csi.VolumeContentSource_Snapshot: + snapshot := volContentSrc.GetSnapshot() + if snapshot == nil { + return nil, status.Error(codes.Internal, "content source snapshot is not defined") + } + sourceID = snapshot.GetSnapshotId() + case *csi.VolumeContentSource_Volume: + srcVol := volContentSrc.GetVolume() + if srcVol == nil { + return nil, status.Error(codes.Internal, "content source volume is not defined") + } + sourceID = srcVol.GetVolumeId() + default: + return nil, status.Errorf(codes.InvalidArgument, "%v not a proper volume source", volContentSrc) + } + log := logger.WithServerContext(ctx, c.log).WithField(logger.VolumeNameKey, req.GetName()).WithField(logger.VolumeSourceKey, sourceID) + log.Info("getting source storage by uuid") + src, err := c.svc.GetStorageByUUID(ctx, sourceID) + if err != nil { + if errors.Is(err, service.ErrStorageNotFound) { + return nil, status.Errorf(codes.NotFound, "could not retrieve source volume by ID: %s", err.Error()) + } + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } + log.Info("checking that source storage is online") + if err := c.svc.RequireStorageOnline(ctx, &src.Storage); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + volumeReq := &request.CloneStorageRequest{ + UUID: src.Storage.UUID, + Zone: c.zone, + Tier: tier, + Title: req.GetName(), + } + logger.WithServiceRequest(log, volumeReq).Info("cloning volume") + vol, err := c.svc.CloneStorage(ctx, volumeReq, c.storageLabels...) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) } - return volResp, nil + log = log.WithField(logger.VolumeIDKey, vol.Storage.UUID).WithField("size", vol.Storage.Size) + if storageSizeGB > vol.Storage.Size { + log.WithField("new_size", storageSizeGB).Info("resizing volume") + // resize cloned storage and delete backup taken during resize operation as this is newly created storage + if vol, err = c.svc.ResizeStorage(ctx, vol.Storage.UUID, storageSizeGB, true); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + } + return vol, err } // DeleteVolume deletes storage via UpCloud Storage service. @@ -221,22 +210,10 @@ func (c *Controller) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequ // ControllerPublishVolume attaches storage to a node via UpCloud Storage service. func (c *Controller) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { //nolint: funlen // TODO: refactor - if req.VolumeId == "" { - return nil, status.Error(codes.InvalidArgument, "volume ID must be provided") - } - log := logger.WithServerContext(ctx, c.log).WithField(logger.VolumeIDKey, req.GetVolumeId()) - - if req.NodeId == "" { - return nil, status.Error(codes.InvalidArgument, "node ID must be provided") - } - log = log.WithField(logger.NodeIDKey, req.GetNodeId()) - - if req.VolumeCapability == nil { - return nil, status.Error(codes.InvalidArgument, "volume capability must be provided") - } - if req.GetReadonly() { - return nil, status.Error(codes.Unimplemented, "read only Volumes are not supported") + if err := validateControllerPublishVolumeRequest(req); err != nil { + return nil, err } + log := logger.WithServerContext(ctx, c.log).WithField(logger.VolumeIDKey, req.GetVolumeId()).WithField(logger.NodeIDKey, req.GetNodeId()) server, err := c.svc.GetServerByHostname(ctx, req.NodeId) if err != nil { @@ -544,17 +521,12 @@ func (c *Controller) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRe backups := make([]upcloud.Storage, 0) - if snapID := req.GetSnapshotId(); snapID != "" { //nolint: nestif // TODO: refactor + if snapID := req.GetSnapshotId(); snapID != "" { log = log.WithField("snapshot_id", snapID) log.Info("getting storage snapshots by ID") s, err := c.svc.GetStorageByUUID(ctx, snapID) if err != nil { - if errors.Is(err, service.ErrStorageNotFound) { - return &csi.ListSnapshotsResponse{ - Entries: make([]*csi.ListSnapshotsResponse_Entry, 0), - }, nil - } - return nil, status.Error(codes.Internal, err.Error()) + return listSnapshotsErrorResponse(err) } backups = append(backups, s.Storage) } else { @@ -750,3 +722,69 @@ func paginateStorage(s []upcloud.Storage, start, size int) ([]upcloud.Storage, i return s, next } + +func listSnapshotsErrorResponse(err error) (*csi.ListSnapshotsResponse, error) { + if errors.Is(err, service.ErrStorageNotFound) { + return &csi.ListSnapshotsResponse{ + Entries: make([]*csi.ListSnapshotsResponse_Entry, 0), + }, nil + } + return nil, status.Error(codes.Internal, err.Error()) +} + +func createVolumeRequestTier(r *csi.CreateVolumeRequest) (string, error) { + tierMapper := map[string]string{"maxiops": upcloud.StorageTierMaxIOPS, "hdd": upcloud.StorageTierHDD} + p, ok := r.Parameters["tier"] + if !ok { + // tier parameter is not required + return "", nil + } + tier, ok := tierMapper[p] + if ok { + return tier, nil + } + return "", status.Error(codes.InvalidArgument, fmt.Sprintf("storage tier '%s' not supported", tier)) +} + +func validateCreateVolumeRequest(r *csi.CreateVolumeRequest, zone string) error { + if r.GetName() == "" { + return status.Error(codes.InvalidArgument, "CreateVolume Name cannot be empty") + } + + if r.GetVolumeCapabilities() == nil || len(r.VolumeCapabilities) == 0 { + return status.Error(codes.InvalidArgument, "CreateVolume VolumeCapabilities cannot be empty") + } + + if violations := validateCapabilities(r.VolumeCapabilities); len(violations) > 0 { + return status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume failed with the following violations: %s", strings.Join(violations, ", "))) + } + if r.GetAccessibilityRequirements() != nil { + for _, t := range r.AccessibilityRequirements.Requisite { + region, ok := t.Segments["region"] + if !ok { + continue // nothing to do + } + + if region != zone { + return status.Errorf(codes.ResourceExhausted, "volume can be only created in region: %q, got: %q", zone, region) + } + } + } + return nil +} + +func validateControllerPublishVolumeRequest(r *csi.ControllerPublishVolumeRequest) error { + if r.GetVolumeId() == "" { + return status.Error(codes.InvalidArgument, "volume ID must be provided") + } + if r.GetNodeId() == "" { + return status.Error(codes.InvalidArgument, "node ID must be provided") + } + if r.GetVolumeCapability() == nil { + return status.Error(codes.InvalidArgument, "volume capability must be provided") + } + if r.GetReadonly() { + return status.Error(codes.Unimplemented, "read only Volumes are not supported") + } + return nil +} diff --git a/internal/filesystem/linux_filesystem.go b/internal/filesystem/linux_filesystem.go index f7986f5..6625a9a 100644 --- a/internal/filesystem/linux_filesystem.go +++ b/internal/filesystem/linux_filesystem.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "os/exec" - "path/filepath" "strings" "syscall" @@ -102,18 +101,11 @@ func (m *LinuxFilesystem) Mount(ctx context.Context, source, target, fsType stri } // block device requires that target is file instead of directory - if fsType == "" { //nolint: nestif // TODO: refactor - err := os.MkdirAll(filepath.Dir(target), 0o750) - if err != nil { - return err - } - f, err := os.OpenFile(target, os.O_CREATE, 0o660) + if fsType == "" { + err := createBlockDevice(target) if err != nil { return err } - if err := f.Close(); err != nil { - return err - } } else { mountArgs = append(mountArgs, "-t", fsType) // create target, os.Mkdirall is noop if it exists diff --git a/internal/filesystem/linux_utils.go b/internal/filesystem/linux_utils.go index f3aea24..4fffef0 100644 --- a/internal/filesystem/linux_utils.go +++ b/internal/filesystem/linux_utils.go @@ -84,3 +84,18 @@ func sfdiskOutputGetLastPartition(source, sfdiskOutput string) (string, error) { } return lastPartition, nil } + +func createBlockDevice(target string) error { + err := os.MkdirAll(filepath.Dir(target), 0o750) + if err != nil { + return err + } + f, err := os.OpenFile(target, os.O_CREATE, 0o660) + if err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + return nil +} diff --git a/internal/node/node.go b/internal/node/node.go index 2f21861..1734a62 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -154,24 +154,12 @@ func (n *Node) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolume } // NodePublishVolume mounts the volume mounted to the staging path to the target path. -func (n *Node) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { //nolint: funlen // TODO: refactor - if req.VolumeId == "" { - return nil, status.Error(codes.InvalidArgument, "volume ID must be provided") +func (n *Node) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + if err := validateNodePublishVolumeRequest(req); err != nil { + return nil, err } log := logger.WithServerContext(ctx, n.log).WithField(logger.VolumeIDKey, req.GetVolumeId()) - if req.StagingTargetPath == "" { - return nil, status.Error(codes.InvalidArgument, "staging target path must be provided") - } - - if req.TargetPath == "" { - return nil, status.Error(codes.InvalidArgument, "target path must be provided") - } - - if req.VolumeCapability == nil { - return nil, status.Error(codes.InvalidArgument, "volume capability must be provided") - } - var err error source := req.GetStagingTargetPath() target := req.GetTargetPath() @@ -355,3 +343,19 @@ func (n *Node) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeSta func (n *Node) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method NodeExpandVolume not implemented") } + +func validateNodePublishVolumeRequest(r *csi.NodePublishVolumeRequest) error { + if r.GetVolumeId() == "" { + return status.Error(codes.InvalidArgument, "volume ID must be provided") + } + if r.GetStagingTargetPath() == "" { + return status.Error(codes.InvalidArgument, "staging target path must be provided") + } + if r.GetTargetPath() == "" { + return status.Error(codes.InvalidArgument, "target path must be provided") + } + if r.GetVolumeCapability() == nil { + return status.Error(codes.InvalidArgument, "volume capability must be provided") + } + return nil +}