""" File handler to support different file extensions. Uses reflectometry's registry utility. """ """ This software was developed by the University of Tennessee as part of the Distributed Data Analysis of Neutron Scattering Experiments (DANSE) project funded by the US National Science Foundation. See the license text in license.txt copyright 2008, University of Tennessee """ from data_util.registry import ExtensionRegistry import os import sys import logging import time from zipfile import ZipFile class Registry(ExtensionRegistry): """ Registry class for file format extensions. Readers and writers are supported. """ def __init__(self): super(Registry, self).__init__() ## Writers self.writers = {} ## List of wildcards self.wildcards = ['All (*.*)|*.*'] ## Creation time, for testing self._created = time.time() # Find internal readers try: cwd = os.path.split(__file__)[0] except: cwd = os.getcwd() logging.error("Registry: could not find the installed reader's directory\n %s" % sys.exc_value) dir = os.path.join(cwd, 'readers') n = self.find_plugins(dir) logging.info("Loader found %i readers" % n) def find_plugins(self, dir): """ Find readers in a given directory @param dir: directory to search into @return: number of readers found """ readers_found = 0 for item in os.listdir(dir): full_path = os.path.join(dir, item) if os.path.isfile(full_path): # Process python files if item.endswith('.py'): toks = os.path.splitext(os.path.basename(item)) try: sys.path.insert(0, os.path.abspath(dir)) module = __import__(toks[0], globals(), locals()) if self._identify_plugin(module): readers_found += 1 except : logging.error("Loader: Error importing %s\n %s" % (item, sys.exc_value)) # Process zip files elif item.endswith('.zip'): try: # Find the modules in the zip file zfile = ZipFile(item) nlist = zfile.namelist() sys.path.insert(0, item) for mfile in nlist: try: # Change OS path to python path fullname = mfile.replace('/', '.') fullname = os.path.splitext(fullname)[0] module = __import__(fullname, globals(), locals(), [""]) if self._identify_plugin(module): readers_found += 1 except: logging.error("Loader: Error importing %s\n %s" % (mfile, sys.exc_value)) except: logging.error("Loader: Error importing %s\n %s" % (item, sys.exc_value)) return readers_found def _identify_plugin(self, module): """ Look into a module to find whether it contains a Reader class. If so, add it to readers and (potentially) to the list of writers. @param module: module object """ reader_found = False if hasattr(module, "Reader"): try: # Find supported extensions loader = module.Reader() for ext in loader.ext: if ext not in self.loaders: self.loaders[ext] = [] self.loaders[ext].insert(0,loader.read) reader_found = True # Keep track of wildcards for wcard in loader.type: if wcard not in self.wildcards: self.wildcards.append(wcard) # Check whether writing is supported if hasattr(loader, 'write'): for ext in loader.ext: if ext not in self.writers: self.writers[ext] = [] self.writers[ext].insert(0,loader.write) except: logging.error("Loader: Error accessing Reader in %s\n %s" % (name, sys.exc_value)) return reader_found def lookup_writers(self, path): """ Return the loader associated with the file type of path. Raises ValueError if file type is not known. """ # Find matching extensions extlist = [ext for ext in self.extensions() if path.endswith(ext)] # Sort matching extensions by decreasing order of length extlist.sort(lambda a,b: len(a)