
SNOW-1506546: Add support for INCLUDE_METADATA copy option for df.copy_into_table() #1839

Open
sfc-gh-kgaputis opened this issue Jun 26, 2024 · 7 comments
Assignees
Labels
status-triage_done Initial triage done, will be further handled by the driver team

Comments

@sfc-gh-kgaputis
I'm using Snowpark Python 1.16.0.

I'd like to add metadata columns to my COPY statement using the new include_metadata copy option, but there's no way to pass in the value in such a way that it doesn't get quoted:

my_df.copy_into_table(
    "raw_table",
    include_metadata='(FILE_SCAN_TIME=METADATA$START_SCAN_TIME)')

I think it comes down to this code in AnalyzerUtils:

def convert_value_to_sql_option(value: Optional[Union[str, bool, int, float]]) -> str:
    if isinstance(value, str):
        if len(value) > 1 and is_single_quoted(value):
            return value
        else:
            value = value.replace(
                "'", "''"
            )  # escape single quotes before adding a pair of quotes
            return f"'{value}'"
    else:
        return str(value)


def get_options_statement(options: Dict[str, Any]) -> str:
    return (
        SPACE
        + SPACE.join(
            f"{k}{EQUALS}{convert_value_to_sql_option(v)}"
            for k, v in options.items()
            if v is not None
        )
        + SPACE
    )

And convert_value_to_sql_option doesn't support a list of key-value pairs.
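A minimal repro of the quoting behavior described above (the function is copied from the snippet, with `is_single_quoted` replaced by a simplified stand-in for illustration):

```python
from typing import Optional, Union


def is_single_quoted(value: str) -> bool:
    # Simplified stand-in for Snowpark's internal helper of the same name.
    return value.startswith("'") and value.endswith("'")


def convert_value_to_sql_option(value: Optional[Union[str, bool, int, float]]) -> str:
    if isinstance(value, str):
        if len(value) > 1 and is_single_quoted(value):
            return value
        value = value.replace("'", "''")  # escape single quotes before quoting
        return f"'{value}'"
    return str(value)


# The INCLUDE_METADATA value is a parenthesized expression, not a string
# literal, but any str value is always rendered as a quoted string:
print(convert_value_to_sql_option("(FILE_SCAN_TIME=METADATA$START_SCAN_TIME)"))
# -> '(FILE_SCAN_TIME=METADATA$START_SCAN_TIME)'
```

This is why the generated COPY statement ends up with `include_metadata = '(...)'`, which Snowflake rejects as an invalid property value.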

@sfc-gh-kgaputis sfc-gh-kgaputis added the feature New feature or request label Jun 26, 2024
@github-actions github-actions bot changed the title Add support for INCLUDE_METADATA copy option for df.copy_into_table() SNOW-1506546: Add support for INCLUDE_METADATA copy option for df.copy_into_table() Jun 26, 2024
@sfc-gh-kgaputis

sfc-gh-kgaputis commented Jun 26, 2024

Actually there is a bigger issue. I was able to work around the string quoting like this:

my_df.copy_into_table(
    "raw_table",
    include_metadata=RawSqlExpression('(FILE_SCAN_TIME=METADATA$START_SCAN_TIME)'))

Using this custom class:

class RawSqlExpression:
    def __init__(self, expression):
        self.expression = expression

    def __str__(self):
        return self.expression
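This works because convert_value_to_sql_option falls through to str(value) for non-string types, so the expression is emitted verbatim. A minimal sketch (with a simplified copy of the converter) showing the bypass:

```python
def convert_value_to_sql_option(value) -> str:
    # Simplified version of the Snowpark helper shown above.
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"  # str values get quoted
    return str(value)  # anything else is emitted verbatim


class RawSqlExpression:
    def __init__(self, expression):
        self.expression = expression

    def __str__(self):
        return self.expression


opt = convert_value_to_sql_option(
    RawSqlExpression('(FILE_SCAN_TIME=METADATA$START_SCAN_TIME)'))
print(opt)  # -> (FILE_SCAN_TIME=METADATA$START_SCAN_TIME)  (no quotes)
```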

But then the SQL fails:
SQL compilation error: include_metadata is not supported with copy transform

Is it possible to generate a COPY statement in Snowpark that does not explicitly list columns, and instead relies on native schema evolution? That may be required for INCLUDE_METADATA.

@sfc-gh-sghosh sfc-gh-sghosh self-assigned this Jun 28, 2024
@sfc-gh-sghosh

Hello @sfc-gh-kgaputis ,

Thanks for raising this issue.
INCLUDE_METADATA is supported for the COPY INTO <table> command, but it is not supported with a copy transformation, because INCLUDE_METADATA has to be used with MATCH_BY_COLUMN_NAME, which copy transformations don't support.

https://docs.snowflake.com/en/release-notes/2024/8_17#new-copy-option-include-metadata

The purpose of a copy transformation is to transform columns while loading, so it expects explicit column names; we can't skip column names in a copy transformation.

Regards,
Sujan

@sfc-gh-sghosh

Hello @sfc-gh-kgaputis ,

I will check and update further on whether we have any plans to support it.

Regards,
Sujan

@sfc-gh-sghosh sfc-gh-sghosh added the status-triage Issue is under initial triage label Jun 28, 2024
@sfc-gh-sghosh

Hello @sfc-gh-kgaputis ,

After discussion with the team:
As mentioned in the previous update: INCLUDE_METADATA is only for the COPY INTO <table> statement, not for copy transformations. The error message you are hitting occurs because the syntax you are using eventually lands on a copy transformation internally, which throws the expected error:

SQL compilation error: include_metadata is not supported with copy transform

Example (Jupyter UI):

copied_into_result = df.copy_into_table(
    "copied_into_table",
    target_columns=target_column_names,
    force=True,
    include_metadata=RawSqlExpression('(FILENAME=METADATA$FILENAME,FILE_LAST_MODIFIED=METADATA$FILE_LAST_MODIFIED,FILE_SCAN_TIME=METADATA$START_SCAN_TIME)'))

Query history:
COPY INTO copied_into_table("SEQ", "FIRST_NAMELAST_NAME", "FILE_LAST_MODIFIED", "FILE_SCAN_TIME", "FILENAME") FROM @mystagetest/File1.csv FILE_FORMAT = ( TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1 ) force = True include_metadata = '(FILENAME=METADATA$FILENAME,FILE_LAST_MODIFIED=METADATA$FILE_LAST_MODIFIED,FILE_SCAN_TIME=METADATA$START_SCAN_TIME)'

At present, it's not supported; I will take this as a feature request.

Regards,
Sujan

@sfc-gh-sghosh sfc-gh-sghosh added status-triage_done Initial triage done, will be further handled by the driver team and removed status-triage Issue is under initial triage labels Jul 9, 2024
@sfc-gh-sghosh

sfc-gh-sghosh commented Jul 10, 2024

Hello @sfc-gh-kgaputis ,

You can try the following code to get the metadata. It's working fine and fetches all the metadata:

from snowflake.snowpark.column import (
    METADATA_FILENAME,
    METADATA_FILE_ROW_NUMBER,
    METADATA_FILE_LAST_MODIFIED,
    METADATA_START_SCAN_TIME,
)
from snowflake.snowpark.types import (
    StructType, StructField, IntegerType, StringType, TimestampType,
)

schema_for_data_file = StructType([
    StructField("seq", IntegerType()),
    StructField("last_name", StringType()),
    StructField("first_name", StringType()),
    StructField("FILE_LAST_MODIFIED", TimestampType(), nullable=True),
    StructField("FILE_SCAN_TIME", TimestampType(), nullable=True),
    StructField("FILENAME", StringType(), nullable=True),
])

df = (session.read.schema(schema_for_data_file)
      .option("field_delimiter", ",")
      .option("SKIP_HEADER", 1)
      .csv("@mystagetest/File1.csv"))

target_column_names = ["seq", "first_name", "last_name",
                       "FILE_LAST_MODIFIED", "FILE_SCAN_TIME", "FILENAME"]

session.sql("DROP TABLE IF EXISTS copied_into_table").collect()

user_schema = StructType([
    StructField("SEQ", StringType()),
    StructField("FIRST_NAMELAST_NAME", StringType()),
])
df = (session.read
      .with_metadata(
          METADATA_FILENAME,
          METADATA_START_SCAN_TIME.as_("SCAN_TIME"),
          METADATA_FILE_LAST_MODIFIED.as_("FILE_LAST_MODIFIED"),
          METADATA_FILE_ROW_NUMBER.as_("ROW NUMBER"))
      .schema(user_schema)
      .csv("@mystagetest/File1.csv"))
df.show()

Output:

|"METADATA$FILENAME" |"SCAN_TIME"                      |"FILE_LAST_MODIFIED" |"ROW NUMBER" |"SEQ" |"FIRST_NAMELAST_NAME" |
|File1.csv           |2024-07-10 15:34:49.941119+00:00 |2024-07-09 06:53:53  |1            |seq   |first_name            |
|File1.csv           |2024-07-10 15:34:49.941119+00:00 |2024-07-09 06:53:53  |2            |1     |AAAAAAA               |
|File1.csv           |2024-07-10 15:34:49.941119+00:00 |2024-07-09 06:53:53  |3            |2     |AAABBB                |
|File1.csv           |2024-07-10 15:34:49.941119+00:00 |2024-07-09 06:53:53  |4            |3     |newuser               |
|File1.csv           |2024-07-10 15:34:49.941119+00:00 |2024-07-09 06:53:53  |5            |4     |forthuser             |

Regards,
Sujan

@sfc-gh-sghosh sfc-gh-sghosh self-assigned this Jul 10, 2024
@sfc-gh-sghosh sfc-gh-sghosh removed the feature New feature or request label Jul 10, 2024
@sfc-gh-sghosh sfc-gh-sghosh removed their assignment Jul 18, 2024
@tanmaykansara

tanmaykansara commented Sep 27, 2024

I am running into a similar issue:

df = (snowpark_session.read
      .option("PARSE_HEADER", True)
      .option("error_on_column_count_mismatch", False)
      .options({"FIELD_OPTIONALLY_ENCLOSED_BY": '\042'})
      .schema(raw_schema)
      .csv("@DEV_CORE.ADMIN.S3_DEV_STAGE/BROADCAST/RAW"))

df.copy_into_table(
    'DEV_CORE.RAW.BROADCAST_UNP',
    force=True,
    MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE',
    include_metadata='(CHECKSUM=METADATA$FILE_CONTENT_KEY)')

Without the quotes around (CHECKSUM=METADATA$FILE_CONTENT_KEY) the code does not compile, and with the quotes, Snowflake generates the following:
COPY INTO DEV_CORE.RAW.BROADCAST_UNP FROM @DEV_CORE.ADMIN.S3_DEV_STAGE/BROADCAST/RAW FILE_FORMAT = ( TYPE = CSV PARSE_HEADER = True ERROR_ON_COLUMN_COUNT_MISMATCH = False FIELD_OPTIONALLY_ENCLOSED_BY = '"' ) force = True MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE' include_metadata = '(CHECKSUM=METADATA$FILE_CONTENT_KEY)'
And the following error message: SQL compilation error: invalid value ''(CHECKSUM=METADATA$FILE_CONTENT_KEY)'' for property 'INCLUDE_METADATA'

For reference following SQL, when executed from Snowflake directly works:
......ce = True MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE' include_metadata = (CHECKSUM='METADATA$FILE_CONTENT_KEY')
Any thoughts on how I can get around this? Thanks

@Hyurt

Hyurt commented Nov 15, 2024

Got redirected here from support and have been thinking about how to make this work.

Somehow this

.with_metadata(METADATA_FILENAME.alias("FILENAME"))

works when printing the dataframe, but when used within the copy into, the transformation is not sent to the query (you can check this in the Activity Monitoring).

Plus, depending on where this field is located, it can mess with the table structure (my filename is at the end, for example).

I came up with this solution:

result = raw_files.copy_into_table(
    destination_table,
    pattern=rf'.*/REDACTED',
    target_columns=source_tbl.columns,
    transformations=[*[f'${col}' for col in range(1, len(source_tbl.columns))],
                     'METADATA$FILENAME'],
    **copy_options)

with raw_files being my DataFrameReader.

Tweaking only the transformations argument allowed me to add the filename in my COPY INTO processes.
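A minimal sketch of how that transformations list is built, with hypothetical column names (the real names come from source_tbl.columns). The positional references $1..$N-1 cover the data columns, and METADATA$FILENAME fills the final column, which this workaround assumes is the last column of the target table:

```python
# Hypothetical source columns; the last target column receives the filename.
source_columns = ["SEQ", "FIRST_NAME", "LAST_NAME", "FILENAME"]

# $1..$3 map the staged file's data columns positionally;
# METADATA$FILENAME supplies the fourth (FILENAME) column.
transformations = (
    [f"${i}" for i in range(1, len(source_columns))] + ["METADATA$FILENAME"]
)
print(transformations)  # -> ['$1', '$2', '$3', 'METADATA$FILENAME']
```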
