diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f9f8392..c8e1429f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Release History +## 1.2.2 + +### Bug Fixes + +### Behavior Changes + +### New Features + +- Model Registry: Support providing external access integrations when deploying a model to SPCS. This helps, and is + required, to make sure the deployment process works, because SPCS denies all network connections by default. The + following endpoints must be allowed to make deployment work: docker.com:80, docker.com:443, anaconda.com:80, + anaconda.com:443, anaconda.org:80, anaconda.org:443, pypi.org:80, pypi.org:443. If you are using a + `snowflake.ml.model.models.huggingface_pipeline.HuggingFacePipelineModel` object, the following endpoints are required + to be allowed: huggingface.com:80, huggingface.com:443, huggingface.co:80, huggingface.co:443. + ## 1.2.1 ### New Features diff --git a/bazel/py_rules.bzl b/bazel/py_rules.bzl index c01e6d60..72283696 100644 --- a/bazel/py_rules.bzl +++ b/bazel/py_rules.bzl @@ -259,6 +259,7 @@ def _py_wheel_impl(ctx): "--outdir", wheel_output_dir.path, ], + use_default_shell_env = True, progress_message = "Building Wheel", mnemonic = "WheelBuild", ) diff --git a/ci/RunBazelAction.sh b/ci/RunBazelAction.sh index 743dbffe..2ab960d9 100755 --- a/ci/RunBazelAction.sh +++ b/ci/RunBazelAction.sh @@ -1,7 +1,7 @@ #!/bin/bash # DESCRIPTION: Utility Shell script to run bazel action for snowml repository # -# RunBazelAction.sh [-b ] [-m merge_gate|continuous_run|local_unittest|local_all] [-t ] [-c ] +# RunBazelAction.sh [-b ] [-m merge_gate|continuous_run|quarantined|local_unittest|local_all] [-t ] [-c ] # # Args: # action: bazel action, choose from test and coverage @@ -10,11 +10,13 @@ # -b: specify path to bazel. # -m: specify the mode from the following options # merge_gate: run affected tests only. -# continuous_run (default): run all tests except auto-generated tests. (For nightly run.) 
+# continuous_run (default): run all tests. (For nightly run. Alias: release) +# quarantined: Run quarantined tests. # local_unit: run all unit tests affected by target defined by -t # local_all: run all tests including integration tests affected by target defined by -t # -t: specify the target for local_unit and local_all mode # -c: specify the path to the coverage report dat file. +# -e: specify the Snowflake environment, used to determine the test quarantine list. # set -o pipefail @@ -24,13 +26,14 @@ set -e bazel="bazel" mode="continuous_run" target="" +SF_ENV="prod3" PROG=$0 action=$1 && shift help() { local exit_code=$1 - echo "Usage: ${PROG} [-b ] [-m merge_gate|continuous_run|local_unittest|local_all]" + echo "Usage: ${PROG} [-b ] [-m merge_gate|continuous_run|quarantined|local_unittest|local_all] [-e ]" exit "${exit_code}" } @@ -38,11 +41,14 @@ if [[ "${action}" != "test" && "${action}" != "coverage" ]]; then help 1 fi -while getopts "b:m:t:c:h" opt; do +while getopts "b:m:t:c:e:h" opt; do case "${opt}" in m) - if [[ "${OPTARG}" = "merge_gate" || "${OPTARG}" = "continuous_run" || "${OPTARG}" = "local_unittest" || "${OPTARG}" = "local_all" ]]; then + if [[ "${OPTARG}" = "merge_gate" || "${OPTARG}" = "continuous_run" || "${OPTARG}" = "quarantined" || "${OPTARG}" = "release" || "${OPTARG}" = "local_unittest" || "${OPTARG}" = "local_all" ]]; then mode="${OPTARG}" + if [[ $mode = "release" ]]; then + mode="continuous_run" + fi else help 1 fi @@ -60,6 +66,9 @@ while getopts "b:m:t:c:h" opt; do c) coverage_report_file="${OPTARG}" ;; + e) + SF_ENV="${OPTARG}" + ;; h) help 0 ;; @@ -91,12 +100,13 @@ merge_gate) affected_targets_file="${working_dir}/affected_targets" ./bazel/get_affected_targets.sh -b "${bazel}" -f "${affected_targets_file}" - tag_filter="--test_tag_filters=-autogen" - - query_expr='kind(".*_test rule", rdeps(//... 
- set('"$( [-b ] [--env pip|conda] [--mode merge_gate|continuous_run|release] [--with-snowpark] [--report ] +# build_and_run_tests.sh [-b ] [--env pip|conda] [--mode merge_gate|continuous_run] [--with-snowpark] [--report ] # # Args # workspace: path to the workspace, SnowML code should be in snowml directory. @@ -11,9 +11,10 @@ # env: Set the environment, choose from pip and conda # mode: Set the tests set to be run. # merge_gate: run affected tests only. -# continuous_run (default): run all tests except auto-generated tests. (For nightly run.) -# release: run all tests including auto-generated tests. (For releasing) +# continuous_run (default): run all tests. (For nightly run. Alias: release) +# quarantined: run all quarantined tests. # with-snowpark: Build and test with snowpark in snowpark-python directory in the workspace. +# snowflake-env: The environment of the snowflake, use to determine the test quarantine list # report: Path to xml test report # # Action @@ -28,7 +29,7 @@ PROG=$0 help() { local exit_code=$1 - echo "Usage: ${PROG} [-b ] [--env pip|conda] [--mode merge_gate|continuous_run|release] [--with-snowpark] [--report ]" + echo "Usage: ${PROG} [-b ] [--env pip|conda] [--mode merge_gate|continuous_run|quarantined] [--with-snowpark] [--snowflake-env ] [--report ]" exit "${exit_code}" } @@ -43,6 +44,7 @@ SNOWML_DIR="snowml" SNOWPARK_DIR="snowpark-python" IS_NT=false JUNIT_REPORT_PATH="" +SF_ENV="prod3" while (($#)); do case $1 in @@ -63,12 +65,19 @@ while (($#)); do ;; --mode) shift - if [[ $1 = "merge_gate" || $1 = "continuous_run" || $1 = "release" ]]; then + if [[ $1 = "merge_gate" || $1 = "continuous_run" || $1 = "quarantined" || $1 = "release" ]]; then MODE=$1 + if [[ $MODE = "release" ]]; then + MODE="continuous_run" + fi else help 1 fi ;; + --snowflake-env) + shift + SF_ENV=$1 + ;; --report) shift JUNIT_REPORT_PATH=$1 @@ -186,20 +195,10 @@ while IFS='' read -r line; do OPTIONAL_REQUIREMENTS+=("$line"); done < <("${_YQ_ # Compare test required 
dependencies with wheel pkg dependencies and exclude tests if necessary EXCLUDE_TESTS=$(mktemp "${TEMP_TEST_DIR}/exclude_tests_XXXXX") -if [[ ${MODE} = "continuous_run" || ${MODE} = "release" ]]; then - ./ci/get_excluded_tests.sh -f "${EXCLUDE_TESTS}" -m unused -b "${BAZEL}" -elif [[ ${MODE} = "merge_gate" ]]; then - ./ci/get_excluded_tests.sh -f "${EXCLUDE_TESTS}" -m all -b "${BAZEL}" -fi +./ci/get_excluded_tests.sh -f "${EXCLUDE_TESTS}" -m "${MODE}" -b "${BAZEL}" -e "${SF_ENV}" # Generate and copy auto-gen tests. -if [[ ${MODE} = "release" ]]; then -# When release, we build all autogen tests - "${BAZEL}" "${BAZEL_ADDITIONAL_STARTUP_FLAGS[@]+"${BAZEL_ADDITIONAL_STARTUP_FLAGS[@]}"}" build "${BAZEL_ADDITIONAL_BUILD_FLAGS[@]+"${BAZEL_ADDITIONAL_BUILD_FLAGS[@]}"}" //tests/integ/... -else -# In other cases, we build required utility only. - "${BAZEL}" "${BAZEL_ADDITIONAL_STARTUP_FLAGS[@]+"${BAZEL_ADDITIONAL_STARTUP_FLAGS[@]}"}" build --build_tag_filters=-autogen_build,-autogen "${BAZEL_ADDITIONAL_BUILD_FLAGS[@]+"${BAZEL_ADDITIONAL_BUILD_FLAGS[@]}"}" //tests/integ/... -fi +"${BAZEL}" "${BAZEL_ADDITIONAL_STARTUP_FLAGS[@]+"${BAZEL_ADDITIONAL_STARTUP_FLAGS[@]}"}" build "${BAZEL_ADDITIONAL_BUILD_FLAGS[@]+"${BAZEL_ADDITIONAL_BUILD_FLAGS[@]}"}" //tests/integ/... # Rsync cannot work well with path that has drive letter in Windows, # Thus, these two rsync has to use relative path instead of absolute ones. 
@@ -330,7 +329,7 @@ echo "Done running ${PROG}" # 0: Success; # 5: no tests found # See https://docs.pytest.org/en/7.1.x/reference/exit-codes.html -if [[ ${MODE} = "merge_gate" && ${TEST_RETCODE} -eq 5 ]]; then +if [[ (${MODE} = "merge_gate" || ${MODE} = "quarantined") && ${TEST_RETCODE} -eq 5 ]]; then exit 0 fi exit ${TEST_RETCODE} diff --git a/ci/conda_recipe/meta.yaml b/ci/conda_recipe/meta.yaml index c2fbc9cb..20347c94 100644 --- a/ci/conda_recipe/meta.yaml +++ b/ci/conda_recipe/meta.yaml @@ -17,7 +17,7 @@ build: noarch: python package: name: snowflake-ml-python - version: 1.2.1 + version: 1.2.2 requirements: build: - python diff --git a/ci/get_excluded_tests.sh b/ci/get_excluded_tests.sh index 1da9df0c..79703670 100755 --- a/ci/get_excluded_tests.sh +++ b/ci/get_excluded_tests.sh @@ -1,18 +1,18 @@ #!/bin/bash # Usage -# exclude_tests.sh [-b ] [-f ] [-m unused|unaffected|all] +# exclude_tests.sh [-b ] [-f ] [-m merge_gate|continuous_run|release|quarantined] # # Flags # -b: specify path to bazel # -f: specify output file path # -m: specify the mode from the following options -# unused: exclude integration tests whose dependency is not part of the wheel package. -# The missing dependency could happen when a new operator is being developed, -# but not yet released. -# unaffected: exclude integration tests whose dependency is not part of the affected targets +# merge_gate: exclude local_only + integration tests whose dependency is not part of the affected targets # compare to the the merge base to main of current revision. -# all (default): exclude the union of above rules. +# continuous_run (default): exclude integration tests whose dependency is not part of the wheel package. +# The missing dependency could happen when a new operator is being developed, +# but not yet released. 
(Alias: release) +# quarantined: exclude all tests that are not quarantined # set -o pipefail @@ -22,7 +22,7 @@ PROG=$0 help() { local exit_code=$1 - echo "Usage: ${PROG} [-b ] [-f ] [-m unused|unaffected|all]" + echo "Usage: ${PROG} [-b ] [-f ] [-m merge_gate|continuous_run|release|quarantined]" exit "${exit_code}" } @@ -30,9 +30,10 @@ echo "Running ${PROG}" bazel="bazel" output_path="/tmp/files_to_exclude" -mode="all" +mode="continuous_run" +SF_ENV="prod3" -while getopts "b:f:m:h" opt; do +while getopts "b:f:m:e:h" opt; do case "${opt}" in b) bazel=${OPTARG} @@ -42,9 +43,15 @@ while getopts "b:f:m:h" opt; do ;; m) mode=${OPTARG} - if ! [[ $mode = "unused" || $mode = "unaffected" || $mode = "all" ]]; then + if ! [[ $mode = "merge_gate" || $mode = "continuous_run" || $mode = "release" || $mode = "quarantined" ]]; then help 1 fi + if [[ $mode = "release" ]]; then + mode="continuous_run" + fi + ;; + e) + SF_ENV=${OPTARG} ;; h) help 0 @@ -61,24 +68,24 @@ done working_dir=$(mktemp -d "/tmp/tmp_XXXXX") trap 'rm -rf "${working_dir}"' EXIT -if [[ $mode = "unused" || $mode = "all" ]]; then - # Compute missing dependencies by subtracting deps included in wheel from deps required by tests. - # We only care about dependencies in //snowflake since that's our dev directory. - # Reverse search on testing files depending on missing deps and exclude those. - unused_test_rule_file=${working_dir}/unused_test_rule +# Compute missing dependencies by subtracting deps included in wheel from deps required by tests. +# We only care about dependencies in //snowflake since that's our dev directory. +# Reverse search on testing files depending on missing deps and exclude those. - # -- Begin of Query Rules Heredoc -- - cat >"${unused_test_rule_file}" <"${unused_test_rule_file}" <"${unaffected_test_rule_file}" <"${targets_to_exclude_file}" ;; -unaffected) - echo "${unaffected_test_targets}" >"${targets_to_exclude_file}" - ;; -all) +merge_gate) # Concat and deduplicate. 
targets_to_exclude=$(printf "%s\n%s\n" "${unused_test_targets}" "${unaffected_test_targets}" | awk '!a[$0]++') echo "${targets_to_exclude}" >"${targets_to_exclude_file}" ;; +quarantined) + quarantined_test_rule_file=${working_dir}/quarantined_test_rule + +# -- Begin of Query Rules Heredoc -- + cat >"${quarantined_test_rule_file}" <"${targets_to_exclude_file}" + ;; *) help 1 ;; @@ -118,11 +134,11 @@ excluded_test_source_rule_file=${working_dir}/excluded_test_source_rule # -- Begin of Query Rules Heredoc -- cat >"${excluded_test_source_rule_file}" <"${working_dir}/type_checked_targets_query" + "$(<"${SCRIPTPATH}/../targets/untyped.txt")" "$(<"${SCRIPTPATH}/../targets/local_only.txt")" "${affected_targets}" >"${working_dir}/type_checked_targets_query" type_check_targets=$("${bazel}" query --query_file="${working_dir}/type_checked_targets_query" | awk 'NF { print "\""$0"\","}') echo "${type_check_targets}" diff --git a/codegen/sklearn_wrapper_template.py_template b/codegen/sklearn_wrapper_template.py_template index bda05a0e..dfed46b6 100644 --- a/codegen/sklearn_wrapper_template.py_template +++ b/codegen/sklearn_wrapper_template.py_template @@ -20,7 +20,7 @@ from snowflake.ml._internal.env_utils import SNOWML_SPROC_ENV from snowflake.ml._internal.utils import pkg_version_utils, identifier from snowflake.snowpark import DataFrame, Session from snowflake.snowpark._internal.type_utils import convert_sp_to_sf_type -from snowflake.ml.modeling._internal.snowpark_handlers import SnowparkHandlers as HandlersImpl +from snowflake.ml.modeling._internal.snowpark_implementations.snowpark_handlers import SnowparkHandlers as HandlersImpl from snowflake.ml.modeling._internal.model_trainer_builder import ModelTrainerBuilder from snowflake.ml.modeling._internal.model_trainer import ModelTrainer from snowflake.ml.modeling._internal.estimator_utils import ( @@ -29,7 +29,7 @@ from snowflake.ml.modeling._internal.estimator_utils import ( transform_snowml_obj_to_sklearn_obj, 
validate_sklearn_args, ) -from snowflake.ml.modeling._internal.estimator_protocols import FitPredictHandlers +from snowflake.ml.modeling._internal.estimator_protocols import TransformerHandlers from snowflake.ml.model.model_signature import ( DataType, @@ -85,7 +85,7 @@ class {transform.original_class_name}(BaseTransformer): self._model_signature_dict: Optional[Dict[str, ModelSignature]] = None # If user used snowpark dataframe during fit, here it stores the snowpark input_cols, otherwise the processed input_cols self._snowpark_cols: Optional[List[str]] = self.input_cols - self._handlers: FitPredictHandlers = HandlersImpl(class_name={transform.original_class_name}.__class__.__name__, subproject=_SUBPROJECT, autogenerated=True) + self._handlers: TransformerHandlers = HandlersImpl(class_name={transform.original_class_name}.__class__.__name__, subproject=_SUBPROJECT, autogenerated=True) self._autogenerated = True def _get_rand_id(self) -> str: diff --git a/packages.bzl b/packages.bzl index 8441381a..fefa8d60 100644 --- a/packages.bzl +++ b/packages.bzl @@ -2,6 +2,7 @@ PACKAGES = [ "//snowflake/cortex:cortex_pkg", + "//snowflake/ml/feature_store:fs_pkg", "//snowflake/ml/modeling/impute:impute_pkg", "//snowflake/ml/modeling/metrics:metrics_pkg", "//snowflake/ml/modeling/model_selection:model_selection_pkg", diff --git a/snowflake/ml/_internal/env_utils.py b/snowflake/ml/_internal/env_utils.py index 1fd225c3..8b14094a 100644 --- a/snowflake/ml/_internal/env_utils.py +++ b/snowflake/ml/_internal/env_utils.py @@ -294,19 +294,22 @@ def get_matched_package_versions_in_snowflake_conda_channel( url = f"{_SNOWFLAKE_CONDA_CHANNEL_URL}/{conda_os.value}/repodata.json" if req.name not in _SNOWFLAKE_CONDA_PACKAGE_CACHE: - http_client = retryable_http.get_http_client() - parsed_python_version = version.Version(python_version) - python_version_build_str = f"py{parsed_python_version.major}{parsed_python_version.minor}" - repodata = http_client.get(url).json() - assert 
isinstance(repodata, dict) - packages_info = repodata["packages"] - assert isinstance(packages_info, dict) - version_list = [ - version.parse(package_info["version"]) - for package_info in packages_info.values() - if package_info["name"] == req.name and python_version_build_str in package_info["build"] - ] - _SNOWFLAKE_CONDA_PACKAGE_CACHE[req.name] = version_list + try: + http_client = retryable_http.get_http_client() + parsed_python_version = version.Version(python_version) + python_version_build_str = f"py{parsed_python_version.major}{parsed_python_version.minor}" + repodata = http_client.get(url).json() + assert isinstance(repodata, dict) + packages_info = repodata["packages"] + assert isinstance(packages_info, dict) + version_list = [ + version.parse(package_info["version"]) + for package_info in packages_info.values() + if package_info["name"] == req.name and python_version_build_str in package_info["build"] + ] + _SNOWFLAKE_CONDA_PACKAGE_CACHE[req.name] = version_list + except Exception: + pass matched_versions = list(req.specifier.filter(set(_SNOWFLAKE_CONDA_PACKAGE_CACHE.get(req.name, [])))) return matched_versions diff --git a/snowflake/ml/_internal/exceptions/modeling_error_messages.py b/snowflake/ml/_internal/exceptions/modeling_error_messages.py index d407162a..affdc6d5 100644 --- a/snowflake/ml/_internal/exceptions/modeling_error_messages.py +++ b/snowflake/ml/_internal/exceptions/modeling_error_messages.py @@ -1,4 +1,8 @@ -ATTRIBUTE_NOT_SET = "{} is not set." +ATTRIBUTE_NOT_SET = ( + "{} is not set. To read more about Snowpark ML general API differences, please refer to: " + "https://docs.snowflake.com/en/developer-guide/snowpark-ml/snowpark-ml-modeling#general-api" + "-differences." +) SIZE_MISMATCH = "Size mismatch: {}={}, {}={}." INVALID_MODEL_PARAM = "Invalid parameter {} for model {}. Valid parameters: {}." UNSUPPORTED_MODEL_CONVERSION = "Object doesn't support {}. Please use {}." 
diff --git a/snowflake/ml/feature_store/BUILD.bazel b/snowflake/ml/feature_store/BUILD.bazel index bbd441d6..0366ead5 100644 --- a/snowflake/ml/feature_store/BUILD.bazel +++ b/snowflake/ml/feature_store/BUILD.bazel @@ -1,4 +1,4 @@ -load("//bazel:py_rules.bzl", "py_library") +load("//bazel:py_rules.bzl", "py_library", "py_package") package_group( name = "feature_store", @@ -7,10 +7,7 @@ package_group( ], ) -package(default_visibility = [ - ":feature_store", - "//bazel:snowml_public_common", -]) +package(default_visibility = ["//visibility:public"]) py_library( name = "init", @@ -38,3 +35,11 @@ py_library( "//snowflake/ml/dataset", ], ) + +py_package( + name = "fs_pkg", + packages = ["snowflake.ml"], + deps = [ + ":feature_store_lib", + ], +) diff --git a/snowflake/ml/feature_store/_internal/BUILD.bazel b/snowflake/ml/feature_store/_internal/BUILD.bazel index cfd68961..09c1b801 100644 --- a/snowflake/ml/feature_store/_internal/BUILD.bazel +++ b/snowflake/ml/feature_store/_internal/BUILD.bazel @@ -1,9 +1,6 @@ load("//bazel:py_rules.bzl", "py_library") -package(default_visibility = [ - "//bazel:snowml_public_common", - "//snowflake/ml/feature_store", -]) +package(default_visibility = ["//visibility:public"]) py_library( name = "synthetic_data_generator", diff --git a/snowflake/ml/feature_store/_internal/scripts/upload_test_datasets.py b/snowflake/ml/feature_store/_internal/scripts/upload_test_datasets.py index cf4d4ddb..f60dedc9 100644 --- a/snowflake/ml/feature_store/_internal/scripts/upload_test_datasets.py +++ b/snowflake/ml/feature_store/_internal/scripts/upload_test_datasets.py @@ -74,6 +74,7 @@ def create_winedata(sess: Session, overwrite_mode: str) -> None: if __name__ == "__main__": sess = Session.builder.configs(SnowflakeLoginOptions()).create() + sess.sql(f"USE DATABASE {FS_INTEG_TEST_DB}").collect() create_tripdata(sess, "overwrite") create_winedata(sess, "overwrite") diff --git a/snowflake/ml/feature_store/feature_store.py 
b/snowflake/ml/feature_store/feature_store.py index 3d4525d9..e130b650 100644 --- a/snowflake/ml/feature_store/feature_store.py +++ b/snowflake/ml/feature_store/feature_store.py @@ -54,7 +54,7 @@ _FEATURE_STORE_OBJECT_TAG = "SNOWML_FEATURE_STORE_OBJECT" _PROJECT = "FeatureStore" _DT_OR_VIEW_QUERY_PATTERN = re.compile( - r"""CREATE\ (?P(DYNAMIC\ TABLE|VIEW))\ .* + r"""CREATE\ (OR\ REPLACE\ )?(?P(DYNAMIC\ TABLE|VIEW))\ .* COMMENT\ =\ '(?P.*)'\s* TAG.*?{entity_tag}\ =\ '(?P.*?)',\n .*?{ts_col_tag}\ =\ '(?P.*?)',?.*? @@ -252,12 +252,18 @@ def register_entity(self, entity: Entity) -> None: join_keys = [f"'{key.resolved()}'" for key in entity.join_keys] join_keys_str = ",".join(join_keys) full_tag_name = self._get_fully_qualified_name(tag_name) - self._session.sql( - f"""CREATE TAG IF NOT EXISTS {full_tag_name} - ALLOWED_VALUES {join_keys_str} - COMMENT = '{entity.desc}' - """ - ).collect(statement_params=self._telemetry_stmp) + try: + self._session.sql( + f"""CREATE TAG IF NOT EXISTS {full_tag_name} + ALLOWED_VALUES {join_keys_str} + COMMENT = '{entity.desc}' + """ + ).collect(statement_params=self._telemetry_stmp) + except Exception as e: + raise snowml_exceptions.SnowflakeMLException( + error_code=error_codes.INTERNAL_SNOWPARK_ERROR, + original_exception=RuntimeError(f"Failed to register entity `{entity.name}`: {e}."), + ) from e logger.info(f"Registered Entity {entity}.") # TODO: add support to update column desc once SNOW-894249 is fixed @@ -267,6 +273,7 @@ def register_feature_view( feature_view: FeatureView, version: str, block: bool = False, + override: bool = False, ) -> FeatureView: """ Materialize a FeatureView to Snowflake backend. @@ -281,6 +288,9 @@ def register_feature_view( NOTE: Version only accepts letters, numbers and underscore. Also version will be capitalized. block: Specify whether the FeatureView backend materialization should be blocking or not. If blocking then the API will wait until the initial FeatureView data is generated. 
+ override: Override the existing FeatureView with same version. This is the same as dropping the FeatureView + first then recreate. NOTE: there will be backfill cost associated if the FeatureView is being + continuously maintained. Returns: A materialized FeatureView object. @@ -312,14 +322,15 @@ def register_feature_view( ) feature_view_name = FeatureView._get_physical_name(feature_view.name, version) - dynamic_table_results = self._find_object("DYNAMIC TABLES", feature_view_name) - view_results = self._find_object("VIEWS", feature_view_name) - if len(dynamic_table_results) > 0 or len(view_results) > 0: - raise snowml_exceptions.SnowflakeMLException( - error_code=error_codes.OBJECT_ALREADY_EXISTS, - original_exception=ValueError(f"FeatureView {feature_view.name}/{version} already exists."), - suppress_source_trace=True, - ) + if not override: + dynamic_table_results = self._find_object("DYNAMIC TABLES", feature_view_name) + view_results = self._find_object("VIEWS", feature_view_name) + if len(dynamic_table_results) > 0 or len(view_results) > 0: + raise snowml_exceptions.SnowflakeMLException( + error_code=error_codes.OBJECT_ALREADY_EXISTS, + original_exception=ValueError(f"FeatureView {feature_view.name}/{version} already exists."), + suppress_source_trace=True, + ) fully_qualified_name = self._get_fully_qualified_name(feature_view_name) entities = _FEATURE_VIEW_ENTITY_TAG_DELIMITER.join([e.name for e in feature_view.entities]) @@ -349,10 +360,12 @@ def create_col_desc(col: StructField) -> str: self._default_warehouse, timestamp_col, block, + override, ) else: try: - query = f"""CREATE VIEW {fully_qualified_name} ({column_descs}) + override_clause = " OR REPLACE" if override else "" + query = f"""CREATE{override_clause} VIEW {fully_qualified_name} ({column_descs}) COMMENT = '{feature_view.desc}' TAG ( {_FEATURE_VIEW_ENTITY_TAG} = '{entities}', @@ -459,7 +472,7 @@ def list_feature_views( fvs = self._find_feature_views(entity_name, feature_view_name) else: fvs = [] 
- for row in self._get_backend_representations(feature_view_name, prefix_match=True): + for row in self._get_fv_backend_representations(feature_view_name, prefix_match=True): fvs.append(self._compose_feature_view(row)) if as_dataframe: @@ -491,7 +504,7 @@ def get_feature_view(self, name: str, version: str) -> FeatureView: version = FeatureViewVersion(version) fv_name = FeatureView._get_physical_name(name, version) - results = self._get_backend_representations(fv_name) + results = self._get_fv_backend_representations(fv_name) if len(results) != 1: raise snowml_exceptions.SnowflakeMLException( error_code=error_codes.NOT_FOUND, @@ -787,6 +800,7 @@ def retrieve_feature_values( features: Union[List[Union[FeatureView, FeatureViewSlice]], List[str]], spine_timestamp_col: Optional[str] = None, exclude_columns: Optional[List[str]] = None, + include_feature_view_timestamp_col: bool = False, ) -> DataFrame: """ Enrich spine dataframe with feature values. Mainly used to generate inference data input. @@ -798,6 +812,8 @@ def retrieve_feature_values( or a list of serialized feature objects from Dataset. spine_timestamp_col: Timestamp column in spine_df for point-in-time feature value lookup. exclude_columns: Column names to exclude from the result dataframe. + include_feature_view_timestamp_col: Generated dataset will include timestamp column of feature view + (if feature view has timestamp column) if set true. Default to false. Returns: Snowpark DataFrame containing the joined results. 
@@ -817,6 +833,7 @@ def retrieve_feature_values( spine_df, cast(List[Union[FeatureView, FeatureViewSlice]], features), spine_timestamp_col, + include_feature_view_timestamp_col, ) if exclude_columns is not None: @@ -834,6 +851,7 @@ def generate_dataset( spine_label_cols: Optional[List[str]] = None, exclude_columns: Optional[List[str]] = None, save_mode: str = "errorifexists", + include_feature_view_timestamp_col: bool = False, desc: str = "", ) -> Dataset: """ @@ -856,6 +874,8 @@ def generate_dataset( save_mode: How new data is saved. currently support: errorifexists: Raise error if registered table already exists. merge: Merge new data if registered table already exists. + include_feature_view_timestamp_col: Generated dataset will include timestamp column of feature view + (if feature view has timestamp column) if set true. Default to false. desc: A description about this dataset. Returns: @@ -892,7 +912,9 @@ def generate_dataset( ), ) - result_df, join_keys = self._join_features(spine_df, features, spine_timestamp_col) + result_df, join_keys = self._join_features( + spine_df, features, spine_timestamp_col, include_feature_view_timestamp_col + ) snapshot_table = None if materialized_table is not None: @@ -1028,10 +1050,12 @@ def _create_dynamic_table( warehouse: SqlIdentifier, timestamp_col: SqlIdentifier, block: bool, + override: bool, ) -> None: # TODO: cluster by join keys once DT supports that try: - query = f"""CREATE DYNAMIC TABLE {fully_qualified_name} ({column_descs}) + override_clause = " OR REPLACE" if override else "" + query = f"""CREATE{override_clause} DYNAMIC TABLE {fully_qualified_name} ({column_descs}) TARGET_LAG = '{'DOWNSTREAM' if schedule_task else feature_view.refresh_freq}' COMMENT = '{feature_view.desc}' TAG ( @@ -1048,26 +1072,29 @@ def _create_dynamic_table( ) if schedule_task: - self._session.sql( - f"""CREATE TASK {fully_qualified_name} - WAREHOUSE = {warehouse} - SCHEDULE = 'USING CRON {feature_view.refresh_freq}' - AS ALTER DYNAMIC 
TABLE {fully_qualified_name} REFRESH + try: + self._session.sql( + f"""CREATE{override_clause} TASK {fully_qualified_name} + WAREHOUSE = {warehouse} + SCHEDULE = 'USING CRON {feature_view.refresh_freq}' + AS ALTER DYNAMIC TABLE {fully_qualified_name} REFRESH + """ + ).collect(statement_params=self._telemetry_stmp) + self._session.sql( + f""" + ALTER TASK {fully_qualified_name} + SET TAG {self._get_fully_qualified_name(_FEATURE_STORE_OBJECT_TAG)} = '' """ - ).collect(statement_params=self._telemetry_stmp) - self._session.sql( - f""" - ALTER TASK {fully_qualified_name} - SET TAG {self._get_fully_qualified_name(_FEATURE_STORE_OBJECT_TAG)} = '' - """ - ).collect(statement_params=self._telemetry_stmp) - self._session.sql(f"ALTER TASK {fully_qualified_name} RESUME").collect( - statement_params=self._telemetry_stmp - ) + ).collect(statement_params=self._telemetry_stmp) + self._session.sql(f"ALTER TASK {fully_qualified_name} RESUME").collect( + statement_params=self._telemetry_stmp + ) + except Exception: + self._session.sql(f"DROP DYNAMIC TABLE IF EXISTS {fully_qualified_name}").collect( + statement_params=self._telemetry_stmp + ) + raise except Exception as e: - self._session.sql(f"DROP DYNAMIC TABLE IF EXISTS {fully_qualified_name}").collect( - statement_params=self._telemetry_stmp - ) raise snowml_exceptions.SnowflakeMLException( error_code=error_codes.INTERNAL_SNOWPARK_ERROR, original_exception=RuntimeError( @@ -1102,7 +1129,6 @@ def _dump_dataset( error_code=error_codes.INVALID_ARGUMENT, original_exception=ValueError(f"Dataset df must contain only one query. 
Got: {df.queries['queries']}"), ) - schema = ", ".join([f"{c.name} {type_utils.convert_sp_to_sf_type(c.datatype)}" for c in df.schema.fields]) fully_qualified_name = self._get_fully_qualified_name(table_name) @@ -1152,6 +1178,7 @@ def _join_features( spine_df: DataFrame, features: List[Union[FeatureView, FeatureViewSlice]], spine_timestamp_col: Optional[SqlIdentifier], + include_feature_view_timestamp_col: bool, ) -> Tuple[DataFrame, List[SqlIdentifier]]: if len(spine_df.queries["queries"]) != 1: raise snowml_exceptions.SnowflakeMLException( @@ -1198,9 +1225,15 @@ def _join_features( if spine_timestamp_col is not None and f.timestamp_col is not None: if self._asof_join_enabled: + if include_feature_view_timestamp_col: + f_ts_col_alias = identifier.concat_names([f.name, "_", f.version, "_", f.timestamp_col]) + f_ts_col_str = f"r_{layer}.{f.timestamp_col} AS {f_ts_col_alias}," + else: + f_ts_col_str = "" query = f""" SELECT l_{layer}.*, + {f_ts_col_str} r_{layer}.* EXCLUDE ({join_keys_str}, {f.timestamp_col}) FROM ({query}) l_{layer} ASOF JOIN ( @@ -1360,7 +1393,7 @@ def _get_fully_qualified_name(self, name: Union[SqlIdentifier, str]) -> str: return f"{self._config.full_schema_path}.{name}" # TODO: SHOW DYNAMIC TABLES is very slow while other show objects are fast, investigate with DT in SNOW-902804. 
- def _get_backend_representations( + def _get_fv_backend_representations( self, object_name: Optional[SqlIdentifier], prefix_match: bool = False ) -> List[Row]: dynamic_table_results = self._find_object("DYNAMIC TABLES", object_name, prefix_match) @@ -1399,8 +1432,10 @@ def _find_feature_views( if not self._validate_entity_exists(entity_name): return [] - all_fv_names = [SqlIdentifier(r["name"], case_sensitive=True) for r in self._get_backend_representations(None)] - if len(all_fv_names) == 0: + all_fvs = self._get_fv_backend_representations(object_name=None) + fv_maps = {SqlIdentifier(r["name"], case_sensitive=True): r for r in all_fvs} + + if len(fv_maps.keys()) == 0: return [] # NOTE: querying INFORMATION_SCHEMA for Entity lineage can be expensive depending on how many active @@ -1420,7 +1455,7 @@ def _find_feature_views( WHERE LEVEL = 'TABLE' AND TAG_NAME = '{_FEATURE_VIEW_ENTITY_TAG}' """ - for fv_name in all_fv_names + for fv_name in fv_maps.keys() ] results = self._session.sql("\nUNION\n".join(queries)).collect(statement_params=self._telemetry_stmp) @@ -1432,15 +1467,16 @@ def _find_feature_views( outputs = [] for r in results: if entity_name == SqlIdentifier(r["TAG_VALUE"], case_sensitive=True): - fv_name, version = r["OBJECT_NAME"].split(_FEATURE_VIEW_NAME_DELIMITER) + fv_name, _ = r["OBJECT_NAME"].split(_FEATURE_VIEW_NAME_DELIMITER) fv_name = SqlIdentifier(fv_name, case_sensitive=True) + obj_name = SqlIdentifier(r["OBJECT_NAME"], case_sensitive=True) if feature_view_name is not None: if fv_name == feature_view_name: - outputs.append(self.get_feature_view(fv_name, version)) + outputs.append(self._compose_feature_view(fv_maps[obj_name])) else: continue else: - outputs.append(self.get_feature_view(fv_name.identifier(), version)) + outputs.append(self._compose_feature_view(fv_maps[obj_name])) return outputs def _compose_feature_view(self, row: Row) -> FeatureView: diff --git 
a/snowflake/ml/feature_store/notebooks/customer_demo/DBT_External_Feature_Pipeline_Demo.ipynb b/snowflake/ml/feature_store/notebooks/customer_demo/DBT_External_Feature_Pipeline_Demo.ipynb new file mode 100644 index 00000000..91d4c115 --- /dev/null +++ b/snowflake/ml/feature_store/notebooks/customer_demo/DBT_External_Feature_Pipeline_Demo.ipynb @@ -0,0 +1,533 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "5f46aef7-1fc7-408e-acf1-0dc030981c58", + "metadata": {}, + "source": [ + "- snowflake-ml-python version: 1.2.1\n", + "- Last updated on: 1/30/2024" + ] + }, + { + "cell_type": "markdown", + "id": "70cdcdfb-a40f-4b5a-9ae8-6768b097f65a", + "metadata": {}, + "source": [ + "# DBT External Feature Pipeline Demo\n", + "\n", + "This notebook showcases the interoperation between DBT and Snowflake Feature Store. The source data is managed in a Snowflake database, while the feature pipelines are managed and executed from DBT. The output is stored as feature tables in Snowflake. Then we read from the feature tables and register them as Feature Views.\n", + "\n", + "This demo requires a DBT account." + ] + }, + { + "cell_type": "markdown", + "id": "76628c92-5f51-4562-86a1-dadc2aeb85c0", + "metadata": {}, + "source": [ + "## Setup Snowflake connection" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22427859-5cc8-43de-bdef-ebaa7ec33670", + "metadata": {}, + "outputs": [], + "source": [ + "from snowflake.snowpark import Session\n", + "from snowflake.ml.utils.connection_params import SnowflakeLoginOptions\n", + "\n", + "session = Session.builder.configs(SnowflakeLoginOptions()).create()" + ] + }, + { + "cell_type": "markdown", + "id": "fdfb517c-c5fd-4119-b27f-5b3d778574de", + "metadata": {}, + "source": [ + "Create test database, schema and warehouse." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "510d1d12-e5c1-4078-884a-771cc7ea257d", + "metadata": {}, + "outputs": [], + "source": [ + "# database name where test data and feature store lives.\n", + "FS_DEMO_DB = f\"SNOWML_FEATURE_STORE_DBT_DEMO\"\n", + "# schema where test data lives.\n", + "TEST_DATASET_SCHEMA = 'DBT_DATA'\n", + "# feature store name.\n", + "FS_DEMO_SCHEMA = \"FS_DBT_DEMO\"\n", + "\n", + "session.sql(f\"DROP DATABASE IF EXISTS {FS_DEMO_DB}\").collect()\n", + "session.sql(f\"CREATE DATABASE IF NOT EXISTS {FS_DEMO_DB}\").collect()\n", + "session.sql(f\"\"\"\n", + " CREATE SCHEMA IF NOT EXISTS \n", + " {FS_DEMO_DB}.{TEST_DATASET_SCHEMA}\n", + "\"\"\").collect()" + ] + }, + { + "cell_type": "markdown", + "id": "320c79c9-d6aa-4d66-b270-8463a33f0d03", + "metadata": {}, + "source": [ + "## Prepare source data\n", + "\n", + "This notebook will use public `fraud_transactions` data as source. It contains transaction data range between [2019-04-01 00:00:00.000, 2019-09-01 00:00:00.000). We will split this dataset into two parts based on its timestamp. The first part includes rows before 2019-07-01, the second part includes rows after 2019-07-01. We copy the first part into `CUSTOMER_TRANSACTIONS_FRAUD` table now. And will copy second part into same table later." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2af11339-5f7d-4f76-b352-d495353b6136", + "metadata": {}, + "outputs": [], + "source": [ + "# Replace with your local path\n", + "LOCAL_PATH_CUSTOMER_TRANSACTIONS_FRAUD = \"./fraud_transactions.csv.gz\"\n", + "\n", + "raw_data_path = f\"{FS_DEMO_DB}.{TEST_DATASET_SCHEMA}.RAW_FRAUD_TRANSACTIONS\"\n", + "session.sql(\n", + " f\"\"\"create or replace TABLE {raw_data_path} ( \n", + " TRANSACTION_ID NUMBER, \n", + " TX_DATETIME TIMESTAMP_NTZ, \n", + " CUSTOMER_ID NUMBER, \n", + " TERMINAL_ID NUMBER, \n", + " TX_AMOUNT FLOAT, \n", + " TX_TIME_SECONDS NUMBER, \n", + " TX_TIME_DAYS NUMBER, \n", + " TX_FRAUD NUMBER, \n", + " TX_FRAUD_SCENARIO NUMBER)\n", + " \"\"\").collect()\n", + "\n", + "session.file.put(\n", + " LOCAL_PATH_CUSTOMER_TRANSACTIONS_FRAUD, \n", + " f\"@{FS_DEMO_DB}.{TEST_DATASET_SCHEMA}.%RAW_FRAUD_TRANSACTIONS\", \n", + " auto_compress=False)\n", + "session.sql(f\"\"\"\n", + " copy into {raw_data_path} file_format = (type = csv)\"\"\").collect()\n", + "session.sql(f\"SELECT COUNT(*) FROM {raw_data_path}\").show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d26d7215-3845-4436-a4f7-486d48f7815b", + "metadata": {}, + "outputs": [], + "source": [ + "fraud_data_path = f\"{FS_DEMO_DB}.{TEST_DATASET_SCHEMA}.CUSTOMER_TRANSACTIONS_FRAUD\"\n", + "session.sql(f\"\"\"\n", + " CREATE OR REPLACE TABLE {fraud_data_path} AS\n", + " SELECT *\n", + " FROM {FS_DEMO_DB}.{TEST_DATASET_SCHEMA}.RAW_FRAUD_TRANSACTIONS\n", + " WHERE TX_DATETIME < '2019-07-01'\n", + "\"\"\").collect()\n", + "session.sql(f\"SELECT COUNT(*) FROM {fraud_data_path}\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "4bb4b5a4-72c5-42b6-87ba-e63483ae4c08", + "metadata": {}, + "source": [ + "## Define models in DBT\n", + "Now lets switch to [DBT IDE](https://cloud.getdbt.com/develop/15898/projects/334785)(this link will not work for you, you will need to create your own project) for a while. 
You will need a DBT account beforehand. Once you have a DBT account, you can clone the demo code from [here](https://github.com/sfc-gh-wezhou/FS_DBT_DEMO/tree/dev/models/example) (Snowflake repo). The screenshot below shows what the DBT IDE looks like. In the file explorer section, you can see the code structure. Our [DBT models](https://docs.getdbt.com/docs/build/python-models) are defined under the models/example folder. We have 3 models: customers, terminals and transactions. These 3 models will later output 3 Snowflake DataFrame objects. Lastly, Feature Store will register these DataFrames and make them FeatureViews." + ] + }, + { + "attachments": { + "b635b471-d26d-4374-abe0-41c284eaae6a.png": { + "image/png": "" + } + }, + "cell_type": "markdown", + "id": "77d60dc3-bb36-4b19-8dfd-22a5f15ffb29", + "metadata": {}, + "source": [ + "![image.png](attachment:b635b471-d26d-4374-abe0-41c284eaae6a.png)" + ] + }, + { + "cell_type": "markdown", + "id": "18fba209-f034-4fe3-b9d7-0daa8b7376c3", + "metadata": {}, + "source": [ + "## Run models in DBT\n", + "After we have defined the models, we can run them and generate our feature tables. Simply execute `dbt run` in the terminal and it will do all the work. 
" + ] + }, + { + "attachments": { + "5b91a161-f657-45a8-b48d-ad595e00e7e6.png": { + "image/png": "" + } + }, + "cell_type": "markdown", + "id": "45d69561-52ef-430b-9401-e188c9d126d0", + "metadata": {}, + "source": [ + "![image.png](attachment:5b91a161-f657-45a8-b48d-ad595e00e7e6.png)" + ] + }, + { + "cell_type": "markdown", + "id": "83d8259b-1449-40c2-87f7-d34c81d11903", + "metadata": {}, + "source": [ + "After the run succeeds, let's check whether the feature tables are populated.\n", + "\n", + "(TODO, the output schema has a weird \"FS_DBT_\" prefix that comes from nowhere)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "da97a2f6-af82-4258-9ba5-9e1b3c13d70b", + "metadata": {}, + "outputs": [], + "source": [ + "# replace 'transactions' with 'customers' or 'terminals' to show respective table.\n", + "session.sql(f\"SELECT * FROM {FS_DEMO_DB}.FS_DBT_{TEST_DATASET_SCHEMA}.transactions\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "5c85ca3a-baf5-4fb1-a74a-d45a90182bb1", + "metadata": {}, + "source": [ + "## Register feature tables as Feature Views\n", + "\n", + "Now let's create Feature Views with Feature Store. Since DBT is responsible for executing the pipeline, the feature tables will be registered as an external pipeline. Under the hood, it creates views, instead of dynamic tables, from the feature tables."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1d04b1b8-dd0d-47a5-a980-3c3f34b276f8", + "metadata": {}, + "outputs": [], + "source": [ + "from snowflake.ml.feature_store import (\n", + " FeatureStore,\n", + " FeatureView,\n", + " Entity,\n", + " CreationMode\n", + ")\n", + "\n", + "fs = FeatureStore(\n", + " session=session, \n", + " database=FS_DEMO_DB, \n", + " name=FS_DEMO_SCHEMA, \n", + " default_warehouse='PUBLIC',\n", + " creation_mode=CreationMode.CREATE_IF_NOT_EXIST,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "be6784b7-b96e-4c0a-9b8e-44a48b3d5951", + "metadata": {}, + "source": [ + "Register entities for features." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "806ad73d-b0fc-48ea-b63d-0f9669828486", + "metadata": {}, + "outputs": [], + "source": [ + "customer = Entity(name=\"CUSTOMER\", join_keys=[\"CUSTOMER_ID\"])\n", + "terminal = Entity(name=\"TERMINAL\", join_keys=[\"TERMINAL_ID\"])\n", + "transaction = Entity(name=\"TRANSACTION\", join_keys=[\"TRANSACTION_ID\"])\n", + "fs.register_entity(customer)\n", + "fs.register_entity(terminal)\n", + "fs.register_entity(transaction)\n", + "fs.list_entities().show()" + ] + }, + { + "cell_type": "markdown", + "id": "96f49f77-25cf-4c0c-a1d0-a04808a222a2", + "metadata": {}, + "source": [ + "Define feature views. `feature_df` is a dataframe object that selects from a subset of columns of feature tables. `refresh_freq` is None indicates it is static and won't be refreshed. Underlying it will create views on the feature tables." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "399984e1-a89c-486f-9370-fef9a8921c99", + "metadata": {}, + "outputs": [], + "source": [ + "# terminal features\n", + "terminals_df = session.sql(f\"\"\"\n", + " SELECT \n", + " TERMINAL_ID,\n", + " EVENT_TIMESTAMP,\n", + " TERM_RISK_1,\n", + " TERM_RISK_7,\n", + " TERM_RISK_30\n", + " FROM {FS_DEMO_DB}.FS_DBT_{TEST_DATASET_SCHEMA}.terminals\n", + " \"\"\")\n", + "terminals_fv = FeatureView(\n", + " name=\"terminal_features\", \n", + " entities=[terminal],\n", + " feature_df=terminals_df,\n", + " timestamp_col=\"EVENT_TIMESTAMP\",\n", + " refresh_freq=None,\n", + " desc=\"A bunch of terminal related features\")\n", + "\n", + "# customer features\n", + "customers_df = session.sql(f\"\"\"\n", + " SELECT \n", + " CUSTOMER_ID,\n", + " EVENT_TIMESTAMP,\n", + " CUST_AVG_AMOUNT_1,\n", + " CUST_AVG_AMOUNT_7,\n", + " CUST_AVG_AMOUNT_30\n", + " FROM {FS_DEMO_DB}.FS_DBT_{TEST_DATASET_SCHEMA}.customers\n", + " \"\"\")\n", + "customers_fv = FeatureView(\n", + " name=\"customers_features\", \n", + " entities=[customer],\n", + " feature_df=customers_df,\n", + " timestamp_col=\"EVENT_TIMESTAMP\",\n", + " refresh_freq=None,\n", + " desc=\"A bunch of customer related features\")\n", + "\n", + "# transaction features\n", + "transactions_df = session.sql(f\"\"\"\n", + " SELECT \n", + " TRANSACTION_ID, \n", + " EVENT_TIMESTAMP, \n", + " TX_AMOUNT,\n", + " TX_FRAUD\n", + " FROM {FS_DEMO_DB}.FS_DBT_{TEST_DATASET_SCHEMA}.transactions\n", + " \"\"\")\n", + "transactions_fv = FeatureView(\n", + " name=\"transactions_features\", \n", + " entities=[transaction],\n", + " feature_df=transactions_df,\n", + " timestamp_col=\"EVENT_TIMESTAMP\",\n", + " refresh_freq=None,\n", + " desc=\"A bunch of transaction related features\")" + ] + }, + { + "cell_type": "markdown", + "id": "49df8d25-bba1-4fcf-9b80-df292d4d6cbf", + "metadata": {}, + "source": [ + "Register these feature views in feature store so you can retrieve them back 
later even after notebook session is destroyed. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a7c18556-290b-418a-9dd8-b98b0e9acf8d", + "metadata": {}, + "outputs": [], + "source": [ + "terminals_fv = fs.register_feature_view(\n", + " feature_view=terminals_fv,\n", + " version=\"1\",\n", + " block=True)\n", + "\n", + "customers_fv = fs.register_feature_view(\n", + " feature_view=customers_fv,\n", + " version=\"1\",\n", + " block=True)\n", + "\n", + "transactions_fv = fs.register_feature_view(\n", + " feature_view=transactions_fv,\n", + " version=\"1\",\n", + " block=True)" + ] + }, + { + "cell_type": "markdown", + "id": "59befe8d-86a4-41f8-aeff-73d3756a2f48", + "metadata": {}, + "source": [ + "Let's check whether the feature views are registered successfully in the feature store. You will see 3 registered feature views and their status is \"static\"." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bdab88b1-2b92-41ac-88d1-c91d25f3155e", + "metadata": {}, + "outputs": [], + "source": [ + "fs.list_feature_views().select([\n", + " \"NAME\", \n", + " \"VERSION\", \n", + " \"ENTITIES\", \n", + " \"REFRESH_FREQ\", \n", + " \"STATUS\", \n", + " \"PHYSICAL_NAME\"]).show()" + ] + }, + { + "cell_type": "markdown", + "id": "0f96f221-274d-4a0e-8e46-72748e1f7a92", + "metadata": {}, + "source": [ + "## Generate training dataset with point-in-time correctness\n", + "We can now generate a training dataset with feature views. First, we create a mock spine dataframe which has 3 columns: instance_id, customer_id and event_timestamp. Note that the event_timestamp of all 3 rows is the same: \"2019-09-01 00:00:00.000\". Later, we will update the source table (`CUSTOMER_TRANSACTIONS_FRAUD`) and feature tables with newer events. We will still use this `spine_df` with the same timestamp to generate the dataset, but it is expected to output different training data. The new training data will join spine_df with the latest feature values from newer events. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6d04c095-0702-4fe5-905d-fb8e576710a5", + "metadata": {}, + "outputs": [], + "source": [ + "spine_df = session.create_dataframe(\n", + " [\n", + " (1, 2443, \"2019-09-01 00:00:00.000\"), \n", + " (2, 1889, \"2019-09-01 00:00:00.000\"),\n", + " (3, 1309, \"2019-09-01 00:00:00.000\")\n", + " ], \n", + " schema=[\"INSTANCE_ID\", \"CUSTOMER_ID\", \"EVENT_TIMESTAMP\"])\n", + "\n", + "old_training_data = fs.generate_dataset(\n", + " spine_df=spine_df,\n", + " features=[customers_fv],\n", + " materialized_table=\"customer_fraud_training_data\",\n", + " spine_timestamp_col=\"EVENT_TIMESTAMP\",\n", + " spine_label_cols = []\n", + ")\n", + "old_training_data.df.show()" + ] + }, + { + "cell_type": "markdown", + "id": "b0a7eab2-493b-44a4-ae5c-08bd25daa93a", + "metadata": {}, + "source": [ + "## Update features from DBT\n", + "Now we are injecting newer events into source, then refresh the pipeline and generate new feature values. We firstly check how many rows the source table currently has." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f62a45b2-b966-4230-959f-206e54202599", + "metadata": {}, + "outputs": [], + "source": [ + "session.sql(f\"SELECT COUNT(*) FROM {fraud_data_path}\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "5256584e-4048-4d1c-930d-7cb312b36e4d", + "metadata": {}, + "source": [ + "We inject new events with timestamp later than '2019-07-01'. Then check how many rows in the source table after the injection." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5494dd0a-dce3-40c2-8810-6aa1a8d7f31b", + "metadata": {}, + "outputs": [], + "source": [ + "session.sql(f\"\"\"\n", + " INSERT INTO {fraud_data_path}\n", + " SELECT *\n", + " FROM {raw_data_path}\n", + " WHERE TX_DATETIME >= '2019-07-01'\n", + "\"\"\").collect()\n", + "session.sql(f\"SELECT COUNT(*) FROM {fraud_data_path}\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "3df454f3-590f-4547-8051-5eb053b18d2b", + "metadata": {}, + "source": [ + "Then, we go back to DBT and the pipelines again." + ] + }, + { + "cell_type": "markdown", + "id": "810025ca-7d9d-4e99-b9af-1643d0647c29", + "metadata": {}, + "source": [ + "## Generate new training dataset\n", + "We don't need to update feature views because the underlying tables are updated by DBT. We only need to generate dataset again with same timestamp and it will join with newer feature values." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "69299109-9cd9-433d-b38c-c980af589765", + "metadata": {}, + "outputs": [], + "source": [ + "new_training_data = fs.generate_dataset(\n", + " spine_df=spine_df,\n", + " features=[customers_fv],\n", + " materialized_table=\"customer_fraud_training_data\",\n", + " spine_timestamp_col=\"EVENT_TIMESTAMP\",\n", + " spine_label_cols = [],\n", + " save_mode=\"merge\",\n", + ")\n", + "new_training_data.df.show()" + ] + }, + { + "cell_type": "markdown", + "id": "57aee19d-b0b5-4d9c-bfc7-4714a57aab2d", + "metadata": {}, + "source": [ + "## Cleanup notebook" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c942f02a-10ad-4cae-b15f-69b530193ae7", + "metadata": {}, + "outputs": [], + "source": [ + "session.sql(f\"DROP DATABASE IF EXISTS {FS_DEMO_DB}\").collect()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + 
"version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/snowflake/ml/feature_store/notebooks/customer_demo/DBT_External_Feature_Pipeline_Demo.pdf b/snowflake/ml/feature_store/notebooks/customer_demo/DBT_External_Feature_Pipeline_Demo.pdf new file mode 100644 index 00000000..97666219 Binary files /dev/null and b/snowflake/ml/feature_store/notebooks/customer_demo/DBT_External_Feature_Pipeline_Demo.pdf differ diff --git a/snowflake/ml/feature_store/notebooks/customer_demo/fraud_transactions.csv.gz b/snowflake/ml/feature_store/notebooks/customer_demo/fraud_transactions.csv.gz new file mode 100644 index 00000000..be7e05cb Binary files /dev/null and b/snowflake/ml/feature_store/notebooks/customer_demo/fraud_transactions.csv.gz differ diff --git a/snowflake/ml/model/_deploy_client/image_builds/server_image_builder.py b/snowflake/ml/model/_deploy_client/image_builds/server_image_builder.py index 0713911d..dad0409c 100644 --- a/snowflake/ml/model/_deploy_client/image_builds/server_image_builder.py +++ b/snowflake/ml/model/_deploy_client/image_builds/server_image_builder.py @@ -2,6 +2,7 @@ import os import posixpath from string import Template +from typing import List import importlib_resources @@ -36,6 +37,7 @@ def __init__( session: snowpark.Session, artifact_stage_location: str, compute_pool: str, + external_access_integrations: List[str], ) -> None: """Initialization @@ -47,6 +49,7 @@ def __init__( artifact_stage_location: Spec file and future deployment related artifacts will be stored under {stage}/models/{model_id} compute_pool: The compute pool used to run docker image build workload. + external_access_integrations: EAIs for network connection. 
""" self.context_dir = context_dir self.image_repo = image_repo @@ -54,6 +57,7 @@ def __init__( self.session = session self.artifact_stage_location = artifact_stage_location self.compute_pool = compute_pool + self.external_access_integrations = external_access_integrations self.client = snowservice_client.SnowServiceClient(session) assert artifact_stage_location.startswith( @@ -202,4 +206,8 @@ def _construct_and_upload_job_spec(self, base_image: str, kaniko_shell_script_st def _launch_kaniko_job(self, spec_stage_location: str) -> None: logger.debug("Submitting job for building docker image with kaniko") - self.client.create_job(compute_pool=self.compute_pool, spec_stage_location=spec_stage_location) + self.client.create_job( + compute_pool=self.compute_pool, + spec_stage_location=spec_stage_location, + external_access_integrations=self.external_access_integrations, + ) diff --git a/snowflake/ml/model/_deploy_client/image_builds/server_image_builder_test.py b/snowflake/ml/model/_deploy_client/image_builds/server_image_builder_test.py index 60a4271c..e553903f 100644 --- a/snowflake/ml/model/_deploy_client/image_builds/server_image_builder_test.py +++ b/snowflake/ml/model/_deploy_client/image_builds/server_image_builder_test.py @@ -16,6 +16,7 @@ def setUp(self) -> None: self.compute_pool = "test_pool" self.context_tarball_stage_location = f"{self.artifact_stage_location}/context.tar.gz" self.full_image_name = "org-account.registry.snowflakecomputing.com/db/schema/repo/image:latest" + self.eais = ["eai_1"] @mock.patch( # type: ignore[misc] "snowflake.ml.model._deploy_client.image_builds.server_image_builder.snowpark.Session" @@ -33,6 +34,7 @@ def test_construct_and_upload_docker_entrypoint_script(self, m_session_class: mo session=m_session, artifact_stage_location=self.artifact_stage_location, compute_pool=self.compute_pool, + external_access_integrations=self.eais, ) shell_file_path = os.path.join(context_dir, constants.KANIKO_SHELL_SCRIPT_NAME) diff --git 
a/snowflake/ml/model/_deploy_client/snowservice/deploy.py b/snowflake/ml/model/_deploy_client/snowservice/deploy.py index 182c417b..fafa3e93 100644 --- a/snowflake/ml/model/_deploy_client/snowservice/deploy.py +++ b/snowflake/ml/model/_deploy_client/snowservice/deploy.py @@ -465,6 +465,7 @@ def _build_and_upload_image(self, context_dir: str, image_repo: str, full_image_ session=self.session, artifact_stage_location=self._model_artifact_stage_location, compute_pool=self.options.compute_pool, + external_access_integrations=self.options.external_access_integrations, ) else: image_builder = client_image_builder.ClientImageBuilder( @@ -587,6 +588,7 @@ def _deploy_workflow(self, image: str) -> str: spec_stage_location=spec_stage_location, min_instances=self.options.min_instances, max_instances=self.options.max_instances, + external_access_integrations=self.options.external_access_integrations, ) logger.info(f"Wait for service {self._service_name} to become ready...") client.block_until_resource_is_ready( diff --git a/snowflake/ml/model/_deploy_client/snowservice/deploy_options.py b/snowflake/ml/model/_deploy_client/snowservice/deploy_options.py index b96ea8c5..3d8ae330 100644 --- a/snowflake/ml/model/_deploy_client/snowservice/deploy_options.py +++ b/snowflake/ml/model/_deploy_client/snowservice/deploy_options.py @@ -1,6 +1,6 @@ import inspect import logging -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from snowflake.ml._internal.exceptions import ( error_codes, @@ -16,6 +16,7 @@ def __init__( self, compute_pool: str, *, + external_access_integrations: List[str], image_repo: Optional[str] = None, min_instances: Optional[int] = 1, max_instances: Optional[int] = 1, @@ -34,7 +35,15 @@ def __init__( Args: compute_pool: SnowService compute pool name. 
Please refer to official doc for how to create a - compute pool: https://docs.snowflake.com/LIMITEDACCESS/snowpark-containers/reference/compute-pool + compute pool: + https://docs.snowflake.com/en/developer-guide/snowpark-container-services/working-with-compute-pool + external_access_integrations: External Access Integrations name used to build image and deploy the model. + Please refer to the doc for how to create an External Access Integrations: https://docs.snowflake.com/ + developer-guide/snowpark-container-services/additional-considerations-services-jobs + #configuring-network-capabilities . + To make sure your image could be built, access to the following endpoint must be allowed. + docker.com:80, docker.com:443, anaconda.com:80, anaconda.com:443, anaconda.org:80, anaconda.org:443, + pypi.org:80, pypi.org:443 image_repo: SnowService image repo path. e.g. "///". Default to auto inferred based on session information. min_instances: Minimum number of service replicas. Default to 1. @@ -70,6 +79,7 @@ def __init__( self.model_in_image = model_in_image self.debug_mode = debug_mode self.enable_ingress = enable_ingress + self.external_access_integrations = external_access_integrations if self.num_workers is None and self.use_gpu: logger.info("num_workers has been defaulted to 1 when using GPU.") diff --git a/snowflake/ml/model/_deploy_client/snowservice/deploy_test.py b/snowflake/ml/model/_deploy_client/snowservice/deploy_test.py index 20c5132c..f5f5bfb1 100644 --- a/snowflake/ml/model/_deploy_client/snowservice/deploy_test.py +++ b/snowflake/ml/model/_deploy_client/snowservice/deploy_test.py @@ -29,6 +29,7 @@ def setUp(self) -> None: self.options: Dict[str, Any] = { "compute_pool": "mock_compute_pool", "image_repo": "mock_image_repo", + "external_access_integrations": ["eai_1"], } self.m_session.add_mock_sql( @@ -431,6 +432,7 @@ def setUp(self, m_model_meta_class: mock.MagicMock) -> None: "stage": "mock_stage", "compute_pool": "mock_compute_pool", "image_repo": 
"mock_image_repo", + "external_access_integrations": ["eai_a", "eai_b"], } self.deployment = SnowServiceDeployment( diff --git a/snowflake/ml/model/_deploy_client/utils/snowservice_client.py b/snowflake/ml/model/_deploy_client/utils/snowservice_client.py index 5205ef95..5f4f2275 100644 --- a/snowflake/ml/model/_deploy_client/utils/snowservice_client.py +++ b/snowflake/ml/model/_deploy_client/utils/snowservice_client.py @@ -2,7 +2,7 @@ import logging import textwrap import time -from typing import Optional +from typing import List, Optional from snowflake.ml._internal.exceptions import ( error_codes, @@ -36,6 +36,7 @@ def create_or_replace_service( service_name: str, compute_pool: str, spec_stage_location: str, + external_access_integrations: List[str], *, min_instances: Optional[int] = 1, max_instances: Optional[int] = 1, @@ -48,6 +49,7 @@ def create_or_replace_service( service_name: Name of the service. min_instances: Minimum number of service replicas. max_instances: Maximum number of service replicas. + external_access_integrations: EAIs for network connection. compute_pool: Name of the compute pool. spec_stage_location: Stage path for the service spec. """ @@ -61,13 +63,14 @@ def create_or_replace_service( SPEC = '{path}' MIN_INSTANCES={min_instances} MAX_INSTANCES={max_instances} + EXTERNAL_ACCESS_INTEGRATIONS = ({', '.join(external_access_integrations)}) """ ) logger.info(f"Creating service {service_name}") logger.debug(f"Create service with SQL: \n {sql}") self.session.sql(sql).collect() - def create_job(self, compute_pool: str, spec_stage_location: str) -> None: + def create_job(self, compute_pool: str, spec_stage_location: str, external_access_integrations: List[str]) -> None: """Execute the job creation SQL command. Note that the job creation is synchronous, hence we execute it in a async way so that we can query the log in the meantime. 
@@ -76,7 +79,7 @@ def create_job(self, compute_pool: str, spec_stage_location: str) -> None: Args: compute_pool: name of the compute pool spec_stage_location: path to the stage location where the spec is located at. - + external_access_integrations: EAIs for network connection. """ stage, path = uri.get_stage_and_path(spec_stage_location) sql = textwrap.dedent( @@ -85,6 +88,7 @@ def create_job(self, compute_pool: str, spec_stage_location: str) -> None: IN COMPUTE POOL {compute_pool} FROM {stage} SPEC = '{path}' + EXTERNAL_ACCESS_INTEGRATIONS = ({', '.join(external_access_integrations)}) """ ) logger.debug(f"Create job with SQL: \n {sql}") diff --git a/snowflake/ml/model/_deploy_client/utils/snowservice_client_test.py b/snowflake/ml/model/_deploy_client/utils/snowservice_client_test.py index e394c5c6..b2477e30 100644 --- a/snowflake/ml/model/_deploy_client/utils/snowservice_client_test.py +++ b/snowflake/ml/model/_deploy_client/utils/snowservice_client_test.py @@ -38,6 +38,7 @@ def test_create_or_replace_service(self) -> None: SPEC = '{m_stage_path}' MIN_INSTANCES={m_min_instances} MAX_INSTANCES={m_max_instances} + EXTERNAL_ACCESS_INTEGRATIONS=(eai_a, eai_b) """, result=mock_data_frame.MockDataFrame(collect_result=[]), ) @@ -48,6 +49,7 @@ def test_create_or_replace_service(self) -> None: max_instances=m_max_instances, compute_pool=m_compute_pool, spec_stage_location=m_spec_storgae_location, + external_access_integrations=["eai_a", "eai_b"], ) def _add_mock_cursor_to_session(self, *, expected_job_id: Optional[str] = None) -> None: @@ -71,6 +73,7 @@ def test_create_job_successfully(self) -> None: self.client.create_job( compute_pool=m_compute_pool, spec_stage_location=m_spec_storgae_location, + external_access_integrations=["eai_a", "eai_b"], ) def test_create_job_failed(self) -> None: @@ -96,6 +99,7 @@ def test_create_job_failed(self) -> None: self.client.create_job( compute_pool=m_compute_pool, spec_stage_location=m_spec_storgae_location, + 
external_access_integrations=["eai_a", "eai_b"], ) self.assertTrue(cm.output, test_log) diff --git a/snowflake/ml/model/model_signature.py b/snowflake/ml/model/model_signature.py index 4309ecce..1a2fc5fd 100644 --- a/snowflake/ml/model/model_signature.py +++ b/snowflake/ml/model/model_signature.py @@ -1,6 +1,18 @@ import enum +import json import warnings -from typing import Any, Dict, List, Literal, Optional, Sequence, Tuple, Type +from typing import ( + Any, + Dict, + List, + Literal, + Optional, + Sequence, + Tuple, + Type, + Union, + cast, +) import numpy as np import pandas as pd @@ -337,6 +349,31 @@ def get_sql_identifier_from_feature(self, ft_name: str) -> sql_identifier.SqlIde assert_never(self) +def _get_dataframe_values_range( + df: snowflake.snowpark.DataFrame, +) -> Dict[str, Union[Tuple[int, int], Tuple[float, float]]]: + columns = [ + F.array_construct(F.min(field.name), F.max(field.name)).as_(field.name) + for field in df.schema.fields + if isinstance(field.datatype, spt._NumericType) + ] + if not columns: + return {} + res = df.select(columns).collect() + if len(res) != 1: + raise snowml_exceptions.SnowflakeMLException( + error_code=error_codes.INTERNAL_SNOWML_ERROR, + original_exception=ValueError(f"Unable to get the value range of fields {df.columns}"), + ) + return cast( + Dict[str, Union[Tuple[int, int], Tuple[float, float]]], + { + sql_identifier.SqlIdentifier(k, case_sensitive=True).identifier(): (json.loads(v)[0], json.loads(v)[1]) + for k, v in res[0].as_dict().items() + }, + ) + + def _validate_snowpark_data( data: snowflake.snowpark.DataFrame, features: Sequence[core.BaseFeatureSpec] ) -> SnowparkIdentifierRule: @@ -361,6 +398,7 @@ def _validate_snowpark_data( SnowparkIdentifierRule.NORMALIZED: [], } schema = data.schema + values_range = _get_dataframe_values_range(data) for identifier_rule in errors.keys(): for feature in features: try: @@ -401,8 +439,11 @@ def _validate_snowpark_data( + f"Feature is a scalar feature, while {field.name} is 
not." ), ) + continue try: - _validate_snowpark_type_feature(data, field, ft_type, feature.name) + _validate_snowpark_type_feature( + data, field, ft_type, feature.name, values_range.get(field.name, None) + ) except snowml_exceptions.SnowflakeMLException as e: errors[identifier_rule].append(e.original_exception) break @@ -433,17 +474,12 @@ def _validate_snowpark_data( def _validate_snowpark_type_feature( - df: snowflake.snowpark.DataFrame, field: spt.StructField, ft_type: DataType, ft_name: str + df: snowflake.snowpark.DataFrame, + field: spt.StructField, + ft_type: DataType, + ft_name: str, + value_range: Optional[Union[Tuple[int, int], Tuple[float, float]]], ) -> None: - def get_value_range(field_name: str) -> Tuple[int, int]: - res = df.select(F.min(field_name).as_("MIN"), F.max(field_name).as_("MAX")).collect() - if len(res) != 1: - raise snowml_exceptions.SnowflakeMLException( - error_code=error_codes.INTERNAL_SNOWML_ERROR, - original_exception=ValueError(f"Unable to get the value range of field {field_name}"), - ) - return res[0].MIN, res[0].MAX - field_data_type = field.datatype col_name = identifier.get_unescaped_names(field.name) @@ -465,16 +501,27 @@ def get_value_range(field_name: str) -> Tuple[int, int]: error_code=error_codes.INVALID_DATA, original_exception=ValueError( f"Data Validation Error in feature {ft_name}: " - + f"Feature type {ft_type} is not met by column {col_name}." + f"Feature type {ft_type} is not met by column {col_name} " + f"because of its original type {field_data_type}" + ), + ) + if value_range is None: + raise snowml_exceptions.SnowflakeMLException( + error_code=error_codes.INVALID_DATA, + original_exception=ValueError( + f"Data Validation Error in feature {ft_name}: " + f"Feature type {ft_type} is not met by column {col_name} " + f"because of its original type {field_data_type} is non-Numeric." 
), ) - min_v, max_v = get_value_range(field.name) + min_v, max_v = value_range if max_v > np.iinfo(ft_type._numpy_type).max or min_v < np.iinfo(ft_type._numpy_type).min: raise snowml_exceptions.SnowflakeMLException( error_code=error_codes.INVALID_DATA, original_exception=ValueError( f"Data Validation Error in feature {ft_name}: " - + f"Feature type {ft_type} is not met by column {col_name}." + f"Feature type {ft_type} is not met by column {col_name} " + f"because it overflows with min" ), ) elif ft_type in [core.DataType.FLOAT, core.DataType.DOUBLE]: @@ -494,7 +541,16 @@ def get_value_range(field_name: str) -> Tuple[int, int]: + f"Feature type {ft_type} is not met by column {col_name}." ), ) - min_v, max_v = get_value_range(field.name) + if value_range is None: + raise snowml_exceptions.SnowflakeMLException( + error_code=error_codes.INVALID_DATA, + original_exception=ValueError( + f"Data Validation Error in feature {ft_name}: " + f"Feature type {ft_type} is not met by column {col_name} " + f"because of its original type {field_data_type} is non-Numeric." + ), + ) + min_v, max_v = value_range if ( max_v > np.finfo(ft_type._numpy_type).max # type: ignore[arg-type] or min_v < np.finfo(ft_type._numpy_type).min # type: ignore[arg-type] diff --git a/snowflake/ml/model/type_hints.py b/snowflake/ml/model/type_hints.py index 8250e435..74dc75fe 100644 --- a/snowflake/ml/model/type_hints.py +++ b/snowflake/ml/model/type_hints.py @@ -3,6 +3,7 @@ TYPE_CHECKING, Any, Dict, + List, Literal, Optional, Sequence, @@ -173,6 +174,13 @@ class SnowparkContainerServiceDeployOptions(DeployOptions): debug_mode: When set to True, deployment artifacts will be persisted in a local temp directory. enable_ingress: When set to True, will expose HTTP endpoint for access to the predict method of the created service. + external_access_integrations: External Access Integrations name used to build image and deploy the model. 
+ Please refer to the doc for how to create an External Access Integrations: https://docs.snowflake.com/ + developer-guide/snowpark-container-services/additional-considerations-services-jobs + #configuring-network-capabilities . + To make sure your image could be built, access to the following endpoint must be allowed. + docker.com:80, docker.com:443, anaconda.com:80, anaconda.com:443, anaconda.org:80, anaconda.org:443, + pypi.org:80, pypi.org:443 """ compute_pool: str @@ -187,6 +195,7 @@ class SnowparkContainerServiceDeployOptions(DeployOptions): model_in_image: NotRequired[bool] debug_mode: NotRequired[bool] enable_ingress: NotRequired[bool] + external_access_integrations: List[str] class ModelMethodSaveOptions(TypedDict): diff --git a/snowflake/ml/modeling/_internal/BUILD.bazel b/snowflake/ml/modeling/_internal/BUILD.bazel index 1ab6e5c2..4d9832e8 100644 --- a/snowflake/ml/modeling/_internal/BUILD.bazel +++ b/snowflake/ml/modeling/_internal/BUILD.bazel @@ -2,21 +2,6 @@ load("//bazel:py_rules.bzl", "py_library", "py_test") package(default_visibility = ["//visibility:public"]) -py_library( - name = "snowpark_handlers", - srcs = ["snowpark_handlers.py"], - deps = [ - "//snowflake/ml/_internal:env_utils", - "//snowflake/ml/_internal:telemetry", - "//snowflake/ml/_internal/exceptions", - "//snowflake/ml/_internal/exceptions:modeling_error_messages", - "//snowflake/ml/_internal/utils:identifier", - "//snowflake/ml/_internal/utils:query_result_checker", - "//snowflake/ml/_internal/utils:snowpark_dataframe_utils", - "//snowflake/ml/_internal/utils:temp_file_utils", - ], -) - py_library( name = "estimator_protocols", srcs = ["estimator_protocols.py"], @@ -60,8 +45,8 @@ py_test( name = "model_specifications_test", srcs = ["model_specifications_test.py"], deps = [ - ":distributed_hpo_trainer", ":model_specifications", + "//snowflake/ml/modeling/_internal/snowpark_implementations:distributed_hpo_trainer", "//snowflake/ml/utils:connection_params", ], ) @@ -72,75 +57,16 @@ 
py_library( deps = [], ) -py_library( - name = "pandas_trainer", - srcs = ["pandas_trainer.py"], - deps = [ - ":model_trainer", - ], -) - -py_library( - name = "snowpark_trainer", - srcs = ["snowpark_trainer.py"], - deps = [ - ":model_specifications", - ":model_trainer", - "//snowflake/ml/_internal:env_utils", - "//snowflake/ml/_internal:telemetry", - "//snowflake/ml/_internal/exceptions", - "//snowflake/ml/_internal/exceptions:modeling_error_messages", - "//snowflake/ml/_internal/utils:identifier", - "//snowflake/ml/_internal/utils:pkg_version_utils", - "//snowflake/ml/_internal/utils:query_result_checker", - "//snowflake/ml/_internal/utils:snowpark_dataframe_utils", - "//snowflake/ml/_internal/utils:temp_file_utils", - ], -) - -py_library( - name = "distributed_hpo_trainer", - srcs = ["distributed_hpo_trainer.py"], - deps = [ - ":model_specifications", - ":snowpark_trainer", - "//snowflake/ml/_internal:env_utils", - "//snowflake/ml/_internal:telemetry", - "//snowflake/ml/_internal/exceptions", - "//snowflake/ml/_internal/exceptions:modeling_error_messages", - "//snowflake/ml/_internal/utils:identifier", - "//snowflake/ml/_internal/utils:pkg_version_utils", - "//snowflake/ml/_internal/utils:snowpark_dataframe_utils", - "//snowflake/ml/_internal/utils:temp_file_utils", - ], -) - -py_library( - name = "xgboost_external_memory_trainer", - srcs = ["xgboost_external_memory_trainer.py"], - deps = [ - ":model_specifications", - ":snowpark_trainer", - "//snowflake/ml/_internal:telemetry", - "//snowflake/ml/_internal/exceptions", - "//snowflake/ml/_internal/exceptions:modeling_error_messages", - "//snowflake/ml/_internal/utils:identifier", - "//snowflake/ml/_internal/utils:pkg_version_utils", - "//snowflake/ml/_internal/utils:snowpark_dataframe_utils", - "//snowflake/ml/_internal/utils:temp_file_utils", - ], -) - py_library( name = "model_trainer_builder", srcs = ["model_trainer_builder.py"], deps = [ - ":distributed_hpo_trainer", ":estimator_utils", ":model_trainer", - 
":pandas_trainer", - ":snowpark_trainer", - ":xgboost_external_memory_trainer", + "//snowflake/ml/modeling/_internal/local_implementations:pandas_trainer", + "//snowflake/ml/modeling/_internal/snowpark_implementations:distributed_hpo_trainer", + "//snowflake/ml/modeling/_internal/snowpark_implementations:snowpark_trainer", + "//snowflake/ml/modeling/_internal/snowpark_implementations:xgboost_external_memory_trainer", ], ) @@ -148,21 +74,12 @@ py_test( name = "model_trainer_builder_test", srcs = ["model_trainer_builder_test.py"], deps = [ - ":distributed_hpo_trainer", ":model_trainer", ":model_trainer_builder", - ":pandas_trainer", - ":snowpark_trainer", - ":xgboost_external_memory_trainer", + "//snowflake/ml/modeling/_internal/local_implementations:pandas_trainer", + "//snowflake/ml/modeling/_internal/snowpark_implementations:distributed_hpo_trainer", + "//snowflake/ml/modeling/_internal/snowpark_implementations:snowpark_trainer", + "//snowflake/ml/modeling/_internal/snowpark_implementations:xgboost_external_memory_trainer", "//snowflake/ml/utils:connection_params", ], ) - -py_test( - name = "xgboost_external_memory_trainer_test", - srcs = ["xgboost_external_memory_trainer_test.py"], - deps = [ - ":xgboost_external_memory_trainer", - "//snowflake/ml/_internal/utils:temp_file_utils", - ], -) diff --git a/snowflake/ml/modeling/_internal/estimator_protocols.py b/snowflake/ml/modeling/_internal/estimator_protocols.py index c507b2a7..2b71e1ec 100644 --- a/snowflake/ml/modeling/_internal/estimator_protocols.py +++ b/snowflake/ml/modeling/_internal/estimator_protocols.py @@ -6,47 +6,7 @@ # TODO: Add more specific entities to type hint estimators instead of using `object`. 
-class FitPredictHandlers(Protocol): - def batch_inference( - self, - dataset: DataFrame, - session: Session, - estimator: object, - dependencies: List[str], - inference_method: str, - input_cols: List[str], - pass_through_columns: List[str], - expected_output_cols_list: List[str], - expected_output_cols_type: str = "", - ) -> DataFrame: - raise NotImplementedError - - def score_pandas( - self, - dataset: pd.DataFrame, - estimator: object, - input_cols: List[str], - label_cols: List[str], - sample_weight_col: Optional[str], - ) -> float: - raise NotImplementedError - - def score_snowpark( - self, - dataset: DataFrame, - session: Session, - estimator: object, - dependencies: List[str], - score_sproc_imports: List[str], - input_cols: List[str], - label_cols: List[str], - sample_weight_col: Optional[str], - ) -> float: - raise NotImplementedError - - -# TODO: Add more specific entities to type hint estimators instead of using `object`. -class CVHandlers(Protocol): +class TransformerHandlers(Protocol): def batch_inference( self, dataset: DataFrame, diff --git a/snowflake/ml/modeling/_internal/estimator_protocols_test.py b/snowflake/ml/modeling/_internal/estimator_protocols_test.py index ac999acf..592d4962 100644 --- a/snowflake/ml/modeling/_internal/estimator_protocols_test.py +++ b/snowflake/ml/modeling/_internal/estimator_protocols_test.py @@ -2,18 +2,15 @@ from absl.testing import absltest, parameterized -from snowflake.ml.modeling._internal.estimator_protocols import ( - CVHandlers, - FitPredictHandlers, -) +from snowflake.ml.modeling._internal.estimator_protocols import TransformerHandlers class EstimatorProtocolsTest(parameterized.TestCase): def test_fit_predict_handlers(self) -> None: - self.assertIsInstance(FitPredictHandlers, Protocol) + self.assertIsInstance(TransformerHandlers, Protocol) def test_cv_handlers(self) -> None: - self.assertIsInstance(CVHandlers, Protocol) + self.assertIsInstance(TransformerHandlers, Protocol) if __name__ == "__main__": diff 
--git a/snowflake/ml/modeling/_internal/local_implementations/BUILD.bazel b/snowflake/ml/modeling/_internal/local_implementations/BUILD.bazel new file mode 100644 index 00000000..4295fa33 --- /dev/null +++ b/snowflake/ml/modeling/_internal/local_implementations/BUILD.bazel @@ -0,0 +1,11 @@ +load("//bazel:py_rules.bzl", "py_library") + +package(default_visibility = ["//visibility:public"]) + +py_library( + name = "pandas_trainer", + srcs = ["pandas_trainer.py"], + deps = [ + "//snowflake/ml/modeling/_internal:model_trainer", + ], +) diff --git a/snowflake/ml/modeling/_internal/pandas_trainer.py b/snowflake/ml/modeling/_internal/local_implementations/pandas_trainer.py similarity index 100% rename from snowflake/ml/modeling/_internal/pandas_trainer.py rename to snowflake/ml/modeling/_internal/local_implementations/pandas_trainer.py diff --git a/snowflake/ml/modeling/_internal/model_specifications_test.py b/snowflake/ml/modeling/_internal/model_specifications_test.py index 785f922d..8e0a8066 100644 --- a/snowflake/ml/modeling/_internal/model_specifications_test.py +++ b/snowflake/ml/modeling/_internal/model_specifications_test.py @@ -11,10 +11,12 @@ from sklearn.model_selection import GridSearchCV from xgboost import XGBRegressor -from snowflake.ml.modeling._internal.distributed_hpo_trainer import construct_cv_results from snowflake.ml.modeling._internal.model_specifications import ( ModelSpecificationsBuilder, ) +from snowflake.ml.modeling._internal.snowpark_implementations.distributed_hpo_trainer import ( + construct_cv_results, +) from snowflake.snowpark import Row each_cv_result_basic_sample = [ diff --git a/snowflake/ml/modeling/_internal/model_trainer_builder.py b/snowflake/ml/modeling/_internal/model_trainer_builder.py index c4947fab..6918ed2f 100644 --- a/snowflake/ml/modeling/_internal/model_trainer_builder.py +++ b/snowflake/ml/modeling/_internal/model_trainer_builder.py @@ -4,17 +4,21 @@ from sklearn import model_selection from 
snowflake.ml._internal.exceptions import error_codes, exceptions -from snowflake.ml.modeling._internal.distributed_hpo_trainer import ( - DistributedHPOTrainer, -) from snowflake.ml.modeling._internal.estimator_utils import ( get_module_name, is_single_node, ) +from snowflake.ml.modeling._internal.local_implementations.pandas_trainer import ( + PandasModelTrainer, +) from snowflake.ml.modeling._internal.model_trainer import ModelTrainer -from snowflake.ml.modeling._internal.pandas_trainer import PandasModelTrainer -from snowflake.ml.modeling._internal.snowpark_trainer import SnowparkModelTrainer -from snowflake.ml.modeling._internal.xgboost_external_memory_trainer import ( +from snowflake.ml.modeling._internal.snowpark_implementations.distributed_hpo_trainer import ( + DistributedHPOTrainer, +) +from snowflake.ml.modeling._internal.snowpark_implementations.snowpark_trainer import ( + SnowparkModelTrainer, +) +from snowflake.ml.modeling._internal.snowpark_implementations.xgboost_external_memory_trainer import ( XGBoostExternalMemoryTrainer, ) from snowflake.snowpark import DataFrame, Session @@ -76,9 +80,9 @@ def build( batch_size: int = -1, ) -> ModelTrainer: """ - Builder method that creates an approproiate ModelTrainer instance based on the given params. + Builder method that creates an appropriate ModelTrainer instance based on the given params. 
""" - assert input_cols is not None # Make MyPy happpy + assert input_cols is not None # Make MyPy happy if isinstance(dataset, pd.DataFrame): return PandasModelTrainer( estimator=estimator, @@ -100,7 +104,7 @@ def build( "subproject": subproject, } - assert dataset._session is not None # Make MyPy happpy + assert dataset._session is not None # Make MyPy happy if isinstance(estimator, model_selection.GridSearchCV) or isinstance( estimator, model_selection.RandomizedSearchCV ): diff --git a/snowflake/ml/modeling/_internal/model_trainer_builder_test.py b/snowflake/ml/modeling/_internal/model_trainer_builder_test.py index 8fbb37e5..0d653ec2 100644 --- a/snowflake/ml/modeling/_internal/model_trainer_builder_test.py +++ b/snowflake/ml/modeling/_internal/model_trainer_builder_test.py @@ -8,12 +8,14 @@ from sklearn.model_selection import GridSearchCV from xgboost import XGBRegressor -from snowflake.ml.modeling._internal.distributed_hpo_trainer import ( +from snowflake.ml.modeling._internal.model_trainer_builder import ModelTrainerBuilder +from snowflake.ml.modeling._internal.snowpark_implementations.distributed_hpo_trainer import ( DistributedHPOTrainer, ) -from snowflake.ml.modeling._internal.model_trainer_builder import ModelTrainerBuilder -from snowflake.ml.modeling._internal.snowpark_trainer import SnowparkModelTrainer -from snowflake.ml.modeling._internal.xgboost_external_memory_trainer import ( +from snowflake.ml.modeling._internal.snowpark_implementations.snowpark_trainer import ( + SnowparkModelTrainer, +) +from snowflake.ml.modeling._internal.snowpark_implementations.xgboost_external_memory_trainer import ( XGBoostExternalMemoryTrainer, ) from snowflake.ml.utils.connection_params import SnowflakeLoginOptions diff --git a/snowflake/ml/modeling/_internal/snowpark_implementations/BUILD.bazel b/snowflake/ml/modeling/_internal/snowpark_implementations/BUILD.bazel new file mode 100644 index 00000000..2d378c08 --- /dev/null +++ 
b/snowflake/ml/modeling/_internal/snowpark_implementations/BUILD.bazel @@ -0,0 +1,78 @@ +load("//bazel:py_rules.bzl", "py_library", "py_test") + +package(default_visibility = ["//visibility:public"]) + +py_library( + name = "snowpark_handlers", + srcs = ["snowpark_handlers.py"], + deps = [ + "//snowflake/ml/_internal:env_utils", + "//snowflake/ml/_internal:telemetry", + "//snowflake/ml/_internal/exceptions", + "//snowflake/ml/_internal/exceptions:modeling_error_messages", + "//snowflake/ml/_internal/utils:identifier", + "//snowflake/ml/_internal/utils:query_result_checker", + "//snowflake/ml/_internal/utils:snowpark_dataframe_utils", + "//snowflake/ml/_internal/utils:temp_file_utils", + ], +) + +py_library( + name = "snowpark_trainer", + srcs = ["snowpark_trainer.py"], + deps = [ + "//snowflake/ml/_internal:env_utils", + "//snowflake/ml/_internal:telemetry", + "//snowflake/ml/_internal/exceptions", + "//snowflake/ml/_internal/exceptions:modeling_error_messages", + "//snowflake/ml/_internal/utils:identifier", + "//snowflake/ml/_internal/utils:pkg_version_utils", + "//snowflake/ml/_internal/utils:query_result_checker", + "//snowflake/ml/_internal/utils:snowpark_dataframe_utils", + "//snowflake/ml/_internal/utils:temp_file_utils", + "//snowflake/ml/modeling/_internal:model_specifications", + "//snowflake/ml/modeling/_internal:model_trainer", + ], +) + +py_library( + name = "distributed_hpo_trainer", + srcs = ["distributed_hpo_trainer.py"], + deps = [ + ":snowpark_trainer", + "//snowflake/ml/_internal:env_utils", + "//snowflake/ml/_internal:telemetry", + "//snowflake/ml/_internal/exceptions", + "//snowflake/ml/_internal/exceptions:modeling_error_messages", + "//snowflake/ml/_internal/utils:identifier", + "//snowflake/ml/_internal/utils:pkg_version_utils", + "//snowflake/ml/_internal/utils:snowpark_dataframe_utils", + "//snowflake/ml/_internal/utils:temp_file_utils", + "//snowflake/ml/modeling/_internal:model_specifications", + ], +) + +py_library( + name = 
"xgboost_external_memory_trainer", + srcs = ["xgboost_external_memory_trainer.py"], + deps = [ + ":snowpark_trainer", + "//snowflake/ml/_internal:telemetry", + "//snowflake/ml/_internal/exceptions", + "//snowflake/ml/_internal/exceptions:modeling_error_messages", + "//snowflake/ml/_internal/utils:identifier", + "//snowflake/ml/_internal/utils:pkg_version_utils", + "//snowflake/ml/_internal/utils:snowpark_dataframe_utils", + "//snowflake/ml/_internal/utils:temp_file_utils", + "//snowflake/ml/modeling/_internal:model_specifications", + ], +) + +py_test( + name = "xgboost_external_memory_trainer_test", + srcs = ["xgboost_external_memory_trainer_test.py"], + deps = [ + ":xgboost_external_memory_trainer", + "//snowflake/ml/_internal/utils:temp_file_utils", + ], +) diff --git a/snowflake/ml/modeling/_internal/distributed_hpo_trainer.py b/snowflake/ml/modeling/_internal/snowpark_implementations/distributed_hpo_trainer.py similarity index 99% rename from snowflake/ml/modeling/_internal/distributed_hpo_trainer.py rename to snowflake/ml/modeling/_internal/snowpark_implementations/distributed_hpo_trainer.py index 0366c4e2..aff9ec13 100644 --- a/snowflake/ml/modeling/_internal/distributed_hpo_trainer.py +++ b/snowflake/ml/modeling/_internal/snowpark_implementations/distributed_hpo_trainer.py @@ -24,7 +24,9 @@ from snowflake.ml.modeling._internal.model_specifications import ( ModelSpecificationsBuilder, ) -from snowflake.ml.modeling._internal.snowpark_trainer import SnowparkModelTrainer +from snowflake.ml.modeling._internal.snowpark_implementations.snowpark_trainer import ( + SnowparkModelTrainer, +) from snowflake.snowpark import DataFrame, Session, functions as F from snowflake.snowpark._internal.utils import ( TempObjectType, diff --git a/snowflake/ml/modeling/_internal/snowpark_handlers.py b/snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_handlers.py similarity index 100% rename from snowflake/ml/modeling/_internal/snowpark_handlers.py rename to 
snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_handlers.py diff --git a/snowflake/ml/modeling/_internal/snowpark_trainer.py b/snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_trainer.py similarity index 100% rename from snowflake/ml/modeling/_internal/snowpark_trainer.py rename to snowflake/ml/modeling/_internal/snowpark_implementations/snowpark_trainer.py diff --git a/snowflake/ml/modeling/_internal/xgboost_external_memory_trainer.py b/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer.py similarity index 99% rename from snowflake/ml/modeling/_internal/xgboost_external_memory_trainer.py rename to snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer.py index 5f7e5942..61525aaf 100644 --- a/snowflake/ml/modeling/_internal/xgboost_external_memory_trainer.py +++ b/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer.py @@ -23,7 +23,9 @@ ModelSpecifications, ModelSpecificationsBuilder, ) -from snowflake.ml.modeling._internal.snowpark_trainer import SnowparkModelTrainer +from snowflake.ml.modeling._internal.snowpark_implementations.snowpark_trainer import ( + SnowparkModelTrainer, +) from snowflake.snowpark import ( DataFrame, Session, diff --git a/snowflake/ml/modeling/_internal/xgboost_external_memory_trainer_test.py b/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer_test.py similarity index 96% rename from snowflake/ml/modeling/_internal/xgboost_external_memory_trainer_test.py rename to snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer_test.py index 8a663c91..f16cc0b0 100644 --- a/snowflake/ml/modeling/_internal/xgboost_external_memory_trainer_test.py +++ b/snowflake/ml/modeling/_internal/snowpark_implementations/xgboost_external_memory_trainer_test.py @@ -9,7 +9,7 @@ cleanup_temp_files, get_temp_file_path, ) -from 
snowflake.ml.modeling._internal.xgboost_external_memory_trainer import ( +from snowflake.ml.modeling._internal.snowpark_implementations.xgboost_external_memory_trainer import ( get_data_iterator, ) diff --git a/snowflake/ml/modeling/framework/BUILD.bazel b/snowflake/ml/modeling/framework/BUILD.bazel index e366e22c..272c0531 100644 --- a/snowflake/ml/modeling/framework/BUILD.bazel +++ b/snowflake/ml/modeling/framework/BUILD.bazel @@ -17,6 +17,6 @@ py_library( "//snowflake/ml/_internal/utils:identifier", "//snowflake/ml/_internal/utils:parallelize", "//snowflake/ml/modeling/_internal:estimator_protocols", - "//snowflake/ml/modeling/_internal:snowpark_handlers", + "//snowflake/ml/modeling/_internal/snowpark_implementations:snowpark_handlers", ], ) diff --git a/snowflake/ml/modeling/model_selection/BUILD.bazel b/snowflake/ml/modeling/model_selection/BUILD.bazel index 23e28418..7d937456 100644 --- a/snowflake/ml/modeling/model_selection/BUILD.bazel +++ b/snowflake/ml/modeling/model_selection/BUILD.bazel @@ -29,7 +29,7 @@ py_library( "//snowflake/ml/_internal:telemetry", "//snowflake/ml/_internal/exceptions", "//snowflake/ml/modeling/_internal:model_trainer_builder", - "//snowflake/ml/modeling/_internal:snowpark_handlers", + "//snowflake/ml/modeling/_internal/snowpark_implementations:snowpark_handlers", ], ) @@ -41,6 +41,6 @@ py_library( "//snowflake/ml/_internal:telemetry", "//snowflake/ml/_internal/exceptions", "//snowflake/ml/modeling/_internal:model_trainer_builder", - "//snowflake/ml/modeling/_internal:snowpark_handlers", + "//snowflake/ml/modeling/_internal/snowpark_implementations:snowpark_handlers", ], ) diff --git a/snowflake/ml/modeling/model_selection/grid_search_cv.py b/snowflake/ml/modeling/model_selection/grid_search_cv.py index dd1392db..87cc794c 100644 --- a/snowflake/ml/modeling/model_selection/grid_search_cv.py +++ b/snowflake/ml/modeling/model_selection/grid_search_cv.py @@ -3,7 +3,6 @@ # Do not modify the auto-generated code(except automatic 
reformatting by precommit hooks). # from typing import Any, Dict, Iterable, List, Optional, Set, Union -from uuid import uuid4 import cloudpickle as cp import numpy as np @@ -22,7 +21,7 @@ ModelSignature, _infer_signature, ) -from snowflake.ml.modeling._internal.estimator_protocols import CVHandlers +from snowflake.ml.modeling._internal.estimator_protocols import TransformerHandlers from snowflake.ml.modeling._internal.estimator_utils import ( gather_dependencies, original_estimator_has_callable, @@ -30,7 +29,7 @@ validate_sklearn_args, ) from snowflake.ml.modeling._internal.model_trainer_builder import ModelTrainerBuilder -from snowflake.ml.modeling._internal.snowpark_handlers import ( +from snowflake.ml.modeling._internal.snowpark_implementations.snowpark_handlers import ( SnowparkHandlers as HandlersImpl, ) from snowflake.ml.modeling.framework.base import BaseTransformer @@ -266,20 +265,11 @@ def __init__( # type: ignore[no-untyped-def] self.set_drop_input_cols(drop_input_cols) self.set_sample_weight_col(sample_weight_col) self.set_passthrough_cols(passthrough_cols) - self._handlers: CVHandlers = HandlersImpl( + self._handlers: TransformerHandlers = HandlersImpl( class_name=self.__class__.__name__, subproject=_SUBPROJECT, ) - def _get_rand_id(self) -> str: - """ - Generate random id to be used in sproc and stage names. - - Returns: - Random id string usable in sproc, table, and stage names. 
- """ - return str(uuid4()).replace("-", "_").upper() - def _get_active_columns(self) -> List[str]: """ "Get the list of columns that are relevant to the transformer.""" selected_cols = ( diff --git a/snowflake/ml/modeling/model_selection/randomized_search_cv.py b/snowflake/ml/modeling/model_selection/randomized_search_cv.py index edf912ad..23c3dd54 100644 --- a/snowflake/ml/modeling/model_selection/randomized_search_cv.py +++ b/snowflake/ml/modeling/model_selection/randomized_search_cv.py @@ -1,5 +1,4 @@ from typing import Any, Dict, Iterable, List, Optional, Set, Union -from uuid import uuid4 import cloudpickle as cp import numpy as np @@ -19,7 +18,7 @@ ModelSignature, _infer_signature, ) -from snowflake.ml.modeling._internal.estimator_protocols import CVHandlers +from snowflake.ml.modeling._internal.estimator_protocols import TransformerHandlers from snowflake.ml.modeling._internal.estimator_utils import ( gather_dependencies, original_estimator_has_callable, @@ -27,7 +26,7 @@ validate_sklearn_args, ) from snowflake.ml.modeling._internal.model_trainer_builder import ModelTrainerBuilder -from snowflake.ml.modeling._internal.snowpark_handlers import ( +from snowflake.ml.modeling._internal.snowpark_implementations.snowpark_handlers import ( SnowparkHandlers as HandlersImpl, ) from snowflake.ml.modeling.framework.base import BaseTransformer @@ -278,20 +277,11 @@ def __init__( # type: ignore[no-untyped-def] self.set_drop_input_cols(drop_input_cols) self.set_sample_weight_col(sample_weight_col) self.set_passthrough_cols(passthrough_cols) - self._handlers: CVHandlers = HandlersImpl( + self._handlers: TransformerHandlers = HandlersImpl( class_name=self.__class__.__name__, subproject=_SUBPROJECT, ) - def _get_rand_id(self) -> str: - """ - Generate random id to be used in sproc and stage names. - - Returns: - Random id string usable in sproc, table, and stage names. 
- """ - return str(uuid4()).replace("-", "_").upper() - def _get_active_columns(self) -> List[str]: """ "Get the list of columns that are relevant to the transformer.""" selected_cols = ( diff --git a/snowflake/ml/modeling/parameters/BUILD.bazel b/snowflake/ml/modeling/parameters/BUILD.bazel index 45637277..942a613d 100644 --- a/snowflake/ml/modeling/parameters/BUILD.bazel +++ b/snowflake/ml/modeling/parameters/BUILD.bazel @@ -19,9 +19,9 @@ py_test( ], deps = [ ":disable_distributed_hpo", - "//snowflake/ml/modeling/_internal:distributed_hpo_trainer", "//snowflake/ml/modeling/_internal:model_trainer_builder", - "//snowflake/ml/modeling/_internal:snowpark_trainer", + "//snowflake/ml/modeling/_internal/snowpark_implementations:distributed_hpo_trainer", + "//snowflake/ml/modeling/_internal/snowpark_implementations:snowpark_trainer", "//snowflake/ml/modeling/xgboost:xgb_classifier", ], ) diff --git a/snowflake/ml/modeling/parameters/disable_distributed_hpo_test.py b/snowflake/ml/modeling/parameters/disable_distributed_hpo_test.py index 8574a733..174ad3c2 100644 --- a/snowflake/ml/modeling/parameters/disable_distributed_hpo_test.py +++ b/snowflake/ml/modeling/parameters/disable_distributed_hpo_test.py @@ -4,11 +4,13 @@ from sklearn.model_selection import GridSearchCV from snowflake.ml.modeling.xgboost.xgb_classifier import XGBClassifier -from snowflake.ml.modeling._internal.distributed_hpo_trainer import ( +from snowflake.ml.modeling._internal.model_trainer_builder import ModelTrainerBuilder +from snowflake.ml.modeling._internal.snowpark_implementations.distributed_hpo_trainer import ( DistributedHPOTrainer, ) -from snowflake.ml.modeling._internal.model_trainer_builder import ModelTrainerBuilder -from snowflake.ml.modeling._internal.snowpark_trainer import SnowparkModelTrainer +from snowflake.ml.modeling._internal.snowpark_implementations.snowpark_trainer import ( + SnowparkModelTrainer, +) from snowflake.snowpark import DataFrame, Session diff --git 
a/snowflake/ml/version.bzl b/snowflake/ml/version.bzl index f660cf73..ac48ae7f 100644 --- a/snowflake/ml/version.bzl +++ b/snowflake/ml/version.bzl @@ -1,2 +1,2 @@ # This is parsed by regex in conda reciper meta file. Make sure not to break it. -VERSION = "1.2.1" +VERSION = "1.2.2" diff --git a/tests/conftest.py b/tests/conftest.py index 959a94e7..f7cbc71e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -17,7 +17,8 @@ def _random_name_for_temp_object(object_type: TempObjectType) -> str: @pytest.fixture(scope="session", autouse=True) def random_name_for_temp_object_mock(): with mock.patch( - "snowflake.ml.modeling._internal.snowpark_handlers.random_name_for_temp_object", _random_name_for_temp_object + "snowflake.ml.modeling._internal.snowpark_implementations.snowpark_handlers.random_name_for_temp_object", + _random_name_for_temp_object, ) as _fixture: cp.register_pickle_by_value(inspect.getmodule(_random_name_for_temp_object)) yield _fixture diff --git a/tests/integ/snowflake/ml/_internal/BUILD.bazel b/tests/integ/snowflake/ml/_internal/BUILD.bazel index 94d7fdb9..38ccfd5a 100644 --- a/tests/integ/snowflake/ml/_internal/BUILD.bazel +++ b/tests/integ/snowflake/ml/_internal/BUILD.bazel @@ -26,7 +26,7 @@ py_test( srcs = ["snowpark_handlers_test.py"], deps = [ "//snowflake/ml/_internal:env_utils", - "//snowflake/ml/modeling/_internal:snowpark_handlers", + "//snowflake/ml/modeling/_internal/snowpark_implementations:snowpark_handlers", "//snowflake/ml/utils:connection_params", "//tests/integ/snowflake/ml/test_utils:common_test_base", ], diff --git a/tests/integ/snowflake/ml/_internal/snowpark_handlers_test.py b/tests/integ/snowflake/ml/_internal/snowpark_handlers_test.py index bb492901..377f94d8 100644 --- a/tests/integ/snowflake/ml/_internal/snowpark_handlers_test.py +++ b/tests/integ/snowflake/ml/_internal/snowpark_handlers_test.py @@ -7,7 +7,9 @@ from sklearn.datasets import load_diabetes from sklearn.linear_model import LinearRegression as SkLinearRegression 
-from snowflake.ml.modeling._internal.snowpark_handlers import SnowparkHandlers +from snowflake.ml.modeling._internal.snowpark_implementations.snowpark_handlers import ( + SnowparkHandlers, +) from tests.integ.snowflake.ml.test_utils import common_test_base diff --git a/snowflake/ml/feature_store/tests/BUILD.bazel b/tests/integ/snowflake/ml/feature_store/BUILD.bazel similarity index 98% rename from snowflake/ml/feature_store/tests/BUILD.bazel rename to tests/integ/snowflake/ml/feature_store/BUILD.bazel index c1c43f45..7e62c6b0 100644 --- a/snowflake/ml/feature_store/tests/BUILD.bazel +++ b/tests/integ/snowflake/ml/feature_store/BUILD.bazel @@ -7,6 +7,7 @@ package(default_visibility = [ py_library( name = "common_utils", + testonly = True, srcs = [ "common_utils.py", ], diff --git a/snowflake/ml/feature_store/tests/common_utils.py b/tests/integ/snowflake/ml/feature_store/common_utils.py similarity index 62% rename from snowflake/ml/feature_store/tests/common_utils.py rename to tests/integ/snowflake/ml/feature_store/common_utils.py index 47ef317e..fae6dde9 100644 --- a/snowflake/ml/feature_store/tests/common_utils.py +++ b/tests/integ/snowflake/ml/feature_store/common_utils.py @@ -1,3 +1,4 @@ +from datetime import datetime, timedelta from typing import Any, Callable, Dict, List from unittest.mock import Mock from uuid import uuid4 @@ -5,9 +6,13 @@ import pandas as pd from pandas.testing import assert_frame_equal +from snowflake.ml._internal.utils.sql_identifier import ( + SqlIdentifier, + to_sql_identifiers, +) from snowflake.ml.feature_store.feature_view import FeatureView from snowflake.ml.utils.connection_params import SnowflakeLoginOptions -from snowflake.snowpark import Session +from snowflake.snowpark import Row, Session # Database used for feature store integration test FS_INTEG_TEST_DB = "SNOWML_FEATURE_STORE_TEST_DB" @@ -25,6 +30,9 @@ # Wine quality dataset FS_INTEG_TEST_WINE_QUALITY_DATA = "wine_quality_data" +# If object live time is greater than specified 
hours it will be deleted. +DB_OBJECT_EXPIRE_HOURS = 24 + def create_random_schema(session: Session, prefix: str, database: str = FS_INTEG_TEST_DB) -> str: schema = prefix + "_" + uuid4().hex.upper() @@ -64,3 +72,24 @@ def dispatch(*args: Any) -> Any: def get_test_warehouse_name(session: Session) -> str: session_warehouse = session.get_current_warehouse() return session_warehouse if session_warehouse else "REGTEST_ML_4XL_MULTI" + + +def cleanup_temporary_objects(session: Session) -> None: + current_time = datetime.now().astimezone() + + def is_object_expired(row: Row) -> bool: + time_diff: timedelta = current_time - row["created_on"] + return time_diff >= timedelta(hours=DB_OBJECT_EXPIRE_HOURS) + + result = session.sql(f"SHOW SCHEMAS IN DATABASE {FS_INTEG_TEST_DB}").collect() + permanent_schemas = to_sql_identifiers(["INFORMATION_SCHEMA", "PUBLIC", FS_INTEG_TEST_DATASET_SCHEMA]) + for row in result: + if SqlIdentifier(row["name"]) not in permanent_schemas and is_object_expired(row): + session.sql(f"DROP SCHEMA IF EXISTS {FS_INTEG_TEST_DB}.{row['name']}").collect() + + full_schema_path = f"{FS_INTEG_TEST_DB}.{FS_INTEG_TEST_DATASET_SCHEMA}" + result = session.sql(f"SHOW TABLES IN {full_schema_path}").collect() + permanent_tables = to_sql_identifiers([FS_INTEG_TEST_YELLOW_TRIP_DATA, FS_INTEG_TEST_WINE_QUALITY_DATA]) + for row in result: + if SqlIdentifier(row["name"]) not in permanent_tables and is_object_expired(row): + session.sql(f"DROP TABLE IF EXISTS {full_schema_path}.{row['name']}").collect() diff --git a/snowflake/ml/feature_store/tests/feature_store_case_sensitivity_test.py b/tests/integ/snowflake/ml/feature_store/feature_store_case_sensitivity_test.py similarity index 99% rename from snowflake/ml/feature_store/tests/feature_store_case_sensitivity_test.py rename to tests/integ/snowflake/ml/feature_store/feature_store_case_sensitivity_test.py index 4788c65d..02d89bf8 100644 --- a/snowflake/ml/feature_store/tests/feature_store_case_sensitivity_test.py +++ 
b/tests/integ/snowflake/ml/feature_store/feature_store_case_sensitivity_test.py @@ -5,6 +5,7 @@ from common_utils import ( FS_INTEG_TEST_DATASET_SCHEMA, FS_INTEG_TEST_DB, + cleanup_temporary_objects, create_random_schema, get_test_warehouse_name, ) @@ -46,6 +47,7 @@ class FeatureStoreCaseSensitivityTest(parameterized.TestCase): @classmethod def setUpClass(cls) -> None: cls._session = Session.builder.configs(SnowflakeLoginOptions()).create() + cleanup_temporary_objects(cls._session) cls._active_fs = [] cls._mock_table = cls._create_mock_table("mock_data") cls._test_warehouse_name = get_test_warehouse_name(cls._session) diff --git a/snowflake/ml/feature_store/tests/feature_store_large_scale_test.py b/tests/integ/snowflake/ml/feature_store/feature_store_large_scale_test.py similarity index 99% rename from snowflake/ml/feature_store/tests/feature_store_large_scale_test.py rename to tests/integ/snowflake/ml/feature_store/feature_store_large_scale_test.py index 4c667d7a..47f7c7b2 100644 --- a/snowflake/ml/feature_store/tests/feature_store_large_scale_test.py +++ b/tests/integ/snowflake/ml/feature_store/feature_store_large_scale_test.py @@ -8,6 +8,7 @@ FS_INTEG_TEST_DB, FS_INTEG_TEST_WINE_QUALITY_DATA, FS_INTEG_TEST_YELLOW_TRIP_DATA, + cleanup_temporary_objects, create_random_schema, get_test_warehouse_name, ) @@ -32,6 +33,7 @@ class FeatureStoreLargeScaleTest(absltest.TestCase): @classmethod def setUpClass(self) -> None: self._session = Session.builder.configs(SnowflakeLoginOptions()).create() + cleanup_temporary_objects(self._session) self._active_feature_store = [] self._test_warehouse_name = get_test_warehouse_name(self._session) diff --git a/snowflake/ml/feature_store/tests/feature_store_object_test.py b/tests/integ/snowflake/ml/feature_store/feature_store_object_test.py similarity index 100% rename from snowflake/ml/feature_store/tests/feature_store_object_test.py rename to tests/integ/snowflake/ml/feature_store/feature_store_object_test.py diff --git 
a/snowflake/ml/feature_store/tests/feature_store_test.py b/tests/integ/snowflake/ml/feature_store/feature_store_test.py similarity index 89% rename from snowflake/ml/feature_store/tests/feature_store_test.py rename to tests/integ/snowflake/ml/feature_store/feature_store_test.py index 6be2f7e7..0e616f01 100644 --- a/snowflake/ml/feature_store/tests/feature_store_test.py +++ b/tests/integ/snowflake/ml/feature_store/feature_store_test.py @@ -1,4 +1,5 @@ -from typing import List, Optional, Union, cast +import datetime +from typing import List, Optional, Tuple, Union, cast from uuid import uuid4 from absl.testing import absltest @@ -6,6 +7,7 @@ FS_INTEG_TEST_DATASET_SCHEMA, FS_INTEG_TEST_DB, FS_INTEG_TEST_DUMMY_DB, + cleanup_temporary_objects, compare_dataframe, compare_feature_views, create_mock_session, @@ -38,6 +40,7 @@ class FeatureStoreTest(absltest.TestCase): @classmethod def setUpClass(self) -> None: self._session = Session.builder.configs(SnowflakeLoginOptions()).create() + cleanup_temporary_objects(self._session) self._active_feature_store = [] try: @@ -357,15 +360,20 @@ def test_register_feature_view_as_view(self) -> None: # generate data on multiple feature views spine_df = self._session.create_dataframe([(1, 101)], schema=["id", "ts"]) - ds = fs.generate_dataset(spine_df=spine_df, features=[fv, new_fv], spine_timestamp_col="ts") + ds = fs.generate_dataset( + spine_df=spine_df, features=[fv, new_fv], spine_timestamp_col="ts", include_feature_view_timestamp_col=True + ) + compare_dataframe( actual_df=ds.df.to_pandas(), target_data={ "ID": [1], "TS": [101], + "FV_V1_TS": [100], "NAME": ["jonh"], "TITLE": ["boss"], "AGE": [20], + "NEW_FV_V1_TS": [100], "DEPT": ["sales"], }, sort_cols=["ID"], @@ -679,14 +687,18 @@ def test_retrieve_time_series_feature_values(self) -> None: spine_df=spine_df, features=cast(List[Union[FeatureView, FeatureViewSlice]], [fv1, fv2, fv3]), spine_timestamp_col="ts", + include_feature_view_timestamp_col=True, ) + compare_dataframe( 
actual_df=df.to_pandas(), target_data={ "ID": [1, 1, 2], "TS": [90, 101, 202], + "FV1_V1_TS": [None, 100, 200], "NAME": [None, "jonh", "porter"], "TITLE": [None, "boss", "manager"], + "FV2_V1_TS": [None, 100, 200], "AGE": [None, 20, 30], "DEPT": ["sales", "sales", "engineer"], }, @@ -1168,6 +1180,7 @@ def test_generate_dataset(self) -> None: materialized_table="foobar", spine_timestamp_col="ts", ) + compare_dataframe( actual_df=ds1.df.to_pandas(), target_data={ @@ -1189,6 +1202,7 @@ def test_generate_dataset(self) -> None: spine_timestamp_col="ts", save_mode="merge", ) + compare_dataframe( actual_df=ds2.df.to_pandas(), target_data={ @@ -1210,6 +1224,7 @@ def test_generate_dataset(self) -> None: spine_timestamp_col="ts", save_mode="merge", ) + compare_dataframe( actual_df=ds3.df.to_pandas(), target_data={ @@ -1388,6 +1403,7 @@ def check_fs_objects(expected_count: int) -> None: def test_dynamic_table_full_refresh_warning(self) -> None: temp_stage_name = "test_dynamic_table_full_refresh_warning_stage" + self._session.sql(f"USE DATABASE {FS_INTEG_TEST_DB}").collect() self._session.sql(f"CREATE OR REPLACE STAGE {temp_stage_name}").collect() udf_name = f"{FS_INTEG_TEST_DB}.{FS_INTEG_TEST_DATASET_SCHEMA}.minus_one" @@ -1490,6 +1506,168 @@ def test_update_feature_view(self) -> None: self.assertEqual(result[0]["target_lag"], "1 minute") self.assertEqual(SqlIdentifier(result[0]["warehouse"]), SqlIdentifier(alternative_wh)) + def test_override_feature_view(self) -> None: + fs = self._create_feature_store() + + e = Entity("foo", ["id"]) + fs.register_entity(e) + + def create_fvs(fs: FeatureStore, sql: str, override: bool) -> Tuple[FeatureView, FeatureView, FeatureView]: + fv1 = FeatureView( + name="fv1", + entities=[e], + feature_df=self._session.sql(sql), + refresh_freq="1m", + ) + fv1 = fs.register_feature_view(feature_view=fv1, version="v1", override=override) + + fv2 = FeatureView( + name="fv2", + entities=[e], + feature_df=self._session.sql(sql), + refresh_freq="* * * * * 
America/Los_Angeles", + ) + fv2 = fs.register_feature_view(feature_view=fv2, version="v2", override=override) + + fv3 = FeatureView( + name="fv3", + entities=[e], + feature_df=self._session.sql(sql), + ) + fv3 = fs.register_feature_view(feature_view=fv3, version="v3", override=override) + + return fv1, fv2, fv3 + + sql = f"SELECT id, name FROM {self._mock_table}" + fv1, fv2, fv3 = create_fvs(fs, sql, False) + compare_dataframe( + actual_df=fs.read_feature_view(fv1).to_pandas(), + target_data={ + "ID": [1, 2], + "NAME": ["jonh", "porter"], + }, + sort_cols=["ID"], + ) + compare_feature_views(fs.list_feature_views(as_dataframe=False), [fv1, fv2, fv3]) + + # Override existing feature views + sql = f"SELECT id, name, title FROM {self._mock_table}" + fv1, fv2, fv3 = create_fvs(fs, sql, True) + + compare_dataframe( + actual_df=fs.read_feature_view(fv1).to_pandas(), + target_data={ + "ID": [1, 2], + "NAME": ["jonh", "porter"], + "TITLE": ["boss", "manager"], + }, + sort_cols=["ID"], + ) + compare_feature_views(fs.list_feature_views(as_dataframe=False), [fv1, fv2, fv3]) + + # Override non-existing feature view + non_existing_fv = FeatureView( + name="non_existing_fv", + entities=[e], + feature_df=self._session.sql(sql), + refresh_freq="1m", + ) + fs.register_feature_view(feature_view=non_existing_fv, version="v1", override=True) + + def test_generate_dataset_point_in_time_join(self) -> None: + fs = self._create_feature_store() + + # The test below requires ASOF join to be enabled. + # When _is_asof_join_enabled is false, the alternative union-window-join is used. + # It's a known issue that union-window-join fails the test below. 
+ self.assertTrue(fs._is_asof_join_enabled()) + + entity = Entity("CUSTOMER", ["CUSTOMER_ID"]) + fs.register_entity(entity) + + self._session.sql( + f""" + CREATE OR REPLACE TABLE {fs._config.full_schema_path}.CUSTOMER_FEATURES ( + CUSTOMER_ID NUMBER, + FEATURE_TS TIMESTAMP_NTZ, + CUST_AVG_AMOUNT_7 NUMBER, + CUST_AVG_AMOUNT_30 NUMBER) + """ + ).collect() + + # Each customer_id has 2 rows with different timestamps. + # The table has 2 feature columns: CUST_AVG_AMOUNT_7 and CUST_AVG_AMOUNT_30. + # For each customer_id, each of these two features may be null or non-null. + # generate_dataset() should always return the second row as the result, regardless of + # whether the feature value is null or non-null. + self._session.sql( + f""" + INSERT INTO {fs._config.full_schema_path}.CUSTOMER_FEATURES + (CUSTOMER_ID, FEATURE_TS, CUST_AVG_AMOUNT_7, CUST_AVG_AMOUNT_30) + VALUES + (1, '2019-04-01', 1, 1), + (1, '2019-04-02', 10, 10), + (2, '2019-04-01', 2, 2), + (2, '2019-04-02', 20, null), + (3, '2019-04-01', 3, 3), + (3, '2019-04-02', null, 30), + (4, '2019-04-01', null, 4), + (4, '2019-04-02', 40, 40), + (5, '2019-04-01', 5, 5), + (5, '2019-04-02', null, null) + """ + ).collect() + + customers_fv = FeatureView( + name="CUSTOMER_FV", + entities=[entity], + feature_df=self._session.sql(f"SELECT * FROM {fs._config.full_schema_path}.CUSTOMER_FEATURES"), + timestamp_col="FEATURE_TS", + refresh_freq=None, + ) + + customers_fv = fs.register_feature_view(feature_view=customers_fv, version="1", block=True) + + spine_df = self._session.create_dataframe( + [ + (1, "2019-04-03"), + (2, "2019-04-03"), + (3, "2019-04-03"), + (4, "2019-04-03"), + (5, "2019-04-03"), + ], + schema=["CUSTOMER_ID", "EVENT_TS"], + ) + + dataset = fs.generate_dataset( + spine_df=spine_df, + features=[customers_fv], + materialized_table="customer_frad_training_data", + spine_timestamp_col="EVENT_TS", + spine_label_cols=[], + include_feature_view_timestamp_col=True, + ) + + # CUST_AVG_AMOUNT_7 and CUST_AVG_AMOUNT_30 are 
expected to be same as the values + # in second row of each customer_id (with timestamp 2019-04-02). + compare_dataframe( + actual_df=dataset.df.to_pandas(), + target_data={ + "CUSTOMER_ID": [1, 2, 3, 4, 5], + "EVENT_TS": ["2019-04-03", "2019-04-03", "2019-04-03", "2019-04-03", "2019-04-03"], + "CUSTOMER_FV_1_FEATURE_TS": [ + datetime.date(2019, 4, 2), + datetime.date(2019, 4, 2), + datetime.date(2019, 4, 2), + datetime.date(2019, 4, 2), + datetime.date(2019, 4, 2), + ], + "CUST_AVG_AMOUNT_7": [10, 20, None, 40, None], + "CUST_AVG_AMOUNT_30": [10, None, 30, 40, None], + }, + sort_cols=["CUSTOMER_ID"], + ) + if __name__ == "__main__": absltest.main() diff --git a/tests/integ/snowflake/ml/model/spcs_llm_model_integ_test.py b/tests/integ/snowflake/ml/model/spcs_llm_model_integ_test.py index 8b62f4f9..d3fbb1ce 100644 --- a/tests/integ/snowflake/ml/model/spcs_llm_model_integ_test.py +++ b/tests/integ/snowflake/ml/model/spcs_llm_model_integ_test.py @@ -1,6 +1,5 @@ import os import tempfile -import unittest import pandas as pd import pytest @@ -19,7 +18,6 @@ ) -@unittest.skip("release-1.2.1") @pytest.mark.conda_incompatible class TestSPCSLLMModelInteg(spcs_integ_test_base.SpcsIntegTestBase): def setUp(self) -> None: @@ -49,7 +47,7 @@ def test_text_generation_pipeline( stage_path = f"@{self._test_stage}/{self._run_id}" deployment_stage_path = f"@{self._test_stage}/{self._run_id}" - model_api.save_model( + model_api.save_model( # type: ignore[call-overload] name="model", session=self._session, stage_path=stage_path, @@ -68,6 +66,7 @@ def test_text_generation_pipeline( "compute_pool": self._TEST_GPU_COMPUTE_POOL, "num_gpus": 1, "model_in_image": True, + "external_access_integrations": self._SPCS_EAIS, } deploy_info = model_api.deploy( @@ -78,7 +77,7 @@ def test_text_generation_pipeline( model_id=svc_func_name, platform=deploy_platforms.TargetPlatform.SNOWPARK_CONTAINER_SERVICES, options={ - **deployment_options, + **deployment_options, # type: ignore[arg-type] }, # type: 
ignore[call-overload] ) assert deploy_info is not None diff --git a/tests/integ/snowflake/ml/model/warehouse_model_integ_test_utils.py b/tests/integ/snowflake/ml/model/warehouse_model_integ_test_utils.py index 4701b497..14ecd17b 100644 --- a/tests/integ/snowflake/ml/model/warehouse_model_integ_test_utils.py +++ b/tests/integ/snowflake/ml/model/warehouse_model_integ_test_utils.py @@ -66,7 +66,7 @@ def base_test_case( platform=deploy_platforms.TargetPlatform.WAREHOUSE, target_method=target_method_arg, options={ - **permanent_deploy_args, + **permanent_deploy_args, # type: ignore[arg-type] **additional_deploy_options, }, # type: ignore[call-overload] ) diff --git a/tests/integ/snowflake/ml/modeling/model_selection/BUILD.bazel b/tests/integ/snowflake/ml/modeling/model_selection/BUILD.bazel index e80541e7..83890da1 100644 --- a/tests/integ/snowflake/ml/modeling/model_selection/BUILD.bazel +++ b/tests/integ/snowflake/ml/modeling/model_selection/BUILD.bazel @@ -57,3 +57,17 @@ py_test( "//snowflake/ml/utils:connection_params", ], ) + +py_test( + name = "check_sklearn_inference_test", + timeout = "long", + srcs = ["check_sklearn_inference_test.py"], + shard_count = 2, + deps = [ + "//snowflake/ml/_internal/exceptions", + "//snowflake/ml/modeling/linear_model:linear_regression", + "//snowflake/ml/modeling/model_selection:grid_search_cv", + "//snowflake/ml/modeling/model_selection:randomized_search_cv", + "//snowflake/ml/utils:connection_params", + ], +) diff --git a/tests/integ/snowflake/ml/modeling/model_selection/check_sklearn_inference_test.py b/tests/integ/snowflake/ml/modeling/model_selection/check_sklearn_inference_test.py new file mode 100644 index 00000000..53916ad1 --- /dev/null +++ b/tests/integ/snowflake/ml/modeling/model_selection/check_sklearn_inference_test.py @@ -0,0 +1,108 @@ +from typing import List, Tuple + +import inflection +import pandas as pd +from absl.testing import absltest +from sklearn.datasets import load_iris +from sklearn.linear_model import 
LinearRegression + +from snowflake.ml._internal.exceptions import exceptions +from snowflake.ml.modeling.model_selection import ( # type: ignore[attr-defined] + GridSearchCV, + RandomizedSearchCV, +) + + +def _load_iris_data() -> Tuple[pd.DataFrame, List[str], List[str]]: + input_df_pandas = load_iris(as_frame=True).frame + input_df_pandas.columns = [f'"{inflection.parameterize(c, "_")}"' for c in input_df_pandas.columns] + input_df_pandas['"index"'] = input_df_pandas.reset_index().index + + input_cols = [c for c in input_df_pandas.columns if not c.startswith('"target"')] + label_col = [c for c in input_df_pandas.columns if c.startswith('"target"')] + + return input_df_pandas, input_cols, label_col + + +class CheckSklearnInferenceCornerCases(absltest.TestCase): + """sklearn_inference is the base helper function used to execute all pandas dataframe input. + This test covers corner cases that are implemented within sklearn_inference, including + - output_cols dimension mismatch + - double quoted column names + + Args: + absltest (_type_): default test + """ + + def setUp(self) -> None: + pd_data, input_col, label_col = _load_iris_data() + self._input_df_pandas = pd_data + self._input_cols = input_col + self._label_col = label_col + + def test_sklearn_inference_gridsearch(self) -> None: + reg = GridSearchCV( + estimator=LinearRegression(), param_grid={"fit_intercept": [True, False], "positive": [True, False]} + ) + reg.set_input_cols(self._input_cols) + reg.set_label_cols(self._label_col) + reg.set_drop_input_cols(True) + reg.fit(self._input_df_pandas) + # The pandas dataframe passed for predict has a wrong column name (["1"]), + # so an error is expected + with self.assertRaises(exceptions.SnowflakeMLException): + reg._sklearn_inference(pd.DataFrame({"1": []}), "predict", [""]) + + # Some of the pandas dataframe's column names are unquoted + # and some of them are double quoted + test_pd = self._input_df_pandas + test_pd.columns = [ + 
'"sepal_length_cm"', + "sepal_width_cm", + '"petal_length_cm"', + "petal_width_cm", + '"target"', + '"index"', + ] + reg._sklearn_inference(test_pd, "predict", [""]) + + # When output cols is an empty array ([]), + # an error is expected + with self.assertRaises(exceptions.SnowflakeMLException): + reg._sklearn_inference(self._input_df_pandas, "predict", []) + + def test_sklearn_inference_randomizedsearch(self) -> None: + reg = RandomizedSearchCV( + estimator=LinearRegression(), + param_distributions={"fit_intercept": [True, False], "positive": [True, False]}, + ) + reg.set_input_cols(self._input_cols) + reg.set_label_cols(self._label_col) + reg.set_drop_input_cols(True) + reg.fit(self._input_df_pandas) + # The pandas dataframe passed for predict has a wrong column name (["1"]), + # so an error is expected + with self.assertRaises(exceptions.SnowflakeMLException): + reg._sklearn_inference(pd.DataFrame({"1": []}), "predict", [""]) + + # Some of the pandas dataframe's column names are unquoted + # and some of them are double quoted + test_pd = self._input_df_pandas + test_pd.columns = [ + '"sepal_length_cm"', + "sepal_width_cm", + '"petal_length_cm"', + "petal_width_cm", + '"target"', + '"index"', + ] + reg._sklearn_inference(test_pd, "predict", [""]) + + # When output cols is an empty array ([]), + # an error is expected + with self.assertRaises(exceptions.SnowflakeMLException): + reg._sklearn_inference(self._input_df_pandas, "predict", []) + + +if __name__ == "__main__": + absltest.main() diff --git a/tests/integ/snowflake/ml/modeling/model_selection/grid_search_integ_test.py b/tests/integ/snowflake/ml/modeling/model_selection/grid_search_integ_test.py index aa5f41f1..10c32a70 100644 --- a/tests/integ/snowflake/ml/modeling/model_selection/grid_search_integ_test.py +++ b/tests/integ/snowflake/ml/modeling/model_selection/grid_search_integ_test.py @@ -239,6 +239,9 @@ def test_fit_and_compare_results_distributed( ) np.testing.assert_allclose(actual_score, 
sklearn_score, rtol=1.0e-1, atol=1.0e-2) + actual_score = reg.score(self._input_df_pandas) + np.testing.assert_allclose(actual_score, sklearn_score, rtol=1.0e-1, atol=1.0e-2) + # n_features_in_ is available because `refit` is set to `True`. self.assertEqual(sk_obj.n_features_in_, sklearn_reg.n_features_in_) @@ -326,6 +329,10 @@ def test_transform(self, mock_is_single_node) -> None: np.testing.assert_allclose(transformed, sk_transformed, rtol=1.0e-1, atol=1.0e-2) + transformed = reg.transform(self._input_df_pandas[self._input_cols]) + transformed = transformed[actual_output_cols].to_numpy() + np.testing.assert_allclose(transformed, sk_transformed, rtol=1.0e-1, atol=1.0e-2) + def test_not_fitted_exception(self) -> None: param_grid = {"max_depth": [2, 6], "learning_rate": [0.1, 0.01]} reg = GridSearchCV(estimator=XGBClassifier(), param_grid=param_grid) diff --git a/tests/integ/snowflake/ml/modeling/model_selection/randomized_search_integ_test.py b/tests/integ/snowflake/ml/modeling/model_selection/randomized_search_integ_test.py index a0457bd7..ac3b9ebe 100644 --- a/tests/integ/snowflake/ml/modeling/model_selection/randomized_search_integ_test.py +++ b/tests/integ/snowflake/ml/modeling/model_selection/randomized_search_integ_test.py @@ -191,6 +191,8 @@ def test_fit_and_compare_results( self._input_df_pandas[self._input_cols], self._input_df_pandas[self._label_col] ) np.testing.assert_allclose(actual_score, sklearn_score, rtol=1.0e-1, atol=1.0e-2) + actual_score = reg.score(self._input_df_pandas) + np.testing.assert_allclose(actual_score, sklearn_score, rtol=1.0e-1, atol=1.0e-2) # n_features_in_ is available because `refit` is set to `True`. 
self.assertEqual(sk_obj.n_features_in_, sklearn_reg.n_features_in_) @@ -279,6 +281,10 @@ def test_transform(self, mock_is_single_node) -> None: np.testing.assert_allclose(transformed, sk_transformed, rtol=1.0e-1, atol=1.0e-2) + transformed = reg.transform(self._input_df_pandas[self._input_cols]) + transformed = transformed[actual_output_cols].to_numpy() + np.testing.assert_allclose(transformed, sk_transformed, rtol=1.0e-1, atol=1.0e-2) + def test_not_fitted_exception(self) -> None: param_distributions = {"max_depth": [2, 6], "learning_rate": [0.1, 0.01]} reg = RandomizedSearchCV(estimator=XGBClassifier(), param_distributions=param_distributions) diff --git a/tests/integ/snowflake/ml/registry/model_registry_snowservice_integ_test.py b/tests/integ/snowflake/ml/registry/model_registry_snowservice_integ_test.py index a32dc42d..fa710a8a 100644 --- a/tests/integ/snowflake/ml/registry/model_registry_snowservice_integ_test.py +++ b/tests/integ/snowflake/ml/registry/model_registry_snowservice_integ_test.py @@ -37,6 +37,7 @@ # "compute_pool": self._TEST_CPU_COMPUTE_POOL, # "image_repo": self._db_manager.get_snowservice_image_repo(repo=self._TEST_IMAGE_REPO), # "num_workers": 1, +# "external_access_integrations": self._SPCS_EAIS, # }, # }, # ) @@ -56,6 +57,7 @@ # "options": { # "compute_pool": self._TEST_CPU_COMPUTE_POOL, # "image_repo": self._db_manager.get_snowservice_image_repo(repo=self._TEST_IMAGE_REPO), +# "external_access_integrations": self._SPCS_EAIS, # }, # }, # ) @@ -81,6 +83,7 @@ # "compute_pool": self._TEST_CPU_COMPUTE_POOL, # "image_repo": self._db_manager.get_snowservice_image_repo(repo=self._TEST_IMAGE_REPO), # "num_workers": 1, +# "external_access_integrations": self._SPCS_EAIS, # }, # }, # ) @@ -111,6 +114,7 @@ # "image_repo": self._db_manager.get_snowservice_image_repo(repo=self._TEST_IMAGE_REPO), # "num_workers": 1, # "use_gpu": True, +# "external_access_integrations": self._SPCS_EAIS, # }, # }, # ) @@ -132,6 +136,7 @@ # "options": { # "compute_pool": 
self._TEST_CPU_COMPUTE_POOL, # "image_repo": self._db_manager.get_snowservice_image_repo(repo=self._TEST_IMAGE_REPO), +# "external_access_integrations": self._SPCS_EAIS, # }, # }, # ) diff --git a/tests/integ/snowflake/ml/registry/model_registry_snowservice_merge_gate_integ_test.py b/tests/integ/snowflake/ml/registry/model_registry_snowservice_merge_gate_integ_test.py index bc01667c..21d68f05 100644 --- a/tests/integ/snowflake/ml/registry/model_registry_snowservice_merge_gate_integ_test.py +++ b/tests/integ/snowflake/ml/registry/model_registry_snowservice_merge_gate_integ_test.py @@ -27,6 +27,7 @@ def _run_deployment() -> None: "options": { "compute_pool": self._TEST_CPU_COMPUTE_POOL, "enable_remote_image_build": True, + "external_access_integrations": self._SPCS_EAIS, }, }, ) @@ -57,6 +58,7 @@ def _run_deployment() -> None: "compute_pool": self._TEST_CPU_COMPUTE_POOL, "enable_remote_image_build": True, "model_in_image": True, + "external_access_integrations": self._SPCS_EAIS, }, }, ) diff --git a/tests/integ/snowflake/ml/test_utils/common_test_base.py b/tests/integ/snowflake/ml/test_utils/common_test_base.py index f5994365..1a38fa73 100644 --- a/tests/integ/snowflake/ml/test_utils/common_test_base.py +++ b/tests/integ/snowflake/ml/test_utils/common_test_base.py @@ -259,7 +259,7 @@ def {func_name}({first_arg_name}: snowflake.snowpark.Session, {", ".join(arg_lis {func_body} """ - final_packages = packages[:] + [f"snowflake-ml-python=={_snowml_pkg_ver}"] + final_packages = packages[:] + [f"snowflake-ml-python=={_snowml_pkg_ver}", "snowflake-snowpark-python"] with tempfile.NamedTemporaryFile("w", encoding="utf-8", suffix=".py", delete=False) as temp_file: temp_file.write(func_source) diff --git a/tests/integ/snowflake/ml/test_utils/spcs_integ_test_base.py b/tests/integ/snowflake/ml/test_utils/spcs_integ_test_base.py index 256aa67a..e4b3fc94 100644 --- a/tests/integ/snowflake/ml/test_utils/spcs_integ_test_base.py +++ 
b/tests/integ/snowflake/ml/test_utils/spcs_integ_test_base.py @@ -16,6 +16,7 @@ class SpcsIntegTestBase(absltest.TestCase): _TEST_CPU_COMPUTE_POOL = "REGTEST_INFERENCE_CPU_POOL" _TEST_GPU_COMPUTE_POOL = "REGTEST_INFERENCE_GPU_POOL" + _SPCS_EAIS = ["SPCS_EGRESS_ACCESS_INTEGRATION"] def setUp(self) -> None: """Creates Snowpark and Snowflake environments for testing.""" diff --git a/tests/integ/snowflake/ml/test_utils/test_env_utils.py b/tests/integ/snowflake/ml/test_utils/test_env_utils.py index 74fd170c..74a066f1 100644 --- a/tests/integ/snowflake/ml/test_utils/test_env_utils.py +++ b/tests/integ/snowflake/ml/test_utils/test_env_utils.py @@ -10,7 +10,7 @@ def get_available_session() -> session.Session: - return ( # type: ignore[no-any-return] + return ( session._get_active_session() if snowpark_utils.is_in_stored_procedure() # type: ignore[no-untyped-call] # else session.Session.builder.configs(connection_params.SnowflakeLoginOptions()).create()