source: sasview/src/sas/sascalc/data_util/registry.py @ 69400ec

ESS_GUI, ESS_GUI_Docs, ESS_GUI_batch_fitting, ESS_GUI_bumps_abstraction, ESS_GUI_iss1116, ESS_GUI_iss879, ESS_GUI_iss959, ESS_GUI_opencl, ESS_GUI_ordering, ESS_GUI_sync_sascalc, costrafo411, magnetic_scatt, release-4.2.2, ticket-1009, ticket-1094-headless, ticket-1242-2d-resolution, ticket-1243, ticket-1249, ticket885, unittest-saveload
Last change on this file since 69400ec was 270c882b, checked in by krzywon, 7 years ago

Beginning implementation of exception handling within data loader.

  • Property mode set to 100644
File size: 6.1 KB
Line 
1# This program is public domain
2"""
3File extension registry.
4
5This provides routines for opening files based on extension,
6and registers the built-in file extensions.
7"""
8
9from sas.sascalc.dataloader.loader_exceptions import NoKnownLoaderException
10
11
class ExtensionRegistry(object):
    """
    Associate a file loader with an extension.

    Keys starting with ``'.'`` are treated as file extensions; any other key
    is treated as a format name that callers can request explicitly.

    Note that there may be multiple loaders for the same extension. The most
    recently registered loader is tried first.

    Example: ::

        registry = ExtensionRegistry()

        # Add an association by setting an element
        registry['.zip'] = unzip

        # Multiple extensions for one loader
        registry['.tgz'] = untar
        registry['.tar.gz'] = untar

        # Generic extensions to use after trying more specific extensions;
        # these will be checked after the more specific extensions fail.
        registry['.gz'] = gunzip

        # Multiple loaders for one extension
        registry['.cx'] = cx1
        registry['.cx'] = cx2
        registry['.cx'] = cx3

        # Show registered extensions
        print(registry.extensions())

        # Can also register a format name for explicit control from caller
        registry['cx3'] = cx3
        print(registry.formats())

        # Retrieve loaders for a file name
        registry.lookup('hello.cx') -> [cx3,cx2,cx1]

        # Run loader on a filename; each loader is tried in turn and
        # NoKnownLoaderException is raised if every one of them fails
        registry.load('hello.cx') ->
            try:
                return cx3('hello.cx')
            except Exception:
                try:
                    return cx2('hello.cx')
                except Exception:
                    return cx1('hello.cx')

        # Load in a specific format ignoring extension
        registry.load('hello.cx',format='cx3') ->
            return cx3('hello.cx')
    """
    def __init__(self, **kw):
        # Maps extension or format name -> list of loaders, newest first.
        # Keyword arguments are accepted for compatibility but not used.
        self.loaders = {}

    def __setitem__(self, ext, loader):
        """
        Register *loader* for *ext*, ahead of any loaders already registered
        for the same key.
        """
        self.loaders.setdefault(ext, []).insert(0, loader)

    def __getitem__(self, ext):
        """
        Return the list of loaders registered for *ext*.

        Raises KeyError if nothing is registered under *ext*.
        """
        return self.loaders[ext]

    def __contains__(self, ext):
        """
        Return True if at least one loader is registered under *ext*.
        """
        return ext in self.loaders

    def formats(self):
        """
        Return a sorted list of the registered formats.
        """
        return sorted(a for a in self.loaders if not a.startswith('.'))

    def extensions(self):
        """
        Return a sorted list of registered extensions.
        """
        return sorted(a for a in self.loaders if a.startswith('.'))

    def lookup(self, path):
        """
        Return the list of loaders associated with the file type of path.

        Loaders for longer (more specific) matching extensions come first,
        and each loader appears only once, at its first position.

        Raises ValueError if file type is not known.
        """
        # Find matching extensions
        extlist = [ext for ext in self.extensions() if path.endswith(ext)]
        # Sort matching extensions by decreasing order of length so that
        # e.g. '.tar.gz' loaders are tried before generic '.gz' loaders
        extlist.sort(key=len, reverse=True)
        # Combine loaders for matching extensions into one list, skipping
        # loaders that have already been added for a more specific extension
        loaders = []
        for ext in extlist:
            for loader in self.loaders[ext]:
                if loader not in loaders:
                    loaders.append(loader)
        # Raise an error if there are no matching extensions
        if not loaders:
            raise ValueError("Unknown file type for " + path)
        return loaders

    @staticmethod
    def _error_text(exc):
        """
        Return the text carried by *exc*: its ``message`` attribute when it
        has a non-empty one, otherwise ``str(exc)``.
        """
        return getattr(exc, "message", None) or str(exc)

    def load(self, path, format=None):
        """
        Call the loader for the file type of path.

        If *format* is given, the loaders registered under that key are used
        and the extension of *path* is ignored. Loaders are tried in order
        and the result of the first one that succeeds is returned.

        Raises NoKnownLoaderException if no loader is registered for the
        file type or format, or if every candidate loader fails. The
        exception text is taken from the last error encountered.
        """
        loaders = []
        last_error = None
        if format is None:
            try:
                loaders = self.lookup(path)
            except ValueError as exc:
                last_error = exc
        else:
            try:
                loaders = self.loaders[format]
            except KeyError as exc:
                last_error = exc
        for fn in loaders:
            try:
                return fn(path)
            except Exception as exc:
                # Remember the failure (the name bound by ``except`` does not
                # survive the clause on Python 3) and give other loaders a
                # chance to succeed
                last_error = exc
        # If we get here there were no loaders or all of them failed
        if last_error is None:
            raise NoKnownLoaderException("No loader available for " + path)
        raise NoKnownLoaderException(self._error_text(last_error))
