Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1363,7 +1363,11 @@ def lazy_import_transformers(cls):
from flytekit.extras import pydantic_transformer # noqa: F401
if is_imported("pandas"):
try:
from flytekit.types.schema.types_pandas import PandasSchemaReader, PandasSchemaWriter # noqa: F401
from flytekit.types.schema.types_pandas import ( # noqa: F401
PandasSchemaReader,
PandasSchemaWriter,
PandasTimestampTransformer,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you make this a separate import, and move the timestamp transformer into a separate file?

The "schema" folder name refers to the old Flyte Schema type, which was deprecated by the structureddataset type. Could we put it in the basic_dfs.py file alongside the rest of the Dataframe stuff for pandas?

)
except ValueError:
logger.debug("Transformer for pandas is already registered.")
if is_imported("numpy"):
Expand Down
24 changes: 21 additions & 3 deletions flytekit/types/schema/types_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import pandas

from flytekit import FlyteContext
from flytekit.core.type_engine import AsyncTypeTransformer, T, TypeEngine
from flytekit.models.literals import Literal, Scalar, Schema
from flytekit.models.types import LiteralType, SchemaType
from flytekit.core.type_engine import AsyncTypeTransformer, SimpleTransformer, T, TypeEngine
from flytekit.models.literals import Literal, Primitive, Scalar, Schema
from flytekit.models.types import LiteralType, SchemaType, SimpleType
from flytekit.types.schema import LocalIOSchemaReader, LocalIOSchemaWriter, SchemaEngine, SchemaFormat, SchemaHandler


Expand Down Expand Up @@ -122,7 +122,25 @@ def to_html(self, ctx: FlyteContext, python_val: pandas.DataFrame, expected_pyth
return python_val.describe().to_html()


class PandasTimestampTransformer(SimpleTransformer[pandas.Timestamp]):
"""
Type transformer for pandas.Timestamp objects.

Converts pandas.Timestamp to/from Flyte DATETIME literals.
"""

def __init__(self):
super().__init__(
name="pandas.Timestamp",
t=pandas.Timestamp,
lt=LiteralType(simple=SimpleType.DATETIME),
to_literal_transformer=lambda x: Literal(scalar=Scalar(primitive=Primitive(datetime=x.to_pydatetime()))),
from_literal_transformer=lambda x: pandas.Timestamp(x.scalar.primitive.datetime),
)


SchemaEngine.register_handler(
SchemaHandler("pandas-dataframe-schema", pandas.DataFrame, PandasSchemaReader, PandasSchemaWriter)
)
TypeEngine.register(PandasDataFrameTransformer())
TypeEngine.register(PandasTimestampTransformer())
Loading