Vmm benchmark support #883

Draft · wants to merge 15 commits into branch-0.28

Changes from 1 commit
Add support for VmmPool
pentschev committed Oct 3, 2022
commit 2a8c4dc4f668e524172393bf4e5d19e244787381
36 changes: 32 additions & 4 deletions ucp/_libs/vmm.py
@@ -1,9 +1,27 @@
+from functools import partial
+
 from cuda import cuda
 
+from dask_cuda.rmm_vmm_block_pool import VmmBlockPool
 from dask_cuda.rmm_vmm_pool import checkCudaErrors
+from dask_cuda.vmm_pool import VmmPool
 
 
+def get_vmm_allocator(vmm):
+    if vmm:
+        vmm_is_block_pool = isinstance(vmm, VmmBlockPool)
+        print(f"Server vmm_is_block_pool: {vmm_is_block_pool}")
+
+        if isinstance(vmm, VmmBlockPool) or isinstance(vmm, VmmPool):
+            vmm_allocator = VmmBlockPoolArray
+        else:
+            vmm_allocator = VmmSingleArray
+        return partial(vmm_allocator, vmm)
+
+    return None
+
+
-class VmmArray:
+class VmmSingleArray:
     def __init__(self, vmm_allocator, size):
         self.vmm_allocator = vmm_allocator
 
@@ -28,12 +46,14 @@ def copy_from_host(self, arr, stream=cuda.CUstream(0)):
         checkCudaErrors(
             cuda.cuMemcpyHtoDAsync(self.ptr, arr.ctypes.data, self.shape[0], stream)
         )
+        checkCudaErrors(cuda.cuStreamSynchronize(stream))
 
     def copy_to_host(self, arr, stream=cuda.CUstream(0)):
         print(f"copy_to_host: {hex(int(self.ptr))}", flush=True)
         checkCudaErrors(
             cuda.cuMemcpyDtoHAsync(arr.ctypes.data, self.ptr, self.shape[0], stream)
         )
+        checkCudaErrors(cuda.cuStreamSynchronize(stream))
 
 
 class VmmArraySlice:
@@ -56,20 +76,21 @@ def copy_from_host(self, arr, stream=cuda.CUstream(0)):
         checkCudaErrors(
             cuda.cuMemcpyHtoDAsync(self.ptr, arr.ctypes.data, self.shape[0], stream)
         )
+        checkCudaErrors(cuda.cuStreamSynchronize(stream))
 
     def copy_to_host(self, arr, stream=cuda.CUstream(0)):
         print(f"copy_to_host: {hex(int(self.ptr))}", flush=True)
         checkCudaErrors(
             cuda.cuMemcpyDtoHAsync(arr.ctypes.data, self.ptr, self.shape[0], stream)
         )
+        checkCudaErrors(cuda.cuStreamSynchronize(stream))
 
 
-class VmmBlockArray:
+class VmmBlockPoolArray:
     def __init__(self, vmm_block_pool_allocator, size):
         self.vmm_allocator = vmm_block_pool_allocator
 
         self.ptr = cuda.CUdeviceptr(self.vmm_allocator.allocate(size))
-        self.blocks = self.vmm_allocator.get_allocation_blocks(int(self.ptr))
         self.shape = (size,)
 
     def __del__(self):
@@ -88,17 +109,24 @@ def __cuda_array_interface__(self):
         }
 
     def get_blocks(self):
-        return list([VmmArraySlice(block[0], block[1]) for block in self.blocks])
+        if isinstance(self.vmm_allocator, VmmBlockPool):
+            blocks = self.vmm_allocator.get_allocation_blocks(int(self.ptr))
+            return list([VmmArraySlice(block[0], block[1]) for block in blocks])
+        else:
+            blocks = self.vmm_allocator._allocs[int(self.ptr)].blocks
+            return list([VmmArraySlice(block._ptr, block.size) for block in blocks])
 
     def copy_from_host(self, arr, stream=cuda.CUstream(0)):
         print(f"copy_from_host: {hex(int(self.ptr))}", flush=True)
         print(f"copy_from_host: {type(arr)}")
         checkCudaErrors(
             cuda.cuMemcpyHtoDAsync(self.ptr, arr.ctypes.data, self.shape[0], stream)
         )
+        checkCudaErrors(cuda.cuStreamSynchronize(stream))
 
     def copy_to_host(self, arr, stream=cuda.CUstream(0)):
         print(f"copy_to_host: {hex(int(self.ptr))}", flush=True)
         checkCudaErrors(
             cuda.cuMemcpyDtoHAsync(arr.ctypes.data, self.ptr, self.shape[0], stream)
         )
+        checkCudaErrors(cuda.cuStreamSynchronize(stream))
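
For reviewers, a minimal usage sketch of the new helper (hypothetical code, not part of this diff; it assumes dask_cuda's VmmPool is importable, as the imports above require):

# Hedged sketch: round-tripping host data through a VMM-backed buffer.
import numpy as np

from dask_cuda.vmm_pool import VmmPool  # assumed available, per this PR's imports
from ucp._libs.vmm import get_vmm_allocator

vmm = VmmPool()
alloc = get_vmm_allocator(vmm)  # partial(VmmBlockPoolArray, vmm) for pool types

buf = alloc(1 << 20)  # 1 MiB buffer exposing __cuda_array_interface__
src = np.arange(1 << 20, dtype="u1")
dst = np.zeros_like(src)
buf.copy_from_host(src)  # HtoD copy; synchronizes the stream before returning
buf.copy_to_host(dst)    # DtoH copy back
assert (src == dst).all()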
27 changes: 7 additions & 20 deletions ucp/benchmarks/backends/ucp_async.py
@@ -1,14 +1,13 @@
 import asyncio
 from argparse import Namespace
-from functools import partial
 from queue import Queue
 from time import monotonic
 from typing import Any
 
 import ucp
 from ucp._libs.arr import Array
 from ucp._libs.utils import print_key_value
-from ucp._libs.vmm import VmmArray, VmmBlockArray
+from ucp._libs.vmm import get_vmm_allocator
 from ucp.benchmarks.backends.base import BaseClient, BaseServer
 
@@ -55,16 +54,10 @@ async def run(self):
 
         register_am_allocators(self.args)
 
-        from dask_cuda.rmm_vmm_block_pool import VmmBlockPool
-
-        vmm_is_block_pool = isinstance(self.vmm, VmmBlockPool)
-        print(f"Server vmm_is_block_pool: {vmm_is_block_pool}")
-
-        if self.vmm:
-            vmm_allocator = VmmBlockArray if vmm_is_block_pool else VmmArray
-            vmm_allocator = partial(vmm_allocator, self.vmm)
+        vmm_allocator = get_vmm_allocator(self.vmm)
 
         async def server_handler(ep):
+            recv_msg_vmm = None
             if not self.args.enable_am:
                 if self.args.reuse_alloc:
                     if self.vmm:
@@ -89,7 +82,7 @@ async def server_handler(ep):
                         self.xp.empty(self.args.n_bytes, dtype="u1")
                     )
 
-                if vmm_is_block_pool:
+                if hasattr(recv_msg_vmm, "get_blocks"):
                     recv_blocks = recv_msg_vmm.get_blocks()
                     for recv_block in recv_blocks:
                         await ep.recv(recv_block)
@@ -136,16 +129,10 @@ async def run(self):
 
         register_am_allocators(self.args)
 
-        from dask_cuda.rmm_vmm_block_pool import VmmBlockPool
-
-        vmm_is_block_pool = isinstance(self.vmm, VmmBlockPool)
-        print(f"Client vmm_is_block_pool: {vmm_is_block_pool}")
-
-        if self.vmm:
-            vmm_allocator = VmmBlockArray if vmm_is_block_pool else VmmArray
-            vmm_allocator = partial(vmm_allocator, self.vmm)
+        vmm_allocator = get_vmm_allocator(self.vmm)
 
         ep = await ucp.create_endpoint(self.server_address, self.port)
+        recv_msg_vmm = None
 
         if self.args.enable_am:
             msg = self.xp.arange(self.args.n_bytes, dtype="u1")
@@ -188,7 +175,7 @@ async def run(self):
             else:
                 recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
 
-            if vmm_is_block_pool:
+            if hasattr(recv_msg_vmm, "get_blocks"):
                 recv_blocks = recv_msg_vmm.get_blocks()
                 send_blocks = send_msg_vmm.get_blocks()
                 for send_block, recv_block in zip(send_blocks, recv_blocks):
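
Note for reviewers: the duck-typed hasattr(recv_msg_vmm, "get_blocks") check replaces the earlier isinstance(self.vmm, VmmBlockPool) test, so both VmmBlockPool and the new VmmPool take the block-wise path. A condensed sketch of the dispatch (names from this diff; simplified, not the full benchmark logic):

# Hedged sketch of the block-wise transfer pattern used by server and client.
async def transfer(ep, send_msg_vmm, recv_msg_vmm):
    if hasattr(recv_msg_vmm, "get_blocks"):
        # Pool-backed arrays are exchanged one VmmArraySlice per message.
        send_blocks = send_msg_vmm.get_blocks()
        recv_blocks = recv_msg_vmm.get_blocks()
        for send_block, recv_block in zip(send_blocks, recv_blocks):
            await ep.send(send_block)
            await ep.recv(recv_block)
    else:
        # Single allocations are exchanged as one message each.
        await ep.send(send_msg_vmm)
        await ep.recv(recv_msg_vmm)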
64 changes: 37 additions & 27 deletions ucp/benchmarks/send_recv.py
@@ -55,6 +55,30 @@ def _get_backend_implementation(backend):
     raise ValueError(f"Unknown backend {backend}")
 
 
+def _get_vmm_allocator(object_type):
+    print(object_type)
+    if object_type.startswith("vmm"):
+        if object_type == "vmm-block-pool":
+            from dask_cuda.rmm_vmm_block_pool import VmmBlockPool
+
+            return VmmBlockPool()
+        elif object_type == "vmm-pool":
+            from dask_cuda.vmm_pool import VmmPool
+
+            return VmmPool()
+        elif object_type == "vmm-default-pool":
+            from dask_cuda.rmm_vmm_pool import VmmAllocPool
+
+            return VmmAllocPool()
+        elif object_type == "vmm-default":
+            from dask_cuda.rmm_vmm_pool import VmmAlloc
+
+            return VmmAlloc()
+        else:
+            raise ValueError(f"Unknown VMM type {object_type}")
+    return None
+
+
 def server(queue, args):
     if args.server_cpu_affinity >= 0:
         os.sched_setaffinity(0, [args.server_cpu_affinity])
@@ -85,19 +109,8 @@ def server(queue, args):
         xp.cuda.set_allocator(rmm.rmm_cupy_allocator)
 
     server = _get_backend_implementation(args.backend)["server"](args, xp, queue)
-    if args.object_type.startswith("vmm"):
-        if args.object_type == "vmm-block-pool":
-            from dask_cuda.rmm_vmm_block_pool import VmmBlockPool
-
-            server.vmm = VmmBlockPool()
-        elif args.object_type == "vmm-pool":
-            from dask_cuda.rmm_vmm_pool import VmmAllocPool
-
-            server.vmm = VmmAllocPool()
-        else:
-            from dask_cuda.rmm_vmm_pool import VmmAlloc
-
-            server.vmm = VmmAlloc()
+    server.vmm = _get_vmm_allocator(args.object_type)
+    print(server.vmm)
 
     if asyncio.iscoroutinefunction(server.run):
         loop = get_event_loop()
@@ -140,19 +153,8 @@ def client(queue, port, server_address, args):
     client = _get_backend_implementation(args.backend)["client"](
         args, xp, queue, server_address, port
     )
-    if args.object_type.startswith("vmm"):
-        if args.object_type == "vmm-block-pool":
-            from dask_cuda.rmm_vmm_block_pool import VmmBlockPool
-
-            client.vmm = VmmBlockPool()
-        elif args.object_type == "vmm-pool":
-            from dask_cuda.rmm_vmm_pool import VmmAllocPool
-
-            client.vmm = VmmAllocPool()
-        else:
-            from dask_cuda.rmm_vmm_pool import VmmAlloc
-
-            client.vmm = VmmAlloc()
+    client.vmm = _get_vmm_allocator(args.object_type)
+    print(client.vmm)
 
     if asyncio.iscoroutinefunction(client.run):
         loop = get_event_loop()
@@ -260,7 +262,15 @@ def parse_args():
         "-o",
         "--object_type",
         default="numpy",
-        choices=["numpy", "cupy", "rmm", "vmm", "vmm-pool", "vmm-block-pool"],
+        choices=[
+            "numpy",
+            "cupy",
+            "rmm",
+            "vmm-default",
+            "vmm-default-pool",
+            "vmm-block-pool",
+            "vmm-pool",
+        ],
         help="In-memory array type.",
     )
     parser.add_argument(
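
For quick reference, a hedged summary of how the new --object_type choices dispatch through _get_vmm_allocator (the mapping restates the diff above; the invocation at the end is illustrative and assumes the benchmark is launched as a module):

# "vmm-block-pool"   -> dask_cuda.rmm_vmm_block_pool.VmmBlockPool()
# "vmm-pool"         -> dask_cuda.vmm_pool.VmmPool()
# "vmm-default-pool" -> dask_cuda.rmm_vmm_pool.VmmAllocPool()
# "vmm-default"      -> dask_cuda.rmm_vmm_pool.VmmAlloc()
# Any other "vmm*" value raises ValueError; non-VMM types return None.
from ucp.benchmarks.send_recv import _get_vmm_allocator

assert _get_vmm_allocator("numpy") is None  # non-VMM object types get no pool
pool = _get_vmm_allocator("vmm-pool")  # a VmmPool instance

# Illustrative invocation (other flags elided):
#   python -m ucp.benchmarks.send_recv -o vmm-pool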