Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4c8cc4b
Copy with mscp
cathalobrien Aug 11, 2025
cb8668a
replace prints with debug logging
cathalobrien Aug 11, 2025
5359920
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 11, 2025
070469d
select number of threads for mscp
cathalobrien Aug 11, 2025
73d4de7
fix multiple threads with mscp
cathalobrien Aug 11, 2025
a621d0c
merge
cathalobrien Aug 11, 2025
da77da9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 11, 2025
a1b399e
env vars have been banished to the shadow realm
cathalobrien Aug 12, 2025
d58bd73
merge
cathalobrien Aug 12, 2025
d81d779
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 12, 2025
979d6fe
transferWholeDataset flag
cathalobrien Aug 12, 2025
d20a427
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 12, 2025
d2940fe
pass precommit
cathalobrien Aug 12, 2025
6949d8c
replace flag with custom copy
cathalobrien Aug 12, 2025
1d8d4dc
strip dataset name from dest if passed
cathalobrien Aug 12, 2025
fcf5a99
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 12, 2025
96cb77a
precommit
cathalobrien Aug 12, 2025
d99d596
Merge branch 'feat/mscp-transfer' of github.com:ecmwf/anemoi-utils in…
cathalobrien Aug 12, 2025
74e034a
Camels banished
cathalobrien Aug 13, 2025
b505867
if you cant pass the tests, change the tests
cathalobrien Aug 13, 2025
f2cce81
use copy instead
cathalobrien Aug 13, 2025
d9089cd
Merge branch 'main' into feat/mscp-transfer
cathalobrien Aug 13, 2025
cb4cc79
replace zarr specific code with generic
cathalobrien Aug 18, 2025
a420ef5
Merge branch 'feat/mscp-transfer' of github.com:ecmwf/anemoi-utils in…
cathalobrien Aug 18, 2025
8d03e29
Merge branch 'main' into feat/mscp-transfer
floriankrb Aug 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/anemoi/utils/remote/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,9 +612,9 @@ def _find_transfer_class(source: str, target: str) -> type:
assert sum([from_ssh, from_local, from_s3]) == 1, (from_ssh, from_local, from_s3)

if from_local and into_ssh: # local -> ssh
from .ssh import RsyncUpload
from .ssh import SshUpload

return RsyncUpload
return SshUpload

if from_s3 and into_local: # local <- S3
from .s3 import S3Download
Expand Down
112 changes: 107 additions & 5 deletions src/anemoi/utils/remote/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
LOGGER = logging.getLogger(__name__)


def _is_program_on_path(program_name):
import shutil

return shutil.which(program_name) is not None


def call_process(*args: str) -> str:
"""Execute a subprocess with the given arguments and return its output.

Expand Down Expand Up @@ -133,6 +139,82 @@ def delete_target(self, target: str) -> None:
# call_process("ssh", hostname, "rm", "-rf", shlex.quote(path))


class MscpUpload(SshBaseUpload):

def copy(self, source: str, target: str, **kwargs) -> None:
"""Copy a file or a folder from the source to the target location.

Parameters
----------
source : str
The source location.
target : str
The target location.
kwargs : dict
Additional arguments for the transfer.
"""
self.transfer_file(source=source, target=target, **kwargs)

def _transfer_file(
self, source: str, target: str, overwrite: bool, resume: bool, verbosity: int, threads: int, config: dict = None
) -> int:
"""Transfer a file using mscp.

Parameters
----------
source : str
The source file path.
target : str
The target file path.
overwrite : bool
Whether to overwrite the target if it exists.
resume : bool
Whether to resume the transfer if possible.
verbosity : int
The verbosity level.
threads : int
The number of threads to use.
config : dict, optional
Additional configuration options.

