Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions jobs/classifier/Dockerfile

This file was deleted.

118 changes: 34 additions & 84 deletions jobs/classifier/tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"outputs": [],
"source": [
"!pip install snowflake-snowpark-python\n",
"!pip install snowflake-ml-python\n",
"!pip install pandas"
]
},
Expand Down Expand Up @@ -196,53 +197,6 @@
"\n"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "1094817b-e5b9-4047-9745-2f8900533c01",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"preprod9-aivanoutest02.awsuswest2preprod9.registry-dev.snowflakecomputing.com/aivanoudb/public/project_repo\n"
]
}
],
"source": [
"\n",
"create_image_repo_sql = f\"CREATE IMAGE REPOSITORY IF NOT EXISTS {image_registry}\"\n",
"session.sql(create_image_repo_sql).collect()\n",
"\n",
"get_image_repo_sql = f\"show image repositories like '{image_registry}';\"\n",
"repository_url = session.sql(get_image_repo_sql).collect()[0]['repository_url']\n",
"print(repository_url)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "85304ddc-0b56-4325-8b74-dbb4091643af",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"\n",
"os.environ['SPCS_USERNAME']=user\n",
"os.environ['SPCS_PASSWORD']=password\n",
"os.environ['SPCS_IMAGE_REPO']=repository_url\n",
"os.environ['SPCS_IMAGE_NAME']=image_name\n",
"\n",
"!docker login $SPCS_IMAGE_REPO -u $SPCS_USERNAME -p $SPCS_PASSWORD \n",
"\n",
"!docker build --platform linux/amd64 -t $SPCS_IMAGE_REPO/$SPCS_IMAGE_NAME -f ./Dockerfile ./ &>/dev/null\n",
"\n",
"!docker push $SPCS_IMAGE_REPO/$SPCS_IMAGE_NAME\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -264,7 +218,15 @@
},
{
"cell_type": "code",
"execution_count": 209,
"execution_count": null,
"id": "08218a7e",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "c8758d39-2323-4769-80f0-8236aa6e53fd",
"metadata": {},
"outputs": [
Expand All @@ -279,38 +241,27 @@
],
"source": [
"\n",
"\n",
"print(session.sql(f'DROP SERVICE IF EXISTS {job_name}').collect())\n",
"import os\n",
"from snowflake.ml.jobs import submit_directory\n",
"\n",
"sql_retrieval_command = f\"select * from {input_table}\"\n",
"\n",
"create_async_job_sql = f\"\"\"\n",
"EXECUTE JOB SERVICE\n",
"IN COMPUTE POOL {compute_pool_name} \n",
"NAME = {job_name}\n",
"ASYNC = True\n",
"REPLICAS = {num_replicas}\n",
"QUERY_WAREHOUSE = {warehouse}\n",
"EXTERNAL_ACCESS_INTEGRATIONS = ({external_access_integration})\n",
"FROM SPECIFICATION $$\n",
" spec:\n",
" container:\n",
" - name: main\n",
" image: /{database}/{schema}/{image_registry}/{image_name}\n",
" command: ['python', '-u', './main.py', '--sql=\"{sql_retrieval_command}\"', '--output-table={output_table}', '--batch-size=512']\n",
" env:\n",
" SNOWFLAKE_QUERY_WAREHOUSE: {warehouse}\n",
" resources:\n",
" requests:\n",
" memory: \"10Gi\"\n",
" limits:\n",
" memory: \"10Gi\"\n",
"\n",
"$$\n",
"\"\"\"\n",
"\n",
"print(session.sql(create_async_job_sql).collect())\n",
"\n"
"job = submit_directory(\n",
" os.getcwd(),  # __file__ is not defined inside a notebook kernel\n",
" compute_pool_name,\n",
" entrypoint=\"main.py\",\n",
" args=[\n",
" f'--sql=\"{sql_retrieval_command}\"',\n",
" f'--output-table={output_table}',\n",
" '--batch-size=512',\n",
" ],\n",
" stage_name=\"payload_stage\",\n",
" external_access_integrations=[external_access_integration],\n",
" query_warehouse=warehouse,\n",
" num_instances=num_replicas, # FIXME: Coming soon, not available yet\n",
")\n",
"\n",
"print(f\"Started job {job.id}\")"
]
},
{
Expand All @@ -326,7 +277,11 @@
"res= session.sql(f'SHOW SERVICE CONTAINERS IN SERVICE {job_name}').collect()\n",
"\n",
"for row in res:\n",
" print(f\"{row['service_name']}/{row['instance_id']}/{row['container_name']} - status: {row['status']}, message: {row['message']}\")\n"
" print(f\"{row['service_name']}/{row['instance_id']}/{row['container_name']} - status: {row['status']}, message: {row['message']}\")\n",
"\n",
"print(f\"Job status: {job.status}\")\n",
"for i in range(num_replicas): # TODO: We should add an API for retrieving this info from the MLJob object\n",
" print(f\"Job instance {i} status: {job.get_instance_status(i)}\") # FIXME: Coming soon, not available yet"
]
},
{
Expand All @@ -338,12 +293,7 @@
},
"outputs": [],
"source": [
"\n",
"\n",
"logs = session.sql(f\"CALL SYSTEM$GET_SERVICE_LOGS('{job_name}', 0, 'main')\").collect()\n",
"for line in logs[0][0].split('\\n'):\n",
" print(line)\n",
"\n"
"job.show_logs() # or print(job.get_logs())"
]
},
{
Expand Down