
Commit a466dd8

Add API e2e test
Signed-off-by: Rajvaibhav Rahane <[email protected]>
1 parent 4d6d326 commit a466dd8

File tree (5 files changed: +181, -29 lines)

e2e/test_core/Dockerfile
e2e/test_core/docker-compose.yml
e2e/test_core/remote_vector_api_client.py
e2e/test_core/run_e2e.py
e2e/test_core/vector_dataset_generator.py

e2e/test_core/Dockerfile

Lines changed: 9 additions & 2 deletions
@@ -4,12 +4,19 @@ WORKDIR /e2e
 
 COPY ./remote_vector_index_builder/core /remote_vector_index_builder/core
 
+COPY ./remote_vector_index_builder/app /remote_vector_index_builder/app
+
 RUN pip install --no-cache-dir --upgrade -r /remote_vector_index_builder/core/requirements.txt
 
+RUN pip install --no-cache-dir --upgrade -r /remote_vector_index_builder/app/requirements.txt
+
 COPY ./e2e/test_core /e2e/test_core
 
 RUN pip install --no-cache-dir --upgrade -r /e2e/test_core/requirements.txt
 
 ENV PYTHONPATH='${PYTHONPATH}:/tmp/faiss/build/faiss/python:/remote_vector_index_builder:/'
-RUN ["python", "test_core/test_imports.py"]
-CMD ["python", "test_core/run_e2e.py"]
+
+RUN ["python", "../remote_vector_index_builder/core/test_imports.py"]
+RUN ["python", "../remote_vector_index_builder/app/test_imports.py"]
+
+RUN ["python", "test_core/test_imports.py"]

e2e/test_core/docker-compose.yml

Lines changed: 6 additions & 1 deletion
@@ -16,12 +16,17 @@ services:
   remote-vector-index-builder:
     container_name: remote-vector-index-builder
     image: remote-vector-index-builder:e2e-latest
-    pull_policy: never
+    command: >
+      bash -c "pwd & fastapi run ../remote_vector_index_builder/app/main.py --port 1025 &
+      sleep 5 &&
+      python test_core/run_e2e.py"
     environment:
       - S3_ENDPOINT_URL=http://localstack-main:4566
       - AWS_ACCESS_KEY_ID=test
       - AWS_SECRET_ACCESS_KEY=test
       - AWS_DEFAULT_REGION=us-east-1
+    ports:
+      - "8080:1025"
 
     deploy:
       resources:
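The compose service no longer relies on the image's `CMD`: it prints the working directory, starts the FastAPI app on port 1025 in the background (the trailing `&`), waits five seconds, then runs the e2e driver, and container port 1025 is now published as host port 8080. A fixed `sleep 5` can lose the race against a slow startup; a small readiness probe such as the hypothetical sketch below (not part of this commit; it only assumes the server answers HTTP at its base URL) could poll until the API is reachable:

```python
# Hypothetical readiness probe, not part of this commit: poll the API until any
# HTTP response comes back (assumes the service is reachable at base_url).
import time

import requests


def wait_for_api(base_url: str = "http://localhost:1025", timeout: float = 60.0) -> None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            # Any status code means the server is up; the root path may well 404.
            requests.get(base_url, timeout=2)
            return
        except requests.exceptions.RequestException:
            time.sleep(1)
    raise TimeoutError(f"API at {base_url} did not come up within {timeout} seconds")


if __name__ == "__main__":
    wait_for_api()
```
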
e2e/test_core/remote_vector_api_client.py

Lines changed: 138 additions & 0 deletions
@@ -0,0 +1,138 @@
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import logging
import time
from typing import Dict, Any
import requests
from requests.exceptions import HTTPError, ConnectionError, Timeout

from app.models.job import JobStatus


class RemoteVectorAPIClient:
    def __init__(self, base_url: str = "http://localhost:1025", timeout: int = 30):
        self.base_url = base_url
        self.timeout = timeout

    def wait_for_job_completion(
        self,
        job_id: str,
        timeout: int = 1200,
        interval: int = 10
    ) -> Dict[str, Any]:
        """Wait for job to complete with timeout"""
        start_time = time.time()
        attempts = 0

        logger = logging.getLogger(__name__)

        while True:
            if time.time() - start_time > timeout:
                raise TimeoutError(
                    f"Job {job_id} did not complete within {timeout} seconds"
                )

            try:
                attempts += 1
                status_response = self.get_job_status(job_id)

                task_status = status_response.get("task_status")

                if task_status == JobStatus.COMPLETED:
                    logger.info(f"Job {job_id} completed successfully")
                    return status_response
                elif task_status == JobStatus.FAILED:
                    raise RuntimeError(
                        f"Job {job_id} failed: {status_response.get('error_message')}"
                    )
                elif task_status == JobStatus.RUNNING:
                    logger.debug(
                        f"Job {job_id} still running (attempt {attempts}), "
                        f"waiting {interval} seconds..."
                    )
                    time.sleep(interval)
                else:
                    raise RuntimeError(f"Unknown job status: {task_status}")

            except APIError as e:
                if time.time() - start_time > timeout:
                    raise
                logger.warning(
                    f"Error checking job status (attempt {attempts}): {str(e)}, "
                    f"retrying in {interval} seconds..."
                )
                time.sleep(interval)

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Get status of a job"""
        logger = logging.getLogger(__name__)
        try:
            response = self._make_request(
                method="GET",
                endpoint=f"/_status/{job_id}"
            )
            return response.json()
        except APIError:
            logger.error(f"Failed to get status for job {job_id}")
            raise

    def build_index(self, index_build_parameters: Dict[str, Any]) -> str:
        """Create a new index build job"""
        logger = logging.getLogger(__name__)
        try:
            response = self._make_request(
                method="POST",
                endpoint="/_build",
                json=index_build_parameters
            )
            return response.json()["job_id"]
        except APIError:
            logger.error("Failed to create index build job")
            raise

    def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Make HTTP request with error handling"""

        logger = logging.getLogger(__name__)
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        try:
            response = requests.request(
                method=method,
                url=url,
                timeout=self.timeout,
                **kwargs
            )
            response.raise_for_status()
            return response
        except HTTPError as e:
            error_detail = None
            try:
                error_detail = e.response.json()
            except:
                error_detail = e.response.text

            logger.error(
                f"HTTP {e.response.status_code} Error: "
                f"URL: {url}, "
                f"Method: {method}, "
                f"Detail: {error_detail}"
            )
            raise APIError(f"API request failed: {str(e)}") from e
        except ConnectionError as e:
            logger.error(f"Connection failed to {url}: {str(e)}")
            raise APIError("Could not connect to API server") from e
        except Timeout as e:
            logger.error(f"Request timed out to {url}: {str(e)}")
            raise APIError("API request timed out") from e
        except Exception as e:
            logger.error(f"Unexpected error making request to {url}: {str(e)}")
            raise APIError("Unexpected error during API request") from e


class APIError(Exception):
    """Base exception for API errors"""
    pass
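The client above wraps the service's `POST /_build` and `GET /_status/{job_id}` endpoints and polls until a terminal status, raising `APIError`, `RuntimeError`, or `TimeoutError` otherwise. A minimal usage sketch, mirroring how run_e2e.py (below) drives it: it assumes the API is already listening on localhost:1025 and that the `core` and `app` packages are on `PYTHONPATH` as the Dockerfile arranges; the path, bucket, and size values are placeholders, not values from the test config.

```python
# Usage sketch only: path, bucket, and size values are placeholders.
from core.common.models.index_build_parameters import DataType
from core.object_store.types import ObjectStoreType
from e2e.test_core.remote_vector_api_client import APIError, RemoteVectorAPIClient

client = RemoteVectorAPIClient(base_url="http://localhost:1025", timeout=30)

index_build_params = {
    "vector_path": "my-dataset/vectors.bin",   # placeholder object paths
    "doc_id_path": "my-dataset/doc_ids.bin",
    "container_name": "my-test-bucket",        # placeholder bucket name
    "dimension": 128,
    "doc_count": 10_000,
    "data_type": DataType.FLOAT,               # same enums run_e2e.py passes
    "repository_type": ObjectStoreType.S3,
}

try:
    job_id = client.build_index(index_build_params)    # POST /_build
    result = client.wait_for_job_completion(           # poll GET /_status/{job_id}
        job_id,
        timeout=1200,
        interval=10,
    )
    print(f"Job {job_id} finished: {result.get('task_status')}")
except (APIError, RuntimeError, TimeoutError) as exc:
    print(f"Index build did not complete: {exc}")
```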

e2e/test_core/run_e2e.py

Lines changed: 26 additions & 24 deletions
@@ -9,11 +9,10 @@
 from core.common.models import IndexBuildParameters
 from core.common.models.index_build_parameters import DataType
 from core.object_store.types import ObjectStoreType
+from e2e.test_core.remote_vector_api_client import RemoteVectorAPIClient
 from e2e.test_core.utils.logging_config import configure_logger
 from e2e.test_core.vector_dataset_generator import VectorDatasetGenerator
 from botocore.exceptions import ClientError
-from core.tasks import run_tasks
-import os
 import logging
 import time
 import sys

@@ -46,37 +45,40 @@ def run_e2e_index_builder(config_path: str = "test_core/test-datasets.yml"):
     dataset_config = dataset_generator.config["datasets"][dataset_name]
     s3_config = dataset_generator.config["storage"]["s3"]
 
-    index_build_params = IndexBuildParameters(
-        vector_path=s3_config["paths"]["vectors"].format(
+    index_build_params = {
+        "vector_path":s3_config["paths"]["vectors"].format(
             dataset_name=dataset_name
         ),
-        doc_id_path=s3_config["paths"]["doc_ids"].format(
+        "doc_id_path":s3_config["paths"]["doc_ids"].format(
             dataset_name=dataset_name
         ),
-        container_name=bucket,
-        dimension=dataset_config["dimension"],
-        doc_count=dataset_config["num_vectors"],
-        data_type=DataType.FLOAT,
-        repository_type=ObjectStoreType.S3,
-    )
-    logger.info("\nRunning vector index builder workflow...")
-    object_store_config = {
-        "retries": s3_config["retries"],
-        "region": s3_config["region"],
-        "S3_ENDPOINT_URL": os.environ.get(
-            "S3_ENDPOINT_URL", "http://localhost:4566"
-        ),
+        "container_name":bucket,
+        "dimension":dataset_config["dimension"],
+        "doc_count":dataset_config["num_vectors"],
+        "data_type":DataType.FLOAT,
+        "repository_type":ObjectStoreType.S3,
     }
+
+    logger.info("\nRunning vector index builder workflow...")
+
+    client = RemoteVectorAPIClient(timeout=30)
+
     start_time = time.time()
-    result = run_tasks(
-        index_build_params=index_build_params,
-        object_store_config=object_store_config,
+    # Submit job
+    job_id = client.build_index(index_build_params)
+    logger.info(f"Created job: {job_id}")
+
+    # Wait for completion (20 minute timeout)
+    result = client.wait_for_job_completion(
+        job_id,
+        timeout=1200, # 20 minutes
+        interval=10 # Check every 10 seconds
     )
     run_tasks_total_time = time.time() - start_time
 
-    if result.error:
-        logger.error(f"Error in workflow: {result.error}")
-        raise RuntimeError(f"Test failed for dataset {dataset_name}: {result.error}")
+    if result["task_status"] != "COMPLETED_INDEX_BUILD":
+        logger.error(f"Error in workflow: {result.get('error_message')}")
+        raise RuntimeError(f"Job failed: {result.get('error_message')}")
 
     logger.info(f"Successfully processed dataset: {dataset_name}")
     metrics = {
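run_e2e.py now submits the build through the REST API instead of calling `run_tasks` directly, passes the parameters as a plain dict, and treats the string `COMPLETED_INDEX_BUILD` as the terminal success status (the elapsed-time variable keeps its old `run_tasks_total_time` name). Since `wait_for_job_completion` only returns on success and raises otherwise, the explicit status check is a second guard. If the `app` package is importable in the e2e image (the Dockerfile above copies it), the same check could be written against the enum the client uses; a sketch only, assuming `JobStatus.COMPLETED` corresponds to that same `COMPLETED_INDEX_BUILD` string:

```python
# Sketch only (not part of this commit): express the success check with the
# JobStatus enum the API client already uses, assuming JobStatus.COMPLETED
# compares equal to the "COMPLETED_INDEX_BUILD" string from /_status/{job_id}.
from typing import Any, Dict

from app.models.job import JobStatus


def ensure_completed(result: Dict[str, Any]) -> None:
    """Raise if the final job status is anything other than a completed build."""
    if result["task_status"] != JobStatus.COMPLETED:
        raise RuntimeError(f"Job failed: {result.get('error_message')}")
```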

e2e/test_core/vector_dataset_generator.py

Lines changed: 2 additions & 2 deletions
@@ -14,7 +14,7 @@
 from core.object_store.object_store_factory import ObjectStoreFactory
 from core.object_store.types import ObjectStoreType
 import logging
-
+from tqdm import tqdm
 
 class VectorDatasetGenerator:
     """

@@ -67,7 +67,7 @@ def generate_vectors(self, dataset_name):
 
         vectors_list = []
         doc_ids_list = []
-        for i in range(0, n_vectors, batch_size):
+        for i in tqdm(range(0, n_vectors, batch_size)):
             batch_size_current = min(batch_size, n_vectors - i)
 
             # Generate batch
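The only change here is wrapping the batch loop in `tqdm`, so vector generation prints a progress bar as each batch is produced. The same pattern in isolation, with illustrative counts:

```python
# Minimal illustration of the pattern above: show a progress bar while looping
# over batch start offsets. The counts are illustrative only.
from tqdm import tqdm

n_vectors = 100_000
batch_size = 10_000

for i in tqdm(range(0, n_vectors, batch_size)):
    batch_size_current = min(batch_size, n_vectors - i)
    # ... generate and collect `batch_size_current` vectors here ...
```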
