Closed
Description
Is there an existing issue for this?
- I have searched the existing issues
Describe the bug
I have prepared data in a MinIO remote path and use bulk_import to load it. I use the list_import_jobs method to get the current jobs. It works well when I don't pass the collection_name parameter; however, when I set collection_name, it does not return the correct result.
Expected Behavior
When I use list_import_jobs with collection_name, it should return the correct result.
Steps/Code To Reproduce behavior
from minio import Minio
from pymilvus.bulk_writer import list_import_jobs

from db.milvus_fixed import bulk_import

bucket_name = "a-bucket"


def create_minio_client() -> Minio:
    return Minio(
        endpoint='127.0.0.1:9000',
        access_key='minioadmin',
        secret_key='minioadmin',
        secure=False,
    )


if __name__ == '__main__':
    minio_client = create_minio_client()
    minio_path = "/data/da137d38-4ff7-4f5d-b2d4-8debaa3dba18"
    db_name = "local_test"
    collection_name = "test"
    objects = minio_client.list_objects(
        bucket_name=bucket_name,
        prefix=minio_path,
        recursive=True,
    )
    paths = [obj.object_name for obj in objects]

    response = bulk_import(
        url="http://localhost:19530",
        collection_name=collection_name,
        files=[[path] for path in paths],
        db_name=db_name,
    )
    job_id = response.json()["data"]["jobId"]
    print("insert to custom db job_id is {}".format(job_id))

    response = bulk_import(
        url="http://localhost:19530",
        collection_name=collection_name,
        files=[[path] for path in paths],
        db_name="default",
    )
    job_id = response.json()["data"]["jobId"]
    print("insert to default db job_id is {}".format(job_id))

    response = list_import_jobs(url="http://127.0.0.1:19530")
    data = (response.json())["data"]
    print("list jobs: {}".format(data))

    response = list_import_jobs(url="http://127.0.0.1:19530", collection_name=collection_name)
    data = (response.json())["data"]
    print("list jobs with collection name: {}".format(data))
And this is the output:
insert to custom db job_id is 454799264148166193
insert to default db job_id is 454799264148166197
list jobs: {'records': [{'collectionName': 'test', 'jobId': '454799264148158255', 'progress': 100, 'state': 'Completed'}, {'collectionName': 'test', 'jobId': '454799264148166193', 'progress': 0, 'state': 'Pending'}, {'collectionName': 'test', 'jobId': '454799264148166197', 'progress': 0, 'state': 'Pending'}]}
list jobs with collection name: {'records': [{'collectionName': 'test', 'jobId': '454799264148166197', 'progress': 0, 'state': 'Pending'}]}
You can see that the job imported into the non-default database (local_test) is missing from the query result when I pass collection_name.
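To make the discrepancy explicit, the two outputs above can be compared directly. The following is a minimal sketch that uses only the records printed above; the helper name missing_job_ids is hypothetical and is not part of pymilvus.

```python
# Records copied verbatim from the two outputs above.
all_jobs = [
    {"collectionName": "test", "jobId": "454799264148158255", "progress": 100, "state": "Completed"},
    {"collectionName": "test", "jobId": "454799264148166193", "progress": 0, "state": "Pending"},
    {"collectionName": "test", "jobId": "454799264148166197", "progress": 0, "state": "Pending"},
]
filtered_jobs = [
    {"collectionName": "test", "jobId": "454799264148166197", "progress": 0, "state": "Pending"},
]


def missing_job_ids(unfiltered, filtered, collection_name):
    """Return job IDs that belong to collection_name in the unfiltered
    listing but are absent from the filtered listing (hypothetical helper)."""
    expected = {r["jobId"] for r in unfiltered if r["collectionName"] == collection_name}
    returned = {r["jobId"] for r in filtered}
    return sorted(expected - returned)


missing = missing_job_ids(all_jobs, filtered_jobs, "test")
print(missing)
```

The result includes 454799264148166193, the job submitted with db_name="local_test". It also includes 454799264148158255, a job from an earlier run whose database is not stated in this report.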
The bulk_import function is modified to support the db_name parameter, as PR #2446 does.
## bulkinsert RESTful api wrapper
def bulk_import(
    url: str,
    collection_name: str,
    db_name: str = "default",
    files: Optional[List[List[str]]] = None,
    object_url: str = "",
    cluster_id: str = "",
    api_key: str = "",
    access_key: str = "",
    secret_key: str = "",
    **kwargs,
) -> requests.Response:
    """call bulkinsert restful interface to import files

    Args:
        url (str): url of the server
        collection_name (str): name of the target collection
        db_name (str): name of database
        partition_name (str): name of the target partition
        files (list of list of str): The files that contain the data to import.
            A sub-list contains a single JSON or Parquet file, or a set of Numpy files.
        object_url (str): The URL of the object to import.
            This URL should be accessible to the S3-compatible
            object storage service, such as AWS S3, GCS, Azure blob storage.
        cluster_id (str): id of a milvus instance(for cloud)
        api_key (str): API key to authenticate your requests.
        access_key (str): access key to access the object storage
        secret_key (str): secret key to access the object storage

    Returns:
        response of the restful interface
    """
    request_url = url + "/v2/vectordb/jobs/import/create"

    partition_name = kwargs.pop("partition_name", "")
    params = {
        "collectionName": collection_name,
        "partitionName": partition_name,
        "files": files,
        "objectUrl": object_url,
        "clusterId": cluster_id,
        "accessKey": access_key,
        "secretKey": secret_key,
        "dbName": db_name,
    }

    resp = _post_request(url=request_url, api_key=api_key, params=params, **kwargs)
    _handle_response(request_url, resp.json())
    return resp
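By analogy with the modified bulk_import above, a listing call could also forward a database name. The following is only a sketch under assumptions: it assumes the /v2/vectordb/jobs/import/list endpoint honors a dbName field the same way the create endpoint does, which this report does not confirm. The function names build_list_jobs_payload and list_import_jobs_in_db are hypothetical and are not part of pymilvus. It uses only the standard library and omits authentication.

```python
import json
import urllib.request
from typing import Any, Dict


def build_list_jobs_payload(collection_name: str = "", db_name: str = "") -> Dict[str, Any]:
    """Build the JSON body for the list request (hypothetical helper)."""
    payload: Dict[str, Any] = {"collectionName": collection_name}
    if db_name:
        # Assumption: the list endpoint accepts "dbName" like the create endpoint.
        payload["dbName"] = db_name
    return payload


def list_import_jobs_in_db(
    url: str,
    collection_name: str = "",
    db_name: str = "",
    timeout: float = 10.0,
) -> Dict[str, Any]:
    """POST the payload to the import-job list endpoint and return the parsed JSON."""
    body = json.dumps(build_list_jobs_payload(collection_name, db_name)).encode("utf-8")
    request = urllib.request.Request(
        url + "/v2/vectordb/jobs/import/list",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With the setup from this report, a call such as list_import_jobs_in_db("http://127.0.0.1:19530", collection_name="test", db_name="local_test") would be expected to return the job created in local_test, provided the server really filters by dbName.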
Environment details
- Hardware/Software conditions
- OS: Windows
- CPU: 13th Gen Intel(R) Core(TM) i7-1365U
- Method of installation: docker-compose, standalone
- Milvus version : 2.4.15
- Milvus configuration :
inside docker-compose.yaml
services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
    healthcheck:
      test: ["CMD", "etcdctl", "endpoint", "health"]
      interval: 30s
      timeout: 20s
      retries: 3

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      - "9001:9001"
      - "9000:9000"
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  standalone:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.4.15
    command: ["milvus", "run", "standalone"]
    security_opt:
      - seccomp:unconfined
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
      interval: 30s
      start_period: 90s
      timeout: 20s
      retries: 3
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - "etcd"
      - "minio"

  attu:
    container_name: milvus-attu
    image: zilliz/attu:v2.4
    environment:
      MILVUS_URL: standalone:19530
    ports:
      - "8000:3000"
    depends_on:
      - "standalone"
    networks:
      - default

networks:
  default:
    name: milvus
Anything else?
No response