Source code for hexrd.config.root

import os
from pathlib import Path
import logging
import multiprocessing as mp

from hexrd.constants import shared_ims_key
from hexrd import imageseries

from .config import Config
from .instrument import Instrument
from .findorientations import FindOrientationsConfig
from .fitgrains import FitGrainsConfig
from .material import MaterialConfig

logger = logging.getLogger('hexrd.config')


[docs]class RootConfig(Config): @property def working_dir(self): """Working directory, either specified in file or current directory If the directory is not specified in the config file, then it will default to the current working directory. If it is specified, the directory must exist, or it will throw an IOError. """ wdir = Path(self.get('working_dir', default=Path.cwd())) if not wdir.exists(): raise IOError(f'"working_dir": {str(wdir)} does not exist') return wdir @working_dir.setter def working_dir(self, val): val = Path(val) if not val.is_dir(): raise IOError('"working_dir": "%s" does not exist' % str(val)) self.set('working_dir', val) @property def analysis_name(self): """Name of the analysis This will be used to set up the output directory. The name can contain slash ("/") characters, which will generate a subdirectory structure in the `analysis_dir`. """ return str(self.get('analysis_name', default='analysis')) @analysis_name.setter def analysis_name(self, val): self.set('analysis_name', val) @property def analysis_dir(self): """Analysis directory, where output files go The name is derived from `working_dir` and `analysis_name`. This property returns a Path object. The directory and any intermediate directories can be created with the `mkdir()` method, e.g. >>> analysis_dir.mkdir(parents=True, exist_ok=True) """ adir = Path(self.working_dir) / self.analysis_name return adir @property def analysis_id(self): return '_'.join( [self.analysis_name.strip().replace(' ', '-'), self.material.active.strip().replace(' ', '-')] ) @property def new_file_placement(self): """Use new file placements for find-orientations and fit-grains The new file placement rules put several files in the `analysis_dir` instead of the `working_dir`. """ return self.get('new_file_placement', default=False) @property def find_orientations(self): return FindOrientationsConfig(self) @property def fit_grains(self): if not hasattr(self, "_fitgrain_config"): self._fitgrain_config = FitGrainsConfig(self) return self._fitgrain_config @property def instrument(self): if not hasattr(self, '_instr_config'): instr_file = self.get('instrument', None) if instr_file is not None: instr_file = self.check_filename(instr_file, self.working_dir) self._instr_config = Instrument(self, instr_file) return self._instr_config @instrument.setter def instrument(self, instr_config): self._instr_config = instr_config @property def material(self): if not hasattr(self, '_material_config'): self._material_config = MaterialConfig(self) if self.instrument.configuration is not None: # !!! must make matl beam energy consistent with the instrument beam_energy = self.instrument.hedm.beam_energy self._material_config.beam_energy = beam_energy return self._material_config @material.setter def material(self, material_config): self._material_config = material_config @property def multiprocessing(self): # determine number of processes to run in parallel multiproc = self.get('multiprocessing', default=-1) ncpus = mp.cpu_count() if multiproc == 'all': res = ncpus elif multiproc == 'half': temp = ncpus // 2 res = temp if temp else 1 elif isinstance(multiproc, int): if multiproc >= 0: if multiproc > ncpus: logger.warning( 'Resuested %s processes, %d available', multiproc, ncpus ) res = ncpus else: res = multiproc if multiproc else 1 else: temp = ncpus + multiproc if temp < 1: logger.warning( 'Cannot use less than 1 process, requested %d of %d', temp, ncpus ) res = 1 else: res = temp else: temp = ncpus - 1 logger.warning( "Invalid value %s for multiprocessing", multiproc ) res = temp return res @multiprocessing.setter def multiprocessing(self, val): isint = isinstance(val, int) if val in ('half', 'all', -1): self.set('multiprocessing', val) elif (isint and val >= 0 and val <= mp.cpu_count()): self.set('multiprocessing', int(val)) else: raise RuntimeError( '"multiprocessing": must be 1:%d, got %s' % (mp.cpu_count(), val) ) @property def image_series(self): """Return the imageseries dictionary.""" if not hasattr(self, '_image_dict'): self._image_dict = dict() fmt = self.get('image_series:format') imsdata = self.get('image_series:data') for ispec in imsdata: fname = self.check_filename(ispec['file'], self.working_dir) args = ispec['args'] ims = imageseries.open(fname, fmt, **args) oms = imageseries.omega.OmegaImageSeries(ims) try: panel = ispec['panel'] if isinstance(panel, (tuple, list)): panel = '_'.join(panel) elif panel is None: panel = shared_ims_key except(KeyError): try: panel = oms.metadata['panel'] except(KeyError): panel = shared_ims_key self._image_dict[panel] = oms return self._image_dict @image_series.setter def image_series(self, ims_dict): self._image_dict = ims_dict