143
144
145# TODO: Move this to the unit test folder
def test():
    """
    Self-test for ExtensionRegistry.

    Checks registration order, extension lookup, the explicit format
    override, and the errors raised when loaders fail or are missing.
    Raises AssertionError on the first check that does not hold.
    """
    reg = ExtensionRegistry()

    class CxError(Exception):
        pass

    def cx(path):
        return 'cx'

    def new_cx(path):
        return 'new_cx'

    def fail_cx(path):
        raise CxError

    def cat(path):
        return 'cat'

    def gunzip(path):
        return 'gunzip'

    reg['.cx'] = cx
    reg['.cx1'] = cx
    reg['.cx'] = new_cx
    reg['.gz'] = gunzip
    reg['.cx.gz'] = new_cx
    reg['.cx1.gz'] = fail_cx
    reg['.cx1'] = fail_cx
    reg['.cx2'] = fail_cx
    reg['new_cx'] = new_cx

    # Two loaders associated with .cx
    assert reg.lookup('hello.cx') == [new_cx, cx]
    # Make sure the last loader applies first
    assert reg.load('hello.cx') == 'new_cx'
    # Make sure the next loader applies if the first fails
    assert reg.load('hello.cx1') == 'cx'
    # Make sure the format override works
    assert reg.load('hello.cx1', format='.cx.gz') == 'new_cx'
    # Make sure the format override works
    assert reg.load('hello.cx1', format='new_cx') == 'new_cx'
    # Make sure the case of all loaders failing is correct: load() reports
    # it with the generic loader exception rather than the loader's own error
    try:
        reg.load('hello.cx2')
    except NoKnownLoaderException:
        pass  # correct failure
    else:
        raise AssertionError("Incorrect error on load failure")
    # Make sure looking up an unregistered extension fails correctly
    try:
        reg.lookup('hello.missing')
    except ValueError as msg:
        assert str(msg) == "Unknown file type for hello.missing", \
            'Message: <%s>' % (msg)
    else:
        raise AssertionError("No error raised for missing extension")
    assert reg.formats() == ['new_cx']
    assert reg.extensions() == ['.cx', '.cx.gz', '.cx1', '.cx1.gz', '.cx2', '.gz']
    # make sure that it supports multiple '.' in filename
    assert reg.load('hello.extra.cx1') == 'cx'
    assert reg.load('hello.gz') == 'gunzip'
    assert reg.load('hello.cx1.gz') == 'gunzip'  # Since .cx1.gz fails
190
# Run the self-test when this module is executed as a script.
if __name__ == "__main__":
    test()
Note: See TracBrowser for help on using the repository browser.