Skip to content

Create Ray Cluster SDK upgrade scenarios #440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/onsi/gomega v1.27.10
github.com/project-codeflare/codeflare-common v0.0.0-20231110155354-042fb171fcdb
github.com/project-codeflare/codeflare-common v0.0.0-20231129165224-988ba1da9069
github.com/project-codeflare/multi-cluster-app-dispatcher v1.37.0
github.com/ray-project/kuberay/ray-operator v1.0.0
k8s.io/api v0.26.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/project-codeflare/codeflare-common v0.0.0-20231110155354-042fb171fcdb h1:L2Gdr2SlvshDKZY2KK6507AwzQ1NSfRbMQuz5dOsYNM=
github.com/project-codeflare/codeflare-common v0.0.0-20231110155354-042fb171fcdb/go.mod h1:zdi2GCYJX+QyxFWyCLMoTme3NMz/aucWDJWMqKfigxk=
github.com/project-codeflare/codeflare-common v0.0.0-20231129165224-988ba1da9069 h1:81+ma1mchF/LtAGsf+poAt50kJ/fLYjoTAcZOxci1Yc=
github.com/project-codeflare/codeflare-common v0.0.0-20231129165224-988ba1da9069/go.mod h1:zdi2GCYJX+QyxFWyCLMoTme3NMz/aucWDJWMqKfigxk=
github.com/project-codeflare/multi-cluster-app-dispatcher v1.37.0 h1:oyhdLdc4BgA4zcH1zlRrSrYpzuVxV5QLDbyIXrwnQqs=
github.com/project-codeflare/multi-cluster-app-dispatcher v1.37.0/go.mod h1:Yge6GRNpO9YIDfeL+XOcCE9xbmfCTD5C1h5dlW87mxQ=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand Down
40 changes: 3 additions & 37 deletions tests/e2e/mnist_raycluster_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ limitations under the License.
package e2e

