source: sasview/src/sas/sascalc/corfunc/corfunc_calculator.py @ dbfd307

magnetic_scattrelease-4.2.2ticket-1009ticket-1094-headlessticket-1242-2d-resolutionticket-1249
Last change on this file since dbfd307 was dbfd307, checked in by Paul Kienzle <pkienzle@…>, 5 years ago

use np ≥ 1.14.0 default for lstsq rcond, but compatible with np < 1.14.0

  • Property mode set to 100644
File size: 10.5 KB
Line 
1"""
2This module implements corfunc
3"""
4import warnings
5import numpy as np
6from scipy.optimize import curve_fit
7from scipy.interpolate import interp1d
8from scipy.fftpack import dct
9from scipy.signal import argrelextrema
10from numpy.linalg import lstsq
11from sas.sascalc.dataloader.data_info import Data1D
12from sas.sascalc.corfunc.transform_thread import FourierThread
13from sas.sascalc.corfunc.transform_thread import HilbertThread
14
15class CorfuncCalculator(object):
16
17    class _Interpolator(object):
18        """
19        Interpolates between curve f and curve g over the range start:stop and
20        caches the result of the function when it's called
21
22        :param f: The first curve to interpolate
23        :param g: The second curve to interpolate
24        :param start: The value at which to start the interpolation
25        :param stop: The value at which to stop the interpolation
26        """
27        def __init__(self, f, g, start, stop):
28            self.f = f
29            self.g = g
30            self.start = start
31            self.stop = stop
32            self._lastx = np.empty(0, dtype='d')
33            self._lasty = None
34
35        def __call__(self, x):
36            # If input is a single number, evaluate the function at that number
37            # and return a single number
38            if isinstance(x, (float, int)):
39                return self._smoothed_function(np.array([x]))[0]
40            # If input is a list, and is different to the last input, evaluate
41            # the function at each point. If the input is the same as last time
42            # the function was called, return the result that was calculated
43            # last time instead of explicity evaluating the function again.
44            if not np.array_equal(x, self._lastx):
45                self._lastx, self._lasty = x, self._smoothed_function(x)
46            return self._lasty
47
48        def _smoothed_function(self,x):
49            ys = np.zeros(x.shape)
50            ys[x <= self.start] = self.f(x[x <= self.start])
51            ys[x >= self.stop] = self.g(x[x >= self.stop])
52            with warnings.catch_warnings():
53                # Ignore divide by zero error
54                warnings.simplefilter('ignore')
55                h = 1/(1+(x-self.stop)**2/(self.start-x)**2)
56            mask = np.logical_and(x > self.start, x < self.stop)
57            ys[mask] = h[mask]*self.g(x[mask])+(1-h[mask])*self.f(x[mask])
58            return ys
59
60
61    def __init__(self, data=None, lowerq=None, upperq=None, scale=1):
62        """
63        Initialize the class.
64
65        :param data: Data of the type DataLoader.Data1D
66        :param lowerq: The Q value to use as the boundary for
67            Guinier extrapolation
68        :param upperq: A tuple of the form (lower, upper).
69            Values between lower and upper will be used for Porod extrapolation
70        :param scale: Scaling factor for I(q)
71        """
72        self._data = None
73        self.set_data(data, scale)
74        self.lowerq = lowerq
75        self.upperq = upperq
76        self.background = self.compute_background()
77        self._transform_thread = None
78
79    def set_data(self, data, scale=1):
80        """
81        Prepares the data for analysis
82
83        :return: new_data = data * scale - background
84        """
85        if data is None:
86            return
87        # Only process data of the class Data1D
88        if not issubclass(data.__class__, Data1D):
89            raise ValueError("Correlation function cannot be computed with 2D data.")
90
91        # Prepare the data
92        new_data = Data1D(x=data.x, y=data.y)
93        new_data *= scale
94
95        # Ensure the errors are set correctly
96        if new_data.dy is None or len(new_data.x) != len(new_data.dy) or \
97            (min(new_data.dy) == 0 and max(new_data.dy) == 0):
98            new_data.dy = np.ones(len(new_data.x))
99
100        self._data = new_data
101
102    def compute_background(self, upperq=None):
103        """
104        Compute the background level from the Porod region of the data
105        """
106        if self._data is None: return 0
107        elif upperq is None and self.upperq is not None: upperq = self.upperq
108        elif upperq is None and self.upperq is None: return 0
109        q = self._data.x
110        mask = np.logical_and(q > upperq[0], q < upperq[1])
111        _, _, bg = self._fit_porod(q[mask], self._data.y[mask])
112
113        return bg
114
115    def compute_extrapolation(self):
116        """
117        Extrapolate and interpolate scattering data
118
119        :return: The extrapolated data
120        """
121        q = self._data.x
122        iq = self._data.y
123
124        params, s2 = self._fit_data(q, iq)
125        # Extrapolate to 100*Qmax in experimental data
126        qs = np.arange(0, q[-1]*100, (q[1]-q[0]))
127        iqs = s2(qs)
128
129        extrapolation = Data1D(qs, iqs)
130
131        return params, extrapolation, s2
132
133    def compute_transform(self, extrapolation, trans_type, background=None,
134        completefn=None, updatefn=None):
135        """
136        Transform an extrapolated scattering curve into a correlation function.
137
138        :param extrapolation: The extrapolated data
139        :param background: The background value (if not provided, previously
140            calculated value will be used)
141        :param extrap_fn: A callable function representing the extraoplated data
142        :param completefn: The function to call when the transform calculation
143            is complete
144        :param updatefn: The function to call to update the GUI with the status
145            of the transform calculation
146        :return: The transformed data
147        """
148        if self._transform_thread is not None:
149            if self._transform_thread.isrunning(): return
150
151        if background is None: background = self.background
152
153        if trans_type == 'fourier':
154            self._transform_thread = FourierThread(self._data, extrapolation,
155            background, completefn=completefn,
156            updatefn=updatefn)
157        elif trans_type == 'hilbert':
158            self._transform_thread = HilbertThread(self._data, extrapolation,
159            background, completefn=completefn, updatefn=updatefn)
160        else:
161            err = ("Incorrect transform type supplied, must be 'fourier'",
162                " or 'hilbert'")
163            raise ValueError(err)
164
165        self._transform_thread.queue()
166
167    def transform_isrunning(self):
168        if self._transform_thread is None: return False
169        return self._transform_thread.isrunning()
170
171    def stop_transform(self):
172        if self._transform_thread.isrunning():
173            self._transform_thread.stop()
174
175    def extract_parameters(self, transformed_data):
176        """
177        Extract the interesting measurements from a correlation function
178
179        :param transformed_data: Fourier transformation of the extrapolated data
180        """
181        # Calculate indexes of maxima and minima
182        x = transformed_data.x
183        y = transformed_data.y
184        maxs = argrelextrema(y, np.greater)[0]
185        mins = argrelextrema(y, np.less)[0]
186
187        # If there are no maxima, return None
188        if len(maxs) == 0:
189            return None
190
191        GammaMin = y[mins[0]]  # The value at the first minimum
192
193        ddy = (y[:-2]+y[2:]-2*y[1:-1])/(x[2:]-x[:-2])**2  # 2nd derivative of y
194        dy = (y[2:]-y[:-2])/(x[2:]-x[:-2])  # 1st derivative of y
195        # Find where the second derivative goes to zero
196        zeros = argrelextrema(np.abs(ddy), np.less)[0]
197        # locate the first inflection point
198        linear_point = zeros[0]
199
200        # Try to calculate slope around linear_point using 80 data points
201        lower = linear_point - 40
202        upper = linear_point + 40
203
204        # If too few data points to the left, use linear_point*2 data points
205        if lower < 0:
206            lower = 0
207            upper = linear_point * 2
208        # If too few to right, use 2*(dy.size - linear_point) data points
209        elif upper > len(dy):
210            upper = len(dy)
211            width = len(dy) - linear_point
212            lower = 2*linear_point - dy.size
213
214        m = np.mean(dy[lower:upper])  # Linear slope
215        b = y[1:-1][linear_point]-m*x[1:-1][linear_point]  # Linear intercept
216
217        Lc = (GammaMin-b)/# Hard block thickness
218
219        # Find the data points where the graph is linear to within 1%
220        mask = np.where(np.abs((y-(m*x+b))/y) < 0.01)[0]
221        if len(mask) == 0:  # Return garbage for bad fits
222            return { 'max': self._round_sig_figs(x[maxs[0]], 6) }
223        dtr = x[mask[0]]  # Beginning of Linear Section
224        d0 = x[mask[-1]]  # End of Linear Section
225        GammaMax = y[mask[-1]]
226        A = np.abs(GammaMin/GammaMax)  # Normalized depth of minimum
227
228        params = {
229            'max': x[maxs[0]],
230            'dtr': dtr,
231            'Lc': Lc,
232            'd0': d0,
233            'A': A,
234            'fill': Lc/x[maxs[0]]
235        }
236
237        return params
238
239
240    def _porod(self, q, K, sigma, bg):
241        """Equation for the Porod region of the data"""
242        return bg + (K*q**(-4))*np.exp(-q**2*sigma**2)
243
244    def _fit_guinier(self, q, iq):
245        """Fit the Guinier region of the curve"""
246        A = np.vstack([q**2, np.ones(q.shape)]).T
247        # CRUFT: numpy>=1.14.0 allows rcond=None for the following default
248        rcond = np.finfo(float).eps * max(A.shape)
249        return lstsq(A, np.log(iq), rcond=rcond)
250
251    def _fit_porod(self, q, iq):
252        """Fit the Porod region of the curve"""
253        fitp = curve_fit(lambda q, k, sig, bg: self._porod(q, k, sig, bg)*q**2,
254                         q, iq*q**2, bounds=([-np.inf, 0, -np.inf], [np.inf, np.inf, np.inf]))[0]
255        k, sigma, bg = fitp
256        return k, sigma, bg
257
258    def _fit_data(self, q, iq):
259        """
260        Given a data set, extrapolate out to large q with Porod and
261        to q=0 with Guinier
262        """
263        mask = np.logical_and(q > self.upperq[0], q < self.upperq[1])
264
265        # Returns an array where the 1st and 2nd elements are the values of k
266        # and sigma for the best-fit Porod function
267        k, sigma, _ = self._fit_porod(q[mask], iq[mask])
268        bg = self.background
269
270        # Smooths between the best-fit porod function and the data to produce a
271        # better fitting curve
272        data = interp1d(q, iq)
273        s1 = self._Interpolator(data,
274            lambda x: self._porod(x, k, sigma, bg), self.upperq[0], q[-1])
275
276        mask = np.logical_and(q < self.lowerq, 0 < q)
277
278        # Returns parameters for the best-fit Guinier function
279        g = self._fit_guinier(q[mask], iq[mask])[0]
280
281        # Smooths between the best-fit Guinier function and the Porod curve
282        s2 = self._Interpolator((lambda x: (np.exp(g[1]+g[0]*x**2))), s1, q[0],
283            self.lowerq)
284
285        params = {'A': g[1], 'B': g[0], 'K': k, 'sigma': sigma}
286
287        return params, s2
Note: See TracBrowser for help on using the repository browser.