Skip to content

Commit 7ae3053

Browse files
authoredJan 24, 2025
enhance: Add schema update time verification to insert and upsert to use cache (#2551)
enhance: Add schema update time verification to insert and upsert to use cache issue: milvus-io/milvus#39093 --------- Signed-off-by: Xianhui.Lin <xianhui.lin@zilliz.com>
1 parent e97ab91 commit 7ae3053

File tree

7 files changed

+98
-35
lines changed

7 files changed

+98
-35
lines changed
 

‎pymilvus/client/abstract.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ def __init__(self, raw: Any):
182182
self.num_shards = 0
183183
self.num_partitions = 0
184184
self.enable_dynamic_field = False
185-
185+
self.update_timestamp = 0
186186
if self._raw:
187187
self.__pack(self._raw)
188188

@@ -209,7 +209,6 @@ def __pack(self, raw: Any):
209209
# for kv in raw.extra_params:
210210

211211
self.fields = [FieldSchema(f) for f in raw.schema.fields]
212-
213212
self.functions = [FunctionSchema(f) for f in raw.schema.functions]
214213
function_output_field_names = [f for fn in self.functions for f in fn.output_field_names]
215214
for field in self.fields:

‎pymilvus/client/grpc_handler.py

+60-4
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ def __init__(
9494
self._setup_db_interceptor(kwargs.get("db_name"))
9595
self._setup_grpc_channel()
9696
self.callbacks = []
97+
self.schema_cache = {}
9798

9899
def register_state_change_callback(self, callback: Callable):
99100
self.callbacks.append(callback)
@@ -161,6 +162,7 @@ def close(self):
161162
self._channel.close()
162163

163164
def reset_db_name(self, db_name: str):
165+
self.schema_cache.clear()
164166
self._setup_db_interceptor(db_name)
165167
self._setup_grpc_channel()
166168
self._setup_identifier_interceptor(self._user)
@@ -526,10 +528,28 @@ def insert_rows(
526528
collection_name, entities, partition_name, schema, timeout, **kwargs
527529
)
528530
resp = self._stub.Insert(request=request, timeout=timeout)
531+
if resp.status.error_code == common_pb2.SchemaMismatch:
532+
schema = self.update_schema(collection_name, timeout)
533+
request = self._prepare_row_insert_request(
534+
collection_name, entities, partition_name, schema, timeout, **kwargs
535+
)
536+
resp = self._stub.Insert(request=request, timeout=timeout)
529537
check_status(resp.status)
530538
ts_utils.update_collection_ts(collection_name, resp.timestamp)
531539
return MutationResult(resp)
532540

541+
def update_schema(self, collection_name: str, timeout: Optional[float] = None):
542+
self.schema_cache.pop(collection_name, None)
543+
schema = self.describe_collection(collection_name, timeout=timeout)
544+
schema_timestamp = schema.get("update_timestamp", 0)
545+
546+
self.schema_cache[collection_name] = {
547+
"schema": schema,
548+
"schema_timestamp": schema_timestamp,
549+
}
550+
551+
return schema
552+
533553
def _prepare_row_insert_request(
534554
self,
535555
collection_name: str,
@@ -542,9 +562,9 @@ def _prepare_row_insert_request(
542562
if isinstance(entity_rows, dict):
543563
entity_rows = [entity_rows]
544564

545-
if not isinstance(schema, dict):
546-
schema = self.describe_collection(collection_name, timeout=timeout)
547-
565+
schema, schema_timestamp = self._get_schema_from_cache_or_remote(
566+
collection_name, schema, timeout
567+
)
548568
fields_info = schema.get("fields")
549569
enable_dynamic = schema.get("enable_dynamic_field", False)
550570

@@ -554,8 +574,33 @@ def _prepare_row_insert_request(
554574
partition_name,
555575
fields_info,
556576
enable_dynamic=enable_dynamic,
577+
schema_timestamp=schema_timestamp,
557578
)
558579

580+
def _get_schema_from_cache_or_remote(
581+
self, collection_name: str, schema: Optional[dict] = None, timeout: Optional[float] = None
582+
):
583+
"""
584+
checks the cache for the schema. If not found, it fetches it remotely and updates the cache
585+
"""
586+
if collection_name in self.schema_cache:
587+
# Use the cached schema and timestamp
588+
schema = self.schema_cache[collection_name]["schema"]
589+
schema_timestamp = self.schema_cache[collection_name]["schema_timestamp"]
590+
else:
591+
# Fetch the schema remotely if not in cache
592+
if not isinstance(schema, dict):
593+
schema = self.describe_collection(collection_name, timeout=timeout)
594+
schema_timestamp = schema.get("update_timestamp", 0)
595+
596+
# Cache the fetched schema and timestamp
597+
self.schema_cache[collection_name] = {
598+
"schema": schema,
599+
"schema_timestamp": schema_timestamp,
600+
}
601+
602+
return schema, schema_timestamp
603+
559604
def _prepare_batch_insert_request(
560605
self,
561606
collection_name: str,
@@ -723,13 +768,18 @@ def _prepare_row_upsert_request(
723768
if not isinstance(rows, list):
724769
raise ParamError(message="'rows' must be a list, please provide valid row data.")
725770

726-
fields_info, enable_dynamic = self._get_info(collection_name, timeout, **kwargs)
771+
schema, schema_timestamp = self._get_schema_from_cache_or_remote(
772+
collection_name, timeout=timeout
773+
)
774+
fields_info = schema.get("fields")
775+
enable_dynamic = schema.get("enable_dynamic_field", False)
727776
return Prepare.row_upsert_param(
728777
collection_name,
729778
rows,
730779
partition_name,
731780
fields_info,
732781
enable_dynamic=enable_dynamic,
782+
schema_timestamp=schema_timestamp,
733783
)
734784

735785
@retry_on_rpc_failure()
@@ -748,6 +798,12 @@ def upsert_rows(
748798
)
749799
rf = self._stub.Upsert.future(request, timeout=timeout)
750800
response = rf.result()
801+
if response.status.error_code == common_pb2.SchemaMismatch:
802+
schema = self.update_schema(collection_name, timeout)
803+
request = self._prepare_row_insert_request(
804+
collection_name, entities, partition_name, schema, timeout, **kwargs
805+
)
806+
response = self._stub.Insert(request=request, timeout=timeout)
751807
check_status(response.status)
752808
m = MutationResult(response)
753809
ts_utils.update_collection_ts(collection_name, m.timestamp)

‎pymilvus/client/prepare.py

+4
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,7 @@ def row_insert_param(
607607
entities: List,
608608
partition_name: str,
609609
fields_info: Dict,
610+
schema_timestamp: int = 0,
610611
enable_dynamic: bool = False,
611612
):
612613
if not fields_info:
@@ -618,6 +619,7 @@ def row_insert_param(
618619
collection_name=collection_name,
619620
partition_name=p_name,
620621
num_rows=len(entities),
622+
schema_timestamp=schema_timestamp,
621623
)
622624

623625
return cls._parse_row_request(request, fields_info, enable_dynamic, entities)
@@ -630,6 +632,7 @@ def row_upsert_param(
630632
partition_name: str,
631633
fields_info: Any,
632634
enable_dynamic: bool = False,
635+
schema_timestamp: int = 0,
633636
):
634637
if not fields_info:
635638
raise ParamError(message="Missing collection meta to validate entities")
@@ -640,6 +643,7 @@ def row_upsert_param(
640643
collection_name=collection_name,
641644
partition_name=p_name,
642645
num_rows=len(entities),
646+
schema_timestamp=schema_timestamp,
643647
)
644648

645649
return cls._parse_upsert_row_request(request, fields_info, enable_dynamic, entities)

0 commit comments

Comments
 (0)