Source code for hexrd.grainmap.nfutil

"""
Refactor of simulate_nf so that an experiment is mocked up.

Also trying to minimize imports
"""

import os
import logging
import h5py

import numpy as np
import numba
import yaml
import argparse
import timeit
import contextlib
import multiprocessing
import tempfile
import shutil
import socket
import copy

# import of hexrd modules
# import hexrd
from hexrd import constants
from hexrd import instrument
from hexrd import material
from hexrd import rotations
from hexrd.transforms import xfcapi
from hexrd import valunits
from hexrd import xrdutil

from skimage.morphology import dilation as ski_dilation

import matplotlib.pyplot as plt

hostname = socket.gethostname()

USE_MPI = False
rank = 0
try:
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    world_size = comm.Get_size()
    rank = comm.Get_rank()
    USE_MPI = world_size > 1
    logging.info(f'{rank=} {world_size=} {hostname=}')
except ImportError:
    logging.warning(f'mpi4py failed to load on {hostname=}. MPI is disabled.')


# Import of image loading, this should probably be done properly with preprocessed frame-cache binaries

import scipy.ndimage as img
import skimage.filters as filters

try:
    import imageio as imgio
except(ImportError):
    from skimage import io as imgio


[docs]def load_instrument(yml): with open(yml, 'r') as f: icfg = yaml.load(f, Loader=yaml.FullLoader) return instrument.HEDMInstrument(instrument_config=icfg)
# %% beam = constants.beam_vec Z_l = constants.lab_z vInv_ref = constants.identity_6x1 # ============================================================================== # %% SOME SCAFFOLDING # ==============================================================================
[docs]class ProcessController: """This is a 'controller' that provides the necessary hooks to track the results of the process as well as to provide clues of the progress of the process""" def __init__(self, result_handler=None, progress_observer=None, ncpus=1, chunk_size=100): self.rh = result_handler self.po = progress_observer self.ncpus = ncpus self.chunk_size = chunk_size self.limits = {} self.timing = [] # progress handling -------------------------------------------------------
[docs] def start(self, name, count): self.po.start(name, count) t = timeit.default_timer() self.timing.append((name, count, t))
[docs] def finish(self, name): t = timeit.default_timer() self.po.finish() entry = self.timing.pop() assert name == entry[0] total = t - entry[2] logging.info("%s took %8.3fs (%8.6fs per item).", entry[0], total, total/entry[1])
[docs] def update(self, value): self.po.update(value)
# result handler ----------------------------------------------------------
[docs] def handle_result(self, key, value): logging.debug("handle_result (%(key)s)", locals()) self.rh.handle_result(key, value)
# value limitting ---------------------------------------------------------
[docs] def set_limit(self, key, limit_function): if key in self.limits: logging.warn("Overwritting limit funtion for '%(key)s'", locals()) self.limits[key] = limit_function
[docs] def limit(self, key, value): try: value = self.limits[key](value) except KeyError: pass except Exception: logging.warn("Could not apply limit to '%(key)s'", locals()) return value
# configuration ----------------------------------------------------------
[docs] def get_process_count(self): return self.ncpus
[docs] def get_chunk_size(self): return self.chunk_size
[docs]def null_progress_observer(): class NullProgressObserver: def start(self, name, count): pass def update(self, value): pass def finish(self): pass return NullProgressObserver()
[docs]def progressbar_progress_observer(): class ProgressBarProgressObserver: def start(self, name, count): from progressbar import ProgressBar, Percentage, Bar self.pbar = ProgressBar(widgets=[name, Percentage(), Bar()], maxval=count) self.pbar.start() def update(self, value): self.pbar.update(value) def finish(self): self.pbar.finish() return ProgressBarProgressObserver()
[docs]def forgetful_result_handler(): class ForgetfulResultHandler: def handle_result(self, key, value): pass # do nothing return ForgetfulResultHandler()
[docs]def saving_result_handler(filename): """returns a result handler that saves the resulting arrays into a file with name filename""" class SavingResultHandler: def __init__(self, file_name): self.filename = file_name self.arrays = {} def handle_result(self, key, value): self.arrays[key] = value def __del__(self): logging.debug("Writing arrays in %(filename)s", self.__dict__) try: np.savez_compressed(open(self.filename, "wb"), **self.arrays) except IOError: logging.error("Failed to write %(filename)s", self.__dict__) return SavingResultHandler(filename)
[docs]def checking_result_handler(filename): """returns a return handler that checks the results against a reference file. The Check will consider a FAIL either a result not present in the reference file (saved as a numpy savez or savez_compressed) or a result that differs. It will consider a PARTIAL PASS if the reference file has a shorter result, but the existing results match. A FULL PASS will happen when all existing results match """ class CheckingResultHandler: def __init__(self, reference_file): """Checks the result against those save in 'reference_file'""" logging.info("Loading reference results from '%s'", reference_file) self.reference_results = np.load(open(reference_file, 'rb')) def handle_result(self, key, value): if key in ['experiment', 'image_stack']: return # ignore these try: reference = self.reference_results[key] except KeyError as e: logging.warning("%(key)s: %(e)s", locals()) reference = None if reference is None: msg = "'{0}': No reference result." logging.warn(msg.format(key)) try: if key == "confidence": reference = reference.T value = value.T check_len = min(len(reference), len(value)) test_passed = np.allclose(value[:check_len], reference[:check_len]) if not test_passed: msg = "'{0}': FAIL" logging.warn(msg.format(key)) lvl = logging.WARN elif len(value) > check_len: msg = "'{0}': PARTIAL PASS" lvl = logging.WARN else: msg = "'{0}': FULL PASS" lvl = logging.INFO logging.log(lvl, msg.format(key)) except Exception as e: msg = "%(key)s: Failure trying to check the results.\n%(e)s" logging.error(msg, locals()) return CheckingResultHandler(filename)
# ============================================================================= # %% OPTIMIZED BITS # ============================================================================= # Some basic 3d algebra ======================================================= @numba.njit(nogil=True, cache=True) def _v3_dot(a, b): return a[0]*b[0] + a[1]*b[1] + a[2]*b[2] @numba.njit(nogil=True, cache=True) def _m33_v3_multiply(m, v, dst): v0 = v[0] v1 = v[1] v2 = v[2] dst[0] = m[0, 0]*v0 + m[0, 1]*v1 + m[0, 2]*v2 dst[1] = m[1, 0]*v0 + m[1, 1]*v1 + m[1, 2]*v2 dst[2] = m[2, 0]*v0 + m[2, 1]*v1 + m[2, 2]*v2 return dst @numba.njit(nogil=True, cache=True) def _v3_normalized(src, dst): v0 = src[0] v1 = src[1] v2 = src[2] sqr_norm = v0*v0 + v1*v1 + v2*v2 inv_norm = 1.0 if sqr_norm == 0.0 else 1./np.sqrt(sqr_norm) dst[0] = v0 * inv_norm dst[1] = v1 * inv_norm dst[2] = v2 * inv_norm return dst @numba.njit(nogil=True, cache=True) def _make_binary_rot_mat(src, dst): v0 = src[0] v1 = src[1] v2 = src[2] dst[0, 0] = 2.0*v0*v0 - 1.0 dst[0, 1] = 2.0*v0*v1 dst[0, 2] = 2.0*v0*v2 dst[1, 0] = 2.0*v1*v0 dst[1, 1] = 2.0*v1*v1 - 1.0 dst[1, 2] = 2.0*v1*v2 dst[2, 0] = 2.0*v2*v0 dst[2, 1] = 2.0*v2*v1 dst[2, 2] = 2.0*v2*v2 - 1.0 return dst # code transcribed in numba from transforms module ============================ # This is equivalent to the transform module anglesToGVec, but written in # numba. This should end in a module to share with other scripts @numba.njit(nogil=True, cache=True) def _anglesToGVec(angs, rMat_ss, rMat_c): """From a set of angles return them in crystal space""" result = np.empty_like(angs) for i in range(len(angs)): cx = np.cos(0.5*angs[i, 0]) sx = np.sin(0.5*angs[i, 0]) cy = np.cos(angs[i, 1]) sy = np.sin(angs[i, 1]) g0 = cx*cy g1 = cx*sy g2 = sx # with g being [cx*xy, cx*sy, sx] # result = dot(rMat_c, dot(rMat_ss[i], g)) t0_0 = \ rMat_ss[i, 0, 0]*g0 + rMat_ss[i, 1, 0]*g1 + rMat_ss[i, 2, 0]*g2 t0_1 = \ rMat_ss[i, 0, 1]*g0 + rMat_ss[i, 1, 1]*g1 + rMat_ss[i, 2, 1]*g2 t0_2 = \ rMat_ss[i, 0, 2]*g0 + rMat_ss[i, 1, 2]*g1 + rMat_ss[i, 2, 2]*g2 result[i, 0] = \ rMat_c[0, 0]*t0_0 + rMat_c[1, 0]*t0_1 + rMat_c[2, 0]*t0_2 result[i, 1] = \ rMat_c[0, 1]*t0_0 + rMat_c[1, 1]*t0_1 + rMat_c[2, 1]*t0_2 result[i, 2] = \ rMat_c[0, 2]*t0_0 + rMat_c[1, 2]*t0_1 + rMat_c[2, 2]*t0_2 return result # This is equivalent to the transform's module gvec_to_xy, # but written in numba. # As of now, it is not a good replacement as efficient allocation of the # temporary arrays is not competitive with the stack allocation using in # the C version of the code (WiP) # tC varies per coord # gvec_cs, rSm varies per grain # # gvec_cs @numba.njit(nogil=True, cache=True) def _gvec_to_detector_array(vG_sn, rD, rSn, rC, tD, tS, tC): """ beamVec is the beam vector: (0, 0, -1) in this case """ ztol = xrdutil.epsf p3_l = np.empty((3,)) tmp_vec = np.empty((3,)) vG_l = np.empty((3,)) tD_l = np.empty((3,)) norm_vG_s = np.empty((3,)) norm_beam = np.empty((3,)) tZ_l = np.empty((3,)) brMat = np.empty((3, 3)) result = np.empty((len(rSn), 2)) _v3_normalized(beam, norm_beam) _m33_v3_multiply(rD, Z_l, tZ_l) for i in range(len(rSn)): _m33_v3_multiply(rSn[i], tC, p3_l) p3_l += tS p3_minus_p1_l = tD - p3_l num = _v3_dot(tZ_l, p3_minus_p1_l) _v3_normalized(vG_sn[i], norm_vG_s) _m33_v3_multiply(rC, norm_vG_s, tmp_vec) _m33_v3_multiply(rSn[i], tmp_vec, vG_l) bDot = -_v3_dot(norm_beam, vG_l) if bDot < ztol or bDot > 1.0 - ztol: result[i, 0] = np.nan result[i, 1] = np.nan continue _make_binary_rot_mat(vG_l, brMat) _m33_v3_multiply(brMat, norm_beam, tD_l) denom = _v3_dot(tZ_l, tD_l) if denom < ztol: result[i, 0] = np.nan result[i, 1] = np.nan continue u = num/denom tmp_res = u*tD_l - p3_minus_p1_l result[i, 0] = _v3_dot(tmp_res, rD[:, 0]) result[i, 1] = _v3_dot(tmp_res, rD[:, 1]) return result @numba.njit(nogil=True, cache=True) def _quant_and_clip_confidence(coords, angles, image, base, inv_deltas, clip_vals, bsp): """quantize and clip the parametric coordinates in coords + angles coords - (..., 2) array: input 2d parametric coordinates angles - (...) array: additional dimension for coordinates base - (3,) array: base value for quantization (for each dimension) inv_deltas - (3,) array: inverse of the quantum size (for each dimension) clip_vals - (2,) array: clip size (only applied to coords dimensions) bsp - (2,) array: beam stop vertical position and width in mm clipping is performed on ranges [0, clip_vals[0]] for x and [0, clip_vals[1]] for y returns an array with the quantized coordinates, with coordinates falling outside the clip zone filtered out. """ count = len(coords) in_sensor = 0 matches = 0 for i in range(count): xf = coords[i, 0] yf = coords[i, 1] # does not count intensity which is covered by the beamstop dcp 5.13.21 if np.abs(yf-bsp[0])<(bsp[1]/2.): continue xf = np.floor((xf - base[0]) * inv_deltas[0]) if not xf >= 0.0: continue if not xf < clip_vals[0]: continue yf = np.floor((yf - base[1]) * inv_deltas[1]) if not yf >= 0.0: continue if not yf < clip_vals[1]: continue zf = np.floor((angles[i] - base[2]) * inv_deltas[2]) in_sensor += 1 x, y, z = int(xf), int(yf), int(zf) # x_byte = x // 8 # x_off = 7 - (x % 8) # if image[z, y, x_byte] & (1 << x_off): # matches += 1 if image[z, y, x]: matches += 1 return 0 if in_sensor == 0 else float(matches)/float(in_sensor) # ============================================================================== # %% DIFFRACTION SIMULATION # ==============================================================================
[docs]def get_simulate_diffractions(grain_params, experiment, cache_file='gold_cubes.npy', controller=None): """getter functions that handles the caching of the simulation""" try: image_stack = np.load(cache_file, mmap_mode='r', allow_pickle=False) except Exception: image_stack = simulate_diffractions(grain_params, experiment, controller=controller) np.save(cache_file, image_stack) controller.handle_result('image_stack', image_stack) return image_stack
[docs]def simulate_diffractions(grain_params, experiment, controller): """actual forward simulation of the diffraction""" # use a packed array for the image_stack array_dims = (experiment.nframes, experiment.ncols, ((experiment.nrows - 1)//8) + 1) image_stack = np.zeros(array_dims, dtype=np.uint8) count = len(grain_params) subprocess = 'simulate diffractions' _project = xrdutil._project_on_detector_plane rD = experiment.rMat_d chi = experiment.chi tD = experiment.tVec_d tS = experiment.tVec_s distortion = experiment.distortion eta_range = [(-np.pi, np.pi), ] ome_range = experiment.ome_range ome_period = (-np.pi, np.pi) full_hkls = xrdutil._fetch_hkls_from_planedata(experiment.plane_data) bMat = experiment.plane_data.latVecOps['B'] wlen = experiment.plane_data.wavelength controller.start(subprocess, count) for i in range(count): rC = xfcapi.make_rmat_of_expmap(grain_params[i][0:3]) tC = np.ascontiguousarray(grain_params[i][3:6]) vInv_s = np.ascontiguousarray(grain_params[i][6:12]) ang_list = np.vstack( xfcapi.oscill_angles_of_hkls( full_hkls[:, 1:], chi, rC, bMat, wlen, v_inv=vInv_s ) ) # hkls not needed here all_angs, _ = xrdutil._filter_hkls_eta_ome( full_hkls, ang_list, eta_range, ome_range ) all_angs[:, 2] = rotations.mapAngle(all_angs[:, 2], ome_period) proj_pts = _project(all_angs, rD, rC, chi, tD, tC, tS, distortion) det_xy = proj_pts[0] _write_pixels(det_xy, all_angs[:, 2], image_stack, experiment.base, experiment.inv_deltas, experiment.clip_vals) controller.update(i + 1) controller.finish(subprocess) return image_stack
# ============================================================================== # %% IMAGE DILATION # ==============================================================================
[docs]def get_dilated_image_stack(image_stack, experiment, controller, cache_file='gold_cubes_dilated.npy'): try: dilated_image_stack = np.load(cache_file, mmap_mode='r', allow_pickle=False) except Exception: dilated_image_stack = dilate_image_stack(image_stack, experiment, controller) np.save(cache_file, dilated_image_stack) return dilated_image_stack
[docs]def dilate_image_stack(image_stack, experiment, controller): # first, perform image dilation =========================================== # perform image dilation (using scikit_image dilation) subprocess = 'dilate image_stack' dilation_shape = np.ones((2*experiment.row_dilation + 1, 2*experiment.col_dilation + 1), dtype=np.uint8) image_stack_dilated = np.empty_like(image_stack) dilated = np.empty( (image_stack.shape[-2], image_stack.shape[-1] << 3), dtype=bool ) n_images = len(image_stack) controller.start(subprocess, n_images) for i_image in range(n_images): to_dilate = np.unpackbits(image_stack[i_image], axis=-1) ski_dilation(to_dilate, dilation_shape, out=dilated) image_stack_dilated[i_image] = np.packbits(dilated, axis=-1) controller.update(i_image + 1) controller.finish(subprocess) return image_stack_dilated
# This part is critical for the performance of simulate diffractions. It # basically "renders" the "pixels". It takes the coordinates, quantizes to an # image coordinate and writes to the appropriate image in the stack. Note # that it also performs clipping based on inv_deltas and clip_vals. # # Note: This could be easily modified so that instead of using an array of # booleans, an array of uint8 could be used so the image is stored # with a bit per pixel. @numba.njit(nogil=True, cache=True) def _write_pixels(coords, angles, image, base, inv_deltas, clip_vals): count = len(coords) for i in range(count): x = int(np.floor((coords[i, 0] - base[0]) * inv_deltas[0])) if x < 0 or x >= clip_vals[0]: continue y = int(np.floor((coords[i, 1] - base[1]) * inv_deltas[1])) if y < 0 or y >= clip_vals[1]: continue z = int(np.floor((angles[i] - base[2]) * inv_deltas[2])) x_byte = x // 8 x_off = 7 - (x % 8) image[z, y, x_byte] |= (1 << x_off)
[docs]def get_offset_size(n_coords): offset = 0 size = n_coords if USE_MPI: coords_per_rank = n_coords // world_size offset = rank * coords_per_rank size = coords_per_rank if rank == world_size - 1: size = n_coords - offset return (offset, size)
[docs]def gather_confidence(controller, confidence, n_grains, n_coords): if rank == 0: global_confidence = np.empty(n_grains * n_coords, dtype=np.float64) else: global_confidence = None # Calculate the send buffer sizes coords_per_rank = n_coords // world_size send_counts = np.full(world_size, coords_per_rank * n_grains) send_counts[-1] = (n_coords - (coords_per_rank * (world_size-1))) * n_grains if rank == 0: # Time how long it takes to perform the MPI gather controller.start('gather_confidence', 1) # Transpose so the data will be more easily re-shaped into its final shape # Must be flattened as well so the underlying data is modified... comm.Gatherv(confidence.T.flatten(), (global_confidence, send_counts), root=0) if rank == 0: controller.finish('gather_confidence') confidence = global_confidence.reshape(n_coords, n_grains).T controller.handle_result("confidence", confidence)
# ============================================================================== # %% ORIENTATION TESTING # ==============================================================================
[docs]def test_orientations(image_stack, experiment, test_crds, controller, multiprocessing_start_method='fork'): """grand loop precomputing the grown image stack image-stack -- is the dilated image stack to be tested against. experiment -- A bunch of experiment related parameters. test_crds -- Coordinates to test orientations on, units mm. controller -- An external object implementing the hooks to notify progress as well as figuring out what to do with results. """ # extract some information needed ========================================= # number of grains, number of coords (maybe limited by call), projection # function to use, chunk size to use if multiprocessing and the number # of cpus. n_grains = experiment.n_grains chunk_size = controller.get_chunk_size() ncpus = controller.get_process_count() # generate angles ========================================================= # all_angles will be a list containing arrays for the different angles to # use, one entry per grain. # # Note that the angle generation is driven by the exp_maps # in the experiment all_angles = evaluate_diffraction_angles(experiment, controller) # generate coords ========================================================= n_coords = controller.limit('coords', len(test_crds)) # precompute per-grain stuff ============================================== # gVec_cs and rmat_ss can be precomputed, do so. subprocess = 'precompute gVec_cs' controller.start(subprocess, len(all_angles)) precomp = [] for i, angs in enumerate(all_angles): rmat_ss = xfcapi.make_sample_rmat(experiment.chi, angs[:, 2]) gvec_cs = _anglesToGVec(angs, rmat_ss, experiment.rMat_c[i]) precomp.append((gvec_cs, rmat_ss)) controller.finish(subprocess) # Divide coords by ranks (offset, size) = get_offset_size(n_coords) # grand loop ============================================================== # The near field simulation 'grand loop'. Where the bulk of computing is # performed. We are looking for a confidence matrix that has a n_grains chunks = range(offset, offset+size, chunk_size) subprocess = 'grand_loop' controller.start(subprocess, n_coords) finished = 0 ncpus = min(ncpus, len(chunks)) logging.info(f'For {rank=}, {offset=}, {size=}, {chunks=}, {len(chunks)=}, {ncpus=}') logging.info('Checking confidence for %d coords, %d grains.', n_coords, n_grains) confidence = np.empty((n_grains, size)) if ncpus > 1: global _multiprocessing_start_method _multiprocessing_start_method=multiprocessing_start_method logging.info('Running multiprocess %d processes (%s)', ncpus, _multiprocessing_start_method) with grand_loop_pool(ncpus=ncpus, state=(chunk_size, image_stack, all_angles, precomp, test_crds, experiment)) as pool: for rslice, rvalues in pool.imap_unordered(multiproc_inner_loop, chunks): count = rvalues.shape[1] # We need to adjust this slice for the offset rslice = slice(rslice.start - offset, rslice.stop - offset) confidence[:, rslice] = rvalues finished += count controller.update(finished) else: logging.info('Running in a single process') for chunk_start in chunks: chunk_stop = min(n_coords, chunk_start+chunk_size) rslice, rvalues = _grand_loop_inner( image_stack, all_angles, precomp, test_crds, experiment, start=chunk_start, stop=chunk_stop ) count = rvalues.shape[1] # We need to adjust this slice for the offset rslice = slice(rslice.start - offset, rslice.stop - offset) confidence[:, rslice] = rvalues finished += count controller.update(finished) controller.finish(subprocess) # Now gather result to rank 0 if USE_MPI: gather_confidence(controller, confidence, n_grains, n_coords) else: controller.handle_result("confidence", confidence) return confidence
[docs]def evaluate_diffraction_angles(experiment, controller=None): """Uses simulateGVecs to generate the angles used per each grain. returns a list containg one array per grain. experiment -- a bag of experiment values, including the grains specs and other required parameters. """ # extract required data from experiment exp_maps = experiment.exp_maps plane_data = experiment.plane_data detector_params = experiment.detector_params pixel_size = experiment.pixel_size ome_range = experiment.ome_range ome_period = experiment.ome_period panel_dims_expanded = [(-10, -10), (10, 10)] subprocess = 'evaluate diffraction angles' pbar = controller.start(subprocess, len(exp_maps)) all_angles = [] ref_gparams = np.array([0., 0., 0., 1., 1., 1., 0., 0., 0.]) for i, exp_map in enumerate(exp_maps): gparams = np.hstack([exp_map, ref_gparams]) sim_results = xrdutil.simulateGVecs(plane_data, detector_params, gparams, panel_dims=panel_dims_expanded, pixel_pitch=pixel_size, ome_range=ome_range, ome_period=ome_period, distortion=None) all_angles.append(sim_results[2]) controller.update(i + 1) controller.finish(subprocess) return all_angles
def _grand_loop_inner(image_stack, angles, precomp, coords, experiment, start=0, stop=None): """Actual simulation code for a chunk of data. It will be used both, in single processor and multiprocessor cases. Chunking is performed on the coords. image_stack -- the image stack from the sensors angles -- the angles (grains) to test coords -- all the coords to test precomp -- (gvec_cs, rmat_ss) precomputed for each grain experiment -- bag with experiment parameters start -- chunk start offset stop -- chunk end offset """ t = timeit.default_timer() n_coords = len(coords) n_angles = len(angles) # experiment geometric layout parameters rD = experiment.rMat_d rCn = experiment.rMat_c tD = experiment.tVec_d tS = experiment.tVec_s # experiment panel related configuration base = experiment.base inv_deltas = experiment.inv_deltas clip_vals = experiment.clip_vals distortion = experiment.distortion bsp = experiment.bsp #beam stop vertical center and width _to_detector = xfcapi.gvec_to_xy # _to_detector = _gvec_to_detector_array stop = min(stop, n_coords) if stop is not None else n_coords # FIXME: distortion hanlding is broken! distortion_fn = None if distortion is not None and len(distortion > 0): distortion_fn, distortion_args = distortion acc_detector = 0.0 acc_distortion = 0.0 acc_quant_clip = 0.0 confidence = np.zeros((n_angles, stop-start)) grains = 0 crds = 0 if distortion_fn is None: for igrn in range(n_angles): angs = angles[igrn] rC = rCn[igrn] gvec_cs, rMat_ss = precomp[igrn] grains += 1 for icrd in range(start, stop): t0 = timeit.default_timer() det_xy = _to_detector( gvec_cs, rD, rMat_ss, rC, tD, tS, coords[icrd] ) t1 = timeit.default_timer() c = _quant_and_clip_confidence(det_xy, angs[:, 2], image_stack, base, inv_deltas, clip_vals, bsp) t2 = timeit.default_timer() acc_detector += t1 - t0 acc_quant_clip += t2 - t1 crds += 1 confidence[igrn, icrd - start] = c else: for igrn in range(n_angles): angs = angles[igrn] rC = rCn[igrn] gvec_cs, rMat_ss = precomp[igrn] grains += 1 for icrd in range(start, stop): t0 = timeit.default_timer() tmp_xys = _to_detector( gvec_cs, rD, rMat_ss, rC, tD, tS, coords[icrd] ) #changed to tmp_xys from det_xy, dcp 2021_05_30 t1 = timeit.default_timer() det_xy = distortion_fn(tmp_xys, distortion_args, invert=True) t2 = timeit.default_timer() c = _quant_and_clip_confidence(det_xy, angs[:, 2], image_stack, base, inv_deltas, clip_vals,bsp) t3 = timeit.default_timer() acc_detector += t1 - t0 acc_distortion += t2 - t1 acc_quant_clip += t3 - t2 crds += 1 confidence[igrn, icrd - start] = c t = timeit.default_timer() - t return slice(start, stop), confidence
[docs]def generate_test_grid(low, top, samples): """generates a test grid of coordinates""" cvec_s = np.linspace(low, top, samples) Xs, Ys, Zs = np.meshgrid(cvec_s, cvec_s, cvec_s) return np.vstack([Xs.flatten(), Ys.flatten(), Zs.flatten()]).T
# Multiprocessing bits ======================================================== # # The parallellized part of test_orientations uses some big arrays as part of # the state that needs to be communicated to the spawn processes. # # On fork platforms, take advantage of process memory inheritance. # # On non fork platforms, rely on joblib dumping the state to disk and loading # back in the target processes, pickling only the minimal information to load # state back. Pickling the big arrays directly was causing memory errors and # would be less efficient in memory (as joblib memmaps by default the big # arrays, meaning they may be shared between processes).
[docs]def multiproc_inner_loop(chunk): """function to use in multiprocessing that computes the simulation over the task's alloted chunk of data""" chunk_size = _mp_state[0] n_coords = len(_mp_state[4]) (offset, size) = get_offset_size(n_coords) chunk_stop = min(offset+size, chunk+chunk_size) return _grand_loop_inner(*_mp_state[1:], start=chunk, stop=chunk_stop)
[docs]def worker_init(id_state, id_exp): """process initialization function. This function is only used when the child processes are spawned (instead of forked). When using the fork model of multiprocessing the data is just inherited in process memory.""" import joblib global _mp_state state = joblib.load(id_state) experiment = joblib.load(id_exp) _mp_state = state + (experiment,)
[docs]@contextlib.contextmanager def grand_loop_pool(ncpus, state): """function that handles the initialization of multiprocessing. It handles properly the use of spawned vs forked multiprocessing. The multiprocessing can be either 'fork' or 'spawn', with 'spawn' being required in non-fork platforms (like Windows) and 'fork' being preferred on fork platforms due to its efficiency. """ # state = ( chunk_size, # image_stack, # angles, # precomp, # coords, # experiment ) global _multiprocessing_start_method try: multiprocessing.set_start_method(_multiprocessing_start_method) except: print('Multiprocessing context already set') if _multiprocessing_start_method == 'fork': # Use FORK multiprocessing. # All read-only data can be inherited in the process. So we "pass" it # as a global that the child process will be able to see. At the end of # theprocessing the global is removed. global _mp_state _mp_state = state pool = multiprocessing.Pool(ncpus) yield pool del (_mp_state) else: # Use SPAWN multiprocessing. # As we can not inherit process data, all the required data is # serialized into a temporary directory using joblib. The # multiprocessing pool will have the "worker_init" as initialization # function that takes the key for the serialized data, which will be # used to load the parameter memory into the spawn process (also using # joblib). In theory, joblib uses memmap for arrays if they are not # compressed, so no compression is used for the bigger arrays. import joblib tmp_dir = tempfile.mkdtemp(suffix='-nf-grand-loop') try: # dumb dumping doesn't seem to work very well.. do something ad-hoc logging.info('Using "%s" as temporary directory.', tmp_dir) id_exp = joblib.dump(state[-1], os.path.join(tmp_dir, 'grand-loop-experiment.gz'), compress=True) id_state = joblib.dump(state[:-1], os.path.join(tmp_dir, 'grand-loop-data')) pool = multiprocessing.Pool(ncpus, worker_init, (id_state[0], id_exp[0])) yield pool finally: logging.info('Deleting "%s".', tmp_dir) shutil.rmtree(tmp_dir)
# %% Test Grid Generation
[docs]def gen_nf_test_grid(cross_sectional_dim, v_bnds, voxel_spacing): Zs_list=np.arange(-cross_sectional_dim/2.+voxel_spacing/2.,cross_sectional_dim/2.,voxel_spacing) Xs_list=np.arange(-cross_sectional_dim/2.+voxel_spacing/2.,cross_sectional_dim/2.,voxel_spacing) if v_bnds[0]==v_bnds[1]: Xs,Ys,Zs=np.meshgrid(Xs_list,v_bnds[0],Zs_list) else: Xs,Ys,Zs=np.meshgrid(Xs_list,np.arange(v_bnds[0]+voxel_spacing/2.,v_bnds[1],voxel_spacing),Zs_list) #note numpy shaping of arrays is goofy, returns(length(y),length(x),length(z)) test_crds = np.vstack([Xs.flatten(), Ys.flatten(), Zs.flatten()]).T n_crds = len(test_crds) return test_crds, n_crds, Xs, Ys, Zs
[docs]def gen_nf_test_grid_tomo(x_dim_pnts, z_dim_pnts, v_bnds, voxel_spacing): if v_bnds[0]==v_bnds[1]: Xs,Ys,Zs=np.meshgrid(np.arange(x_dim_pnts),v_bnds[0],np.arange(z_dim_pnts)) else: Xs,Ys,Zs=np.meshgrid(np.arange(x_dim_pnts),np.arange(v_bnds[0]+voxel_spacing/2.,v_bnds[1],voxel_spacing),np.arange(z_dim_pnts)) #note numpy shaping of arrays is goofy, returns(length(y),length(x),length(z)) Zs=(Zs-(z_dim_pnts/2))*voxel_spacing Xs=(Xs-(x_dim_pnts/2))*voxel_spacing test_crds = np.vstack([Xs.flatten(), Ys.flatten(), Zs.flatten()]).T n_crds = len(test_crds) return test_crds, n_crds, Xs, Ys, Zs
# %%
[docs]def gen_nf_dark(data_folder,img_nums,num_for_dark,nrows,ncols,dark_type='median',stem='nf_',num_digits=5,ext='.tif'): dark_stack=np.zeros([num_for_dark,nrows,ncols]) print('Loading data for dark generation...') for ii in np.arange(num_for_dark): print('Image #: ' + str(ii)) dark_stack[ii,:,:]=imgio.imread(data_folder+'%s'%(stem)+str(img_nums[ii]).zfill(num_digits)+ext) #image_stack[ii,:,:]=np.flipud(tmp_img>threshold) if dark_type=='median': print('making median...') dark=np.median(dark_stack,axis=0) elif dark_type=='min': print('making min...') dark=np.min(dark_stack,axis=0) return dark
# %%
[docs]def gen_nf_cleaned_image_stack(data_folder,img_nums,dark,nrows,ncols, \ process_type='gaussian',process_args=[4.5,5], \ threshold=1.5,ome_dilation_iter=1,stem='nf_', \ num_digits=5,ext='.tif'): image_stack=np.zeros([img_nums.shape[0],nrows,ncols],dtype=bool) print('Loading and Cleaning Images...') if process_type=='gaussian': sigma=process_args[0] size=process_args[1].astype(int) #needs to be int for ii in np.arange(img_nums.shape[0]): print('Image #: ' + str(ii)) tmp_img=imgio.imread(data_folder+'%s'%(stem)+str(img_nums[ii]).zfill(num_digits)+ext)-dark #image procesing tmp_img = filters.gaussian(tmp_img, sigma=sigma) tmp_img = img.morphology.grey_closing(tmp_img,size=(size,size)) binary_img = img.morphology.binary_fill_holes(tmp_img>threshold) image_stack[ii,:,:]=binary_img else: num_erosions=process_args[0] num_dilations=process_args[1] for ii in np.arange(img_nums.shape[0]): print('Image #: ' + str(ii)) tmp_img=imgio.imread(data_folder+'%s'%(stem)+str(img_nums[ii]).zfill(num_digits)+ext)-dark #image procesing image_stack[ii,:,:]=img.morphology.binary_erosion(tmp_img>threshold,iterations=num_erosions) image_stack[ii,:,:]=img.morphology.binary_dilation(image_stack[ii,:,:],iterations=num_dilations) #%A final dilation that includes omega print('Final Dilation Including Omega....') image_stack=img.morphology.binary_dilation(image_stack,iterations=ome_dilation_iter) return image_stack
# %%
[docs]def gen_trial_exp_data(grain_out_file,det_file,mat_file, mat_name, max_tth, comp_thresh, chi2_thresh, misorientation_bnd, \ misorientation_spacing,ome_range_deg, nframes, beam_stop_parms): print('Loading Grain Data...') #gen_grain_data ff_data=np.loadtxt(grain_out_file) #ff_data=np.atleast_2d(ff_data[2,:]) exp_maps=ff_data[:,3:6] t_vec_ds=ff_data[:,6:9] # completeness=ff_data[:,1] chi2=ff_data[:,2] n_grains=exp_maps.shape[0] rMat_c = rotations.rotMatOfExpMap(exp_maps.T) cut=np.where(np.logical_and(completeness>comp_thresh,chi2<chi2_thresh))[0] exp_maps=exp_maps[cut,:] t_vec_ds=t_vec_ds[cut,:] chi2=chi2[cut] # Add Misorientation mis_amt=misorientation_bnd*np.pi/180. spacing=misorientation_spacing*np.pi/180. mis_steps = int(misorientation_bnd/misorientation_spacing) ori_pts = np.arange(-mis_amt, (mis_amt+(spacing*0.999)),spacing) num_ori_grid_pts=ori_pts.shape[0]**3 num_oris=exp_maps.shape[0] XsO, YsO, ZsO = np.meshgrid(ori_pts, ori_pts, ori_pts) grid0 = np.vstack([XsO.flatten(), YsO.flatten(), ZsO.flatten()]).T exp_maps_expanded=np.zeros([num_ori_grid_pts*num_oris,3]) t_vec_ds_expanded=np.zeros([num_ori_grid_pts*num_oris,3]) for ii in np.arange(num_oris): pts_to_use=np.arange(num_ori_grid_pts)+ii*num_ori_grid_pts exp_maps_expanded[pts_to_use,:]=grid0+np.r_[exp_maps[ii,:] ] t_vec_ds_expanded[pts_to_use,:]=np.r_[t_vec_ds[ii,:] ] exp_maps=exp_maps_expanded t_vec_ds=t_vec_ds_expanded n_grains=exp_maps.shape[0] rMat_c = rotations.rotMatOfExpMap(exp_maps.T) print('Loading Instrument Data...') ome_period_deg=(ome_range_deg[0][0], (ome_range_deg[0][0]+360.)) #degrees ome_step_deg=(ome_range_deg[0][1]-ome_range_deg[0][0])/nframes #degrees ome_period = (ome_period_deg[0]*np.pi/180.,ome_period_deg[1]*np.pi/180.) ome_range = [(ome_range_deg[0][0]*np.pi/180.,ome_range_deg[0][1]*np.pi/180.)] ome_step = ome_step_deg*np.pi/180. ome_edges = np.arange(nframes+1)*ome_step+ome_range[0][0]#fixed 2/26/17 instr=load_instrument(det_file) panel = next(iter(instr.detectors.values())) # !!! there is only 1 # tranform paramters # Sample chi = instr.chi tVec_s = instr.tvec # Detector rMat_d = panel.rmat tilt_angles_xyzp = np.asarray(rotations.angles_from_rmat_xyz(rMat_d)) tVec_d = panel.tvec # pixels row_ps = panel.pixel_size_row col_ps = panel.pixel_size_col pixel_size = (row_ps, col_ps) nrows = panel.rows ncols = panel.cols # panel dimensions panel_dims = [tuple(panel.corner_ll), tuple(panel.corner_ur)] x_col_edges = panel.col_edge_vec y_row_edges = panel.row_edge_vec rx, ry = np.meshgrid(x_col_edges, y_row_edges) max_pixel_tth = instrument.max_tth(instr) detector_params = np.hstack([tilt_angles_xyzp, tVec_d, chi, tVec_s]) distortion = panel.distortion # !!! must be None for now # a different parametrization for the sensor # (makes for faster quantization) base = np.array([x_col_edges[0], y_row_edges[0], ome_edges[0]]) deltas = np.array([x_col_edges[1] - x_col_edges[0], y_row_edges[1] - y_row_edges[0], ome_edges[1] - ome_edges[0]]) inv_deltas = 1.0/deltas clip_vals = np.array([ncols, nrows]) # # dilation # max_diameter = np.sqrt(3)*0.005 # row_dilation = int(np.ceil(0.5 * max_diameter/row_ps)) # col_dilation = int(np.ceil(0.5 * max_diameter/col_ps)) print('Loading Materials Data...') # crystallography data beam_energy = valunits.valWUnit("beam_energy", "energy", instr.beam_energy, "keV") beam_wavelength = constants.keVToAngstrom(beam_energy.getVal('keV')) dmin = valunits.valWUnit("dmin", "length", 0.5*beam_wavelength/np.sin(0.5*max_pixel_tth), "angstrom") # material loading mats = material.load_materials_hdf5(mat_file, dmin=dmin,kev=beam_energy) pd = mats[mat_name].planeData if max_tth is not None: pd.tThMax = np.amax(np.radians(max_tth)) else: pd.tThMax = np.amax(max_pixel_tth) print('Final Assembly...') experiment = argparse.Namespace() # grains related information experiment.n_grains = n_grains # this can be derived from other values... experiment.rMat_c = rMat_c # n_grains rotation matrices (one per grain) experiment.exp_maps = exp_maps # n_grains exp_maps (one per grain) experiment.plane_data = pd experiment.detector_params = detector_params experiment.pixel_size = pixel_size experiment.ome_range = ome_range experiment.ome_period = ome_period experiment.x_col_edges = x_col_edges experiment.y_row_edges = y_row_edges experiment.ome_edges = ome_edges experiment.ncols = ncols experiment.nrows = nrows experiment.nframes = nframes # used only in simulate... experiment.rMat_d = rMat_d experiment.tVec_d = tVec_d experiment.chi = chi # note this is used to compute S... why is it needed? experiment.tVec_s = tVec_s experiment.rMat_c = rMat_c # ns.row_dilation = 0 #done beforehand row_dilation, disabled # experiemnt.col_dilation = 0 #col_dilation experiment.distortion = distortion experiment.panel_dims = panel_dims # used only in simulate... experiment.base = base experiment.inv_deltas = inv_deltas experiment.clip_vals = clip_vals experiment.bsp = beam_stop_parms if mis_steps ==0: nf_to_ff_id_map = cut else: nf_to_ff_id_map=np.tile(cut,3**3*mis_steps) return experiment, nf_to_ff_id_map
[docs]def process_raw_confidence(raw_confidence,vol_shape=None,id_remap=None,min_thresh=0.0): print('Compiling Confidence Map...') if vol_shape == None: confidence_map=np.max(raw_confidence,axis=0) grain_map=np.argmax(raw_confidence,axis=0) else: confidence_map=np.max(raw_confidence,axis=0).reshape(vol_shape) grain_map=np.argmax(raw_confidence,axis=0).reshape(vol_shape) #fix grain indexing not_indexed=np.where(confidence_map<=min_thresh) grain_map[not_indexed] =-1 if id_remap is not None: max_grain_no=np.max(grain_map) grain_map_copy=copy.copy(grain_map) print('Remapping grain ids to ff...') for ii in np.arange(max_grain_no): this_grain=np.where(grain_map==ii) grain_map_copy[this_grain]=id_remap[ii] grain_map=grain_map_copy return grain_map.astype(int), confidence_map
# %%
[docs]def save_raw_confidence(save_dir,save_stem,raw_confidence,id_remap=None): print('Saving raw confidence, might take a while...') if id_remap is not None: np.savez(save_dir+save_stem+'_raw_confidence.npz',raw_confidence=raw_confidence,id_remap=id_remap) else: np.savez(save_dir+save_stem+'_raw_confidence.npz',raw_confidence=raw_confidence)
# %%
[docs]def save_nf_data(save_dir,save_stem,grain_map,confidence_map,Xs,Ys,Zs,ori_list,id_remap=None): print('Saving grain map data...') if id_remap is not None: np.savez(save_dir+save_stem+'_grain_map_data.npz',grain_map=grain_map,confidence_map=confidence_map,Xs=Xs,Ys=Ys,Zs=Zs,ori_list=ori_list,id_remap=id_remap) else: np.savez(save_dir+save_stem+'_grain_map_data.npz',grain_map=grain_map,confidence_map=confidence_map,Xs=Xs,Ys=Ys,Zs=Zs,ori_list=ori_list)
# %%
[docs]def scan_detector_parm(image_stack, experiment,test_crds,controller,parm_to_opt,parm_range,slice_shape,ang='deg'): # 0-distance # 1-x center # 2-y center # 3-xtilt # 4-ytilt # 5-ztilt parm_vector=np.arange(parm_range[0],parm_range[1]+1e-6,(parm_range[1]-parm_range[0])/parm_range[2]) if parm_to_opt>2 and ang=='deg': parm_vector=parm_vector*np.pi/180. multiprocessing_start_method = 'fork' if hasattr(os, 'fork') else 'spawn' # current detector parameters, note the value for the actively optimized # parameters will be ignored distance=experiment.detector_params[5]#mm x_cen=experiment.detector_params[3]#mm y_cen=experiment.detector_params[4]#mm xtilt=experiment.detector_params[0] ytilt=experiment.detector_params[1] ztilt=experiment.detector_params[2] ome_range=copy.copy(experiment.ome_range) ome_period=copy.copy(experiment.ome_period) ome_edges=copy.copy(experiment.ome_edges) num_parm_pts=len(parm_vector) trial_data=np.zeros([num_parm_pts,slice_shape[0],slice_shape[1]]) tmp_td=copy.copy(experiment.tVec_d) for jj in np.arange(num_parm_pts): print('cycle %d of %d'%(jj+1,num_parm_pts)) # overwrite translation vector components if parm_to_opt==0: tmp_td[2]=parm_vector[jj] if parm_to_opt==1: tmp_td[0]=parm_vector[jj] if parm_to_opt==2: tmp_td[1]=parm_vector[jj] if parm_to_opt == 3: rMat_d_tmp = xfcapi.make_detector_rmat( [parm_vector[jj], ytilt, ztilt] ) elif parm_to_opt == 4: rMat_d_tmp = xfcapi.make_detector_rmat( [xtilt, parm_vector[jj], ztilt] ) elif parm_to_opt == 5: rMat_d_tmp = xfcapi.make_detector_rmat( [xtilt, ytilt, parm_vector[jj]] ) else: rMat_d_tmp = xfcapi.make_detector_rmat([xtilt, ytilt, ztilt]) experiment.rMat_d = rMat_d_tmp experiment.tVec_d = tmp_td if parm_to_opt==6: experiment.ome_range = [ ( ome_range[0][0] - parm_vector[jj], ome_range[0][1] - parm_vector[jj], ) ] experiment.ome_period = ( ome_period[0] - parm_vector[jj], ome_period[1] - parm_vector[jj], ) experiment.ome_edges = np.array(ome_edges - parm_vector[jj]) experiment.base[2] = experiment.ome_edges[0] # print(experiment.ome_range) # print(experiment.ome_period) # print(experiment.ome_edges) # print(experiment.base) conf=test_orientations(image_stack, experiment,test_crds,controller, \ multiprocessing_start_method) trial_data[jj]=np.max(conf,axis=0).reshape(slice_shape) return trial_data, parm_vector
# %%
[docs]def plot_ori_map(grain_map, confidence_map, exp_maps, layer_no,mat,id_remap=None): grains_plot=np.squeeze(grain_map[layer_no,:,:]) conf_plot=np.squeeze(confidence_map[layer_no,:,:]) n_grains=len(exp_maps) rgb_image = np.zeros( [grains_plot.shape[0], grains_plot.shape[1], 4], dtype='float32') rgb_image[:, :, 3] = 1. for ii in np.arange(n_grains): if id_remap is not None: this_grain = np.where(np.squeeze(grains_plot) == id_remap[ii]) else: this_grain = np.where(np.squeeze(grains_plot) == ii) if np.sum(this_grain[0]) > 0: ori = exp_maps[ii, :] rmats = rotations.rotMatOfExpMap(ori) rgb = mat.unitcell.color_orientations( rmats, ref_dir=np.array([0., 1., 0.])) #color mapping rgb_image[this_grain[0], this_grain[1], 0] = rgb[0][0] rgb_image[this_grain[0], this_grain[1], 1] = rgb[0][1] rgb_image[this_grain[0], this_grain[1], 2] = rgb[0][2] fig1 = plt.figure() plt.imshow(rgb_image, interpolation='none') plt.title('Layer %d Grain Map' % layer_no) #plt.show() plt.hold(True) #fig2 = plt.figure() plt.imshow(conf_plot, vmin=0.0, vmax=1., interpolation='none', cmap=plt.cm.gray, alpha=0.5) plt.title('Layer %d Confidence Map' % layer_no) plt.show()
# ============================================================================== # %% SCRIPT ENTRY AND PARAMETER HANDLING # ============================================================================== # def main(args, controller): # grain_params, experiment = mockup_experiment() # controller.handle_result('experiment', experiment) # controller.handle_result('grain_params', grain_params) # image_stack = get_simulate_diffractions(grain_params, experiment, # controller=controller) # image_stack = get_dilated_image_stack(image_stack, experiment, # controller) # test_orientations(image_stack, experiment, # controller=controller) # def parse_args(): # try: # default_ncpus = multiprocessing.cpu_count() # except NotImplementedError: # default_ncpus = 1 # parser = argparse.ArgumentParser() # parser.add_argument("--inst-profile", action='append', default=[], # help="instrumented profile") # parser.add_argument("--generate", # help="generate file with intermediate results") # parser.add_argument("--check", # help="check against an file with intermediate results") # parser.add_argument("--limit", type=int, # help="limit the size of the run") # parser.add_argument("--ncpus", type=int, default=default_ncpus, # help="number of processes to use") # parser.add_argument("--chunk-size", type=int, default=100, # help="chunk size for use in multiprocessing/reporting") # parser.add_argument("--force-spawn-multiprocessing", action='store_true', # help="force using spawn as the multiprocessing method") # args = parser.parse_args() # ''' # keys = [ # 'inst_profile', # 'generate', # 'check', # 'limit', # 'ncpus', # 'chunk_size'] # print( # '\n'.join([': '.join([key, str(getattr(args, key))]) for key in keys]) # ) # ''' # return args
[docs]def build_controller(check=None,generate=None,ncpus=2,chunk_size=10,limit=None): # builds the controller to use based on the args # result handle try: import progressbar progress_handler = progressbar_progress_observer() except ImportError: progress_handler = null_progress_observer() if check is not None: if generate is not None: logging.warn( "generating and checking can not happen at the same time, " + "going with checking") result_handler = checking_result_handler(check) elif generate is not None: result_handler = saving_result_handler(generate) else: result_handler = forgetful_result_handler() # if args.ncpus > 1 and os.name == 'nt': # logging.warn("Multiprocessing on Windows is disabled for now") # args.ncpus = 1 controller = ProcessController(result_handler, progress_handler, ncpus=ncpus, chunk_size=chunk_size) if limit is not None: controller.set_limit('coords', lambda x: min(x, limit)) return controller
[docs]def output_grain_map(data_location,data_stems,output_stem,vol_spacing,top_down=True,save_type=['npz']): num_scans=len(data_stems) confidence_maps=[None]*num_scans grain_maps=[None]*num_scans Xss=[None]*num_scans Yss=[None]*num_scans Zss=[None]*num_scans if len(vol_spacing)==1: vol_shifts=np.arange(0,vol_spacing[0]*num_scans+1e-12,vol_spacing[0]) else: vol_shifts=vol_spacing for ii in np.arange(num_scans): print('Loading Volume %d ....'%(ii)) conf_data=np.load(os.path.join(data_location,data_stems[ii]+'_grain_map_data.npz')) confidence_maps[ii]=conf_data['confidence_map'] grain_maps[ii]=conf_data['grain_map'] Xss[ii]=conf_data['Xs'] Yss[ii]=conf_data['Ys'] Zss[ii]=conf_data['Zs'] #assumes all volumes to be the same size num_layers=grain_maps[0].shape[0] total_layers=num_layers*num_scans num_rows=grain_maps[0].shape[1] num_cols=grain_maps[0].shape[2] grain_map_stitched=np.zeros((total_layers,num_rows,num_cols)) confidence_stitched=np.zeros((total_layers,num_rows,num_cols)) Xs_stitched=np.zeros((total_layers,num_rows,num_cols)) Ys_stitched=np.zeros((total_layers,num_rows,num_cols)) Zs_stitched=np.zeros((total_layers,num_rows,num_cols)) for ii in np.arange(num_scans): if top_down==True: grain_map_stitched[((ii)*num_layers):((ii)*num_layers+num_layers),:,:]=grain_maps[num_scans-1-ii] confidence_stitched[((ii)*num_layers):((ii)*num_layers+num_layers),:,:]=confidence_maps[num_scans-1-ii] Xs_stitched[((ii)*num_layers):((ii)*num_layers+num_layers),:,:]=\ Xss[num_scans-1-ii] Zs_stitched[((ii)*num_layers):((ii)*num_layers+num_layers),:,:]=\ Zss[num_scans-1-ii] Ys_stitched[((ii)*num_layers):((ii)*num_layers+num_layers),:,:]=Yss[num_scans-1-ii]+vol_shifts[ii] else: grain_map_stitched[((ii)*num_layers):((ii)*num_layers+num_layers),:,:]=grain_maps[ii] confidence_stitched[((ii)*num_layers):((ii)*num_layers+num_layers),:,:]=confidence_maps[ii] Xs_stitched[((ii)*num_layers):((ii)*num_layers+num_layers),:,:]=Xss[ii] Zs_stitched[((ii)*num_layers):((ii)*num_layers+num_layers),:,:]=Zss[ii] Ys_stitched[((ii)*num_layers):((ii)*num_layers+num_layers),:,:]=Yss[ii]+vol_shifts[ii] for ii in np.arange(len(save_type)): if save_type[ii] == 'hdf5': print('Writing HDF5 data...') hf = h5py.File(output_stem + '_assembled.h5', 'w') hf.create_dataset('grain_map', data=grain_map_stitched) hf.create_dataset('confidence', data=confidence_stitched) hf.create_dataset('Xs', data=Xs_stitched) hf.create_dataset('Ys', data=Ys_stitched) hf.create_dataset('Zs', data=Zs_stitched) elif save_type[ii]=='npz': print('Writing NPZ data...') np.savez(output_stem + '_assembled.npz',\ grain_map=grain_map_stitched,confidence=confidence_stitched, Xs=Xs_stitched,Ys=Ys_stitched,Zs=Zs_stitched) elif save_type[ii]=='vtk': print('Writing VTK data...') # VTK Dump Xslist=Xs_stitched[:,:,:].ravel() Yslist=Ys_stitched[:,:,:].ravel() Zslist=Zs_stitched[:,:,:].ravel() grainlist=grain_map_stitched[:,:,:].ravel() conflist=confidence_stitched[:,:,:].ravel() num_pts=Xslist.shape[0] num_cells=(total_layers-1)*(num_rows-1)*(num_cols-1) f = open(os.path.join(output_stem +'_assembled.vtk'), 'w') f.write('# vtk DataFile Version 3.0\n') f.write('grainmap Data\n') f.write('ASCII\n') f.write('DATASET UNSTRUCTURED_GRID\n') f.write('POINTS %d double\n' % (num_pts)) for i in np.arange(num_pts): f.write('%e %e %e \n' %(Xslist[i],Yslist[i],Zslist[i])) scale2=num_cols*num_rows scale1=num_cols f.write('CELLS %d %d\n' % (num_cells, 9*num_cells)) for k in np.arange(Xs_stitched.shape[0]-1): for j in np.arange(Xs_stitched.shape[1]-1): for i in np.arange(Xs_stitched.shape[2]-1): base=scale2*k+scale1*j+i p1=base p2=base+1 p3=base+1+scale1 p4=base+scale1 p5=base+scale2 p6=base+scale2+1 p7=base+scale2+scale1+1 p8=base+scale2+scale1 f.write('8 %d %d %d %d %d %d %d %d \n' \ %(p1,p2,p3,p4,p5,p6,p7,p8)) f.write('CELL_TYPES %d \n' % (num_cells)) for i in np.arange(num_cells): f.write('12 \n') f.write('POINT_DATA %d \n' % (num_pts)) f.write('SCALARS grain_id int \n') f.write('LOOKUP_TABLE default \n') for i in np.arange(num_pts): f.write('%d \n' %(grainlist[i])) f.write('FIELD FieldData 1 \n' ) f.write('confidence 1 %d float \n' % (num_pts)) for i in np.arange(num_pts): f.write('%e \n' %(conflist[i])) f.close() else: print('Not a valid save option, npz, vtk, or hdf5 allowed.') return grain_map_stitched, confidence_stitched, Xs_stitched, Ys_stitched, \ Zs_stitched
# # assume that if os has fork, it will be used by multiprocessing. # # note that on python > 3.4 we could use multiprocessing get_start_method and # # set_start_method for a cleaner implementation of this functionality. # _multiprocessing_start_method = 'fork' if hasattr(os, 'fork') else 'spawn' # if __name__ == '__main__': # LOG_LEVEL = logging.INFO # FORMAT="%(relativeCreated)12d [%(process)6d/%(thread)6d] %(levelname)8s: %(message)s" # logging.basicConfig(level=LOG_LEVEL, format=FORMAT) # # Setting the root log level via logging.basicConfig() doesn't always work. # # The next line ensures that it will get set. # logging.getLogger().setLevel(LOG_LEVEL) # args = parse_args() # if len(args.inst_profile) > 0: # from hexrd.utils import profiler # logging.debug("Instrumenting functions") # profiler.instrument_all(args.inst_profile) # if args.force_spawn_multiprocessing: # _multiprocessing_start_method = 'spawn' # controller = build_controller(args) # main(args, controller) # del controller # if len(args.inst_profile) > 0: # logging.debug("Dumping profiler results") # profiler.dump_results(args.inst_profile)