Merged
20 changes: 17 additions & 3 deletions release/BUILD.bazel
@@ -375,6 +375,18 @@ py_library(
visibility = ["//visibility:private"],
)

py_library(
name = "cloud_util",
srcs = ["ray_release/cloud_util.py"],
imports = ["."],
visibility = ["//visibility:private"],
deps = [
":logger",
bk_require("azure-storage-blob"),
bk_require("azure-identity"),
],
)

py_library(
name = "ray_release",
srcs = glob(
@@ -390,6 +402,7 @@ py_library(
"ray_release/kuberay_util.py",
"ray_release/wheels.py",
"ray_release/retry.py",
"ray_release/cloud_util.py",
],
),
data = glob([
@@ -407,6 +420,7 @@ py_library(
visibility = ["//visibility:public"],
deps = [
":bazel",
":cloud_util",
":exception",
":global_config",
":kuberay_util",
@@ -843,15 +857,15 @@ py_test(
)

py_test(
name = "test_util",
srcs = ["ray_release/tests/test_util.py"],
name = "test_cloud_util",
srcs = ["ray_release/tests/test_cloud_util.py"],
exec_compatible_with = ["//:hermetic_python"],
tags = [
"release_unit",
"team:ci",
],
deps = [
":ray_release",
":cloud_util",
bk_require("pytest"),
],
)
102 changes: 102 additions & 0 deletions release/ray_release/cloud_util.py
@@ -0,0 +1,102 @@
import random
import string
from typing import Optional, Tuple
import time
import os
import shutil
from urllib.parse import urlparse

from azure.storage.blob import BlobServiceClient
from azure.identity import DefaultAzureCredential

from ray_release.logger import logger


def generate_tmp_cloud_storage_path() -> str:
return "".join(random.choice(string.ascii_lowercase) for i in range(10))


def upload_file_to_azure(
local_file_path: str,
azure_file_path: str,
blob_service_client: Optional[BlobServiceClient] = None,
) -> None:
"""Upload a file to Azure Blob Storage.

Args:
local_file_path: Path to local file to upload.
azure_file_path: Path to file in Azure blob storage.
"""

account, container, path = _parse_abfss_uri(azure_file_path)
account_url = f"https://{account}.blob.core.windows.net"
if blob_service_client is None:
credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
blob_service_client = BlobServiceClient(account_url, credential)

blob_client = blob_service_client.get_blob_client(container=container, blob=path)
try:
with open(local_file_path, "rb") as f:
blob_client.upload_blob(data=f, overwrite=True)
except Exception as e:
logger.exception(f"Failed to upload file to Azure Blob Storage: {e}")
raise


def archive_directory(directory_path: str) -> str:
timestamp = str(int(time.time()))
archived_filename = f"ray_release_{timestamp}.zip"
output_path = os.path.abspath(archived_filename)
shutil.make_archive(output_path[:-4], "zip", directory_path)
return output_path


def upload_working_dir_to_azure(working_dir: str, azure_directory_uri: str) -> str:
"""Upload archived working directory to Azure blob storage.

Args:
working_dir: Path to directory to upload.
azure_directory_uri: Path to directory in Azure blob storage.
Returns:
Azure blob storage path where archived directory was uploaded.
"""
archived_file_path = archive_directory(working_dir)
archived_filename = os.path.basename(archived_file_path)
azure_file_path = f"{azure_directory_uri}/{archived_filename}"
upload_file_to_azure(
local_file_path=archived_file_path, azure_file_path=azure_file_path
)
return azure_file_path
Comment on lines +63 to +69

medium

The upload_working_dir_to_azure function calls archive_directory to create a temporary zip file, but this file is not deleted after being uploaded to Azure. This will leave temporary files in the execution environment. It's a good practice to ensure temporary resources are cleaned up. You can use a try...finally block to ensure the file is removed even if the upload fails.

    archived_file_path = archive_directory(working_dir)
    try:
        archived_filename = os.path.basename(archived_file_path)
        azure_file_path = f"{azure_directory_uri}/{archived_filename}"
        upload_file_to_azure(
            local_file_path=archived_file_path, azure_file_path=azure_file_path
        )
        return azure_file_path
    finally:
        if os.path.exists(archived_file_path):
            os.remove(archived_file_path)
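
A variation on that suggestion, not part of this PR, is to build the archive inside a temporary directory so the zip is removed automatically even if the upload raises; a minimal sketch, reusing the same upload_file_to_azure helper:

import os
import shutil
import tempfile
import time


def upload_working_dir_to_azure(working_dir: str, azure_directory_uri: str) -> str:
    # The temporary directory (and the zip inside it) is deleted when the
    # context manager exits, whether or not the upload succeeds.
    with tempfile.TemporaryDirectory() as tmp_dir:
        base = os.path.join(tmp_dir, f"ray_release_{int(time.time())}")
        archived_file_path = shutil.make_archive(base, "zip", working_dir)
        azure_file_path = f"{azure_directory_uri}/{os.path.basename(archived_file_path)}"
        upload_file_to_azure(
            local_file_path=archived_file_path, azure_file_path=azure_file_path
        )
        return azure_file_path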



def _parse_abfss_uri(uri: str) -> Tuple[str, str, str]:
"""Parse ABFSS URI to extract account, container, and path.
ABFSS URI format: abfss://container@account.dfs.core.windows.net/path
Returns: (account_name, container_name, path)
"""
parsed = urlparse(uri)
if "@" not in parsed.netloc:
raise ValueError(
f"Invalid ABFSS URI format: {uri}. "
"Expected format: abfss://[email protected]/path"
)

# Split netloc into container@account.dfs.core.windows.net
container, account_part = parsed.netloc.split("@", 1)

# Extract account name from account.dfs.core.windows.net
account = account_part.split(".")[0]

# Path starts with / which we keep for the blob path
path = parsed.path.lstrip("/")

return account, container, path


def convert_abfss_uri_to_https(uri: str) -> str:
"""Convert ABFSS URI to HTTPS URI.
ABFSS URI format: abfss://container@account.dfs.core.windows.net/path
Returns: HTTPS URI format: https://account.dfs.core.windows.net/container/path
"""
account, container, path = _parse_abfss_uri(uri)
return f"https://{account}.dfs.core.windows.net/{container}/{path}"
Comment on lines +96 to +102

medium

The function convert_abfss_uri_to_https returns a URL with dfs.core.windows.net, while upload_file_to_azure in the same file uses blob.core.windows.net when creating a BlobServiceClient. This inconsistency can be confusing and lead to potential issues. For better maintainability and clarity, it's recommended to consistently use the blob storage endpoint (blob.core.windows.net) throughout, especially since you are using azure.storage.blob.BlobServiceClient.

def convert_abfss_uri_to_https(uri: str) -> str:
    """Convert ABFSS URI to HTTPS URI.
    ABFSS URI format: abfss://container@account.dfs.core.windows.net/path
    Returns: HTTPS URI format: https://account.blob.core.windows.net/container/path
    """
    account, container, path = _parse_abfss_uri(uri)
    return f"https://{account}.blob.core.windows.net/{container}/{path}"

8 changes: 5 additions & 3 deletions release/ray_release/command_runner/anyscale_job_runner.py
@@ -6,6 +6,11 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, List

from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.cloud_util import (
convert_abfss_uri_to_https,
generate_tmp_cloud_storage_path,
upload_working_dir_to_azure,
)
from ray_release.command_runner.job_runner import JobRunner
from ray_release.exception import (
TestCommandTimeout,
@@ -24,13 +29,10 @@
from ray_release.logger import logger
from ray_release.util import (
join_cloud_storage_paths,
generate_tmp_cloud_storage_path,
get_anyscale_sdk,
S3_CLOUD_STORAGE,
AZURE_CLOUD_STORAGE,
AZURE_STORAGE_CONTAINER,
upload_working_dir_to_azure,
convert_abfss_uri_to_https,
)

if TYPE_CHECKING:
5 changes: 2 additions & 3 deletions release/ray_release/file_manager/job_file_manager.py
@@ -9,14 +9,14 @@
import boto3
from google.cloud import storage
from ray_release.aws import RELEASE_AWS_BUCKET
from ray_release.cloud_util import generate_tmp_cloud_storage_path
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.exception import FileDownloadError, FileUploadError
from ray_release.file_manager.file_manager import FileManager
from ray_release.job_manager import JobManager
from ray_release.logger import logger
from ray_release.util import (
exponential_backoff_retry,
generate_tmp_cloud_storage_path,
S3_CLOUD_STORAGE,
GS_CLOUD_STORAGE,
GS_BUCKET,
@@ -64,8 +64,7 @@ def _run_with_retry(self, f, initial_retry_delay_s: int = 10):
)

def _generate_tmp_cloud_storage_path(self):
location = f"tmp/{generate_tmp_cloud_storage_path()}"
return location
return f"tmp/{generate_tmp_cloud_storage_path()}"

def download_from_cloud(
self, key: str, target: str, delete_after_download: bool = False
@@ -4,7 +4,7 @@

import pytest
import tempfile
from ray_release.util import (
from ray_release.cloud_util import (
upload_file_to_azure,
upload_working_dir_to_azure,
_parse_abfss_uri,
@@ -29,8 +29,8 @@ def upload_blob(self, data, overwrite=True):
self.uploaded_data = data.read()


@patch("ray_release.util.BlobServiceClient")
@patch("ray_release.util.DefaultAzureCredential")
@patch("ray_release.cloud_util.BlobServiceClient")
@patch("ray_release.cloud_util.DefaultAzureCredential")
def test_upload_file_to_azure(mock_credential, mock_blob_service_client):
with tempfile.TemporaryDirectory() as tmp_path:
local_file = os.path.join(tmp_path, "test.txt")
@@ -53,7 +53,7 @@ def test_upload_file_to_azure(mock_credential, mock_blob_service_client):
assert fake_blob_client.uploaded_data == expected_data


@patch("ray_release.util.upload_file_to_azure")
@patch("ray_release.cloud_util.upload_file_to_azure")
def test_upload_working_dir_to_azure(mock_upload_file_to_azure):
with tempfile.TemporaryDirectory() as tmp_path:
working_dir = os.path.join(tmp_path, "working_dir")
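
One reason the patch targets change along with the move: unittest.mock.patch has to replace the name in the module where it is looked up, and cloud_util imports BlobServiceClient and DefaultAzureCredential directly, so the test must patch ray_release.cloud_util rather than azure.storage.blob. A minimal sketch of the pattern (the test name here is illustrative):

from unittest.mock import patch

# Patch where the names are used (ray_release.cloud_util), not where they are
# defined; otherwise the real Azure client would still be constructed.
@patch("ray_release.cloud_util.DefaultAzureCredential")
@patch("ray_release.cloud_util.BlobServiceClient")
def test_example(mock_blob_service_client, mock_credential):
    ...  # call upload_file_to_azure and assert against the mocks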
99 changes: 2 additions & 97 deletions release/ray_release/util.py
@@ -2,20 +2,15 @@
import hashlib
import json
import os
import random
import string
import subprocess
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from azure.storage.blob import BlobServiceClient
from azure.identity import DefaultAzureCredential
from google.cloud import storage
import requests
import shutil
from urllib.parse import urlparse

from ray_release.logger import logger
from ray_release.cloud_util import archive_directory
from ray_release.configs.global_config import get_global_config

if TYPE_CHECKING:
@@ -196,10 +191,6 @@ def python_version_str(python_version: Tuple[int, int]) -> str:
return "".join([str(x) for x in python_version])


def generate_tmp_cloud_storage_path() -> str:
return "".join(random.choice(string.ascii_lowercase) for i in range(10))


def join_cloud_storage_paths(*paths: str):
paths = list(paths)
if len(paths) > 1:
@@ -222,7 +213,7 @@ def upload_working_dir_to_gcs(working_dir: str) -> str:
"""
# Create archive of working dir
logger.info(f"Archiving working directory: {working_dir}")
archived_file_path = _archive_directory(working_dir)
archived_file_path = archive_directory(working_dir)
archived_filename = os.path.basename(archived_file_path)

# Upload to GCS
@@ -232,89 +223,3 @@
blob.upload_from_filename(archived_filename)

return f"gs://ray-release-working-dir/{blob.name}"


def upload_file_to_azure(
local_file_path: str,
azure_file_path: str,
blob_service_client: Optional[BlobServiceClient] = None,
) -> None:
"""Upload a file to Azure Blob Storage.

Args:
local_file_path: Path to local file to upload.
azure_file_path: Path to file in Azure blob storage.
"""

account, container, path = _parse_abfss_uri(azure_file_path)
account_url = f"https://{account}.blob.core.windows.net"
if blob_service_client is None:
credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
blob_service_client = BlobServiceClient(account_url, credential)

blob_client = blob_service_client.get_blob_client(container=container, blob=path)
try:
with open(local_file_path, "rb") as f:
blob_client.upload_blob(data=f, overwrite=True)
except Exception as e:
logger.exception(f"Failed to upload file to Azure Blob Storage: {e}")
raise


def _archive_directory(directory_path: str) -> str:
timestamp = str(int(time.time()))
archived_filename = f"ray_release_{timestamp}.zip"
output_path = os.path.abspath(archived_filename)
shutil.make_archive(output_path[:-4], "zip", directory_path)
return output_path


def upload_working_dir_to_azure(working_dir: str, azure_directory_uri: str) -> str:
"""Upload archived working directory to Azure blob storage.

Args:
working_dir: Path to directory to upload.
azure_directory_uri: Path to directory in Azure blob storage.
Returns:
Azure blob storage path where archived directory was uploaded.
"""
archived_file_path = _archive_directory(working_dir)
archived_filename = os.path.basename(archived_file_path)
azure_file_path = f"{azure_directory_uri}/{archived_filename}"
upload_file_to_azure(
local_file_path=archived_file_path, azure_file_path=azure_file_path
)
return azure_file_path


def _parse_abfss_uri(uri: str) -> Tuple[str, str, str]:
"""Parse ABFSS URI to extract account, container, and path.
ABFSS URI format: abfss://container@account.dfs.core.windows.net/path
Returns: (account_name, container_name, path)
"""
parsed = urlparse(uri)
if "@" not in parsed.netloc:
raise ValueError(
f"Invalid ABFSS URI format: {uri}. "
"Expected format: abfss://[email protected]/path"
)

# Split netloc into container@account.dfs.core.windows.net
container, account_part = parsed.netloc.split("@", 1)

# Extract account name from account.dfs.core.windows.net
account = account_part.split(".")[0]

# Path starts with / which we keep for the blob path
path = parsed.path.lstrip("/")

return account, container, path


def convert_abfss_uri_to_https(uri: str) -> str:
"""Convert ABFSS URI to HTTPS URI.
ABFSS URI format: abfss://container@account.dfs.core.windows.net/path
Returns: HTTPS URI format: https://account.dfs.core.windows.net/container/path
"""
account, container, path = _parse_abfss_uri(uri)
return f"https://{account}.dfs.core.windows.net/{container}/{path}"