import (
"strings"
"testing"
"time"

. "github.com/onsi/gomega"
. "github.com/project-codeflare/codeflare-common/support"
Expand Down Expand Up @@ -137,7 +135,7 @@ func TestMNISTRayClusterSDK(t *testing.T) {
Command: []string{
"/bin/sh", "-c",
"while [ ! -f /codeflare-sdk/pyproject.toml ]; do sleep 1; done; " +
"cp /test/* . && chmod +x install-codeflare-sdk.sh && ./install-codeflare-sdk.sh && python mnist_raycluster_sdk.py " + namespace.Name,
"cp /test/* . && chmod +x install-codeflare-sdk.sh && ./install-codeflare-sdk.sh && python mnist_raycluster_sdk.py " + namespace.Name,
},
VolumeMounts: []corev1.VolumeMount{
{
Expand Down Expand Up @@ -194,40 +192,8 @@ func TestMNISTRayClusterSDK(t *testing.T) {
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created Job %s/%s successfully", job.Namespace, job.Name)

go func() {
// Checking if pod is found and running
podName := ""
foundPod := false
for !foundPod {
pods, _ := test.Client().Core().CoreV1().Pods(namespace.Name).List(test.Ctx(), metav1.ListOptions{
LabelSelector: "job-name=sdk",
})
for _, pod := range pods.Items {
if strings.HasPrefix(pod.Name, "sdk-") && pod.Status.Phase == corev1.PodRunning {
podName = pod.Name
foundPod = true
test.T().Logf("Pod is running!")
break
}
}
if !foundPod {
test.T().Logf("Waiting for pod to start...")
time.Sleep(5 * time.Second)
}
}

// Get rest config
restConfig, err := GetRestConfig(test); if err != nil {
test.T().Errorf("Error getting rest config: %v", err)
}

// Copy codeflare-sdk to the pod
srcDir := "../.././"
dstDir := "/codeflare-sdk"
if err := CopyToPod(test, namespace.Name, podName, restConfig, srcDir, dstDir); err != nil {
test.T().Errorf("Error copying codeflare-sdk to pod: %v", err)
}
}()
// Set up the codeflare-sdk inside the pod associated with the created job
SetupCodeflareSDKInsidePod(test, namespace, job.Name)

test.T().Logf("Waiting for Job %s/%s to complete", job.Namespace, job.Name)
test.Eventually(Job(test, job.Namespace, job.Name), TestTimeoutLong).Should(
Expand Down
46 changes: 46 additions & 0 deletions tests/e2e/mnist_rayjob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import sys

from time import sleep

from torchx.specs.api import AppState, is_terminal

from codeflare_sdk.cluster.cluster import get_cluster
from codeflare_sdk.job.jobs import DDPJobDefinition

# Submits the MNIST training job to the "mnist" Ray cluster in the given
# namespace, waits for it to reach a terminal state, prints its logs, tears
# the cluster down and exits with 0 on success or 1 otherwise.
#
# Usage: python mnist_rayjob.py <namespace>

# Namespace holding the Ray cluster, taken from the first CLI argument.
namespace = sys.argv[1]

# NOTE(review): presumably fetches a handle to an already-running cluster
# named "mnist" (e.g. one created by start_ray_cluster.py) - confirm.
cluster = get_cluster("mnist", namespace)

cluster.details()

jobdef = DDPJobDefinition(
    name="mnist",
    script="mnist.py",
    scheduler_args={"requirements": "requirements.txt"},
)
job = jobdef.submit(cluster)

# Poll the job status every `poll_interval` seconds until it is terminal.
# The timeout is checked before each sleep, so a TimeoutError is raised once
# `timeout` seconds of sleeping have accumulated without a terminal state.
elapsed = 0
timeout = 900
poll_interval = 5
while True:
    status = job.status()
    if is_terminal(status.state):
        break
    print(status)
    if timeout and elapsed >= timeout:
        raise TimeoutError(f"job has timed out after waiting {timeout}s")
    sleep(poll_interval)
    elapsed += poll_interval

print(f"Job has completed: {status.state}")

print(job.logs())

cluster.down()

# Report the outcome through the process exit code. sys.exit is used instead
# of the interactive `exit` helper, which is only installed by the site module.
sys.exit(0 if status.state == AppState.SUCCEEDED else 1)
52 changes: 52 additions & 0 deletions tests/e2e/start_ray_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import sys
import os

from time import sleep

from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration

# Creates the "mnist" Ray cluster in the given namespace and waits until it
# is ready, printing its status and details along the way.
#
# Usage: python start_ray_cluster.py <namespace>
# Environment:
#   RAY_IMAGE         image passed to the cluster configuration
#   CLUSTER_HOSTNAME  optional; when set, a "ray-dashboard" ingress is
#                     configured for this host

namespace = sys.argv[1]
ray_image = os.getenv("RAY_IMAGE")
host = os.getenv("CLUSTER_HOSTNAME")

# Only configure an ingress when a hostname was supplied.
if host is None:
    ingress_options = {}
else:
    dashboard_ingress = {
        "ingressName": "ray-dashboard",
        "port": 8265,
        "pathType": "Prefix",
        "path": "/",
        "host": host,
    }
    ingress_options = {"ingresses": [dashboard_ingress]}

config = ClusterConfiguration(
    name="mnist",
    namespace=namespace,
    num_workers=1,
    head_cpus="500m",
    head_memory=2,
    min_cpus="500m",
    max_cpus=1,
    min_memory=1,
    max_memory=2,
    num_gpus=0,
    instascale=False,
    image=ray_image,
    ingress_options=ingress_options,
)
cluster = Cluster(config)

# Bring the cluster up, then report status before and after it becomes ready.
cluster.up()
cluster.status()
cluster.wait_ready()
cluster.status()
cluster.details()
50 changes: 48 additions & 2 deletions tests/e2e/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ import (
"embed"
"os"
"path/filepath"
"strings"
"time"

"github.com/onsi/gomega"
"github.com/project-codeflare/codeflare-common/support"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand All @@ -33,8 +38,6 @@ import (
"k8s.io/kubectl/pkg/cmd/cp"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"

"github.com/project-codeflare/codeflare-common/support"
)

//go:embed *.py *.txt *.sh
Expand Down Expand Up @@ -109,3 +112,46 @@ func (r restClientGetter) ToDiscoveryClient() (discovery.CachedDiscoveryInterfac
func (r restClientGetter) ToRESTMapper() (meta.RESTMapper, error) {
return nil, nil
}

// SetupCodeflareSDKInsidePod copies the local codeflare-sdk sources into the
// running pod that belongs to the Job identified by labelName.
//
// It first waits (via GetPodName) for a running pod of that Job in namespace,
// then obtains a REST config and copies srcDir into dstDir inside the pod.
// Failures are reported on the test with Errorf. If the REST config cannot be
// obtained, the copy is skipped, since it cannot succeed without one.
func SetupCodeflareSDKInsidePod(test support.Test, namespace *corev1.Namespace, labelName string) {
	// Get pod name
	podName := GetPodName(test, namespace, labelName)

	// Get rest config
	restConfig, err := GetRestConfig(test)
	if err != nil {
		test.T().Errorf("Error getting rest config: %v", err)
		return
	}

	// Copy codeflare-sdk to the pod.
	// NOTE(review): srcDir is relative to the test's working directory;
	// presumably this resolves to the repository root - confirm.
	srcDir := "../.././"
	dstDir := "/codeflare-sdk"
	if err := CopyToPod(test, namespace.Name, podName, restConfig, srcDir, dstDir); err != nil {
		test.T().Errorf("Error copying codeflare-sdk to pod: %v", err)
	}
}

// GetPodName blocks until a pod of the Job identified by labelName is in the
// Running phase and returns that pod's name.
//
// Pods are listed in namespace with the label selector "job-name=<labelName>"
// and must also have a name starting with "<labelName>-". The list is polled
// every 5 seconds. A failed List call is logged and retried on the next poll.
// If the test context is done before a matching pod is found, the failure is
// reported on the test with Errorf and the empty string is returned, so the
// caller is never blocked forever on a cancelled context.
func GetPodName(test support.Test, namespace *corev1.Namespace, labelName string) string {
	for {
		pods, err := test.Client().Core().CoreV1().Pods(namespace.Name).List(test.Ctx(), metav1.ListOptions{
			LabelSelector: "job-name=" + labelName,
		})
		if err != nil {
			// Do not inspect the result of a failed call; surface the error and retry.
			test.T().Logf("Error listing pods for job %s: %v", labelName, err)
		} else {
			for _, pod := range pods.Items {
				if strings.HasPrefix(pod.Name, labelName+"-") && pod.Status.Phase == corev1.PodRunning {
					test.T().Logf("Pod is running!")
					return pod.Name
				}
			}
		}

		test.T().Logf("Waiting for pod to start...")
		select {
		case <-test.Ctx().Done():
			test.T().Errorf("Error waiting for pod of job %s to start: %v", labelName, test.Ctx().Err())
			return ""
		case <-time.After(5 * time.Second):
		}
	}
}
Loading