Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion flytekit/remote/remote_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
_DEFAULT_CALLBACK = NoOpCallback()
_PREFIX_KEY = "upload_prefix"
_HASHES_KEY = "hashes"
_LPATH_ROOT_KEY = "local_path_root"
# This file system is not really a filesystem, so users aren't really able to specify the remote path,
# at least not yet.
REMOTE_PLACEHOLDER = "flyte://data"
Expand Down Expand Up @@ -143,8 +144,15 @@ async def _put_file(
Make the request and upload, but then how do we get the s3 paths back to the user?
"""
prefix = kwargs.pop(_PREFIX_KEY)
lpath = pathlib.Path(lpath)
if _LPATH_ROOT_KEY in kwargs:
lpath_root = pathlib.Path(kwargs.pop(_LPATH_ROOT_KEY))
if lpath.parent.is_relative_to(lpath_root):
relative_subdirectory = str(lpath.parent.relative_to(lpath_root))
if relative_subdirectory != ".":
prefix += "/" + relative_subdirectory
_, native_url = self._remote.upload_file(
pathlib.Path(lpath), self._remote.default_project, self._remote.default_domain, prefix
lpath, self._remote.default_project, self._remote.default_domain, prefix
)
return native_url

Expand Down Expand Up @@ -234,6 +242,7 @@ async def _put(

kwargs[_PREFIX_KEY] = prefix
kwargs[_HASHES_KEY] = file_info
kwargs[_LPATH_ROOT_KEY] = lpath
res = await super()._put(lpath, REMOTE_PLACEHOLDER, recursive, callback, batch_size, **kwargs)
if isinstance(res, list):
res = self.extract_common(res)
Expand Down
Loading