-
Notifications
You must be signed in to change notification settings - Fork 3k
[OV JS] Improve Core.importModel implementation: SharedStreamBuffer + variant source #33658
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
c70dfc4
a4a27a1
4930478
93f2987
7877ee7
8f64ade
5f4758e
5863474
47cb1a5
69df8c4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -5,9 +5,13 @@ | |||||
|
|
||||||
| #include <napi.h> | ||||||
|
|
||||||
| #include <istream> | ||||||
| #include <memory> | ||||||
| #include <thread> | ||||||
| #include <variant> | ||||||
|
|
||||||
| #include "openvino/runtime/core.hpp" | ||||||
| #include "openvino/runtime/shared_buffer.hpp" | ||||||
|
|
||||||
| class CoreWrap : public Napi::ObjectWrap<CoreWrap> { | ||||||
| public: | ||||||
|
|
@@ -135,17 +139,32 @@ struct TsfnContextPath { | |||||
| }; | ||||||
|
|
||||||
| struct ImportModelContext { | ||||||
| ImportModelContext(Napi::Env env, ov::Core& core) : deferred(Napi::Promise::Deferred::New(env)), _core{core} {}; | ||||||
| std::thread nativeThread; | ||||||
| // Buffer source: pins JS Buffer, wraps with SharedStreamBuffer (zero-copy) | ||||||
| struct BufferSource { | ||||||
| Napi::ObjectReference buffer_ref; // pins JS Buffer | ||||||
| std::unique_ptr<ov::SharedStreamBuffer> shared_buf; | ||||||
| }; | ||||||
|
|
||||||
| // Tensor source: stores tensor + pins JS object | ||||||
| struct TensorSource { | ||||||
| Napi::ObjectReference tensor_ref; // pins JS TensorWrap | ||||||
| ov::Tensor tensor; | ||||||
| }; | ||||||
|
|
||||||
| using Source = std::variant<std::monostate, BufferSource, TensorSource>; | ||||||
| Source source{std::monostate{}}; | ||||||
|
|
||||||
| ImportModelContext(Napi::Env env, ov::Core& core) : deferred(Napi::Promise::Deferred::New(env)), _core{core} {} | ||||||
|
|
||||||
| std::thread nativeThread; | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| Napi::Promise::Deferred deferred; | ||||||
| Napi::ThreadSafeFunction tsfn; | ||||||
|
|
||||||
| std::stringstream _stream; | ||||||
| std::string _device; | ||||||
| std::map<std::string, ov::Any> _config = {}; | ||||||
| ov::AnyMap _config; | ||||||
| ov::Core& _core; | ||||||
| ov::CompiledModel _compiled_model; | ||||||
| std::string _error_msg; | ||||||
| }; | ||||||
|
|
||||||
| void FinalizerCallbackModel(Napi::Env env, void* finalizeData, TsfnContextModel* context); | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,10 +10,20 @@ | |
| #include "node/include/helper.hpp" | ||
| #include "node/include/model_wrap.hpp" | ||
| #include "node/include/read_model_args.hpp" | ||
| #include "node/include/tensor.hpp" | ||
| #include "node/include/type_validation.hpp" | ||
| #include "openvino/core/model_util.hpp" | ||
| #include "openvino/runtime/shared_buffer.hpp" | ||
| #include "openvino/util/common_util.hpp" | ||
|
|
||
| // Helper for std::visit with multiple lambdas | ||
| template <class... Ts> | ||
| struct overloaded : Ts... { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is helper in OV with this implementation |
||
| using Ts::operator()...; | ||
| }; | ||
| template <class... Ts> | ||
| overloaded(Ts...) -> overloaded<Ts...>; | ||
|
|
||
| void validate_set_property_args(const Napi::CallbackInfo& info) { | ||
| const size_t args_length = info.Length(); | ||
| const bool is_device_specified = info[0].IsString(); | ||
|
|
@@ -315,41 +325,32 @@ Napi::Value CoreWrap::import_model(const Napi::CallbackInfo& info) { | |
| std::vector<std::string> allowed_signatures; | ||
|
|
||
| try { | ||
| // Handle Tensor input | ||
| // Tensor input | ||
| if (ov::js::validate<TensorWrap, Napi::String>(info, allowed_signatures) || | ||
| ov::js::validate<TensorWrap, Napi::String, Napi::Object>(info, allowed_signatures)) { | ||
| const ov::Tensor tensor = cast_to_tensor(info, 0); | ||
| const std::string device_name = info[1].As<Napi::String>().Utf8Value(); | ||
| const std::string device = info[1].ToString(); | ||
| const ov::AnyMap config = info.Length() == 3 ? to_anyMap(info.Env(), info[2]) : ov::AnyMap{}; | ||
|
|
||
| ov::CompiledModel compiled; | ||
| if (info.Length() == 2) { | ||
| compiled = _core.import_model(tensor, device_name); | ||
| } else { | ||
| compiled = _core.import_model(tensor, device_name, to_anyMap(info.Env(), info[2])); | ||
| } | ||
|
|
||
| return CompiledModelWrap::wrap(info.Env(), compiled); | ||
| } | ||
| return CompiledModelWrap::wrap(info.Env(), _core.import_model(tensor, device, config)); | ||
|
|
||
| // Handle Buffer input | ||
| if (ov::js::validate<Napi::Buffer<uint8_t>, Napi::String>(info, allowed_signatures) || | ||
| ov::js::validate<Napi::Buffer<uint8_t>, Napi::String, Napi::Object>(info, allowed_signatures)) { | ||
| // Buffer input (zero-copy with SharedStreamBuffer) | ||
| } else if (ov::js::validate<Napi::Buffer<uint8_t>, Napi::String>(info, allowed_signatures) || | ||
| ov::js::validate<Napi::Buffer<uint8_t>, Napi::String, Napi::Object>(info, allowed_signatures)) { | ||
| const auto& model_data = info[0].As<Napi::Buffer<uint8_t>>(); | ||
| const auto model_stream = std::string(reinterpret_cast<char*>(model_data.Data()), model_data.Length()); | ||
| std::stringstream _stream; | ||
| _stream << model_stream; | ||
|
|
||
| ov::CompiledModel compiled; | ||
| if (info.Length() == 2) { | ||
| compiled = _core.import_model(_stream, std::string(info[1].ToString())); | ||
| } else { | ||
| compiled = _core.import_model(_stream, std::string(info[1].ToString()), to_anyMap(info.Env(), info[2])); | ||
| } | ||
|
|
||
| return CompiledModelWrap::wrap(info.Env(), compiled); | ||
| } | ||
| // Use SharedStreamBuffer to avoid extra copies of data | ||
| ov::SharedStreamBuffer shared_buffer(model_data.Data(), model_data.Length()); | ||
| std::istream stream(&shared_buffer); | ||
|
|
||
| const std::string device = info[1].ToString(); | ||
| const ov::AnyMap config = info.Length() == 3 ? to_anyMap(info.Env(), info[2]) : ov::AnyMap{}; | ||
|
|
||
| OPENVINO_THROW("'importModelSync'", ov::js::get_parameters_error_msg(info, allowed_signatures)); | ||
| return CompiledModelWrap::wrap(info.Env(), _core.import_model(stream, device, config)); | ||
|
|
||
| } else { | ||
| OPENVINO_THROW("'importModelSync'", ov::js::get_parameters_error_msg(info, allowed_signatures)); | ||
| } | ||
|
|
||
| } catch (std::exception& e) { | ||
| reportError(info.Env(), e.what()); | ||
|
|
@@ -364,14 +365,37 @@ void ImportModelFinalizer(Napi::Env env, void* finalizeData, ImportModelContext* | |
|
|
||
| void importModelThread(ImportModelContext* context, std::mutex& mutex) { | ||
| // Imports model without blocking the main thread. | ||
| { | ||
| try { | ||
| const std::lock_guard<std::mutex> lock(mutex); | ||
| context->_compiled_model = context->_core.import_model(context->_stream, context->_device, context->_config); | ||
|
|
||
| context->_compiled_model = | ||
| std::visit(overloaded{ | ||
| [](std::monostate&) -> ov::CompiledModel { | ||
| throw std::runtime_error("ImportModelContext source not initialized"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not OPENVINO_THROW/ASSERT is not used? |
||
| }, | ||
| [&](ImportModelContext::BufferSource& src) -> ov::CompiledModel { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can |
||
| std::istream stream(src.shared_buf.get()); | ||
| return context->_core.import_model(stream, context->_device, context->_config); | ||
| }, | ||
| [&](ImportModelContext::TensorSource& src) -> ov::CompiledModel { | ||
| return context->_core.import_model(src.tensor, context->_device, context->_config); | ||
| }, | ||
| }, | ||
| context->source); | ||
|
|
||
| } catch (const std::exception& e) { | ||
| context->_error_msg = e.what(); | ||
| } catch (...) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @almilosz |
||
| context->_error_msg = "Unknown error in importModel worker thread"; | ||
| } | ||
|
|
||
| // Callback to return to JS the results of core.import_model() | ||
| auto callback = [](Napi::Env env, Napi::Function, ImportModelContext* context) { | ||
| context->deferred.Resolve(cpp_to_js(env, context->_compiled_model)); | ||
| if (!context->_error_msg.empty()) { | ||
| context->deferred.Reject(Napi::Error::New(env, context->_error_msg).Value()); | ||
almilosz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } else { | ||
| context->deferred.Resolve(cpp_to_js(env, context->_compiled_model)); | ||
| } | ||
| }; | ||
|
|
||
| // Addon's main thread will safely invoke the JS callback function on the behalf of the additional thread. | ||
|
|
@@ -383,29 +407,18 @@ Napi::Value CoreWrap::import_model_async(const Napi::CallbackInfo& info) { | |
| const auto& env = info.Env(); | ||
| std::vector<std::string> allowed_signatures; | ||
|
|
||
| try { | ||
| // Validate all supported signatures | ||
| if (ov::js::validate<TensorWrap, Napi::String>(info, allowed_signatures) || | ||
| ov::js::validate<TensorWrap, Napi::String, Napi::Object>(info, allowed_signatures) || | ||
| ov::js::validate<Napi::Buffer<uint8_t>, Napi::String>(info, allowed_signatures) || | ||
| ov::js::validate<Napi::Buffer<uint8_t>, Napi::String, Napi::Object>(info, allowed_signatures)) { | ||
| // Prepare validated data that will be transferred to the new thread. | ||
| auto context_data = new ImportModelContext(env, _core); | ||
|
|
||
| // Handle Tensor input | ||
| if (ov::js::validate_value<TensorWrap>(env, info[0])) { | ||
| const ov::Tensor tensor = cast_to_tensor(info, 0); | ||
| const auto* data_ptr = reinterpret_cast<const char*>(tensor.data()); | ||
| context_data->_stream << std::string(data_ptr, tensor.get_byte_size()); | ||
| } else { | ||
| // Handle Buffer input | ||
| const auto& model_data = info[0].As<Napi::Buffer<uint8_t>>(); | ||
| const auto model_stream = std::string(reinterpret_cast<char*>(model_data.Data()), model_data.Length()); | ||
| context_data->_stream << model_stream; | ||
| } | ||
| // Tensor input | ||
| if (ov::js::validate<TensorWrap, Napi::String>(info, allowed_signatures) || | ||
| ov::js::validate<TensorWrap, Napi::String, Napi::Object>(info, allowed_signatures)) { | ||
| auto* context_data = new ImportModelContext(env, _core); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @almilosz |
||
| try { | ||
| ImportModelContext::TensorSource tensor_src; | ||
| tensor_src.tensor_ref = Napi::Persistent(info[0].ToObject()); | ||
| tensor_src.tensor = cast_to_tensor(info, 0); | ||
|
|
||
| context_data->source = std::move(tensor_src); | ||
| context_data->_device = info[1].ToString(); | ||
| context_data->_config = info.Length() == 3 ? to_anyMap(env, info[2]) : ov::AnyMap(); | ||
| context_data->_config = info.Length() == 3 ? to_anyMap(env, info[2]) : ov::AnyMap{}; | ||
|
|
||
| context_data->tsfn = Napi::ThreadSafeFunction::New(env, | ||
| Napi::Function(), | ||
|
|
@@ -417,15 +430,46 @@ Napi::Value CoreWrap::import_model_async(const Napi::CallbackInfo& info) { | |
| (void*)nullptr); | ||
|
|
||
| context_data->nativeThread = std::thread(importModelThread, context_data, std::ref(_mutex)); | ||
| // Returns a Promise to JS. Method import_model() is performed on additional thread. | ||
| return context_data->deferred.Promise(); | ||
| } else { | ||
| OPENVINO_THROW("'importModel'", ov::js::get_parameters_error_msg(info, allowed_signatures)); | ||
| } catch (...) { | ||
| delete context_data; | ||
| throw; | ||
| } | ||
|
|
||
| } catch (std::exception& e) { | ||
| reportError(info.Env(), e.what()); | ||
| return info.Env().Undefined(); | ||
| // Buffer input (zero-copy with SharedStreamBuffer) | ||
| } else if (ov::js::validate<Napi::Buffer<uint8_t>, Napi::String>(info, allowed_signatures) || | ||
| ov::js::validate<Napi::Buffer<uint8_t>, Napi::String, Napi::Object>(info, allowed_signatures)) { | ||
| auto* context_data = new ImportModelContext(env, _core); | ||
| try { | ||
| auto buf = info[0].As<Napi::Buffer<uint8_t>>(); | ||
|
|
||
| ImportModelContext::BufferSource buf_src; | ||
| buf_src.buffer_ref = Napi::Persistent(buf.ToObject()); | ||
| buf_src.shared_buf = std::make_unique<ov::SharedStreamBuffer>(buf.Data(), buf.Length()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this buffer is unique ptr. The Shared stream buffer is view like class it should be enough to keep |
||
|
|
||
| context_data->source = std::move(buf_src); | ||
| context_data->_device = info[1].ToString(); | ||
| context_data->_config = info.Length() == 3 ? to_anyMap(env, info[2]) : ov::AnyMap{}; | ||
|
|
||
| context_data->tsfn = Napi::ThreadSafeFunction::New(env, | ||
| Napi::Function(), | ||
| "TSFN", | ||
| 0, | ||
| 1, | ||
| context_data, | ||
| ImportModelFinalizer, | ||
| (void*)nullptr); | ||
|
|
||
| context_data->nativeThread = std::thread(importModelThread, context_data, std::ref(_mutex)); | ||
| return context_data->deferred.Promise(); | ||
| } catch (...) { | ||
| delete context_data; | ||
| throw; | ||
| } | ||
|
|
||
| } else { | ||
| reportError(env, "'importModel'" + ov::js::get_parameters_error_msg(info, allowed_signatures)); | ||
| return env.Undefined(); | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should use first variant as default initializer