import numpy as np
import h5py
from os import path
[docs]class Spectrum:
"""
==================================================================================
==================================================================================
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab, saransh1@llnl.gov
>> @DATE: 05/18/2020 SS 1.0 original
>> @DETAILS: spectrum class holds the a pair of x,y data, in this case, would be
2theta-intensity values
==================================================================================
==================================================================================
"""
def __init__(self, x=None, y=None, name=''):
if x is None:
self._x = np.linspace(10., 100., 500)
else:
self._x = x
if y is None:
self._y = np.log(self._x ** 2) - (self._x * 0.2) ** 2
else:
self._y = y
self.name = name
self.offset = 0
self._scaling = 1
self.smoothing = 0
self.bkg_Spectrum = None
[docs] @staticmethod
def from_file(filename, skip_rows=0):
try:
if filename.endswith('.chi'):
skip_rows = 4
data = np.loadtxt(filename, skiprows=skip_rows)
x = data.T[0]
y = data.T[1]
name = path.basename(filename).split('.')[:-1][0]
return Spectrum(x, y, name)
except ValueError:
print('Wrong data format for spectrum file! - ' + filename)
return -1
[docs] def save(self, filename, header=''):
data = np.dstack((self._x, self._y))
np.savetxt(filename, data[0], header=header)
[docs] def set_background(self, Spectrum):
self.bkg_spectrum = Spectrum
[docs] def reset_background(self):
self.bkg_spectrum = None
[docs] def set_smoothing(self, amount):
self.smoothing = amount
[docs] def rebin(self, bin_size):
"""
Returns a new Spectrum which is a rebinned version of the current one.
"""
x, y = self.data
x_min = np.round(np.min(x) / bin_size) * bin_size
x_max = np.round(np.max(x) / bin_size) * bin_size
new_x = np.arange(x_min, x_max + 0.1 * bin_size, bin_size)
bins = np.hstack((x_min - bin_size * 0.5, new_x + bin_size * 0.5))
new_y = (np.histogram(x, bins, weights=y)
[0] / np.histogram(x, bins)[0])
return Spectrum(new_x, new_y)
[docs] def dump_hdf5(self, file, name):
"""
>> @AUTHOR: Saransh Singh, Lawrence Livermore National Lab, saransh1@llnl.gov
>> @DATE: 01/15/2021 SS 1.0 original
>> @DETAILS: dump the class to a hdf5 file. the file argument could either be a
string or a h5.File instance. If it is a filename, then HDF5 file
is created, a Spectrum group is created and data is written out.
Else data written to Spectrum group in existing file object
>> @PARAMS file file name string or h5py.File object
name name ID of the spectrum e.g. experimental or simulated or background
"""
if(isinstance(file, str)):
fexist = path.isfile(file)
if(fexist):
fid = h5py.File(file, 'r+')
else:
fid = h5py.File(file, 'x')
elif(isinstance(file, h5py.File)):
fid = file
else:
raise RuntimeError(
'Parameters: dump_hdf5 Pass in a filename \
string or h5py.File object')
name_spectrum = 'Spectrum/'+name
if(name_spectrum in fid):
del(fid[name_spectrum])
gid = fid.create_group(name_spectrum)
tth, I = self.data
# make sure these arrays are not zero sized
if(tth.shape[0] > 0):
did = gid.create_dataset("tth", tth.shape, dtype=np.float64)
did.write_direct(tth.astype(np.float64))
if(I.shape[0] > 0):
did = gid.create_dataset("intensity", I.shape, dtype=np.float64)
did.write_direct(I.astype(np.float64))
@property
def data(self):
if self.bkg_Spectrum is not None:
# create background function
x_bkg, y_bkg = self.bkg_Spectrum.data
if not np.array_equal(x_bkg, self._x):
# the background will be interpolated
f_bkg = interp1d(x_bkg, y_bkg, kind='linear')
# find overlapping x and y values:
ind = np.where((self._x <= np.max(x_bkg)) &
(self._x >= np.min(x_bkg)))
x = self._x[ind]
y = self._y[ind]
if len(x) == 0:
""" if there is no overlapping between background
and Spectrum, raise an error """
raise BkgNotInRangeError(self.name)
y = y * self._scaling + self.offset - f_bkg(x)
else:
""" if Spectrum and bkg have the same
x basis we just delete y-y_bkg"""
x, y = self._x, self._y * \
self._scaling + self.offset - y_bkg
else:
x, y = self.original_data
if self.smoothing > 0:
y = gaussian_filter1d(y, self.smoothing)
return x, y
@data.setter
def data(self, data):
(x, y) = data
self._x = x
self._y = y
self.scaling = 1
self.offset = 0
@property
def data_array(self):
x, y = self.data
mask = np.isnan(y)
arr = np.vstack((x[~mask], y[~mask])).T
if isinstance(arr, np.ma.MaskedArray):
return arr.filled()
else:
return arr
@property
def original_data(self):
return self._x, self._y * self._scaling +\
self.offset
@property
def x(self):
return self._x
@x.setter
def x(self, new_value):
self._x = new_value
@property
def y(self):
return self._y
@y.setter
def y(self, new_y):
self._y = new_y
@property
def scaling(self):
return self._scaling
@scaling.setter
def scaling(self, value):
if value < 0:
self._scaling = 0
else:
self._scaling = value
[docs] def limit(self, x_min, x_max):
x, y = self.data
return Spectrum(x[np.where((x_min < x) & (x < x_max))],
y[np.where((x_min < x) & (x < x_max))])
[docs] def extend_to(self, x_value, y_value):
"""
Extends the current Spectrum to a specific x_value by filling it
with the y_value. Does not modify inplace but returns a new filled
Spectrum
:param x_value: Point to which extend the Spectrum should be smaller
than the lowest x-value in the Spectrum or vice versa
:param y_value: number to fill the Spectrum with
:return: extended Spectrum
"""
x_step = np.mean(np.diff(self.x))
x_min = np.min(self.x)
x_max = np.max(self.x)
if x_value < x_min:
x_fill = np.arange(x_min - x_step, x_value -
x_step*0.5, -x_step)[::-1]
y_fill = np.zeros(x_fill.shape)
y_fill.fill(y_value)
new_x = np.concatenate((x_fill, self.x))
new_y = np.concatenate((y_fill, self.y))
elif x_value > x_max:
x_fill = np.arange(x_max + x_step, x_value+x_step*0.5, x_step)
y_fill = np.zeros(x_fill.shape)
y_fill.fill(y_value)
new_x = np.concatenate((self.x, x_fill))
new_y = np.concatenate((self.y, y_fill))
else:
return self
return Spectrum(new_x, new_y)
[docs] def plot(self, show=False, *args, **kwargs):
import matplotlib.pyplot as plt
plt.plot(self.x, self.y, *args, **kwargs)
if show:
plt.show()
[docs] def nan_to_zero(self):
"""
set the nan in spectrum to zero
sometimes integrated spectrum in data can
have some nans, so need to catch those
"""
self._y = np.nan_to_num(self._y, copy=False, nan=0.0)
# Operators:
def __sub__(self, other):
orig_x, orig_y = self.data
other_x, other_y = other.data
if orig_x.shape != other_x.shape:
# todo different shape subtraction of spectra
# seems the fail somehow...
# the background will be interpolated
other_fcn = interp1d(other_x, other_x, kind='linear')
# find overlapping x and y values:
ind = np.where((orig_x <= np.max(other_x)) &
(orig_x >= np.min(other_x)))
x = orig_x[ind]
y = orig_y[ind]
if len(x) == 0:
# if there is no overlapping between
# background and Spectrum, raise an error
raise BkgNotInRangeError(self.name)
return Spectrum(x, y - other_fcn(x))
else:
return Spectrum(orig_x, orig_y - other_y)
def __add__(self, other):
orig_x, orig_y = self.data
other_x, other_y = other.data
if orig_x.shape != other_x.shape:
# the background will be interpolated
other_fcn = interp1d(other_x, other_x, kind='linear')
# find overlapping x and y values:
ind = np.where((orig_x <= np.max(other_x)) &
(orig_x >= np.min(other_x)))
x = orig_x[ind]
y = orig_y[ind]
if len(x) == 0:
# if there is no overlapping between
# background and Spectrum, raise an error
raise BkgNotInRangeError(self.name)
return Spectrum(x, y + other_fcn(x))
else:
return Spectrum(orig_x, orig_y + other_y)
def __rmul__(self, other):
orig_x, orig_y = self.data
return Spectrum(np.copy(orig_x), np.copy(orig_y) * other)
def __eq__(self, other):
if not isinstance(other, Spectrum):
return False
if np.array_equal(self.data, other.data):
return True
return False