Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DataSpaces to benchmarks #56

Draft
wants to merge 34 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
6be9b5a
changes to enable margo
ValHayot Sep 19, 2022
a98c0c0
Merge branch 'main' of github.com:proxystore/proxystore-benchmarks in…
ValHayot Sep 29, 2022
1a32670
added intrasite updates to benchmarks
ValHayot Oct 7, 2022
0df2351
Add DataSpaces to funcX pong benchmarks
ValHayot Mar 21, 2023
7cdfa74
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Mar 27, 2023
76618cc
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Apr 4, 2023
6e671b6
Fix Flake8-bugbear B018 complaint
gpauloski Apr 4, 2023
d3ce011
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Apr 11, 2023
14c67dc
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Apr 25, 2023
bb07637
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] May 2, 2023
41ac09a
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] May 9, 2023
38030cc
Upgrade most benchmarks to ProxyStore v0.5.0
gpauloski May 2, 2023
196dcc4
Add options to pass optional address and interface to DIMs
ValHayot May 13, 2023
16f4515
Migrate funcx-tasks to globus-compute-tasks
gpauloski May 13, 2023
0b5db75
Upgrade to Colmena v0.5.0
gpauloski May 15, 2023
10ce293
Bump to psbench v0.1.0
gpauloski May 15, 2023
463f5a4
Update PR tagging for generating releases
gpauloski May 15, 2023
d16a19a
Add cache cleanup action
gpauloski May 15, 2023
6f7abfd
Update linting configurations
gpauloski May 15, 2023
553aa93
Update package metadata
gpauloski May 15, 2023
1f09a5a
Autofix ruff linting errors
gpauloski May 15, 2023
2620697
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] May 16, 2023
8fc0586
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] May 23, 2023
30d9d9c
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] May 29, 2023
569fc3b
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Jun 13, 2023
17c917a
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Jun 20, 2023
5331d29
Bump ProxyStore dependency to v0.5.1
gpauloski Jun 24, 2023
d4893ac
Bump to psbench v0.1.1
gpauloski Jun 24, 2023
22ede22
Add zenodo badge
gpauloski Jun 24, 2023
164a187
Add DataSpaces to funcX pong benchmarks
ValHayot Mar 21, 2023
52fcdb3
Convert FuncX to Globus Compute for DataSpaces
ValHayot Jun 25, 2023
47c9491
Merge branch 'dspaces' of github.com:proxystore/proxystore-benchmarks…
ValHayot Aug 19, 2023
24f3e7f
Merge branch 'main' of github.com:proxystore/proxystore-benchmarks in…
ValHayot Aug 19, 2023
5828ea4
Fix pre-commit issues
ValHayot Aug 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dataspaces.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## Config file for DataSpaces
ndim = 1
dims = 10000000000
max_versions = 1
num_apps = 1
15 changes: 15 additions & 0 deletions psbench/argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ def add_ipfs_options(parser: argparse.ArgumentParser) -> None:
)


def add_dspaces_options(parser: argparse.ArgumentParser) -> None:
"""Add CLI arguments for DataSpaces.

Args:
parser (ArgumentParser): parser object to add DataSpaces arguments to.
"""
' '.join(sys.argv)
parser.add_argument(
'--dspaces',
action='store_true',
default=False,
help='Use DataSpaces for data transfer.',
)


