Skip to content

Commit f6e5d0f

Browse files
RobinTFmarvin7122
authored andcommitted
Prepare recomputation of triple statistics when there have been updates (ad-freiburg#2640)
Implement the functions for recomputing the triple statistics, as preparation for ad-freiburg#2408 (rebuild index) NOTE: This could also be used to recompute the triple statistics stored in the `.meta-data.json` file, either periodically in the background, or when they are requested via `cmd=stats`. That is work for a separate PR
1 parent e1c184d commit f6e5d0f

File tree

6 files changed

+319
-0
lines changed

6 files changed

+319
-0
lines changed

src/index/IndexImpl.cpp

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "util/InputRangeUtils.h"
2828
#include "util/Iterators.h"
2929
#include "util/JoinAlgorithms/JoinAlgorithms.h"
30+
#include "util/ParallelExecutor.h"
3031
#include "util/ProgressBar.h"
3132
#include "util/ThreadSafeQueue.h"
3233
#include "util/Timer.h"
@@ -1787,3 +1788,107 @@ void IndexImpl::setPrefixesForEncodedValues(
17871788
encodedIriManager_ =
17881789
EncodedIriManager{std::move(prefixesWithoutAngleBrackets)};
17891790
}
1791+
1792+
// _____________________________________________________________________________
1793+
void IndexImpl::countDistinct(std::optional<Id>& lastId, size_t& counter,
1794+
const IdTable& table) {
1795+
AD_CORRECTNESS_CHECK(
1796+
!table.empty(), "Empty tables should never be yielded by the lazy scan.");
1797+
auto col = table.getColumn(0);
1798+
counter += ql::ranges::distance(col | ::ranges::views::unique([](Id a, Id b) {
1799+
return a.getBits() == b.getBits();
1800+
}));
1801+
if (lastId == col.front()) {
1802+
// Avoid double counting in case the last id of the previous block is the
1803+
// same as the first id of this block.
1804+
counter--;
1805+
}
1806+
lastId = col.back();
1807+
}
1808+
1809+
namespace {
1810+
// Helper function that returns a packaged task that computes distinct counts
1811+
// over all tables produced by scanning the given permutation. The customAction
1812+
// is invoked for each table to allow for additional computations while
1813+
// scanning.
1814+
std::packaged_task<void()> computeStatistics(
1815+
const LocatedTriplesSharedState& locatedTriplesSharedState, size_t& counter,
1816+
const Permutation& permutation, auto customAction) {
1817+
return std::packaged_task{[&counter, &permutation, &locatedTriplesSharedState,
1818+
customAction = std::move(customAction)]() {
1819+
auto cancellationHandle =
1820+
std::make_shared<ad_utility::SharedCancellationHandle::element_type>();
1821+
ScanSpecification scanSpec{std::nullopt, std::nullopt, std::nullopt};
1822+
auto tables = permutation.lazyScan(
1823+
permutation.getScanSpecAndBlocks(scanSpec, *locatedTriplesSharedState),
1824+
std::nullopt, CompressedRelationReader::ColumnIndicesRef{},
1825+
cancellationHandle, *locatedTriplesSharedState);
1826+
std::optional<Id> lastCol0 = std::nullopt;
1827+
for (const auto& table : tables) {
1828+
std::invoke(customAction, table);
1829+
IndexImpl::countDistinct(lastCol0, counter, table);
1830+
}
1831+
}};
1832+
}
1833+
} // namespace
1834+
1835+
// _____________________________________________________________________________
1836+
nlohmann::json IndexImpl::recomputeStatistics(
1837+
const LocatedTriplesSharedState& locatedTriplesSharedState) const {
1838+
size_t numTriples = 0;
1839+
size_t numTriplesInternal = 0;
1840+
size_t numSubjects = 0;
1841+
size_t numPredicates = 0;
1842+
size_t numPredicatesInternal = 0;
1843+
size_t numObjects = 0;
1844+
uint64_t nextBlankNode = 0;
1845+
1846+
std::vector<std::packaged_task<void()>> tasks;
1847+
1848+
auto getCounterTask = [&locatedTriplesSharedState](
1849+
size_t& counter, const Permutation& permutation,
1850+
auto customAction) {
1851+
return computeStatistics(locatedTriplesSharedState, counter, permutation,
1852+
customAction);
1853+
};
1854+
1855+
tasks.push_back(getCounterTask(
1856+
numPredicates, *pso_,
1857+
[&numTriples, &nextBlankNode](const IdTable& table) {
1858+
numTriples += table.numRows();
1859+
for (auto col : table.getColumns()) {
1860+
for (auto id : col) {
1861+
if (id.getDatatype() == Datatype::BlankNodeIndex) {
1862+
nextBlankNode =
1863+
std::max(nextBlankNode, id.getBlankNodeIndex().get() + 1);
1864+
}
1865+
}
1866+
}
1867+
}));
1868+
1869+
tasks.push_back(getCounterTask(numPredicatesInternal,
1870+
pso_->internalPermutation(),
1871+
[&numTriplesInternal](const IdTable& table) {
1872+
numTriplesInternal += table.numRows();
1873+
}));
1874+
1875+
if (hasAllPermutations()) {
1876+
tasks.push_back(getCounterTask(numSubjects, *spo_, ad_utility::noop));
1877+
tasks.push_back(getCounterTask(numObjects, *osp_, ad_utility::noop));
1878+
}
1879+
ad_utility::runTasksInParallel(std::move(tasks));
1880+
auto configuration = configurationJson_;
1881+
configuration["num-triples"] =
1882+
NumNormalAndInternal{numTriples, numTriplesInternal};
1883+
configuration["num-predicates"] =
1884+
NumNormalAndInternal{numPredicates, numPredicatesInternal};
1885+
if (hasAllPermutations()) {
1886+
// These are unused.
1887+
AD_CORRECTNESS_CHECK(numSubjects_.internal == 0);
1888+
AD_CORRECTNESS_CHECK(numObjects_.internal == 0);
1889+
configuration["num-subjects"] = NumNormalAndInternal{numSubjects, 0};
1890+
configuration["num-objects"] = NumNormalAndInternal{numObjects, 0};
1891+
}
1892+
configuration["num-blank-nodes-total"] = nextBlankNode;
1893+
return configuration;
1894+
}

src/index/IndexImpl.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
#ifndef QLEVER_SRC_INDEX_INDEXIMPL_H
88
#define QLEVER_SRC_INDEX_INDEXIMPL_H
99

10+
#include <gtest/gtest_prod.h>
11+
1012
#include <memory>
1113
#include <optional>
1214
#include <string>
@@ -643,6 +645,7 @@ class IndexImpl {
643645
friend class IndexTest_createFromTsvTest_Test;
644646
friend class IndexTest_createFromOnDiskIndexTest_Test;
645647
friend class CreatePatternsFixture_createPatterns_Test;
648+
FRIEND_TEST(IndexImpl, recomputeStatistics);
646649

647650
bool isLiteral(std::string_view object) const;
648651

@@ -801,6 +804,18 @@ class IndexImpl {
801804

802805
void storeTextScoringParamsInConfiguration(TextScoringMetric scoringMetric,
803806
float b, float k);
807+
808+
// Helper function to count the number of distinct Ids in a sorted IdTable.
809+
// `lastId` is used to keep track of the last seen Id between multiple calls
810+
// for subsequent tables and `counter` is the counter that is incremented.
811+
// This function is only exposed for testing.
812+
static void countDistinct(std::optional<Id>& lastId, size_t& counter,
813+
const IdTable& table);
814+
815+
// Recompute the statistics about the index based on the passed located
816+
// triples shared state.
817+
nlohmann::json recomputeStatistics(
818+
const LocatedTriplesSharedState& locatedTriplesSharedState) const;
804819
};
805820

806821
#endif // QLEVER_SRC_INDEX_INDEXIMPL_H

src/util/ParallelExecutor.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright 2026 The QLever Authors, in particular:
2+
//
3+
// 2026 Robin Textor-Falconi <[email protected]>, UFR
4+
//
5+
// UFR = University of Freiburg, Chair of Algorithms and Data Structures
6+
7+
#ifndef QLEVER_SRC_UTIL_PARALLELEXECUTOR_H
8+
#define QLEVER_SRC_UTIL_PARALLELEXECUTOR_H
9+
10+
#include <future>
11+
#include <vector>
12+
13+
#include "util/jthread.h"
14+
15+
namespace ad_utility {
16+
// Run the given tasks in parallel and wait for their completion. This function
17+
// will spawn a new thread for each task. If one of the tasks throws an
18+
// exception, this exception will be rethrown in the main thread. If multiple
19+
// tasks throw exceptions, only the first one will be rethrown.
20+
inline void runTasksInParallel(
21+
std::vector<std::packaged_task<void()>>&& tasks) {
22+
std::vector<std::future<void>> futures;
23+
futures.reserve(tasks.size());
24+
std::vector<JThread> threads;
25+
futures.reserve(tasks.size());
26+
for (auto& task : tasks) {
27+
futures.push_back(task.get_future());
28+
threads.push_back(JThread{std::move(task)});
29+
}
30+
// Wait for completion.
31+
for (auto& future : futures) {
32+
future.get();
33+
}
34+
}
35+
} // namespace ad_utility
36+
37+
#endif // QLEVER_SRC_UTIL_PARALLELEXECUTOR_H

test/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,3 +488,5 @@ addLinkAndDiscoverTest(UnitOfMeasurementTest util)
488488
addLinkAndDiscoverTest(MaterializedViewsTest qlever engine server)
489489

490490
addLinkAndDiscoverTestNoLibs(ConstexprMapTest)
491+
492+
addLinkAndDiscoverTestNoLibs(ParallelExecutorTest)

test/IndexTest.cpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,3 +662,81 @@ TEST(IndexTest, getBlankNodeManager) {
662662
const Index& index3 = getQec(kb)->getIndex();
663663
EXPECT_EQ(index3.getBlankNodeManager()->minIndex_, 3);
664664
}
665+
666+
// _____________________________________________________________________________
667+
TEST(IndexImpl, recomputeStatistics) {
668+
std::string turtleInput =
669+
"<x> <label> \"alpha\" . <x> <label> \"A\" . "
670+
"<y> <label> \"Beta\". <z> <label> \"zz\"@en";
671+
auto index = makeTestIndex("recomputeStatistics", std::move(turtleInput));
672+
auto cancellationHandle =
673+
std::make_shared<ad_utility::SharedCancellationHandle::element_type>();
674+
675+
auto& indexImpl = index.getImpl();
676+
// No-op, should return the same stats.
677+
auto result = indexImpl.recomputeStatistics(
678+
index.deltaTriplesManager().getCurrentLocatedTriplesSharedState());
679+
EXPECT_EQ(result, indexImpl.configurationJson_);
680+
681+
// Now, modify the index by adding triples.
682+
Id blankNodeId = Id::makeFromBlankNodeIndex(BlankNodeIndex::make(42));
683+
index.deltaTriplesManager().modify<void>([&cancellationHandle, blankNodeId](
684+
DeltaTriples& deltaTriples) {
685+
LocalVocabEntry zzz{ad_utility::triple_component::Iri::fromIriref("<zzz>")};
686+
LocalVocabEntry literal{
687+
ad_utility::triple_component::Literal::fromStringRepresentation(
688+
"\"test\"@en")};
689+
Id zzzId = Id::makeFromLocalVocabIndex(&zzz);
690+
Id literalId = Id::makeFromLocalVocabIndex(&literal);
691+
// Create duplicate in different graph.
692+
Id x = Id::makeFromVocabIndex(VocabIndex::make(11));
693+
Id label = Id::makeFromVocabIndex(VocabIndex::make(10));
694+
Id alpha = Id::makeFromVocabIndex(VocabIndex::make(1));
695+
deltaTriples.insertTriples(
696+
cancellationHandle, {IdTriple{{x, label, alpha, x}},
697+
IdTriple{{blankNodeId, zzzId, literalId, zzzId}}});
698+
});
699+
700+
for (bool loadAllPermutations : {true, false}) {
701+
using NNAI = Index::NumNormalAndInternal;
702+
703+
// Simulate scenario where not all permutations are loaded.
704+
if (!loadAllPermutations) {
705+
// Overwrite with unloaded permutation.
706+
indexImpl.SPO() = Permutation{Permutation::SPO,
707+
ad_utility::makeUnlimitedAllocator<Id>()};
708+
// Zero out original values.
709+
indexImpl.configurationJson_["num-subjects"] = NNAI(0, 0);
710+
indexImpl.configurationJson_["num-objects"] = NNAI(0, 0);
711+
}
712+
713+
auto newStats = indexImpl.recomputeStatistics(
714+
index.deltaTriplesManager().getCurrentLocatedTriplesSharedState());
715+
EXPECT_NE(newStats, indexImpl.configurationJson_);
716+
EXPECT_EQ(newStats["num-triples"], NNAI(5, 6));
717+
EXPECT_EQ(newStats["num-predicates"], NNAI(2, 4));
718+
if (loadAllPermutations) {
719+
EXPECT_EQ(newStats["num-subjects"], NNAI(4, 0));
720+
EXPECT_EQ(newStats["num-objects"], NNAI(5, 0));
721+
} else {
722+
EXPECT_EQ(newStats["num-subjects"], NNAI(0, 0));
723+
EXPECT_EQ(newStats["num-objects"], NNAI(0, 0));
724+
}
725+
// Blank node ids are remapped, so we cannot predict the exact number.
726+
EXPECT_NE(newStats["num-blank-nodes-total"], 0);
727+
}
728+
}
729+
730+
// _____________________________________________________________________________
731+
TEST(IndexImpl, countDistinct) {
732+
std::vector<IdTable> tables;
733+
tables.push_back(makeIdTableFromVector({{1}, {2}}));
734+
tables.push_back(makeIdTableFromVector({{2}, {2}}));
735+
736+
size_t counter = 0;
737+
std::optional<Id> lastId;
738+
for (const IdTable& table : tables) {
739+
IndexImpl::countDistinct(lastId, counter, table);
740+
}
741+
EXPECT_EQ(counter, 2);
742+
}

test/ParallelExecutorTest.cpp

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright 2026 The QLever Authors, in particular:
2+
//
3+
// 2026 Robin Textor-Falconi <[email protected]>, UFR
4+
//
5+
// UFR = University of Freiburg, Chair of Algorithms and Data Structures
6+
7+
#include <absl/strings/str_cat.h>
8+
#include <gtest/gtest.h>
9+
10+
#include "backports/algorithm.h"
11+
#include "util/GTestHelpers.h"
12+
#include "util/ParallelExecutor.h"
13+
14+
// _____________________________________________________________________________
15+
TEST(ParallelExecutor, noTasks) { ad_utility::runTasksInParallel({}); }
16+
17+
// _____________________________________________________________________________
18+
TEST(ParallelExecutor, singleTask) {
19+
bool executed = false;
20+
std::vector<std::packaged_task<void()>> tasks;
21+
tasks.push_back(std::packaged_task{[&executed]() { executed = true; }});
22+
ad_utility::runTasksInParallel(std::move(tasks));
23+
EXPECT_TRUE(executed);
24+
}
25+
26+
// _____________________________________________________________________________
27+
TEST(ParallelExecutor, multipleTasks) {
28+
constexpr size_t NUM_TASKS = 10;
29+
std::array<bool, NUM_TASKS> executed;
30+
ql::ranges::fill(executed, false);
31+
std::vector<std::packaged_task<void()>> tasks;
32+
for (size_t i = 0; i < NUM_TASKS; ++i) {
33+
tasks.push_back(
34+
std::packaged_task{[&executed, i]() { executed.at(i) = true; }});
35+
}
36+
ad_utility::runTasksInParallel(std::move(tasks));
37+
for (size_t i = 0; i < NUM_TASKS; ++i) {
38+
EXPECT_TRUE(executed.at(i));
39+
}
40+
}
41+
42+
// _____________________________________________________________________________
43+
TEST(ParallelExecutor, multipleTaskWithOneException) {
44+
constexpr size_t NUM_TASKS = 10;
45+
std::array<bool, NUM_TASKS> executed;
46+
ql::ranges::fill(executed, false);
47+
std::vector<std::packaged_task<void()>> tasks;
48+
for (size_t i = 0; i < NUM_TASKS; ++i) {
49+
tasks.push_back(std::packaged_task{[&executed, i]() {
50+
executed.at(i) = true;
51+
if (i == 5) {
52+
throw std::runtime_error("Error");
53+
}
54+
}});
55+
}
56+
EXPECT_THROW(ad_utility::runTasksInParallel(std::move(tasks)),
57+
std::runtime_error);
58+
for (size_t i = 0; i < NUM_TASKS; ++i) {
59+
EXPECT_TRUE(executed.at(i));
60+
}
61+
}
62+
63+
// _____________________________________________________________________________
64+
TEST(ParallelExecutor, multipleTaskWithOnlyExceptions) {
65+
constexpr size_t NUM_TASKS = 10;
66+
std::array<bool, NUM_TASKS> executed;
67+
ql::ranges::fill(executed, false);
68+
std::vector<std::packaged_task<void()>> tasks;
69+
for (size_t i = 0; i < NUM_TASKS; ++i) {
70+
tasks.push_back(std::packaged_task{[&executed, i]() {
71+
executed.at(i) = true;
72+
throw std::runtime_error(absl::StrCat("Error ", i));
73+
}});
74+
}
75+
// Only the first error should be rethrown for simplicity.
76+
AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE(
77+
ad_utility::runTasksInParallel(std::move(tasks)),
78+
::testing::StrEq("Error 0"), std::runtime_error);
79+
for (size_t i = 0; i < NUM_TASKS; ++i) {
80+
EXPECT_TRUE(executed.at(i));
81+
}
82+
}

0 commit comments

Comments
 (0)