Skip to content

Commit

Permalink
[Refactor][Kubectl-plugin] Replace dynamic client with Ray client (#2703
Browse files Browse the repository at this point in the history
)
  • Loading branch information
MortalHappiness authored Jan 9, 2025
1 parent e959950 commit 9a48b82
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 352 deletions.
2 changes: 1 addition & 1 deletion kubectl-plugin/cmd/kubectl-ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"os"

cmd "github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd"
flag "github.com/spf13/pflag"
"k8s.io/cli-runtime/pkg/genericiooptions"
)
Expand Down
4 changes: 3 additions & 1 deletion kubectl-plugin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2
github.com/ray-project/kuberay/ray-operator v1.2.2
github.com/ray-project/kuberay/ray-operator v0.0.0
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -103,3 +103,5 @@ require (
sigs.k8s.io/kustomize/kyaml v0.17.2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
)

replace github.com/ray-project/kuberay/ray-operator => ../ray-operator
2 changes: 0 additions & 2 deletions kubectl-plugin/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 22 additions & 28 deletions kubectl-plugin/pkg/cmd/get/get_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
"io"
"time"

"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/completion"
"github.com/spf13/cobra"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/duration"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/printers"
cmdutil "k8s.io/kubectl/pkg/cmd/util"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

type GetClusterOptions struct {
Expand Down Expand Up @@ -49,7 +50,11 @@ func NewGetClusterCommand(streams genericclioptions.IOStreams) *cobra.Command {
return err
}
// running cmd.Execute or cmd.ExecuteE sets the context, which will be done by root
return options.Run(cmd.Context(), cmdFactory)
k8sClient, err := client.NewClient(cmdFactory)
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
return options.Run(cmd.Context(), k8sClient)
},
}
cmd.Flags().BoolVarP(&options.AllNamespaces, "all-namespaces", "A", options.AllNamespaces, "If present, list the requested clusters across all namespaces. Namespace in current context is ignored even if specified with --namespace.")
Expand Down Expand Up @@ -81,20 +86,9 @@ func (options *GetClusterOptions) Validate() error {
return nil
}

