Changes in src/sas/sascalc/data_util/registry.py [3ece5dd:a1b8fee] in sasview
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
src/sas/sascalc/data_util/registry.py
r3ece5dd ra1b8fee 1 # This program is public domain 1 2 """ 2 3 File extension registry. … … 5 6 and registers the built-in file extensions. 6 7 """ 8 from __future__ import print_function 7 9 8 from sas.sascalc.dataloader.loader_exceptions import NoKnownLoaderException 9 10 import os.path 10 11 11 12 class ExtensionRegistry(object): … … 21 22 # Add an association by setting an element 22 23 registry['.zip'] = unzip 23 24 24 25 # Multiple extensions for one loader 25 26 registry['.tgz'] = untar 26 27 registry['.tar.gz'] = untar 27 28 28 # Generic extensions to use after trying more specific extensions; 29 # Generic extensions to use after trying more specific extensions; 29 30 # these will be checked after the more specific extensions fail. 30 31 registry['.gz'] = gunzip … … 37 38 # Show registered extensions 38 39 print registry.extensions() 39 40 40 41 # Can also register a format name for explicit control from caller 41 42 registry['cx3'] = cx3 … … 61 62 def __init__(self, **kw): 62 63 self.loaders = {} 63 64 64 def __setitem__(self, ext, loader): 65 65 if ext not in self.loaders: 66 66 self.loaders[ext] = [] 67 67 self.loaders[ext].insert(0,loader) 68 69 68 def __getitem__(self, ext): 70 69 return self.loaders[ext] 71 72 70 def __contains__(self, ext): 73 71 return ext in self.loaders 74 75 72 def formats(self): 76 73 """ … … 80 77 names.sort() 81 78 return names 82 83 79 def extensions(self): 84 80 """ … … 88 84 exts.sort() 89 85 return exts 90 91 86 def lookup(self, path): 92 87 """ 93 88 Return the loader associated with the file type of path. 94 95 :param path: Data file path 96 :raises ValueError: When no loaders are found for the file. 97 :return: List of available readers for the file extension 98 """ 89 90 Raises ValueError if file type is not known. 91 """ 99 92 # Find matching extensions 100 93 extlist = [ext for ext in self.extensions() if path.endswith(ext)] … … 113 106 # Raise an error if there are no matching extensions 114 107 if len(loaders) == 0: 115 raise ValueError("Unknown file type for "+path) 108 raise ValueError, "Unknown file type for "+path 109 # All done 116 110 return loaders 117 118 111 def load(self, path, format=None): 119 112 """ 120 113 Call the loader for the file type of path. 121 114 122 :raise ValueError:if no loader is available.123 :raise KeyError:if format is not available.124 May raise a loader-defined exception if loader fails. 115 Raises ValueError if no loader is available. 116 Raises KeyError if format is not available. 117 May raise a loader-defined exception if loader fails. 125 118 """ 126 loaders = []127 119 if format is None: 128 try: 129 loaders = self.lookup(path) 130 except ValueError as e: 131 pass 120 loaders = self.lookup(path) 132 121 else: 133 try: 134 loaders = self.loaders[format] 135 except KeyError as e: 136 pass 137 last_exc = None 122 loaders = self.loaders[format] 138 123 for fn in loaders: 139 124 try: 140 125 return fn(path) 141 except Exception as e: 142 last_exc = e 143 pass # give other loaders a chance to succeed 126 except: 127 pass # give other loaders a chance to succeed 144 128 # If we get here it is because all loaders failed 145 if last_exc is not None and len(loaders) != 0: 146 # If file has associated loader(s) and they;ve failed 147 raise last_exc 148 raise NoKnownLoaderException(e.message) # raise generic exception 129 raise # reraises last exception 130 131 def test(): 132 reg = ExtensionRegistry() 133 class CxError(Exception): pass 134 def cx(file): return 'cx' 135 def new_cx(file): return 'new_cx' 136 def fail_cx(file): raise CxError 137 def cat(file): return 'cat' 138 def gunzip(file): return 'gunzip' 139 reg['.cx'] = cx 140 reg['.cx1'] = cx 141 reg['.cx'] = new_cx 142 reg['.gz'] = gunzip 143 reg['.cx.gz'] = new_cx 144 reg['.cx1.gz'] = fail_cx 145 reg['.cx1'] = fail_cx 146 reg['.cx2'] = fail_cx 147 reg['new_cx'] = new_cx 148 149 # Two loaders associated with .cx 150 assert reg.lookup('hello.cx') == [new_cx,cx] 151 # Make sure the last loader applies first 152 assert reg.load('hello.cx') == 'new_cx' 153 # Make sure the next loader applies if the first fails 154 assert reg.load('hello.cx1') == 'cx' 155 # Make sure the format override works 156 assert reg.load('hello.cx1',format='.cx.gz') == 'new_cx' 157 # Make sure the format override works 158 assert reg.load('hello.cx1',format='new_cx') == 'new_cx' 159 # Make sure the case of all loaders failing is correct 160 try: reg.load('hello.cx2') 161 except CxError: pass # correct failure 162 else: raise AssertError,"Incorrect error on load failure" 163 # Make sure the case of no loaders fails correctly 164 try: reg.load('hello.missing') 165 except ValueError,msg: 166 assert str(msg)=="Unknown file type for hello.missing",'Message: <%s>'%(msg) 167 else: raise AssertError,"No error raised for missing extension" 168 assert reg.formats() == ['new_cx'] 169 assert reg.extensions() == ['.cx','.cx.gz','.cx1','.cx1.gz','.cx2','.gz'] 170 # make sure that it supports multiple '.' in filename 171 assert reg.load('hello.extra.cx1') == 'cx' 172 assert reg.load('hello.gz') == 'gunzip' 173 assert reg.load('hello.cx1.gz') == 'gunzip' # Since .cx1.gz fails 174 175 if __name__ == "__main__": test()
Note: See TracChangeset
for help on using the changeset viewer.