-
Notifications
You must be signed in to change notification settings - Fork 4
/
task.c
142 lines (121 loc) · 3.11 KB
/
task.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/* SPDX-License-Identifier: BSD-3-Clause */
#include "ucentral.h"
struct ucentral_task {
int admin;
time_t uuid;
uint32_t id;
int ret;
struct task *task;
struct runqueue_process proc;
struct uloop_timeout delay;
};
static void
runqueue_proc_cb(struct uloop_process *p, int ret)
{
struct runqueue_process *t = container_of(p, struct runqueue_process, proc);
struct ucentral_task *u = container_of(t, struct ucentral_task, proc);
u->ret = ret;
runqueue_task_complete(&t->task);
}
static void task_run_cb(struct runqueue *q, struct runqueue_task *task)
{
struct ucentral_task *t = container_of(task, struct ucentral_task, proc.task);
pid_t pid;
pid = fork();
if (pid < 0)
return;
if (pid) {
runqueue_process_add(q, &t->proc, pid);
t->proc.proc.cb = runqueue_proc_cb;
return;
}
t->task->run(t->uuid, t->id);
free(t);
exit(1);
}
static const struct runqueue_task_type task_type = {
.run = task_run_cb,
.cancel = runqueue_process_cancel_cb,
.kill = runqueue_process_kill_cb,
};
static void
task_delay(struct uloop_timeout *delay)
{
struct ucentral_task *t = container_of(delay, struct ucentral_task, delay);
struct runqueue *r = t->admin ? &adminqueue : &runqueue;
runqueue_task_add(r, &t->proc.task, false);
}
static void
task_complete(struct runqueue *q, struct runqueue_task *task)
{
struct ucentral_task *t = container_of(task, struct ucentral_task, proc.task);
t->task->cancelled = task->cancelled;
t->task->complete(t->task, t->uuid, t->id, t->ret);
if (t->task->periodic) {
t->delay.cb = task_delay;
uloop_timeout_set(&t->delay, t->task->periodic * 1000);
} else {
t->task->t = NULL;
free(t);
}
}
void
task_run(struct task *task, time_t uuid, uint32_t id, int admin)
{
struct ucentral_task *t = calloc(1, sizeof(*t));
struct runqueue *r = admin ? &adminqueue : &runqueue;
t->admin = admin;
t->uuid = uuid;
t->id = id;
t->task = task;
t->proc.task.type = &task_type;
t->proc.task.run_timeout = task->run_time * 1000;
t->proc.task.cancel_type = SIGKILL;
t->proc.task.complete = task_complete;
task->t = t;
if (task->delay) {
t->delay.cb = task_delay;
uloop_timeout_set(&t->delay, task->delay * 1000);
} else {
runqueue_task_add(r, &t->proc.task, false);
}
}
void
task_config(struct task *task, time_t uuid, uint32_t id)
{
struct ucentral_task *t = calloc(1, sizeof(*t));
t->uuid = uuid;
t->id = id;
t->task = task;
t->proc.task.type = &task_type;
t->proc.task.run_timeout = task->run_time * 1000;
t->proc.task.complete = task_complete;
task->t = t;
runqueue_task_add(&applyqueue, &t->proc.task, false);
}
void
task_telemetry(struct task *task, time_t uuid, uint32_t id)
{
struct ucentral_task *t = calloc(1, sizeof(*t));
t->uuid = uuid;
t->id = id;
t->task = task;
t->proc.task.type = &task_type;
t->proc.task.run_timeout = task->run_time * 1000;
t->proc.task.complete = task_complete;
task->t = t;
runqueue_task_add(&telemetryqueue, &t->proc.task, false);
}
void
task_stop(struct task *task)
{
if (!task->t)
return;
task->periodic = 0;
uloop_timeout_cancel(&task->t->delay);
runqueue_task_kill(&task->t->proc.task);
if (task->t) {
free(task->t);
task->t = NULL;
}
}