Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ check-import-profile:
diff import_profiles/filtered_flyte_importtime.txt updated_filtered_flyte_importtime.txt || (echo "Import profile mismatch!" && exit 1)
rm -f updated_flyte_importtime.txt updated_filtered_flyte_importtime.txt

.PHONY: copy-protos
copy-protos: export CLOUD_REPO_PATH ?= ../cloud
copy-protos:
uv run ./maint_tools/copy_pb_python_from_cloud.py ${CLOUD_REPO_PATH}


.PHONY: unit_test
unit_test: ## Test the code with pytest
@echo "🚀 Testing code: Running unit tests..."
Expand Down
2 changes: 1 addition & 1 deletion examples/basics/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,5 +211,5 @@ async def main() -> None:
import flyte.git

flyte.init_from_config(flyte.git.config_from_root())
r = flyte.run(main)
r = flyte.with_runcontext(mode="local").run(main)
print(r.url)
36 changes: 29 additions & 7 deletions examples/stress/large_file_io.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import asyncio
import os
import signal
import tempfile
import time
from typing import Tuple

Expand Down Expand Up @@ -26,25 +30,43 @@ async def create_large_file(size_gigabytes: int = 5) -> flyte.io.File:


@env.task
async def read_large_file(f: flyte.io.File) -> Tuple[int, float]:
async def read_large_file(f: flyte.io.File, hang: bool = False) -> Tuple[int, float]:
total_bytes = 0
chunk_size = 10 * 1024 * 1024
# chunk_size = 10 * 1024 * 1024
_, tmp_path = tempfile.mkstemp()
print(f"Will download file from {f.path} to {tmp_path}", flush=True)
if hang:
# This is debugging to exec into the container to monitor it.
loop = asyncio.get_running_loop()
waiter = asyncio.Event()
args = ()

loop.add_signal_handler(signal.SIGUSR2, waiter.set, *args)
# kill -USR2 <pid>
print(f"Hanging until SIGUSR2 is received (pid={os.getpid()})", flush=True)
signal.pause()

start = time.time()
read = 0
async with f.open("rb", block_size=chunk_size) as fp:
while _ := await fp.read():
read += 1
# read = 0
# async with f.open("rb", block_size=chunk_size) as fp:
# while _ := await fp.read():
# read += 1

await flyte.storage.get(f.path, tmp_path)

end = time.time()
total = end - start
total_bytes = os.path.getsize(tmp_path)
await asyncio.sleep(100)
print(f"Read {total_bytes} bytes in {total:.2f} seconds ({total_bytes / total / (1024 * 1024):.2f} MiB/s)")

return total_bytes, total


@env.task
async def main(size_gigabytes: int = 5) -> Tuple[int, float]:
large_file = await create_large_file(size_gigabytes)
return await read_large_file(large_file)
return await read_large_file(large_file, hang=False)


if __name__ == "__main__":
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ authors = [{ name = "Ketan Umare", email = "[email protected]" }]
requires-python = ">=3.10"
dependencies = [
"aiofiles>=24.1.0",
# 8.2.0 has a bug https://github.com/pallets/click/issues/2897
"click>=8.2.1",
"flyteidl>=1.15.4b0,<2.0.0",
"cloudpickle>=3.1.1",
Expand Down
50 changes: 42 additions & 8 deletions src/flyte/io/_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,23 +527,40 @@ async def download_to_path(f: File) -> str:
The absolute path to the downloaded file
"""
if local_path is None:
local_path = storage.get_random_local_path(file_path_or_file_name=local_path)
local_path = storage.get_random_local_path(file_path_or_file_name=self.path)
else:
# Preserve trailing separator if present (Path.absolute() strips it)
local_path_str = str(local_path)
has_trailing_sep = local_path_str.endswith(os.sep)
local_path = str(Path(local_path).absolute())
if has_trailing_sep and not local_path.endswith(os.sep):
local_path = local_path + os.sep

fs = storage.get_underlying_filesystem(path=self.path)

# If it's already a local file, just copy it
if "file" in fs.protocol:
# Apply directory logic for local-to-local copies
local_path_for_copy = local_path
if isinstance(local_path, str):
local_path_obj = Path(local_path)
# Check if it's a directory or ends with separator
if local_path.endswith(os.sep) or (local_path_obj.exists() and local_path_obj.is_dir()):
remote_filename = Path(self.path).name
local_path_for_copy = str(local_path_obj / remote_filename)

# Ensure parent directory exists
Path(local_path_for_copy).parent.mkdir(parents=True, exist_ok=True)

# Use aiofiles for async copy
async with aiofiles.open(self.path, "rb") as src:
async with aiofiles.open(local_path, "wb") as dst:
async with aiofiles.open(local_path_for_copy, "wb") as dst:
await dst.write(await src.read())
return str(local_path)
return str(local_path_for_copy)

# Otherwise download from remote using async functionality
await storage.get(self.path, str(local_path))
return str(local_path)
result_path = await storage.get(self.path, str(local_path))
return result_path

def download_sync(self, local_path: Optional[Union[str, Path]] = None) -> str:
"""
Expand Down Expand Up @@ -579,19 +596,36 @@ def download_to_path_sync(f: File) -> str:
The absolute path to the downloaded file
"""
if local_path is None:
local_path = storage.get_random_local_path(file_path_or_file_name=local_path)
local_path = storage.get_random_local_path(file_path_or_file_name=self.path)
else:
# Preserve trailing separator if present (Path.absolute() strips it)
local_path_str = str(local_path)
has_trailing_sep = local_path_str.endswith(os.sep)
local_path = str(Path(local_path).absolute())
if has_trailing_sep and not local_path.endswith(os.sep):
local_path = local_path + os.sep

fs = storage.get_underlying_filesystem(path=self.path)

# If it's already a local file, just copy it
if "file" in fs.protocol:
# Apply directory logic for local-to-local copies
local_path_for_copy = local_path
if isinstance(local_path, str):
local_path_obj = Path(local_path)
# Check if it's a directory or ends with separator
if local_path.endswith(os.sep) or (local_path_obj.exists() and local_path_obj.is_dir()):
remote_filename = Path(self.path).name
local_path_for_copy = str(local_path_obj / remote_filename)

# Ensure parent directory exists
Path(local_path_for_copy).parent.mkdir(parents=True, exist_ok=True)

# Use standard file operations for sync copy
import shutil

shutil.copy2(self.path, local_path)
return str(local_path)
shutil.copy2(self.path, local_path_for_copy)
return str(local_path_for_copy)

# Otherwise download from remote using sync functionality
# Use the sync version of storage operations
Expand Down
Loading
Loading