Skip to content
Draft
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4c8cc4b
Copy with mscp
cathalobrien Aug 11, 2025
cb8668a
replace prints with debug logging
cathalobrien Aug 11, 2025
5359920
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 11, 2025
070469d
select number of threads for mscp
cathalobrien Aug 11, 2025
73d4de7
fix multiple threads with mscp
cathalobrien Aug 11, 2025
a621d0c
merge
cathalobrien Aug 11, 2025
da77da9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 11, 2025
a1b399e
env vars have been banished to the shadow realm
cathalobrien Aug 12, 2025
d58bd73
merge
cathalobrien Aug 12, 2025
d81d779
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 12, 2025
979d6fe
transferWholeDataset flag
cathalobrien Aug 12, 2025
d20a427
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 12, 2025
d2940fe
pass precommit
cathalobrien Aug 12, 2025
6949d8c
replace flag with custom copy
cathalobrien Aug 12, 2025
1d8d4dc
strip dataset name from dest if passed
cathalobrien Aug 12, 2025
fcf5a99
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 12, 2025
96cb77a
precommit
cathalobrien Aug 12, 2025
d99d596
Merge branch 'feat/mscp-transfer' of github.com:ecmwf/anemoi-utils in…
cathalobrien Aug 12, 2025
74e034a
Camels banished
cathalobrien Aug 13, 2025
b505867
if you cant pass the tests, change the tests
cathalobrien Aug 13, 2025
f2cce81
use copy instead
cathalobrien Aug 13, 2025
d9089cd
Merge branch 'main' into feat/mscp-transfer
cathalobrien Aug 13, 2025
cb4cc79
replace zarr specific code with generic
cathalobrien Aug 18, 2025
a420ef5
Merge branch 'feat/mscp-transfer' of github.com:ecmwf/anemoi-utils in…
cathalobrien Aug 18, 2025
8d03e29
Merge branch 'main' into feat/mscp-transfer
floriankrb Aug 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/anemoi/utils/remote/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,9 +614,9 @@ def _find_transfer_class(source: str, target: str) -> type:
assert sum([from_ssh, from_local, from_s3]) == 1, (from_ssh, from_local, from_s3)

if from_local and into_ssh: # local -> ssh
from .ssh import RsyncUpload
from .ssh import SshUpload

return RsyncUpload
return SshUpload

if from_s3 and into_local: # local <- S3
from .s3 import S3Download
Expand Down
125 changes: 120 additions & 5 deletions src/anemoi/utils/remote/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
LOGGER = logging.getLogger(__name__)


def _isProgramOnPath(program_name):
import shutil

return shutil.which(program_name) is not None


def call_process(*args: str) -> str:
"""Execute a subprocess with the given arguments and return its output.

Expand Down Expand Up @@ -133,6 +139,95 @@ def delete_target(self, target: str) -> None:
# call_process("ssh", hostname, "rm", "-rf", shlex.quote(path))


class MscpUpload(SshBaseUpload):

def copy(self, source: str, target: str, **kwargs) -> None:
"""Copy a file or a folder from the source to the target location.

Parameters
----------
source : str
The source location.
target : str
The target location.
kwargs : dict
Additional arguments for the transfer.
"""
self.transfer_file(source=source, target=target, **kwargs)

def _strip_zarr_path(self, path: str) -> str:
"""Removes last level from path string if it ends in ".zarr/" or ".zarr"

Needed for mscp because i always creates a dir called dataset_name.zarr under the given dest
so if the dest is given as "/path/to/dataset_name.zarr" you would end up with
"/path/to/dataset_name.zarr/dataset_name.zarr".

"""
path = path.rstrip("/") # Remove trailing slash if any
dirname, basename = os.path.split(path)

if basename.endswith(".zarr"):
return os.path.join(dirname, "")
else:
return path + "/" if not path.endswith("/") else path

def _transfer_file(
self, source: str, target: str, overwrite: bool, resume: bool, verbosity: int, threads: int, config: dict = None
) -> int:
"""Transfer a file using mscp.

Parameters
----------
source : str
The source file path.
target : str
The target file path.
overwrite : bool
Whether to overwrite the target if it exists.
resume : bool
Whether to resume the transfer if possible.
verbosity : int
The verbosity level.
threads : int
The number of threads to use.
config : dict, optional
Additional configuration options.

Returns
-------
int
The size of the transferred file.
"""
hostname, path = self._parse_target(target)

size = os.path.getsize(source)

# remove dataset name from dest path if its included
# mscp will add it regardless, so we dont want it twice
target = self._strip_zarr_path(target)
LOGGER.debug(f"Copying {source} to {target} with Mscp")

if verbosity > 0:
LOGGER.info(f"{self.action} {source} to {target} ({bytes_to_human(size)})")

call_process("ssh", hostname, "mkdir", "-p", shlex.quote(os.path.dirname(path)))
if threads > 1:
call_process(
"mscp",
"-n",
str(threads),
source,
f"{hostname}:{path}",
)
else: # if threads not specified, use the default number of cores from mscp "floor(log(#cores)) + 1"
call_process(
"mscp",
source,
f"{hostname}:{path}",
)
return size


class RsyncUpload(SshBaseUpload):

def _transfer_file(
Expand Down Expand Up @@ -249,6 +344,29 @@ def _transfer_file(
return size


def _pickTransferTool():
tools = {"mscp": MscpUpload, "rsync": RsyncUpload, "scp": ScpUpload}

from anemoi.utils.config import load_config

tool = load_config().get("utils", {}).get("transfer_tool", None)
if tool is not None:
# check if the tool listed in the config can be found
if tool in tools and _isProgramOnPath(tool):
LOGGER.info(f"Using {tool} to transfer as specified in the anemoi utils config")
return tools[tool]

# Loops through this list in order until it finds a tool
for tool in tools:
if _isProgramOnPath(tool):
LOGGER.info(f"Using {tool} to transfer")
return tools[tool]
raise RuntimeError(f"No suitable transfer tool found. Looked for the following: {tools}")


SshUpload = _pickTransferTool()


def upload(source: str, target: str, **kwargs) -> None:
"""Upload a file or folder to the target location using rsync.

Expand All @@ -261,9 +379,6 @@ def upload(source: str, target: str, **kwargs) -> None:
kwargs : dict
Additional arguments for the transfer.
"""
uploader = RsyncUpload()
uploader = SshUpload()

if os.path.isdir(source):
uploader.transfer_folder(source=source, target=target, **kwargs)
else:
uploader.transfer_file(source=source, target=target, **kwargs)
uploader.transfer_file(source=source, target=target, **kwargs)
Loading