1 | """ |
---|
2 | This module implements corfunc |
---|
3 | """ |
---|
4 | import warnings |
---|
5 | import numpy as np |
---|
6 | from scipy.optimize import curve_fit |
---|
7 | from scipy.interpolate import interp1d |
---|
8 | from scipy.fftpack import dct |
---|
9 | from scipy.signal import argrelextrema |
---|
10 | from numpy.linalg import lstsq |
---|
11 | from sas.sascalc.dataloader.data_info import Data1D |
---|
12 | from sas.sascalc.corfunc.transform_thread import TransformThread |
---|
13 | |
---|
14 | class CorfuncCalculator(object): |
---|
15 | |
---|
16 | class _Interpolator(object): |
---|
17 | """ |
---|
18 | Interpolates between curve f and curve g over the range start:stop and |
---|
19 | caches the result of the function when it's called |
---|
20 | |
---|
21 | :param f: The first curve to interpolate |
---|
22 | :param g: The second curve to interpolate |
---|
23 | :param start: The value at which to start the interpolation |
---|
24 | :param stop: The value at which to stop the interpolation |
---|
25 | """ |
---|
26 | def __init__(self, f, g, start, stop): |
---|
27 | self.f = f |
---|
28 | self.g = g |
---|
29 | self.start = start |
---|
30 | self.stop = stop |
---|
31 | self._lastx = [] |
---|
32 | self._lasty = [] |
---|
33 | |
---|
34 | def __call__(self, x): |
---|
35 | if self._lastx == [] or x.tolist() != self._lastx.tolist(): |
---|
36 | self._lasty = self._smoothed_function(x) |
---|
37 | self._lastx = x |
---|
38 | return self._lasty |
---|
39 | |
---|
40 | def _smoothed_function(self,x): |
---|
41 | ys = np.zeros(x.shape) |
---|
42 | ys[x <= self.start] = self.f(x[x <= self.start]) |
---|
43 | ys[x >= self.stop] = self.g(x[x >= self.stop]) |
---|
44 | with warnings.catch_warnings(): |
---|
45 | # Ignore divide by zero error |
---|
46 | warnings.simplefilter('ignore') |
---|
47 | h = 1/(1+(x-self.stop)**2/(self.start-x)**2) |
---|
48 | mask = np.logical_and(x > self.start, x < self.stop) |
---|
49 | ys[mask] = h[mask]*self.g(x[mask])+(1-h[mask])*self.f(x[mask]) |
---|
50 | return ys |
---|
51 | |
---|
52 | |
---|
53 | def __init__(self, data=None, lowerq=None, upperq=None, scale=1): |
---|
54 | """ |
---|
55 | Initialize the class. |
---|
56 | |
---|
57 | :param data: Data of the type DataLoader.Data1D |
---|
58 | :param lowerq: The Q value to use as the boundary for |
---|
59 | Guinier extrapolation |
---|
60 | :param upperq: A tuple of the form (lower, upper). |
---|
61 | Values between lower and upper will be used for Porod extrapolation |
---|
62 | :param scale: Scaling factor for I(q) |
---|
63 | """ |
---|
64 | self._data = None |
---|
65 | self.set_data(data, scale) |
---|
66 | self.lowerq = lowerq |
---|
67 | self.upperq = upperq |
---|
68 | self.background = 0 |
---|
69 | self._transform_thread = None |
---|
70 | |
---|
71 | def set_data(self, data, scale=1): |
---|
72 | """ |
---|
73 | Prepares the data for analysis |
---|
74 | |
---|
75 | :return: new_data = data * scale - background |
---|
76 | """ |
---|
77 | if data is None: |
---|
78 | return |
---|
79 | # Only process data of the class Data1D |
---|
80 | if not issubclass(data.__class__, Data1D): |
---|
81 | raise ValueError, "Data must be of the type DataLoader.Data1D" |
---|
82 | |
---|
83 | # Prepare the data |
---|
84 | new_data = Data1D(x=data.x, y=data.y) |
---|
85 | new_data *= scale |
---|
86 | |
---|
87 | # Ensure the errors are set correctly |
---|
88 | if new_data.dy is None or len(new_data.x) != len(new_data.dy) or \ |
---|
89 | (min(new_data.dy) == 0 and max(new_data.dy) == 0): |
---|
90 | new_data.dy = np.ones(len(new_data.x)) |
---|
91 | |
---|
92 | self._data = new_data |
---|
93 | |
---|
94 | def compute_background(self, upperq=None): |
---|
95 | """ |
---|
96 | Compute the background level from the Porod region of the data |
---|
97 | """ |
---|
98 | if self._data is None: return 0 |
---|
99 | elif upperq is None and self.upperq is not None: upperq = self.upperq |
---|
100 | elif upperq == 0: return 0 |
---|
101 | q = self._data.x |
---|
102 | mask = np.logical_and(q > upperq[0], q < upperq[1]) |
---|
103 | _, _, bg = self._fit_porod(q[mask], self._data.y[mask]) |
---|
104 | |
---|
105 | return bg |
---|
106 | |
---|
107 | def compute_extrapolation(self): |
---|
108 | """ |
---|
109 | Extrapolate and interpolate scattering data |
---|
110 | |
---|
111 | :return: The extrapolated data |
---|
112 | """ |
---|
113 | q = self._data.x |
---|
114 | iq = self._data.y |
---|
115 | |
---|
116 | s2 = self._fit_data(q, iq) |
---|
117 | qs = np.arange(0, q[-1]*100, (q[1]-q[0])) |
---|
118 | iqs = s2(qs) |
---|
119 | |
---|
120 | extrapolation = Data1D(qs, iqs) |
---|
121 | |
---|
122 | return extrapolation |
---|
123 | |
---|
124 | def compute_transform(self, extrapolation, background=None, |
---|
125 | completefn=None, updatefn=None): |
---|
126 | """ |
---|
127 | Transform an extrapolated scattering curve into a correlation function. |
---|
128 | |
---|
129 | :param extrapolation: The extrapolated data |
---|
130 | :param background: The background value (if not provided, previously |
---|
131 | calculated value will be used) |
---|
132 | :param completefn: The function to call when the transform calculation |
---|
133 | is complete` |
---|
134 | :param updatefn: The function to call to update the GUI with the status |
---|
135 | of the transform calculation |
---|
136 | :return: The transformed data |
---|
137 | """ |
---|
138 | if self._transform_thread is not None: |
---|
139 | if self._transform_thread.isrunning(): return |
---|
140 | |
---|
141 | if background is None: background = self.background |
---|
142 | |
---|
143 | self._transform_thread = TransformThread(self._data, extrapolation, |
---|
144 | background, completefn=completefn, updatefn=updatefn) |
---|
145 | self._transform_thread.queue() |
---|
146 | |
---|
147 | def transform_isrunning(self): |
---|
148 | if self._transform_thread is None: return False |
---|
149 | return self._transform_thread.isrunning() |
---|
150 | |
---|
151 | def stop_transform(self): |
---|
152 | if self._transform_thread.isrunning(): |
---|
153 | self._transform_thread.stop() |
---|
154 | |
---|
155 | def extract_parameters(self, transformed_data): |
---|
156 | """ |
---|
157 | Extract the interesting measurements from a correlation function |
---|
158 | :param transformed_data: Fourier transformation of the |
---|
159 | extrapolated data |
---|
160 | """ |
---|
161 | # Calculate indexes of maxima and minima |
---|
162 | x = transformed_data.x |
---|
163 | y = transformed_data.y |
---|
164 | maxs = argrelextrema(y, np.greater)[0] |
---|
165 | mins = argrelextrema(y, np.less)[0] |
---|
166 | |
---|
167 | # If there are no maxima, return None |
---|
168 | if len(maxs) == 0: |
---|
169 | return None |
---|
170 | |
---|
171 | GammaMin = y[mins[0]] # The value at the first minimum |
---|
172 | |
---|
173 | ddy = (y[:-2]+y[2:]-2*y[1:-1])/(x[2:]-x[:-2])**2 # 2nd derivative of y |
---|
174 | dy = (y[2:]-y[:-2])/(x[2:]-x[:-2]) # 1st derivative of y |
---|
175 | # Find where the second derivative goes to zero |
---|
176 | zeros = argrelextrema(np.abs(ddy), np.less)[0] |
---|
177 | # locate the first inflection point |
---|
178 | linear_point = zeros[0] |
---|
179 | linear_point = int(mins[0]/10) |
---|
180 | |
---|
181 | # Try to calculate slope around linear_point using 80 data points |
---|
182 | lower = linear_point - 40 |
---|
183 | upper = linear_point + 40 |
---|
184 | |
---|
185 | # If too few data points to the left, use linear_point*2 data points |
---|
186 | if lower < 0: |
---|
187 | lower = 0 |
---|
188 | upper = linear_point * 2 |
---|
189 | # If too few to right, use 2*(dy.size - linear_point) data points |
---|
190 | elif upper > len(dy): |
---|
191 | upper = len(dy) |
---|
192 | width = len(dy) - linear_point |
---|
193 | lower = 2*linear_point - dy.size |
---|
194 | |
---|
195 | m = np.mean(dy[lower:upper]) # Linear slope |
---|
196 | b = y[1:-1][linear_point]-m*x[1:-1][linear_point] # Linear intercept |
---|
197 | |
---|
198 | Lc = (GammaMin-b)/m # Hard block thickness |
---|
199 | |
---|
200 | # Find the data points where the graph is linear to within 1% |
---|
201 | mask = np.where(np.abs((y-(m*x+b))/y) < 0.01)[0] |
---|
202 | if len(mask) == 0: # Return garbage for bad fits |
---|
203 | return { 'max': self._round_sig_figs(x[maxs[0]], 6) } |
---|
204 | dtr = x[mask[0]] # Beginning of Linear Section |
---|
205 | d0 = x[mask[-1]] # End of Linear Section |
---|
206 | GammaMax = y[mask[-1]] |
---|
207 | A = -GammaMin/GammaMax # Normalized depth of minimum |
---|
208 | |
---|
209 | params = { |
---|
210 | 'max': x[maxs[0]], |
---|
211 | 'dtr': dtr, |
---|
212 | 'Lc': Lc, |
---|
213 | 'd0': d0, |
---|
214 | 'A': A, |
---|
215 | 'fill': Lc/x[maxs[0]] |
---|
216 | } |
---|
217 | |
---|
218 | for key, val in params.iteritems(): |
---|
219 | params[key] = self._round_sig_figs(val, 6) |
---|
220 | |
---|
221 | return params |
---|
222 | |
---|
223 | |
---|
224 | def _porod(self, q, K, sigma, bg): |
---|
225 | """Equation for the Porod region of the data""" |
---|
226 | return bg + (K*q**(-4))*np.exp(-q**2*sigma**2) |
---|
227 | |
---|
228 | def _fit_guinier(self, q, iq): |
---|
229 | """Fit the Guinier region of the curve""" |
---|
230 | A = np.vstack([q**2, np.ones(q.shape)]).T |
---|
231 | return lstsq(A, np.log(iq)) |
---|
232 | |
---|
233 | def _fit_porod(self, q, iq): |
---|
234 | """Fit the Porod region of the curve""" |
---|
235 | fitp = curve_fit(lambda q, k, sig, bg: self._porod(q, k, sig, bg)*q**2, |
---|
236 | q, iq*q**2)[0] |
---|
237 | k, sigma, bg = fitp |
---|
238 | return k, sigma, bg |
---|
239 | |
---|
240 | def _fit_data(self, q, iq): |
---|
241 | """ |
---|
242 | Given a data set, extrapolate out to large q with Porod and |
---|
243 | to q=0 with Guinier |
---|
244 | """ |
---|
245 | mask = np.logical_and(q > self.upperq[0], q < self.upperq[1]) |
---|
246 | |
---|
247 | # Returns an array where the 1st and 2nd elements are the values of k |
---|
248 | # and sigma for the best-fit Porod function |
---|
249 | k, sigma, _ = self._fit_porod(q[mask], iq[mask]) |
---|
250 | bg = self.background |
---|
251 | |
---|
252 | # Smooths between the best-fit porod function and the data to produce a |
---|
253 | # better fitting curve |
---|
254 | data = interp1d(q, iq) |
---|
255 | s1 = self._Interpolator(data, |
---|
256 | lambda x: self._porod(x, k, sigma, bg), self.upperq[0], q[-1]) |
---|
257 | |
---|
258 | mask = np.logical_and(q < self.lowerq, 0 < q) |
---|
259 | |
---|
260 | # Returns parameters for the best-fit Guinier function |
---|
261 | g = self._fit_guinier(q[mask], iq[mask])[0] |
---|
262 | |
---|
263 | # Smooths between the best-fit Guinier function and the Porod curve |
---|
264 | s2 = self._Interpolator((lambda x: (np.exp(g[1]+g[0]*x**2))), s1, q[0], |
---|
265 | self.lowerq) |
---|
266 | |
---|
267 | return s2 |
---|
268 | |
---|
269 | def _round_sig_figs(self, x, sigfigs): |
---|
270 | """ |
---|
271 | Round a number to a given number of significant figures. |
---|
272 | |
---|
273 | :param x: The value to round |
---|
274 | :param sigfigs: How many significant figures to round to |
---|
275 | :return rounded_str: x rounded to the given number of significant |
---|
276 | figures, as a string |
---|
277 | """ |
---|
278 | # Index of first significant digit |
---|
279 | significant_digit = -int(np.floor(np.log10(np.abs(x)))) |
---|
280 | # Number of digits required for correct number of sig figs |
---|
281 | digits = significant_digit + (sigfigs - 1) |
---|
282 | rounded = np.round(x, decimals=digits) |
---|
283 | rounded_str = "{1:.{0}f}".format(sigfigs -1 + significant_digit, |
---|
284 | rounded) |
---|
285 | return rounded_str |
---|