diff --git a/src/hydra-eval-jobs/hydra-eval-jobs.cc b/src/hydra-eval-jobs/hydra-eval-jobs.cc
index b83cae916..793f1c242 100644
--- a/src/hydra-eval-jobs/hydra-eval-jobs.cc
+++ b/src/hydra-eval-jobs/hydra-eval-jobs.cc
@@ -27,6 +27,8 @@
 #include <sys/types.h>
 #include <sys/wait.h>
 
+#include <chrono>
+#include <fnmatch.h>
+
 #include <nlohmann/json.hpp>
 
 void check_pid_status_nonblocking(pid_t check_pid)
@@ -160,6 +162,9 @@ static void worker(
     auto vRoot = state.allocValue();
     state.autoCallFunction(autoArgs, vTop, *vRoot);
 
+    size_t prev = 0;
+    auto start = std::chrono::steady_clock::now();
+
     while (true) {
         /* Wait for the master to send us a job name. */
         writeLine(to.get(), "next");
@@ -234,6 +239,10 @@ static void worker(
                         if (v->type() == nString)
                             job["namedConstituents"].push_back(v->string_view());
                     }
+
+                    auto glob = v->attrs()->get(state.symbols.create("_hydraGlobConstituents"));
+                    bool globConstituents = glob && state.forceBool(*glob->value, glob->pos, "while evaluating the `_hydraGlobConstituents` attribute");
+                    job["globConstituents"] = globConstituents;
                 }
 
                 /* Register the derivation as a GC root. !!! This
@@ -294,14 +303,229 @@ static void worker(
 
             /* If our RSS exceeds the maximum, exit. The master will
                start a new process. */
+
+            auto end = std::chrono::steady_clock::now();
+            auto duration = std::chrono::duration_cast<std::chrono::seconds>(end - start).count();
             struct rusage r;
             getrusage(RUSAGE_SELF, &r);
+            size_t delta = (size_t) r.ru_maxrss - prev; // KiB
+            if (delta > maxMemorySize * 1024 * 0.5 || duration > 1)
+                printError("evaluating job '%s' increased memory usage by %d MiB", attrPath,
+                    (r.ru_maxrss - prev) / 1024);
+
+            prev = r.ru_maxrss;
             if ((size_t) r.ru_maxrss > maxMemorySize * 1024) break;
         }
 
     writeLine(to.get(), "restart");
 }
 
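+/* How the pieces below fit together: resolveNamedConstituents() expands
+   each aggregate's `namedConstituents` (literal job names, or fnmatch(3)
+   glob patterns when `_hydraGlobConstituents` is set) into concrete job
+   names; topoSort() orders the aggregates so that an aggregate is only
+   rewritten after every aggregate it references, throwing DependencyCycle
+   when that is impossible; rewriteAggregates() then adds the constituent
+   derivations as inputs of each aggregate derivation. */
+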
+struct DependencyCycle : public std::exception {
+    std::string a;
+    std::string b;
+    std::set<std::string> remainingAggregates;
+
+    DependencyCycle(const std::string & a, const std::string & b, const std::set<std::string> & remainingAggregates)
+        : a(a), b(b), remainingAggregates(remainingAggregates) {}
+
+    std::string what() {
+        return fmt("Dependency cycle: %s <-> %s", a, b);
+    }
+};
+
+struct AggregateJob
+{
+    std::string name;
+    std::set<std::string> dependencies;
+    std::unordered_map<std::string, std::string> brokenJobs;
+
+    bool operator<(const AggregateJob & b) const { return name < b.name; }
+};
+
+// This is copied from `libutil/topo-sort.hh` in CppNix and slightly modified:
+// it sorts by string identifiers but stores full AggregateJob objects, so that
+// resolveNamedConstituents doesn't have to convert back and forth between
+// lists of strings and AggregateJobs.
+std::vector<AggregateJob> topoSort(std::set<AggregateJob> items)
+{
+    std::vector<AggregateJob> sorted;
+    std::set<std::string> visited, parents;
+
+    std::map<std::string, AggregateJob> dictIdentToObject;
+    for (auto & it : items) {
+        dictIdentToObject.insert({it.name, it});
+    }
+
+    std::function<void(const std::string &, const std::string *)> dfsVisit;
+
+    dfsVisit = [&](const std::string & path, const std::string * parent) {
+        if (parents.count(path)) {
+            // Drop the two cycle participants, then report every aggregate
+            // that is left unresolved because of the cycle.
+            dictIdentToObject.erase(path);
+            dictIdentToObject.erase(*parent);
+            std::set<std::string> remaining;
+            for (auto & [k, _] : dictIdentToObject) {
+                remaining.insert(k);
+            }
+            throw DependencyCycle(path, *parent, remaining);
+        }
+
+        if (!visited.insert(path).second) return;
+        parents.insert(path);
+
+        std::set<std::string> references = dictIdentToObject[path].dependencies;
+
+        for (auto & i : references)
+            /* Don't traverse into items that don't exist in our starting set. */
+            if (i != path && dictIdentToObject.find(i) != dictIdentToObject.end())
+                dfsVisit(i, &path);
+
+        sorted.push_back(dictIdentToObject[path]);
+        parents.erase(path);
+    };
+
+    for (auto & [i, _] : dictIdentToObject)
+        dfsVisit(i, nullptr);
+
+    return sorted;
+}
+
+static bool insertMatchingConstituents(const std::string & childJobName,
+    const std::string & jobName,
+    std::function<bool(const std::string &, nlohmann::json &)> isBroken,
+    nlohmann::json & jobs,
+    std::set<std::string> & results)
+{
+    bool expansionFound = false;
+    for (auto job = jobs.begin(); job != jobs.end(); job++) {
+        // Never select the job itself as constituent. Trivial way
+        // to avoid obvious cycles.
+        if (job.key() == jobName) {
+            continue;
+        }
+        auto jobName = job.key();
+        // fnmatch(3) returns 0 on a match, e.g. the pattern "packages.*"
+        // matches the job name "packages.constituentA".
+        if (fnmatch(childJobName.c_str(), jobName.c_str(), 0) == 0 && !isBroken(jobName, *job)) {
+            results.insert(jobName);
+            expansionFound = true;
+        }
+    }
+
+    return expansionFound;
+}
+
+static std::vector<AggregateJob> resolveNamedConstituents(nlohmann::json & jobs)
+{
+    std::set<AggregateJob> aggregateJobs;
+    for (auto i = jobs.begin(); i != jobs.end(); ++i) {
+        auto jobName = i.key();
+        auto & job = i.value();
+
+        auto named = job.find("namedConstituents");
+        if (named != job.end()) {
+            bool globConstituents = job.value("globConstituents", false);
+            std::unordered_map<std::string, std::string> brokenJobs;
+            std::set<std::string> results;
+
+            auto isBroken = [&brokenJobs, &jobName](
+                const std::string & childJobName, nlohmann::json & job) -> bool {
+                if (job.find("error") != job.end()) {
+                    std::string error = job["error"];
+                    printError("aggregate job '%s' references broken job '%s': %s", jobName, childJobName, error);
+                    brokenJobs[childJobName] = error;
+                    return true;
+                } else {
+                    return false;
+                }
+            };
+
+            for (const std::string & childJobName : *named) {
+                auto childJob = jobs.find(childJobName);
+                if (childJob == jobs.end()) {
+                    if (!globConstituents) {
+                        printError("aggregate job '%s' references non-existent job '%s'", jobName, childJobName);
+                        brokenJobs[childJobName] = "does not exist";
+                    } else if (!insertMatchingConstituents(childJobName, jobName, isBroken, jobs, results)) {
+                        warn("aggregate job '%s' references constituent glob pattern '%s' with no matches", jobName, childJobName);
+                        brokenJobs[childJobName] = "constituent glob pattern had no matches";
+                    }
+                } else if (!isBroken(childJobName, *childJob)) {
+                    results.insert(childJobName);
+                }
+            }
+
+            aggregateJobs.insert(AggregateJob(jobName, results, brokenJobs));
+        }
+    }
+
+    return topoSort(aggregateJobs);
+}
+
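+/* Adding constituents changes an aggregate derivation's inputDrvs and
+   therefore its derivation hash, so its recorded output path no longer
+   matches. rewriteAggregates() recomputes the output path via
+   hashDerivationModulo and writes a fresh .drv for each aggregate. */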
+static void rewriteAggregates(nlohmann::json & jobs,
+    std::vector<AggregateJob> aggregateJobs,
+    bool dryRun,
+    ref<Store> store)
+{
+    for (auto & aggregateJob : aggregateJobs) {
+        auto & job = jobs.find(aggregateJob.name).value();
+        if (dryRun) {
+            for (auto & childJobName : aggregateJob.dependencies) {
+                std::string constituentDrvPath = jobs[childJobName]["drvPath"];
+                job["constituents"].push_back(constituentDrvPath);
+            }
+        } else {
+            auto drvPath = store->parseStorePath((std::string) job["drvPath"]);
+            auto drv = store->readDerivation(drvPath);
+
+            for (auto & childJobName : aggregateJob.dependencies) {
+                auto childDrvPath = store->parseStorePath((std::string) jobs[childJobName]["drvPath"]);
+                auto childDrv = store->readDerivation(childDrvPath);
+                job["constituents"].push_back(store->printStorePath(childDrvPath));
+                drv.inputDrvs.map[childDrvPath].value = {childDrv.outputs.begin()->first};
+            }
+
+            if (aggregateJob.brokenJobs.empty()) {
+                std::string drvName(drvPath.name());
+                assert(hasSuffix(drvName, drvExtension));
+                drvName.resize(drvName.size() - drvExtension.size());
+
+                auto hashModulo = hashDerivationModulo(*store, drv, true);
+                if (hashModulo.kind != DrvHash::Kind::Regular) continue;
+                auto h = hashModulo.hashes.find("out");
+                if (h == hashModulo.hashes.end()) continue;
+                auto outPath = store->makeOutputPath("out", h->second, drvName);
+                drv.env["out"] = store->printStorePath(outPath);
+                drv.outputs.insert_or_assign("out", DerivationOutput::InputAddressed { .path = outPath });
+                auto newDrvPath = store->printStorePath(writeDerivation(*store, drv));
+
+                debug("rewrote aggregate derivation %s -> %s", store->printStorePath(drvPath), newDrvPath);
+
+                job["drvPath"] = newDrvPath;
+                job["outputs"]["out"] = store->printStorePath(outPath);
+            }
+        }
+
+        job.erase("namedConstituents");
+
+        /* Register the derivation as a GC root. !!! This
+           registers roots for jobs that we may have already
+           done. */
+        auto localStore = store.dynamic_pointer_cast<LocalFSStore>();
+        if (gcRootsDir != "" && localStore) {
+            auto drvPath = job["drvPath"].get<std::string>();
+            Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
+            if (!pathExists(root))
+                localStore->addPermRoot(localStore->parseStorePath(drvPath), root);
+        }
+
+        if (!aggregateJob.brokenJobs.empty()) {
+            std::stringstream ss;
+            for (const auto & [jobName, error] : aggregateJob.brokenJobs) {
+                ss << jobName << ": " << error << "\n";
+            }
+            job["error"] = ss.str();
+        }
+    }
+}
+
 int main(int argc, char * * argv)
 {
     /* Prevent undeclared dependencies in the evaluation via
@@ -484,101 +708,22 @@ int main(int argc, char * * argv)
         if (state->exc)
             std::rethrow_exception(state->exc);
 
-        /* For aggregate jobs that have named consistuents
+        /* For aggregate jobs that have named constituents
            (i.e. constituents that are a job name rather than a
            derivation), look up the referenced job and add it to the
            dependencies of the aggregate derivation. */
         auto store = openStore();
 
-        for (auto i = state->jobs.begin(); i != state->jobs.end(); ++i) {
-            auto jobName = i.key();
-            auto & job = i.value();
-
-            auto named = job.find("namedConstituents");
-            if (named == job.end()) continue;
-
-            std::unordered_map<std::string, std::string> brokenJobs;
-            auto getNonBrokenJobOrRecordError = [&brokenJobs, &jobName, &state](
-                const std::string & childJobName) -> std::optional<nlohmann::json> {
-                auto childJob = state->jobs.find(childJobName);
-                if (childJob == state->jobs.end()) {
-                    printError("aggregate job '%s' references non-existent job '%s'", jobName, childJobName);
-                    brokenJobs[childJobName] = "does not exist";
-                    return std::nullopt;
-                }
-                if (childJob->find("error") != childJob->end()) {
-                    std::string error = (*childJob)["error"];
-                    printError("aggregate job '%s' references broken job '%s': %s", jobName, childJobName, error);
-                    brokenJobs[childJobName] = error;
-                    return std::nullopt;
-                }
-                return *childJob;
-            };
-
-            if (myArgs.dryRun) {
-                for (std::string jobName2 : *named) {
-                    auto job2 = getNonBrokenJobOrRecordError(jobName2);
-                    if (!job2) {
-                        continue;
-                    }
-                    std::string drvPath2 = (*job2)["drvPath"];
-                    job["constituents"].push_back(drvPath2);
-                }
-            } else {
-                auto drvPath = store->parseStorePath((std::string) job["drvPath"]);
-                auto drv = store->readDerivation(drvPath);
-
-                for (std::string jobName2 : *named) {
-                    auto job2 = getNonBrokenJobOrRecordError(jobName2);
-                    if (!job2) {
-                        continue;
-                    }
-                    auto drvPath2 = store->parseStorePath((std::string) (*job2)["drvPath"]);
-                    auto drv2 = store->readDerivation(drvPath2);
-                    job["constituents"].push_back(store->printStorePath(drvPath2));
-                    drv.inputDrvs.map[drvPath2].value = {drv2.outputs.begin()->first};
-                }
-
-                if (brokenJobs.empty()) {
-                    std::string drvName(drvPath.name());
-                    assert(hasSuffix(drvName, drvExtension));
-                    drvName.resize(drvName.size() - drvExtension.size());
-
-                    auto hashModulo = hashDerivationModulo(*store, drv, true);
-                    if (hashModulo.kind != DrvHash::Kind::Regular) continue;
-                    auto h = hashModulo.hashes.find("out");
-                    if (h == hashModulo.hashes.end()) continue;
-                    auto outPath = store->makeOutputPath("out", h->second, drvName);
-                    drv.env["out"] = store->printStorePath(outPath);
-                    drv.outputs.insert_or_assign("out", DerivationOutput::InputAddressed { .path = outPath });
-                    auto newDrvPath = store->printStorePath(writeDerivation(*store, drv));
-
-                    debug("rewrote aggregate derivation %s -> %s", store->printStorePath(drvPath), newDrvPath);
-
-                    job["drvPath"] = newDrvPath;
-                    job["outputs"]["out"] = store->printStorePath(outPath);
-                }
-            }
-
-            job.erase("namedConstituents");
-
-            /* Register the derivation as a GC root. !!! This
-               registers roots for jobs that we may have already
-               done. */
-            auto localStore = store.dynamic_pointer_cast<LocalFSStore>();
-            if (gcRootsDir != "" && localStore) {
-                auto drvPath = job["drvPath"].get<std::string>();
-                Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
-                if (!pathExists(root))
-                    localStore->addPermRoot(localStore->parseStorePath(drvPath), root);
-            }
-
-            if (!brokenJobs.empty()) {
-                std::stringstream ss;
-                for (const auto& [jobName, error] : brokenJobs) {
-                    ss << jobName << ": " << error << "\n";
-                }
-                job["error"] = ss.str();
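+        /* Only aggregate jobs can participate in a cycle: every other job
+           is a leaf in the constituent graph, so a DependencyCycle can be
+           attributed entirely to aggregates. */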
+        try {
+            auto namedConstituents = resolveNamedConstituents(state->jobs);
+            rewriteAggregates(state->jobs, namedConstituents, myArgs.dryRun, store);
+        } catch (DependencyCycle & e) {
+            printError("Found dependency cycle between jobs '%s' and '%s'", e.a, e.b);
+            state->jobs[e.a]["error"] = e.what();
+            state->jobs[e.b]["error"] = e.what();
+
+            for (auto & jobName : e.remainingAggregates) {
+                state->jobs[jobName]["error"] = "Skipping aggregate because of a dependency cycle";
             }
         }
diff --git a/t/evaluator/evaluate-constituents-globbing.t b/t/evaluator/evaluate-constituents-globbing.t
new file mode 100644
index 000000000..c4a67f131
--- /dev/null
+++ b/t/evaluator/evaluate-constituents-globbing.t
@@ -0,0 +1,167 @@
+use strict;
+use warnings;
+use Setup;
+use Test2::V0;
+use Hydra::Helper::Exec;
+use Data::Dumper;
+
+my $ctx = test_context();
+
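+# These subtests exercise the constituent glob support added to
+# hydra-eval-jobs.cc above, using the t/jobs/constituents-*.nix fixtures
+# introduced in this change.
+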
errors" => sub { + ok(utf8::decode($stderr), "Stderr output is UTF8-clean"); + ok( + $stderr !~ "aggregate job ‘ok_aggregate’ has a constituent .* that doesn't correspond to a Hydra build", + "Catchall wildcard must not select itself as constituent" + ); + + $jobset->discard_changes; # refresh from DB + is( + $jobset->errormsg, + "", + "eval-errors non-empty" + ); + }; + + my $builds = {}; + for my $build ($jobset->builds) { + $builds->{$build->job} = $build; + } + + subtest "two constituents" => sub { + ok(defined $builds->{"ok_aggregate"}, "'ok_aggregate' is part of the jobset evaluation"); + my @constituents = $builds->{"ok_aggregate"}->constituents->all; + is(2, scalar @constituents, "'ok_aggregate' has two constituents"); + + my @sortedConstituentNames = sort (map { $_->nixname } @constituents); + + is($sortedConstituentNames[0], "empty-dir-A", "first constituent of 'ok_aggregate' is 'empty-dir-A'"); + is($sortedConstituentNames[1], "empty-dir-B", "second constituent of 'ok_aggregate' is 'empty-dir-B'"); + }; +}; + +subtest "trivial cycle check" => sub { + my $jobsetCtx = $ctx->makeJobset( + expression => 'constituents-cycle.nix', + ); + my $jobset = $jobsetCtx->{"jobset"}; + + my ($res, $stdout, $stderr) = captureStdoutStderr(60, + ("hydra-eval-jobset", $jobsetCtx->{"project"}->name, $jobset->name) + ); + + ok( + $stderr =~ "Found dependency cycle between jobs 'indirect_aggregate' and 'ok_aggregate'", + "Dependency cycle error is on stderr" + ); + + ok(utf8::decode($stderr), "Stderr output is UTF8-clean"); + + $jobset->discard_changes; # refresh from DB + like( + $jobset->errormsg, + qr/Dependency cycle: indirect_aggregate <-> ok_aggregate/, + "eval-errors non-empty" + ); + + is(0, $jobset->builds->count, "No builds should be scheduled"); +}; + +subtest "cycle check with globbing" => sub { + my $jobsetCtx = $ctx->makeJobset( + expression => 'constituents-cycle-glob.nix', + ); + my $jobset = $jobsetCtx->{"jobset"}; + + my ($res, $stdout, $stderr) = captureStdoutStderr(60, + ("hydra-eval-jobset", $jobsetCtx->{"project"}->name, $jobset->name) + ); + + ok(utf8::decode($stderr), "Stderr output is UTF8-clean"); + + $jobset->discard_changes; # refresh from DB + like( + $jobset->errormsg, + qr/in job ‘packages\.constituentA’:\nDependency cycle: indirect_aggregate <-> packages.constituentA/, + "packages.constituentA error missing" + ); + like( + $jobset->errormsg, + qr/in job ‘indirect_aggregate’:\nDependency cycle: indirect_aggregate <-> packages.constituentA/, + "indirect_aggregate error missing" + ); + like( + $jobset->errormsg, + qr/in job ‘ok_aggregate’:\nSkipping aggregate because of a dependency cycle/, + "skipped aggregate error missing" + ); + + is(1, $jobset->builds->count, "One job is scheduled"); + my $builds = {}; + for my $build ($jobset->builds) { + $builds->{$build->job} = $build; + } + + ok(defined $builds->{"packages.constituentB"}, "'packages.constituentB' is part of the jobset evaluation"); +}; + +done_testing; diff --git a/t/jobs/constituents-cycle-glob.nix b/t/jobs/constituents-cycle-glob.nix new file mode 100644 index 000000000..efc152ced --- /dev/null +++ b/t/jobs/constituents-cycle-glob.nix @@ -0,0 +1,34 @@ +with import ./config.nix; +{ + packages.constituentA = mkDerivation { + name = "empty-dir-A"; + builder = ./empty-dir-builder.sh; + _hydraAggregate = true; + _hydraGlobConstituents = true; + constituents = [ "*_aggregate" ]; + }; + + packages.constituentB = mkDerivation { + name = "empty-dir-B"; + builder = ./empty-dir-builder.sh; + }; + + ok_aggregate = 
+  ok_aggregate = mkDerivation {
+    name = "direct_aggregate";
+    _hydraAggregate = true;
+    _hydraGlobConstituents = true;
+    constituents = [
+      "packages.*"
+    ];
+    builder = ./empty-dir-builder.sh;
+  };
+
+  indirect_aggregate = mkDerivation {
+    name = "indirect_aggregate";
+    _hydraAggregate = true;
+    constituents = [
+      "ok_aggregate"
+    ];
+    builder = ./empty-dir-builder.sh;
+  };
+}
diff --git a/t/jobs/constituents-cycle.nix b/t/jobs/constituents-cycle.nix
new file mode 100644
index 000000000..7e086aa19
--- /dev/null
+++ b/t/jobs/constituents-cycle.nix
@@ -0,0 +1,21 @@
+with import ./config.nix;
+{
+  ok_aggregate = mkDerivation {
+    name = "direct_aggregate";
+    _hydraAggregate = true;
+    _hydraGlobConstituents = true;
+    constituents = [
+      "indirect_aggregate"
+    ];
+    builder = ./empty-dir-builder.sh;
+  };
+
+  indirect_aggregate = mkDerivation {
+    name = "indirect_aggregate";
+    _hydraAggregate = true;
+    constituents = [
+      "ok_aggregate"
+    ];
+    builder = ./empty-dir-builder.sh;
+  };
+}
diff --git a/t/jobs/constituents-glob-all.nix b/t/jobs/constituents-glob-all.nix
new file mode 100644
index 000000000..d671fd70f
--- /dev/null
+++ b/t/jobs/constituents-glob-all.nix
@@ -0,0 +1,22 @@
+with import ./config.nix;
+{
+  packages.constituentA = mkDerivation {
+    name = "empty-dir-A";
+    builder = ./empty-dir-builder.sh;
+  };
+
+  packages.constituentB = mkDerivation {
+    name = "empty-dir-B";
+    builder = ./empty-dir-builder.sh;
+  };
+
+  ok_aggregate = mkDerivation {
+    name = "direct_aggregate";
+    _hydraAggregate = true;
+    _hydraGlobConstituents = true;
+    constituents = [
+      "*"
+    ];
+    builder = ./empty-dir-builder.sh;
+  };
+}
diff --git a/t/jobs/constituents-glob.nix b/t/jobs/constituents-glob.nix
new file mode 100644
index 000000000..27cabd668
--- /dev/null
+++ b/t/jobs/constituents-glob.nix
@@ -0,0 +1,41 @@
+with import ./config.nix;
+{
+  packages.constituentA = mkDerivation {
+    name = "empty-dir-A";
+    builder = ./empty-dir-builder.sh;
+  };
+
+  packages.constituentB = mkDerivation {
+    name = "empty-dir-B";
+    builder = ./empty-dir-builder.sh;
+  };
+
+  ok_aggregate = mkDerivation {
+    name = "direct_aggregate";
+    _hydraAggregate = true;
+    _hydraGlobConstituents = true;
+    constituents = [
+      "packages.*"
+    ];
+    builder = ./empty-dir-builder.sh;
+  };
+
+  indirect_aggregate = mkDerivation {
+    name = "indirect_aggregate";
+    _hydraAggregate = true;
+    constituents = [
+      "ok_aggregate"
+    ];
+    builder = ./empty-dir-builder.sh;
+  };
+
+  non_match_aggregate = mkDerivation {
+    name = "mixed_aggregate";
+    _hydraAggregate = true;
+    _hydraGlobConstituents = true;
+    constituents = [
+      "tests.*"
+    ];
+    builder = ./empty-dir-builder.sh;
+  };
+}