Skip to content

Commit

Permalink
Merge pull request rook#14827 from jhoblitt/feature/parseAdditionalCo…
Browse files Browse the repository at this point in the history
…nfig

object: set obc user quota(s) in one SetUserQuota() call
  • Loading branch information
BlaineEXE authored Oct 18, 2024
2 parents be23171 + 9243764 commit 1c91774
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 77 deletions.
97 changes: 44 additions & 53 deletions pkg/operator/ceph/object/bucket/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/ceph/go-ceph/rgw/admin"
"github.com/coreos/pkg/capnslog"
"github.com/google/go-cmp/cmp"
bktv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1"
apibkt "github.com/kube-object-storage/lib-bucket-provisioner/pkg/provisioner/api"
opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
"github.com/rook/rook/pkg/operator/ceph/object"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/pkg/errors"
"github.com/rook/rook/pkg/clusterd"
Expand All @@ -56,6 +56,11 @@ type Provisioner struct {
adminOpsClient *admin.API
}

type additionalConfigSpec struct {
maxObjects *int64
maxSize *int64
}

var _ apibkt.Provisioner = &Provisioner{}

func NewProvisioner(context *clusterd.Context, clusterInfo *client.ClusterInfo) *Provisioner {
Expand Down Expand Up @@ -128,7 +133,7 @@ func (p Provisioner) Provision(options *apibkt.BucketOptions) (*bktv1alpha1.Obje

err = p.setAdditionalSettings(options)
if err != nil {
return nil, errors.Wrapf(err, "failed to set additional settings for OBC %q", options.ObjectBucketClaim.Name)
return nil, errors.Wrapf(err, "failed to set additional settings for OBC %q in NS %q associated with CephObjectStore %q in NS %q", options.ObjectBucketClaim.Name, options.ObjectBucketClaim.Namespace, p.objectStoreName, p.clusterInfo.Namespace)
}

return p.composeObjectBucket(), nil
Expand Down Expand Up @@ -218,7 +223,7 @@ func (p Provisioner) Grant(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBu
// setting quota limit if it is enabled
err = p.setAdditionalSettings(options)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to set additional settings for OBC %q in NS %q associated with CephObjectStore %q in NS %q", options.ObjectBucketClaim.Name, options.ObjectBucketClaim.Namespace, p.objectStoreName, p.clusterInfo.Namespace)
}

// returned ob with connection info
Expand Down Expand Up @@ -568,75 +573,61 @@ func (p *Provisioner) populateDomainAndPort(sc *storagev1.StorageClass) error {

// Check for additional options mentioned in OBC and set them accordingly
func (p *Provisioner) setAdditionalSettings(options *apibkt.BucketOptions) error {
var maxObjectsInt64 int64 = -1
var maxSizeInt64 int64 = -1
var err error
var quotaEnabled bool

maxObjects := MaxObjectQuota(options.ObjectBucketClaim.Spec.AdditionalConfig)
if maxObjects != "" {
quotaEnabled = true

maxObjectsInt64, err = toInt64(maxObjects)
if err != nil {
return errors.Wrapf(err, "failed to parse maxObjects quota for user %q", p.cephUserName)
}
}

maxSize := MaxSizeQuota(options.ObjectBucketClaim.Spec.AdditionalConfig)
if maxSize != "" {
quotaEnabled = true

maxSizeInt64, err = toInt64(maxSize)
if err != nil {
return errors.Wrapf(err, "failed to parse maxSize quota for user %q", p.cephUserName)
}
additionalConfig, err := additionalConfigSpecFromMap(options.ObjectBucketClaim.Spec.AdditionalConfig)
if err != nil {
return errors.Wrap(err, "failed to process additionalConfig")
}

objectUser, err := p.adminOpsClient.GetUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName})
liveQuota, err := p.adminOpsClient.GetUserQuota(p.clusterInfo.Context, admin.QuotaSpec{UID: p.cephUserName})
if err != nil {
return errors.Wrapf(err, "failed to fetch user %q", p.cephUserName)
}

// enable or disable quota for user
if *objectUser.UserQuota.Enabled != quotaEnabled {
err = p.adminOpsClient.SetUserQuota(p.clusterInfo.Context, admin.QuotaSpec{UID: p.cephUserName, Enabled: &quotaEnabled})
if err != nil {
return errors.Wrapf(err, "failed to set user %q quota enabled=%v for obc", p.cephUserName, quotaEnabled)
}
// Copy only the fields that are actively managed by the provisioner to
// prevent passing back undesirable combinations of fields. It is
// known to be problematic to set both MaxSize and MaxSizeKB.
currentQuota := admin.QuotaSpec{
Enabled: liveQuota.Enabled,
MaxObjects: liveQuota.MaxObjects,
MaxSize: liveQuota.MaxSize,
}
targetQuota := currentQuota

if !quotaEnabled {
// no need to process anything else if quotas are disabled
return nil
// enable or disable quota for user
quotaEnabled := (additionalConfig.maxObjects != nil) || (additionalConfig.maxSize != nil)

targetQuota.Enabled = &quotaEnabled

if additionalConfig.maxObjects != nil {
targetQuota.MaxObjects = additionalConfig.maxObjects
} else if currentQuota.MaxObjects != nil && *currentQuota.MaxObjects >= 0 {
// if the existing value is already negative, we don't want to change it
var objects int64 = -1
targetQuota.MaxObjects = &objects
}

if *objectUser.UserQuota.MaxObjects != maxObjectsInt64 {
err = p.adminOpsClient.SetUserQuota(p.clusterInfo.Context, admin.QuotaSpec{UID: p.cephUserName, MaxObjects: &maxObjectsInt64})
if err != nil {
return errors.Wrapf(err, "failed to set MaxObjects=%v to user %q", maxObjectsInt64, p.cephUserName)
}
if additionalConfig.maxSize != nil {
targetQuota.MaxSize = additionalConfig.maxSize
} else if currentQuota.MaxSize != nil && *currentQuota.MaxSize >= 0 {
// if the existing value is already negative, we don't want to change it
var size int64 = -1
targetQuota.MaxSize = &size
}

if objectUser.UserQuota.MaxSize != &maxSizeInt64 {
err = p.adminOpsClient.SetUserQuota(p.clusterInfo.Context, admin.QuotaSpec{UID: p.cephUserName, MaxSize: &maxSizeInt64})
diff := cmp.Diff(currentQuota, targetQuota)
if diff != "" {
logger.Debugf("Quota for user %q has changed. diff:%s", p.cephUserName, diff)
// UID is not set in the QuotaSpec returned by GetUser()/GetUserQuota()
targetQuota.UID = p.cephUserName
err = p.adminOpsClient.SetUserQuota(p.clusterInfo.Context, targetQuota)
if err != nil {
return errors.Wrapf(err, "failed to set MaxSize=%v to user %q", maxSizeInt64, p.cephUserName)
return errors.Wrapf(err, "failed to set user %q quota enabled=%v %+v", p.cephUserName, quotaEnabled, additionalConfig)
}
}

return nil
}

func toInt64(maxSize string) (int64, error) {
maxSizeInt, err := resource.ParseQuantity(maxSize)
if err != nil {
return 0, errors.Wrap(err, "failed to parse quantity")
}

return maxSizeInt.Value(), nil
}

func (p *Provisioner) setTlsCaCert() error {
objStore, err := p.getObjectStore()
if err != nil {
Expand Down
61 changes: 42 additions & 19 deletions pkg/operator/ceph/object/bucket/provisioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,29 +112,30 @@ func TestPopulateDomainAndPort(t *testing.T) {
assert.Equal(t, "rook-ceph-rgw-test-store.ns.svc", p.storeDomainName)
}

func TestMaxSizeToInt64(t *testing.T) {
type args struct {
maxSize string
}
func TestQuanityToInt64(t *testing.T) {
tests := []struct {
name string
args args
want int64
input string
want *int64
wantErr bool
}{
{"invalid size", args{maxSize: "foo"}, 0, true},
{"2gb size is invalid", args{maxSize: "2g"}, 0, true},
{"2G size is valid", args{maxSize: "2G"}, 2000000000, false},
{"foo is invalid", "foo", nil, true},
{"2gb size is invalid", "2g", nil, true},
{"2G size is valid", "2G", &(&struct{ i int64 }{2000000000}).i, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := toInt64(tt.args.maxSize)
got, err := quanityToInt64(tt.input)
if (err != nil) != tt.wantErr {
t.Errorf("maxSizeToInt64() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("quanityToInt64() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("maxSizeToInt64() = %v, want %v", got, tt.want)
if got != nil && tt.want != nil && *got != *tt.want {
t.Errorf("quanityToInt64() = %v, want %v", *got, *tt.want)
} else if got != nil && tt.want == nil {
t.Errorf("quanityToInt64() = %v, want %v", *got, tt.want)
} else if got == nil && tt.want != nil {
t.Errorf("quanityToInt64() = %v, want %v", got, *tt.want)
}
})
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func TestProvisioner_setAdditionalSettings(t *testing.T) {
t.Run("quota should remain disabled", func(t *testing.T) {
putValsSeen := []string{}
p := newProvisioner(t,
`{"user_quota":{"enabled":false,"max_size":-1,"max_objects":-1}}`,
`{"enabled":false,"check_on_raw":false,"max_size":-1024,"max_size_kb":0,"max_objects":-1}`,
&putValsSeen,
)

Expand All @@ -209,7 +210,7 @@ func TestProvisioner_setAdditionalSettings(t *testing.T) {
t.Run("quota should be disabled", func(t *testing.T) {
putValsSeen := []string{}
p := newProvisioner(t,
`{"user_quota":{"enabled":true,"max_size":-1,"max_objects":2}}`,
`{"enabled":true,"check_on_raw":false,"max_size":-1024,"max_size_kb":0,"max_objects":2}`,
&putValsSeen,
)

Expand All @@ -230,7 +231,7 @@ func TestProvisioner_setAdditionalSettings(t *testing.T) {
t.Run("maxSize quota should be enabled", func(t *testing.T) {
putValsSeen := []string{}
p := newProvisioner(t,
`{"user_quota":{"enabled":false,"max_size":-1,"max_objects":-1}}`,
`{"enabled":false,"check_on_raw":false,"max_size":-1024,"max_size_kb":0,"max_objects":-1}`,
&putValsSeen,
)

Expand Down Expand Up @@ -258,7 +259,7 @@ func TestProvisioner_setAdditionalSettings(t *testing.T) {
t.Run("maxObjects quota should be enabled", func(t *testing.T) {
putValsSeen := []string{}
p := newProvisioner(t,
`{"user_quota":{"enabled":false,"max_size":-1,"max_objects":-1}}`,
`{"enabled":false,"check_on_raw":false,"max_size":-1024,"max_size_kb":0,"max_objects":-1}`,
&putValsSeen,
)

Expand Down Expand Up @@ -286,7 +287,7 @@ func TestProvisioner_setAdditionalSettings(t *testing.T) {
t.Run("maxObjects and maxSize quotas should be enabled", func(t *testing.T) {
putValsSeen := []string{}
p := newProvisioner(t,
`{"user_quota":{"enabled":false,"max_size":-1,"max_objects":-1}}`,
`{"enabled":false,"check_on_raw":false,"max_size":-1024,"max_size_kb":0,"max_objects":-1}`,
&putValsSeen,
)

Expand Down Expand Up @@ -318,7 +319,7 @@ func TestProvisioner_setAdditionalSettings(t *testing.T) {
t.Run("quotas are enabled and need updated enabled", func(t *testing.T) {
putValsSeen := []string{}
p := newProvisioner(t,
`{"user_quota":{"enabled":true,"max_size":1,"max_objects":1}}`,
`{"enabled":true,"check_on_raw":false,"max_size":1,"max_size_kb":0,"max_objects":1}`,
&putValsSeen,
)

Expand Down Expand Up @@ -346,6 +347,28 @@ func TestProvisioner_setAdditionalSettings(t *testing.T) {
})
}

func TestProvisioner_additionalConfigSpecFromMap(t *testing.T) {
t.Run("does not fail on empty map", func(t *testing.T) {
spec, err := additionalConfigSpecFromMap(map[string]string{})
assert.NoError(t, err)
assert.Equal(t, additionalConfigSpec{}, *spec)
})

t.Run("maxObjects field should be set", func(t *testing.T) {
spec, err := additionalConfigSpecFromMap(map[string]string{"maxObjects": "2"})
assert.NoError(t, err)
var i int64 = 2
assert.Equal(t, additionalConfigSpec{maxObjects: &i}, *spec)
})

t.Run("maxSize field should be set", func(t *testing.T) {
spec, err := additionalConfigSpecFromMap(map[string]string{"maxSize": "3"})
assert.NoError(t, err)
var i int64 = 3
assert.Equal(t, additionalConfigSpec{maxSize: &i}, *spec)
})
}

func numberOfPutsWithValue(substr string, strs []string) int {
count := 0
for _, s := range strs {
Expand Down
35 changes: 30 additions & 5 deletions pkg/operator/ceph/object/bucket/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
cephObject "github.com/rook/rook/pkg/operator/ceph/object"
storagev1 "k8s.io/api/storage/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -87,12 +88,25 @@ func (p *Provisioner) getObjectStore() (*cephv1.CephObjectStore, error) {
return store, err
}

func MaxObjectQuota(AdditionalConfig map[string]string) string {
return AdditionalConfig["maxObjects"]
}
func additionalConfigSpecFromMap(config map[string]string) (*additionalConfigSpec, error) {
var err error
spec := additionalConfigSpec{}

if _, ok := config["maxObjects"]; ok {
spec.maxObjects, err = quanityToInt64(config["maxObjects"])
if err != nil {
return nil, errors.Wrapf(err, "failed to parse maxObjects quota")
}
}

if _, ok := config["maxSize"]; ok {
spec.maxSize, err = quanityToInt64(config["maxSize"])
if err != nil {
return nil, errors.Wrapf(err, "failed to parse maxSize quota")
}
}

func MaxSizeQuota(AdditionalConfig map[string]string) string {
return AdditionalConfig["maxSize"]
return &spec, nil
}

func GetObjectStoreNameFromBucket(ob *bktv1alpha1.ObjectBucket) (types.NamespacedName, error) {
Expand Down Expand Up @@ -128,3 +142,14 @@ func getNSNameFromAdditionalState(state map[string]string) (types.NamespacedName
}
return types.NamespacedName{Name: name, Namespace: namespace}, nil
}

func quanityToInt64(qty string) (*int64, error) {
n, err := resource.ParseQuantity(qty)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse %q as a quantity", qty)
}

value := n.Value()

return &value, nil
}

0 comments on commit 1c91774

Please sign in to comment.