JiangJiaWei1103
Contributor

@JiangJiaWei1103 JiangJiaWei1103 commented Jan 1, 2025

Tracking issue

Closes flyteorg/flyte#6096.

Why are the changes needed?

When a user creates a StructuredDataset with an explicit file_format (e.g., parquet), that information is accidentally discarded during the async_to_literal call in the case tracked by the issue above. Concretely, StructuredDatasetType's format attribute ends up as GENERIC_FORMAT, which is the empty string "".

What changes were proposed in this pull request?

Override StructuredDatasetType's format attribute when the user explicitly sets file_format on the Python-native StructuredDataset.
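
The intended behavior can be sketched without flytekit as follows. This is a minimal illustration, not the patch itself: SketchDataset, SketchDatasetType, and resolve_format are hypothetical stand-ins, and the rule that a format already present in the type hint wins over the user-specified one is an assumption taken from the commit messages later in this thread.

```python
from dataclasses import dataclass
from typing import Optional

# Mirrors the empty-string default described above.
GENERIC_FORMAT = ""


@dataclass
class SketchDatasetType:
    """Hypothetical stand-in for StructuredDatasetType."""
    format: str = GENERIC_FORMAT


@dataclass
class SketchDataset:
    """Hypothetical stand-in for the Python-native StructuredDataset."""
    uri: Optional[str] = None
    file_format: str = GENERIC_FORMAT


def resolve_format(sd: SketchDataset, sdt: SketchDatasetType) -> SketchDatasetType:
    """Return a type whose format keeps the user's file_format when the hint has none."""
    if sdt.format != GENERIC_FORMAT:
        # Assumption: a format given in the type hint takes priority.
        return sdt
    # Otherwise copy the user-specified file_format instead of dropping it.
    return SketchDatasetType(format=sd.file_format)


sd = SketchDataset(uri="s3://my-s3-bucket/df.parquet", file_format="parquet")
resolved = resolve_format(sd, SketchDatasetType())
print(repr(resolved.format))
```

Before the fix, the equivalent of resolved.format would have stayed at GENERIC_FORMAT even though the user asked for parquet.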

How was this patch tested?

This patch is tested by the newly added integration test and double-checked by inspecting the Flyte console I/O and the task pod stdout.

Setup process

For a local run, the setup is as follows:

git clone https://github.com/flyteorg/flytekit.git
cd flytekit
gh pr checkout 3027
make setup && pip install -e .

After installation, run the following command:

pytest -svvv tests/flytekit/integration/remote/test_remote.py::test_sd_attr

Screenshots

The following results are expected:

  • Flyte console input
    Screenshot 2025-01-04 at 2 44 54 PM

  • Flyte console output
    Screenshot 2025-01-04 at 2 45 03 PM

  • Task pod stdout
    Screenshot 2025-01-04 at 2 44 43 PM

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Summary by Bito

This PR implements comprehensive Flytekit improvements including: enhanced execution engine with improved error handling, fixed StructuredDataset file format handling, K8s Data Service plugin implementation, and Ray plugin enhancements. Key features include configurable cloud storage writes, improved secret handling, Optuna plugin for hyperparameter optimization, and enhanced thread safety mechanisms.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 5

@flyte-bot
Contributor

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - The AI Code Review Agent skipped reviewing this change because it is configured to exclude certain pull requests based on the source/target branch or the pull request status. You can change the settings here, or contact the agent instance creator at [email protected].

codecov bot commented Jan 1, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 76.84%. Comparing base (4208a64) to head (f1a2ba3).
Report is 1 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #3027       +/-   ##
===========================================
+ Coverage   51.81%   76.84%   +25.02%     
===========================================
  Files         202      202               
  Lines       21469    21472        +3     
  Branches     2766     2767        +1     
===========================================
+ Hits        11125    16500     +5375     
+ Misses       9735     4211     -5524     
- Partials      609      761      +152     
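
The percentages in the table can be reproduced from the hit, miss, and partial counts. The sketch below assumes coverage is hits divided by the sum of hits, misses, and partials, truncated (not rounded) to two decimals; the truncation is inferred from the numbers shown here, not from Codecov documentation, and coverage_pct is a hypothetical helper.

```python
import math


def coverage_pct(hits: int, misses: int, partials: int) -> float:
    """Percentage of hit lines, truncated (not rounded) to two decimals."""
    total = hits + misses + partials
    return math.floor(hits / total * 10_000) / 100


# Counts copied from the base (master) and head (#3027) columns of the table.
base = coverage_pct(hits=11125, misses=9735, partials=609)
head = coverage_pct(hits=16500, misses=4211, partials=761)
print(base, head)
```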

Signed-off-by: JiaWei Jiang <[email protected]>
@JiangJiaWei1103 JiangJiaWei1103 changed the title from "[WIP] Fix StructuredDataset empty-str file_format in dc attr access" to "[BUG] Fix StructuredDataset empty-str file_format in dc attr access" Jan 4, 2025
@JiangJiaWei1103
Contributor Author

Follow-up

While fixing this bug, we observed two other issues, briefly described below:

  1. pyflyte run can't handle a dataclass input passed as a JSON string, e.g.,
pyflyte run --remote test.py wf --dc '{"dc": {"sd": {"uri": "s3://my-s3-bucket/df.parquet", "file_format": "parquet"}}}'

There are two sub-cases:

  • DC's sd is defined without default_factory
    • Raises KeyError: 'sd'
  • DC's sd is defined with default_factory
    • wf always takes the default_factory value as input, no matter what is specified in the JSON string
  2. uri information is lost, similar to the file_format loss fixed in this patch
    Screenshot 2025-01-04 at 2 44 43 PM
    As can be observed, the StructuredDataset uri becomes None.

We plan to fix these issues in a follow-up, which should make attribute access results more accurate. Thanks!
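
For reference, the two dataclass shapes behind the sub-cases of the first issue might look like the sketch below. It uses plain dataclasses only: SketchSD, DCWithoutDefault, DCWithDefault, and the default bucket path are hypothetical names for illustration, and nothing here reproduces how pyflyte run actually parses the JSON string.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SketchSD:
    """Hypothetical stand-in for StructuredDataset (not the flytekit class)."""
    uri: Optional[str] = None
    file_format: str = ""


@dataclass
class DCWithoutDefault:
    # Sub-case 1: sd has no default, so a value must always be supplied.
    sd: SketchSD


@dataclass
class DCWithDefault:
    # Sub-case 2: sd falls back to this factory when no value is supplied.
    sd: SketchSD = field(
        default_factory=lambda: SketchSD(
            uri="s3://default-bucket/default.parquet",  # hypothetical default
            file_format="parquet",
        )
    )


# What the JSON string is expected to produce in both sub-cases.
expected = SketchSD(uri="s3://my-s3-bucket/df.parquet", file_format="parquet")
explicit = DCWithDefault(sd=expected)

# What sub-case 2 reportedly behaves like: the supplied value is ignored
# and the default_factory result is used instead.
fallback = DCWithDefault()
print(explicit.sd.uri, fallback.sd.uri)
```

In sub-case 1 there is no factory to fall back on, which is consistent with the run failing outright rather than silently using a default.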

@flyte-bot
Contributor

flyte-bot commented Jan 4, 2025

Code Review Agent Run #2ae7a3

Actionable Suggestions - 0
Review Details
  • Files reviewed - 3 · Commit Range: cdb56c0..46b7b62
    • flytekit/types/structured/structured_dataset.py
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/workflows/basic/sd_attr.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

@flyte-bot
Contributor

flyte-bot commented Jan 4, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change: Bug Fix - Fix StructuredDataset File Format Preservation

Files Impacted:
  • structured_dataset.py - Added logic to retain user-specified file_format in StructuredDataset during async_to_literal conversion
  • test_remote.py - Added integration test to verify StructuredDataset file_format attribute preservation
  • sd_attr.py - Added new test workflow to validate StructuredDataset file format handling

@flyte-bot
Contributor

flyte-bot commented Jan 11, 2025

Code Review Agent Run #1863ff

Actionable Suggestions - 0
Additional Suggestions - 10
  • tests/flytekit/integration/remote/utils.py - 1
    • Consider making parquet path configurable · Line 89-89
  • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py - 1
  • plugins/flytekit-inference/tests/test_vllm.py - 1
    • Consider breaking down long assertion statement · Line 41-41
  • tests/flytekit/unit/core/test_flyte_file.py - 1
  • flytekit/image_spec/default_builder.py - 1
  • tests/flytekit/unit/core/test_flyte_directory.py - 1
  • tests/flytekit/integration/remote/test_remote.py - 1
    • Consider extracting duplicated workflow execution logic · Line 846-847
  • plugins/flytekit-optuna/setup.py - 1
    • Consider adding Python 3.11 support · Line 21-21
  • plugins/flytekit-spark/tests/test_environment.py - 1
    • Consider validating Spark config in test · Line 17-20
  • tests/flytekit/unit/core/image_spec/test_default_builder.py - 1
    • Consider using complete error message pattern · Line 271-271
Review Details
  • Files reviewed - 27 · Commit Range: 46b7b62..cb73c0e
    • flytekit/__init__.py
    • flytekit/core/array_node_map_task.py
    • flytekit/core/environment.py
    • flytekit/core/workflow.py
    • flytekit/image_spec/default_builder.py
    • flytekit/remote/remote.py
    • flytekit/types/directory/types.py
    • flytekit/types/file/file.py
    • plugins/flytekit-inference/flytekitplugins/inference/__init__.py
    • plugins/flytekit-inference/flytekitplugins/inference/vllm/serve.py
    • plugins/flytekit-inference/setup.py
    • plugins/flytekit-inference/tests/test_vllm.py
    • plugins/flytekit-onnx-pytorch/dev-requirements.txt
    • plugins/flytekit-optuna/flytekitplugins/optuna/__init__.py
    • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py
    • plugins/flytekit-optuna/setup.py
    • plugins/flytekit-optuna/tests/test_optimizer.py
    • plugins/flytekit-spark/tests/test_environment.py
    • pyproject.toml
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/utils.py
    • tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py
    • tests/flytekit/unit/core/image_spec/test_default_builder.py
    • tests/flytekit/unit/core/test_environment.py
    • tests/flytekit/unit/core/test_flyte_directory.py
    • tests/flytekit/unit/core/test_flyte_file.py
    • tests/flytekit/unit/core/test_workflows.py
  • Files skipped - 3
    • .github/workflows/pythonbuild.yml - Reason: Filter setting
    • plugins/flytekit-inference/README.md - Reason: Filter setting
    • plugins/flytekit-optuna/README.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

@Future-Outlier Future-Outlier self-assigned this Jan 13, 2025
@JiangJiaWei1103
Contributor Author

JiangJiaWei1103 commented Jan 17, 2025

For the second follow-up issue, I think I've found why the uri information goes missing. I'll open another PR and link it to this one once this one is merged. Thanks!

Screenshot 2025-01-17 at 7 57 44 PM

@flyte-bot
Contributor

flyte-bot commented Jan 19, 2025

Code Review Agent Run #2259be

Actionable Suggestions - 0
Additional Suggestions - 10
  • flytekit/core/worker_queue.py - 3
  • flytekit/core/array_node_map_task.py - 1
    • Consider using proper constructor for metadata · Line 131-132
  • flytekit/exceptions/user.py - 1
    • Consider adding timestamp parameter validation · Line 18-18
  • flytekit/remote/remote.py - 1
  • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py - 2
    • Consider adding inputs parameter validation · Line 192-192
    • Consider simplifying validation logic expressions · Line 84-87
  • tests/flytekit/unit/exceptions/test_user.py - 1
    • Consider splitting error message assertion · Line 19-19
  • flytekit/core/type_engine.py - 1
Review Details
  • Files reviewed - 40 · Commit Range: cb73c0e..416bed8
    • flytekit/bin/entrypoint.py
    • flytekit/clis/sdk_in_container/run.py
    • flytekit/core/array_node_map_task.py
    • flytekit/core/context_manager.py
    • flytekit/core/node.py
    • flytekit/core/promise.py
    • flytekit/core/type_engine.py
    • flytekit/core/worker_queue.py
    • flytekit/exceptions/user.py
    • flytekit/image_spec/default_builder.py
    • flytekit/remote/remote.py
    • flytekit/tools/translator.py
    • flytekit/types/structured/structured_dataset.py
    • plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
    • plugins/flytekit-kf-pytorch/tests/test_elastic_task.py
    • plugins/flytekit-optuna/flytekitplugins/optuna/__init__.py
    • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py
    • plugins/flytekit-optuna/setup.py
    • plugins/flytekit-optuna/tests/test_callback.py
    • plugins/flytekit-optuna/tests/test_decorator.py
    • plugins/flytekit-optuna/tests/test_imperative.py
    • plugins/flytekit-optuna/tests/test_optimizer.py
    • plugins/flytekit-optuna/tests/test_validation.py
    • plugins/flytekit-pydantic/flytekitplugins/pydantic/__init__.py
    • plugins/flytekit-pydantic/flytekitplugins/pydantic/basemodel_transformer.py
    • plugins/flytekit-pydantic/flytekitplugins/pydantic/commons.py
    • plugins/flytekit-pydantic/flytekitplugins/pydantic/deserialization.py
    • plugins/flytekit-pydantic/flytekitplugins/pydantic/serialization.py
    • plugins/flytekit-pydantic/setup.py
    • plugins/flytekit-pydantic/tests/folder/test_file1.txt
    • plugins/flytekit-pydantic/tests/folder/test_file2.txt
    • plugins/flytekit-pydantic/tests/test_type_transformer.py
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/workflows/basic/signal_test.py
    • tests/flytekit/unit/bin/test_python_entrypoint.py
    • tests/flytekit/unit/core/test_array_node_map_task.py
    • tests/flytekit/unit/core/test_type_hints.py
    • tests/flytekit/unit/core/test_worker_queue.py
    • tests/flytekit/unit/exceptions/test_user.py
    • tests/flytekit/unit/remote/test_remote.py
  • Files skipped - 2
    • plugins/flytekit-optuna/README.md - Reason: Filter setting
    • plugins/flytekit-pydantic/README.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

@flyte-bot
Contributor

flyte-bot commented Jan 25, 2025

Code Review Agent Run #d43f9b

Actionable Suggestions - 0
Review Details
  • Files reviewed - 1 · Commit Range: 416bed8..ddba120
    • flytekit/types/structured/structured_dataset.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

@Future-Outlier
Member

Final testing:

from flytekit import task, workflow, ImageSpec
from flytekit.types.structured.structured_dataset import StructuredDataset
import pandas as pd


flytekit_hash = "f1a2ba3a1836983ffb9bb45276d8aa9b01665600"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

custom_image = ImageSpec(
    packages=[flytekit, "pandas", "pyarrow"],
    apt_packages=["git"],
    registry="localhost:30000",
    env={"FLYTE_SDK_LOGGING_LEVEL": 10},
)

@task(container_image=custom_image)
def create_pd_sd() -> StructuredDataset:
    return StructuredDataset(
        dataframe=pd.DataFrame(
            {
                "calories": [420, 380, 390],
                "duration": [50, 40, 45]
            }
        ),
        file_format="parquet"
    )

@task(container_image=custom_image)
def return_pd_sd(sd: StructuredDataset) -> StructuredDataset:
    return sd

@workflow
def wf() -> StructuredDataset:
    sd = create_pd_sd()
    sd = return_pd_sd(sd)
    return sd

if __name__ == "__main__":
    print(wf())

This works!

@Future-Outlier Future-Outlier merged commit 88ac611 into flyteorg:master Jan 31, 2025
109 of 112 checks passed
@flyte-bot
Contributor

flyte-bot commented Jan 31, 2025

Code Review Agent Run #adec49

Actionable Suggestions - 0
Additional Suggestions - 10
  • flytekit/models/security.py - 1
    • Consider adding env_var validation check · Line 45-45
  • flytekit/clis/sdk_in_container/serve.py - 2
    • Consider adding port number validation · Line 83-83
    • Consider adding parameter validation checks · Line 64-64
  • plugins/flytekit-omegaconf/flytekitplugins/omegaconf/dictconfig_transformer.py - 1
    • Consider consolidating NoneType handling checks · Line 146-147
  • flytekit/core/resources.py - 1
  • flytekit/core/type_engine.py - 3
    • Consider more descriptive environment variable name · Line 63-63
    • Consider performance impact of chunking coroutines · Line 1691-1693
    • Consider direct use of asyncio.gather() · Line 2162-2165
  • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_sensor.py - 1
    • Simplify async test execution pattern · Line 25-29
  • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/manager.py - 1
Review Details
  • Files reviewed - 52 · Commit Range: ddba120..f1a2ba3
    • .pre-commit-config.yaml
    • Dockerfile.agent
    • docs/source/plugins/k8sstatefuldataservice.rst
    • flytekit/clis/sdk_in_container/serve.py
    • flytekit/core/data_persistence.py
    • flytekit/core/python_function_task.py
    • flytekit/core/resources.py
    • flytekit/core/type_engine.py
    • flytekit/image_spec/default_builder.py
    • flytekit/image_spec/image_spec.py
    • flytekit/models/security.py
    • flytekit/models/task.py
    • flytekit/remote/remote_fs.py
    • flytekit/types/structured/structured_dataset.py
    • plugins/flytekit-envd/flytekitplugins/envd/image_builder.py
    • plugins/flytekit-envd/tests/test_image_spec.py
    • plugins/flytekit-k8sdataservice/dev-requirements.txt
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/__init__.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/agent.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/kube_config.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/manager.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/sensor.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/task.py
    • plugins/flytekit-k8sdataservice/setup.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_kube_config.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_manager.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_agent.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_sensor.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_task.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/utils/test_resources.py
    • plugins/flytekit-k8sdataservice/utils/infra.py
    • plugins/flytekit-k8sdataservice/utils/resources.py
    • plugins/flytekit-omegaconf/flytekitplugins/omegaconf/dictconfig_transformer.py
    • plugins/flytekit-omegaconf/tests/test_dictconfig_transformer.py
    • plugins/flytekit-ray/flytekitplugins/ray/task.py
    • plugins/flytekit-ray/setup.py
    • plugins/flytekit-ray/tests/test_ray.py
    • plugins/setup.py
    • pydoclint-errors-baseline.txt
    • pyproject.toml
    • tests/flytekit/clis/sdk_in_container/test_serve.py
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/workflows/basic/get_secret.py
    • tests/flytekit/unit/core/image_spec/test_default_builder.py
    • tests/flytekit/unit/core/test_data_persistence.py
    • tests/flytekit/unit/core/test_dataclass.py
    • tests/flytekit/unit/core/test_generice_idl_type_engine.py
    • tests/flytekit/unit/core/test_list.py
    • tests/flytekit/unit/core/test_resources.py
    • tests/flytekit/unit/core/test_type_engine.py
    • tests/flytekit/unit/extras/pydantic_transformer/test_pydantic_basemodel_transformer.py
    • tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py
  • Files skipped - 2
    • .github/workflows/pythonbuild.yml - Reason: Filter setting
    • plugins/flytekit-k8sdataservice/README.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

@JiangJiaWei1103
Contributor Author

Nice bro, thanks for your time!

Let's move on to the next challenge!

UmerAhmad pushed a commit to UmerAhmad/flytekit that referenced this pull request Feb 8, 2025
…flyteorg#3027)

* fix: Retain user-specified file format info

Signed-off-by: JiaWei Jiang <[email protected]>

* fix: Set sdt format based on user-specified file_format

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant modification

Signed-off-by: JiaWei Jiang <[email protected]>

* test: Test file_format attribute alignment in dc.sd

Signed-off-by: JiaWei Jiang <[email protected]>

* Merge master and support pqt file upload

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant condition to always copy file_format over

Signed-off-by: JiangJiaWei1103 <[email protected]>

* Prioritize file_format in type hint over the user-specified one

Signed-off-by: JiangJiaWei1103 <[email protected]>

---------

Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiangJiaWei1103 <[email protected]>
Co-authored-by: Future-Outlier <[email protected]>
Signed-off-by: Umer Ahmad <[email protected]>
Atharva1723 pushed a commit to Atharva1723/flytekit that referenced this pull request Oct 5, 2025
…flyteorg#3027)

Signed-off-by: Atharva <[email protected]>
Labels

None yet

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

[BUG] StructuredDataset file_format becomes an empty str through dataclass attribute access

4 participants