def add_logging_options(parser: argparse.ArgumentParser) -> None:
"""Add CLI arguments for logging options."""
group = parser.add_argument_group(
Expand Down
96 changes: 94 additions & 2 deletions psbench/benchmarks/globus_compute_tasks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from proxystore.store.utils import get_key

from psbench import ipfs
from psbench.argparse import add_dspaces_options
from psbench.argparse import add_globus_compute_options
from psbench.argparse import add_ipfs_options
from psbench.argparse import add_logging_options
Expand All @@ -31,6 +32,7 @@
from psbench.logging import TESTING_LOG_LEVEL
from psbench.proxystore import init_store_from_args
from psbench.tasks.pong import pong
from psbench.tasks.pong import pong_dspaces
from psbench.tasks.pong import pong_ipfs
from psbench.tasks.pong import pong_proxy
from psbench.utils import randbytes
Expand Down Expand Up @@ -153,6 +155,84 @@ def time_task_ipfs(
)


def time_task_dspaces(
*,
gce: globus_compute_sdk.Executor,
input_size: int,
output_size: int,
task_sleep: float,
) -> TaskStats:
"""Execute and time a single Globus Compute task with DataSpaces for data transfer.

Args:
gce (Executor): Globus Compute Executor to submit task through.
input_size (int): number of bytes to send as input to task.
output_size (int): number of bytes task should return.
task_sleep (int): number of seconds to sleep inside task.

Returns:
TaskStats
"""
import dspaces as ds
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
version = 1

client = ds.dspaces.DSClient()
path = str(uuid.uuid4())

data = randbytes(input_size)
input_size / size
start = time.perf_counter_ns()

client.Put(
np.array(bytearray(data)),
path,
version=version,
offset=((input_size * rank),),
)
fut = gce.submit(
pong_dspaces,
path,
input_size,
rank,
size,
version=version,
result_size=output_size,
sleep=task_sleep,
)

result = fut.result()

if result is not None:
out_path = result[0]
out_size = result[1]
data = client.Get(
out_path,
version,
lb=((out_size * rank),),
ub=((out_size * rank + out_size - 1),),
dtype=bytes,
timeout=-1,
).tobytes()

end = time.perf_counter_ns()
assert isinstance(data, bytes)

return TaskStats(
proxystore_backend='DataSpaces',
task_name='pong',
input_size_bytes=input_size,
output_size_bytes=output_size,
task_sleep_seconds=task_sleep,
total_time_ms=(end - start) / 1e6,
)


def time_task_proxy(
*,
gce: globus_compute_sdk.Executor,
Expand Down Expand Up @@ -219,6 +299,7 @@ def runner(
*,
globus_compute_endpoint: str,
store: Store | None,
use_dspaces: bool,
use_ipfs: bool,
ipfs_local_dir: str | None,
ipfs_remote_dir: str | None,
Expand All @@ -237,6 +318,7 @@ def runner(
'Starting test runner\n'
f' - Globus Compute Endpoint: {globus_compute_endpoint}\n'
f' - ProxyStore backend: {store_connector_name}\n'
f' - DataSpaces enabled: {use_dspaces}\n'
f' - IPFS enabled: {use_ipfs}\n'
f' - Task type: ping-pong\n'
f' - Task repeat: {task_repeat}\n'
Expand All @@ -245,9 +327,10 @@ def runner(
f' - Task sleep time: {task_sleep} s',
)

if store is not None and use_ipfs:
if store is not None and (use_ipfs or use_dspaces):
raise ValueError(
'IPFS and ProxyStore cannot be used at the same time.',
f"""{"IPFS" if use_ipfs else "DataSpaces"} and ProxyStore
cannot be used at the same time.""",
)

runner_start = time.perf_counter_ns()
Expand Down Expand Up @@ -281,6 +364,13 @@ def runner(
output_size=output_size,
task_sleep=task_sleep,
)
elif use_dspaces:
stats = time_task_dspaces(
gce=gce,
input_size=input_size,
output_size=output_size,
task_sleep=task_sleep,
)
else:
stats = time_task(
gce=gce,
Expand Down Expand Up @@ -360,6 +450,7 @@ def main(argv: Sequence[str] | None = None) -> int:
add_logging_options(parser)
add_proxystore_options(parser, required=False)
add_ipfs_options(parser)
add_dspaces_options(parser)
args = parser.parse_args(argv)

init_logging(args.log_file, args.log_level, force=True)
Expand All @@ -369,6 +460,7 @@ def main(argv: Sequence[str] | None = None) -> int:
runner(
globus_compute_endpoint=args.globus_compute_endpoint,
store=store,
use_dspaces=args.dspaces,
use_ipfs=args.ipfs,
ipfs_local_dir=args.ipfs_local_dir,
ipfs_remote_dir=args.ipfs_remote_dir,
Expand Down
63 changes: 61 additions & 2 deletions psbench/tasks/pong.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,66 @@ def pong_ipfs(
return None


def pong_dspaces(
path: str,
data_size: int,
rank: int,
size: int,
*,
version: int = 1,
result_size: int = 0,
sleep: float = 0,
) -> tuple[str, int] | None:
"""Task that takes a DataSpace path and returns data via DataSpaces.

Args:
client (ds.DSpaces):DataSpaces client
path (str): filename of the DataSpaces stored data.
data_size (int) : the size of the DataSpaces object.
rank (int) : MPI rank.
size (int): MPI communication size.
version (int): The version of the data to access (default: 1).
result_size (int): size of results byte array (default: 0).
sleep (float): seconds to sleep for to simulate work (default: 0).

Returns:
Filename of return data or None.
"""
import time
import uuid

import dspaces as ds
import numpy as np

from psbench.utils import randbytes

client = ds.dspaces.DSClient()
data = client.Get(
path,
version=version,
lb=((data_size * rank),),
ub=((data_size * rank + data_size - 1),),
dtype=bytes,
timeout=-1,
).tobytes()

assert isinstance(data, bytes)
time.sleep(sleep)

if result_size > 0:
filepath = str(uuid.uuid4())
return_data = bytearray(randbytes(result_size))
client.Put(
np.array(return_data),
filepath,
version=version,
offset=((result_size * rank),),
)
return (filepath, result_size)
else:
return None


def pong_proxy(
data: bytes,
*,
Expand Down Expand Up @@ -101,7 +161,6 @@ def pong_proxy(
from proxystore.proxy import is_resolved
from proxystore.proxy import Proxy
from proxystore.store import get_store
from proxystore.store.utils import resolve_async

from psbench.tasks.pong import ProxyStats
from psbench.utils import randbytes
Expand All @@ -110,7 +169,7 @@ def pong_proxy(
assert not is_resolved(data)

if sleep > 0.0:
resolve_async(data)
data.resolve_async()
time.sleep(sleep)

assert isinstance(data, bytes)
Expand Down
11 changes: 9 additions & 2 deletions tests/benchmarks/globus_compute_tasks/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,18 @@ def test_time_task_proxy() -> None:


@pytest.mark.parametrize(
('use_ipfs', 'use_proxystore', 'log_to_csv'),
((False, True, False), (True, False, False), (False, False, True)),
('use_ipfs', 'use_proxystore', 'use_dspaces', 'log_to_csv'),
(
(False, True, False, False),
(True, False, False, False),
(False, False, False, True),
),
)
def test_runner(
caplog,
use_ipfs: bool,
use_proxystore: bool,
use_dspaces: bool,
log_to_csv: bool,
tmp_path: pathlib.Path,
) -> None:
Expand Down Expand Up @@ -145,6 +150,7 @@ def test_runner(
runner(
globus_compute_endpoint=str(uuid.uuid4()),
store=store,
use_dspaces=use_dspaces,
use_ipfs=use_ipfs,
ipfs_local_dir=str(ipfs_local_dir),
ipfs_remote_dir=str(ipfs_remote_dir),
Expand All @@ -171,6 +177,7 @@ def test_runner_error() -> None:
globus_compute_endpoint=str(uuid.uuid4()),
store=store,
use_ipfs=True,
use_dspaces=False,
ipfs_local_dir='/tmp/local/',
ipfs_remote_dir='/tmp/remote/',
input_sizes=[0],
Expand Down