##################################################################### #This software was developed by the University of Tennessee as part of the #Distributed Data Analysis of Neutron Scattering Experiments (DANSE) #project funded by the US National Science Foundation. #See the license text in license.txt #copyright 2008, University of Tennessee ###################################################################### """ File handler to support different file extensions. Uses reflectometry's registry utility. The default readers are found in the 'readers' sub-module and registered by default at initialization time. To add a new default reader, one must register it in the register_readers method found in readers/__init__.py. A utility method (find_plugins) is available to inspect a directory (for instance, a user plug-in directory) and look for new readers/writers. """ from data_util.registry import ExtensionRegistry import os import sys import logging import time from zipfile import ZipFile # Default readers are defined in the readers sub-module import readers from readers import ascii_reader class Registry(ExtensionRegistry): """ Registry class for file format extensions. Readers and writers are supported. """ def __init__(self): super(Registry, self).__init__() ## Writers self.writers = {} ## List of wildcards self.wildcards = ['All (*.*)|*.*'] ## Creation time, for testing self._created = time.time() # Register default readers readers.read_associations(self) #TODO: remove the following line when ready to switch to the new default readers #readers.register_readers(self._identify_plugin) # Look for plug-in readers self.find_plugins('plugins') def load(self, path, format=None): """ Call the loader for the file type of path. :param path: file path :param format: explicit extension, to force the use of a particular reader Defaults to the ascii (multi-column) reader if no reader was registered for the file's extension. """ try: return super(Registry, self).load(path, format=format) except: # No reader was found. Default to the ascii reader. ascii_loader = ascii_reader.Reader() return ascii_loader.read(path) def find_plugins(self, dir): """ Find readers in a given directory. This method can be used to inspect user plug-in directories to find new readers/writers. :param dir: directory to search into :return: number of readers found """ readers_found = 0 # Check whether the directory exists if not os.path.isdir(dir): logging.warning("DataLoader couldn't load from %s" % dir) return readers_found for item in os.listdir(dir): full_path = os.path.join(dir, item) if os.path.isfile(full_path): # Process python files if item.endswith('.py'): toks = os.path.splitext(os.path.basename(item)) try: sys.path.insert(0, os.path.abspath(dir)) module = __import__(toks[0], globals(), locals()) if self._identify_plugin(module): readers_found += 1 except : logging.error("Loader: Error importing %s\n %s" % (item, sys.exc_value)) # Process zip files elif item.endswith('.zip'): try: # Find the modules in the zip file zfile = ZipFile(item) nlist = zfile.namelist() sys.path.insert(0, item) for mfile in nlist: try: # Change OS path to python path fullname = mfile.replace('/', '.') fullname = os.path.splitext(fullname)[0] module = __import__(fullname, globals(), locals(), [""]) if self._identify_plugin(module): readers_found += 1 except: logging.error("Loader: Error importing %s\n %s" % (mfile, sys.exc_value)) except: logging.error("Loader: Error importing %s\n %s" % (item, sys.exc_value)) return readers_found def associate_file_type(self, ext, module): """ Look into a module to find whether it contains a Reader class. If so, APPEND it to readers and (potentially) to the list of writers for the given extension :param ext: file extension [string] :param module: module object """ reader_found = False if hasattr(module, "Reader"): try: # Find supported extensions loader = module.Reader() if ext not in self.loaders: self.loaders[ext] = [] # Append the new reader to the list self.loaders[ext].append(loader.read) reader_found = True # Keep track of wildcards type_name = module.__name__ if hasattr(loader, 'type_name'): type_name = loader.type_name wcard = "%s files (*%s)|*%s" % (type_name, ext.lower(), ext.lower()) if wcard not in self.wildcards: self.wildcards.append(wcard) # Check whether writing is supported if hasattr(loader, 'write'): if ext not in self.writers: self.writers[ext] = [] # Append the new writer to the list self.writers[ext].append(loader.write) except: logging.error("Loader: Error accessing Reader in %s\n %s" % (module.__name__, sys.exc_value)) return reader_found def associate_file_reader(self, ext, loader): """ Append a reader object to readers :param ext: file extension [string] :param module: reader object """ reader_found = False try: # Find supported extensions if ext not in self.loaders: self.loaders[ext] = [] # Append the new reader to the list self.loaders[ext].append(loader.read) reader_found = True # Keep track of wildcards if hasattr(loader, 'type_name'): type_name = loader.type_name wcard = "%s files (*%s)|*%s" % (type_name, ext.lower(), ext.lower()) if wcard not in self.wildcards: self.wildcards.append(wcard) except: logging.error("Loader: Error accessing Reader in %s\n %s" % (module.__name__, sys.exc_value)) return reader_found def _identify_plugin(self, module): """ Look into a module to find whether it contains a Reader class. If so, add it to readers and (potentially) to the list of writers. :param module: module object """ reader_found = False if hasattr(module, "Reader"): try: # Find supported extensions loader = module.Reader() for ext in loader.ext: if ext not in self.loaders: self.loaders[ext] = [] # When finding a reader at run time, treat this reader as the new # default self.loaders[ext].insert(0,loader.read) reader_found = True # Keep track of wildcards type_name = module.__name__ if hasattr(loader, 'type_name'): type_name = loader.type_name wcard = "%s files (*%s)|*%s" % (type_name, ext.lower(), ext.lower()) if wcard not in self.wildcards: self.wildcards.append(wcard) # Check whether writing is supported if hasattr(loader, 'write'): for ext in loader.ext: if ext not in self.writers: self.writers[ext] = [] self.writers[ext].insert(0,loader.write) except: logging.error("Loader: Error accessing Reader in %s\n %s" % (module.__name__, sys.exc_value)) return reader_found def lookup_writers(self, path): """ :return: the loader associated with the file type of path. :Raises ValueError: if file type is not known. """ # Find matching extensions extlist = [ext for ext in self.extensions() if path.endswith(ext)] # Sort matching extensions by decreasing order of length extlist.sort(lambda a,b: len(a)