Skip to content

Commit

Permalink
Add support for recursive upload in stage copy (#1816)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-turbaszek authored Oct 31, 2024
1 parent e634c53 commit f4c69a8
Show file tree
Hide file tree
Showing 8 changed files with 541 additions and 31 deletions.
2 changes: 2 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
## New additions
* Added `--retain-comments` option to `snow sql` command to allow passing comments to Snowflake.
* Added `--replace` and `--if-not-exists` options to `snow object create` command.
* `snow stage copy` supports the `--recursive` flag to copy local files and subdirectories recursively to a stage,
including glob pattern support.

## Fixes and improvements
* `snow --info` callback returns information about `SNOWFLAKE_HOME` variable.
Expand Down
36 changes: 20 additions & 16 deletions src/snowflake/cli/_plugins/stage/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def copy(
)
return _put(
recursive=recursive,
source_path=source_path,
source_path=Path(source_path),
destination_path=destination_path,
parallel=parallel,
overwrite=overwrite,
Expand Down Expand Up @@ -247,23 +247,27 @@ def get(recursive: bool, source_path: str, destination_path: str, parallel: int)

def _put(
recursive: bool,
source_path: str,
source_path: Path,
destination_path: str,
parallel: int,
overwrite: bool,
auto_compress: bool,
):
if recursive:
raise click.ClickException("Recursive flag for upload is not supported.")

source = Path(source_path).resolve()
local_path = str(source) + "/*" if source.is_dir() else str(source)

cursor = StageManager().put(
local_path=local_path,
stage_path=destination_path,
overwrite=overwrite,
parallel=parallel,
auto_compress=auto_compress,
)
return QueryResult(cursor)
if recursive and not source_path.is_file():
cursor_generator = StageManager().put_recursive(
local_path=source_path,
stage_path=destination_path,
overwrite=overwrite,
parallel=parallel,
auto_compress=auto_compress,
)
return CollectionResult(cursor_generator)
else:
cursor = StageManager().put(
local_path=source_path.resolve(),
stage_path=destination_path,
overwrite=overwrite,
parallel=parallel,
auto_compress=auto_compress,
)
return QueryResult(cursor)
2 changes: 1 addition & 1 deletion src/snowflake/cli/_plugins/stage/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def put_files_on_stage(


def sync_local_diff_with_stage(
role: str, deploy_root_path: Path, diff_result: DiffResult, stage_fqn: str
role: str | None, deploy_root_path: Path, diff_result: DiffResult, stage_fqn: str
):
"""
Syncs a given local directory's contents with a Snowflake stage, including removing old files, and re-uploading modified and new files.
Expand Down
135 changes: 132 additions & 3 deletions src/snowflake/cli/_plugins/stage/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
import fnmatch
import glob
import logging
import os
import re
import shutil
import sys
import time
from collections import deque
from contextlib import nullcontext
from dataclasses import dataclass
from os import path
from pathlib import Path
from tempfile import TemporaryDirectory
from textwrap import dedent
from typing import Dict, List, Optional, Union
from typing import Deque, Dict, Generator, List, Optional, Union

from click import ClickException
from click import ClickException, UsageError
from snowflake.cli._plugins.snowpark.package_utils import parse_requirements
from snowflake.cli.api.commands.common import (
OnErrorType,
Expand Down Expand Up @@ -306,23 +310,148 @@ def put(
overwrite: bool = False,
role: Optional[str] = None,
auto_compress: bool = False,
use_dict_cursor: bool = False,
) -> SnowflakeCursor:
"""
This method will take a file path from the user's system and put it into a Snowflake stage,
which includes its fully qualified name as well as the path within the stage.
If provided with a role, then temporarily use this role to perform the operation above,
and switch back to the original role for the next commands to run.
"""
if "*" not in str(local_path):
local_path = (
os.path.join(local_path, "*")
if Path(local_path).is_dir()
else str(local_path)
)
with self.use_role(role) if role else nullcontext():
spath = self.build_path(stage_path)
local_resolved_path = path_resolver(str(local_path))
log.info("Uploading %s to %s", local_resolved_path, stage_path)
cursor = self._execute_query(
f"put {self._to_uri(local_resolved_path)} {spath.path_for_sql()} "
f"auto_compress={str(auto_compress).lower()} parallel={parallel} overwrite={overwrite}"
f"auto_compress={str(auto_compress).lower()} parallel={parallel} overwrite={overwrite}",
cursor_class=DictCursor if use_dict_cursor else SnowflakeCursor,
)
return cursor

@staticmethod
def _symlink_or_copy(source_root: Path, source_file_or_dir: Path, dest_dir: Path):
    """
    Mirror one file or directory of the source tree into ``dest_dir``.

    The entry is placed under ``dest_dir`` at the same path it has relative to
    ``source_root``. Files are symlinked when possible and copied when creating
    the symlink raises ``OSError``. Anything that is not a file is recreated as
    a (possibly empty) directory.

    Raises ``ValueError`` (from ``Path.relative_to``) when ``source_file_or_dir``
    is not located under ``source_root``.
    """
    from snowflake.cli._plugins.nativeapp.artifacts import resolve_without_follow

    absolute_src = resolve_without_follow(source_file_or_dir)
    dest_path = dest_dir / source_file_or_dir.relative_to(source_root)

    if not absolute_src.is_file():
        dest_path.mkdir(exist_ok=True, parents=True)
        return

    # Create the parent before linking. A glob pattern may match a file without
    # matching its parent directories; without this the symlink attempt fails
    # on the missing parent and every such file is needlessly copied.
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        os.symlink(absolute_src, dest_path)
    except OSError:
        # Symlinks may be unavailable on this platform/filesystem; fall back
        # to a plain copy of the file content.
        shutil.copyfile(absolute_src, dest_path)

def put_recursive(
    self,
    local_path: Path,
    stage_path: str,
    parallel: int = 4,
    overwrite: bool = False,
    role: Optional[str] = None,
    auto_compress: bool = False,
) -> Generator[dict, None, None]:
    """
    Upload a local directory tree, or everything matched by a glob pattern,
    to a stage while keeping the relative directory layout.

    The matched entries are first mirrored (symlinked or copied) into a
    temporary directory. That mirror is then uploaded bottom-up, one directory
    at a time, with ``put``; each directory is deleted from the mirror after it
    has been handled so that its parent contains only files when its turn comes.

    ``local_path`` is either an existing directory (everything below it is
    uploaded) or a glob pattern; in the latter case relative paths are computed
    against the closest existing parent directory of the pattern.

    Yields one result row (a dict) per row returned by ``put``. ``source`` is
    rewritten to the file path relative to the mirror root and ``target`` to
    the full destination path on the stage.

    Raises ``UsageError`` when ``local_path`` points to a single file.
    """
    if local_path.is_file():
        raise UsageError("Cannot use recursive upload with a single file.")

    if local_path.is_dir():
        root = local_path
        glob_pattern = str(local_path / "**/*")
    else:
        # Not an existing path: treat it as a glob pattern anchored at the
        # nearest parent that actually exists.
        root = Path([p for p in local_path.parents if p.is_dir()][0])
        glob_pattern = str(local_path)

    with TemporaryDirectory() as tmp:
        temp_dir_with_copy = Path(tmp)

        # Create a symlink or copy the file to the temp directory
        for file_or_dir in glob.iglob(glob_pattern, recursive=True):
            self._symlink_or_copy(
                source_root=root,
                source_file_or_dir=Path(file_or_dir),
                dest_dir=temp_dir_with_copy,
            )

        def depth(directory: Path) -> int:
            return len(directory.parts)

        # Leaf directories, deepest first. The list is kept in that order for
        # the whole loop, so a directory is only handled once all of its
        # subdirectories have been uploaded and removed.
        pending_dirs = self._find_deepest_directories(temp_dir_with_copy)

        while pending_dirs:
            # Remove as visited
            directory = pending_dirs.pop(0)

            # Upload the directory content, at this moment the directory has only files
            if list(directory.iterdir()):
                destination = StagePath.from_stage_str(
                    stage_path
                ) / directory.relative_to(temp_dir_with_copy)
                results: list[dict] = self.put(
                    local_path=directory,
                    stage_path=destination,
                    parallel=parallel,
                    overwrite=overwrite,
                    role=role,
                    auto_compress=auto_compress,
                    use_dict_cursor=True,
                ).fetchall()

                # Rewrite results to have resolved paths for better UX
                for item in results:
                    item["source"] = (directory / item["source"]).relative_to(
                        temp_dir_with_copy
                    )
                    item["target"] = str(destination / item["target"])
                    yield item

            # The root is the shallowest entry, so it is always handled last
            if directory == temp_dir_with_copy:
                break

            # Queue the parent and restore the deepest-first order. Appending
            # without re-sorting lets a shallow ancestor overtake a deeper,
            # still unprocessed branch; removing that ancestor would then
            # delete the branch before it was uploaded.
            if directory.parent not in pending_dirs:
                pending_dirs.append(directory.parent)
                pending_dirs.sort(key=depth, reverse=True)

            # Remove the directory so the parent directory will contain only files
            shutil.rmtree(directory)

@staticmethod
def _find_deepest_directories(root_directory: Path) -> list[Path]:
"""
BFS to find the deepest directories. Build a tree of directories
structure and return leaves.
"""
deepest_dirs: list[Path] = list()

queue: Deque[Path] = deque()
queue.append(root_directory)
while queue:
current_dir = queue.popleft()
# Sorted to have deterministic order
children_directories = sorted(
list(d for d in current_dir.iterdir() if d.is_dir())
)
if not children_directories and current_dir not in deepest_dirs:
deepest_dirs.append(current_dir)
else:
queue.extend([c for c in children_directories if c not in deepest_dirs])
deepest_dirs_list = sorted(
list(deepest_dirs), key=lambda d: len(d.parts), reverse=True
)
return deepest_dirs_list

def copy_files(self, source_path: str, destination_path: str) -> SnowflakeCursor:
source_stage_path = self.build_path(source_path)
# We copy only into stage
Expand Down
2 changes: 1 addition & 1 deletion src/snowflake/cli/api/output/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def result(self):


class CollectionResult(CommandResult):
def __init__(self, elements: t.Iterable[t.Dict]):
def __init__(self, elements: t.Iterable[t.Dict] | t.Generator[t.Dict, None, None]):
self._elements = elements

@property
Expand Down
7 changes: 5 additions & 2 deletions src/snowflake/cli/api/stage_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ def absolute_path(self, no_fqn=False, at_prefix=True) -> str:
return str_path.rstrip("/") + "/"
return str_path

def joinpath(self, path: str) -> StagePath:
def joinpath(self, path: str | Path) -> StagePath:
if self.is_file():
raise ValueError("Cannot join path to a file")

return StagePath(
stage_name=self._stage_name,
path=PurePosixPath(self._path) / path.lstrip("/"),
path=PurePosixPath(self._path) / str(path).lstrip("/"),
git_ref=self._git_ref,
)

Expand Down Expand Up @@ -237,5 +237,8 @@ def get_local_target_path(self, target_dir: Path, stage_root: StagePath):
def __str__(self):
    """Return the value of ``absolute_path()`` as the string form."""
    return self.absolute_path()

def __repr__(self):
    """Return the same text as ``str(self)``."""
    return str(self)

def __eq__(self, other):
    """
    Compare two paths by the value of their ``absolute_path()``.

    Returns ``NotImplemented`` for objects that do not provide a callable
    ``absolute_path``, so comparing with an unrelated type evaluates to
    ``False`` instead of raising ``AttributeError``.
    """
    other_absolute_path = getattr(other, "absolute_path", None)
    if not callable(other_absolute_path):
        return NotImplemented
    return self.absolute_path() == other_absolute_path()
Loading

0 comments on commit f4c69a8

Please sign in to comment.