Skip to content

Commit 9a89ee0

Browse files
committed
Move all batch evaluation logic to ConstructBatchProcessor
1 parent 1b20b67 commit 9a89ee0

File tree

5 files changed

+137
-331
lines changed

5 files changed

+137
-331
lines changed

src/engine/ConstructBatchProcessor.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,70 @@ std::optional<std::string> ConstructBatchProcessor::processCurrentRow() {
100100
return std::nullopt;
101101
}
102102

103+
// _____________________________________________________________________________
104+
std::optional<ConstructBatchProcessor::StringTriple>
105+
ConstructBatchProcessor::getStringTriple() {
106+
while (batchStart_ < rowIndicesVec_.size()) {
107+
blueprint_->cancellationHandle_->throwIfCancelled();
108+
109+
loadBatchIfNeeded();
110+
111+
if (auto result = processCurrentBatchAsStringTriple()) {
112+
return result;
113+
}
114+
115+
advanceToNextBatch();
116+
}
117+
118+
return std::nullopt;
119+
}
120+
121+
// _____________________________________________________________________________
122+
std::optional<ConstructBatchProcessor::StringTriple>
123+
ConstructBatchProcessor::processCurrentBatchAsStringTriple() {
124+
while (rowInBatchIdx_ < batchCache_->numRows_) {
125+
if (auto result = processCurrentRowAsStringTriple()) {
126+
return result;
127+
}
128+
advanceToNextRow();
129+
}
130+
return std::nullopt;
131+
}
132+
133+
// _____________________________________________________________________________
134+
std::optional<ConstructBatchProcessor::StringTriple>
135+
ConstructBatchProcessor::processCurrentRowAsStringTriple() {
136+
while (tripleIdx_ < blueprint_->numTemplateTriples()) {
137+
auto subject =
138+
getTermStringPtr(tripleIdx_, 0, *batchCache_, rowInBatchIdx_);
139+
auto predicate =
140+
getTermStringPtr(tripleIdx_, 1, *batchCache_, rowInBatchIdx_);
141+
auto object = getTermStringPtr(tripleIdx_, 2, *batchCache_, rowInBatchIdx_);
142+
143+
++tripleIdx_;
144+
145+
StringTriple triple = instantiateTriple(subject, predicate, object);
146+
if (!triple.isEmpty()) {
147+
return triple;
148+
}
149+
// Triple was UNDEF (incomplete), continue to next triple pattern.
150+
}
151+
152+
return std::nullopt;
153+
}
154+
155+
// _____________________________________________________________________________
156+
ConstructBatchProcessor::StringTriple
157+
ConstructBatchProcessor::instantiateTriple(
158+
const std::shared_ptr<const std::string>& subject,
159+
const std::shared_ptr<const std::string>& predicate,
160+
const std::shared_ptr<const std::string>& object) const {
161+
if (!subject || !predicate || !object) {
162+
return StringTriple{};
163+
}
164+
return StringTriple{*subject, *predicate, *object};
165+
}
166+
103167
// _____________________________________________________________________________
104168
void ConstructBatchProcessor::advanceToNextRow() {
105169
++rowInBatchIdx_;

src/engine/ConstructBatchProcessor.h

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "backports/span.h"
1616
#include "engine/ConstructIdCache.h"
1717
#include "engine/InstantiationBlueprint.h"
18+
#include "engine/QueryExecutionTree.h"
1819
#include "engine/QueryExportTypes.h"
1920
#include "util/http/MediaTypes.h"
2021
#include "util/stream_generator.h"
@@ -27,6 +28,7 @@ class ConstructBatchProcessor
2728
public:
2829
using IdCache = ConstructIdCache;
2930
using IdCacheStatsLogger = ConstructIdCacheStatsLogger;
31+
using StringTriple = QueryExecutionTree::StringTriple;
3032

3133
// Default batch size for processing rows.
3234
static constexpr size_t DEFAULT_BATCH_SIZE = 64;
@@ -46,6 +48,10 @@ class ConstructBatchProcessor
4648
// Returns the next formatted triple, or nullopt when exhausted.
4749
std::optional<std::string> get() override;
4850

51+
// Returns the next triple as StringTriple, or nullopt when exhausted.
52+
// Use either get() OR getStringTriple(), not both on the same instance.
53+
std::optional<StringTriple> getStringTriple();
54+
4955
private:
5056
// Evaluates all `Variables` and `BlankNodes` for a batch of result-table
5157
// rows. Column-oriented access pattern for variables.
@@ -88,6 +94,13 @@ class ConstructBatchProcessor
8894
const std::shared_ptr<const std::string>& predicate,
8995
const std::shared_ptr<const std::string>& object) const;
9096

97+
// Instantiates a single triple as StringTriple. Returns empty StringTriple
98+
// if any component is UNDEF.
99+
StringTriple instantiateTriple(
100+
const std::shared_ptr<const std::string>& subject,
101+
const std::shared_ptr<const std::string>& predicate,
102+
const std::shared_ptr<const std::string>& object) const;
103+
91104
// Load a new batch of rows for evaluation if we don't have one.
92105
void loadBatchIfNeeded();
93106

@@ -97,6 +110,12 @@ class ConstructBatchProcessor
97110
// Process triples for the current row, returning the next formatted triple.
98111
std::optional<std::string> processCurrentRow();
99112

113+
// Process rows in the current batch, returning the next `StringTriple`.
114+
std::optional<StringTriple> processCurrentBatchAsStringTriple();
115+
116+
// Process triples for the current row, returning the next `StringTriple`.
117+
std::optional<StringTriple> processCurrentRowAsStringTriple();
118+
100119
// Advance to next row in batch.
101120
void advanceToNextRow();
102121

@@ -126,4 +145,23 @@ class ConstructBatchProcessor
126145
std::optional<BatchEvaluationCache> batchCache_;
127146
};
128147

148+
// _____________________________________________________________________________
149+
// Adapter to use ConstructBatchProcessor::getStringTriple() with
150+
// InputRangeTypeErased<StringTriple>. This is a thin wrapper that delegates
151+
// to the batch processor's getStringTriple() method.
152+
class StringTripleBatchProcessor : public ad_utility::InputRangeFromGet<
153+
ConstructBatchProcessor::StringTriple> {
154+
public:
155+
explicit StringTripleBatchProcessor(
156+
std::unique_ptr<ConstructBatchProcessor> processor)
157+
: processor_(std::move(processor)) {}
158+
159+
std::optional<ConstructBatchProcessor::StringTriple> get() override {
160+
return processor_->getStringTriple();
161+
}
162+
163+
private:
164+
std::unique_ptr<ConstructBatchProcessor> processor_;
165+
};
166+
129167
#endif // QLEVER_SRC_ENGINE_CONSTRUCTBATCHPROCESSOR_H

0 commit comments

Comments
 (0)