diff --git a/python/kvikio/kvikio/_lib/arr.pxd b/python/kvikio/kvikio/_lib/arr.pxd index 47bad21a3b..0937d9b87b 100644 --- a/python/kvikio/kvikio/_lib/arr.pxd +++ b/python/kvikio/kvikio/_lib/arr.pxd @@ -1,12 +1,10 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. -# distutils: language = c++ # cython: language_level=3 from libc.stdint cimport uintptr_t -from libcpp.utility cimport pair cdef class Array: @@ -31,6 +29,11 @@ cdef class Array: cpdef Array asarray(obj) -cdef pair[uintptr_t, size_t] parse_buffer_argument( - buf, size, bint accept_host_buffer +cdef struct mem_ptr_nbytes: + uintptr_t ptr + Py_ssize_t nbytes + + +cdef mem_ptr_nbytes parse_buffer_argument( + Array arr, Py_ssize_t nbytes, bint accept_host_buffer ) except * diff --git a/python/kvikio/kvikio/_lib/arr.pyx b/python/kvikio/kvikio/_lib/arr.pyx index 45c7430313..ece43100dc 100644 --- a/python/kvikio/kvikio/_lib/arr.pyx +++ b/python/kvikio/kvikio/_lib/arr.pyx @@ -305,22 +305,19 @@ cpdef Array asarray(obj): return Array(obj) -cdef pair[uintptr_t, size_t] parse_buffer_argument( - buf, size, bint accept_host_buffer +cdef mem_ptr_nbytes parse_buffer_argument( + Array arr, Py_ssize_t nbytes, bint accept_host_buffer ) except *: - """Parse `buf` and `size` argument and return a pointer and nbytes""" - if not isinstance(buf, Array): - buf = Array(buf) - cdef Array arr = buf + """Parse `arr` and `size` argument and return a pointer and nbytes""" if not arr._contiguous(): raise ValueError("Array must be contiguous") if not accept_host_buffer and not arr.cuda: raise ValueError("Non-CUDA buffers not supported") - cdef size_t nbytes - if size is None: - nbytes = arr.nbytes - elif size > arr.nbytes: + + cdef Py_ssize_t arr_nbytes = arr._nbytes() + if nbytes < 0: + nbytes = arr_nbytes + elif nbytes > arr_nbytes: raise ValueError("Size is greater than the size of the buffer") - else: - nbytes = size - return pair[uintptr_t, size_t](arr.ptr, nbytes) + + return mem_ptr_nbytes(ptr=arr.ptr, nbytes=nbytes) diff --git a/python/kvikio/kvikio/_lib/file_handle.pxd b/python/kvikio/kvikio/_lib/file_handle.pxd new file mode 100644 index 0000000000..061f5e9f7a --- /dev/null +++ b/python/kvikio/kvikio/_lib/file_handle.pxd @@ -0,0 +1,102 @@ +# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +from posix cimport fcntl + +from libc.stdint cimport uintptr_t +from libcpp cimport bool as cpp_bool +from libcpp.string cimport string + +from kvikio._lib import defaults +from kvikio._lib.arr cimport Array +from kvikio._lib.future cimport IOFuture, IOFutureStream, cpp_StreamFuture, future + +ctypedef int cpp_int + + +cdef extern from "cuda.h": + ctypedef void* CUstream + + +cdef extern from "" namespace "kvikio" nogil: + cdef cppclass FileHandle: + FileHandle() except + + FileHandle(cpp_int fd) except + + FileHandle( + string file_path, + string flags, + ) except + + FileHandle( + string file_path, + string flags, + fcntl.mode_t mode + ) except + + void close() + cpp_bool closed() + cpp_int fd() + cpp_int fd_open_flags() except + + future[size_t] pread( + void* devPtr, + size_t size, + size_t file_offset, + size_t task_size + ) except + + future[size_t] pwrite( + void* devPtr, + size_t size, + size_t file_offset, + size_t task_size + ) except + + size_t read( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset + ) except + + size_t write( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset + ) except + + cpp_StreamFuture read_async( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset, + CUstream stream + ) except + + cpp_StreamFuture write_async( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset, + CUstream stream + ) except + + + +cdef class CuFile: + """File handle for GPUDirect Storage (GDS)""" + cdef FileHandle _handle + + cpdef close(self) + cpdef cpp_bool closed(self) + cpdef cpp_int fileno(self) + cpdef cpp_int open_flags(self) + cpdef IOFuture pread(self, Array buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t task_size=*) + cpdef IOFuture pwrite(self, Array buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t task_size=*) + cpdef Py_ssize_t read(self, Array buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t dev_offset=*) + cpdef Py_ssize_t write(self, Array buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t dev_offset=*) + cpdef IOFutureStream read_async(self, Array buf, Py_ssize_t size=*, + Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*, + uintptr_t stream=*) + cpdef IOFutureStream write_async(self, Array buf, Py_ssize_t size=*, + Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*, + uintptr_t stream=*) diff --git a/python/kvikio/kvikio/_lib/file_handle.pyx b/python/kvikio/kvikio/_lib/file_handle.pyx index 7a8de368ef..442d144035 100644 --- a/python/kvikio/kvikio/_lib/file_handle.pyx +++ b/python/kvikio/kvikio/_lib/file_handle.pyx @@ -4,174 +4,108 @@ # distutils: language = c++ # cython: language_level=3 -import pathlib -from typing import Optional - -from posix cimport fcntl +from pathlib import Path from libc.stdint cimport uintptr_t -from libcpp cimport bool -from libcpp.string cimport string -from libcpp.utility cimport move, pair +from libcpp cimport bool as cpp_bool +from libcpp.utility cimport move -from kvikio._lib.arr cimport parse_buffer_argument +from kvikio._lib.arr cimport Array, mem_ptr_nbytes, parse_buffer_argument from kvikio._lib.future cimport ( IOFuture, IOFutureStream, _wrap_io_future, _wrap_stream_future, - cpp_StreamFuture, - future, ) from kvikio._lib import defaults -cdef extern from "cuda.h": - ctypedef void* CUstream - - -cdef extern from "" namespace "kvikio" nogil: - cdef cppclass FileHandle: - FileHandle() except + - FileHandle(int fd) except + - FileHandle( - string file_path, - string flags, - ) except + - FileHandle( - string file_path, - string flags, - fcntl.mode_t mode - ) except + - void close() - bool closed() - int fd() - int fd_open_flags() except + - future[size_t] pread( - void* devPtr, - size_t size, - size_t file_offset, - size_t task_size - ) except + - future[size_t] pwrite( - void* devPtr, - size_t size, - size_t file_offset, - size_t task_size - ) except + - size_t read( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset - ) except + - size_t write( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset - ) except + - cpp_StreamFuture read_async( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset, - CUstream stream - ) except + - cpp_StreamFuture write_async( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset, - CUstream stream - ) except + - - cdef class CuFile: - """File handle for GPUDirect Storage (GDS)""" - cdef FileHandle _handle - - def __init__(self, file_path, flags="r"): + def __init__(self, file_path, str flags="r"): self._handle = move( FileHandle( - str.encode(str(pathlib.Path(file_path))), - str.encode(str(flags)) + bytes(Path(file_path)), + flags.encode(), ) ) - def close(self) -> None: + cpdef close(self): self._handle.close() - def closed(self) -> bool: + cpdef cpp_bool closed(self): return self._handle.closed() - def fileno(self) -> int: + cpdef cpp_int fileno(self): return self._handle.fd() - def open_flags(self) -> int: + cpdef cpp_int open_flags(self): return self._handle.fd_open_flags() - def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pread(self, Array buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t task_size=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return _wrap_io_future( self._handle.pread( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, - task_size if task_size else defaults.task_size() + task_size if task_size > 0 else defaults.task_size() ) ) - def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pwrite(self, Array buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t task_size=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return _wrap_io_future( self._handle.pwrite( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, - task_size if task_size else defaults.task_size() + task_size if task_size > 0 else defaults.task_size() ) ) - def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef Py_ssize_t read(self, Array buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t dev_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return self._handle.read( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, ) - def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef Py_ssize_t write(self, Array buf, Py_ssize_t size=-1, + Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return self._handle.write( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, ) - def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, - st: uintptr_t) -> IOFutureStream: - stream = st - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef IOFutureStream read_async(self, Array buf, Py_ssize_t size=-1, + Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0, + uintptr_t stream=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return _wrap_stream_future(self._handle.read_async( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, - stream, + stream, )) - def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, - st: uintptr_t) -> IOFutureStream: - stream = st - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef IOFutureStream write_async(self, Array buf, Py_ssize_t size=-1, + Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0, + uintptr_t stream=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return _wrap_stream_future(self._handle.write_async( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, - stream, + stream, )) diff --git a/python/kvikio/kvikio/_lib/remote_handle.pxd b/python/kvikio/kvikio/_lib/remote_handle.pxd new file mode 100644 index 0000000000..7a8a97094c --- /dev/null +++ b/python/kvikio/kvikio/_lib/remote_handle.pxd @@ -0,0 +1,47 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +from libcpp.memory cimport unique_ptr + +from kvikio._lib.arr cimport Array +from kvikio._lib.future cimport IOFuture, future + + +cdef extern from "" nogil: + cdef cppclass cpp_RemoteEndpoint "kvikio::RemoteEndpoint": + pass + + cdef cppclass cpp_HttpEndpoint "kvikio::HttpEndpoint": + cpp_HttpEndpoint(string url) except + + + cdef cppclass cpp_RemoteHandle "kvikio::RemoteHandle": + cpp_RemoteHandle( + unique_ptr[cpp_RemoteEndpoint] endpoint, size_t nbytes + ) except + + cpp_RemoteHandle(unique_ptr[cpp_RemoteEndpoint] endpoint) except + + size_t nbytes() except + + size_t read( + void* buf, + size_t size, + size_t file_offset + ) except + + future[size_t] pread( + void* buf, + size_t size, + size_t file_offset + ) except + + + +cdef class RemoteFile: + cdef unique_ptr[cpp_RemoteHandle] _handle + + @staticmethod + cdef RemoteFile cpp_open_http(str url, Py_ssize_t nbytes=*) + + cpdef Py_ssize_t nbytes(self) + + cpdef Py_ssize_t read(self, Array buf, Py_ssize_t size=*, Py_ssize_t file_offset=*) + cpdef IOFuture pread(self, Array buf, Py_ssize_t size=*, Py_ssize_t file_offset=*) diff --git a/python/kvikio/kvikio/_lib/remote_handle.pyx b/python/kvikio/kvikio/_lib/remote_handle.pyx index 93c6ac398a..45207b48b5 100644 --- a/python/kvikio/kvikio/_lib/remote_handle.pyx +++ b/python/kvikio/kvikio/_lib/remote_handle.pyx @@ -4,41 +4,14 @@ # distutils: language = c++ # cython: language_level=3 -from typing import Optional - +from cython cimport Py_ssize_t from cython.operator cimport dereference as deref -from libc.stdint cimport uintptr_t from libcpp.memory cimport make_unique, unique_ptr from libcpp.string cimport string -from libcpp.utility cimport move, pair - -from kvikio._lib.arr cimport parse_buffer_argument -from kvikio._lib.future cimport IOFuture, _wrap_io_future, future - - -cdef extern from "" nogil: - cdef cppclass cpp_RemoteEndpoint "kvikio::RemoteEndpoint": - pass - - cdef cppclass cpp_HttpEndpoint "kvikio::HttpEndpoint": - cpp_HttpEndpoint(string url) except + +from libcpp.utility cimport move - cdef cppclass cpp_RemoteHandle "kvikio::RemoteHandle": - cpp_RemoteHandle( - unique_ptr[cpp_RemoteEndpoint] endpoint, size_t nbytes - ) except + - cpp_RemoteHandle(unique_ptr[cpp_RemoteEndpoint] endpoint) except + - int nbytes() except + - size_t read( - void* buf, - size_t size, - size_t file_offset - ) except + - future[size_t] pread( - void* buf, - size_t size, - size_t file_offset - ) except + +from kvikio._lib.arr cimport Array, mem_ptr_nbytes, parse_buffer_argument +from kvikio._lib.future cimport IOFuture, _wrap_io_future cdef string _to_string(str s): @@ -50,42 +23,40 @@ cdef string _to_string(str s): cdef class RemoteFile: - cdef unique_ptr[cpp_RemoteHandle] _handle + @staticmethod + def open_http(str url, Py_ssize_t nbytes=-1) -> RemoteFile: + return RemoteFile.cpp_open_http(url, nbytes) - @classmethod - def open_http( - cls, - url: str, - nbytes: Optional[int], - ): + @staticmethod + cdef RemoteFile cpp_open_http(str url, Py_ssize_t nbytes=-1): cdef RemoteFile ret = RemoteFile() cdef unique_ptr[cpp_HttpEndpoint] ep = make_unique[cpp_HttpEndpoint]( _to_string(url) ) - if nbytes is None: + if nbytes < 0: ret._handle = make_unique[cpp_RemoteHandle](move(ep)) return ret - cdef size_t n = nbytes - ret._handle = make_unique[cpp_RemoteHandle](move(ep), n) + ret._handle = make_unique[cpp_RemoteHandle](move(ep), nbytes) return ret - def nbytes(self) -> int: + cpdef Py_ssize_t nbytes(self): return deref(self._handle).nbytes() - def read(self, buf, size: Optional[int], file_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef Py_ssize_t read(self, Array buf, Py_ssize_t size=-1, + Py_ssize_t file_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return deref(self._handle).read( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, ) - def pread(self, buf, size: Optional[int], file_offset: int) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pread(self, Array buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return _wrap_io_future( deref(self._handle).pread( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, ) ) diff --git a/python/kvikio/kvikio/cufile.py b/python/kvikio/kvikio/cufile.py index ead7bc5f7a..8a12ea391a 100644 --- a/python/kvikio/kvikio/cufile.py +++ b/python/kvikio/kvikio/cufile.py @@ -2,9 +2,10 @@ # See file LICENSE for terms. import pathlib -from typing import Optional, Union +from typing import Union from kvikio._lib import file_handle # type: ignore +from kvikio._lib.arr import asarray class IOFutureStream: @@ -112,9 +113,9 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: def pread( self, buf, - size: Optional[int] = None, + size: int = -1, file_offset: int = 0, - task_size: Optional[int] = None, + task_size: int = 0, ) -> IOFuture: """Reads specified bytes from the file into device or host memory in parallel @@ -154,14 +155,14 @@ def pread( any undesired bytes from the resulting data. Similarly, it is optimal for `size` to be a multiple of 4096 bytes. When GDS isn't used, this is less critical. """ - return IOFuture(self._handle.pread(buf, size, file_offset, task_size)) + return IOFuture(self._handle.pread(asarray(buf), size, file_offset, task_size)) def pwrite( self, buf, - size: Optional[int] = None, + size: int = -1, file_offset: int = 0, - task_size: Optional[int] = None, + task_size: int = 0, ) -> IOFuture: """Writes specified bytes from device or host memory into the file in parallel @@ -201,14 +202,14 @@ def pwrite( any undesired bytes from the resulting data. Similarly, it is optimal for `size` to be a multiple of 4096 bytes. When GDS isn't used, this is less critical. """ - return IOFuture(self._handle.pwrite(buf, size, file_offset, task_size)) + return IOFuture(self._handle.pwrite(asarray(buf), size, file_offset, task_size)) def read( self, buf, - size: Optional[int] = None, + size: int = -1, file_offset: int = 0, - task_size: Optional[int] = None, + task_size: int = 0, ) -> int: """Reads specified bytes from the file into the device memory in parallel @@ -240,14 +241,14 @@ def read( any undesired bytes from the resulting data. Similarly, it is optimal for `size` to be a multiple of 4096 bytes. When GDS isn't used, this is less critical. """ - return self.pread(buf, size, file_offset, task_size).get() + return self.pread(asarray(buf), size, file_offset, task_size).get() def write( self, buf, - size: Optional[int] = None, + size: int = -1, file_offset: int = 0, - task_size: Optional[int] = None, + task_size: int = 0, ) -> int: """Writes specified bytes from the device memory into the file in parallel @@ -279,13 +280,13 @@ def write( any undesired bytes from the resulting data. Similarly, it is optimal for `size` to be a multiple of 4096 bytes. When GDS isn't used, this is less critical. """ - return self.pwrite(buf, size, file_offset, task_size).get() + return self.pwrite(asarray(buf), size, file_offset, task_size).get() def raw_read_async( self, buf, stream, - size: Optional[int] = None, + size: int = -1, file_offset: int = 0, dev_offset: int = 0, ) -> IOFutureStream: @@ -314,13 +315,15 @@ def raw_read_async( `IOFutureStream.check_bytes_done()`, which will synchronize the associated stream and return the number of bytes read. """ - return self._handle.read_async(buf, size, file_offset, dev_offset, stream) + return self._handle.read_async( + asarray(buf), size, file_offset, dev_offset, stream + ) def raw_write_async( self, buf, stream, - size: Optional[int] = None, + size: int = -1, file_offset: int = 0, dev_offset: int = 0, ) -> IOFutureStream: @@ -349,12 +352,14 @@ def raw_write_async( `IOFutureStream.check_bytes_done()`, which will synchronize the associated stream and return the number of bytes written. """ - return self._handle.write_async(buf, size, file_offset, dev_offset, stream) + return self._handle.write_async( + asarray(buf), size, file_offset, dev_offset, stream + ) def raw_read( self, buf, - size: Optional[int] = None, + size: int = -1, file_offset: int = 0, dev_offset: int = 0, ) -> int: @@ -389,12 +394,12 @@ def raw_read( any undesired bytes from the resulting data. Similarly, it is optimal for `size` to be a multiple of 4096 bytes. When GDS isn't used, this is less critical. """ - return self._handle.read(buf, size, file_offset, dev_offset) + return self._handle.read(asarray(buf), size, file_offset, dev_offset) def raw_write( self, buf, - size: Optional[int] = None, + size: int = -1, file_offset: int = 0, dev_offset: int = 0, ) -> int: @@ -429,4 +434,4 @@ def raw_write( any undesired bytes from the resulting data. Similarly, it is optimal for `size` to be a multiple of 4096 bytes. When GDS isn't used, this is less critical. """ - return self._handle.write(buf, size, file_offset, dev_offset) + return self._handle.write(asarray(buf), size, file_offset, dev_offset) diff --git a/python/kvikio/kvikio/remote_file.py b/python/kvikio/kvikio/remote_file.py index 52bbe8010f..8c3f4de580 100644 --- a/python/kvikio/kvikio/remote_file.py +++ b/python/kvikio/kvikio/remote_file.py @@ -4,8 +4,8 @@ from __future__ import annotations import functools -from typing import Optional +from kvikio._lib.arr import asarray from kvikio.cufile import IOFuture @@ -54,7 +54,7 @@ def __init__(self, handle): def open_http( cls, url: str, - nbytes: Optional[int] = None, + nbytes: int = -1, ) -> RemoteFile: """Open a http file. @@ -89,7 +89,7 @@ def nbytes(self) -> int: """ return self._handle.nbytes() - def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int: + def read(self, buf, size: int = -1, file_offset: int = 0) -> int: """Read from remote source into buffer (host or device memory) in parallel. Parameters @@ -105,9 +105,9 @@ def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int: ------- The size of bytes that were successfully read. """ - return self.pread(buf, size, file_offset).get() + return self.pread(asarray(buf), size, file_offset).get() - def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFuture: + def pread(self, buf, size: int = -1, file_offset: int = 0) -> IOFuture: """Read from remote source into buffer (host or device memory) in parallel. Parameters @@ -124,4 +124,4 @@ def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFutu Future that on completion returns the size of bytes that were successfully read. """ - return IOFuture(self._handle.pread(buf, size, file_offset)) + return IOFuture(self._handle.pread(asarray(buf), size, file_offset))