-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbandit.py
372 lines (302 loc) · 12.3 KB
/
bandit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
import random
from tqdm import tqdm
import time
import math
import scipy.stats as stats
from scipy.stats import beta
class EpsilonGreedyBandit:
    """Epsilon-greedy strategy: with probability ``epsilon`` play a uniformly
    random arm, otherwise play (one of) the arm(s) with the best average
    reward seen so far, breaking ties at random."""

    def __init__(self, arms, epsilon=0.1):
        self.arms = arms
        self.epsilon = epsilon
        # Running mean reward and pull count, one slot per arm.
        self.q_values = [0.0 for _ in arms]
        self.counts = [0 for _ in arms]

    def select_arm(self):
        """Return the index of the next arm to play."""
        if random.random() < self.epsilon:
            # Explore: any arm, uniformly at random.
            return random.randint(0, len(self.arms) - 1)
        # Exploit: uniform choice among all arms tied at the best estimate.
        best = max(self.q_values)
        tied = [i for i, q in enumerate(self.q_values) if q == best]
        return random.choice(tied)

    def update(self, arm_index, reward, **kwargs):
        """Fold ``reward`` into the running average for ``arm_index``."""
        self.counts[arm_index] += 1
        n = self.counts[arm_index]
        previous = self.q_values[arm_index]
        self.q_values[arm_index] = ((n - 1) * previous + reward) / n

    def __repr__(self):
        return f"EpsilonGreedyBandit(arms={self.arms}, epsilon={self.epsilon})"

    def report(self):
        """Print the current estimate and pull count for every arm."""
        print("Q-values per arm:")
        for arm, q, cnt in zip(self.arms, self.q_values, self.counts):
            print(f" num_tasks={arm}: avg_reward={q:.5f}, count={cnt}")
class EpsilonFirstBandit:
    """Pure exploration for the first ``exploration_steps`` selections, then
    epsilon-greedy thereafter.

    Fixes relative to the previous revision:
      * the step counter is now advanced on every ``select_arm`` call — it was
        previously incremented only on the greedy path, which is unreachable
        while ``step < exploration_steps``, so the exploration phase never
        ended;
      * ``__repr__`` no longer references the undefined attribute
        ``exploration_fraction`` (it raised AttributeError).
    """

    def __init__(self, arms, exploration_steps=100, epsilon=0.1):
        self.arms = arms
        self.exploration_steps = exploration_steps
        self.q_values = [0.0] * len(arms)  # running mean reward per arm
        self.counts = [0] * len(arms)      # pulls per arm
        self.epsilon = epsilon
        self.step = 0  # total selections made so far

    def select_arm(self):
        """Return a random arm during the exploration phase (and with
        probability ``epsilon`` afterwards), otherwise the greedy arm with
        ties broken at random."""
        self.step += 1  # count every selection, including exploratory ones
        if self.step <= self.exploration_steps or random.random() < self.epsilon:
            return random.randint(0, len(self.arms) - 1)
        max_q = max(self.q_values)
        candidates = [i for i, q in enumerate(self.q_values) if q == max_q]
        return random.choice(candidates)

    def update(self, arm_index, reward, **kwargs):
        """Incrementally update the arm's average reward."""
        self.counts[arm_index] += 1
        n = self.counts[arm_index]
        old_q = self.q_values[arm_index]
        self.q_values[arm_index] = ((n - 1) * old_q + reward) / n

    def __repr__(self):
        return (
            f"EpsilonFirstBandit(arms={self.arms}, "
            f"exploration_steps={self.exploration_steps}, epsilon={self.epsilon})"
        )

    def report(self):
        """Print the current estimate and pull count for every arm."""
        print("Q-values per arm:")
        for arm, q, cnt in zip(self.arms, self.q_values, self.counts):
            print(f" num_tasks={arm}: avg_reward={q:.5f}, count={cnt}")