Returns
-------
int
The size of the transferred file.
"""
hostname, path = self._parse_target(target)

size = os.path.getsize(source)

# remove dataset name from dest path if its included
# mscp will add it regardless, so we dont want it twice
_, src_basename = os.path.split(source)
if src_basename in target:
LOGGER.debug(f"Removing {src_basename} from {target}")
target = target.strip(src_basename)
Comment on lines +192 to +194
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part seems unsafe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with scp:

mkdir -p data/data/dir; echo data > data/data/dir/file; echo data > data/data/dir/data; rm -rf data/target; tree data
data
└── data
    └── dir
        ├── data
        └── file

2 directories, 2 files
➜ anemoi-utils --debug transfer --source data/data --target ssh://localhost:/tmp/data/target --overwrite; tree data
2025-08-25 15:38:28 INFO Using rsync to transfer
2025-08-25 15:38:28 INFO Deleting ssh://localhost:/tmp/data/target
2025-08-25 15:38:28 INFO Uploading data/data to ssh://localhost:/tmp/data/target
2025-08-25 15:38:28 INFO Uploading 2 files (10)
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10.0/10.0 [00:04<00:00, 2.15B/s]
data
├── data
│   └── dir
│       ├── data
│       └── file
└── target
    └── dir
        ├── data
        └── file

4 directories, 4 files
➜ anemoi-utils --debug transfer --source data/data --target ssh://localhost:/tmp/data/target --overwrite; tree data
2025-08-25 15:38:34 INFO Using rsync to transfer
2025-08-25 15:38:34 INFO Deleting ssh://localhost:/tmp/data/target
2025-08-25 15:38:34 INFO Uploading data/data to ssh://localhost:/tmp/data/target
2025-08-25 15:38:34 INFO Uploading 2 files (10)
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10.0/10.0 [00:04<00:00, 2.10B/s]
data
├── data
│   └── dir
│       ├── data
│       └── file
└── target
    └── dir
        ├── data
        └── file

With 'mscp' in the path:

mkdir -p data/data/dir; echo data > data/data/dir/file; echo data > data/data/dir/data; rm -rf data/target; tree data
data
└── data
    └── dir
        ├── data
        └── file

2 directories, 2 files
➜ PATH=$PATH:/path/to/mscp/dir anemoi-utils --debug transfer --source data/data --target ssh://localhost:/tmp/data/target --overwrite; tree data
2025-08-25 15:37:27 INFO Using mscp to transfer
2025-08-25 15:37:27 INFO Deleting ssh://localhost:/tmp/data/target
2025-08-25 15:37:27 DEBUG Removing data from ssh://localhost:/tmp/data/target
2025-08-25 15:37:27 DEBUG Copying data/data to ssh://localhost:/tmp/data/targe with Mscp
2025-08-25 15:37:27 INFO Uploading data/data to ssh://localhost:/tmp/data/targe (4 KiB)
data
├── data
│   └── dir
│       ├── data
│       └── file
└── target
    └── dir
        ├── data
        └── file

4 directories, 4 files
➜ PATH=$PATH:/path/to/mscp/dir anemoi-utils --debug transfer --source data/data --target ssh://localhost:/tmp/data/target --overwrite; tree data
2025-08-25 15:37:32 INFO Using mscp to transfer
2025-08-25 15:37:32 INFO Deleting ssh://localhost:/tmp/data/target
2025-08-25 15:37:32 DEBUG Removing data from ssh://localhost:/tmp/data/target
2025-08-25 15:37:32 DEBUG Copying data/data to ssh://localhost:/tmp/data/**targe** with Mscp
2025-08-25 15:37:32 INFO Uploading data/data to ssh://localhost:/tmp/data/**targe** (4 KiB)
data
├── data
│   └── dir
│       ├── data
│       └── file
└── target
    ├── data
    │   └── dir
    │       ├── data
    │       └── file
    └── dir
        ├── data
        └── file

LOGGER.debug(f"Copying {source} to {target} with Mscp")

if verbosity > 0:
LOGGER.info(f"{self.action} {source} to {target} ({bytes_to_human(size)})")

call_process("ssh", hostname, "mkdir", "-p", shlex.quote(os.path.dirname(path)))
if threads > 1:
call_process(
"mscp",
"-n",
str(threads),
source,
f"{hostname}:{path}",
)
else: # if threads not specified, use the default number of cores from mscp "floor(log(#cores)) + 1"
call_process(
"mscp",
source,
f"{hostname}:{path}",
)
return size


class RsyncUpload(SshBaseUpload):

def _transfer_file(
Expand Down Expand Up @@ -249,6 +331,29 @@ def _transfer_file(
return size


def _pick_transfer_tool():
tools = {"mscp": MscpUpload, "rsync": RsyncUpload, "scp": ScpUpload}

from anemoi.utils.config import load_config

tool = load_config().get("utils", {}).get("transfer_tool", None)
if tool is not None:
# check if the tool listed in the config can be found
if tool in tools and _is_program_on_path(tool):
LOGGER.info(f"Using {tool} to transfer as specified in the anemoi utils config")
return tools[tool]

# Loops through this list in order until it finds a tool
for tool in tools:
if _is_program_on_path(tool):
LOGGER.info(f"Using {tool} to transfer")
return tools[tool]
raise RuntimeError(f"No suitable transfer tool found. Looked for the following: {tools}")


SshUpload = _pick_transfer_tool()


def upload(source: str, target: str, **kwargs) -> None:
"""Upload a file or folder to the target location using rsync.

Expand All @@ -261,9 +366,6 @@ def upload(source: str, target: str, **kwargs) -> None:
kwargs : dict
Additional arguments for the transfer.
"""
uploader = RsyncUpload()
uploader = SshUpload()

if os.path.isdir(source):
uploader.transfer_folder(source=source, target=target, **kwargs)
else:
uploader.transfer_file(source=source, target=target, **kwargs)
uploader.copy(source=source, target=target, **kwargs)
4 changes: 2 additions & 2 deletions tests/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ def test_transfer_find_ssh_upload(source: str, target: str) -> None:
target : str
The target path
"""
from anemoi.utils.remote.ssh import RsyncUpload
from anemoi.utils.remote.ssh import SshUpload

assert _find_transfer_class(source, target) == RsyncUpload
assert _find_transfer_class(source, target) == SshUpload


@pytest.mark.parametrize("source", S3 + SSH)
Expand Down
Loading