Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More thoroughly type *Handle classes #501

Open
wants to merge 1 commit into
base: branch-24.12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions python/kvikio/kvikio/_lib/arr.pxd
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
# cython: language_level=3


from libc.stdint cimport uintptr_t
from libcpp.utility cimport pair


cdef class Array:
Expand All @@ -31,6 +29,11 @@ cdef class Array:
cpdef Array asarray(obj)


cdef pair[uintptr_t, size_t] parse_buffer_argument(
buf, size, bint accept_host_buffer
cdef struct mem_ptr_nbytes:
uintptr_t ptr
Py_ssize_t nbytes


cdef mem_ptr_nbytes parse_buffer_argument(
Array arr, Py_ssize_t nbytes, bint accept_host_buffer
) except *
23 changes: 10 additions & 13 deletions python/kvikio/kvikio/_lib/arr.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -305,22 +305,19 @@ cpdef Array asarray(obj):
return Array(obj)


cdef pair[uintptr_t, size_t] parse_buffer_argument(
buf, size, bint accept_host_buffer
cdef mem_ptr_nbytes parse_buffer_argument(
Array arr, Py_ssize_t nbytes, bint accept_host_buffer
) except *:
"""Parse `buf` and `size` argument and return a pointer and nbytes"""
if not isinstance(buf, Array):
buf = Array(buf)
cdef Array arr = buf
"""Parse `arr` and `size` argument and return a pointer and nbytes"""
if not arr._contiguous():
raise ValueError("Array must be contiguous")
if not accept_host_buffer and not arr.cuda:
raise ValueError("Non-CUDA buffers not supported")
cdef size_t nbytes
if size is None:
nbytes = arr.nbytes
elif size > arr.nbytes:

cdef Py_ssize_t arr_nbytes = arr._nbytes()
if nbytes < 0:
nbytes = arr_nbytes
elif nbytes > arr_nbytes:
raise ValueError("Size is greater than the size of the buffer")
else:
nbytes = size
return pair[uintptr_t, size_t](arr.ptr, nbytes)

return mem_ptr_nbytes(ptr=arr.ptr, nbytes=nbytes)
102 changes: 102 additions & 0 deletions python/kvikio/kvikio/_lib/file_handle.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
# cython: language_level=3

from posix cimport fcntl

from libc.stdint cimport uintptr_t
from libcpp cimport bool as cpp_bool
from libcpp.string cimport string

from kvikio._lib import defaults
from kvikio._lib.arr cimport Array
from kvikio._lib.future cimport IOFuture, IOFutureStream, cpp_StreamFuture, future

ctypedef int cpp_int


cdef extern from "cuda.h":
ctypedef void* CUstream


cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil:
cdef cppclass FileHandle:
FileHandle() except +
FileHandle(cpp_int fd) except +
FileHandle(
string file_path,
string flags,
) except +
FileHandle(
string file_path,
string flags,
fcntl.mode_t mode
) except +
void close()
cpp_bool closed()
cpp_int fd()
cpp_int fd_open_flags() except +
future[size_t] pread(
void* devPtr,
size_t size,
size_t file_offset,
size_t task_size
) except +
future[size_t] pwrite(
void* devPtr,
size_t size,
size_t file_offset,
size_t task_size
) except +
size_t read(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset
) except +
size_t write(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset
) except +
cpp_StreamFuture read_async(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset,
CUstream stream
) except +
cpp_StreamFuture write_async(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset,
CUstream stream
) except +


cdef class CuFile:
"""File handle for GPUDirect Storage (GDS)"""
cdef FileHandle _handle

cpdef close(self)
cpdef cpp_bool closed(self)
cpdef cpp_int fileno(self)
cpdef cpp_int open_flags(self)
cpdef IOFuture pread(self, Array buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t task_size=*)
cpdef IOFuture pwrite(self, Array buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t task_size=*)
cpdef Py_ssize_t read(self, Array buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t dev_offset=*)
cpdef Py_ssize_t write(self, Array buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t dev_offset=*)
cpdef IOFutureStream read_async(self, Array buf, Py_ssize_t size=*,
Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*,
uintptr_t stream=*)
cpdef IOFutureStream write_async(self, Array buf, Py_ssize_t size=*,
Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*,
uintptr_t stream=*)
160 changes: 47 additions & 113 deletions python/kvikio/kvikio/_lib/file_handle.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4,174 +4,108 @@
# distutils: language = c++
# cython: language_level=3

import pathlib
from typing import Optional

from posix cimport fcntl
from pathlib import Path

from libc.stdint cimport uintptr_t
from libcpp cimport bool
from libcpp.string cimport string
from libcpp.utility cimport move, pair
from libcpp cimport bool as cpp_bool
from libcpp.utility cimport move

from kvikio._lib.arr cimport parse_buffer_argument
from kvikio._lib.arr cimport Array, mem_ptr_nbytes, parse_buffer_argument
from kvikio._lib.future cimport (
IOFuture,
IOFutureStream,
_wrap_io_future,
_wrap_stream_future,
cpp_StreamFuture,
future,
)

from kvikio._lib import defaults


cdef extern from "cuda.h":
ctypedef void* CUstream


cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil:
cdef cppclass FileHandle:
FileHandle() except +
FileHandle(int fd) except +
FileHandle(
string file_path,
string flags,
) except +
FileHandle(
string file_path,
string flags,
fcntl.mode_t mode
) except +
void close()
bool closed()
int fd()
int fd_open_flags() except +
future[size_t] pread(
void* devPtr,
size_t size,
size_t file_offset,
size_t task_size
) except +
future[size_t] pwrite(
void* devPtr,
size_t size,
size_t file_offset,
size_t task_size
) except +
size_t read(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset
) except +
size_t write(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset
) except +
cpp_StreamFuture read_async(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset,
CUstream stream
) except +
cpp_StreamFuture write_async(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset,
CUstream stream
) except +


cdef class CuFile:
"""File handle for GPUDirect Storage (GDS)"""
cdef FileHandle _handle

def __init__(self, file_path, flags="r"):
def __init__(self, file_path, str flags="r"):
self._handle = move(
FileHandle(
str.encode(str(pathlib.Path(file_path))),
str.encode(str(flags))
bytes(Path(file_path)),
flags.encode(),
)
)

def close(self) -> None:
cpdef close(self):
self._handle.close()

def closed(self) -> bool:
cpdef cpp_bool closed(self):
return self._handle.closed()

def fileno(self) -> int:
cpdef cpp_int fileno(self):
return self._handle.fd()

def open_flags(self) -> int:
cpdef cpp_int open_flags(self):
return self._handle.fd_open_flags()

def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef IOFuture pread(self, Array buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t task_size=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True)
return _wrap_io_future(
self._handle.pread(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
task_size if task_size else defaults.task_size()
task_size if task_size > 0 else defaults.task_size()
)
)

def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef IOFuture pwrite(self, Array buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t task_size=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True)
return _wrap_io_future(
self._handle.pwrite(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
task_size if task_size else defaults.task_size()
task_size if task_size > 0 else defaults.task_size()
)
)

def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef Py_ssize_t read(self, Array buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t dev_offset=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return self._handle.read(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
)

def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef Py_ssize_t write(self, Array buf, Py_ssize_t size=-1,
Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return self._handle.write(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
)

def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
st: uintptr_t) -> IOFutureStream:
stream = <CUstream>st
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef IOFutureStream read_async(self, Array buf, Py_ssize_t size=-1,
Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0,
uintptr_t stream=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return _wrap_stream_future(self._handle.read_async(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
stream,
<CUstream>stream,
))

def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
st: uintptr_t) -> IOFutureStream:
stream = <CUstream>st
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef IOFutureStream write_async(self, Array buf, Py_ssize_t size=-1,
Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0,
uintptr_t stream=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return _wrap_stream_future(self._handle.write_async(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
stream,
<CUstream>stream,
))
Loading