class EpsilonDecreasingBandit:
    """Epsilon-greedy with an exponentially decaying exploration rate.

    Epsilon starts at ``initial_epsilon`` and halves its distance to
    ``limit_epsilon`` every ``half_decay_steps`` selections.
    """

    def __init__(
        self, arms, initial_epsilon=1.0, limit_epsilon=0.1, half_decay_steps=100
    ):
        self.arms = arms
        self.initial_epsilon = initial_epsilon
        self.epsilon = initial_epsilon
        self.limit_epsilon = limit_epsilon
        self.half_decay_steps = half_decay_steps
        self.q_values = [0.0] * len(arms)  # running mean reward per arm
        self.counts = [0] * len(arms)      # pulls per arm
        self.step = 0

    def select_arm(self):
        """Advance the decay schedule, then explore or exploit."""
        self.step += 1
        self.update_epsilon()
        if random.random() >= self.epsilon:
            # Exploit: uniform choice among arms tied at the best estimate.
            best = max(self.q_values)
            tied = [i for i, q in enumerate(self.q_values) if q == best]
            return random.choice(tied)
        # Explore: uniform random arm.
        return random.randint(0, len(self.arms) - 1)

    def update(self, arm_index, reward, **kwargs):
        """Fold ``reward`` into the running average for ``arm_index``."""
        self.counts[arm_index] += 1
        n = self.counts[arm_index]
        previous = self.q_values[arm_index]
        self.q_values[arm_index] = ((n - 1) * previous + reward) / n

    def update_epsilon(self):
        """Exponential half-life decay of epsilon toward ``limit_epsilon``."""
        gap = self.initial_epsilon - self.limit_epsilon
        self.epsilon = self.limit_epsilon + gap * (
            0.5 ** (self.step / self.half_decay_steps)
        )

    def __repr__(self):
        return f"EpsilonDecreasingBandit(arms={self.arms}, initial_epsilon={self.initial_epsilon}, limit_epsilon={self.limit_epsilon}, half_decay_steps={self.half_decay_steps})"

    def report(self):
        """Print the current estimate and pull count for every arm."""
        print("Q-values per arm:")
        for arm, q, cnt in zip(self.arms, self.q_values, self.counts):
            print(f" num_tasks={arm}: avg_reward={q:.5f}, count={cnt}")
class UCB1Bandit:
    """UCB1: pull every arm once, then always pick the arm maximising
    mean reward plus the exploration bonus sqrt(2 ln T / n_i)."""

    def __init__(self, arms):
        self.arms = arms
        self.total_count = 0  # total pulls across all arms (T)
        self.q_values = [0.0] * len(arms)
        self.counts = [0] * len(arms)

    def select_arm(self):
        """Return the arm with the highest upper confidence bound."""
        # Initialisation phase: the first never-pulled arm goes next.
        if 0 in self.counts:
            return self.counts.index(0)

        def upper_bound(i):
            bonus = math.sqrt((2 * math.log(self.total_count)) / self.counts[i])
            return self.q_values[i] + bonus

        # max() keeps the first index on ties, matching index-of-max semantics.
        return max(range(len(self.arms)), key=upper_bound)

    def update(self, arm_index, reward, **kwargs):
        """Record a reward in [0, 1] for ``arm_index``.

        Raises:
            ValueError: if reward lies outside [0, 1] (UCB1's bound assumes
                bounded rewards).
        """
        if not (0 <= reward <= 1):
            raise ValueError("Reward must be in the range [0, 1].")
        self.counts[arm_index] += 1
        self.total_count += 1
        n = self.counts[arm_index]
        previous = self.q_values[arm_index]
        self.q_values[arm_index] = ((n - 1) * previous + reward) / n

    def __repr__(self):
        return f"UCB1Bandit(arms={self.arms})"

    def report(self):
        """Print the current estimate and pull count for every arm."""
        print("Q-values per arm:")
        for arm, q, cnt in zip(self.arms, self.q_values, self.counts):
            print(f" num_tasks={arm}: avg_reward={q:.5f}, count={cnt}")
