Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Qup42 committed Nov 13, 2024
1 parent e0e130d commit 1ab5cc5
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 40 deletions.
77 changes: 51 additions & 26 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Server::Server(unsigned short port, size_t numThreads,
enablePatternTrick_(usePatternTrick),
// The number of server threads currently also is the number of queries
// that can be processed simultaneously.
threadPool_{numThreads} {
queryThreadPool_{numThreads} {
// This also directly triggers the update functions and propagates the
// values of the parameters to the cache.
RuntimeParameters().setOnUpdateAction<"cache-max-num-entries">(
Expand Down Expand Up @@ -398,7 +398,12 @@ Awaitable<void> Server::process(
} else if (auto cmd = checkParameter("cmd", "clear-delta-triples")) {
requireValidAccessToken("clear-delta-triples");
logCommand(cmd, "clear delta triples");

Check warning on line 400 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L399-L400

Added lines #L399 - L400 were not covered by tests
index_.deltaTriplesManager().clear();
// The function requires a SharedCancellationHandle, but the operation is
// not cancellable.
auto handle = std::make_shared<ad_utility::CancellationHandle<>>();
co_await computeInNewThread(
updateThreadPool_, [this] { index_.deltaTriplesManager().clear(); },
handle);
response = createOkResponse("Delta triples have been cleared", request,
MediaType::textPlain);

Check warning on line 408 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L403-L408

Added lines #L403 - L408 were not covered by tests
} else if (auto cmd = checkParameter("cmd", "get-settings")) {
Expand Down Expand Up @@ -854,16 +859,11 @@ Awaitable<void> Server::processQuery(
}

// ____________________________________________________________________________
Awaitable<void> Server::processUpdate(
Awaitable<void> Server::processUpdateImpl(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit) {
auto messageSender = createMessageSender(queryHub_, request, update);

auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);

ad_utility::Timer& requestTimer, TimeLimit timeLimit, auto& messageSender,
ad_utility::SharedCancellationHandle cancellationHandle,
DeltaTriples& deltaTriples) {
auto [pinSubtrees, pinResult] = determineResultPinning(params);
LOG(INFO) << "Processing the following SPARQL update:"

Check warning on line 868 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L866-L868

Added lines #L866 - L868 were not covered by tests
<< (pinResult ? " [pin result]" : "")
Expand All @@ -882,24 +882,47 @@ Awaitable<void> Server::processUpdate(
"following query was sent instead of an update: ",
plannedQuery.parsedQuery_._originalString));
}

// Update the delta triples.
//
// TODO: This does not yet handle concurrent updates correctly, but it should
// work fine when a new update is sent only after the previous one has
// finished.
auto& deltaTriplesManager = index_.deltaTriplesManager();
deltaTriplesManager.modify(
[this, &plannedQuery, &qet, &cancellationHandle](auto& deltaTriples) {
ExecuteUpdate::executeUpdate(index_, plannedQuery.parsedQuery_, qet,
deltaTriples, cancellationHandle);
});
ExecuteUpdate::executeUpdate(index_, plannedQuery.parsedQuery_, qet,
deltaTriples, cancellationHandle);

Check warning on line 886 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L880-L886

Added lines #L880 - L886 were not covered by tests

LOG(INFO) << "Done processing update"
<< ", total time was " << requestTimer.msecs().count() << " ms"
<< std::endl;
LOG(DEBUG) << "Runtime Info:\n"
<< qet.getRootOperation()->runtimeInfo().toString() << std::endl;
}

Check warning on line 893 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L888-L893

Added lines #L888 - L893 were not covered by tests

// ____________________________________________________________________________
Awaitable<void> Server::processUpdate(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
ad_utility::Timer& requestTimer,
const ad_utility::httpUtils::HttpRequest auto& request, auto&& send,
TimeLimit timeLimit) {
auto messageSender = createMessageSender(queryHub_, request, update);

Check warning on line 901 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L900-L901

Added lines #L900 - L901 were not covered by tests

auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);

Check warning on line 904 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L903-L904

Added lines #L903 - L904 were not covered by tests

// Update the delta triples.
co_await computeInNewThread(
updateThreadPool_,
[this, &params, &update, &requestTimer, &timeLimit, &messageSender,
&cancellationHandle] {
index_.deltaTriplesManager().modify(
[this, &params, &update, &requestTimer, &timeLimit, &messageSender,
&cancellationHandle](auto& deltaTriples) {

Check warning on line 913 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L907-L913

Added lines #L907 - L913 were not covered by tests
// TODO<qup42> this must also be possible in a nicer way
auto coro = processUpdateImpl(params, update, requestTimer,
timeLimit, messageSender,
cancellationHandle, deltaTriples);
boost::asio::io_context ioContext;
boost::asio::co_spawn(ioContext, std::move(coro),
boost::asio::detached);
ioContext.run();
});
},
cancellationHandle);

Check warning on line 924 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L915-L924

Added lines #L915 - L924 were not covered by tests

// TODO<qup42> send a proper response
// SPARQL 1.1 Protocol 2.2.4 Successful Responses: "The response body of a
// successful update request is implementation defined."
Expand Down Expand Up @@ -990,7 +1013,8 @@ Awaitable<void> Server::processQueryOrUpdate(

// _____________________________________________________________________________
template <std::invocable Function, typename T>
Awaitable<T> Server::computeInNewThread(Function function,
Awaitable<T> Server::computeInNewThread(net::static_thread_pool& threadPool,
Function function,
SharedCancellationHandle handle) {
// `interruptible` will set the shared state of this promise
// with a function that can be used to cancel the timer.
Expand All @@ -1010,13 +1034,13 @@ Awaitable<T> Server::computeInNewThread(Function function,
// this might still block. However it will make the code check the
// cancellation handle while waiting for a thread in the pool to become ready.
return ad_utility::interruptible(
ad_utility::runFunctionOnExecutor(threadPool_.get_executor(),
ad_utility::runFunctionOnExecutor(threadPool.get_executor(),

Check warning on line 1037 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1037

Added line #L1037 was not covered by tests
std::move(inner), net::use_awaitable),
std::move(handle), std::move(cancelTimerPromise));
}

// _____________________________________________________________________________
net::awaitable<std::optional<Server::PlannedQuery>> Server::parseAndPlan(
Awaitable<std::optional<Server::PlannedQuery>> Server::parseAndPlan(
const std::string& query, const vector<DatasetClause>& queryDatasets,
QueryExecutionContext& qec, SharedCancellationHandle handle,
TimeLimit timeLimit) {
Expand All @@ -1028,6 +1052,7 @@ net::awaitable<std::optional<Server::PlannedQuery>> Server::parseAndPlan(
// function, because then the conan build fails in a very strange way,
// probably related to issues in GCC's coroutine implementation.
return computeInNewThread(
queryThreadPool_,

Check warning on line 1055 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L1055

Added line #L1055 was not covered by tests
[&query, &qec, enablePatternTrick = enablePatternTrick_,
handle = std::move(handle), timeLimit,
&queryDatasets]() mutable -> std::optional<PlannedQuery> {
Expand Down
16 changes: 12 additions & 4 deletions src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class Server {
/// the `WebSocketHandler` created for `HttpServer`.
std::weak_ptr<ad_utility::websocket::QueryHub> queryHub_;

net::static_thread_pool threadPool_;
net::static_thread_pool queryThreadPool_;
net::static_thread_pool updateThreadPool_{1};

/// Executor with a single thread that is used to run timers asynchronously.
net::static_thread_pool timerExecutor_{1};
Expand Down Expand Up @@ -173,12 +174,18 @@ class Server {
const std::string& operation, QueryExecutionContext& qec,
SharedCancellationHandle handle, TimeLimit timeLimit,
const ad_utility::Timer& requestTimer);

// Creates a `MessageSender` for the given operation.
ad_utility::websocket::MessageSender createMessageSender(
const std::weak_ptr<ad_utility::websocket::QueryHub>& queryHub,
const ad_utility::httpUtils::HttpRequest auto& request,
const string& operation);
// Execute an update operation. The function must have exclusive access to the
// DeltaTriples object.
Awaitable<void> processUpdateImpl(
const ad_utility::url_parser::ParamValueMap& params, const string& update,
ad_utility::Timer& requestTimer, TimeLimit timeLimit, auto& messageSender,
ad_utility::SharedCancellationHandle cancellationHandle,
DeltaTriples& deltaTriples);

static json composeErrorResponseJson(
const string& query, const std::string& errorMsg,
Expand All @@ -190,10 +197,11 @@ class Server {
json composeCacheStatsJson() const;

/// Invoke `function` on `threadPool_`, and return an awaitable to wait for
/// it's completion, wrapping the result.
/// its completion, wrapping the result.
template <std::invocable Function,
typename T = std::invoke_result_t<Function>>
Awaitable<T> computeInNewThread(Function function,
Awaitable<T> computeInNewThread(net::static_thread_pool& threadPool,
Function function,
SharedCancellationHandle handle);

/// This method extracts a client-defined query id from the passed HTTP
Expand Down
11 changes: 1 addition & 10 deletions src/index/DeltaTriples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,7 @@ void DeltaTriplesManager::modify(
}

// _____________________________________________________________________________
void DeltaTriplesManager::clear() {
deltaTriples_.withWriteLock([this](DeltaTriples& deltaTriples) {
deltaTriples.clear();
auto newSnapshot = deltaTriples.getSnapshot();
currentLocatedTriplesSnapshot_.withWriteLock(
[&newSnapshot](auto& currentSnapshot) {
currentSnapshot = std::move(newSnapshot);
});
});
}
void DeltaTriplesManager::clear() { modify(&DeltaTriples::clear); }

Check warning on line 216 in src/index/DeltaTriples.cpp

View check run for this annotation

Codecov / codecov/patch

src/index/DeltaTriples.cpp#L216

Added line #L216 was not covered by tests

// _____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriplesManager::getCurrentSnapshot() const {
Expand Down

0 comments on commit 1ab5cc5

Please sign in to comment.