# standard imports
# ---------
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
import copy
from functools import partial
from os import path
import time
import warnings
# 3rd party imports
# -----------------
import h5py
import lmfit
import numpy as np
from scipy.interpolate import CubicSpline
from scipy.special import roots_legendre
# hexrd imports
# -------------
from hexrd import constants
from hexrd.imageutil import snip1d_quad
from hexrd.material import Material
from hexrd.transforms.xfcapi import angles_to_gvec
from hexrd.valunits import valWUnit
from hexrd.wppf.peakfunctions import (
calc_rwp,
computespectrum_pvfcj,
computespectrum_pvtch,
computespectrum_pvpink,
calc_Iobs_pvfcj,
calc_Iobs_pvtch,
calc_Iobs_pvpink,
)
from hexrd.wppf import wppfsupport
from hexrd.wppf.spectrum import Spectrum
from hexrd.wppf.phase import (
Phases_LeBail,
Phases_Rietveld,
Material_LeBail,
Material_Rietveld,
)
[docs]class AbstractWPPF(ABC):
"""Methods used by both LeBail and Rietveld"""
# Abstract methods which must be defined for each WPPF type
[docs] @abstractmethod
def generate_default_parameters(self) -> lmfit.Parameters:
pass
@abstractmethod
def _add_phase_params(self, params: lmfit.Parameters):
pass
@abstractmethod
def _set_phase_params_vals_to_class(self, params: lmfit.Parameters):
pass
@abstractmethod
def _get_phase(self, name: str, wavelength_type: str):
pass
# Shared methods which each WPPF type uses
def __str__(self):
cls_name = self.__class__.__name__
resstr = (
f"<{cls_name} Fit class>\n"
"resParameters of the model are as follows:\n"
)
resstr += self.params.__str__()
return resstr
[docs] def dump_hdf5(self, file):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 01/19/2021 SS 1.0 original
>> @DETAILS: write out the hdf5 file with all the spectrum, parameters
and phases pecified by filename or h5py.File object
"""
if isinstance(file, str):
fexist = path.isfile(file)
if fexist:
fid = h5py.File(file, "r+")
else:
fid = h5py.File(file, "x")
elif isinstance(file, h5py.File):
fid = file
else:
raise RuntimeError(
"Parameters: dump_hdf5 Pass in a filename \
string or h5py.File object"
)
self.phases.dump_hdf5(fid)
self.params.dump_hdf5(fid)
self.spectrum_expt.dump_hdf5(fid, "experimental")
self.spectrum_sim.dump_hdf5(fid, "simulated")
self.background.dump_hdf5(fid, "background")
[docs] def params_vary_off(self):
"""
no params are varied
"""
for p in self.params:
self.params[p].vary = False
[docs] def params_vary_on(self):
"""
all params are varied
"""
for p in self.params:
self.params[p].vary = True
@property
def num_vary(self) -> int:
return sum(x.vary for x in self.params.values())
@property
def bkgmethod(self):
return self._bkgmethod
@bkgmethod.setter
def bkgmethod(self, v):
self._bkgmethod = v
if "chebyshev" in v and hasattr(self, 'bkg_coef'):
degree = v["chebyshev"]
# In case the degree has changed, slice off any extra at the end,
# and in case it is less, pad with zeros.
if len(self.bkg_coef) > degree + 1:
self.bkg_coef = self.bkg_coef[:degree + 1]
elif len(self.bkg_coef) < degree + 1:
pad_width = (0, degree + 1 - len(self.bkg_coef))
self.bkg_coef = np.pad(self.bkg_coef, pad_width)
[docs] def chebyshevfit(self):
"""
03/08/2021 SS spectrum_expt is a list now. accounting
for that change
"""
self._background = []
self.bkg_coef = self.cheb_init_coef
for i, s in enumerate(self._spectrum_expt):
tth = self._tth_list[i]
self._background.append(Spectrum(x=tth, y=self.init_bkg(tth)))
# cubic spline fit of background using custom points chosen from plot
[docs] def splinefit(self, points):
"""
03/08/2021 SS adding tth as input. this is the
list of points for which background is estimated
"""
self._background = []
x = points[:, 0]
y = points[:, 1]
cs = CubicSpline(x, y)
for i, s in enumerate(self._spectrum_expt):
tth = self._tth_list[i]
bkg = cs(tth)
self._background.append(Spectrum(x=tth, y=bkg))
@property
def cheb_coef(self):
if "chebyshev" in self.bkgmethod:
return self.bkg_coef
else:
return None
@property
def cheb_polynomial(self):
return np.polynomial.Chebyshev(
self.cheb_coef,
domain=[self.tth_list[0], self.tth_list[-1]]
)
@property
def cheb_init_coef(self):
if "chebyshev" in self.bkgmethod:
return self.init_bkg.coef
else:
return None
[docs] def reset_background_params(self):
# Reset background parameters to their initial values
if "chebyshev" not in self.bkgmethod:
return
params = self.params
for i in range(len(self.bkg_coef)):
name = f'bkg_{i}'
if name in params:
params[name].value = self.bkg_coef[i]
def _update_bkg(self, params):
"""
update the background coefficients for
the chebyshev polynomials
"""
if "chebyshev" in self.bkgmethod:
coef = self.bkg_coef.copy()
for p in params:
if "bkg" in p:
indx = int(p.split("_")[1])
coef[indx] = params[p].value
self.bkg_coef = coef
else:
return
def _update_shkl(self, params):
"""
if certain shkls are refined, then update
them using the params arg. else use values from
the parameter class
"""
shkl_dict = {}
for p in self.phases:
for k in self.phases.wavelength:
phase = self._get_phase(p, k)
shkl_name = phase.valid_shkl
eq_const = phase.eq_constraints
mname = phase.name
key = [f"{mname}_{s}" for s in shkl_name]
for s, k in zip(shkl_name, key):
if k in params:
shkl_dict[s] = params[k].value
else:
shkl_dict[s] = self.params[k].value
phase.shkl = wppfsupport._fill_shkl(shkl_dict, eq_const)
[docs] def update_parameters(self):
for p in self.res.params:
par = self.res.params[p]
self.params[p].value = par.value
[docs] def calctth(self):
self.tth = {}
self.hkls = {}
self.dsp = {}
self.limit = {}
self.sf_hkl_factors = {}
self.sf_lfactor = {}
for p in self.phases:
self.tth[p] = {}
self.hkls[p] = {}
self.dsp[p] = {}
self.limit[p] = {}
self.sf_hkl_factors[p] = {}
self.sf_lfactor[p] = {}
for k, l in self.phases.wavelength.items():
phase = self._get_phase(p, k)
t = phase.getTTh(l[0].getVal('nm'))
sf_f, lfact_sf = phase.get_sf_hkl_factors()
allowed = phase.wavelength_allowed_hkls
t = t[allowed]
hkl = phase.hkls[allowed, :]
dsp = phase.dsp[allowed]
tth_min = min(self.tth_min)
tth_max = max(self.tth_max)
limit = np.logical_and(t >= tth_min, t <= tth_max)
self.limit[p][k] = limit
self.tth[p][k] = t[limit]
self.hkls[p][k] = hkl[limit, :]
self.dsp[p][k] = dsp[limit]
if sf_f is not None and lfact_sf is not None:
sf_f = sf_f[allowed]
lfact_sf = lfact_sf[allowed]
self.sf_hkl_factors[p][k] = sf_f[limit]
self.sf_lfactor[p][k] = lfact_sf[limit]
[docs] def calcRwp(self, params):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 05/19/2020 SS 1.0 original
>> @DETAILS: this routine computes the weighted error between
calculated and experimental spectra. goodness of fit is
also calculated. the weights are the inverse squareroot of
the experimental intensities
"""
"""
the err variable is the difference between
simulated and experimental spectra
"""
self._set_params_vals_to_class(params, init=False, skip_phases=False)
self._update_shkl(params)
self._update_bkg(params)
errvec = self.computespectrum()
return errvec
@property
def spectrum_sim(self):
tth, inten = self._spectrum_sim.data
inten[self.global_mask] = np.nan
inten += self.background.y
return Spectrum(x=tth, y=inten)
@property
def spectrum_expt(self):
vector_list = [s.y for s in self._spectrum_expt]
spec_masked = join_regions(
vector_list, self.global_index, self.global_shape
)
return Spectrum(x=self._tth_list_global, y=spec_masked)
@spectrum_expt.setter
def spectrum_expt(self, expt_spectrum):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 05/19/2020 SS 1.0 original
09/11/2020 SS 1.1 multiple data types accepted as input
09/14/2020 SS 1.2 background method chebyshev now has user
specified polynomial degree
03/03/2021 SS 1.3 moved the initialization to the property
definition of self.spectrum_expt
05/03/2021 SS 2.0 moved weight calculation and background
initialization to the property definition
03/05/2021 SS 2.1 adding support for masked array
np.ma.MaskedArray
03/08/2021 SS 3.0 spectrum_expt is now a list to deal with
the masked arrays
>> @DETAILS: load the experimental spectum of 2theta-intensity
"""
if expt_spectrum is not None:
if isinstance(expt_spectrum, Spectrum):
"""
directly passing the spectrum class
"""
self._spectrum_expt = [expt_spectrum]
# self._spectrum_expt.nan_to_zero()
self.global_index = [(0, expt_spectrum.shape[0])]
self.global_mask = np.zeros(
[
expt_spectrum.shape[0],
],
dtype=bool,
)
self._tth_list = [s._x for s in self._spectrum_expt]
self._tth_list_global = expt_spectrum._x
self.offset = False
elif isinstance(expt_spectrum, np.ndarray):
"""
initialize class using a nx2 array
"""
if np.ma.is_masked(expt_spectrum):
"""
@date 03/05/2021 SS 1.0 original
this is an instance of masked array where there are
nans in the spectrum. this will have to be handled with
a lot of care. steps are as follows:
1. if array is masked array, then check if any values are
masked or not.
2. if they are then the spectrum_expt is a list of
individual islands of the spectrum, each with its own
background
3. Every place where spectrum_expt is used, we will do a
type test to figure out the logic of the operations
"""
expt_spec_list, gidx = separate_regions(expt_spectrum)
self.global_index = gidx
self.global_shape = expt_spectrum.shape[0]
self.global_mask = expt_spectrum.mask[:, 1]
self._spectrum_expt = []
for s in expt_spec_list:
self._spectrum_expt.append(
Spectrum(
x=s[:, 0],
y=s[:, 1],
name="expt_spectrum"
)
)
else:
max_ang = expt_spectrum[-1, 0]
if max_ang < np.pi:
warnings.warn(
"angles are small and appear to \
be in radians. please check"
)
self._spectrum_expt = [
Spectrum(
x=expt_spectrum[:, 0],
y=expt_spectrum[:, 1],
name="expt_spectrum",
)
]
self.global_index = [
(0, self._spectrum_expt[0].x.shape[0])
]
self.global_shape = expt_spectrum.shape[0]
self.global_mask = np.zeros(
[
expt_spectrum.shape[0],
],
dtype=bool,
)
self._tth_list = [s._x for s in self._spectrum_expt]
self._tth_list_global = expt_spectrum[:, 0]
self.offset = False
elif isinstance(expt_spectrum, str):
"""
load from a text file
undefined behavior if text file has nans
"""
if path.exists(expt_spectrum):
self._spectrum_expt = [
Spectrum.from_file(expt_spectrum, skip_rows=0)
]
# self._spectrum_expt.nan_to_zero()
self.global_index = [
(0, self._spectrum_expt[0].x.shape[0])
]
self.global_shape = self._spectrum_expt[0].x.shape[0]
self.global_mask = np.zeros(
[
self.global_shape,
],
dtype=bool,
)
else:
raise FileNotFoundError(
"input spectrum file doesn't exist."
)
self._tth_list = [self._spectrum_expt[0]._x]
self._tth_list_global = self._spectrum_expt[0]._x
self.offset = False
"""
03/08/2021 SS tth_min and max are now lists
"""
self.tth_max = []
self.tth_min = []
self.ntth = []
for s in self._spectrum_expt:
self.tth_max.append(s.x.max())
self.tth_min.append(s.x.min())
self.ntth.append(s.x.shape[0])
"""
03/02/2021 SS added tth_step for some computations
related to snip background estimation
@TODO this will not work for masked spectrum
03/08/2021 tth_step is a list now
"""
self.tth_step = []
for tmi, tma, nth in zip(self.tth_min, self.tth_max, self.ntth):
if nth > 1:
self.tth_step.append((tma - tmi) / nth)
else:
self.tth_step.append(0.0)
"""
@date 03/03/2021 SS
there are cases when the intensity in the spectrum is
negative. our approach will be to offset the spectrum to make all
the values positive for the computation and then finally offset it
when the computation has finished.
03/08/2021 all quantities are lists now
"""
for s in self._spectrum_expt:
self.offset = []
self.offset_val = []
if np.any(s.y < 0.0):
self.offset.append(True)
self.offset_val.append(s.y.min())
s.y = s.y - s.y.min()
"""
@date 09/24/2020 SS
catching the cases when intensity is zero.
for these points the weights will become
infinite. therefore, these points will be
masked out and assigned a weight of zero.
In addition, if any points have negative
intensity, they will also be assigned a zero
weight
03/08/2021 SS everything is a list now
"""
self._weights = []
for s in self._spectrum_expt:
mask = s.y <= 0.0
ww = np.zeros(s.y.shape)
"""also initialize statistical weights
for the error calculation"""
ww[~mask] = 1.0 / s.y[~mask]
self._weights.append(ww)
self.initialize_bkg()
else:
raise RuntimeError("expt_spectrum setter: spectrum is None")
@property
def background(self):
if "chebyshev" in self.bkgmethod:
vector_list = [self.cheb_polynomial(t) for t in self._tth_list]
else:
vector_list = [s.y for s in self._background]
bkg_masked = join_regions(
vector_list, self.global_index, self.global_shape
)
return Spectrum(x=self.tth_list, y=bkg_masked)
@property
def weights(self):
weights_masked = join_regions(
self._weights, self.global_index, self.global_shape
)
return Spectrum(x=self.tth_list, y=weights_masked)
@property
def params(self):
return self._params
@params.setter
def params(self, param_info):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 05/19/2020 SS 1.0 original
09/11/2020 SS 1.1 modified to accept multiple input types
03/05/2021 SS 2.0 moved everything to the property setter
>> @DETAILS: initialize parameter list from file. if no file given,
then initialize to some default values
(lattice constants are for CeO2)
"""
xn, wn = roots_legendre(16)
self.xn = xn[8:]
self.wn = wn[8:]
if param_info is not None:
if isinstance(param_info, lmfit.Parameters):
"""
directly passing the parameter class
"""
self._params = param_info
params = param_info
else:
params = lmfit.Parameters()
if isinstance(param_info, dict):
"""
initialize class using dictionary read from the yaml file
"""
for k, v in param_info.items():
params.add(
k,
value=float(v[0]),
min=float(v[1]),
max=float(v[2]),
vary=bool(v[3]),
)
elif isinstance(param_info, str):
"""
load from a yaml file
"""
if path.exists(param_info):
params.load(param_info)
else:
raise FileNotFoundError(
"input spectrum file doesn't exist."
)
"""
this part initializes the lattice parameters in the
"""
self._add_phase_params(params)
self._params = params
else:
"""
first three are cagliotti parameters
next two are the lorentz paramters
final is the zero instrumental peak position error
mixing factor calculated by Thomax, Cox, Hastings formula
"""
params = self.generate_default_parameters()
self._params = params
self._set_params_vals_to_class(params, init=True, skip_phases=True)
def _set_params_vals_to_class(self, params, init=False, skip_phases=False):
"""
@date: 03/12/2021 SS 1.0 original
@details: set the values from parameters to the WPPF class
"""
for p in params:
if init or hasattr(self, p):
setattr(self, p, params[p].value)
if not skip_phases:
self._set_phase_params_vals_to_class(params)
if self.amorphous_model is not None:
self._set_amorphous_params_vals_to_class(params)
def _set_amorphous_params_vals_to_class(self, params: lmfit.Parameters):
if self.amorphous_model is None:
# Can't do anything
return
scale = {}
shift = {}
center = {}
fwhm = {}
for key in self.amorphous_model.scale:
nn = f'{key}_amorphous_scale'
if nn in params:
scale[key] = params[nn].value
else:
scale[key] = self.amorphous_model.scale[key]
if self.amorphous_model.model_type == "experimental":
nn = f'{key}_amorphous_shift'
if nn in params:
shift[key] = params[nn].value
else:
shift[key] = self.amorphous_model.shift[key]
if self.amorphous_model.model_type in ("split_gaussian",
"split_pv"):
nn = f'{key}_amorphous_center'
if nn in params:
center[key] = params[nn].value
else:
center[key] = self.amorphous_model.center[key]
if self.amorphous_model.model_type == "split_gaussian":
nnl = f'{key}_amorphous_fwhm_l'
if nnl in params:
fwhm_l = params[nnl].value
else:
fwhm_l = self.amorphous_model.fwhm[key][0]
nnr = f'{key}_amorphous_fwhm_r'
if nnr in params:
fwhm_r = params[nnr].value
else:
fwhm_r = self.amorphous_model.fwhm[key][1]
fwhm[key] = np.array([fwhm_l,
fwhm_r
])
elif self.amorphous_model.model_type == "split_pv":
nnl = f'{key}_amorphous_fwhm_g_l'
if nnl in params:
fwhm_g_l = params[nnl].value
else:
fwhm_g_l = self.amorphous_model.fwhm[key][0]
nnl = f'{key}_amorphous_fwhm_l_l'
if nnl in params:
fwhm_l_l = params[nnl].value
else:
fwhm_l_l = self.amorphous_model.fwhm[key][1]
nnr = f'{key}_amorphous_fwhm_g_r'
if nnr in params:
fwhm_g_r = params[nnr].value
else:
fwhm_g_r = self.amorphous_model.fwhm[key][2]
nnr = f'{key}_amorphous_fwhm_l_r'
if nnr in params:
fwhm_l_r = params[nnr].value
else:
fwhm_l_r = self.amorphous_model.fwhm[key][3]
fwhm[key] = np.array([fwhm_g_l,
fwhm_l_l,
fwhm_g_r,
fwhm_l_r
])
self.amorphous_model.scale = scale
if self.amorphous_model.model_type == "experimental":
self.amorphous_model.shift = shift
elif self.amorphous_model.model_type in ["split_gaussian",
"split_pv"]:
self.amorphous_model.center = center
self.amorphous_model.fwhm = fwhm
[docs] def initialize_bkg(self):
"""
the cubic spline seems to be the ideal route in terms
of determining the background intensity. this involves
selecting a small (~5) number of points from the spectrum,
usually called the anchor points. a cubic spline interpolation
is performed on this subset to estimate the overall background.
scipy provides some useful routines for this
the other option implemented is the chebyshev polynomials. this
basically automates the background determination and removes the
user from the loop which is required for the spline type background.
"""
if self.bkgmethod is None:
self._background = []
for tth in self.tth_list:
self._background.append(Spectrum(x=tth, y=np.zeros(tth.shape)))
elif "spline" in self.bkgmethod.keys():
points = self.bkgmethod["spline"]
if not points:
raise RuntimeError(
"Background points must be set to use spline"
)
self.splinefit(np.squeeze(np.asarray(points)))
elif "chebyshev" in self.bkgmethod.keys():
self.chebyshevfit()
elif "file" in self.bkgmethod.keys():
if len(self._spectrum_expt) > 1:
raise RuntimeError(
"initialize_bkg: \
file input not allowed for \
masked spectra."
)
else:
bkg = Spectrum.from_file(self.bkgmethod["file"])
x = bkg.x
y = bkg.y
cs = CubicSpline(x, y)
yy = cs(self.tth_list)
self._background = [Spectrum(x=self.tth_list[0], y=yy)]
elif "array" in self.bkgmethod.keys():
if len(self._spectrum_expt) > 1:
raise RuntimeError(
"initialize_bkg: \
file input not allowed for \
masked spectra."
)
else:
x = self.bkgmethod["array"][:, 0]
y = self.bkgmethod["array"][:, 1]
cs = CubicSpline(x, y)
yy = cs(self._tth_list)
self._background = [Spectrum(x=self.tth_list, y=yy)]
elif "snip1d" in self.bkgmethod.keys():
self._background = []
for i, s in enumerate(self._spectrum_expt):
if not self.tth_step:
ww = 3
else:
if self.tth_step[i] > 0.0:
ww = np.rint(
self.bkgmethod["snip1d"][0] / self.tth_step[i]
).astype(np.int32)
else:
ww = 3
numiter = self.bkgmethod["snip1d"][1]
yy = np.squeeze(
snip1d_quad(np.atleast_2d(s.y), w=ww, numiter=numiter)
)
self._background.append(Spectrum(x=self._tth_list[i], y=yy))
@property
def init_bkg(self):
degree = self.bkgmethod["chebyshev"]
x = np.empty([0, ])
y = np.empty([0, ])
wts = np.empty([0, ])
for i, s in enumerate(self._spectrum_expt):
tth = self._tth_list[i]
wt = self._weights[i]
x = np.append(x, tth)
y = np.append(y, s._y)
wts = np.append(wts, wt)
p = np.polynomial.Chebyshev.fit(x, y, degree, w=wts)
return p
@property
def peakshape(self):
return self._peakshape
@peakshape.setter
def peakshape(self, val):
"""
@TODO make sure the parameter list
is updated when the peakshape changes
"""
if isinstance(val, str):
if val == "pvfcj":
self._peakshape = 0
elif val == "pvtch":
self._peakshape = 1
elif val == "pvpink":
self._peakshape = 2
else:
msg = (
"invalid peak shape string. "
"must be: \n"
"1. pvfcj: pseudo voight (Finger, Cox, Jephcoat)\n"
"2. pvtch: pseudo voight (Thompson, Cox, Hastings)\n"
"3. pvpink: Pink beam (Von Dreele)"
)
raise ValueError(msg)
elif isinstance(val, int):
if val >= 0 and val <= 2:
self._peakshape = val
else:
msg = (
"invalid peak shape int. "
"must be: \n"
"1. 0: pseudo voight (Finger, Cox, Jephcoat)\n"
"2. 1: pseudo voight (Thompson, Cox, Hastings)\n"
"3. 2: Pink beam (Von Dreele)"
)
raise ValueError(msg)
"""
update parameters
"""
if hasattr(self, "params"):
params = self.generate_default_parameters()
for p in params:
if p in self.params:
params[p] = self.params[p]
self._params = params
self._set_params_vals_to_class(params, init=True, skip_phases=True)
self.computespectrum()
@property
def computespectrum_fcn(self):
if self.peakshape == 0:
return computespectrum_pvfcj
elif self.peakshape == 1:
return computespectrum_pvtch
elif self.peakshape == 2:
return computespectrum_pvpink
@property
def calc_Iobs_fcn(self):
if self.peakshape == 0:
return calc_Iobs_pvfcj
elif self.peakshape == 1:
return calc_Iobs_pvtch
elif self.peakshape == 2:
return calc_Iobs_pvpink
@property
def tth_list(self):
if isinstance(self.spectrum_expt._x, np.ma.MaskedArray):
return self.spectrum_expt._x.filled()
else:
return self.spectrum_expt._x
@property
def total_area(self):
tth, intensity = self.spectrum_sim.data
_, background = self.background.data
total_intensity = intensity-background
'''put some guard rails around the total intensity
to protect against nans in the values
'''
mask = np.isnan(total_intensity)
sum_area = np.trapz(total_intensity[~mask], tth[~mask])
return sum_area
@property
def DOC(self):
if self.amorphous_model is None:
return 1.
amorphous_area = self.amorphous_model.integrated_area
return 1. - amorphous_area / self.total_area
def _nm(x):
return valWUnit("lp", "length", x, "nm")
[docs]class LeBail(AbstractWPPF):
"""
===========================================================================
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 05/19/2020 SS 1.0 original
09/11/2020 SS 1.1 expt_spectrum, params and phases now have
multiple input option for easy integration with hexrdgui
09/14/2020 SS 1.2 bkgmethod is now a dictionary. if method is
'chebyshev', the the value specifies the degree of the polynomial
to use for background estimation
01/22/2021 SS 1.3 added intensity_init option to initialize
intensity with structure factors if the user so chooses
01/22/2021 SS 1.4 added option to specify background via a
filename or numpy array
03/12/2021 SS 1.5 added _generate_default_parameter function
>> @DETAILS: this is the main LeBail class and contains all the refinable
parameters for the analysis. Since the LeBail method has no
structural information during refinement, the refinable
parameters for this model will be:
1. a, b, c, alpha, beta, gamma : unit cell parameters
2. U, V, W : cagliotti paramaters
3. 2theta_0 : Instrumental zero shift error
4. eta1, eta2, eta3 : weight factor for gaussian vs lorentzian
@NOTE: All angles are always going to be in degrees
>> @PARAMETERS:
expt_spectrum: name of file or numpy array or Spectrum
class of experimental intensity
params: yaml file or dictionary or Parameter class
phases: yaml file or dictionary or Phases_Lebail class
wavelength: dictionary of wavelengths
bkgmethod: method to estimate background. either spline or chebyshev
fit or filename or numpy array
(last two options added 01/22/2021 SS)
Intensity_init: if set to none, then some power of 10 is used.
User has option to pass in dictionary of structure
factors. must ensure that the size of structure factor
matches the possible reflections (added 01/22/2021 SS)
============================================================================
"""
def __init__(
self,
expt_spectrum=None,
params=None,
phases=None,
wavelength={
"kalpha1": [_nm(0.15406), 1.0],
"kalpha2": [_nm(0.154443), 1.0]
},
bkgmethod={"spline": None},
intensity_init=None,
peakshape="pvfcj",
amorphous_model=None,
reset_background_params=True,
):
self.peakshape = peakshape
self.bkgmethod = bkgmethod
self.intensity_init = intensity_init
# self.initialize_expt_spectrum(expt_spectrum)
self.spectrum_expt = expt_spectrum
if wavelength is not None:
self.wavelength = wavelength
self._tstart = time.time()
self.phases = phases
self.amorphous_model = amorphous_model
self.params = params
self.initialize_Icalc()
self.computespectrum()
self._tstop = time.time()
self.tinit = self._tstop - self._tstart
self.niter = 0
self.Rwplist = np.empty([0])
self.gofFlist = np.empty([0])
if reset_background_params:
# Reset background parameters to their initial values
self.reset_background_params()
else:
self._update_bkg(self.params)
[docs] def generate_default_parameters(self) -> lmfit.Parameters:
return wppfsupport._generate_default_parameters_LeBail(
self.phases,
self.peakshape,
self.bkgmethod,
amorphous_model=self.amorphous_model,
)
def _add_phase_params(self, params: lmfit.Parameters):
for p in self.phases:
wppfsupport._add_lp_to_params(params, self.phases[p])
def _set_phase_params_vals_to_class(self, params: lmfit.Parameters):
updated_lp = False
for p in self.phases:
mat = self.phases[p]
"""
PART 1: update stacking fault and twin beta parameters
"""
if mat.sgnum == 225:
sf_alpha_name = f"{p}_sf_alpha"
twin_beta_name = f"{p}_twin_beta"
if params[sf_alpha_name].vary:
mat.sf_alpha = params[sf_alpha_name].value
if params[twin_beta_name].vary:
mat.twin_beta = params[twin_beta_name].value
"""
PART 2: update the lattice parameters
"""
lp = []
lpvary = False
pre = f"{p}_"
name = [f"{pre}{x}" for x in wppfsupport._lpname]
for nn in name:
if nn in params:
if params[nn].vary:
lpvary = True
lp.append(params[nn].value)
if lpvary:
lp = self.phases[p].Required_lp(lp)
mat.lparms = np.array(lp)
mat._calcrmt()
updated_lp = True
if updated_lp:
self.calctth()
def _get_phase(self, name: str, wavelength_type: str):
# LeBail just ignores the wavelength type for phases
return self.phases[name]
[docs] def initialize_Icalc(self):
"""
@DATE 01/22/2021 SS modified the function so Icalc can be initialized
with a dictionary of structure factors
"""
self.Icalc = {}
if self.intensity_init is None:
if self.spectrum_expt._y.max() > 0:
n10 = np.floor(np.log10(self.spectrum_expt._y.max())) - 1
else:
n10 = 0
for p in self.phases:
self.Icalc[p] = {}
for k in self.phases.wavelength.keys():
self.Icalc[p][k] = \
(10 ** n10) * np.ones(self.tth[p][k].shape)
elif isinstance(self.intensity_init, dict):
"""
first check if intensities for all phases are present in the
passed dictionary
"""
for p in self.phases:
if p not in self.intensity_init:
raise RuntimeError(
"LeBail: Intensity was initialized\
using custom values. However, initial values for one \
or more phases seem to be missing from the dictionary."
)
self.Icalc[p] = {}
"""
now check that the size of the initial intensities provided is
consistent with the number of reflections
(size of initial intensity > size of hkl is allowed.
the unused values are ignored.)
for this we need to step through the different wavelengths in
the spectrum and check each of them
"""
for k in self.phases.wavelength:
if k not in self.intensity_init[p]:
raise RuntimeError(
"LeBail: Intensity was initialized\
using custom values. However, initial values for one \
or more wavelengths in spectrum seem to be missing \
from the dictionary."
)
if (self.tth[p][k].shape[0]
<= self.intensity_init[p][k].shape[0]):
self.Icalc[p][k] = self.intensity_init[p][k][
0:self.tth[p][k].shape[0]
]
else:
raise RuntimeError(
"LeBail: Intensity_init must be either\
None or a dictionary"
)
[docs] def computespectrum(self):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 06/08/2020 SS 1.0 original
>> @DETAILS: compute the simulated spectrum
"""
x = self.tth_list
y = np.zeros(x.shape)
tth_list = np.ascontiguousarray(self.tth_list)
for iph, p in enumerate(self.phases):
for k, l in self.phases.wavelength.items():
name = self.phases[p].name
lam = l[0].getVal("nm")
Ic = self.Icalc[p][k]
shft_c = np.cos(0.5 * np.radians(self.tth[p][k])) * self.shft
trns_c = np.sin(np.radians(self.tth[p][k])) * self.trns
if self.phases[p].sf_alpha is None:
sf_shift = 0.0
Xs = np.zeros(Ic.shape)
else:
alpha = getattr(self, f"{p}_sf_alpha")
beta = getattr(self, f"{p}_twin_beta")
sf_shift = alpha*np.tan(np.radians(0.5*self.tth[p][k])) *\
self.sf_hkl_factors[p][k]
Xs = np.degrees(0.9 * (1.5 * alpha + beta) * (
self.sf_lfactor[p][k] * lam / self.phases[p].lparms[0])
)
tth = self.tth[p][k] + self.zero_error + \
shft_c + trns_c + sf_shift
dsp = self.dsp[p][k]
hkls = self.hkls[p][k]
# n = np.min((tth.shape[0], Ic.shape[0]))
shkl = self.phases[p].shkl
eta_fwhm = getattr(self, f"{name}_eta_fwhm")
X = getattr(self, f"{name}_X")
Y = getattr(self, f"{name}_Y")
P = getattr(self, f"{name}_P")
XY = np.array([X, Y])
if self.peakshape == 0:
args = (
np.array([self.U, self.V, self.W]),
P,
XY,
Xs,
shkl,
eta_fwhm,
self.HL,
self.SL,
tth,
dsp,
hkls,
tth_list,
Ic,
self.xn,
self.wn,
)
elif self.peakshape == 1:
args = (
np.array([self.U, self.V, self.W]),
P,
XY,
Xs,
shkl,
eta_fwhm,
tth,
dsp,
hkls,
tth_list,
Ic,
)
elif self.peakshape == 2:
args = (
np.array([self.alpha0, self.alpha1]),
np.array([self.beta0, self.beta1]),
np.array([self.U, self.V, self.W]),
P,
XY,
Xs,
shkl,
eta_fwhm,
tth,
dsp,
hkls,
tth_list,
Ic,
)
y += self.computespectrum_fcn(*args)
if self.amorphous_model is not None:
y += self.amorphous_model.amorphous_lineout
self._spectrum_sim = Spectrum(x=x, y=y)
errvec, self.Rwp, self.gofF = calc_rwp(
self.spectrum_sim.data_array,
self.spectrum_expt.data_array,
self.weights.data_array,
self.num_vary,
)
return errvec
[docs] def CalcIobs(self):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 06/08/2020 SS 1.0 original
>> @DETAILS: this is one of the main functions to partition the expt
intensities to overlapping peaks in the calculated pattern
"""
self.Iobs = {}
spec_expt = self.spectrum_expt.data_array
spec_sim = self.spectrum_sim.data_array
tth_list = np.ascontiguousarray(self.tth_list)
for iph, p in enumerate(self.phases):
self.Iobs[p] = {}
for k, l in self.phases.wavelength.items():
name = self.phases[p].name
Ic = self.Icalc[p][k]
lam = l[0].getVal("nm")
shft_c = np.cos(0.5 * np.radians(self.tth[p][k])) * self.shft
trns_c = np.sin(np.radians(self.tth[p][k])) * self.trns
sf_shift = 0.0
Xs = np.zeros(Ic.shape)
if self.phases[p].sf_alpha is not None:
alpha = getattr(self, f"{p}_sf_alpha")
beta = getattr(self, f"{p}_twin_beta")
sf_shift = alpha*np.tan(np.radians(0.5*self.tth[p][k])) *\
self.sf_hkl_factors[p][k]
Xs = np.degrees(0.9 * (1.5 * alpha + beta) * (
self.sf_lfactor[p][k] * lam / self.phases[p].lparms[0]
))
tth = self.tth[p][k] + self.zero_error + \
shft_c + trns_c + sf_shift
dsp = self.dsp[p][k]
hkls = self.hkls[p][k]
# n = np.min((tth.shape[0], Ic.shape[0])) # !!! not used
shkl = self.phases[p].shkl
eta_fwhm = getattr(self, f"{name}_eta_fwhm")
X = getattr(self, f"{name}_X")
Y = getattr(self, f"{name}_Y")
P = getattr(self, f"{name}_P")
XY = np.array([X, Y])
if self.peakshape == 0:
args = (
np.array([self.U, self.V, self.W]),
P,
XY,
Xs,
shkl,
eta_fwhm,
self.HL,
self.SL,
self.xn,
self.wn,
tth,
dsp,
hkls,
tth_list,
Ic,
spec_expt,
spec_sim,
)
elif self.peakshape == 1:
args = (
np.array([self.U, self.V, self.W]),
P,
XY,
Xs,
shkl,
eta_fwhm,
tth,
dsp,
hkls,
tth_list,
Ic,
spec_expt,
spec_sim,
)
elif self.peakshape == 2:
args = (
np.array([self.alpha0, self.alpha1]),
np.array([self.beta0, self.beta1]),
np.array([self.U, self.V, self.W]),
P,
XY,
Xs,
shkl,
eta_fwhm,
tth,
dsp,
hkls,
tth_list,
Ic,
spec_expt,
spec_sim,
)
self.Iobs[p][k] = self.calc_Iobs_fcn(*args)
[docs] def RefineCycle(self, print_to_screen=True):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 06/08/2020 SS 1.0 original
01/28/2021 SS 1.1 added optional print_to_screen argument
>> @DETAILS: this is one refinement cycle for the least squares,
typically few 10s to 100s of cycles may be required for
convergence
"""
self.CalcIobs()
self.Icalc = self.Iobs
self.res = self.Refine()
if self.res is not None:
self.update_parameters()
self.niter += 1
self.Rwplist = np.append(self.Rwplist, self.Rwp)
self.gofFlist = np.append(self.gofFlist, self.gofF)
if print_to_screen:
msg = (f"Finished iteration. Rwp: "
f"{self.Rwp*100.0:.2f} % and chi^2: {self.gofF:.2f}")
print(msg)
[docs] def Refine(self):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 05/19/2020 SS 1.0 original
>> @DETAILS: this routine performs the least squares refinement for all
variables which are allowed to be varied.
"""
if self.num_vary > 0:
fdict = {
"ftol": 1e-6,
"xtol": 1e-6,
"gtol": 1e-6,
"verbose": 0,
"max_nfev": 1000,
"method": "trf",
"jac": "2-point",
}
fitter = lmfit.Minimizer(self.calcRwp, self.params)
res = fitter.least_squares(**fdict)
return res
else:
msg = "nothing to refine. updating intensities"
print(msg)
self.computespectrum()
return getattr(self, 'res', None)
@property
def phases(self):
return self._phases
@phases.setter
def phases(self, phase_info):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 06/08/2020 SS 1.0 original
09/11/2020 SS 1.1 multiple different ways to initialize
phases
09/14/2020 SS 1.2 added phase initialization from
material.Material class
03/05/2021 SS 2.0 moved everything to property setter
>> @DETAILS: load the phases for the LeBail fits
"""
if phase_info is None:
self._on_phases_changed()
return
if isinstance(phase_info, Phases_LeBail):
"""
directly passing the phase class
"""
self._phases = phase_info
self._on_phases_changed()
return
p = Phases_LeBail(wavelength=getattr(self, "wavelength", None))
if isinstance(phase_info, dict):
"""
initialize class using a dictionary with key as
material file and values as the name of each phase
"""
for material_file, material_names in phase_info.items():
if not isinstance(material_names, list):
material_names = [material_names]
p.add_many(material_file, material_names)
elif isinstance(phase_info, str):
"""
load from a yaml file
"""
p.load(phase_info)
else:
mat_list = []
if isinstance(phase_info, Material):
mat_list = [phase_info]
elif isinstance(phase_info, list):
mat_list = phase_info
for mat in mat_list:
p[mat.name] = Material_LeBail(
fhdf=None, xtal=None,
dmin=None, material_obj=mat
)
p.reset_phase_fractions()
self._phases = p
self._on_phases_changed()
def _on_phases_changed(self):
self.calctth()
for p in self.phases:
(
self.phases[p].valid_shkl,
self.phases[p].eq_constraints,
self.phases[p].rqd_index,
self.phases[p].trig_ptype,
) = wppfsupport._required_shkl_names(self.phases[p])
[docs]class Rietveld(AbstractWPPF):
"""
===========================================================================
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 01/08/2020 SS 1.0 original
07/13/2020 SS 2.0 complete rewrite to include new
parameter/material/pattern class
02/01/2021 SS 2.1 peak shapes from Thompson,Cox and Hastings
formula using X anf Y parameters for the lorentzian peak
widths
>> @DETAILS: this is the main rietveld class and contains all the refinable
parameters for the analysis. the member classes are as follows
(in order of initialization):
1. Spectrum contains the experimental spectrum
2. Background contains the background extracted from
spectrum
3. Refine contains all the machinery for refinement
============================================================================
"""
def __init__(
self,
expt_spectrum=None,
params=None,
phases=None,
wavelength={"kalpha1": [_nm(0.15406), 1.0],
"kalpha2": [_nm(0.154443), 0.52]},
bkgmethod={"spline": None},
peakshape="pvfcj",
shape_factor=1.0,
particle_size=1.0,
phi=0.0,
amorphous_model=None,
reset_background_params=True,
texture_model=None,
eta_min=-180,
eta_max=180,
eta_step=5.,
):
self.bkgmethod = bkgmethod
self.shape_factor = shape_factor
self.particle_size = particle_size
self.phi = phi
self.peakshape = peakshape
self.spectrum_expt = expt_spectrum
self.amorphous_model = amorphous_model
# extent of 2D data in the azimuth
# important for texture analysis
self.eta_min = eta_min
self.eta_max = eta_max
self.eta_step = eta_step
self._tstart = time.time()
if wavelength is not None:
self.wavelength = wavelength
for k, v in self.wavelength.items():
v[0] = valWUnit("lp", "length", v[0].getVal("nm"), "nm")
self.phases = phases
self.texture_model = texture_model
self.params = params
self.PolarizationFactor()
self.computespectrum()
self._tstop = time.time()
self.tinit = self._tstop - self._tstart
self.niter = 0
self.Rwplist = np.empty([0])
self.gofFlist = np.empty([0])
if reset_background_params:
# Reset background parameters to their initial values
self.reset_background_params()
else:
self._update_bkg(self.params)
[docs] def generate_default_parameters(self) -> lmfit.Parameters:
return wppfsupport._generate_default_parameters_Rietveld(
self.phases,
self.peakshape,
self.bkgmethod,
init_val=self.cheb_init_coef,
amorphous_model=self.amorphous_model,
texture_model=self.texture_model,
)
def _add_phase_params(self, params: lmfit.Parameters):
for p in self.phases:
for lpi in self.phases[p]:
wppfsupport._add_atominfo_to_params(
params, self.phases[p][lpi]
)
def _set_phase_params_vals_to_class(self, params: lmfit.Parameters):
# These indicate that *any* materials updated their lattice
# parameters or atom info.
updated_lp = False
updated_atominfo = False
pf = np.zeros([self.phases.num_phases, ])
for ii, p in enumerate(self.phases):
name = f"{p}_phase_fraction"
pf[ii] = params[name].value
for lpi in self.phases[p]:
mat = self.phases[p][lpi]
# This indicate *this* material updated its lattice parameter
# or atom info.
lpvary = False
atominfo_vary = False
"""
PART 1: update stacking fault and twin beta parameters
"""
if mat.sgnum == 225:
sf_alpha_name = f"{p}_sf_alpha"
twin_beta_name = f"{p}_twin_beta"
if params[sf_alpha_name].vary:
mat.sf_alpha = params[sf_alpha_name].value
if params[twin_beta_name].vary:
mat.twin_beta = params[twin_beta_name].value
"""
PART 2: update the lattice parameters
"""
lp = []
pre = f"{p}_"
name = [f"{pre}{x}" for x in wppfsupport._lpname]
for nn in name:
if nn in params:
if params[nn].vary:
lpvary = True
lp.append(params[nn].value)
if lpvary:
lp = mat.Required_lp(lp)
mat.lparms = np.array(lp)
mat._calcrmt()
"""
PART 3: update the atom info
"""
atom_type = mat.atom_type
atom_label = wppfsupport._getnumber(atom_type)
for i in range(atom_type.shape[0]):
Z = atom_type[i]
elem = constants.ptableinverse[Z]
nx = f"{p}_{elem}{atom_label[i]}_x"
ny = f"{p}_{elem}{atom_label[i]}_y"
nz = f"{p}_{elem}{atom_label[i]}_z"
oc = f"{p}_{elem}{atom_label[i]}_occ"
if mat.aniU:
Un = []
for j in range(6):
Un.append(
(
f"{p}_{elem}"
f"{atom_label[i]}"
f"_{wppfsupport._nameU[j]}"
)
)
else:
dw = f"{p}_{elem}{atom_label[i]}_dw"
atominfo_vary = any(
params[name].vary for name in (nx, ny, nz, oc)
)
x = params[nx].value
y = params[ny].value
z = params[nz].value
oc = params[oc].value
if mat.aniU:
U = []
for j in range(6):
param = params[Un[j]]
if param.vary:
atominfo_vary = True
U.append(param.value)
mat.U[i, :] = np.array(U)
else:
if params[dw].vary:
atominfo_vary = True
mat.U[i] = params[dw].value
mat.atom_pos[i, :] = np.array([x, y, z, oc])
if mat.aniU:
mat.calcBetaij()
if lpvary:
updated_lp = True
if atominfo_vary:
updated_atominfo = True
if updated_lp:
self.calctth()
if updated_lp or updated_atominfo:
self.calcsf()
self.phases.phase_fraction = pf / np.sum(pf)
def _get_phase(self, name: str, wavelength_type: str):
# Rietveld uses the wavelength type
return self.phases[name][wavelength_type]
[docs] def calcsf(self):
self.sf = {}
self.sf_raw = {}
self.extinction = {}
self.absorption = {}
for p in self.phases:
self.sf[p] = {}
self.sf_raw[p] = {}
self.extinction[p] = {}
self.absorption[p] = {}
for k, l in self.phases.wavelength.items():
w = l[0].getVal("nm")
w_int = l[1]
tth = self.tth[p][k]
# allowed = self.phases[p][k].wavelength_allowed_hkls
# limit = self.limit[p][k]
self.sf[p][k], self.sf_raw[p][k] = \
self.phases[p][k].CalcXRSF(w, w_int)
self.extinction[p][k] = self.phases[p][k].calc_extinction(
10.0 * w,
tth,
self.sf_raw[p][k],
self.shape_factor,
self.particle_size,
)
self.absorption[p][k] = self.phases[p][k].calc_absorption(
tth, self.phi, 10.0 * w
)
[docs] def PolarizationFactor(self):
# tth = self.tth
self.LP = {}
Ph = self.Ph
for p in self.phases:
self.LP[p] = {}
for k, l in self.phases.wavelength.items():
t = np.radians(self.tth[p][k])
self.LP[p][k] = (
(1 + Ph * np.cos(t) ** 2)
/ np.cos(0.5 * t)
/ np.sin(0.5 * t) ** 2
# / (2.0 * (1 + Ph))
)
[docs] def compute_intensities(self):
'''this function computes the intensities of the
xray diffraction excluding any texture. this function
will replace part of the code in the computespectrum
function
'''
Ic = {}
for iph, p in enumerate(self.phases):
Ic[p] = {}
for k, l in self.phases.wavelength.items():
tth = self.tth[p][k]
pf = self.phases[p][k].pf / self.phases[p][k].vol ** 2
sf = self.sf[p][k]
lp = self.LP[p][k]
extinction = self.extinction[p][k]
absorption = self.absorption[p][k]
n = np.min((tth.shape[0], sf.shape[0], lp.shape[0]))
tth = tth[:n]
sf = sf[:n]
lp = lp[:n]
extinction = extinction[:n]
absorption = absorption[:n]
Ic[p][k] = self.scale * pf * sf * lp # *extinction*absorption
return Ic
[docs] def compute_tth_after_shifts(self,
p,
k):
'''another helper function to be used by both
Rietveld.computspectrum and Rietveld.computespectrum_2d
Parameters
----------
p: str
name of the phase
k: str
wavelength key
'''
name = self.phases[p][k].name
lam = self.phases.wavelength[k][0].getVal("nm")
shft_c = np.cos(0.5 * np.radians(self.tth[p][k])) * self.shft
trns_c = np.sin(np.radians(self.tth[p][k])) * self.trns
if self.phases[p][k].sf_alpha is None:
sf_shift = 0.0
Xs = np.zeros(self.tth[p][k].shape)
else:
alpha = getattr(self, f"{p}_sf_alpha")
beta = getattr(self, f"{p}_twin_beta")
sf_shift = alpha*np.tan(np.radians(0.5*self.tth[p][k])) *\
self.sf_hkl_factors[p][k]
Xs = np.degrees(0.9*(1.5*alpha+beta)*(
self.sf_lfactor[p][k] *
lam/self.phases[p][k].lparms[0]))
tth = self.tth[p][k] + self.zero_error + \
shft_c + trns_c + sf_shift
return tth, Xs
[docs] def computespectrum_phase(self,
p,
k,
Ic,
texture_factor=None):
'''this is a helper function so which is use by both the
Rietveld.computspectrum and Rietveld.computespectrum_2d
function to avoid code repetition.
Parameters
----------
p: str
name of the phase
k: str
wavelength key
texture_factor: numpy.ndarray
azimuthally averaged texture factor
'''
tth_list = np.ascontiguousarray(self.tth_list)
name = self.phases[p][k].name
tth, Xs = self.compute_tth_after_shifts(p, k)
Icmod = Ic.copy()
if not texture_factor is None:
n = np.min((tth.shape[0], Ic.shape[0], texture_factor.shape[0]))
tth = tth[:n]
Icmod = Icmod[:n] * texture_factor[:n] # *extinction*absorption
dsp = self.dsp[p][k]
hkls = self.hkls[p][k]
shkl = self.phases[p][k].shkl
eta_fwhm = getattr(self, f"{name}_eta_fwhm")
X = getattr(self, f"{name}_X")
Y = getattr(self, f"{name}_Y")
P = getattr(self, f"{name}_P")
XY = np.array([X, Y])
if self.peakshape == 0:
args = (
np.array([self.U, self.V, self.W]),
P,
XY,
Xs,
shkl,
eta_fwhm,
self.HL,
self.SL,
tth,
dsp,
hkls,
tth_list,
Icmod,
self.xn,
self.wn,
)
elif self.peakshape == 1:
args = (
np.array([self.U, self.V, self.W]),
P,
XY,
Xs,
shkl,
eta_fwhm,
tth,
dsp,
hkls,
tth_list,
Icmod,
)
elif self.peakshape == 2:
args = (
np.array([self.alpha0, self.alpha1]),
np.array([self.beta0, self.beta1]),
np.array([self.U, self.V, self.W]),
P,
XY,
Xs,
shkl,
eta_fwhm,
tth,
dsp,
hkls,
tth_list,
Icmod,
)
return self.computespectrum_fcn(*args)
[docs] def computespectrum(self):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab
>> @EMAIL: saransh1@llnl.gov
>> @DATE: 06/08/2020 SS 1.0 original
>> @DETAILS: compute the simulated spectrum
"""
x = self.tth_list
y = np.zeros(x.shape)
Icomputed = self.compute_intensities()
for iph, p in enumerate(self.phases):
for k, l in self.phases.wavelength.items():
if self.texture_model[p] is None:
texture_factor = None
else:
texture_factor = self.texture_model[p].calc_texture_factor(
self.params,
eta_min=self.eta_min,
eta_max=self.eta_max)
y += self.computespectrum_phase(p,
k,
Icomputed[p][k],
texture_factor=texture_factor)
if self.amorphous_model is not None:
y += self.amorphous_model.amorphous_lineout
self._spectrum_sim = Spectrum(x=x, y=y)
errvec, self.Rwp, self.gofF = calc_rwp(
self.spectrum_sim.data_array,
self.spectrum_expt.data_array,
self.weights.data_array,
self.num_vary,
)
return errvec
[docs] def computespectrum_2D(self):
'''this function computes the 2D pattern for the
Rietevld model. if there is no texture, the pattern
is uniform in the azimuthal direction. if there is
a texture model present, then the azimuthal intensities
are modulated based on the texture model.
Parameters
-----------
eta_min: float
minimum azimuthal angle
eta_max: float
maximum azimuthal angle
eta_step: float
step size in azimuth. this determines the
dimensions in the azimuthal direction
Returns
--------
simulated_2d: np.ndarray
simulated 2D diffraction pattern
'''
x = self.tth_list
y = np.zeros(x.shape)
azimuth = np.arange(self.eta_min,
self.eta_max,
self.eta_step)
nazimuth = azimuth.shape[0]
Icomputed = self.compute_intensities()
if self.texture_model is None:
for iph, p in enumerate(self.phases):
for k, l in self.phases.wavelength.items():
y += self.computespectrum_phase(p,
k,
Icomputed[p][k],
texture_factor=None)
y[self.global_mask] = np.nan
y += self.background.y
if self.amorphous_model is not None:
y += self.amorphous_model.amorphous_lineout
self.simulated_2d = np.tile(y, (nazimuth-1, 1))
return
else:
'''get pole figure intensities around the azimuth
'''
self.simulated_2d = np.empty([nazimuth-1, x.shape[0]])
azimuth_texture_factor = {}
for iph, p in enumerate(self.phases):
if p in self.texture_model:
self.texture_model[p].calc_pf_rings(
self.params,
eta_min=self.eta_min,
eta_max=self.eta_max,
eta_step=self.eta_step,
calc_type='spectrum_2d')
azimuth_texture_factor[p] = self.texture_model[p].intensities_rings_2d
else:
azimuth_texture_factor[p] = None
for irow in range(1, nazimuth):
y = np.zeros(x.shape)
for iph, p in enumerate(self.phases):
for k, l in self.phases.wavelength.items():
hkls = self.hkls[p][k]
texture_factor = None
if not azimuth_texture_factor[p] is None:
texture_factor = np.empty([hkls.shape[0], ])
for ii,h in enumerate(hkls):
hkey = tuple(h)
texture_factor[ii] = azimuth_texture_factor[p][hkey][irow]
y += self.computespectrum_phase(p,
k,
Icomputed[p][k],
texture_factor=texture_factor)
y[self.global_mask] = np.nan
y += self.background.y
if self.amorphous_model is not None:
y += self.amorphous_model.amorphous_lineout
self.simulated_2d[irow-1, :] = y
[docs] def Refine(self):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 05/19/2020 SS 1.0 original
>> @DETAILS: this routine performs the least squares refinement for all
variables that are allowed to be varied.
"""
if self.num_vary > 0:
fdict = {
"ftol": 1e-6,
"xtol": 1e-6,
"gtol": 1e-6,
"verbose": 0,
"max_nfev": 1000,
"method": "trf",
"jac": "2-point",
}
fitter = lmfit.Minimizer(self.calcRwp, self.params)
self.res = fitter.least_squares(**fdict)
self.update_parameters()
self.niter += 1
self.Rwplist = np.append(self.Rwplist, self.Rwp)
self.gofFlist = np.append(self.gofFlist, self.gofF)
msg = (f"Finished iteration. Rwp: "
f"{self.Rwp*100.0:.2f} % and chi^2: {self.gofF:.2f}")
print(msg)
else:
print("Nothing to refine...")
[docs] def RefineTexture(self):
for name, model in self.texture_model.items():
if model is None:
continue
print(f'Refining texture parameters for "{name}"')
results = model.calculate_harmonic_coefficients(self.params)
# Set the results to the final one
self.res = results
self.computespectrum()
self.niter += 1
self.Rwplist = np.append(self.Rwplist, self.Rwp)
self.gofFlist = np.append(self.gofFlist, self.gofF)
msg = (f"Finished iteration. Rwp: "
f"{self.Rwp*100.0:.2f} % and chi^2: {self.gofF:.2f}")
print(msg)
[docs] def texture_parameters_vary(self,
vary=False):
'''helper function to turn texture related
parameters on or off
'''
for phase_name in self.phases:
prefix = f'{phase_name}_c_'
for p in self.params:
if p.startswith(prefix):
self.params[p].vary = vary
@property
def any_texture_params_varied(self):
for phase_name in self.phases:
prefix = f'{phase_name}_c_'
for param in self.params.values():
if param.name.startswith(prefix) and param.vary:
return True
return False
@property
def texture_models_have_pfdata(self):
for model in self.texture_model.values():
if not model.pfdata:
return False
return True
@property
def phases(self):
return self._phases
@phases.setter
def phases(self, phase_info):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab,
saransh1@llnl.gov
>> @DATE: 06/08/2020 SS 1.0 original
02/01/2021 SS 2.0 modified to follow same input style as
LeBail class with inputs of Material class, dict, list
or filename valid
>> @DETAILS: load the phases for the LeBail fits
"""
if phase_info is None:
self._on_phases_changed()
return
if isinstance(phase_info, Phases_Rietveld):
"""
directly passing the phase class
"""
self._phases = phase_info
self._on_phases_changed()
return
p = Phases_Rietveld(wavelength=getattr(self, "wavelength", None))
if isinstance(phase_info, dict):
"""
initialize class using a dictionary with key as
material file and values as the name of each phase
"""
for material_file, material_names in phase_info.items():
if not isinstance(material_names, list):
material_names = [material_names]
p.add_many(material_file, material_names)
elif isinstance(phase_info, str):
"""
load from a yaml file
"""
p.load(phase_info)
else:
mat_list = []
if isinstance(phase_info, Material):
mat_list = [phase_info]
elif isinstance(phase_info, list):
mat_list = phase_info
for mat in mat_list:
p[mat.name] = {}
for k, v in self.wavelength.items():
E = (
1.0e6
* constants.cPlanck
* constants.cLight
/ constants.cCharge
/ v[0].getVal('nm')
)
mat.beamEnergy = valWUnit(
"kev", "ENERGY", E, "keV"
)
p[mat.name][k] = Material_Rietveld(
fhdf=None, xtal=None,
dmin=None, material_obj=mat
)
p.reset_phase_fractions()
self._phases = p
self._on_phases_changed()
def _on_phases_changed(self):
self.calctth()
self.calcsf()
for p in self.phases:
for k in self.phases[p]:
(
self._phases[p][k].valid_shkl,
self._phases[p][k].eq_constraints,
self._phases[p][k].rqd_index,
self._phases[p][k].trig_ptype,
) = wppfsupport._required_shkl_names(self._phases[p][k])
@property
def texture_model(self):
return self._texture_model
@texture_model.setter
def texture_model(self, valdict):
'''only dictionary key value pairs are acceptable.
key should match name of the phase. if a certain
phase is not present, texture model for that phase
is set to None
'''
if valdict is None:
valdict = {}
if not isinstance(valdict, dict):
msg = (
'only dictionary input allowed '
'where key are name of phase and '
'value is the harmonic_model instance'
)
raise ValueError(msg)
self._texture_model = valdict
for phase_name in self.phases:
if phase_name not in valdict:
self._texture_model[phase_name] = None
@property
def texture_index(self):
res = {}
for p in self.phases:
if self.texture_model[p] is not None:
res[p] = self.texture_model[p].J(self.params)
else:
res[p] = 1.
return res
@property
def eta_min(self):
return self._eta_min
@eta_min.setter
def eta_min(self, val):
self._eta_min = np.radians(val)
@property
def eta_max(self):
return self._eta_max
@eta_max.setter
def eta_max(self, val):
self._eta_max = np.radians(val)
@property
def eta_step(self):
return self._eta_step
@eta_step.setter
def eta_step(self, val):
self._eta_step = np.radians(val)
[docs] def compute_texture_data(self,
pv_binned: np.ndarray,
bvec: np.ndarray | None = None,
evec: np.ndarray | None = None,
azimuthal_interval: float = 5):
"""Compute texture data to use in texture refinement
Using the current parameters on the Rietveld object, fit the peaks
to LeBail models in order to determine their intensities, and then
compute a dictionary of texture contributions. These texture
contributions are automatically set on the `pfdata` of the texture
models.
A simulated spectrum is returned.
"""
# Use only the first lambda key, since it doesn't matter for LeBail
lambda_key = next(iter(self.wavelength))
mats = [v[lambda_key] for v in self.phases.phase_dict.values()]
# Create LeBail params to use for intensity-finding. When possible,
# we'll use the same params currently on the Rietveld object.
bkg_method = {'chebyshev': 1}
params = wppfsupport._generate_default_parameters_LeBail(
mats, 1, bkg_method)
for p in params:
params[p].value = self.params[p].value
# Ensure these are marked as `Vary`
params['U'].vary = True
params['V'].vary = True
params['W'].vary = True
# Allow lattice constants to vary as well
for mat in mats:
lp_names = [f"{mat.name}_{x}" for x in wppfsupport._lpname]
for name in lp_names:
if name in params:
params[name].vary = True
# Compute the current intensities from Rietveld
ints_computed = self.compute_intensities()
# Extract the intensities from the data
mask = np.isnan(pv_binned)
results = extract_intensities(**{
'polar_view': np.ma.masked_array(pv_binned, mask=mask),
'tth_array': self.tth_list,
'params': params,
'phases': mats,
'wavelength': self.wavelength,
'bkgmethod': bkg_method,
'intensity_init': ints_computed,
'termination_condition': {
"rwp_perct_change": 0.05,
"max_iter": 10,
},
'peakshape': "pvtch",
})
# we have to divide by the computed instensites
# to get the texture contribution
for mat_key, model in self.texture_model.items():
if model is None:
continue
pfdata = {}
for ii in range(pv_binned.shape[0]):
eta = self.eta_min + (ii + 1) * np.radians(azimuthal_interval)
t = results[2][ii][mat_key][lambda_key]
hkl = results[1][ii][mat_key][lambda_key]
ints = results[0][ii][mat_key][lambda_key]
ints_comp = ints_computed[mat_key][lambda_key]
nn = np.min((t.shape[0], ints_comp.shape[0]))
for jj in range(nn):
angs = np.atleast_2d([np.radians(t[jj]), eta, 0])
v = angles_to_gvec(angs, beam_vec=bvec, eta_vec=evec)
data = np.hstack((
v,
np.atleast_2d(ints[jj] / ints_comp[jj]),
))
hkey = tuple(hkl[jj, :])
if hkey in pfdata:
# Stack with previous data
data = np.vstack((pfdata[hkey], data))
pfdata[hkey] = data
# Now set the texture data on the texture model
model.pfdata = pfdata
return results[4]
[docs]def separate_regions(masked_spec_array):
"""
utility function for separating array into separate
islands as dictated by mask. this function was taken from
stackoverflow
https://stackoverflow.com/questions/43385877/
efficient-numpy-subarrays-extraction-from-a-mask
"""
array = masked_spec_array.data
mask = ~masked_spec_array.mask[:, 1]
m0 = np.concatenate(([False], mask, [False]))
idx = np.flatnonzero(m0[1:] != m0[:-1])
gidx = [(idx[i], idx[i + 1]) for i in range(0, len(idx), 2)]
return [array[idx[i]: idx[i + 1], :] for i in range(0, len(idx), 2)], gidx
[docs]def join_regions(vector_list, global_index, global_shape):
"""
@author Saransh Singh Lawrence Livermore National Lab
@date 03/08/2021 SS 1.0 original
@details utility function for joining different pieces of masked array
into one masked array
"""
out_vector = np.empty(
[
global_shape,
]
)
out_vector[:] = np.nan
for s, ids in zip(vector_list, global_index):
out_vector[ids[0]: ids[1]] = s
# out_array = np.ma.masked_array(out_array, mask = np.isnan(out_array))
return out_vector
peakshape_dict = {
"pvfcj": "pseudo-voight (finger, cox, jephcoat)",
"pvtch": "pseudo-voight (thompson, cox, hastings)",
"pvpink": "pseudo-voight (von dreele)",
}