Skip to content

Commit 44abead

Browse files
committed
Pass the current builder to the async put thread.
Signed-off-by: Ye Cao <[email protected]>
1 parent cdffb5b commit 44abead

File tree

2 files changed

+27
-6
lines changed

2 files changed

+27
-6
lines changed

python/vineyard/core/builder.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ def put(
156156
builder: Optional[BuilderContext] = None,
157157
persist: bool = False,
158158
name: Optional[str] = None,
159+
as_async: bool = False,
159160
**kwargs
160161
):
161162
"""Put python value to vineyard.
@@ -185,16 +186,22 @@ def put(
185186
name: str, optional
186187
If given, the name will be automatically associated with the resulted
187188
object. Note that only take effect when the object is persisted.
189+
as_async: bool, optional
190+
If true, which means the object will be put to vineyard asynchronously.
191+
Thus we need to use the passed builder.
188192
kw:
189193
User-specific argument that will be passed to the builder.
190194
191195
Returns:
192196
ObjectID: The result object id will be returned.
193197
"""
194-
if builder is not None:
198+
if builder is not None and not as_async:
195199
return builder(client, value, **kwargs)
196200

197-
meta = get_current_builders().run(client, value, **kwargs)
201+
if as_async:
202+
meta = builder.run(client, value, **kwargs)
203+
else:
204+
meta = get_current_builders().run(client, value, **kwargs)
198205

199206
# the builders is expected to return an :class:`ObjectMeta`, or an
200207
# :class:`Object` (in the `bytes_builder` and `memoryview` builder).

python/vineyard/core/client.py

+18-4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from vineyard._C import VineyardException
4343
from vineyard._C import _connect
4444
from vineyard.core.builder import BuilderContext
45+
from vineyard.core.builder import get_current_builders
4546
from vineyard.core.builder import put
4647
from vineyard.core.resolver import ResolverContext
4748
from vineyard.core.resolver import get
@@ -839,10 +840,11 @@ def _put_internal(
839840
builder: Optional[BuilderContext] = None,
840841
persist: bool = False,
841842
name: Optional[str] = None,
843+
as_async: bool = False,
842844
**kwargs,
843845
):
844846
try:
845-
return put(self, value, builder, persist, name, **kwargs)
847+
return put(self, value, builder, persist, name, as_async, **kwargs)
846848
except NotEnoughMemoryException as exec:
847849
with envvars(
848850
{'VINEYARD_RPC_SKIP_RETRY': '1', 'VINEYARD_IPC_SKIP_RETRY': '1'}
@@ -868,7 +870,7 @@ def _put_internal(
868870
host, port = meta[instance_id]['rpc_endpoint'].split(':')
869871
self._rpc_client = _connect(host, port)
870872
self.compression = previous_compression_state
871-
return put(self, value, builder, persist, name, **kwargs)
873+
return put(self, value, builder, persist, name, as_async, **kwargs)
872874

873875
@_apply_docstring(put)
874876
def put(
@@ -881,16 +883,28 @@ def put(
881883
**kwargs,
882884
):
883885
if as_async:
886+
884887
def _default_callback(future):
885888
try:
886889
result = future.result()
887-
print(f"Successfully put object {result}", flush=True)
890+
if isinstance(result, ObjectID):
891+
print(f"Successfully put object {result}", flush=True)
892+
elif isinstance(result, ObjectMeta):
893+
print(f"Successfully put object {result.id}", flush=True)
888894
except Exception as e:
889895
print(f"Failed to put object: {e}", flush=True)
890896

897+
current_builder = builder or get_current_builders()
898+
891899
thread_pool = self.put_thread_pool
892900
result = thread_pool.submit(
893-
self._put_internal, value, builder, persist, name, **kwargs
901+
self._put_internal,
902+
value,
903+
current_builder,
904+
persist,
905+
name,
906+
as_async=True,
907+
**kwargs,
894908
)
895909
result.add_done_callback(_default_callback)
896910
return result

0 commit comments

Comments
 (0)