class GreedyBanditWithHistory:
    """Greedy bandit whose per-arm estimate uses only the most recent
    ``history_length`` rewards (a sliding window), so old observations age
    out and the bandit can track drifting reward distributions."""

    def __init__(self, arms, history_length=100):
        self.arms = arms
        self.history_length = history_length
        self.q_values = [0.0] * len(arms)
        self.counts = [0] * len(arms)
        # One reward window per arm, newest reward last.
        self.history = [[] for _ in arms]

    def select_arm(self):
        """Prefer any arm whose window is not yet full; otherwise play the
        greedy arm, breaking ties at random."""
        underfull = [
            i for i, window in enumerate(self.history)
            if len(window) < self.history_length
        ]
        if underfull:
            return random.choice(underfull)
        best = max(self.q_values)
        return random.choice([i for i, q in enumerate(self.q_values) if q == best])

    def update(self, arm_index, reward, **kwargs):
        """Append ``reward`` to the arm's window, evicting the oldest entry
        once the window exceeds ``history_length``."""
        window = self.history[arm_index]
        window.append(reward)
        if len(window) > self.history_length:
            del window[0]
        self.counts[arm_index] = len(window)
        self.q_values[arm_index] = sum(window) / len(window)

    def __repr__(self):
        return f"GreedyBanditWithHistory(arms={self.arms}, history_length={self.history_length})"

    def report(self):
        """Print estimate, count and raw window contents for every arm."""
        print("Q-values per arm:")
        for arm, q, cnt, history in zip(
            self.arms, self.q_values, self.counts, self.history
        ):
            print(
                f" num_tasks={arm}: avg_reward={q:.5f}, count={cnt}, history={history}"
            )
class WilsonSamplingBandit:
    """Selects the arm whose empirical success rate has the highest Wilson
    score.

    NOTE(review): the score computed here is the Wilson *lower* confidence
    bound (center minus the spread term). An optimism-in-the-face-of-
    uncertainty bandit would normally use the *upper* bound; the lower-bound
    orientation is preserved as-is — confirm intent with the author.
    """

    def __init__(
        self, arms, z_score=1.96
    ):  # 1.96 corresponds to ~95% confidence interval
        self.arms = arms
        self.q_values = [0.0] * len(arms)  # running mean reward per arm
        self.counts = [0] * len(arms)      # pulls per arm
        self.z_score = z_score

    def select_arm(self):
        """Return the index of the arm with the best Wilson score."""
        if sum(self.counts) == 0:
            # Nothing observed yet: pick uniformly.
            return random.choice(range(len(self.arms)))
        scores = [
            self._wilson_score(i, self.q_values[i]) for i in range(len(self.arms))
        ]
        return scores.index(max(scores))

    def update(self, arm_index, reward, **kwargs):
        """Fold a reward in [0, 1] into the arm's running mean.

        Raises:
            ValueError: if reward lies outside [0, 1] (the Wilson score is
                only meaningful for Bernoulli-like proportions).
        """
        if not (0 <= reward <= 1):
            raise ValueError("Reward must be in the range [0, 1].")
        self.counts[arm_index] += 1
        n = self.counts[arm_index]
        old_q = self.q_values[arm_index]
        self.q_values[arm_index] = ((n - 1) * old_q + reward) / n

    def _wilson_score(self, arm_index, q_val):
        """Wilson score (lower bound) of the arm's success proportion.

        Formula: (p + z^2/2n - z*sqrt((p(1-p) + z^2/4n)/n)) / (1 + z^2/n).
        """
        n = self.counts[arm_index]
        if n == 0:
            # Unpulled arms score maximally so they get tried at least once.
            return 1.0
        proportion = q_val
        factor = self.z_score**2 / (2 * n)
        # BUG FIX: the spread term of the Wilson interval is z * sqrt(...),
        # not z/2 * sqrt(...); the stray division by 2 halved the confidence
        # adjustment for every arm.
        adj_factor = self.z_score * math.sqrt(
            (proportion * (1 - proportion) + self.z_score**2 / (4 * n)) / n
        )
        return (proportion + factor - adj_factor) / (1 + self.z_score**2 / n)

    def __repr__(self):
        return f"WilsonSamplingBandit(arms={self.arms}, z_score={self.z_score})"

    def report(self):
        """Print the current estimate and pull count for every arm."""
        print("Success and Failure counts per arm:")
        for arm, q, cnt in zip(self.arms, self.q_values, self.counts):
            print(f" num_tasks={arm}: avg_reward={q:.5f}, count={cnt}")
