Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion flytekit/clis/sdk_in_container/pyflyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: int):
if config:
ctx.obj[CTX_CONFIG_FILE] = config
cfg = configuration.ConfigFile(config)
# Temporarily commented out to ensure proper output format when using --quiet flag in pyflyte register
# Set here so that if someone has Config.auto() in their user code, the config here will get used.
if FLYTECTL_CONFIG_ENV_VAR in os.environ:
if FLYTECTL_CONFIG_ENV_VAR in os.environ and verbose > 0:
# Log when verbose > 0 to prevent breaking output format for pyflyte register's quiet or summamry-format flag
logger.info(
f"Config file arg {config} will override env var {FLYTECTL_CONFIG_ENV_VAR}: {os.environ[FLYTECTL_CONFIG_ENV_VAR]}"
)
Expand Down
115 changes: 76 additions & 39 deletions flytekit/clis/sdk_in_container/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
the root of your project, it finds the first folder that does not have a ``__init__.py`` file.
"""

_original_secho = click.secho
_original_log_level = logger.level
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using context manager for logger

Consider using a context manager to handle logger level changes instead of storing the level in a global variable. This would ensure proper cleanup and avoid potential side effects. A similar issue was also found in flytekit/tools/repo.py (line 290).

Code suggestion
Check the AI-generated fix before applying
Suggested change
_original_log_level = logger.level
class LogLevelManager:
def __init__(self):
self.original_level = logger.level
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
logger.level = self.original_level

Code Review Run #7297c4


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged



@click.command("register", help=_register_help)
@project_option_dec
Expand Down Expand Up @@ -159,6 +162,20 @@
help="Skip errors during registration. This is useful when registering multiple packages and you want to skip "
"errors for some packages.",
)
@click.option(
"--summary-format",
"-f",
required=False,
type=click.Choice(["json", "yaml"], case_sensitive=False),
default=None,
help="Output format for registration summary. Lists registered workflows, tasks, and launch plans. 'json' and 'yaml' supported.",
)
@click.option(
"--quiet",
is_flag=True,
default=False,
help="Suppress output messages, only displaying errors.",
)
@click.argument("package-or-module", type=click.Path(exists=True, readable=True, resolve_path=True), nargs=-1)
@click.pass_context
def register(
Expand All @@ -181,12 +198,25 @@ def register(
resource_requests: typing.Optional[Resources],
resource_limits: typing.Optional[Resources],
skip_errors: bool,
summary_format: typing.Optional[str],
quiet: bool,
):
"""
see help
"""

if summary_format is not None:
quiet = True

if quiet:
# Mute all secho output through monkey patching
click.secho = lambda *args, **kw: None
# Output only log at ERROR or CRITICAL level
logger.setLevel("ERROR")

# Set the relevant copy option if non_fast is set, this enables the individual file listing behavior
# that the copy flag uses.

if non_fast:
click.secho("The --non-fast flag is deprecated, please use --copy none instead", fg="yellow")
if "--copy" in sys.argv:
Expand Down Expand Up @@ -214,42 +244,49 @@ def register(
"Missing argument 'PACKAGE_OR_MODULE...', at least one PACKAGE_OR_MODULE is required but multiple can be passed",
)

# Use extra images in the config file if that file exists
config_file = ctx.obj.get(constants.CTX_CONFIG_FILE)
if config_file:
image_config = patch_image_config(config_file, image_config)

click.secho(
f"Running pyflyte register from {os.getcwd()} "
f"with images {image_config} "
f"and image destination folder {destination_dir} "
f"on {len(package_or_module)} package(s) {package_or_module}",
dim=True,
)

# Create and save FlyteRemote,
remote = get_and_save_remote_with_click_context(ctx, project, domain, data_upload_location="flyte://data")
click.secho(f"Registering against {remote.config.platform.endpoint}")
repo.register(
project,
domain,
image_config,
output,
destination_dir,
service_account,
raw_data_prefix,
version,
deref_symlinks,
copy_style=copy,
package_or_module=package_or_module,
remote=remote,
env=env,
default_resources=ResourceSpec(
requests=resource_requests or Resources(), limits=resource_limits or Resources()
),
dry_run=dry_run,
activate_launchplans=activate_launchplans,
skip_errors=skip_errors,
show_files=show_files,
verbosity=ctx.obj[constants.CTX_VERBOSE],
)
try:
# Use extra images in the config file if that file exists
config_file = ctx.obj.get(constants.CTX_CONFIG_FILE)
if config_file:
image_config = patch_image_config(config_file, image_config)

click.secho(
f"Running pyflyte register from {os.getcwd()} "
f"with images {image_config} "
f"and image destination folder {destination_dir} "
f"on {len(package_or_module)} package(s) {package_or_module}",
dim=True,
)

# Create and save FlyteRemote,
remote = get_and_save_remote_with_click_context(ctx, project, domain, data_upload_location="flyte://data")
click.secho(f"Registering against {remote.config.platform.endpoint}")
repo.register(
project,
domain,
image_config,
output,
destination_dir,
service_account,
raw_data_prefix,
version,
deref_symlinks,
copy_style=copy,
package_or_module=package_or_module,
remote=remote,
env=env,
summary_format=summary_format,
quiet=quiet,
default_resources=ResourceSpec(
requests=resource_requests or Resources(), limits=resource_limits or Resources()
),
dry_run=dry_run,
activate_launchplans=activate_launchplans,
skip_errors=skip_errors,
show_files=show_files,
verbosity=ctx.obj[constants.CTX_VERBOSE],
)
finally:
# Restore original secho
click.secho = _original_secho
logger.setLevel(_original_log_level)
5 changes: 4 additions & 1 deletion flytekit/clis/sdk_in_container/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ def validate_package(ctx, param, values):
pkgs.extend(val.split(","))
else:
pkgs.append(val)
logger.debug(f"Using packages: {pkgs}")

if ctx.params.get("verbose", 0) > 0:
# Log when verbose > 0 to prevent breaking output format for pyflyte register's quiet or summamry-format flag
logger.debug(f"Using packages: {pkgs}")
return pkgs


Expand Down
31 changes: 31 additions & 0 deletions flytekit/image_spec/noop_builder 2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from flytekit.image_spec.image_spec import ImageSpec, ImageSpecBuilder


class NoOpBuilder(ImageSpecBuilder):
"""Noop image builder."""

builder_type = "noop"

def should_build(self, image_spec: ImageSpec) -> bool:
"""
The build_image function of NoOpBuilder does not actually build a Docker image.
Since no Docker build process occurs, we do not need to check for Docker daemon
or existing images. Therefore, should_build should always return True.

