Source code for hexrd.fitting.calibration.laue

import copy
from typing import Optional

import numpy as np
from scipy import ndimage
from scipy.integrate import nquad
from scipy.optimize import leastsq
from skimage import filters
from skimage.feature import blob_log

from hexrd import xrdutil
from hexrd.constants import fwhm_to_sigma
from hexrd.instrument import switch_xray_source
from hexrd.rotations import angleAxisOfRotMat, RotMatEuler
from hexrd.transforms import xfcapi
from hexrd.utils.hkl import hkl_to_str, str_to_hkl

from .calibrator import Calibrator
from .lmfit_param_handling import (
    create_grain_params,
    DEFAULT_EULER_CONVENTION,
    rename_to_avoid_collision,
)


[docs]class LaueCalibrator(Calibrator): type = 'laue' def __init__(self, instr, material, grain_params, default_refinements=None, min_energy=5, max_energy=25, tth_distortion=None, calibration_picks=None, euler_convention=DEFAULT_EULER_CONVENTION, xray_source: Optional[str] = None): self.instr = instr self.material = material self.grain_params = grain_params self.default_refinements = default_refinements self.energy_cutoffs = [min_energy, max_energy] self.euler_convention = euler_convention self.xray_source = xray_source self.data_dict = None if calibration_picks is not None: self.calibration_picks = calibration_picks self._tth_distortion = tth_distortion self._update_tth_distortion_panels() self.param_names = [] @property def tth_distortion(self): return self._tth_distortion @tth_distortion.setter def tth_distortion(self, v): self._tth_distortion = v self._update_tth_distortion_panels() def _update_tth_distortion_panels(self): # Make sure the panels in the tth distortion are the same # as those on the instrument, so their beam vectors get modified # accordingly. if self._tth_distortion is None: return self._tth_distortion = copy.deepcopy(self._tth_distortion) for det_key, obj in self._tth_distortion.items(): obj.panel = self.instr.detectors[det_key]
[docs] def create_lmfit_params(self, current_params): params = create_grain_params( self.material.name, self.grain_params_euler, self.default_refinements, ) # Ensure there are no name collisions params, _ = rename_to_avoid_collision(params, current_params) self.param_names = [x[0] for x in params] return params
[docs] def update_from_lmfit_params(self, params_dict): grain_params = [] for i, name in enumerate(self.param_names): grain_params.append(params_dict[name].value) self.grain_params_euler = np.asarray(grain_params)
@property def grain_params_euler(self): # Grain parameters with orientation set using Euler angle convention if self.euler_convention is None: return self.grain_params grain_params = self.grain_params.copy() rme = RotMatEuler(np.zeros(3), **self.euler_convention) rme.rmat = xfcapi.make_rmat_of_expmap(grain_params[:3]) grain_params[:3] = np.degrees(rme.angles) return grain_params @grain_params_euler.setter def grain_params_euler(self, v): # Grain parameters with orientation set using Euler angle convention grain_params = v.copy() if self.euler_convention is not None: rme = RotMatEuler(np.zeros(3,), **self.euler_convention) rme.angles = np.radians(grain_params[:3]) phi, n = angleAxisOfRotMat(rme.rmat) grain_params[:3] = phi * n.flatten() self.grain_params = grain_params @property def plane_data(self): return self.material.planeData @property def bmatx(self): return self.plane_data.latVecOps['B'] @property def energy_cutoffs(self): return self._energy_cutoffs @energy_cutoffs.setter def energy_cutoffs(self, x): assert len(x) == 2, "input must have 2 elements" assert x[1] > x[0], "first element must be < than second" self._energy_cutoffs = x self.plane_data.wavelength = self.energy_cutoffs[-1] self.plane_data.exclusions = None @property def calibration_picks(self): # Convert this from our internal data dict format picks = {} for det_key in self.instr.detectors: picks[det_key] = {} # find valid reflections and recast hkls to int xys = self.data_dict['pick_xys'][det_key] hkls = self.data_dict['hkls'][det_key] for hkl, xy in zip(hkls, xys): picks[det_key][hkl_to_str(hkl)] = xy return picks @calibration_picks.setter def calibration_picks(self, v): # Convert this to our internal data dict format data_dict = { 'pick_xys': {}, 'hkls': {}, } for det_key, det_picks in v.items(): data_dict['hkls'][det_key] = [str_to_hkl(x) for x in det_picks] data_dict['pick_xys'][det_key] = list(det_picks.values()) self.data_dict = data_dict
[docs] def autopick_points(self, raw_img_dict, tth_tol=5., eta_tol=5., npdiv=2, do_smoothing=True, smoothing_sigma=2, use_blob_detection=True, blob_threshold=0.25, fit_peaks=True, min_peak_int=1., fit_tth_tol=0.1): """ Parameters ---------- raw_img_dict : TYPE DESCRIPTION. tth_tol : TYPE, optional DESCRIPTION. The default is 5.. eta_tol : TYPE, optional DESCRIPTION. The default is 5.. npdiv : TYPE, optional DESCRIPTION. The default is 2. do_smoothing : TYPE, optional DESCRIPTION. The default is True. smoothing_sigma : TYPE, optional DESCRIPTION. The default is 2. use_blob_detection : TYPE, optional DESCRIPTION. The default is True. blob_threshold : TYPE, optional DESCRIPTION. The default is 0.25. fit_peaks : TYPE, optional DESCRIPTION. The default is True. Returns ------- None. """ with switch_xray_source(self.instr, self.xray_source): return self._autopick_points( raw_img_dict=raw_img_dict, tth_tol=tth_tol, eta_tol=eta_tol, npdiv=npdiv, do_smoothing=do_smoothing, smoothing_sigma=smoothing_sigma, use_blob_detection=use_blob_detection, blob_threshold=blob_threshold, fit_peaks=fit_peaks, min_peak_int=min_peak_int, fit_tth_tol=fit_tth_tol, )
def _autopick_points(self, raw_img_dict, tth_tol=5., eta_tol=5., npdiv=2, do_smoothing=True, smoothing_sigma=2, use_blob_detection=True, blob_threshold=0.25, fit_peaks=True, min_peak_int=1., fit_tth_tol=0.1): labelStructure = ndimage.generate_binary_structure(2, 1) rmat_s = np.eye(3) # !!! forcing to identity omega = 0. # !!! same ^^^ rmat_c = xfcapi.make_rmat_of_expmap(self.grain_params[:3]) tvec_c = self.grain_params[3:6] # vinv_s = self.grain_params[6:12] # !!!: patches don't take this yet # run simulation # ???: could we get this from overlays? laue_sim = self.instr.simulate_laue_pattern( self.plane_data, minEnergy=self.energy_cutoffs[0], maxEnergy=self.energy_cutoffs[1], rmat_s=None, grain_params=np.atleast_2d(self.grain_params), ) # loop over detectors for results refl_dict = dict.fromkeys(self.instr.detectors) for det_key, det in self.instr.detectors.items(): det_config = det.config_dict( chi=self.instr.chi, tvec=self.instr.tvec, beam_vector=self.instr.beam_vector ) xy_det, hkls, angles, dspacing, energy = laue_sim[det_key] ''' valid_xy = [] valid_hkls = [] valid_angs = [] valid_energy = [] ''' # !!! not necessary to loop over grains since we can only handle 1 # for gid in range(len(xy_det)): gid = 0 # find valid reflections valid_refl = ~np.isnan(xy_det[gid][:, 0]) valid_xy = xy_det[gid][valid_refl, :] valid_hkls = hkls[gid][:, valid_refl] valid_angs = angles[gid][valid_refl, :] valid_energy = energy[gid][valid_refl] # make patches refl_patches = xrdutil.make_reflection_patches( det_config, valid_angs, det.angularPixelSize(valid_xy), rmat_c=rmat_c, tvec_c=tvec_c, tth_tol=tth_tol, eta_tol=eta_tol, npdiv=npdiv, quiet=True) reflInfoList = [] img = raw_img_dict[det_key] native_area = det.pixel_area num_patches = len(valid_angs) meas_xy = np.nan*np.ones((num_patches, 2)) meas_angs = np.nan*np.ones((num_patches, 2)) for iRefl, patch in enumerate(refl_patches): # check for overrun irow = patch[-1][0] jcol = patch[-1][1] if np.any([irow < 0, irow >= det.rows, jcol < 0, jcol >= det.cols]): continue if not np.all( det.clip_to_panel( np.vstack([patch[1][0].flatten(), patch[1][1].flatten()]).T )[1] ): continue # use nearest interpolation spot_data = img[irow, jcol] * patch[3] * npdiv**2 / native_area spot_data -= np.amin(spot_data) patch_size = spot_data.shape sigmax = 0.25*np.min(spot_data.shape) * fwhm_to_sigma # optional gaussian smoothing if do_smoothing: spot_data = filters.gaussian(spot_data, smoothing_sigma) if use_blob_detection: spot_data_scl = 2.*spot_data/np.max(spot_data) - 1. # Compute radii in the 3rd column. blobs_log = blob_log(spot_data_scl, min_sigma=2, max_sigma=min(sigmax, 20), num_sigma=10, threshold=blob_threshold, overlap=0.1) numPeaks = len(blobs_log) else: labels, numPeaks = ndimage.label( spot_data > np.percentile(spot_data, 99), structure=labelStructure ) slabels = np.arange(1, numPeaks + 1) tth_edges = patch[0][0][0, :] eta_edges = patch[0][1][:, 0] delta_tth = tth_edges[1] - tth_edges[0] delta_eta = eta_edges[1] - eta_edges[0] if numPeaks > 0: peakId = iRefl if use_blob_detection: coms = blobs_log[:, :2] else: coms = np.array( ndimage.center_of_mass( spot_data, labels=labels, index=slabels ) ) if numPeaks > 1: # center = np.r_[spot_data.shape]*0.5 com_diff = coms - np.tile(center, (numPeaks, 1)) closest_peak_idx = np.argmin( np.sum(com_diff**2, axis=1) ) # else: closest_peak_idx = 0 # coms = coms[closest_peak_idx] # if fit_peaks: sigm = 0.2*np.min(spot_data.shape) if use_blob_detection: sigm = min(blobs_log[closest_peak_idx, 2], sigm) y0, x0 = coms.flatten() ampl = float(spot_data[int(y0), int(x0)]) # y0, x0 = 0.5*np.array(spot_data.shape) # ampl = np.max(spot_data) a_par = c_par = 0.5/float(sigm**2) b_par = 0. bgx = bgy = 0. bkg = np.min(spot_data) params = [ampl, a_par, b_par, c_par, x0, y0, bgx, bgy, bkg] # result = leastsq(gaussian_2d, params, args=(spot_data)) # fit_par = result[0] # coms = np.array([fit_par[5], fit_par[4]]) ''' print("%s, %d, (%.2f, %.2f), (%d, %d)" % (det_key, iRefl, coms[0], coms[1], patch_size[0], patch_size[1])) ''' row_cen = fit_tth_tol * patch_size[0] col_cen = fit_tth_tol * patch_size[1] if np.any( [coms[0] < row_cen, coms[0] >= patch_size[0] - row_cen, coms[1] < col_cen, coms[1] >= patch_size[1] - col_cen] ): continue if (fit_par[0] < min_peak_int): continue # intensities spot_intensity, int_err = nquad( gaussian_2d_int, [[0., 2.*y0], [0., 2.*x0]], args=fit_par) com_angs = np.hstack([ tth_edges[0] + (0.5 + coms[1])*delta_tth, eta_edges[0] + (0.5 + coms[0])*delta_eta ]) # grab intensities if not fit_peaks: if use_blob_detection: spot_intensity = 10 max_intensity = 10 else: spot_intensity = np.sum( spot_data[labels == slabels[closest_peak_idx]] ) max_intensity = np.max( spot_data[labels == slabels[closest_peak_idx]] ) else: max_intensity = np.max(spot_data) # need xy coords # !!! forcing ome = 0. -- could be inconsistent with rmat_s cmv = np.atleast_2d(np.hstack([com_angs, omega])) gvec_c = xfcapi.angles_to_gvec( cmv, chi=self.instr.chi, rmat_c=rmat_c, beam_vec=self.instr.beam_vector) new_xy = xfcapi.gvec_to_xy( gvec_c, det.rmat, rmat_s, rmat_c, det.tvec, self.instr.tvec, tvec_c, beam_vec=self.instr.beam_vector) meas_xy[iRefl, :] = new_xy if det.distortion is not None: meas_xy[iRefl, :] = det.distortion.apply_inverse( meas_xy[iRefl, :] ) meas_angs[iRefl, :] = com_angs else: peakId = -999 # spot_intensity = np.nan max_intensity = np.nan reflInfoList.append([peakId, valid_hkls[:, iRefl], (spot_intensity, max_intensity), valid_energy[iRefl], valid_angs[iRefl, :], meas_angs[iRefl, :], meas_xy[iRefl, :]]) reflInfo = np.array( [tuple(i) for i in reflInfoList], dtype=reflInfo_dtype) refl_dict[det_key] = reflInfo # Convert to our data_dict format data_dict = { 'pick_xys': {}, 'hkls': {}, } for det, det_picks in refl_dict.items(): data_dict['pick_xys'].setdefault(det, []) data_dict['hkls'].setdefault(det, []) for entry in det_picks: hkl = entry[1].astype(int).tolist() cart = entry[6] data_dict['hkls'][det].append(hkl) data_dict['pick_xys'][det].append(cart) self.data_dict = data_dict return data_dict def _evaluate(self): data_dict = self.data_dict # grab reflection data from picks input pick_hkls_dict = {} pick_xys_dict = {} for det_key in self.instr.detectors: # find valid reflections and recast hkls to int xys = np.asarray(data_dict['pick_xys'][det_key], dtype=float) hkls = np.asarray(data_dict['hkls'][det_key], dtype=int) valid_idx = ~np.isnan(xys[:, 0]) # fill local dicts pick_hkls_dict[det_key] = np.atleast_2d(hkls[valid_idx, :]).T pick_xys_dict[det_key] = np.atleast_2d(xys[valid_idx, :]) return pick_hkls_dict, pick_xys_dict
[docs] def residual(self): with switch_xray_source(self.instr, self.xray_source): return self._residual()
def _residual(self): # need this for laue obj pick_hkls_dict, pick_xys_dict = self._evaluate() # munge energy cutoffs energy_cutoffs = np.r_[0.5, 1.5] * np.asarray(self.energy_cutoffs) return sxcal_obj_func( [self.grain_params], self.instr, pick_xys_dict, pick_hkls_dict, self.bmatx, energy_cutoffs )
[docs] def model(self): with switch_xray_source(self.instr, self.xray_source): return self._model()
def _model(self): # need this for laue obj pick_hkls_dict, pick_xys_dict = self._evaluate() return sxcal_obj_func( [self.grain_params], self.instr, pick_xys_dict, pick_hkls_dict, self.bmatx, self.energy_cutoffs, sim_only=True )
# Objective function for Laue fitting
[docs]def sxcal_obj_func(grain_params, instr, meas_xy, hkls_idx, bmat, energy_cutoffs, sim_only=False): """ Objective function for Laue-based fitting. energy_cutoffs are [minEnergy, maxEnergy] where min/maxEnergy can be lists """ # right now just stuck on the end and assumed # to all be the same length... FIX THIS calc_xy = {} calc_ang = {} for det_key, panel in instr.detectors.items(): # Simulate Laue pattern: # returns xy_det, hkls_in, angles, dspacing, energy sim_results = panel.simulate_laue_pattern( [hkls_idx[det_key], bmat], minEnergy=energy_cutoffs[0], maxEnergy=energy_cutoffs[1], grain_params=grain_params, beam_vec=instr.beam_vector ) calc_xy_tmp = sim_results[0][0] idx = ~np.isnan(calc_xy_tmp[:, 0]) calc_xy[det_key] = calc_xy_tmp[idx, :] if sim_only: # Grab angles too. We dont use them otherwise. # FIXME: might need tth correction if there is a distortion. calc_angs_tmp = sim_results[2][0] calc_ang[det_key] = calc_angs_tmp[idx, :] # return values if sim_only: return {k: [calc_xy[k], calc_ang[k]] for k in calc_xy} meas_xy_all = np.vstack(list(meas_xy.values())) calc_xy_all = np.vstack(list(calc_xy.values())) diff_vecs_xy = calc_xy_all - meas_xy_all return diff_vecs_xy.flatten()
[docs]def gaussian_2d(p, data): shape = data.shape x, y = np.meshgrid(range(shape[1]), range(shape[0])) func = p[0]*np.exp( -(p[1]*(x-p[4])*(x-p[4]) + p[2]*(x-p[4])*(y-p[5]) + p[3]*(y-p[5])*(y-p[5])) ) + p[6]*(x-p[4]) + p[7]*(y-p[5]) + p[8] return func.flatten() - data.flatten()
[docs]def gaussian_2d_int(y, x, *p): func = p[0]*np.exp( -(p[1]*(x-p[4])*(x-p[4]) + p[2]*(x-p[4])*(y-p[5]) + p[3]*(y-p[5])*(y-p[5])) ) return func.flatten()
reflInfo_dtype = [ ('iRefl', int), ('hkl', (int, 3)), ('intensity', (float, 2)), ('energy', float), ('predAngles', (float, 2)), ('measAngles', (float, 2)), ('measXY', (float, 2)), ]