From dc7d65119394d9dc544ad94ea159b2946651264e Mon Sep 17 00:00:00 2001 From: Rewant Soni Date: Thu, 7 Nov 2024 16:33:13 +0530 Subject: [PATCH] implemenent the server side rpc call for peering Signed-off-by: Rewant Soni --- .../ocs-operator/manifests/provider-role.yaml | 7 +++ rbac/provider-role.yaml | 7 +++ services/provider/server/server.go | 58 +++++++++++++++---- .../provider/server/storageclusterpeer.go | 39 +++++++++++++ 4 files changed, 101 insertions(+), 10 deletions(-) create mode 100644 services/provider/server/storageclusterpeer.go diff --git a/deploy/ocs-operator/manifests/provider-role.yaml b/deploy/ocs-operator/manifests/provider-role.yaml index dd1c692975..42259a455d 100644 --- a/deploy/ocs-operator/manifests/provider-role.yaml +++ b/deploy/ocs-operator/manifests/provider-role.yaml @@ -75,3 +75,10 @@ rules: verbs: - get - list + - apiGroups: + - ocs.openshift.io + resources: + - storageclusterpeers + verbs: + - get + - list diff --git a/rbac/provider-role.yaml b/rbac/provider-role.yaml index dd1c692975..42259a455d 100644 --- a/rbac/provider-role.yaml +++ b/rbac/provider-role.yaml @@ -75,3 +75,10 @@ rules: verbs: - get - list + - apiGroups: + - ocs.openshift.io + resources: + - storageclusterpeers + verbs: + - get + - list diff --git a/services/provider/server/server.go b/services/provider/server/server.go index 1e7f82ae4a..da4935b0fa 100644 --- a/services/provider/server/server.go +++ b/services/provider/server/server.go @@ -66,10 +66,11 @@ const ( type OCSProviderServer struct { pb.UnimplementedOCSProviderServer - client client.Client - consumerManager *ocsConsumerManager - storageRequestManager *storageRequestManager - namespace string + client client.Client + consumerManager *ocsConsumerManager + storageRequestManager *storageRequestManager + storageClusterPeerManager *storageClusterPeerManager + namespace string } func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderServer, error) { @@ -93,11 +94,17 @@ func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderSe return nil, fmt.Errorf("failed to create new StorageRequest instance. %v", err) } + storageClusterPeerManager, err := newStorageClusterPeerManager(client, namespace) + if err != nil { + return nil, fmt.Errorf("failed to create new StorageClusterPeer instance. %v", err) + } + return &OCSProviderServer{ - client: client, - consumerManager: consumerManager, - storageRequestManager: storageRequestManager, - namespace: namespace, + client: client, + consumerManager: consumerManager, + storageRequestManager: storageRequestManager, + storageClusterPeerManager: storageClusterPeerManager, + namespace: namespace, }, nil } @@ -141,7 +148,7 @@ func (s *OCSProviderServer) OnboardConsumer(ctx context.Context, req *pb.Onboard storageQuotaInGiB := ptr.Deref(onboardingTicket.StorageQuotaInGiB, 0) if onboardingTicket.SubjectRole != services.ClientRole { - err := fmt.Errorf("unsupported ticket role for consumer %q, found %s, expected %s", req.ConsumerName, onboardingTicket.SubjectRole, services.ClientRole) + err := fmt.Errorf("invalid onboarding ticket for %q, expecting role %s found role %s", req.ConsumerName, services.ClientRole, onboardingTicket.SubjectRole) klog.Error(err) return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -925,6 +932,37 @@ func extractMonitorIps(data string) ([]string, error) { return ips, nil } -func (s *OCSProviderServer) PeerStorageCluster(_ context.Context, _ *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) { +func (s *OCSProviderServer) PeerStorageCluster(ctx context.Context, req *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) { + + pubKey, err := s.getOnboardingValidationKey(ctx) + if err != nil { + klog.Errorf("failed to get public key to validate peer onboarding ticket %v", err) + return nil, status.Errorf(codes.Internal, "failed to validate peer onboarding ticket") + } + + onboardingToken, err := decodeAndValidateTicket(req.OnboardingToken, pubKey) + if err != nil { + klog.Errorf("Invalid onboarding token. %v", err) + return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is invalid") + } + + if onboardingToken.SubjectRole != services.PeerRole { + err := fmt.Errorf("invalid onboarding ticket for %q, expecting role %s found role %s", req.StorageClusterUID, services.PeerRole, onboardingToken.SubjectRole) + klog.Error(err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + storageClusterPeer, err := s.storageClusterPeerManager.GetByPeerStorageClusterUID(ctx, types.UID(req.StorageClusterUID)) + if err != nil { + klog.Error(err) + return nil, status.Errorf(codes.NotFound, "Cannot find a storage cluster peer that meets all criteria") + } + + klog.Infof("Found StorageClusterPeer %s for PeerStorageCluster", storageClusterPeer.Name) + + if storageClusterPeer.Status.State != ocsv1.StorageClusterPeerStatePending && storageClusterPeer.Status.State != ocsv1.StorageClusterPeerStatePeered { + return nil, status.Errorf(codes.NotFound, "Cannot find a storage cluster peer that meets all criteria") + } + return &pb.PeerStorageClusterResponse{}, nil } diff --git a/services/provider/server/storageclusterpeer.go b/services/provider/server/storageclusterpeer.go new file mode 100644 index 0000000000..fb933f816d --- /dev/null +++ b/services/provider/server/storageclusterpeer.go @@ -0,0 +1,39 @@ +package server + +import ( + "context" + "fmt" + ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "k8s.io/apimachinery/pkg/types" +) + +type storageClusterPeerManager struct { + client client.Client + namespace string +} + +func newStorageClusterPeerManager(cl client.Client, namespace string) (*storageClusterPeerManager, error) { + return &storageClusterPeerManager{ + client: cl, + namespace: namespace, + }, nil +} + +func (s *storageClusterPeerManager) GetByPeerStorageClusterUID(ctx context.Context, storageClusterUID types.UID) (*ocsv1.StorageClusterPeer, error) { + storageClusterPeerList := &ocsv1.StorageClusterPeerList{} + err := s.client.List(ctx, storageClusterPeerList, client.InNamespace(s.namespace)) + if err != nil { + return nil, fmt.Errorf("failed to list StorageClusterPeer: %v", err) + } + for i := range storageClusterPeerList.Items { + storageClusterPeer := &storageClusterPeerList.Items[i] + if storageClusterPeer.Status.PeerInfo != nil { + if storageClusterUID == types.UID(storageClusterPeer.Status.PeerInfo.StorageClusterUid) { + return storageClusterPeer, nil + } + } + } + return nil, fmt.Errorf("StorageClusterPeer linked to StorageCluster with uid %q not found", storageClusterUID) +}