1 | from random import randint, random |
---|
2 | import numpy |
---|
3 | |
---|
4 | class DE: |
---|
5 | """ |
---|
6 | Differential evolution test implementation |
---|
7 | Implements the Scheme_DE_rand_1 variant |
---|
8 | |
---|
9 | F: float |
---|
10 | weighting factor which determines the magnitude of perturbation |
---|
11 | occurring in each generation. |
---|
12 | |
---|
13 | crossover_rate: float |
---|
14 | In general, 0 < crossover_rate < 1. Usually |
---|
15 | considerably lower than 1. Convergence slows as the value |
---|
16 | increases, but higher crossover rates may be necessary when |
---|
17 | |
---|
18 | func: w = f(p) |
---|
19 | The function to be minimized, of the form w = f(p), where p |
---|
20 | is a vector of length func_dim and w is a scalar |
---|
21 | |
---|
22 | func_dim: int |
---|
23 | The dimension of the objective function |
---|
24 | |
---|
25 | pop: array |
---|
26 | The initial population. This should be an iterable composed of |
---|
27 | Rank-1 numpy arrays. The population size must be at least 4, |
---|
28 | preferably much greater. |
---|
29 | |
---|
30 | l_bound, u_bound: vector |
---|
31 | arrays of size func_dim representing the upper and lower bounds |
---|
32 | for the parameters in the solution vectors |
---|
33 | |
---|
34 | tol: float |
---|
35 | if (max(pop_values) - min(pop_values) <= conv), the population |
---|
36 | has converged and the evolution will stop |
---|
37 | """ |
---|
38 | |
---|
39 | def __init__(self, F, crossover_rate, func, func_dim, pop, l_bound, |
---|
40 | u_bound, tol=1e-7): |
---|
41 | self.pop_size = len(pop) |
---|
42 | self.dimension = func_dim |
---|
43 | self.F_orig = F |
---|
44 | self.F = F |
---|
45 | self.crossover_rate = crossover_rate |
---|
46 | self.func = func |
---|
47 | self.tol = tol |
---|
48 | self.l_bound = l_bound |
---|
49 | self.u_bound = u_bound |
---|
50 | self.population = pop |
---|
51 | self.generations = 0 |
---|
52 | self.pop_values = [self.func(p) for p in self.population] |
---|
53 | self.best_gene,self.best_value = self.get_best_gene() |
---|
54 | self.best_gene_history = [self.best_gene] |
---|
55 | self.best_val_history = [self.best_value] |
---|
56 | self.ncalls = 0 |
---|
57 | |
---|
58 | #////////////////////////////////////////////////// |
---|
59 | def evolve(self, numgens=1000, monitor=None): |
---|
60 | '''Evolve the population for numgens generations, or until it converges. |
---|
61 | Returns the best vector from the run''' |
---|
62 | try: |
---|
63 | import psyco |
---|
64 | psyco.full() |
---|
65 | except ImportError: |
---|
66 | pass |
---|
67 | |
---|
68 | start_gen = self.generations |
---|
69 | for gen in xrange(self.generations + 1, self.generations + numgens + 1): |
---|
70 | candidate_pop = [] |
---|
71 | |
---|
72 | for i in xrange(self.pop_size): |
---|
73 | (r1, r2, r3) = self.select_participants(i) |
---|
74 | |
---|
75 | #perturbation/mutation |
---|
76 | candidate = self.population[r1] + self.F*(self.population[r2] |
---|
77 | - self.population[r3]) |
---|
78 | |
---|
79 | #crossover |
---|
80 | candidate = self.crossover(candidate, i) |
---|
81 | |
---|
82 | #check bound constraints |
---|
83 | if not self.is_within_bounds(candidate): |
---|
84 | candidate = self.get_random_gene() |
---|
85 | |
---|
86 | #test fitness to determine membership in next gen |
---|
87 | candidate_val = self.func(candidate) |
---|
88 | if candidate_val < self.pop_values[i]: |
---|
89 | candidate_pop.append(candidate) |
---|
90 | self.pop_values[i] = float(candidate_val) |
---|
91 | else: |
---|
92 | candidate_pop.append(self.population[i]) |
---|
93 | self.ncalls += self.pop_size |
---|
94 | |
---|
95 | self.population = candidate_pop |
---|
96 | x,fx = self.get_best_gene() |
---|
97 | if fx < self.best_value: |
---|
98 | self.best_gene, self.best_value = x,fx |
---|
99 | if monitor is not None: |
---|
100 | monitor.improvement(x,fx,self.ncalls) |
---|
101 | self.best_val_history.append(self.best_value) |
---|
102 | self.best_gene_history.append(self.best_gene) |
---|
103 | self.generations = gen |
---|
104 | |
---|
105 | if monitor is not None: |
---|
106 | monitor.progress(gen-start_gen,numgens) |
---|
107 | |
---|
108 | if self.is_converged(): |
---|
109 | break |
---|
110 | |
---|
111 | return self.best_gene |
---|
112 | |
---|
113 | |
---|
114 | #//////////////////////////////////////////////// |
---|
115 | def get_random_gene(self): |
---|
116 | '''Generate a random gene within the bounds constraints. |
---|
117 | Used to replace out-of-bounds genes''' |
---|
118 | result = [numpy.random.uniform(low, high) |
---|
119 | for low, high in zip(self.l_bound, self.u_bound)] |
---|
120 | return numpy.array(result) |
---|
121 | |
---|
122 | #//////////////////////////////////////////////// |
---|
123 | def is_within_bounds(self, gene): |
---|
124 | '''Determine whether the gene meets the bounds constraints''' |
---|
125 | return numpy.all( (self.l_bound < gene) & (self.u_bound > gene) ) |
---|
126 | |
---|
127 | #//////////////////////////////////////////////// |
---|
128 | #This appears to be a total failure. I'll leave the code in, |
---|
129 | #but it's not called from the main evolution loop anymore. |
---|
130 | def adjust_F(self): |
---|
131 | '''Adjust F to account for stagnation of the population ''' |
---|
132 | #has the best vector improved this generation? |
---|
133 | idx = len(self.best_val_history) |
---|
134 | if self.best_val_history[idx - 1] == self.best_val_history[idx - 2]: |
---|
135 | self.stagnant_gens += 1 |
---|
136 | else: |
---|
137 | self.stagnant_gens = 0 |
---|
138 | |
---|
139 | #adapt F to account for stagnation |
---|
140 | self.F = min( (self.F_orig + (self.stagnant_gens * 0.01)), 2.0) |
---|
141 | |
---|
142 | #//////////////////////////////////////////////// |
---|
143 | def get_best_gene(self): |
---|
144 | '''find the most fit gene in the current population''' |
---|
145 | #print "pop", self.pop_values |
---|
146 | |
---|
147 | best_index = numpy.argmin(self.pop_values) |
---|
148 | return self.population[best_index],self.pop_values[best_index], |
---|
149 | |
---|
150 | #//////////////////////////////////////////////// |
---|
151 | def select_participants(self, i): |
---|
152 | '''generate r1, r2, and r3 randomly from the range [0, NP-1] |
---|
153 | such that they are distinct values not equal to i''' |
---|
154 | choices = [i] |
---|
155 | for choice in xrange(3): |
---|
156 | while True: |
---|
157 | j = numpy.random.randint(0, self.pop_size-1) |
---|
158 | if j not in choices: |
---|
159 | break |
---|
160 | choices.append(j) |
---|
161 | return choices[1:] |
---|
162 | #//////////////////////////////////////////////// |
---|
163 | def crossover(self, candidate, i): |
---|
164 | '''Perform a crossover between the candidate and the ith member of |
---|
165 | the previous generation.''' |
---|
166 | result = [] |
---|
167 | |
---|
168 | #generate lower bound of crossover (this bound is inclusive) |
---|
169 | low = randint(0, self.dimension-1) |
---|
170 | |
---|
171 | #determine the size of the crossover |
---|
172 | L = 0 |
---|
173 | while random() < self.crossover_rate and L < (self.dimension - low): |
---|
174 | L += 1 |
---|
175 | |
---|
176 | #calculate the upper bound of crossover (this bound is exclusive) |
---|
177 | high = low + L |
---|
178 | |
---|
179 | high = numpy.repeat(high, self.dimension) |
---|
180 | low = numpy.repeat(low, self.dimension) |
---|
181 | seq = numpy.arange(0, self.dimension) |
---|
182 | |
---|
183 | result = numpy.choose( ((seq >= low)&(seq < high)), |
---|
184 | [candidate, self.population[i]] ) |
---|
185 | return result |
---|
186 | |
---|
187 | def is_converged(self): |
---|
188 | '''check for convergence''' |
---|
189 | return max(self.pop_values) - min(self.pop_values) <= self.tol |
---|
190 | |
---|
191 | def diffev(fn, bounds, x0=None, pop_scale=4, crossover_rate=0.8, |
---|
192 | Fscale=1, tolerance=1e-5, maxiter=1000, monitor=None): |
---|
193 | """ |
---|
194 | Run differential evolution, returning x,fx,ncalls |
---|
195 | """ |
---|
196 | lo, hi = numpy.asarray(bounds[0]), numpy.asarray(bounds[1]) |
---|
197 | dim = len(lo) |
---|
198 | pop = gen_pop(dim*pop_scale, lo, hi, dim) |
---|
199 | if x0 is not None: |
---|
200 | pop[0] = numpy.asarray(x0) |
---|
201 | evolver = DE(Fscale, crossover_rate, fn, dim, pop, lo, hi, tolerance) |
---|
202 | evolver.evolve(maxiter, monitor=monitor) |
---|
203 | return evolver.best_gene, evolver.best_value, evolver.ncalls |
---|
204 | |
---|
205 | |
---|
206 | |
---|
207 | |
---|
208 | ######################################################################## |
---|
209 | #end DE def |
---|
210 | ######################################################################## |
---|
211 | def gen_pop(size, l_bound, u_bound, dimension): |
---|
212 | '''generate a random population of vectors within the given bounds. dimension |
---|
213 | indicates the length of the vectors. l_bound and u_bound should be vectors |
---|
214 | of length dimension (any iterable container should work)''' |
---|
215 | result = [] |
---|
216 | for i in xrange(size): |
---|
217 | entry = [] |
---|
218 | for j in xrange(dimension): |
---|
219 | entry.append( ((u_bound[j] - l_bound[j]) * random()) + l_bound[j] ) |
---|
220 | result.append(numpy.array(entry)) |
---|
221 | return result |
---|