source: sasview/src/sas/sascalc/file_converter/nxcansas_writer.py @ fc51d06

magnetic_scattrelease-4.2.2ticket-1009ticket-1094-headlessticket-1242-2d-resolutionticket-1243ticket-1249unittest-saveload
Last change on this file since fc51d06 was fc51d06, checked in by krzywon, 12 months ago

Fix broken unit tests and general cleanup.

  • Property mode set to 100644
File size: 14.4 KB
Line 
1"""
2    NXcanSAS 1/2D data reader for writing HDF5 formatted NXcanSAS files.
3"""
4
5import h5py
6import numpy as np
7import re
8import os
9
10from sas.sascalc.dataloader.readers.cansas_reader_HDF5 import Reader as Cansas2Reader
11from sas.sascalc.dataloader.data_info import Data1D, Data2D
12
13class NXcanSASWriter(Cansas2Reader):
14    """
15    A class for writing in NXcanSAS data files. Any number of data sets may be
16    written to the file. Currently 1D and 2D SAS data sets are supported
17
18    NXcanSAS spec: http://download.nexusformat.org/sphinx/classes/contributed_definitions/NXcanSAS.html
19
20    :Dependencies:
21        The NXcanSAS writer requires h5py => v2.5.0 or later.
22    """
23
24    def write(self, dataset, filename):
25        """
26        Write an array of Data1d or Data2D objects to an NXcanSAS file, as
27        one SASEntry with multiple SASData elements. The metadata of the first
28        elememt in the array will be written as the SASentry metadata
29        (detector, instrument, sample, etc).
30
31        :param dataset: A list of Data1D or Data2D objects to write
32        :param filename: Where to write the NXcanSAS file
33        """
34
35        def _h5_string(string):
36            """
37            Convert a string to a numpy string in a numpy array. This way it is
38            written to the HDF5 file as a fixed length ASCII string and is
39            compatible with the Reader read() method.
40            """
41            if isinstance(string, np.ndarray):
42                return string
43            elif not isinstance(string, str):
44                string = str(string)
45
46            return np.array([np.string_(string)])
47
48        def _write_h5_string(entry, value, key):
49            entry[key] = _h5_string(value)
50
51        def _h5_float(x):
52            if not (isinstance(x, list)):
53                x = [x]
54            return np.array(x, dtype=np.float32)
55
56        def _write_h5_float(entry, value, key):
57            entry.create_dataset(key, data=_h5_float(value))
58
59        def _write_h5_vector(entry, vector, names=['x_position', 'y_position'],
60            units=None, write_fn=_write_h5_string):
61            """
62            Write a vector to an h5 entry
63
64            :param entry: The H5Py entry to write to
65            :param vector: The Vector to write
66            :param names: What to call the x,y and z components of the vector
67                when writing to the H5Py entry
68            :param units: The units of the vector (optional)
69            :param write_fn: A function to convert the value to the required
70                format and write it to the H5Py entry, of the form
71                f(entry, value, name) (optional)
72            """
73            if len(names) < 2:
74                raise ValueError("Length of names must be >= 2.")
75
76            if vector.x is not None:
77                write_fn(entry, vector.x, names[0])
78                if units is not None:
79                    entry[names[0]].attrs['units'] = units
80            if vector.y is not None:
81                write_fn(entry, vector.y, names[1])
82                if units is not None:
83                    entry[names[1]].attrs['units'] = units
84            if len(names) == 3 and vector.z is not None:
85                write_fn(entry, vector.z, names[2])
86                if units is not None:
87                    entry[names[2]].attrs['units'] = units
88
89        valid_data = all([issubclass(d.__class__, (Data1D, Data2D)) for d in dataset])
90        if not valid_data:
91            raise ValueError("All entries of dataset must be Data1D or Data2D objects")
92
93        # Get run name and number from first Data object
94        data_info = dataset[0]
95        run_number = ''
96        run_name = ''
97        if len(data_info.run) > 0:
98            run_number = data_info.run[0]
99            if len(data_info.run_name) > 0:
100                run_name = data_info.run_name[run_number]
101
102        f = h5py.File(filename, 'w')
103        sasentry = f.create_group('sasentry01')
104        sasentry['definition'] = _h5_string('NXcanSAS')
105        sasentry['run'] = _h5_string(run_number)
106        sasentry['run'].attrs['name'] = run_name
107        sasentry['title'] = _h5_string(data_info.title)
108        sasentry.attrs['canSAS_class'] = 'SASentry'
109        sasentry.attrs['version'] = '1.0'
110
111        i = 1
112
113        for data_obj in dataset:
114            data_entry = sasentry.create_group("sasdata{0:0=2d}".format(i))
115            data_entry.attrs['canSAS_class'] = 'SASdata'
116            if isinstance(data_obj, Data1D):
117                self._write_1d_data(data_obj, data_entry)
118            elif isinstance(data_obj, Data2D):
119                self._write_2d_data(data_obj, data_entry)
120            i += 1
121
122        data_info = dataset[0]
123        # Sample metadata
124        sample_entry = sasentry.create_group('sassample')
125        sample_entry.attrs['canSAS_class'] = 'SASsample'
126        sample_entry['ID'] = _h5_string(data_info.sample.name)
127        sample_attrs = ['thickness', 'temperature', 'transmission']
128        for key in sample_attrs:
129            if getattr(data_info.sample, key) is not None:
130                sample_entry.create_dataset(key,
131                    data=_h5_float(getattr(data_info.sample, key)))
132        _write_h5_vector(sample_entry, data_info.sample.position)
133        # NXcanSAS doesn't save information about pitch, only roll
134        # and yaw. The _write_h5_vector method writes vector.y, but we
135        # need to write vector.z for yaw
136        data_info.sample.orientation.y = data_info.sample.orientation.z
137        _write_h5_vector(sample_entry, data_info.sample.orientation,
138            names=['polar_angle', 'azimuthal_angle'])
139        if data_info.sample.details is not None\
140            and data_info.sample.details != []:
141            details = None
142            if len(data_info.sample.details) > 1:
143                details = [np.string_(d) for d in data_info.sample.details]
144                details = np.array(details)
145            elif data_info.sample.details != []:
146                details = _h5_string(data_info.sample.details[0])
147            if details is not None:
148                sample_entry.create_dataset('details', data=details)
149
150        # Instrumment metadata
151        instrument_entry = sasentry.create_group('sasinstrument')
152        instrument_entry.attrs['canSAS_class'] = 'SASinstrument'
153        instrument_entry['name'] = _h5_string(data_info.instrument)
154
155        # Source metadata
156        source_entry = instrument_entry.create_group('sassource')
157        source_entry.attrs['canSAS_class'] = 'SASsource'
158        if data_info.source.radiation is None:
159            source_entry['radiation'] = _h5_string('neutron')
160        else:
161            source_entry['radiation'] = _h5_string(data_info.source.radiation)
162        if data_info.source.beam_shape is not None:
163            source_entry['beam_shape'] = _h5_string(data_info.source.beam_shape)
164        wavelength_keys = { 'wavelength': 'incident_wavelength',
165            'wavelength_min':'wavelength_min',
166            'wavelength_max': 'wavelength_max',
167            'wavelength_spread': 'incident_wavelength_spread' }
168        for sasname, nxname in wavelength_keys.items():
169            value = getattr(data_info.source, sasname)
170            units = getattr(data_info.source, sasname + '_unit')
171            if value is not None:
172                source_entry[nxname] = _h5_float(value)
173                source_entry[nxname].attrs['units'] = units
174        _write_h5_vector(source_entry, data_info.source.beam_size,
175            names=['beam_size_x', 'beam_size_y'],
176            units=data_info.source.beam_size_unit, write_fn=_write_h5_float)
177
178
179        # Collimation metadata
180        if len(data_info.collimation) > 0:
181            i = 1
182            for coll_info in data_info.collimation:
183                collimation_entry = instrument_entry.create_group(
184                    'sascollimation{0:0=2d}'.format(i))
185                collimation_entry.attrs['canSAS_class'] = 'SAScollimation'
186                if coll_info.length is not None:
187                    _write_h5_float(collimation_entry, coll_info.length, 'SDD')
188                    collimation_entry['SDD'].attrs['units'] = coll_info.length_unit
189                if coll_info.name is not None:
190                    collimation_entry['name'] = _h5_string(coll_info.name)
191        else:
192            # Create a blank one - at least 1 set of collimation metadata
193            # required by format
194            collimation_entry = instrument_entry.create_group('sascollimation01')
195
196        # Detector metadata
197        if len(data_info.detector) > 0:
198            i = 1
199            for det_info in data_info.detector:
200                detector_entry = instrument_entry.create_group(
201                    'sasdetector{0:0=2d}'.format(i))
202                detector_entry.attrs['canSAS_class'] = 'SASdetector'
203                if det_info.distance is not None:
204                    _write_h5_float(detector_entry, det_info.distance, 'SDD')
205                    detector_entry['SDD'].attrs['units'] = det_info.distance_unit
206                if det_info.name is not None:
207                    detector_entry['name'] = _h5_string(det_info.name)
208                else:
209                    detector_entry['name'] = _h5_string('')
210                if det_info.slit_length is not None:
211                    _write_h5_float(detector_entry, det_info.slit_length, 'slit_length')
212                    detector_entry['slit_length'].attrs['units'] = det_info.slit_length_unit
213                _write_h5_vector(detector_entry, det_info.offset)
214                # NXcanSAS doesn't save information about pitch, only roll
215                # and yaw. The _write_h5_vector method writes vector.y, but we
216                # need to write vector.z for yaw
217                det_info.orientation.y = det_info.orientation.z
218                _write_h5_vector(detector_entry, det_info.orientation,
219                    names=['polar_angle', 'azimuthal_angle'])
220                _write_h5_vector(detector_entry, det_info.beam_center,
221                    names=['beam_center_x', 'beam_center_y'],
222                    write_fn=_write_h5_float, units=det_info.beam_center_unit)
223                _write_h5_vector(detector_entry, det_info.pixel_size,
224                    names=['x_pixel_size', 'y_pixel_size'],
225                    write_fn=_write_h5_float, units=det_info.pixel_size_unit)
226
227                i += 1
228        else:
229            # Create a blank one - at least 1 detector required by format
230            detector_entry = instrument_entry.create_group('sasdetector01')
231            detector_entry.attrs['canSAS_class'] = 'SASdetector'
232            detector_entry.attrs['name'] = ''
233
234        note_entry = sasentry.create_group('sasnote'.format(i))
235        note_entry.attrs['canSAS_class'] = 'SASnote'
236        notes = None
237        if len(data_info.notes) > 1:
238            notes = [np.string_(n) for n in data_info.notes]
239            notes = np.array(notes)
240        elif data_info.notes != []:
241            notes = _h5_string(data_info.notes[0])
242        if notes is not None:
243            note_entry.create_dataset('SASnote', data=notes)
244
245        f.close()
246
247    def _write_1d_data(self, data_obj, data_entry):
248        """
249        Writes the contents of a Data1D object to a SASdata h5py Group
250
251        :param data_obj: A Data1D object to write to the file
252        :param data_entry: A h5py Group object representing the SASdata
253        """
254        data_entry.attrs['signal'] = 'I'
255        data_entry.attrs['I_axes'] = 'Q'
256        data_entry.attrs['Q_indicies'] = 0
257        q_entry = data_entry.create_dataset('Q', data=data_obj.x)
258        q_entry.attrs['units'] = data_obj.x_unit
259        i_entry = data_entry.create_dataset('I', data=data_obj.y)
260        i_entry.attrs['units'] = data_obj.y_unit
261        if data_obj.dy is not None:
262            i_entry.attrs['uncertainties'] = 'Idev'
263            i_dev_entry = data_entry.create_dataset('Idev', data=data_obj.dy)
264            i_dev_entry.attrs['units'] = data_obj.y_unit
265        if data_obj.dx is not None:
266            q_entry.attrs['resolutions'] = 'dQ'
267            dq_entry = data_entry.create_dataset('dQ', data=data_obj.dx)
268            dq_entry.attrs['units'] = data_obj.x_unit
269        elif data_obj.dxl is not None:
270            q_entry.attrs['resolutions'] = ['dQl','dQw']
271            dql_entry = data_entry.create_dataset('dQl', data=data_obj.dxl)
272            dql_entry.attrs['units'] = data_obj.x_unit
273            dqw_entry = data_entry.create_dataset('dQw', data=data_obj.dxw)
274            dqw_entry.attrs['units'] = data_obj.x_unit
275
276    def _write_2d_data(self, data, data_entry):
277        """
278        Writes the contents of a Data2D object to a SASdata h5py Group
279
280        :param data: A Data2D object to write to the file
281        :param data_entry: A h5py Group object representing the SASdata
282        """
283        data_entry.attrs['signal'] = 'I'
284        data_entry.attrs['I_axes'] = 'Qx,Qy'
285        data_entry.attrs['Q_indicies'] = [0,1]
286
287        (n_rows, n_cols) = (len(data.y_bins), len(data.x_bins))
288
289        if n_rows == 0 and n_cols == 0:
290            # Calculate rows and columns, assuming detector is square
291            # Same logic as used in PlotPanel.py _get_bins
292            n_cols = int(np.floor(np.sqrt(len(data.qy_data))))
293            n_rows = int(np.floor(len(data.qy_data) / n_cols))
294
295            if n_rows * n_cols != len(data.qy_data):
296                raise ValueError("Unable to calculate dimensions of 2D data")
297
298        intensity = np.reshape(data.data, (n_rows, n_cols))
299        qx = np.reshape(data.qx_data, (n_rows, n_cols))
300        qy = np.reshape(data.qy_data, (n_rows, n_cols))
301
302        i_entry = data_entry.create_dataset('I', data=intensity)
303        i_entry.attrs['units'] = data.I_unit
304        qx_entry = data_entry.create_dataset('Qx', data=qx)
305        qx_entry.attrs['units'] = data.Q_unit
306        qy_entry = data_entry.create_dataset('Qy', data=qy)
307        qy_entry.attrs['units'] = data.Q_unit
308        if data.err_data is not None and not all(data.err_data == [None]):
309            d_i = np.reshape(data.err_data, (n_rows, n_cols))
310            i_entry.attrs['uncertainties'] = 'Idev'
311            i_dev_entry = data_entry.create_dataset('Idev', data=d_i)
312            i_dev_entry.attrs['units'] = data.I_unit
313        if data.dqx_data is not None and not all(data.dqx_data == [None]):
314            qx_entry.attrs['resolutions'] = 'dQx'
315            dqx_entry = data_entry.create_dataset('dQx', data=data.dqx_data)
316            dqx_entry.attrs['units'] = data.Q_unit
317        if data.dqy_data is not None and not all(data.dqy_data == [None]):
318            qy_entry.attrs['resolutions'] = 'dQy'
319            dqy_entry = data_entry.create_dataset('dQy', data=data.dqy_data)
320            dqy_entry.attrs['units'] = data.Q_unit
Note: See TracBrowser for help on using the repository browser.