class ThompsonSamplingBandit:
    """Beta-Bernoulli Thompson sampling.

    Each arm carries a Beta(alpha, beta) posterior over its success rate;
    selection draws one sample per arm and plays the argmax.

    NOTE(review): ``update`` ignores ``reward`` and defaults to recording one
    success and zero failures, so a generic ``bandit.update(i, reward)`` call
    that never passes ``success=``/``failure=`` only ever grows alpha —
    confirm callers supply explicit counts.
    """

    def __init__(self, arms):
        self.arms = arms
        self.alpha = [1] * len(arms)  # Number of successes + 1 (beta prior parameter)
        self.beta = [1] * len(arms)  # Number of failures + 1 (beta prior parameter)

    def select_arm(self):
        """Sample one mean from each arm's posterior; return the best index."""
        best_index = 0
        best_sample = -1.0
        for i in range(len(self.arms)):
            drawn = random.betavariate(self.alpha[i], self.beta[i])
            if drawn > best_sample:
                best_sample, best_index = drawn, i
        return best_index

    def update(self, arm_index, reward, success=1, failure=0, **kwargs):
        """Add observed success/failure counts to the arm's posterior
        (``reward`` is accepted for interface compatibility but unused)."""
        self.alpha[arm_index] = self.alpha[arm_index] + success
        self.beta[arm_index] = self.beta[arm_index] + failure

    def __repr__(self):
        return f"ThompsonSamplingBandit(arms={self.arms})"

    def report(self):
        """Print posterior parameters and the implied mean for every arm."""
        print("Alpha and Beta values per arm (Beta distribution parameters):")
        for arm, a, b in zip(self.arms, self.alpha, self.beta):
            expected_reward = a / (a + b)
            print(
                f" num_tasks={arm}: alpha={a}, beta={b}, expected_reward={expected_reward:.5f}"
            )
    
def simulate_fail_fraction(num_tasks):
    """Toy failure model: the base failure rate grows linearly with the task
    load, perturbed by uniform noise in [-0.05, 0.05]."""
    base_rate = 0.07 + num_tasks / 300
    noise = random.uniform(-0.05, 0.05)
    return base_rate + noise
def testing_simulation_function(num_tasks):
    """Simulate running ``num_tasks`` tasks; return (successful, failed)
    as floats that always sum to ``num_tasks``."""
    fraction_failed = simulate_fail_fraction(num_tasks)
    succeeded = num_tasks * (1 - fraction_failed)
    return succeeded, num_tasks - succeeded
