Skip to content

Commit 9575846

Browse files
authored
Merge pull request #1486 from mudler/worker_optimizations
Worker worker_status timer optimizations
2 parents 4da589c + b71811f commit 9575846

File tree

11 files changed

+259
-91
lines changed

11 files changed

+259
-91
lines changed

lib/OpenQA/Utils.pm

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ $VERSION = sprintf "%d.%03d", q$Revision: 1.12 $ =~ /(\d+)/g;
7878
read_test_modules
7979
exists_worker
8080
safe_call
81+
feature_scaling
82+
logistic_map_steps
83+
logistic_map
84+
rand_range
85+
in_range
8186
);
8287

8388
if ($0 =~ /\.t$/) {
@@ -839,5 +844,21 @@ sub safe_call {
839844
return $ret;
840845
}
841846

847+
# Args:
848+
# First is i-th element, Second is maximum element number, Third and Fourth are the range limit (lower and upper)
849+
# $i, $imax, MIN, MAX
850+
# Linear (min-max) scaling of the i-th element out of $imax onto [MIN, MAX]:
# element 1 lands on MIN. When $imax is 1 the divisor would be zero, so it
# falls back to 1.
sub feature_scaling {
    my ($i, $imax, $lower, $upper) = @_;
    my $span    = $upper - $lower;
    my $divisor = ($imax - 1) || 1;
    return $lower + (($i - 1) * $span) / $divisor;
}
851+
# $r, $xn
852+
# One iteration of the logistic map: r * x_n * (1 - x_n).
sub logistic_map {
    my ($r, $xn) = @_;
    return $r * $xn * (1 - $xn);
}
853+
# $steps, $r, $xn
854+
# Applies logistic_map() $steps times, starting from $xn, and returns the
# final value.
# Args: $steps (number of iterations), $r (growth rate), $xn (start value).
#
# The arguments are copied into lexicals first: assigning through $_[2] would
# overwrite the caller's variable (elements of @_ alias the caller's
# arguments) and would die with "Modification of a read-only value attempted"
# when the start value is passed as a literal.
sub logistic_map_steps {
    my ($steps, $r, $xn) = @_;

    # Do not let the population die: a non-positive start value is replaced
    # by 0.1, which also gives more "chaos".
    $xn = 0.1 if $xn <= 0;

    $xn = logistic_map($r, $xn) for (1 .. $steps);
    return $xn;
}
859+
# Returns a random number between the two bounds: $min <= result < $max when
# $min < $max.
# Args: $min, $max
sub rand_range {
    my ($min, $max) = @_;
    my $width = $max - $min;

    # rand(0) behaves like rand(1), so a zero-width range would otherwise
    # return a value up to one unit above the bound instead of the bound.
    return $min if $width == 0;

    return $min + rand($width);
}
860+
# Returns 1 when the first argument lies within the inclusive range given by
# the second (lower) and third (upper) arguments, 0 otherwise.
sub in_range {
    my ($value, $lower, $upper) = @_;
    return 0 unless $value >= $lower;
    return 0 unless $value <= $upper;
    return 1;
}
861+
862+
842863
1;
843864
# vim: set sw=4 et:

lib/OpenQA/WebSockets/Server.pm

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ use OpenQA::Utils qw(log_debug log_warning log_error);
2323
use OpenQA::Schema;
2424
use OpenQA::ServerStartup;
2525
use Data::Dumper;
26-
26+
use Data::Dump 'pp';
2727
use db_profiler;
2828

29+
use constant WORKERS_CHECKER_THRESHOLD => 120;
30+
2931
require Exporter;
3032
our (@ISA, @EXPORT, @EXPORT_OK);
3133

@@ -200,19 +202,7 @@ sub _message {
200202
}
201203

202204
$worker->{last_seen} = time();
203-
if ($json->{type} eq 'ok') {
204-
$ws->tx->send({json => {type => 'ok'}});
205-
# NOTE: Update the worker state from keepalives.
206-
# We could check if the worker is dead before updating seen state
207-
# the downside of it will be that we will have more timewindows
208-
# where the worker is seen as dead.
209-
#
210-
# if ($w and $w->dead()) # It's still one query, at this point let's just update the seen status
211-
# log_debug("Keepalive from worker $worker->{id} received, and worker thought dead. updating the DB");
212-
app->schema->txn_do(sub { my $w = app->schema->resultset("Workers")->find($worker->{id}); $w->seen; })
213-
if $worker && exists $worker->{id};
214-
}
215-
elsif ($json->{type} eq 'accepted') {
205+
if ($json->{type} eq 'accepted') {
216206
my $jobid = $json->{jobid};
217207
log_debug("Worker: $worker->{id} accepted job $jobid");
218208
}
@@ -247,14 +237,18 @@ sub _message {
247237
$worker_status->{$wid} = $json;
248238
log_debug(sprintf('Received from worker "%u" worker_status message "%s"', $wid, Dumper($json)));
249239

250-
# XXX: This would make keepalive useless.
251-
# app->schema->txn_do(
252-
# sub {
253-
# my $w = app->schema->resultset("Workers")->find($wid);
254-
# return unless $w;
255-
# log_debug("Updated worker seen from worker_status");
256-
# $w->seen;
257-
# });
240+
try {
241+
app->schema->txn_do(
242+
sub {
243+
my $w = app->schema->resultset("Workers")->find($wid);
244+
return unless $w;
245+
log_debug("Updated worker seen from worker_status");
246+
$w->seen;
247+
});
248+
}
249+
catch {
250+
log_error("Failed updating worker seen status: $_");
251+
};
258252

259253
my $registered_job_id;
260254
my $registered_job_token;
@@ -266,6 +260,15 @@ sub _message {
266260
if $worker_status->{$wid}->{job}->{id};
267261
};
268262

263+
try {
264+
my $workers_population = app->schema->resultset("Workers")->count();
265+
my $msg = {type => 'info', population => $workers_population};
266+
$ws->tx->send({json => $msg} => sub { log_debug("Sent population to worker: " . pp($msg)) });
267+
}
268+
catch {
269+
log_debug("Could not be able to send population number to worker: $_");
270+
};
271+
269272
try {
270273
# We cover the case where id can be the same, but the token will differ.
271274
die "Do not check" unless ($registered_job_id);
@@ -327,6 +330,7 @@ sub _message {
327330
catch {
328331
log_debug("Failed parsing status message : $_");
329332
};
333+
330334
}
331335
else {
332336
log_error(sprintf('Received unknown message type "%s" from worker %u', $json->{type}, $worker->{id}));
@@ -384,8 +388,7 @@ sub _workers_checker {
384388
try {
385389
$schema->txn_do(
386390
sub {
387-
my $threshold = 40;
388-
my $stale_jobs = _get_stale_worker_jobs($threshold);
391+
my $stale_jobs = _get_stale_worker_jobs(WORKERS_CHECKER_THRESHOLD);
389392
for my $job ($stale_jobs->all) {
390393
next unless _is_job_considered_dead($job);
391394

lib/OpenQA/Worker/Commands.pm

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use OpenQA::Worker::Common;
2222
use OpenQA::Worker::Jobs;
2323
use POSIX ':sys_wait_h';
2424
use OpenQA::Worker::Engines::isotovideo;
25+
use Data::Dump 'pp';
2526

2627
## WEBSOCKET commands
2728
sub websocket_commands {
@@ -35,12 +36,11 @@ sub websocket_commands {
3536
}
3637
}
3738
else {
38-
# requests
39-
my $type = $json->{type};
40-
if (!$type) {
41-
log_warning('Received WS message without type!');
39+
if (!$json->{type}) {
40+
log_warning('Received WS message without type! ' . pp($json));
4241
return;
4342
}
43+
my $type = $json->{type};
4444
my $jobid = $json->{jobid} // '';
4545
my $joburl;
4646
my $host = $ws_to_host->{$tx};
@@ -70,6 +70,11 @@ sub websocket_commands {
7070
log_debug("received command: $type");
7171
stop_job($type);
7272
}
73+
elsif ($type eq 'info') {
74+
$hosts->{$host}{population} = $json->{population} if $json->{population};
75+
log_debug("Population for $host is " . $hosts->{$host}{population});
76+
change_timer("workerstatus-$host", OpenQA::Worker::Common::calculate_status_timer($hosts, $host));
77+
}
7378
elsif ($type eq 'stop_waitforneedle') {
7479
if (backend_running) {
7580
$ua->post("$joburl/isotovideo/stop_waitforneedle");
@@ -129,9 +134,6 @@ sub websocket_commands {
129134
}
130135
}
131136
}
132-
elsif ($type eq 'ok') {
133-
# ignore keepalives, but dont' report as unknown
134-
}
135137
elsif ($type eq 'grab_job') {
136138
state $check_job_running;
137139
state $job_in_progress;

lib/OpenQA/Worker/Common.pm

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ use Carp;
2222
use POSIX 'uname';
2323
use Mojo::URL;
2424
use OpenQA::Client;
25-
use OpenQA::Utils qw(log_error log_debug log_warning log_info);
25+
use OpenQA::Utils qw(log_error log_debug log_warning log_info), qw(feature_scaling rand_range logistic_map_steps);
26+
use Scalar::Util 'looks_like_number';
27+
use Config::IniFiles;
28+
use List::Util 'max';
2629

2730
use base 'Exporter';
2831
our @EXPORT = qw($job $instance $worker_settings $pooldir $nocleanup
@@ -56,6 +59,8 @@ my ($sysname, $hostname, $release, $version, $machine) = POSIX::uname();
5659
use constant {
5760
STATUS_UPDATES_SLOW => 10,
5861
STATUS_UPDATES_FAST => 0.5,
62+
MAX_TIMER => 100, # It should never be more than OpenQA::WebSockets::Server::_workers_checker threshold
63+
MIN_TIMER => 20,
5964
};
6065

6166
# the template noted what architecture are known
@@ -346,29 +351,56 @@ sub send_status {
346351
$tx->send($status_message);
347352
}
348353

354+
# Computes the interval, in whole seconds, used for the worker_status timer
# of $host. The result always lies within [MIN_TIMER, MAX_TIMER].
#
# Args:
#   $hosts - hash ref keyed by host; the {workerid} and {population} entries
#            of $hosts->{$host} are read
#   $host  - key of the host the timer is calculated for
#
# The worker's position within the population is scaled to [0, 1], pushed
# through a fixed number of logistic map iterations and scaled back onto the
# timer range.
sub calculate_status_timer {
    my ($hosts, $host) = @_;

    # Position of this worker: the worker id stored for $host when set,
    # otherwise a numeric $instance, otherwise 1.
    my $i = $hosts->{$host}{workerid} ? $hosts->{$host}{workerid} : looks_like_number($instance) ? $instance : 1;

    # Population stored for $host; 1 while it is not set.
    my $imax         = $hosts->{$host}{population} ? $hosts->{$host}{population} : 1;
    my $scale_factor = $imax;
    my $steps        = 215;
    my $r            = 3.81199961;

    # my $scale_factor = 4;
    # my $scale_factor = (MAX_TIMER - MIN_TIMER)/MIN_TIMER;
    # log_debug("I: $i population: $imax scale_factor: $scale_factor");

    # XXX: we are using now fixed values, to stick with a
    #      predictable behavior but random intervals
    #      seems to work as well.
    # my $steps = int(rand_range(2, 120));
    # my $r     = rand_range(3.20, 3.88);

    my $population = feature_scaling($i, $imax, 0, 1);
    my $status_timer
      = abs(feature_scaling(logistic_map_steps($steps, $r, $population) * $scale_factor, $imax, MIN_TIMER, MAX_TIMER));

    # Clamp into [MIN_TIMER, MAX_TIMER]. A value exactly equal to MAX_TIMER
    # must stay at MAX_TIMER; anything that is not strictly greater than
    # MIN_TIMER (including NaN) falls back to MIN_TIMER.
    return MAX_TIMER if $status_timer >= MAX_TIMER;
    return MIN_TIMER unless $status_timer > MIN_TIMER;
    return int($status_timer);
}
379+
349380
sub call_websocket {
350381
my ($host, $ua_url) = @_;
351382
my $ua = $hosts->{$host}{ua};
383+
my $status_timer = calculate_status_timer($hosts, $host, $instance, $worker_settings);
352384

385+
log_debug("worker_status timer time window: $status_timer");
353386
$ua->websocket(
354387
$ua_url => {'Sec-WebSocket-Extensions' => 'permessage-deflate'} => sub {
355388
my ($ua, $tx) = @_;
356389
if ($tx->is_websocket) {
357-
# keep websocket connection busy
358-
$tx->send({json => {type => 'ok'}}); # Send keepalive immediately
359-
$hosts->{$host}{timers}{keepalive}
360-
= add_timer("keepalive-$host", 10, sub { $tx->send({json => {type => 'ok'}}); });
361-
362-
$hosts->{$host}{timers}{status} = add_timer("workerstatus-$host", 15,
363-
sub { send_status($tx); log_debug("Sending worker status to $host (workerstatus timer)"); });
390+
$hosts->{$host}{timers}{status} = add_timer(
391+
"workerstatus-$host",
392+
$status_timer,
393+
sub {
394+
send_status($tx);
395+
log_debug("Sending worker status to $host (workerstatus timer)");
396+
});
364397

365398
$tx->on(json => \&OpenQA::Worker::Commands::websocket_commands);
366399
$tx->on(
367400
finish => sub {
368401
my (undef, $code, $reason) = @_;
369402
log_debug("Connection turned off from $host - $code : "
370403
. (defined $reason ? $reason : "Not specified"));
371-
remove_timer("keepalive-$host");
372404
remove_timer("workerstatus-$host");
373405

374406
$hosts->{$host}{timers}{setup_websocket}
@@ -520,4 +552,37 @@ sub verify_workerid {
520552
return $hosts->{$host}{workerid};
521553
}
522554

555+
# Reads workers.ini from $ENV{OPENQA_CONFIG} (default: /etc/openqa) and
# returns the list ($sets, $host_settings).
#
# Args:
#   $instance - name of the instance section, merged over [global]
#   $host     - optional whitespace separated list of hosts; when empty the
#               HOST setting is used, falling back to 'localhost'
#
# Returns:
#   $sets          - hash ref of upper-cased settings from [global] and the
#                    instance section (HOST is always removed)
#   $host_settings - hash ref with one entry per host (upper-cased settings
#                    of that host's section, or an empty hash when there is
#                    no such section) plus HOSTS, an array ref of host names
sub read_worker_config {
    my ($instance, $host) = @_;
    my $worker_dir = $ENV{OPENQA_CONFIG} || '/etc/openqa';
    my $cfg        = Config::IniFiles->new(-file => $worker_dir . '/workers.ini');

    my $sets = {};
    for my $section ('global', $instance) {
        # $cfg is false when the file could not be loaded
        next unless $cfg && $cfg->SectionExists($section);
        for my $set ($cfg->Parameters($section)) {
            $sets->{uc $set} = $cfg->val($section, $set);
        }
    }

    # use separate set as we may not want to advertise other host configuration to the world in job settings
    my $host_settings;
    my $configured_host = delete $sets->{HOST};
    $host ||= $configured_host || 'localhost';

    # split ' ' skips leading whitespace and treats runs of whitespace as a
    # single separator, so stray spaces do not produce empty host names
    my @hosts = split ' ', $host;
    for my $section (@hosts) {
        if ($cfg && $cfg->SectionExists($section)) {
            for my $set ($cfg->Parameters($section)) {
                $host_settings->{$section}{uc $set} = $cfg->val($section, $set);
            }
        }
        else {
            $host_settings->{$section} = {};
        }
    }
    $host_settings->{HOSTS} = \@hosts;

    return $sets, $host_settings;
}
587+
523588
1;

script/openqa-websockets

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ BEGIN {
3535
$ENV{MOJO_LISTEN} ||= 'http://localhost:9527/';
3636

3737
# allow up to 20GB - hdd images
38-
$ENV{MOJO_MAX_MESSAGE_SIZE} = 1024 * 1024 * 1024 * 20;
39-
$ENV{MOJO_INACTIVITY_TIMEOUT} = 300;
38+
$ENV{MOJO_MAX_MESSAGE_SIZE} = 1024 * 1024 * 1024 * 20;
4039

4140
use OpenQA::WebSockets;
4241

script/worker

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ BEGIN {
100100

101101
use FindBin;
102102
use lib "$FindBin::Bin/../lib";
103-
use Config::IniFiles;
104103
use Getopt::Long;
105104
Getopt::Long::Configure("no_ignore_case");
106105

@@ -124,43 +123,10 @@ GetOptions(
124123

125124
usage(0) if ($options{help});
126125

127-
sub read_worker_config {
128-
my ($instance, $host) = @_;
129-
my $worker_dir = $ENV{OPENQA_CONFIG} || '/etc/openqa';
130-
my $cfg = Config::IniFiles->new(-file => $worker_dir . '/workers.ini');
131-
132-
my $sets = {};
133-
for my $section ('global', $instance) {
134-
if ($cfg && $cfg->SectionExists($section)) {
135-
for my $set ($cfg->Parameters($section)) {
136-
$sets->{uc $set} = $cfg->val($section, $set);
137-
}
138-
}
139-
}
140-
# use separate set as we may not want to advertise other host confiuration to the world in job settings
141-
my $host_settings;
142-
$host ||= $sets->{HOST} ||= 'localhost';
143-
delete $sets->{HOST};
144-
my @hosts = split / /, $host;
145-
for my $section (@hosts) {
146-
if ($cfg && $cfg->SectionExists($section)) {
147-
for my $set ($cfg->Parameters($section)) {
148-
$host_settings->{$section}{uc $set} = $cfg->val($section, $set);
149-
}
150-
}
151-
else {
152-
$host_settings->{$section} = {};
153-
}
154-
}
155-
$host_settings->{HOSTS} = \@hosts;
156-
157-
return $sets, $host_settings;
158-
}
159-
160126
# count workers from 1 if not set - if tap devices are used worker would try to use tap -1
161127
$options{instance} ||= 1;
162128

163-
my ($worker_settings, $host_settings) = read_worker_config($options{instance}, $options{host});
129+
my ($worker_settings, $host_settings) = OpenQA::Worker::Common::read_worker_config($options{instance}, $options{host});
164130
$worker_settings->{LOG_LEVEL} = 'debug' if $options{verbose};
165131
$OpenQA::Worker::Common::worker_settings = $worker_settings;
166132
# XXX: this should be sent to the scheduler to be included in the worker's table

0 commit comments

Comments
 (0)