-
Notifications
You must be signed in to change notification settings - Fork 0
/
deap_gp.py
181 lines (159 loc) · 6.95 KB
/
deap_gp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import operator
import random
import math
import multiprocessing
import signal
import numpy as np
from deap import base, creator, tools, gp
from shapely.geometry import LineString
from gptrajec import transform_2d, transform_3d, eaTrajec
import validation as v
# division operator immune to ZeroDivisionError
def protectedDiv(left, right):
    """Divide ``left`` by ``right`` without raising ZeroDivisionError.

    Args:
        left: Numerator.
        right: Denominator.

    Returns:
        ``1`` when ``right`` compares close to zero, otherwise
        ``left / right``.

    Note:
        ``math.isclose`` is called with its default tolerances (relative
        tolerance only, no absolute tolerance), so in practice only an exact
        zero denominator takes the fallback branch.
    """
    if math.isclose(right, 0):
        return 1
    return left / right
# Module-level DEAP toolbox shared by the evaluation functions and main();
# main() registers the compile/select/mate/mutate/evaluate aliases on it.
toolbox = base.Toolbox()
# Create the DEAP classes used by the toolbox registrations:
# - FitnessMin: single-objective fitness with weight -1.0 (minimised).
# - Individual: a GP primitive tree that carries a FitnessMin fitness.
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)
#creator.create("DblIndividual", list, fitness=creator.FitnessMin)
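# NOTE: `params` precedes `individual` in the evaluators below because main()
# registers them with toolbox.register("evaluate", func, eval_args), which binds
# eval_args as the first positional argument; DEAP then passes the individual last.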
def evalPath_2d(params, individual):
    """Score one GP individual as a 2D path.

    The individual is compiled into a callable and sampled at every value of
    ``params['x']``. The (x, y) samples are passed through ``transform_2d``
    together with ``params['interval']`` and wrapped in a shapely
    ``LineString``.

    Args:
        params: Dict of evaluation settings. Keys read here are ``'x'``,
            ``'interval'``, ``'adaptive_mode'``, ``'delete_invalid'`` and
            ``'inv_cost'``; the whole dict is also forwarded to the
            validation helpers.
        individual: GP expression tree to evaluate.

    Returns:
        One-element tuple with the fitness:
        * adaptive mode: ``flexible_validate_2d`` result plus line length;
        * valid line: the line length;
        * invalid line: ``np.nan`` if ``params['delete_invalid']`` is truthy,
          otherwise the value of the ``params['inv_cost']`` expression.
    """
    # Turn the tree expression into a callable function.
    path_func = toolbox.compile(expr=individual)

    xs = params['x']
    ys = [path_func(sample) for sample in xs]
    line = LineString(
        transform_2d(np.column_stack((xs, ys)), params['interval'])
    )

    if params['adaptive_mode']:
        return (v.flexible_validate_2d(line, params) + line.length,)

    if v.validate_2d(line, params):
        # Valid lines are ranked by length only.
        return (line.length,)

    if params['delete_invalid']:
        return (np.nan,)

    # Severely penalise invalid lines.
    # NOTE(security): inv_cost is executed with eval(); the empty globals dict
    # does not remove access to builtins, so the expression must come from
    # trusted configuration only.
    penalty = eval(params['inv_cost'], {}, {"length": line.length})
    return (penalty,)
def _path_length_3d(line, geo_z):
    """Combine the planar line length with the summed absolute z changes."""
    # line.length only covers the 2D (planar) length, so the vertical
    # travel is folded in separately.
    return np.hypot(line.length, np.sum(np.abs(np.diff(geo_z))))


def evalPath_3d(params, individual):
    """Score one GP individual as a 3D path.

    The compiled individual takes two arguments. It is sampled as
    ``func(p, 0)`` for the y values and ``func(0, p)`` for the z values over
    every ``p`` in ``params['x']``. The stacked (x, y, z) samples are passed
    through ``transform_3d`` together with ``params['interval']`` and wrapped
    in a shapely ``LineString``.

    Args:
        params: Dict of evaluation settings. Keys read here are ``'x'``,
            ``'interval'``, ``'adaptive_mode'``, ``'global_min_z'``,
            ``'global_max_z'``, ``'delete_invalid'`` and ``'inv_cost'``; the
            whole dict is also forwarded to the validation helpers.
        individual: GP expression tree to evaluate.

    Returns:
        One-element tuple with the fitness:
        * adaptive mode: ``flexible_validate_3d`` result plus the 3D length;
        * valid line: the 3D length;
        * invalid line: ``np.nan`` if ``params['delete_invalid']`` is truthy,
          otherwise the value of the ``params['inv_cost']`` expression.
    """
    xs = params['x']
    # Turn the tree expression into a callable function.
    path_func = toolbox.compile(expr=individual)

    ys = [path_func(sample, 0) for sample in xs]
    zs = [path_func(0, sample) for sample in xs]
    coords = transform_3d(np.column_stack((xs, ys, zs)), params['interval'])
    geo_z = coords[:, 2]
    line = LineString(coords)

    if params['adaptive_mode']:
        return (
            v.flexible_validate_3d(line, params) + _path_length_3d(line, geo_z),
        )

    # To save validation time, reject paths that touch or cross the global
    # z bounds before running the full validation.
    out_of_bounds = (
        np.any(geo_z <= params['global_min_z'])
        or np.any(geo_z >= params['global_max_z'])
    )
    if not out_of_bounds and v.validate_3d(line, params):
        # Valid lines are ranked by length only.
        return (_path_length_3d(line, geo_z),)

    if params['delete_invalid']:
        return (np.nan,)

    # Severely penalise invalid lines.
    # NOTE(review): the penalty expression receives the planar line.length,
    # not the 3D length used for valid paths - confirm this is intended.
    # NOTE(security): inv_cost is executed with eval(); the empty globals dict
    # does not remove access to builtins, so the expression must come from
    # trusted configuration only.
    penalty = eval(params['inv_cost'], {}, {"length": line.length})
    return (penalty,)
# NEW: for multiprocessing
def init_worker():
    """Make the calling process ignore SIGINT.

    Used as the initializer of the multiprocessing pool created in ``main``,
    presumably so that Ctrl-C is handled by the parent process only.
    """
    signum, handler = signal.SIGINT, signal.SIG_IGN
    signal.signal(signum, handler)
def main(cfg):
    """Configure DEAP for the trajectory search and run the evolution.

    Builds the primitive set, registers the compile/select/mate/mutate/
    evaluate aliases on the module-level ``toolbox``, creates the initial
    population, hall of fame and statistics, then hands over to ``eaTrajec``.

    Args:
        cfg: Configuration object. Attributes read here: ``seed``,
            ``enable_3d``, ``multiproc``, ``x``, ``barriers``, ``global_max``,
            ``global_min``, ``dem``, ``interval``, ``validation_3d``,
            ``adaptive_mode``, ``invalidity_cost``, ``intersection_cost``,
            ``delete_invalid``, ``init_min``, ``init_max``, ``dbl_tourn``,
            ``tournsize``, ``parsimony_size``, ``fitness_first``,
            ``max_height``, ``max_length``, ``pop_size`` and ``hof_size``.
            The object is also passed on to ``eaTrajec``.

    Returns:
        Tuple ``(log, hof, pset, gen_best, durs, msg)``: the hall of fame and
        primitive set created here, plus the log, per-generation best,
        durations and message returned by ``eaTrajec``.
    """
    random.seed(cfg.seed)
    use_3d = cfg.enable_3d

    # Initialise the primitive set: one input for 2D, two inputs for 3D.
    pset = gp.PrimitiveSet("MAIN", 2 if use_3d else 1)
    if use_3d:
        pset.renameArguments(ARG1='z')
    # NOTE(review): ARG0 is renamed for both the 2D and the 3D primitive set;
    # confirm the rename is meant to apply to the 2D case as well.
    pset.renameArguments(ARG0='y')

    # Doesn't work (overflows): operator.pow, math.exp
    # Works: math.tanh (degrades)
    # Also consider math.cbrt, math.exp2, math.expm1, math.log, math.sqrt
    #pset.addPrimitive(protectedDiv, 2)
    primitives = (
        (operator.add, 2),
        (operator.sub, 2),
        (operator.mul, 2),
        (operator.neg, 1),
        (math.cos, 1),
        (math.sin, 1),
    )
    for primitive, arity in primitives:
        pset.addPrimitive(primitive, arity)
    # The seed is part of the constant's name, so each seed registers a
    # differently named ephemeral constant.
    pset.addEphemeralConstant(f"rand101_{cfg.seed}", lambda: random.randint(-1,1))
    toolbox.register("compile", gp.compile, pset=pset)

    # Optional multiprocessing pool; workers ignore SIGINT via init_worker.
    worker_pool = None
    if cfg.multiproc == 'imap':
        worker_pool = multiprocessing.Pool(None, init_worker)
        toolbox.register("map", worker_pool.imap)

    # Arguments handed to the evaluation functions as their first parameter.
    evaluation_params = {
        'x': cfg.x,
        'barriers': cfg.barriers,
        'global_max_z': cfg.global_max,
        'global_min_z': cfg.global_min,
        'dem': cfg.dem,
        'interval': cfg.interval,
        'validation_3d': cfg.validation_3d,
        'adaptive_mode': cfg.adaptive_mode,
        'inv_cost': cfg.invalidity_cost,
        'int_cost': cfg.intersection_cost,
        'delete_invalid': cfg.delete_invalid,
    }

    # Toolbox registrations that depend on cfg.
    toolbox.register("expr", gp.genHalfAndHalf, pset=pset,
                     min_=cfg.init_min, max_=cfg.init_max)
    toolbox.register("individual", tools.initIterate,
                     creator.Individual, toolbox.expr)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)

    if cfg.dbl_tourn:
        selector = tools.selDoubleTournament
        selector_kwargs = {
            'fitness_size': cfg.tournsize,
            'parsimony_size': cfg.parsimony_size,
            'fitness_first': cfg.fitness_first,
        }
    else:
        selector = tools.selTournament
        selector_kwargs = {'tournsize': cfg.tournsize}
    toolbox.register("select", selector, **selector_kwargs)

    toolbox.register("mate", gp.cxOnePoint)
    toolbox.register("expr_mut", gp.genFull, min_=cfg.init_min, max_=cfg.init_max)
    toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)

    # Constrain the variation operators: tree height always, tree length
    # only when a maximum length is configured.
    variation_aliases = ("mate", "mutate")
    for alias in variation_aliases:
        toolbox.decorate(alias, gp.staticLimit(key=operator.attrgetter("height"),
                                               max_value=cfg.max_height))
    if cfg.max_length:
        for alias in variation_aliases:
            toolbox.decorate(alias, gp.staticLimit(key=len,
                                                   max_value=cfg.max_length))

    # Initial population and hall of fame; only the evaluator differs
    # between the 2D and 3D methods.
    pop = toolbox.population(cfg.pop_size)  # default 300
    evaluator = evalPath_3d if use_3d else evalPath_2d
    toolbox.register("evaluate", evaluator, evaluation_params)
    hof = tools.HallOfFame(cfg.hof_size)  # default 1

    # Statistics over fitness, tree size and tree height.
    multi_stats = tools.MultiStatistics(
        fitness=tools.Statistics(operator.attrgetter("fitness.values")),
        size=tools.Statistics(len),
        height=tools.Statistics(operator.attrgetter("height")),
    )
    for label, reducer in (("mean", np.mean), ("std", np.std),
                           ("min", np.min), ("max", np.max)):
        multi_stats.register(label, reducer)

    # Run the evolutionary loop.
    pop, log, gen_best, durs, msg = eaTrajec(pop, toolbox,
                                             cfg,
                                             stats=multi_stats,
                                             halloffame=hof,
                                             mp_pool=worker_pool)
    return log, hof, pset, gen_best, durs, msg
if __name__ == "__main__":
    # main() requires a configuration object, and none is available when the
    # file is executed directly; calling main() without one would only raise
    # a TypeError. Exit with an explicit message instead.
    raise SystemExit(
        "deap_gp.main() requires a configuration object; "
        "import this module and call main(cfg) instead of running it directly."
    )