Skip to content

Commit 10fbb22

Browse files
kumare3, fiedlerNr9, and eapolinario
authored and committed
Tracks progress for package creation, upload and kickoff (flyteorg#2935)
* Tracks progress for package creation, upload and kickoff
  Signed-off-by: Ketan Umare <[email protected]>
* updated
  Signed-off-by: Ketan Umare <[email protected]>
* introduce FLYTEKIT_DISPLAY_PROGRESS_ENV_VAR to control progress
  Signed-off-by: Jan Fiedler <[email protected]>
* update remote.py
  Signed-off-by: Jan Fiedler <[email protected]>
* update fast_registration.py
  Signed-off-by: Jan Fiedler <[email protected]>
* ruff format
  Signed-off-by: Jan Fiedler <[email protected]>
* ruff check fix
  Signed-off-by: Jan Fiedler <[email protected]>
* remove show_progress attribute from remote & fast_registration
  Signed-off-by: Jan Fiedler <[email protected]>
* make lint
  Signed-off-by: Jan Fiedler <[email protected]>
* Revert "make lint"
  This reverts commit 5b05419.
  Signed-off-by: Eduardo Apolinario <[email protected]>
* run make lint from repo this time
  Signed-off-by: Eduardo Apolinario <[email protected]>
* reformat remote.py
  Signed-off-by: Eduardo Apolinario <[email protected]>
* reformat fast_registration.py
  Signed-off-by: Eduardo Apolinario <[email protected]>
* reuse LOGGING_RICH_FMT_ENV_VAR for is_display_progress_enabled()
  Signed-off-by: Jan Fiedler <[email protected]>
* replace l & t variable names with total files & files_processed
  Signed-off-by: Jan Fiedler <[email protected]>
---------
Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Jan Fiedler <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Ketan Umare <[email protected]>
Co-authored-by: Jan Fiedler <[email protected]>
Co-authored-by: Jan Fiedler <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Signed-off-by: Atharva <[email protected]>
1 parent ee88172 commit 10fbb22

File tree

3 files changed

+87
-3
lines changed

3 files changed

+87
-3
lines changed

flytekit/loggers.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,5 +186,9 @@ def get_level_from_cli_verbosity(verbosity: int) -> int:
186186
return logging.DEBUG
187187

188188

189+
def is_display_progress_enabled() -> bool:
    """Report whether progress display is switched on via the environment.

    Reads the environment variable named by ``LOGGING_RICH_FMT_ENV_VAR``.

    Returns:
        ``True`` when the variable is set to a non-empty string, ``False``
        when it is unset or empty.
    """
    # os.getenv returns the raw string (or the default), so coerce it to keep
    # the declared ``bool`` return type honest. Truthiness is unchanged: any
    # non-empty value -- including "0" or "false" -- still counts as enabled.
    return bool(os.getenv(LOGGING_RICH_FMT_ENV_VAR))
191+
192+
189193
# Default initialization
190194
initialize_global_loggers()

flytekit/remote/remote.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import requests
3232
from flyteidl.admin.signal_pb2 import Signal, SignalListRequest, SignalSetRequest
3333
from flyteidl.core import literals_pb2
34+
from rich.progress import Progress, TextColumn, TimeElapsedColumn
3435

3536
from flytekit import ImageSpec
3637
from flytekit.clients.friendly import SynchronousFlyteClient
@@ -66,7 +67,7 @@
6667
FlyteEntityNotExistException,
6768
FlyteValueException,
6869
)
69-
from flytekit.loggers import developer_logger, logger
70+
from flytekit.loggers import developer_logger, is_display_progress_enabled, logger
7071
from flytekit.models import common as common_models
7172
from flytekit.models import filters as filter_models
7273
from flytekit.models import launch_plan as launch_plan_models
@@ -1168,6 +1169,13 @@ def upload_file(
11681169
encoded_md5 = b64encode(md5_bytes)
11691170
local_file_path = str(to_upload)
11701171
content_length = os.stat(local_file_path).st_size
1172+
1173+
upload_package_progress = Progress(TimeElapsedColumn(), TextColumn("[progress.description]{task.description}"))
1174+
t1 = upload_package_progress.add_task(f"Uploading package of size {content_length/1024/1024:.2f} MBs", total=1)
1175+
upload_package_progress.start_task(t1)
1176+
if is_display_progress_enabled():
1177+
upload_package_progress.start()
1178+
11711179
with open(local_file_path, "+rb") as local_file:
11721180
headers = {"Content-Length": str(content_length), "Content-MD5": encoded_md5}
11731181
headers.update(extra_headers)
@@ -1187,6 +1195,16 @@ def upload_file(
11871195
f"Request to send data {upload_location.signed_url} failed.\nResponse: {rsp.text}",
11881196
)
11891197

1198+
upload_package_progress.update(
1199+
t1,
1200+
completed=1,
1201+
description=f"Uploaded package of size {content_length/1024/1024:.2f}MB",
1202+
refresh=True,
1203+
)
1204+
upload_package_progress.stop_task(t1)
1205+
if is_display_progress_enabled():
1206+
upload_package_progress.stop()
1207+
11901208
developer_logger.debug(
11911209
f"Uploading {to_upload} to {upload_location.signed_url} native url {upload_location.native_url}"
11921210
)

flytekit/tools/fast_registration.py

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@
1717

1818
import click
1919
from rich import print as rich_print
20+
from rich.progress import (
21+
BarColumn,
22+
Progress,
23+
TextColumn,
24+
TimeElapsedColumn,
25+
)
2026
from rich.tree import Tree
2127

2228
from flytekit.constants import CopyFileDetection
2329
from flytekit.core.context_manager import FlyteContextManager
2430
from flytekit.core.python_auto_container import PICKLE_FILE_PATH
2531
from flytekit.core.utils import timeit
2632
from flytekit.exceptions.user import FlyteDataNotFoundException
27-
from flytekit.loggers import logger
33+
from flytekit.loggers import is_display_progress_enabled, logger
2834
from flytekit.tools.ignore import DockerIgnore, FlyteIgnore, GitIgnore, Ignore, IgnoreGroup, StandardIgnore
2935
from flytekit.tools.script_mode import _filehash_update, _pathhash_update, ls_files, tar_strip_file_attributes
3036

@@ -120,6 +126,18 @@ def fast_package(
120126
if options and (
121127
options.copy_style == CopyFileDetection.LOADED_MODULES or options.copy_style == CopyFileDetection.ALL
122128
):
129+
create_tarball_progress = Progress(
130+
TimeElapsedColumn(),
131+
TextColumn("[progress.description]{task.description}."),
132+
BarColumn(),
133+
TextColumn("{task.fields[files_added_progress]}"),
134+
)
135+
136+
compress_tarball_progress = Progress(
137+
TimeElapsedColumn(),
138+
TextColumn("[progress.description]{task.description}"),
139+
)
140+
123141
ls, ls_digest = ls_files(str(source), options.copy_style, deref_symlinks, ignore)
124142
logger.debug(f"Hash digest: {ls_digest}")
125143

@@ -130,13 +148,30 @@ def fast_package(
130148
archive_fname = f"{FAST_PREFIX}{ls_digest}{FAST_FILEENDING}"
131149
if output_dir is None:
132150
output_dir = tempfile.mkdtemp()
133-
click.secho(f"No output path provided, using a temporary directory at {output_dir} instead", fg="yellow")
151+
click.secho(
152+
f"No output path provided, using a temporary directory at {output_dir} instead",
153+
fg="yellow",
154+
)
134155
archive_fname = os.path.join(output_dir, archive_fname)
135156

157+
# add the tarfile task to progress and start it
158+
total_files = len(ls)
159+
files_processed = 0
160+
tar_task = create_tarball_progress.add_task(
161+
f"Creating tarball with [{total_files}] files...",
162+
total=total_files,
163+
files_added_progress=f"{files_processed}/{total_files} files",
164+
)
165+
166+
if is_display_progress_enabled():
167+
create_tarball_progress.start()
168+
169+
create_tarball_progress.start_task(tar_task)
136170
with tempfile.TemporaryDirectory() as tmp_dir:
137171
tar_path = os.path.join(tmp_dir, "tmp.tar")
138172
with tarfile.open(tar_path, "w", dereference=deref_symlinks) as tar:
139173
for ws_file in ls:
174+
files_processed = files_processed + 1
140175
rel_path = os.path.relpath(ws_file, start=source)
141176
tar.add(
142177
os.path.join(source, ws_file),
@@ -145,7 +180,34 @@ def fast_package(
145180
filter=lambda x: tar_strip_file_attributes(x),
146181
)
147182

183+
create_tarball_progress.update(
184+
tar_task,
185+
advance=1,
186+
description=f"Added file {rel_path}",
187+
refresh=True,
188+
files_added_progress=f"{files_processed}/{total_files} files",
189+
)
190+
191+
create_tarball_progress.stop_task(tar_task)
192+
if is_display_progress_enabled():
193+
create_tarball_progress.stop()
194+
compress_tarball_progress.start()
195+
196+
tpath = pathlib.Path(tar_path)
197+
size_mbs = tpath.stat().st_size / 1024 / 1024
198+
compress_task = compress_tarball_progress.add_task(f"Compressing tarball size {size_mbs:.2f}MB...", total=1)
199+
compress_tarball_progress.start_task(compress_task)
148200
compress_tarball(tar_path, archive_fname)
201+
arpath = pathlib.Path(archive_fname)
202+
asize_mbs = arpath.stat().st_size / 1024 / 1024
203+
compress_tarball_progress.update(
204+
compress_task,
205+
advance=1,
206+
description=f"Tarball {size_mbs:.2f}MB compressed to {asize_mbs:.2f}MB",
207+
)
208+
compress_tarball_progress.stop_task(compress_task)
209+
if is_display_progress_enabled():
210+
compress_tarball_progress.stop()
149211

150212
# Original tar command - This condition to be removed in the future after serialize is removed.
151213
else:

0 commit comments

Comments
 (0)