func (options *GetClusterOptions) Run(ctx context.Context, factory cmdutil.Factory) error {
// Retrieves the dynamic client with factory.
dynamicClient, err := factory.DynamicClient()
if err != nil {
return fmt.Errorf("dynamic client failed to initialize: %w", err)
}

rayResourceSchema := schema.GroupVersionResource{
Group: "ray.io",
Version: "v1",
Resource: "rayclusters",
}

var rayclustersList *unstructured.UnstructuredList
func (options *GetClusterOptions) Run(ctx context.Context, k8sClient client.Client) error {
var err error
var rayclusterList *rayv1.RayClusterList

listopts := v1.ListOptions{}
if len(options.args) == 1 {
Expand All @@ -104,21 +98,21 @@ func (options *GetClusterOptions) Run(ctx context.Context, factory cmdutil.Facto
}

if options.AllNamespaces {
rayclustersList, err = dynamicClient.Resource(rayResourceSchema).List(ctx, listopts)
rayclusterList, err = k8sClient.RayClient().RayV1().RayClusters("").List(ctx, listopts)
if err != nil {
return fmt.Errorf("unable to retrieve raycluster for all namespaces: %w", err)
}
} else {
rayclustersList, err = dynamicClient.Resource(rayResourceSchema).Namespace(*options.configFlags.Namespace).List(ctx, listopts)
rayclusterList, err = k8sClient.RayClient().RayV1().RayClusters(*options.configFlags.Namespace).List(ctx, listopts)
if err != nil {
return fmt.Errorf("unable to retrieve raycluster for namespace %s: %w", *options.configFlags.Namespace, err)
}
}

return printClusters(rayclustersList, options.ioStreams.Out)
return printClusters(rayclusterList, options.ioStreams.Out)
}

func printClusters(rayclustersList *unstructured.UnstructuredList, output io.Writer) error {
func printClusters(rayclusterList *rayv1.RayClusterList, output io.Writer) error {
resultTablePrinter := printers.NewTablePrinter(printers.PrintOptions{})

resTable := &v1.Table{
Expand All @@ -135,7 +129,7 @@ func printClusters(rayclustersList *unstructured.UnstructuredList, output io.Wri
},
}

for _, raycluster := range rayclustersList.Items {
for _, raycluster := range rayclusterList.Items {
age := duration.HumanDuration(time.Since(raycluster.GetCreationTimestamp().Time))
if raycluster.GetCreationTimestamp().Time.IsZero() {
age = "<unknown>"
Expand All @@ -144,12 +138,12 @@ func printClusters(rayclustersList *unstructured.UnstructuredList, output io.Wri
Cells: []interface{}{
raycluster.GetName(),
raycluster.GetNamespace(),
raycluster.Object["status"].(map[string]interface{})["desiredWorkerReplicas"],
raycluster.Object["status"].(map[string]interface{})["availableWorkerReplicas"],
raycluster.Object["status"].(map[string]interface{})["desiredCPU"],
raycluster.Object["status"].(map[string]interface{})["desiredGPU"],
raycluster.Object["status"].(map[string]interface{})["desiredTPU"],
raycluster.Object["status"].(map[string]interface{})["desiredMemory"],
raycluster.Status.DesiredWorkerReplicas,
raycluster.Status.AvailableWorkerReplicas,
raycluster.Status.DesiredCPU.String(),
raycluster.Status.DesiredGPU.String(),
raycluster.Status.DesiredTPU.String(),
raycluster.Status.DesiredMemory.String(),
age,
},
})
Expand Down
55 changes: 27 additions & 28 deletions kubectl-plugin/pkg/cmd/get/get_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/printers"
fakedynamic "k8s.io/client-go/dynamic/fake"
kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"

"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
rayClientFake "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/fake"
)

// This is to test Complete() and ensure that it is setting the namespace and arguments correctly
Expand Down Expand Up @@ -129,9 +133,10 @@ func TestRayClusterGetValidate(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
err := tc.opts.Validate()
if tc.expectError != "" {
assert.Error(t, err)
assert.Equal(t, tc.expectError, err.Error())
} else {
assert.Nil(t, err)
assert.NoError(t, err)
}
})
}
Expand All @@ -146,29 +151,26 @@ func TestRayClusterGetRun(t *testing.T) {

fakeClusterGetOptions := NewGetClusterOptions(testStreams)

// Create fake ray cluster unstructured object for fake rest response
raycluster := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "ray.io/v1",
"kind": "RayCluster",
"name": "raycluster-kuberay",
"namespace": "test",
"metadata": map[string]interface{}{
"name": "raycluster-kuberay",
"namespace": "test",
},
"status": map[string]interface{}{
"desiredWorkerReplicas": "2",
"availableWorkerReplicas": "2",
"desiredCPU": "6",
"desiredGPU": "1",
"desiredTPU": "1",
"desiredMemory": "24Gi",
"state": "ready",
},
rayCluster := &rayv1.RayCluster{
ObjectMeta: v1.ObjectMeta{
Name: "raycluster-kuberay",
Namespace: "test",
},
Status: rayv1.RayClusterStatus{
DesiredWorkerReplicas: 2,
AvailableWorkerReplicas: 2,
DesiredCPU: resource.MustParse("6"),
DesiredGPU: resource.MustParse("1"),
DesiredTPU: resource.MustParse("1"),
DesiredMemory: resource.MustParse("24Gi"),
State: rayv1.Ready,
},
}

kubeClientSet := kubefake.NewClientset()
rayClient := rayClientFake.NewSimpleClientset(rayCluster)
k8sClients := client.NewClientForTesting(kubeClientSet, rayClient)

// Initialize the printer with an empty print options since we are setting the column definition later
expectedTestResultTable := printers.NewTablePrinter(printers.PrintOptions{})

Expand Down Expand Up @@ -206,10 +208,7 @@ func TestRayClusterGetRun(t *testing.T) {
err := expectedTestResultTable.PrintObj(testResTable, &resbuffer)
assert.Nil(t, err)

// Create fake dynmaic with the rayscluster
tf.FakeDynamicClient = fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), raycluster)

err = fakeClusterGetOptions.Run(context.Background(), tf)
err = fakeClusterGetOptions.Run(context.Background(), k8sClients)
assert.Nil(t, err)

if e, a := resbuffer.String(), resBuf.String(); e != a {
Expand Down
Loading

0 comments on commit 9a48b82

Please sign in to comment.