Skip to content
13 changes: 10 additions & 3 deletions src/flyte/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ def __init__(
async def _run_remote(self, obj: TaskTemplate[P, R] | LazyEntity, *args: P.args, **kwargs: P.kwargs) -> Run:
import grpc
from flyteidl2.common import identifier_pb2
from flyteidl2.core import literals_pb2
from flyteidl2.core import literals_pb2, security_pb2
from flyteidl2.task import run_pb2
from flyteidl2.workflow import run_definition_pb2, run_service_pb2

from google.protobuf import wrappers_pb2

from flyte.remote import Run
Expand Down Expand Up @@ -258,6 +259,10 @@ async def _run_remote(self, obj: TaskTemplate[P, R] | LazyEntity, *args: P.args,
env_kv = run_pb2.Envs(values=kv_pairs)
annotations = run_pb2.Annotations(values=self._annotations)
labels = run_pb2.Labels(values=self._labels)
raw_data_storage = run_pb2.RawDataStorage(raw_data_prefix=self._raw_data_path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we shouldn't send these at all if the values aren't defined. there's no point sending an empty object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed! Thank you

security_context = security_pb2.SecurityContext(
run_as=security_pb2.Identity(k8s_service_account=self._service_account)
)

try:
resp = await get_client().run_service.CreateRun(
Expand All @@ -275,6 +280,8 @@ async def _run_remote(self, obj: TaskTemplate[P, R] | LazyEntity, *args: P.args,
labels=labels,
envs=env_kv,
cluster=self._queue or task.queue,
raw_data_storage=raw_data_storage,
security_context=security_context,
),
),
)
Expand Down Expand Up @@ -603,8 +610,8 @@ async def example_task(x: int, y: str) -> str:
:param interactive_mode: Optional, can be forced to True or False.
If not provided, it will be set based on the current environment. For example Jupyter notebooks are considered
interactive mode, while scripts are not. This is used to determine how the code bundle is created.
:param raw_data_path: Use this path to store the raw data for the run. Currently only supported for local runs,
and can be used to store raw data in specific locations. TODO coming soon for remote runs as well.
:param raw_data_path: Use this path to store the raw data for the run for local and remote, and can be used to
store raw data in specific locations.
:param run_base_dir: Optional The base directory to use for the run. This is used to store the metadata for the run,
that is passed between tasks.
:param overwrite_cache: Optional If true, the cache will be overwritten for the run
Expand Down
22 changes: 22 additions & 0 deletions src/flyte/cli/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,26 @@ class RunArguments:
)
},
)
raw_data_path: str | None = field(
default=None,
metadata={
"click.option": click.Option(
["--raw-data-path"],
type=str,
help="Override the output path to store the raw data. Example: s3://bucket/",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
help="Override the output path to store the raw data. Example: s3://bucket/",
help="Override the output prefix used to store offloaded data types. e.g. s3://bucket/",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed!

)
},
)
service_account: str | None = field(
default=None,
metadata={
"click.option": click.Option(
["--service-account"],
type=str,
help="Kubernetes service account. If not provided, default will be used",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
help="Kubernetes service account. If not provided, default will be used",
help="Kubernetes service account. If not provided, the configured default will be used",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed!

)
},
)
name: str | None = field(
default=None,
metadata={
Expand Down Expand Up @@ -140,6 +160,8 @@ async def _run():
copy_style=self.run_args.copy_style,
mode="local" if self.run_args.local else "remote",
name=self.run_args.name,
raw_data_path=self.run_args.raw_data_path,
service_account=self.run_args.service_account,
).run.aio(self.obj, **ctx.params)
if self.run_args.local:
console.print(
Expand Down
Loading