diff --git a/bootcamp/tutorials/integration/pdf_files/WhatisMilvus.pdf b/bootcamp/tutorials/integration/pdf_files/WhatisMilvus.pdf new file mode 100644 index 000000000..a8b8e7826 Binary files /dev/null and b/bootcamp/tutorials/integration/pdf_files/WhatisMilvus.pdf differ diff --git a/bootcamp/tutorials/integration/rag_with_milvus_and_unstructured.ipynb b/bootcamp/tutorials/integration/rag_with_milvus_and_unstructured.ipynb new file mode 100644 index 000000000..2dabe1990 --- /dev/null +++ b/bootcamp/tutorials/integration/rag_with_milvus_and_unstructured.ipynb @@ -0,0 +1,442 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Build a RAG with Milvus and Unstructured\n", + "\n", + "[Unstructured](https://docs.unstructured.io/welcome) provides a platform and tools to ingest and process unstructured documents for Retrieval Augmented Generation (RAG) and model fine-tuning. It offers both a no-code UI platform and serverless API services, allowing users to process data on Unstructured-hosted compute resources.\n", + "\n", + "In this tutorial, we will use Unstructured to ingest PDF documents and then use Milvus to build a RAG pipeline.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Preparation\n", + "### Dependencies and Environment" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "! pip install -qU \"unstructured-ingest[pdf]\" unstructured pymilvus openai" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> If you are using Google Colab, to enable the dependencies just installed, you may need to **restart the runtime** (click on the \"Runtime\" menu at the top of the screen, and select \"Restart session\" from the dropdown menu)."
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can get your `UNSTRUCTURED_API_KEY` and `UNSTRUCTURED_API_URL` environment variables from [here](https://docs.unstructured.io/api-reference/api-services/saas-api-development-guide).\n", + "\n", + "We will use OpenAI as the LLM in this example. You should prepare the [API key](https://platform.openai.com/docs/quickstart) `OPENAI_API_KEY` as an environment variable." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "\n", + "os.environ[\"UNSTRUCTURED_API_KEY\"] = \"***********\"\n", + "os.environ[\"UNSTRUCTURED_API_URL\"] = \"***********\"\n", + "\n", + "os.environ[\"OPENAI_API_KEY\"] = \"***********\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Prepare Milvus and OpenAI clients\n", + "You can use the Milvus client to create a Milvus collection and insert data into it." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "from pymilvus import MilvusClient, DataType\n", + "\n", + "# Initialize Milvus client\n", + "milvus_client = MilvusClient(uri=\"./milvus_demo.db\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> As for the argument of `MilvusClient`:\n", + "> - Setting the `uri` as a local file, e.g. `./milvus.db`, is the most convenient method, as it automatically utilizes [Milvus Lite](https://milvus.io/docs/milvus_lite.md) to store all data in this file.\n", + "> - If you have a large scale of data, say more than a million vectors, you can set up a more performant Milvus server on [Docker or Kubernetes](https://milvus.io/docs/quickstart.md). In this setup, please use the server address and port as your uri, e.g. `http://localhost:19530`. 
If you enable the authentication feature on Milvus, use \"<your_username>:<your_password>\" as the token, otherwise don't set the token.\n", + "> - If you want to use [Zilliz Cloud](https://zilliz.com/cloud), the fully managed cloud service for Milvus, adjust the `uri` and `token`, which correspond to the [Public Endpoint and API key](https://docs.zilliz.com/docs/on-zilliz-cloud-console#free-cluster-details) in Zilliz Cloud." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Check if the collection already exists and drop it if it does." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "collection_name = \"my_rag_collection\"\n", + "\n", + "if milvus_client.has_collection(collection_name):\n", + " milvus_client.drop_collection(collection_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Prepare an OpenAI client to generate embeddings and responses." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "from openai import OpenAI\n", + "\n", + "openai_client = OpenAI()\n", + "\n", + "\n", + "def emb_text(text):\n", + " return (\n", + " openai_client.embeddings.create(input=text, model=\"text-embedding-3-small\")\n", + " .data[0]\n", + " .embedding\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Generate a test embedding and print its dimension and first few elements."
+ ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "1536\n", + "[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]\n" + ] + } + ], + "source": [ + "test_embedding = emb_text(\"This is a test\")\n", + "embedding_dim = len(test_embedding)\n", + "print(embedding_dim)\n", + "print(test_embedding[:10])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create Milvus Collection\n", + "We will create a collection with the following schema:\n", + "- `id`: the primary key, which is a unique identifier for each document.\n", + "- `vector`: the embedding of the document.\n", + "- `text`: the text content of the document.\n", + "- `metadata`: the metadata of the document.\n", + "\n", + "Then we build an `AUTOINDEX` index on the `vector` field and create the collection."
+ ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "# Create schema\n", + "schema = milvus_client.create_schema(auto_id=False, enable_dynamic_field=False)\n", + "# Add fields to schema\n", + "schema.add_field(field_name=\"id\", datatype=DataType.INT64, is_primary=True)\n", + "schema.add_field(field_name=\"vector\", datatype=DataType.FLOAT_VECTOR, dim=embedding_dim)\n", + "schema.add_field(field_name=\"text\", datatype=DataType.VARCHAR, max_length=65535)\n", + "schema.add_field(field_name=\"metadata\", datatype=DataType.JSON)\n", + "index_params = MilvusClient.prepare_index_params()\n", + "index_params.add_index(\n", + " field_name=\"vector\",\n", + " metric_type=\"COSINE\",\n", + " index_type=\"AUTOINDEX\",\n", + ")\n", + "milvus_client.create_collection(\n", + " collection_name=collection_name,\n", + " schema=schema,\n", + " index_params=index_params,\n", + " consistency_level=\"Strong\",\n", + ")\n", + "\n", + "milvus_client.load_collection(collection_name=collection_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load data from Unstructured\n", + "Unstructured provides a flexible and powerful ingestion pipeline to process various file types, including PDF, HTML, and more.\n", + "We will use the ingest functionality to partition PDF files in a local directory and then load the data into Milvus."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "is_executing": true + } + }, + "outputs": [], + "source": [ + "from unstructured_ingest.v2.pipeline.pipeline import Pipeline\n", + "from unstructured_ingest.v2.interfaces import ProcessorConfig\n", + "from unstructured_ingest.v2.processes.connectors.local import (\n", + " LocalIndexerConfig,\n", + " LocalDownloaderConfig,\n", + " LocalConnectionConfig,\n", + " LocalUploaderConfig,\n", + ")\n", + "from unstructured_ingest.v2.processes.partitioner import PartitionerConfig\n", + "\n", + "directory_with_pdfs = \"./pdf_files\"\n", + "directory_with_results = \"./pdf_processed_outputs\"\n", + "\n", + "Pipeline.from_configs(\n", + " context=ProcessorConfig(),\n", + " indexer_config=LocalIndexerConfig(input_path=directory_with_pdfs),\n", + " downloader_config=LocalDownloaderConfig(),\n", + " source_connection_config=LocalConnectionConfig(),\n", + " partitioner_config=PartitionerConfig(\n", + " partition_by_api=True,\n", + " api_key=os.getenv(\"UNSTRUCTURED_API_KEY\"),\n", + " partition_endpoint=os.getenv(\"UNSTRUCTURED_API_URL\"),\n", + " strategy=\"hi_res\",\n", + " additional_partition_args={\n", + " \"split_pdf_page\": True,\n", + " \"split_pdf_concurrency_level\": 15,\n", + " },\n", + " ),\n", + " uploader_config=LocalUploaderConfig(output_dir=directory_with_results),\n", + ").run()\n", + "\n", + "\n", + "from unstructured.staging.base import elements_from_json\n", + "\n", + "\n", + "def load_processed_files(directory_path):\n", + " elements = []\n", + " for filename in os.listdir(directory_path):\n", + " if filename.endswith(\".json\"):\n", + " file_path = os.path.join(directory_path, filename)\n", + " try:\n", + " elements.extend(elements_from_json(filename=file_path))\n", + " except IOError:\n", + " print(f\"Error: Could not read file {file_path}.\")\n", + "\n", + " return elements\n", + "\n", + "\n", + "elements = load_processed_files(directory_with_results)" + ] + },
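+ { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "As an optional sanity check before embedding, you can count the partitioned elements and preview the first few. Each Unstructured element exposes a `text` attribute, and its element category (e.g. `Title`, `NarrativeText`) should be reflected in its Python type name." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Sanity check (optional): count the partitioned elements and preview a few\n", + "print(f\"Loaded {len(elements)} elements\")\n", + "for element in elements[:3]:\n", + " print(type(element).__name__, \"-\", element.text[:60])" + ] + },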
{ + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Insert data into Milvus." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "is_executing": true + } + }, + "outputs": [], + "source": [ + "data = []\n", + "for i, element in enumerate(elements):\n", + " data.append(\n", + " {\n", + " \"id\": i,\n", + " \"vector\": emb_text(element.text),\n", + " \"text\": element.text,\n", + " \"metadata\": element.metadata.to_dict(),\n", + " }\n", + " )\n", + "milvus_client.insert(collection_name=collection_name, data=data)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Retrieve and Generate Response" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Define a function to retrieve relevant documents from Milvus." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "def retrieve_documents(question, top_k=3):\n", + " search_res = milvus_client.search(\n", + " collection_name=collection_name,\n", + " data=[emb_text(question)],\n", + " limit=top_k,\n", + " output_fields=[\"text\"],\n", + " )\n", + " return [(res[\"entity\"][\"text\"], res[\"distance\"]) for res in search_res[0]]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Define a function to generate a response using the retrieved documents in the RAG pipeline." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "def generate_rag_response(question):\n", + " retrieved_docs = retrieve_documents(question)\n", + " context = \"\\n\".join([f\"Text: {doc[0]}\\n\" for doc in retrieved_docs])\n", + " system_prompt = (\n", + " \"You are an AI assistant. Provide answers based on the given context.\"\n", + " )\n", + " user_prompt = f\"\"\"\n", + " Use the following pieces of information to answer the question. 
If the information is not in the context, say you don't know.\n", + " \n", + " Context:\n", + " {context}\n", + " \n", + " Question: {question}\n", + " \"\"\"\n", + " response = openai_client.chat.completions.create(\n", + " model=\"gpt-4o-mini\",\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": system_prompt},\n", + " {\"role\": \"user\", \"content\": user_prompt},\n", + " ],\n", + " )\n", + " return response.choices[0].message.content" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's test the RAG pipeline with a sample question." + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO: HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "INFO: HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Question: What is the Advanced Search Algorithms in Milvus?\n", + "Answer: The Advanced Search Algorithms in Milvus refer to a wide range of in-memory and on-disk indexing/search algorithms it supports, including IVF, HNSW, DiskANN, and more. 
These algorithms have been deeply optimized, and Milvus delivers 30%-70% better performance compared to popular implementations like FAISS and HNSWLib.\n" + ] + } + ], + "source": [ + "question = \"What is the Advanced Search Algorithms in Milvus?\"\n", + "answer = generate_rag_response(question)\n", + "print(f\"Question: {question}\")\n", + "print(f\"Answer: {answer}\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "py310", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} \ No newline at end of file