Add Nebius AI Cloud support (#443)
* Add Nebius AI Cloud support

* format

* Add Nebius documentation to Dask Cloudprovider

Introduce new documentation for Nebius integration, detailing authentication setup and configuration options. This includes updates to the index file and a new Nebius-specific guide covering cluster setup and Dask-related settings.

* Add installation instructions for the Nebius provider

Updated the installation guide to include the Nebius provider option for `dask-cloudprovider`. This ensures users are aware of all supported cloud platforms.
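
For reference, the matching install command (as surfaced in the new module's ImportError message) is `pip install "dask-cloudprovider[nebius]" --upgrade`.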

---------

Co-authored-by: Alexander Salikov <[email protected]>
SalikovAlex authored Jan 23, 2025
1 parent 5031aa7 commit 6d1614f
Showing 8 changed files with 428 additions and 1 deletion.
10 changes: 10 additions & 0 deletions dask_cloudprovider/cloudprovider.yaml
@@ -149,3 +149,13 @@ cloudprovider:
    external_network_id: null  # The ID of the external network used for assigning floating IPs. List available external networks using: `openstack network list --external`
    create_floating_ip: false  # Specifies whether to assign a floating IP to each instance, enabling external access. Set to `True` if external connectivity is needed.
    docker_image: "daskdev/dask:latest"  # docker image to use

  nebius:
    token: null  # IAM token for interacting with Nebius AI Cloud
    project_id: null  # Your project ID; you can find it in the Nebius AI Cloud console
    bootstrap: true  # It is assumed that the OS image does not have Docker and needs bootstrapping. Set this to false if using a custom image with Docker already installed.
    image_family: "ubuntu22.04-driverless"  # Must be "ubuntu22.04-driverless" or "ubuntu22.04-cuda12"; see https://docs.nebius.com/compute/storage/manage#parameters-boot
    docker_image: "daskdev/dask:latest"  # Docker image to use
    server_platform: "cpu-d3"  # For all platforms, see https://docs.nebius.com/compute/virtual-machines/types
    server_preset: "4vcpu-16gb"  # For all presets, see https://docs.nebius.com/compute/virtual-machines/types
    disk_size: 64  # Size of the VM host OS disk in gigabytes
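
To show how these options are consumed, here is a minimal usage sketch (illustrative, not part of this commit); it assumes `token` and `project_id` are already supplied via this YAML or via `DASK_CLOUDPROVIDER__NEBIUS__*` environment variables:

# Minimal usage sketch (illustrative, not part of this commit).
# Assumes `token` and `project_id` are already configured, e.g. via
# cloudprovider.yaml or DASK_CLOUDPROVIDER__NEBIUS__* environment variables.
import dask
from dask_cloudprovider.nebius import NebiusCluster

# Values can come from the YAML above, from dask.config.set, or from
# NebiusCluster keyword arguments (explicit kwargs take precedence).
dask.config.set({"cloudprovider.nebius.disk_size": 128})

cluster = NebiusCluster(
    server_platform="cpu-d3",
    server_preset="4vcpu-16gb",
    n_workers=1,
)
# ... run Dask work against the cluster ...
cluster.close()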
1 change: 1 addition & 0 deletions dask_cloudprovider/nebius/__init__.py
@@ -0,0 +1 @@
from .instances import NebiusCluster
287 changes: 287 additions & 0 deletions dask_cloudprovider/nebius/instances.py
@@ -0,0 +1,287 @@
import dask

from dask_cloudprovider.generic.vmcluster import (
    VMCluster,
    VMInterface,
    SchedulerMixin,
    WorkerMixin,
)

try:
    from nebius.api.nebius.common.v1 import ResourceMetadata
    from nebius.api.nebius.vpc.v1 import SubnetServiceClient, ListSubnetsRequest
    from nebius.sdk import SDK
    from nebius.api.nebius.compute.v1 import (
        InstanceServiceClient,
        CreateInstanceRequest,
        DiskServiceClient,
        CreateDiskRequest,
        DiskSpec,
        SourceImageFamily,
        InstanceSpec,
        AttachedDiskSpec,
        ExistingDisk,
        ResourcesSpec,
        NetworkInterfaceSpec,
        IPAddress,
        PublicIPAddress,
        GetInstanceRequest,
        DeleteInstanceRequest,
        DeleteDiskRequest,
    )
except ImportError as e:
    msg = (
        "Dask Cloud Provider Nebius requirements are not installed.\n\n"
        "Please pip install as follows:\n\n"
        '  pip install "dask-cloudprovider[nebius]" --upgrade  # or python -m pip install'
    )
    raise ImportError(msg) from e


class NebiusInstance(VMInterface):
    def __init__(
        self,
        cluster: str,
        config,
        env_vars: dict = None,
        bootstrap=None,
        extra_bootstrap=None,
        docker_image: str = None,
        image_family: str = None,
        project_id: str = None,
        server_platform: str = None,
        server_preset: str = None,
        disk_size: int = None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.cluster = cluster
        self.config = config
        self.extra_bootstrap = extra_bootstrap
        self.env_vars = env_vars
        self.bootstrap = bootstrap
        self.image_family = image_family
        self.project_id = project_id
        self.docker_image = docker_image
        self.server_platform = server_platform
        self.server_preset = server_preset
        self.sdk = SDK(credentials=self.config.get("token"))
        self.disk_size = disk_size
        self.instance_id = None
        self.disk_id = None

    async def create_vm(self, user_data=None):
        # Create a boot disk from the requested image family.
        service = DiskServiceClient(self.sdk)
        operation = await service.create(
            CreateDiskRequest(
                metadata=ResourceMetadata(
                    parent_id=self.project_id,
                    name=self.name + "-disk",
                ),
                spec=DiskSpec(
                    source_image_family=SourceImageFamily(
                        image_family=self.image_family
                    ),
                    size_gibibytes=self.disk_size,
                    type=DiskSpec.DiskType.NETWORK_SSD,
                ),
            )
        )
        await operation.wait()
        self.disk_id = operation.resource_id

        # Attach the instance to the first subnet in the project.
        service = SubnetServiceClient(self.sdk)
        sub_net = await service.list(ListSubnetsRequest(parent_id=self.project_id))
        subnet_id = sub_net.items[0].metadata.id

        # Boot the instance from the freshly created disk, passing the
        # rendered cloud-init script as user data.
        service = InstanceServiceClient(self.sdk)
        operation = await service.create(
            CreateInstanceRequest(
                metadata=ResourceMetadata(
                    parent_id=self.project_id,
                    name=self.name,
                ),
                spec=InstanceSpec(
                    boot_disk=AttachedDiskSpec(
                        # Attach the boot disk in read/write mode (enum value 2).
                        attach_mode=AttachedDiskSpec.AttachMode(2),
                        existing_disk=ExistingDisk(id=self.disk_id),
                    ),
                    cloud_init_user_data=self.cluster.render_process_cloud_init(self),
                    resources=ResourcesSpec(
                        platform=self.server_platform, preset=self.server_preset
                    ),
                    network_interfaces=[
                        NetworkInterfaceSpec(
                            subnet_id=subnet_id,
                            ip_address=IPAddress(),
                            name="network-interface-0",
                            public_ip_address=PublicIPAddress(),
                        )
                    ],
                ),
            )
        )
        self.instance_id = operation.resource_id

        self.cluster._log(f"Creating Nebius instance {self.name}")
        await operation.wait()

        # Fetch the instance to read back its assigned IP addresses;
        # addresses are reported in CIDR notation, so strip the suffix.
        service = InstanceServiceClient(self.sdk)
        operation = await service.get(
            GetInstanceRequest(
                id=self.instance_id,
            )
        )
        internal_ip = operation.status.network_interfaces[0].ip_address.address.split(
            "/"
        )[0]
        external_ip = operation.status.network_interfaces[
            0
        ].public_ip_address.address.split("/")[0]
        self.cluster._log(
            f"Created Nebius instance {self.name} with internal IP {internal_ip} and external IP {external_ip}"
        )
        return internal_ip, external_ip

    async def destroy_vm(self):
        if self.instance_id:
            service = InstanceServiceClient(self.sdk)
            operation = await service.delete(
                DeleteInstanceRequest(
                    id=self.instance_id,
                )
            )
            await operation.wait()

        if self.disk_id:
            service = DiskServiceClient(self.sdk)
            # Disk deletion is not awaited to completion, unlike the instance above.
            await service.delete(
                DeleteDiskRequest(
                    id=self.disk_id,
                )
            )
        self.cluster._log(
            f"Terminated instance {self.name} ({self.instance_id}) and deleted disk {self.disk_id}"
        )
        self.instance_id = None
        self.disk_id = None


class NebiusScheduler(SchedulerMixin, NebiusInstance):
    """Scheduler running on a Nebius server."""


class NebiusWorker(WorkerMixin, NebiusInstance):
    """Worker running on a Nebius server."""


class NebiusCluster(VMCluster):
    """Cluster running on Nebius AI Cloud instances.

    VMs in Nebius AI Cloud are referred to as instances. This cluster manager constructs a Dask cluster
    running on VMs.

    When configuring your cluster you may find it useful to install the ``nebius`` CLI tool for querying the
    Nebius API for available options: https://docs.nebius.com/cli/quickstart

    Parameters
    ----------
    image_family: str
        The image family to use for the host OS. This should be an Ubuntu variant.
        You can find the list of available images at https://docs.nebius.com/compute/storage/manage#parameters-boot.
    project_id: str
        The Nebius AI Cloud project id. You can find it in the Nebius AI Cloud console.
    server_platform: str
        The hardware platform to use. A list of all platforms and presets is available at
        https://docs.nebius.com/compute/virtual-machines/types/.
    server_preset: str
        The resource preset (vCPUs and memory) to use. A list of all platforms and presets is available at
        https://docs.nebius.com/compute/virtual-machines/types/.
    disk_size: int (optional)
        The size of the VM host OS disk in gigabytes.
    docker_image: str (optional)
        The Docker image to run on all instances.
    bootstrap: bool (optional)
        Whether to install Docker during startup. Set to ``False`` if using a custom image with Docker
        already installed.
    n_workers: int
        Number of workers to initialise the cluster with. Defaults to ``0``.
    worker_module: str
        The Python module to run for the worker. Defaults to ``distributed.cli.dask_worker``.
    worker_options: dict
        Params to be passed to the worker class.
        See :class:`distributed.worker.Worker` for the default worker class.
        If you set ``worker_module`` then refer to the docstring for the custom worker class.
    scheduler_options: dict
        Params to be passed to the scheduler class.
        See :class:`distributed.scheduler.Scheduler`.
    env_vars: dict
        Environment variables to be passed to the workers.
    extra_bootstrap: list[str] (optional)
        Extra commands to be run during the bootstrap phase.

    Example
    --------
    >>> from dask_cloudprovider.nebius import NebiusCluster
    >>> cluster = NebiusCluster(n_workers=1)
    >>> from dask.distributed import Client
    >>> client = Client(cluster)
    >>> import dask.array as da
    >>> arr = da.random.random((1000, 1000), chunks=(100, 100))
    >>> arr.mean().compute()
    >>> client.close()
    >>> cluster.close()
    """

    def __init__(
        self,
        bootstrap: bool = None,
        image_family: str = None,
        project_id: str = None,
        disk_size: int = None,
        server_platform: str = None,
        server_preset: str = None,
        docker_image: str = None,
        debug: bool = False,
        **kwargs,
    ):
        self.config = dask.config.get("cloudprovider.nebius", {})

        self.scheduler_class = NebiusScheduler
        self.worker_class = NebiusWorker

        # Explicit keyword arguments take precedence over values from the
        # ``cloudprovider.nebius`` section of the Dask config.
        self.image_family = dask.config.get(
            "cloudprovider.nebius.image_family", override_with=image_family
        )
        self.docker_image = dask.config.get(
            "cloudprovider.nebius.docker_image", override_with=docker_image
        )
        self.project_id = dask.config.get(
            "cloudprovider.nebius.project_id", override_with=project_id
        )
        self.server_platform = dask.config.get(
            "cloudprovider.nebius.server_platform", override_with=server_platform
        )
        self.server_preset = dask.config.get(
            "cloudprovider.nebius.server_preset", override_with=server_preset
        )
        self.bootstrap = dask.config.get(
            "cloudprovider.nebius.bootstrap", override_with=bootstrap
        )
        self.disk_size = dask.config.get(
            "cloudprovider.nebius.disk_size", override_with=disk_size
        )
        self.debug = debug

        self.options = {
            "bootstrap": self.bootstrap,
            "cluster": self,
            "config": self.config,
            "docker_image": self.docker_image,
            "image_family": self.image_family,
            "project_id": self.project_id,
            "server_platform": self.server_platform,
            "server_preset": self.server_preset,
            "disk_size": self.disk_size,
        }
        self.scheduler_options = {**self.options}
        self.worker_options = {**self.options}
        super().__init__(debug=debug, **kwargs)
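
A note on the pattern above: every option is resolved with `dask.config.get(..., override_with=...)`, so an explicit keyword argument wins over the `cloudprovider.nebius` config value, and a `None` keyword falls back to the config. A self-contained sketch of that precedence:

# Sketch of the override precedence used in NebiusCluster.__init__.
import dask

with dask.config.set({"cloudprovider.nebius.disk_size": 64}):
    # No override supplied: the configured value is returned.
    assert dask.config.get("cloudprovider.nebius.disk_size", override_with=None) == 64
    # An explicit (non-None) override takes precedence over the config.
    assert dask.config.get("cloudprovider.nebius.disk_size", override_with=128) == 128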
77 changes: 77 additions & 0 deletions dask_cloudprovider/nebius/tests/test_nebius.py
@@ -0,0 +1,77 @@
import pytest

import dask

nebius = pytest.importorskip("nebius")

from dask_cloudprovider.nebius.instances import NebiusCluster
from dask.distributed import Client
from distributed.core import Status


async def skip_without_credentials(config):
    if config.get("token") is None or config.get("project_id") is None:
        pytest.skip(
            """
            You must configure a Nebius AI Cloud API token to run this test.

            Either set this in your config:

                # cloudprovider.yaml
                cloudprovider:
                  nebius:
                    token: "yourtoken"
                    project_id: "yourprojectid"

            Or by setting it as an environment variable:

                export DASK_CLOUDPROVIDER__NEBIUS__TOKEN=$(nebius iam get-access-token)
                export DASK_CLOUDPROVIDER__NEBIUS__PROJECT_ID=project_id
            """
        )


@pytest.fixture
async def config():
    return dask.config.get("cloudprovider.nebius", {})


@pytest.fixture
async def cluster(config):
    # Note: pytest marks have no effect on fixtures, so the ``external``
    # gating comes from the tests that request this fixture.
    await skip_without_credentials(config)
    async with NebiusCluster(asynchronous=True, debug=True) as cluster:
        yield cluster


@pytest.mark.asyncio
@pytest.mark.external
async def test_init():
    cluster = NebiusCluster(asynchronous=True, debug=True)
    assert cluster.status == Status.created


@pytest.mark.asyncio
@pytest.mark.external
async def test_create_cluster(cluster):
    assert cluster.status == Status.running

    cluster.scale(1)
    await cluster
    assert len(cluster.workers) == 1

    async with Client(cluster, asynchronous=True) as client:

        def inc(x):
            return x + 1

        assert await client.submit(inc, 10).result() == 11


@pytest.mark.asyncio
async def test_get_cloud_init():
    cloud_init = NebiusCluster.get_cloud_init(
        docker_args="--privileged",
    )
    assert " --privileged " in cloud_init
1 change: 1 addition & 0 deletions doc/source/index.rst
@@ -102,6 +102,7 @@ It can be used on any cluster that has workers running on Azure VMs, not just on
   hetzner.rst
   ibm.rst
   openstack.rst
   nebius.rst

.. toctree::
   :maxdepth: 2