Args:
image_spec (ImageSpec): Image specification

Returns:
bool: Always returns True
"""
return True

def build_image(self, image_spec: ImageSpec) -> str:
if not isinstance(image_spec.base_image, str):
msg = "base_image must be a string to use the noop image builder"
raise ValueError(msg)

import click

click.secho(f"Using image: {image_spec.base_image}", fg="blue")
return image_spec.base_image
62 changes: 62 additions & 0 deletions flytekit/models/concurrency 2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from flyteidl.admin import launch_plan_pb2 as _launch_plan_idl

from flytekit.models import common as _common


class ConcurrencyLimitBehavior(object):
SKIP = _launch_plan_idl.CONCURRENCY_LIMIT_BEHAVIOR_SKIP

@classmethod
def enum_to_string(cls, val):
"""
:param int val:
:rtype: Text
"""
if val == cls.SKIP:
return "SKIP"
else:
return "<UNKNOWN>"


class ConcurrencyPolicy(_common.FlyteIdlEntity):
"""
Defines the concurrency policy for a launch plan.
"""

def __init__(self, max_concurrency: int, behavior: ConcurrencyLimitBehavior = None):
self._max_concurrency = max_concurrency
self._behavior = behavior if behavior is not None else ConcurrencyLimitBehavior.SKIP

@property
def max_concurrency(self) -> int:
"""
Maximum number of concurrent workflows allowed.
"""
return self._max_concurrency

@property
def behavior(self) -> ConcurrencyLimitBehavior:
"""
Policy behavior when concurrency limit is reached.
"""
return self._behavior

def to_flyte_idl(self) -> _launch_plan_idl.ConcurrencyPolicy:
"""
:rtype: flyteidl.admin.launch_plan_pb2.ConcurrencyPolicy
"""
return _launch_plan_idl.ConcurrencyPolicy(
max=self.max_concurrency,
behavior=self.behavior,
)

@classmethod
def from_flyte_idl(cls, pb2_object: _launch_plan_idl.ConcurrencyPolicy) -> "ConcurrencyPolicy":
"""
:param flyteidl.admin.launch_plan_pb2.ConcurrencyPolicy pb2_object:
:rtype: ConcurrencyPolicy
"""
return cls(
max_concurrency=pb2_object.max,
behavior=pb2_object.behavior,
)
Loading