def deploy_bandit(
    bandit,
    fun,
    failure_threshold=0.1,
    default_wait_time=5,
    extra_wait_time=10,
    waiting_args=None,
    max_steps=500,
    verbose=False,
    reward_factor=1e-6,
):
    """Run ``bandit`` against workload ``fun`` for up to ``max_steps`` rounds.

    Two-state control loop:
      * ALIVE   -- ask the bandit for an arm, call ``fun(*arm_args)``; if the
                   failure fraction stays below ``failure_threshold``, reward
                   that arm with successes per second of elapsed wait time
                   (scaled by ``reward_factor``), otherwise switch to WAITING.
      * WAITING -- repeatedly call ``fun(*waiting_args)`` (presumably a small,
                   safe workload -- confirm with callers) with extra back-off
                   sleep until the failure fraction recovers, then credit the
                   arm that triggered the overload with its successes diluted
                   by the total time spent waiting, and return to ALIVE.

    Args:
        bandit: object exposing ``arms``, ``select_arm()``, ``update()``,
            ``report()`` (any of the bandit classes in this file).
        fun: callable returning ``(successful_tasks, failed_tasks)``.
        failure_threshold: failure fraction at/above which we back off.
        default_wait_time: seconds slept after every call to ``fun``.
        extra_wait_time: additional sleep per iteration while WAITING.
        waiting_args: argument(s) for ``fun`` while WAITING; required.
        max_steps: number of loop iterations to run.
        verbose: if True, wrap the loop in tqdm and print bandit state
            every iteration.
        reward_factor: scale applied to the successes/elapsed-time reward.

    Raises:
        ValueError: if ``waiting_args`` is None.

    NOTE(review): ``waiting_steps`` is tracked but never read. A
    ZeroDivisionError is possible if ``fun`` ever returns ``(0, 0)`` --
    TODO confirm callers never do.
    """
    if waiting_args is None:
        raise ValueError("waiting_args must be provided")
    else:
        # Normalise a scalar argument into a 1-tuple so it can be splatted.
        if not (isinstance(waiting_args, tuple) or isinstance(waiting_args, list)):
            waiting_args = (waiting_args,)
    state = "ALIVE"
    last_alive_successes = 0.0  # successes from the run that triggered WAITING
    last_arm_index = None  # arm to credit once the system recovers
    waiting_steps = 0
    waiting_time = 0.0  # seconds accumulated since the last reward was issued
    iterator = range(max_steps)
    if verbose:
        iterator = tqdm(range(max_steps))
    for _ in iterator:
        if verbose:
            bandit.report()
        if state == "ALIVE":
            current_arm_index = bandit.select_arm()
            fun_args = bandit.arms[current_arm_index]
            # Arms may be scalars or argument tuples/lists.
            if not (isinstance(fun_args, tuple) or isinstance(fun_args, list)):
                fun_args = (fun_args,)
            successful_tasks, failed_tasks = fun(*fun_args)
            fail_fraction = failed_tasks / (successful_tasks + failed_tasks)
            time.sleep(default_wait_time)
            waiting_time += default_wait_time
            if fail_fraction >= failure_threshold:
                # Overloaded: remember this attempt, defer its reward, back off.
                last_alive_successes = successful_tasks
                last_arm_index = current_arm_index
                state = "WAITING"
                waiting_steps = 0
            else:
                # Healthy: reward is throughput (successes per second waited).
                reward = successful_tasks / waiting_time * reward_factor
                bandit.update(current_arm_index, reward)
                waiting_time = 0.0
        else:
            # WAITING: probe with the safe workload plus extra back-off sleep.
            successful_tasks, failed_tasks = fun(*waiting_args)
            fail_fraction = failed_tasks / (successful_tasks + failed_tasks)
            waiting_steps += 1
            time.sleep(default_wait_time + extra_wait_time)
            waiting_time += default_wait_time + extra_wait_time
            if fail_fraction < failure_threshold:
                # Recovered: credit the overloading arm, diluted by the full
                # time spent waiting, then resume normal operation.
                reward = last_alive_successes / waiting_time * reward_factor
                bandit.update(last_arm_index, reward)
                waiting_time = 0.0
                state = "ALIVE"
    if verbose:
        bandit.report()
if __name__ == "__main__":
    # Arms are candidate batch sizes: 10, 20, ..., 200 tasks per run.
    arms = list(range(10, 210, 10))
    print(arms)
    # Alternative strategies that can be swapped in here:
    #   EpsilonDecreasingBandit(arms=arms, initial_epsilon=1.0,
    #                           limit_epsilon=0.05, half_decay_steps=200)
    #   UCB1Bandit(arms=arms)
    #   WilsonSamplingBandit(arms=arms, z_score=1.96)
    bandit = ThompsonSamplingBandit(arms=arms)
    # Short sleeps so the simulated deployment runs quickly.
    base_time = 0.1
    multiplier = 0.05
    deploy_bandit(
        bandit,
        testing_simulation_function,
        failure_threshold=0.1,
        default_wait_time=base_time * multiplier,
        extra_wait_time=multiplier * base_time * 2,
        waiting_args=10,
        max_steps=10000,
        verbose=True,
        reward_factor=1.0,
    )