import numpy as np
from scipy import ndimage
from scipy.integrate import nquad
from scipy.optimize import leastsq
from skimage import filters
from skimage.feature import blob_log
from hexrd import xrdutil
from hexrd.constants import fwhm_to_sigma
from hexrd.rotations import angleAxisOfRotMat, RotMatEuler
from hexrd.transforms import xfcapi
from hexrd.utils.hkl import hkl_to_str, str_to_hkl
from .calibrator import Calibrator
from .lmfit_param_handling import (
create_grain_params,
DEFAULT_EULER_CONVENTION,
rename_to_avoid_collision,
)
[docs]class LaueCalibrator(Calibrator):
type = 'laue'
def __init__(self, instr, material, grain_params, default_refinements=None,
min_energy=5, max_energy=25, tth_distortion=None,
calibration_picks=None,
euler_convention=DEFAULT_EULER_CONVENTION):
self.instr = instr
self.material = material
self.grain_params = grain_params
self.default_refinements = default_refinements
self.energy_cutoffs = [min_energy, max_energy]
self.tth_distortion = tth_distortion
self.euler_convention = euler_convention
self.data_dict = None
if calibration_picks is not None:
self.calibration_picks = calibration_picks
self.param_names = []
[docs] def create_lmfit_params(self, current_params):
params = create_grain_params(
self.material.name,
self.grain_params_euler,
self.default_refinements,
)
# Ensure there are no name collisions
params, _ = rename_to_avoid_collision(params, current_params)
self.param_names = [x[0] for x in params]
return params
[docs] def update_from_lmfit_params(self, params_dict):
grain_params = []
for i, name in enumerate(self.param_names):
grain_params.append(params_dict[name].value)
self.grain_params_euler = np.asarray(grain_params)
@property
def grain_params_euler(self):
# Grain parameters with orientation set using Euler angle convention
if self.euler_convention is None:
return self.grain_params
grain_params = self.grain_params.copy()
rme = RotMatEuler(np.zeros(3), **self.euler_convention)
rme.rmat = xfcapi.makeRotMatOfExpMap(grain_params[:3])
grain_params[:3] = np.degrees(rme.angles)
return grain_params
@grain_params_euler.setter
def grain_params_euler(self, v):
# Grain parameters with orientation set using Euler angle convention
grain_params = v.copy()
if self.euler_convention is not None:
rme = RotMatEuler(np.zeros(3,), **self.euler_convention)
rme.angles = np.radians(grain_params[:3])
phi, n = angleAxisOfRotMat(rme.rmat)
grain_params[:3] = phi * n.flatten()
self.grain_params = grain_params
@property
def plane_data(self):
return self.material.planeData
@property
def bmatx(self):
return self.plane_data.latVecOps['B']
@property
def energy_cutoffs(self):
return self._energy_cutoffs
@energy_cutoffs.setter
def energy_cutoffs(self, x):
assert len(x) == 2, "input must have 2 elements"
assert x[1] > x[0], "first element must be < than second"
self._energy_cutoffs = x
self.plane_data.wavelength = self.energy_cutoffs[-1]
self.plane_data.exclusions = None
@property
def calibration_picks(self):
# Convert this from our internal data dict format
picks = {}
for det_key in self.instr.detectors:
picks[det_key] = {}
# find valid reflections and recast hkls to int
xys = self.data_dict['pick_xys'][det_key]
hkls = self.data_dict['hkls'][det_key]
for hkl, xy in zip(hkls, xys):
picks[det_key][hkl_to_str(hkl)] = xy
return picks
@calibration_picks.setter
def calibration_picks(self, v):
# Convert this to our internal data dict format
data_dict = {
'pick_xys': {},
'hkls': {},
}
for det_key, det_picks in v.items():
data_dict['hkls'][det_key] = [str_to_hkl(x) for x in det_picks]
data_dict['pick_xys'][det_key] = list(det_picks.values())
self.data_dict = data_dict
[docs] def autopick_points(self, raw_img_dict, tth_tol=5., eta_tol=5.,
npdiv=2, do_smoothing=True, smoothing_sigma=2,
use_blob_detection=True, blob_threshold=0.25,
fit_peaks=True, min_peak_int=1., fit_tth_tol=0.1):
"""
Parameters
----------
raw_img_dict : TYPE
DESCRIPTION.
tth_tol : TYPE, optional
DESCRIPTION. The default is 5..
eta_tol : TYPE, optional
DESCRIPTION. The default is 5..
npdiv : TYPE, optional
DESCRIPTION. The default is 2.
do_smoothing : TYPE, optional
DESCRIPTION. The default is True.
smoothing_sigma : TYPE, optional
DESCRIPTION. The default is 2.
use_blob_detection : TYPE, optional
DESCRIPTION. The default is True.
blob_threshold : TYPE, optional
DESCRIPTION. The default is 0.25.
fit_peaks : TYPE, optional
DESCRIPTION. The default is True.
Returns
-------
None.
"""
labelStructure = ndimage.generate_binary_structure(2, 1)
rmat_s = np.eye(3) # !!! forcing to identity
omega = 0. # !!! same ^^^
rmat_c = xfcapi.makeRotMatOfExpMap(self.grain_params[:3])
tvec_c = self.grain_params[3:6]
# vinv_s = self.grain_params[6:12] # !!!: patches don't take this yet
# run simulation
# ???: could we get this from overlays?
laue_sim = self.instr.simulate_laue_pattern(
self.plane_data,
minEnergy=self.energy_cutoffs[0],
maxEnergy=self.energy_cutoffs[1],
rmat_s=None, grain_params=np.atleast_2d(self.grain_params),
)
# loop over detectors for results
refl_dict = dict.fromkeys(self.instr.detectors)
for det_key, det in self.instr.detectors.items():
det_config = det.config_dict(
chi=self.instr.chi,
tvec=self.instr.tvec,
beam_vector=self.instr.beam_vector
)
xy_det, hkls, angles, dspacing, energy = laue_sim[det_key]
'''
valid_xy = []
valid_hkls = []
valid_angs = []
valid_energy = []
'''
# !!! not necessary to loop over grains since we can only handle 1
# for gid in range(len(xy_det)):
gid = 0
# find valid reflections
valid_refl = ~np.isnan(xy_det[gid][:, 0])
valid_xy = xy_det[gid][valid_refl, :]
valid_hkls = hkls[gid][:, valid_refl]
valid_angs = angles[gid][valid_refl, :]
valid_energy = energy[gid][valid_refl]
# pass
# make patches
refl_patches = xrdutil.make_reflection_patches(
det_config,
valid_angs, det.angularPixelSize(valid_xy),
rmat_c=rmat_c, tvec_c=tvec_c,
tth_tol=tth_tol, eta_tol=eta_tol,
npdiv=npdiv, quiet=True)
reflInfoList = []
img = raw_img_dict[det_key]
native_area = det.pixel_area
num_patches = len(valid_angs)
meas_xy = np.nan*np.ones((num_patches, 2))
meas_angs = np.nan*np.ones((num_patches, 2))
for iRefl, patch in enumerate(refl_patches):
# check for overrun
irow = patch[-1][0]
jcol = patch[-1][1]
if np.any([irow < 0, irow >= det.rows,
jcol < 0, jcol >= det.cols]):
continue
if not np.all(
det.clip_to_panel(
np.vstack([patch[1][0].flatten(),
patch[1][1].flatten()]).T
)[1]
):
continue
# use nearest interpolation
spot_data = img[irow, jcol] * patch[3] * npdiv**2 / native_area
spot_data -= np.amin(spot_data)
patch_size = spot_data.shape
sigmax = 0.25*np.min(spot_data.shape) * fwhm_to_sigma
# optional gaussian smoothing
if do_smoothing:
spot_data = filters.gaussian(spot_data, smoothing_sigma)
if use_blob_detection:
spot_data_scl = 2.*spot_data/np.max(spot_data) - 1.
# Compute radii in the 3rd column.
blobs_log = blob_log(spot_data_scl,
min_sigma=2,
max_sigma=min(sigmax, 20),
num_sigma=10,
threshold=blob_threshold,
overlap=0.1)
numPeaks = len(blobs_log)
else:
labels, numPeaks = ndimage.label(
spot_data > np.percentile(spot_data, 99),
structure=labelStructure
)
slabels = np.arange(1, numPeaks + 1)
tth_edges = patch[0][0][0, :]
eta_edges = patch[0][1][:, 0]
delta_tth = tth_edges[1] - tth_edges[0]
delta_eta = eta_edges[1] - eta_edges[0]
if numPeaks > 0:
peakId = iRefl
if use_blob_detection:
coms = blobs_log[:, :2]
else:
coms = np.array(
ndimage.center_of_mass(
spot_data, labels=labels, index=slabels
)
)
if numPeaks > 1:
#
center = np.r_[spot_data.shape]*0.5
com_diff = coms - np.tile(center, (numPeaks, 1))
closest_peak_idx = np.argmin(
np.sum(com_diff**2, axis=1)
)
#
else:
closest_peak_idx = 0
pass # end multipeak conditional
#
coms = coms[closest_peak_idx]
#
if fit_peaks:
sigm = 0.2*np.min(spot_data.shape)
if use_blob_detection:
sigm = min(blobs_log[closest_peak_idx, 2], sigm)
y0, x0 = coms.flatten()
ampl = float(spot_data[int(y0), int(x0)])
# y0, x0 = 0.5*np.array(spot_data.shape)
# ampl = np.max(spot_data)
a_par = c_par = 0.5/float(sigm**2)
b_par = 0.
bgx = bgy = 0.
bkg = np.min(spot_data)
params = [ampl,
a_par, b_par, c_par,
x0, y0, bgx, bgy, bkg]
#
result = leastsq(gaussian_2d, params, args=(spot_data))
#
fit_par = result[0]
#
coms = np.array([fit_par[5], fit_par[4]])
'''
print("%s, %d, (%.2f, %.2f), (%d, %d)"
% (det_key, iRefl, coms[0], coms[1],
patch_size[0], patch_size[1]))
'''
row_cen = fit_tth_tol * patch_size[0]
col_cen = fit_tth_tol * patch_size[1]
if np.any(
[coms[0] < row_cen,
coms[0] >= patch_size[0] - row_cen,
coms[1] < col_cen,
coms[1] >= patch_size[1] - col_cen]
):
continue
if (fit_par[0] < min_peak_int):
continue
# intensities
spot_intensity, int_err = nquad(
gaussian_2d_int,
[[0., 2.*y0], [0., 2.*x0]],
args=fit_par)
pass
com_angs = np.hstack([
tth_edges[0] + (0.5 + coms[1])*delta_tth,
eta_edges[0] + (0.5 + coms[0])*delta_eta
])
# grab intensities
if not fit_peaks:
if use_blob_detection:
spot_intensity = 10
max_intensity = 10
else:
spot_intensity = np.sum(
spot_data[labels == slabels[closest_peak_idx]]
)
max_intensity = np.max(
spot_data[labels == slabels[closest_peak_idx]]
)
else:
max_intensity = np.max(spot_data)
# need xy coords
# !!! forcing ome = 0. -- could be inconsistent with rmat_s
cmv = np.atleast_2d(np.hstack([com_angs, omega]))
gvec_c = xfcapi.anglesToGVec(
cmv,
chi=self.instr.chi,
rMat_c=rmat_c,
bHat_l=self.instr.beam_vector)
new_xy = xfcapi.gvecToDetectorXY(
gvec_c,
det.rmat, rmat_s, rmat_c,
det.tvec, self.instr.tvec, tvec_c,
beamVec=self.instr.beam_vector)
meas_xy[iRefl, :] = new_xy
if det.distortion is not None:
meas_xy[iRefl, :] = det.distortion.apply_inverse(
meas_xy[iRefl, :]
)
meas_angs[iRefl, :] = com_angs
else:
peakId = -999
#
spot_intensity = np.nan
max_intensity = np.nan
pass
reflInfoList.append([peakId, valid_hkls[:, iRefl],
(spot_intensity, max_intensity),
valid_energy[iRefl],
valid_angs[iRefl, :],
meas_angs[iRefl, :],
meas_xy[iRefl, :]])
pass
reflInfo = np.array(
[tuple(i) for i in reflInfoList],
dtype=reflInfo_dtype)
refl_dict[det_key] = reflInfo
# Convert to our data_dict format
data_dict = {
'pick_xys': {},
'hkls': {},
}
for det, det_picks in refl_dict.items():
data_dict['pick_xys'].setdefault(det, [])
data_dict['hkls'].setdefault(det, [])
for entry in det_picks:
hkl = entry[1].astype(int).tolist()
cart = entry[6]
data_dict['hkls'][det].append(hkl)
data_dict['pick_xys'][det].append(cart)
self.data_dict = data_dict
return data_dict
def _evaluate(self):
data_dict = self.data_dict
# grab reflection data from picks input
pick_hkls_dict = {}
pick_xys_dict = {}
for det_key in self.instr.detectors:
# find valid reflections and recast hkls to int
xys = np.asarray(data_dict['pick_xys'][det_key], dtype=float)
hkls = np.asarray(data_dict['hkls'][det_key], dtype=int)
valid_idx = ~np.isnan(xys[:, 0])
# fill local dicts
pick_hkls_dict[det_key] = np.atleast_2d(hkls[valid_idx, :]).T
pick_xys_dict[det_key] = np.atleast_2d(xys[valid_idx, :])
return pick_hkls_dict, pick_xys_dict
[docs] def residual(self):
# need this for laue obj
pick_hkls_dict, pick_xys_dict = self._evaluate()
# munge energy cutoffs
energy_cutoffs = np.r_[0.5, 1.5] * np.asarray(self.energy_cutoffs)
return sxcal_obj_func(
[self.grain_params], self.instr, pick_xys_dict, pick_hkls_dict,
self.bmatx, energy_cutoffs
)
[docs] def model(self):
# need this for laue obj
pick_hkls_dict, pick_xys_dict = self._evaluate()
return sxcal_obj_func(
[self.grain_params], self.instr, pick_xys_dict, pick_hkls_dict,
self.bmatx, self.energy_cutoffs, sim_only=True
)
# Objective function for Laue fitting
[docs]def sxcal_obj_func(grain_params, instr, meas_xy, hkls_idx,
bmat, energy_cutoffs, sim_only=False):
"""
Objective function for Laue-based fitting.
energy_cutoffs are [minEnergy, maxEnergy] where min/maxEnergy can be lists
"""
# right now just stuck on the end and assumed
# to all be the same length... FIX THIS
calc_xy = {}
calc_ang = {}
for det_key, panel in instr.detectors.items():
# Simulate Laue pattern:
# returns xy_det, hkls_in, angles, dspacing, energy
sim_results = panel.simulate_laue_pattern(
[hkls_idx[det_key], bmat],
minEnergy=energy_cutoffs[0], maxEnergy=energy_cutoffs[1],
grain_params=grain_params,
beam_vec=instr.beam_vector
)
calc_xy_tmp = sim_results[0][0]
idx = ~np.isnan(calc_xy_tmp[:, 0])
calc_xy[det_key] = calc_xy_tmp[idx, :]
if sim_only:
# Grab angles too. We dont use them otherwise.
# FIXME: might need tth correction if there is a distortion.
calc_angs_tmp = sim_results[2][0]
calc_ang[det_key] = calc_angs_tmp[idx, :]
# return values
if sim_only:
return {k: [calc_xy[k], calc_ang[k]] for k in calc_xy}
meas_xy_all = np.vstack(list(meas_xy.values()))
calc_xy_all = np.vstack(list(calc_xy.values()))
diff_vecs_xy = calc_xy_all - meas_xy_all
return diff_vecs_xy.flatten()
[docs]def gaussian_2d(p, data):
shape = data.shape
x, y = np.meshgrid(range(shape[1]), range(shape[0]))
func = p[0]*np.exp(
-(p[1]*(x-p[4])*(x-p[4])
+ p[2]*(x-p[4])*(y-p[5])
+ p[3]*(y-p[5])*(y-p[5]))
) + p[6]*(x-p[4]) + p[7]*(y-p[5]) + p[8]
return func.flatten() - data.flatten()
[docs]def gaussian_2d_int(y, x, *p):
func = p[0]*np.exp(
-(p[1]*(x-p[4])*(x-p[4])
+ p[2]*(x-p[4])*(y-p[5])
+ p[3]*(y-p[5])*(y-p[5]))
)
return func.flatten()
reflInfo_dtype = [
('iRefl', int),
('hkl', (int, 3)),
('intensity', (float, 2)),
('energy', float),
('predAngles', (float, 2)),
('measAngles', (float, 2)),
('measXY', (float, 2)),
]