"""Write imageseries to various formats"""
import abc
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
import os
import threading
import warnings
import numpy as np
import h5py
import hdf5plugin
import yaml
from hexrd.matrixutil import extract_ijv
from hexrd.utils.hdf5 import unwrap_dict_to_h5
MAX_NZ_FRACTION = 0.1 # 10% sparsity trigger for frame-cache write
# =============================================================================
# METHODS
# =============================================================================
[docs]def write(ims, fname, fmt, **kwargs):
"""write imageseries to file with options
Parameters
----------
ims: Imageseries
the imageseries to write
fname: str
the name of the HDF5 file to write
fmt: str
format name of the imageseries
kwargs: dict
options specific to format
"""
wcls = _Registry.getwriter(fmt)
w = wcls(ims, fname, **kwargs)
w.write()
# Registry
class _RegisterWriter(abc.ABCMeta):
def __init__(cls, name, bases, attrs):
abc.ABCMeta.__init__(cls, name, bases, attrs)
_Registry.register(cls)
class _Registry(object):
"""Registry for imageseries writers"""
writer_registry = dict()
@classmethod
def register(cls, wcls):
"""Register writer class"""
if wcls.__name__ != 'Writer':
cls.writer_registry[wcls.fmt] = wcls
@classmethod
def getwriter(cls, name):
"""return instance associated with name"""
return cls.writer_registry[name]
[docs]class Writer(object, metaclass=_RegisterWriter):
"""Base class for writers
Parameters
----------
ims: Imageseries
the imageseries to write
fname: str
the name of the HDF5 file to write
kwargs: dict
options specific to format
"""
fmt = None
def __init__(self, ims, fname, **kwargs):
self._ims = ims
self._shape = ims.shape
self._dtype = ims.dtype
self._nframes = len(ims)
self._meta = ims.metadata
self._fname = fname
self._opts = kwargs
if isinstance(fname, h5py.File):
filename = fname.filename
else:
filename = fname
# split filename into components
tmp = os.path.split(filename)
self._fname_dir = tmp[0]
tmp = os.path.splitext(tmp[1])
self._fname_base = tmp[0]
self._fname_suff = tmp[1]
@property
def fname(self):
return self._fname
@property
def fname_dir(self):
return self._fname_dir
@property
def opts(self):
return self._opts
[docs]class WriteH5(Writer):
"""Write imageseries in HDF5 file
Parameters
----------
ims: Imageseries
the imageseries to write
fname: str
the name of the HDF5 file to write
path: str, required
the path in HDF5 file
gzip: int 0-9
0 turns off compression, default=1
chunk_rows: int
number of rows per chunk; default is all
shuffle: bool
shuffle HDF5 data
"""
fmt = 'hdf5'
dflt_gzip = 1
dflt_chrows = 0
dflt_shuffle = True
def __init__(self, ims, fname, **kwargs):
Writer.__init__(self, ims, fname, **kwargs)
self._path = self._opts['path']
#
# ======================================== API
#
[docs] def write(self):
"""Write imageseries to HDF5 file"""
if isinstance(self._fname, h5py.File):
f = self._fname
else:
f = h5py.File(self._fname, "w")
g = f.create_group(self._path)
s0, s1 = self._shape
ds = g.create_dataset(
'images', (self._nframes, s0, s1), self._dtype, **self.h5opts
)
for i in range(self._nframes):
ds[i, :, :] = self._ims[i]
# add metadata
for k, v in list(self._meta.items()):
if np.issubdtype(v.dtype, 'U'):
# HDF5 can't handle unicode strings.
# Turn it into a regular string.
v = v.astype('S')
g.attrs[k] = v
@property
def h5opts(self):
d = {}
# shuffle
shuffle = self._opts.pop('shuffle', self.dflt_shuffle)
d['shuffle'] = shuffle
# compression
compress = self._opts.pop('gzip', self.dflt_gzip)
if compress > 9:
raise ValueError('gzip compression cannot exceed 9: %s' % compress)
if compress > 0:
d['compression'] = 'gzip'
d['compression_opts'] = compress
# chunk size
s0, s1 = self._shape
chrows = self._opts.pop('chunk_rows', self.dflt_chrows)
if chrows < 1 or chrows > s0:
chrows = s0
d['chunks'] = (1, chrows, s1)
return d
[docs]class WriteFrameCache(Writer):
"""write frame cache imageseries
The default write option is to save image data and metadata all
in a single npz file. The original option was to have a YAML file
as the primary file and a specified cache file for the images; this
method is deprecated.
Parameters
----------
ims: Imageseries instance
the imageseries to write
fname: str or Path
name of file to write;
threshold: float
threshold value for image, at or below which values are zeroed
cache_file: str or Path, optional
name of the npz file to save the image data, if not given in the
`fname` argument; for YAML format (deprecated), this is required
style: str, type of file to use for saving. options are:
- 'npz' for saving in a numpy compressed file
- 'fch5' for saving in the HDF5-based frame-cache format
max_workers: int, optional
The max number of worker threads for multithreading. Defaults to
the number of CPUs.
"""
fmt = 'frame-cache'
def __init__(self, ims, fname, style='npz', **kwargs):
Writer.__init__(self, ims, fname, **kwargs)
self._thresh = self._opts['threshold']
self._cache, self.cachename = self._set_cache()
ncpus = multiprocessing.cpu_count()
self.max_workers = kwargs.get('max_workers', ncpus)
supported_formats = ['npz', 'fch5']
if style not in supported_formats:
raise TypeError(
f"Unknown file style for writing framecache: {style}. "
f"Supported formats are {supported_formats}"
)
self.style = style
self.hdf5_compression = hdf5plugin.Blosc(cname="zstd", clevel=5)
def _set_cache(self):
cf = self.opts.get('cache_file')
if cf is None:
cachename = cache = self.fname
else:
if os.path.isabs(cf):
cache = cf
else:
cdir = os.path.dirname(self.fname)
cache = os.path.join(cdir, cf)
cachename = cf
return cache, cachename
@property
def cache(self):
return self._cache
def _process_meta(self, save_omegas=False):
d = {}
for k, v in list(self._meta.items()):
if isinstance(v, np.ndarray) and save_omegas:
# Save as a numpy array file
# if file does not exist (careful about directory)
# create new file
cdir = os.path.dirname(self._cache)
b = self._fname_base
fname = os.path.join(cdir, "%s-%s.npy" % (b, k))
if not os.path.exists(fname):
np.save(fname, v)
# add trigger in yml file
d[k] = "! load-numpy-array %s" % fname
else:
d[k] = v
return d
def _write_yml(self):
datad = {
'file': self._cachename,
'dtype': str(self._ims.dtype),
'nframes': len(self._ims),
'shape': list(self._ims.shape),
}
info = {'data': datad, 'meta': self._process_meta(save_omegas=True)}
with open(self._fname, "w") as f:
yaml.safe_dump(info, f)
def _write_frames(self):
if self.style == 'npz':
self._write_frames_npz()
elif self.style == 'fch5':
self._write_frames_fch5()
def _check_sparsity(self, frame_id, count, buff_size):
# check the sparsity
#
# FIXME: formalize this a little better
# ???: maybe set a hard limit of total nonzeros for the imageseries
# ???: could pass as a kwarg on open
fullness = count / float(buff_size)
if fullness > MAX_NZ_FRACTION:
sparseness = 100.0 * (1 - fullness)
msg = "frame %d is %4.2f%% sparse (cutoff is 95%%)" % (
frame_id,
sparseness,
)
warnings.warn(msg)
def _write_frames_npz(self):
"""also save shape array as originally done (before yaml)"""
buff_size = self._ims.shape[0] * self._ims.shape[1]
arrd = {}
num_workers = min(self.max_workers, len(self._ims))
row_buffers = np.empty((num_workers, buff_size), dtype=np.uint16)
col_buffers = np.empty((num_workers, buff_size), dtype=np.uint16)
val_buffers = np.empty((num_workers, buff_size), dtype=self._ims.dtype)
buffer_ids = {}
assign_buffer_lock = threading.Lock()
def assign_buffer_id():
with assign_buffer_lock:
buffer_ids[threading.get_ident()] = len(buffer_ids)
def extract_data(i):
buffer_id = buffer_ids[threading.get_ident()]
rows = row_buffers[buffer_id]
cols = col_buffers[buffer_id]
vals = val_buffers[buffer_id]
# wrapper to find (sparse) pixels above threshold
count = extract_ijv(self._ims[i], self._thresh, rows, cols, vals)
self._check_sparsity(i, count, buff_size)
arrd[f'{i}_row'] = rows[:count].copy()
arrd[f'{i}_col'] = cols[:count].copy()
arrd[f'{i}_data'] = vals[:count].copy()
kwargs = {
'max_workers': num_workers,
'initializer': assign_buffer_id,
}
with ThreadPoolExecutor(**kwargs) as executor:
# Evaluate the results via `list()`, so that if an exception is
# raised in a thread, it will be re-raised and visible to the user.
list(executor.map(extract_data, range(len(self._ims))))
arrd['shape'] = self._ims.shape
arrd['nframes'] = len(self._ims)
arrd['dtype'] = str(self._ims.dtype).encode()
arrd.update(self._process_meta())
np.savez_compressed(self.cache, **arrd)
def _write_frames_fch5(self):
"""Write framecache into an hdf5 file. The file will use three
datasets for the framecache:
- 'data': (m,1) array holding the datavalues of all frames. `m` is
evaluated upon runtime
- 'indices': (m,2) array holding the row& col information for the
values in data. 'data' together within 'indices' represent tha data
using the CSR format for sparse matrices.
- 'frame_ids`: (2*nframes) holds the range that the i-th frame
occupies in the above arrays. i.e. the information of the i-th frame
can be accessed using:
data_i = data[frame_ids[2*i]:frame_ids[2*i+1]] and
indices_i = indices[frame_ids[2*i]:frame_ids[2*i+1]]
"""
max_frame_size = self._ims.shape[0] * self._ims.shape[1]
nframes = len(self._ims)
shape = self._ims.shape
data_dtype = self._ims.dtype
frame_indices = np.empty((2 * nframes,), dtype=np.uint64)
data_dataset = None
indices_dataset = None
file_position = 0
total_size = 0
common_lock = threading.Lock()
thread_local = threading.local()
# creating an array in memory will fail if data is too big or threshold
# too low, so we write to the file while iterating the frames
with h5py.File(self.cache, "w") as h5f:
h5f.attrs['HEXRD_FRAMECACHE_VERSION'] = 1
h5f["shape"] = shape
h5f["nframes"] = nframes
h5f["dtype"] = str(np.dtype(self._ims.dtype)).encode("utf-8")
metadata = h5f.create_group("metadata")
unwrap_dict_to_h5(metadata, self._meta.copy())
def initialize_buffers():
thread_local.data = np.empty(
(max_frame_size, 1), dtype=self._ims.dtype
)
thread_local.indices = np.empty(
(max_frame_size, 2), dtype=np.uint16
)
def single_array_write_thread(i):
nonlocal file_position, total_size
im = self._ims[i]
row_slice = thread_local.indices[:, 0]
col_slice = thread_local.indices[:, 1]
data_slice = thread_local.data[:, 0]
count = extract_ijv(
im, self._thresh, row_slice, col_slice, data_slice
)
self._check_sparsity(i, count, max_frame_size)
# get the range this thread is doing to write into the file
start_file = 0
end_file = 0
with common_lock:
start_file = file_position
file_position += count
end_file = file_position
total_size += end_file - start_file
# write within the appropriate ranges
data_dataset[start_file:end_file, :] = thread_local.data[
:count, :
]
indices_dataset[start_file:end_file, :] = thread_local.indices[
:count, :
]
frame_indices[2 * i] = start_file
frame_indices[2 * i + 1] = end_file
kwargs = {
"max_workers": self.max_workers,
"initializer": initialize_buffers,
}
data_dataset = h5f.create_dataset(
"data",
shape=(nframes * max_frame_size, 1),
dtype=data_dtype,
compression=self.hdf5_compression,
)
indices_dataset = h5f.create_dataset(
"indices",
shape=(nframes * max_frame_size, 2),
dtype=np.uint16,
compression=self.hdf5_compression,
)
with ThreadPoolExecutor(**kwargs) as executor:
# Evaluate the results via `list()`, so that if an exception is
# raised in a thread, it will be re-raised and visible to
# the user.
list(executor.map(single_array_write_thread, range(nframes)))
# update the sizes of the dataset to match the amount of data
# that have been actually written
data_dataset.resize(total_size, axis=0)
indices_dataset.resize(total_size, axis=0)
h5f.create_dataset(
"frame_ids",
data=frame_indices,
compression=self.hdf5_compression,
)
[docs] def write(self, output_yaml=False):
"""writes frame cache for imageseries
presumes sparse forms are small enough to contain all frames
"""
self._write_frames()
if output_yaml:
warnings.warn(
"YAML output for frame-cache is deprecated", DeprecationWarning
)
self._write_yml()