# Copyright 2018-2020 Nick Anthony, Backman Biophotonics Lab, Northwestern University
#
# This file is part of PWSpy.
#
# PWSpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PWSpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PWSpy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import copy
import json
import logging
import multiprocessing as mp
import numbers
import os
import typing as t_
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import h5py
import numpy as np
import pandas as pd
import tifffile as tf
from matplotlib import pyplot as plt, widgets
from scipy import interpolate as spi
from scipy.io import savemat
from . import _metadata as pwsdtmd
from . import _other
if t_.TYPE_CHECKING:
from ..utility.reflection import Material
[docs]class ICBase(ABC):
"""A class to handle the data operations common to PWS related `image cubes`. Does not contain any file specific
functionality. uses the generic `index` attribute which can be overridden by derived classes to be wavelength, wavenumber,
time, etc.
Args:
data (np.ndarray): A 3-dimensional array containing the data the dimensions should be [Y, X, Z] where X and Y are the spatial coordinates of the image
and Z corresponds to the `index` dimension, e.g. wavelength, wavenumber, time, etc.
index (tuple(Number)): A tuple containing the values of the index for the data. This could be a tuple of wavelength values, times (in the case of Dyanmics), etc.
dtype (type): the data type that the data should be stored as. The default is numpy.float32.
"""
_index: tuple
data: np.ndarray
def __init__(self, data: np.ndarray, index: tuple, dtype=np.float32):
assert isinstance(data, np.ndarray)
self.data = data.astype(dtype)
self._index = index
if self.data.shape[2] != len(self.index):
raise ValueError(f"The length of the index list doesn't match the index axis of the data array. Got {len(self.index)}, expected {self.data.shape[2]}.")
@property
@abstractmethod
def _hdfTypeName(self) -> str:
"""Each class of this type should have a unique constant name which will be used to identify it when saved as HDF."""
pass
@property
def index(self) -> t_.Tuple[float, ...]:
"""
Returns:
The values of the datacube's index
"""
return self._index
[docs] def plotMean(self) -> t_.Tuple[plt.Figure, plt.Axes]:
"""
Returns:
A figure and attached axes plotting the mean of the data along the index axis.
corresponds to the mean reflectance in most cases.
"""
fig, ax = plt.subplots()
mean = np.mean(self.data, axis=2)
im = ax.imshow(mean)
plt.colorbar(im, ax=ax)
return fig, ax
[docs] def getMeanSpectra(self, mask: t_.Optional[t_.Union[_other.Roi, np.ndarray]] = None) -> t_.Tuple[np.ndarray, np.ndarray]:
"""Calculate the average spectra within a region of the data.
Args:
mask: An optional other.Roi or boolean numpy array used to select pixels from the X and Y dimensions of the data array.
If left as None then the full data array will be used as the region.
Returns:
The average spectra within the region, the standard deviation of the spectra within the region
"""
if isinstance(mask, _other.Roi):
mask = mask.mask
if mask is None: #Make a mask that includes everything
mask = np.ones(self.data.shape[:-1], dtype=np.bool)
mean = self.data[mask].mean(axis=0)
std = self.data[mask].std(axis=0)
return mean, std
[docs] def selectLassoROI(self, displayIndex: t_.Optional[int] = None, clim: t_.Sequence = None) -> _other.Roi:
"""
Allow the user to draw a `freehand` ROI on an image of the acquisition.
Args:
displayIndex: Display a particular z-slice of the array for mask drawing. If `None` then the mean along Z is displayed.
Returns:
An array of vertices of the polygon drawn.
"""
import warnings
warnings.warn("This method has been moved to the `pwspy_gui.utility` module and will be removed in the future.", category=DeprecationWarning)
Verts = [None]
if displayIndex is None:
displayIndex = self.data.shape[2]//2
fig, ax = plt.subplots()
data = self.data[:, :, displayIndex]
ax.imshow(data, clim=[np.percentile(data, 1), np.percentile(data, 99)])
fig.suptitle("Close to accept ROI")
def onSelect(verts):
Verts[0] = verts
l = widgets.LassoSelector(ax, onSelect, lineprops={'color': 'r'})
fig.show()
while plt.fignum_exists(fig.number):
fig.canvas.flush_events()
return _other.Roi.fromVerts(np.array(Verts[0]), self.data.shape[:2])
[docs] def selectRectangleROI(self, displayIndex: t_.Optional[int] = None) -> np.ndarray:
"""
Allow the user to draw a rectangular ROI on an image of the acquisition.
Args:
displayIndex (int): is used to display a particular z-slice for mask drawing. If None then the mean along Z is displayed. Returns an array of vertices of the rectangle.
Returns:
np.ndarray: An array of the 4 XY vertices of the rectangle.
"""
import warnings
warnings.warn("This method has been moved to the `pwspy_gui.utility` module and will be removed in the future.",
category=DeprecationWarning)
verts = [None]
if displayIndex is None:
displayIndex = self.data.shape[2]//2
fig, ax = plt.subplots()
ax.imshow(self.data[:, :, displayIndex])
fig.suptitle("Close to accept ROI")
def rectSelect(mins, maxes):
verts[0] = ((mins.ydata, mins.xdata), (maxes.ydata, mins.xdata), (maxes.ydata, maxes.xdata), (mins.ydata, maxes.xdata))
r = widgets.RectangleSelector(ax, rectSelect)
fig.show()
while plt.fignum_exists(fig.number):
fig.canvas.flush_events()
return np.array(verts[0])
def __getitem__(self, slic):
return self.data[slic]
[docs] def filterDust(self, sigma: float, pixelSize: float):
"""Blurs the data cube in the X and Y dimensions. Often used to remove the effects of dust on a normalization.
Args:
sigma: This specifies the radius of the gaussian filter used for blurring. The units of the value are determined by `pixelSize`
pixelSize: The pixel size in microns. Settings this to 1 will effectively causes sigma to be in units of pixels rather than microns."""
from scipy import ndimage
sigma = sigma / pixelSize # convert from microns to pixels
for i in range(self.data.shape[2]):
self.data[:, :, i] = ndimage.filters.gaussian_filter(self.data[:, :, i], sigma, mode='reflect')
def _indicesMatch(self, other: 'ICBase') -> bool:
"""This check is performed before allowing many arithmetic operations between two data cubes. Makes sure that the Z-axis of the two cubes match."""
return self._index == other._index
[docs] def selIndex(self, start: t_.Optional[float], stop: t_.Optional[float]) -> t_.Tuple[np.ndarray, t_.Sequence]:
"""
Args:
start: The beginning value of the index in the new object. Pass `None` to include everything.
stop: The ending value of the index in the new object. Pass `None` to include everything.
Returns:
A new instance of ICBase with only data from `start` to `stop` in the `index`."""
wv = np.array(self.index)
if start is None:
iStart = None
else:
iStart = np.argmin(np.abs(wv - start))
if stop is None:
iStop = None
else:
iStop = np.argmin(np.abs(wv - stop))
iStop += 1 # include the end point
if iStop >= len(wv): # Include everything
iStop = None
data = self.data[:, :, iStart:iStop]
index = self.index[iStart:iStop]
return data, index
def _add(self, other: t_.Union['self.__class__', numbers.Real, np.ndarray]) -> 'self.__class__': #TODO these don't return the right datatype. They should probably just be gotten rid of
if isinstance(other, self.__class__):
if not self._indicesMatch(other):
raise ValueError(f"{self.__class__} indices are not compatible")
ret = self.data + other.data
elif isinstance(other, (numbers.Real, np.ndarray)):
ret = self.data + other
else:
raise NotImplementedError(f"Addition is not supported between {self.__class__} and {type(other)}")
return ret
def _sub(self, other: t_.Union['self.__class__', numbers.Real, np.ndarray]) -> 'self.__class__':
if isinstance(other, self.__class__):
if not self._indicesMatch(other):
raise ValueError(f"{self.__class__} indices are not compatible")
ret = self.data - other.data
elif isinstance(other, (numbers.Real, np.ndarray)):
ret = self.data - other
else:
raise NotImplementedError(f"Subtraction is not supported between {self.__class__} and {type(other)}")
return ret
def _mul(self, other: t_.Union['self.__class__', numbers.Real, np.ndarray]) -> 'self.__class__':
if isinstance(other, self.__class__):
if not self._indicesMatch(other):
raise ValueError(f"{self.__class__} indices are not compatible")
ret = self.data * other.data
elif isinstance(other, (numbers.Real, np.ndarray)):
ret = self.data * other
else:
raise NotImplementedError(f"Multiplication is not supported between {self.__class__} and {type(other)}")
return ret
def _truediv(self, other: t_.Union['self.__class__', numbers.Real, np.ndarray]) -> 'self.__class__':
if isinstance(other, self.__class__):
if not self._indicesMatch(other):
raise ValueError(f"{self.__class__} indices are not compatible")
ret = self.data / other.data
elif isinstance(other, (numbers.Real, np.ndarray)):
ret = self.data / other
else:
raise NotImplementedError(f"Division is not supported between {self.__class__} and {type(other)}")
return ret
def __add__(self, other):
ret = self._add(other)
new = copy.deepcopy(self)
new.data = ret
return new
def __sub__(self, other):
ret = self._sub(other)
new = copy.deepcopy(self)
new.data = ret
return new
def __mul__(self, other):
ret = self._mul(other)
new = copy.deepcopy(self)
new.data = ret
return new
__rmul__ = __mul__ # multiplication is commutative. let it work both ways.
def __truediv__(self, other):
ret = self._truediv(other)
new = copy.deepcopy(self)
new.data = ret
return new
[docs] def toHdfDataset(self, g: h5py.Group, name: str, fixedPointCompression: bool = True, compression: str = None) -> h5py.Group:
"""
Save the data of this class to a new HDF dataset.
Args:
g (h5py.Group): the parent HDF Group of the new dataset.
name (str): the name of the new HDF dataset in group `g`.
fixedPointCompression (bool): if True then save the data in a special 16bit fixed-point format. Testing has shown that this has a
maximum conversion error of 1.4e-3 percent. Saving is ~10% faster but requires only 50% the hard drive space.
compression: The value of this argument will be passed to h5py.create_dataset for numpy arrays. See h5py documentation for available options.
Returns:
h5py.Group: This is the the same h5py.Group that was passed in a `g`. It should now have a new dataset by the name of 'name'
"""
if fixedPointCompression:
# Scale data to span the full range of an unsigned 16bit integer. save as integer and save the min and max
# needed to scale back to the original data. Testing has shown that this has a maximum conversion error of 1.4e-3 percent.
# Saving is ~10% faster but requires only 50% the hard drive space. Time can be traded for space by using compression
# when creating the dataset
m = self.data.min()
M = self.data.max()
fpData = self.data - m
fpData = fpData / (M - m)
fpData *= (2 ** 16 - 1)
fpData = fpData.astype(np.uint16)
dset = g.create_dataset(name, data=fpData, compression=compression) # , chunks=(64,64,self.data.shape[2]), compression=2)
dset.attrs['index'] = np.array(self.index)
dset.attrs['type'] = np.string_(f"{self._hdfTypeName}_fp")
dset.attrs['min'] = m
dset.attrs['max'] = M
else:
dset = g.create_dataset(name, data=self.data, compression=compression)
dset.attrs['index'] = np.array(self.index)
dset.attrs['type'] = np.string_(self._hdfTypeName)
return g
[docs] @classmethod
def decodeHdf(cls, d: h5py.Dataset) -> t_.Tuple[np.array, t_.Tuple[float, ...]]:
"""
Load a new instance of ICBase from an `h5py.Dataset`
Args:
d: The dataset that the ICBase has been saved to
Returns:
A tuple containing: (data: The 3D array of `data`, index: A tuple containing the `index`)
"""
assert 'type' in d.attrs
assert 'index' in d.attrs
if d.attrs['type'].decode() == cls._hdfTypeName: #standard decoding
return np.array(d), tuple(d.attrs['index'])
elif d.attrs['type'].decode() == f"{cls._hdfTypeName}_fp": #Fixed point decoding
M = d.attrs['max']
m = d.attrs['min']
arr = np.array(d)
arr = arr.astype(np.float32) / (2 ** 16 - 1)
arr *= (M - m)
arr += m
return arr, tuple(d.attrs['index'])
else:
raise TypeError(f"Got {d.attrs['type'].decode()} instead of {cls._hdfTypeName}")
[docs]class ICRawBase(ICBase, ABC):
"""This class represents data cubes which are not derived from other data cubes. They represent raw acquired data that exists as data files on the computer.
For this reason they may need to have hardware specific corrections applied to them such as normalizing out exposure time, linearizing camera counts,
subtracting dark counts, etc. The most important change is the addition of `metadata`.
Args:
data: A 3-dimensional array containing the data. The dimensions should be [Y, X, Z] where X and Y are the spatial coordinates of the image
and Z corresponds to the `index` dimension, e.g. wavelength, wavenumber, time, etc.
index: A tuple containing the values of the index for the data. This could be a tuple of wavelength values, times (in the case of Dynamics), etc.
metadata: The metadata object associated with this data object.
processingStatus: An object that keeps track of which processing steps and corrections have been applied to this object.
dtype (type): the data type that the data should be stored as. The default is numpy.float32.
"""
[docs] @dataclass
class ProcessingStatus:
"""Keeps track of which processing steps have been applied to an `ICRawBase` object.
By default none of these things have been done for raw data"""
cameraCorrected: bool = False
normalizedByExposure: bool = False
extraReflectionSubtracted: bool = False
normalizedByReference: bool = False
def toDict(self) -> dict:
return {'camCorrected': self.cameraCorrected, 'exposureNormed': self.normalizedByExposure, 'erSubtracted': self.extraReflectionSubtracted, 'refNormed': self.normalizedByReference}
@classmethod
def fromDict(cls, d: dict) -> ICBase.ProcessingStatus:
return cls(cameraCorrected=d['camCorrected'], normalizedByExposure=d['exposureNormed'], extraReflectionSubtracted=d['erSubtracted'], normalizedByReference=d['refNormed'])
def __init__(self, data: np.ndarray, index: tuple, metadata: pwsdtmd.MetaDataBase, processingStatus: ProcessingStatus=None, dtype=np.float32):
super().__init__(data, index, dtype)
self.metadata = metadata
if processingStatus:
self.processingStatus = processingStatus
else:
self.processingStatus = ICRawBase.ProcessingStatus(False, False, False, False)
[docs] def normalizeByExposure(self):
"""This is one of the first steps in most analysis pipelines. Data is divided by the camera exposure.
This way two PwsCube that were acquired at different exposure times will still be on equivalent scales."""
if not self.processingStatus.cameraCorrected:
raise Exception(
"This PwsCube has not yet been corrected for camera effects. are you sure you want to normalize by exposure?")
if not self.processingStatus.normalizedByExposure:
self.data = self.data / self.metadata.exposure
else:
raise Exception("The PwsCube has already been normalized by exposure.")
self.processingStatus.normalizedByExposure = True
[docs] def correctCameraEffects(self, correction: _other.CameraCorrection = None, binning: int = None):
"""Subtracts the darkcounts from the data. count is darkcounts per pixel. binning should be specified if
it wasn't saved in the micromanager metadata. Both method arguments should be able to be loaded automatically
from the metadata but for older data files they will need to be supplied manually.
Args:
correction: The cameracorrection object providing information on how to correct the data.
binning: The binning that the raw data was imaged at. 2 = 2x2 binning, 3 = 3x3 binning, etc.
"""
if self.processingStatus.cameraCorrected:
raise Exception("This PwsCube has already had it's camera correction applied!")
if binning is None:
binning = self.metadata.binning
if binning is None: raise ValueError('Binning metadata not found. Binning must be specified in function argument.')
if correction is None:
correction = self.metadata.cameraCorrection
if correction is None: raise ValueError('other.CameraCorrection metadata not found. Binning must be specified in function argument.')
count = correction.darkCounts * binning ** 2 # Account for the fact that binning multiplies the darkcount.
self.data = self.data - count
if correction.linearityPolynomial is None or correction.linearityPolynomial == (1.0,):
pass
else:
self.data = np.polynomial.polynomial.polyval(self.data, (0.0,) + correction.linearityPolynomial) # The [0] item is the y-intercept (already handled by the darkcount)
self.processingStatus.cameraCorrected = True
return
[docs] @abstractmethod
def normalizeByReference(self, reference: 'self.__class__'):
"""Normalize the raw data of this data cube by a reference cube to result in data representing
arbitrarily scaled reflectance.
Args:
reference: A reference acquisition. Usually an image taken from a blank piece of glass.
"""
pass
[docs] def toHdfDataset(self, g: h5py.Group, name: str, fixedPointCompression: bool = True) -> h5py.Group:
"""
Save this object into an HDF dataset.
Args:
g: The `h5py.Group` object to create a new dataset in.
name: The name of the new dataset.
fixedPointCompression: If True then the data will be converted from floating point to 16-bit fixed point.
This results in approximately half the storage requirements at a very slight loss in precision.
Returns:
A reference to the `h5py.Group` passed in as `g`.
"""
g = ICBase.toHdfDataset(self, g, name, fixedPointCompression)
self.metadata.encodeHdfMetadata(g[name])
g[name].attrs['processingStatus'] = np.string_(json.dumps(self.processingStatus.toDict()))
return g
[docs] @classmethod
def decodeHdf(cls, d: h5py.Dataset) -> t_.Tuple[np.array, t_.Tuple[float, ...], dict, ProcessingStatus]:
"""
Load a new instance of ICRawBase from an `h5py.Dataset`
Args:
d: The dataset that the ICBase has been saved to
Returns:
A tuple containing:
data: The 3D array of `data`
index: A tuple containing the `index`
metadata: A dictionary containing metadata.
procStatus: The processing status of the object.
"""
arr, index = super().decodeHdf(d)
mdDict = cls.getMetadataClass().decodeHdfMetadata(d)
if 'processingStatus' in d.attrs:
processingStatus = cls.ProcessingStatus.fromDict(json.loads(d.attrs['processingStatus']))
else: # Some old hdf files won't have this field, that's ok.
processingStatus = None
return arr, index, mdDict, processingStatus
[docs]class DynCube(ICRawBase):
"""A class representing a single acquisition of PWS Dynamics. In which the wavelength is held constant and the 3rd
dimension of the data is time rather than wavelength. This can be analyzed to reveal information about diffusion rate.
Contains methods for loading and saving to multiple formats as well as common operations used in analysis.
Args:
data: A 3-dimensional array containing the data. The dimensions should be [Y, X, Z] where X and Y are the spatial coordinates of the image
and Z corresponds to the `index` dimension, e.g. wavelength, wavenumber, time, etc.
metadata: The metadata object associated with this data object.
processingStatus: An object that keeps track of which processing steps and corrections have been applied to this object.
dtype: the data type that the data should be stored as. The default is numpy.float32.
"""
_hdfTypeName = "DynCube" # This is used for saving/loading from HDF. Important not to change it or old files will stop working.
def __init__(self, data, metadata: pwsdtmd.DynMetaData, processingStatus: ICRawBase.ProcessingStatus = None, dtype=np.float32):
assert isinstance(metadata, pwsdtmd.DynMetaData)
super().__init__(data, metadata.times, metadata, processingStatus=processingStatus, dtype=dtype)
@property
def times(self) -> t_.Tuple[float, ...]:
"""Unlike PWS where we operate along the dimension of wavelength, in dynamics we operate along the dimension of time.
Returns:
A tuple of the time values for each 2d slice along the 3rd axis of the `data` array.
"""
return self.index
[docs] @classmethod
def loadAny(cls, directory: str, metadata: t_.Optional[pwsdtmd.DynMetaData] = None, lock: t_.Optional[mp.Lock] = None) -> DynCube:
"""
Attempt to load a `DynCube` for any format of file in `directory`
Args:
directory: The directory containing the data files.
metadata: The metadata object associated with this acquisition
lock: A `Lock` object used to synchronized IO in multithreading and multiprocessing applications.
Returns:
A new instance of `DynCube`.
"""
try:
return DynCube.fromTiff(directory, metadata=metadata, lock=lock)
except:
try:
return DynCube.fromOldPWS(directory, metadata=metadata, lock=lock)
except:
raise OSError(f"Could not find a valid PWS image cube file at {directory}.")
[docs] @classmethod
def fromOldPWS(cls, directory, metadata: pwsdtmd.DynMetaData = None, lock: mp.Lock = None) -> DynCube:
"""Loads from the file format that was saved by the all-matlab version of the Basis acquisition code.
Data was saved in raw binary to a file called `image_cube`. Some metadata was saved to .mat files called
`info2` and `info3`.
Args:
directory: The directory containing the data files.
metadata: The metadata object associated with this acquisition
lock: A `Lock` object used to synchronized IO in multithreading and multiprocessing applications.
Returns:
A new instance of `DynCube`.
"""
if lock is not None:
lock.acquire()
try:
if metadata is None:
metadata = pwsdtmd.DynMetaData.fromOldPWS(directory)
with open(os.path.join(directory, 'image_cube'), 'rb') as f:
data = np.frombuffer(f.read(), dtype=np.uint16)
data = data.reshape((metadata.dict['imgHeight'], metadata.dict['imgWidth'], len(metadata.times)), order='F')
finally:
if lock is not None:
lock.release()
data = data.copy(order='C')
return cls(data, metadata)
[docs] @classmethod
def fromTiff(cls, directory, metadata: pwsdtmd.DynMetaData = None, lock: mp.Lock = None) -> DynCube:
"""Load a dyanmics acquisition from a tiff file. if the metadata for the acquisition has already been loaded then you can provide
is as the `metadata` argument to avoid loading it again. the `lock` argument is an optional place to provide a multiprocessing.Lock
which can be used when multiple files in parallel to avoid giving the hard drive too many simultaneous requests, this is probably not necessary.
Args:
directory: The directory containing the data files.
metadata: The metadata object associated with this acquisition
lock: A `Lock` object used to synchronized IO in multithreading and multiprocessing applications.
Returns:
A new instance of `DynCube`.
"""
if lock is not None:
lock.acquire()
try:
if metadata is None:
metadata = pwsdtmd.DynMetaData.fromTiff(directory)
if os.path.exists(os.path.join(directory, 'dyn.tif')):
path = os.path.join(directory, 'dyn.tif')
else:
raise OSError("No Tiff file was found at:", directory)
with tf.TiffFile(path) as tif:
data = np.rollaxis(tif.asarray(), 0, 3) # Swap axes to match y,x,lambda convention.
finally:
if lock is not None:
lock.release()
data = data.copy(order='C')
return cls(data, metadata)
[docs] def normalizeByReference(self, reference: t_.Union[DynCube, np.ndarray]):
"""This method can accept either a DynCube (in which case it's average over time will be calculated and used for
normalization) or a 2d numpy Array which should represent the average over time of a reference DynCube. The array
should be 2D and its shape should match the first two dimensions of this DynCube.
Args:
reference: Reference data for normalization. Usually an image of a blank piece of glass.
"""
logger = logging.getLogger(__name__)
if self.processingStatus.normalizedByReference:
raise Exception("This cube has already been normalized by a reference.")
if not self.processingStatus.cameraCorrected:
logger.warning("This cube has not been corrected for camera effects. This is highly reccomended before performing any analysis steps.")
if not self.processingStatus.normalizedByExposure:
logger.warning("This cube has not been normalized by exposure. This is highly reccomended before performing any analysis steps.")
if isinstance(reference, DynCube):
if not reference.processingStatus.cameraCorrected:
logger.warning("The reference cube has not been corrected for camera effects. This is highly reccomended before performing any analysis steps.")
if not reference.processingStatus.normalizedByExposure:
logger.warning("The reference cube has not been normalized by exposure. This is highly reccomended before performing any analysis steps.")
mean = reference.data.mean(axis=2)
elif isinstance(reference, np.ndarray):
assert len(reference.shape) == 2
assert reference.shape[0] == self.data.shape[0]
assert reference.shape[1] == self.data.shape[1]
mean = reference
else:
raise TypeError(f"`reference` must be either DynCube or numpy.ndarray, not {type(reference)}")
self.data = self.data / mean[:, :, None]
self.processingStatus.normalizedByReference = True
[docs] def selIndex(self, start, stop) -> DynCube: # Inherit docstring
data, index = super().selIndex(start, stop)
md = self.metadata
md.dict['times'] = index
return DynCube(data, md)
[docs] def getAutocorrelation(self) -> np.ndarray:
"""
Returns the autocorrelation function of dynamics data along the time axis. The ACF is calculated using
fourier transforms using IFFT(FFT(data)*conj(FFT(data)))/length(data).
Returns:
A 3D array of the autocorrelation function of the original data.
"""
data = self.data - self.data.mean(axis=2)[:, :, None] # By subtracting the mean we get an ACF where the 0-lag value is the variance of the signal.
F = np.fft.rfft(data, axis=2)
ac = np.fft.irfft(F * np.conjugate(F), axis=2) / data.shape[2]
return ac
[docs] def filterDust(self, kernelRadius: float, pixelSize: float = None):
"""
This method blurs the data of the cube along the X and Y dimensions. This is useful if the cube is being
used as a reference to normalize other cube. It helps blur out dust and other unwanted small features.
Args:
kernelRadius: The `sigma` of the gaussian kernel used for blurring. A greater value results in greater
blurring. If `pixelSize` is provided then this is in units of `pixelSize`, otherwise it is in units of
pixels.
pixelSize: The size (usualy in units of microns) of each pixel in the datacube. This can generally be loaded
automatically from the metadata.
"""
if pixelSize is None:
pixelSize = self.metadata.pixelSizeUm
if pixelSize is None:
raise ValueError("DynCube Metadata does not have a `pixelSizeUm` saved. please manually specify pixel size. use pixelSize=1 to make `kernelRadius in units of pixels.")
super().filterDust(kernelRadius, pixelSize)
@classmethod
def fromHdfDataset(cls, d: h5py.Dataset): # Inherit docstring
data, index, mdDict, processingStatus = cls.decodeHdf(d)
md = pwsdtmd.DynMetaData(mdDict, fileFormat=pwsdtmd.DynMetaData.FileFormats.Hdf)
return cls(data, md, processingStatus=processingStatus)
[docs]class PwsCube(ICRawBase):
"""
A class representing a single PWS acquisition. Contains methods for loading and saving to multiple formats as
well as common operations used in analysis.
Args:
data: A 3-dimensional array containing the data. The dimensions should be [Y, X, Z] where X and Y are the spatial coordinates of the image
and Z corresponds to the `index` dimension, e.g. wavelength, wavenumber, time, etc.
metadata: The metadata object associated with this data object.
processingStatus: An object that keeps track of which processing steps and corrections have been applied to this object.
dtype (type): the data type that the data should be stored as. The default is numpy.float32.
"""
_hdfTypeName = "ImCube" # This is used for saving/loading from HDF. Important not to change it or old files will stop working.
def __init__(self, data, metadata: pwsdtmd.PwsMetaData, processingStatus: ICRawBase.ProcessingStatus = None, dtype=np.float32):
assert isinstance(metadata, pwsdtmd.PwsMetaData)
super().__init__(data, metadata.wavelengths, metadata, processingStatus=processingStatus, dtype=dtype)
@property
def wavelengths(self):
"""
A tuple containing the values of the wavelengths for the data.
"""
return self.index
[docs] @classmethod
def loadAny(cls, directory: str, metadata: pwsdtmd.PwsMetaData = None, lock: mp.Lock = None):
"""
Attempt to load a `PwsCube` for any format of file in `directory`
Args:
directory: The directory containing the data files.
metadata: The metadata object associated with this acquisition
lock: A `Lock` object used to synchronized IO in multithreading and multiprocessing applications.
Returns:
A new instance of `PwsCube`.
"""
try:
return PwsCube.fromTiff(directory, metadata=metadata, lock=lock)
except:
try:
return PwsCube.fromOldPWS(directory, metadata=metadata, lock=lock)
except:
try:
return PwsCube.fromNano(directory, metadata=metadata, lock=lock)
except:
raise OSError(f"Could not find a valid PWS image cube file at {directory}.")
[docs] @classmethod
def fromOldPWS(cls, directory: str, metadata: pwsdtmd.PwsMetaData = None, lock: mp.Lock = None):
"""
Loads from the file format that was saved by the all-matlab version of the Basis acquisition code.
Data was saved in raw binary to a file called `image_cube`. Some metadata was saved to .mat files called
`info2` and `info3`.
Args:
directory: The directory containing the data files.
metadata: The metadata object associated with this acquisition
lock: A `Lock` object used to synchronized IO in multithreading and multiprocessing applications.
Returns:
A new instance of `PwsCube`.
"""
if lock is not None:
lock.acquire()
try:
if metadata is None:
metadata = pwsdtmd.PwsMetaData.fromOldPWS(directory)
with open(os.path.join(directory, 'image_cube'), 'rb') as f:
data = np.frombuffer(f.read(), dtype=np.uint16)
data = data.reshape((metadata.dict['imgHeight'], metadata.dict['imgWidth'], len(metadata.wavelengths)), order='F')
finally:
if lock is not None:
lock.release()
data = data.copy(order='C')
return cls(data, metadata)
[docs] @classmethod
def fromTiff(cls, directory, metadata: pwsdtmd.PwsMetaData = None, lock: mp.Lock = None):
"""
Loads from a 3D tiff file named `pws.tif`, or in some older data `MMStack.ome.tif`. Metadata can be stored in
the tags of the tiff file but if there is a pwsmetadata.json file found then this is preferred.
A multiprocessing.Lock object can be passed to this function so that it will acquire a lock during the
hard-drive intensive parts of the function. this is useful in multi-core contexts.
Args:
directory: The directory containing the data files.
metadata: The metadata object associated with this acquisition
lock: A `Lock` object used to synchronized IO in multithreading and multiprocessing applications.
Returns:
A new instance of `PwsCube`.
"""
if lock is not None:
lock.acquire()
try:
if metadata is None:
metadata = pwsdtmd.PwsMetaData.fromTiff(directory)
if os.path.exists(os.path.join(directory, 'MMStack.ome.tif')):
path = os.path.join(directory, 'MMStack.ome.tif')
elif os.path.exists(os.path.join(directory, 'pws.tif')):
path = os.path.join(directory, 'pws.tif')
else:
raise OSError("No Tiff file was found at:", directory)
with tf.TiffFile(path) as tif:
data = np.rollaxis(tif.asarray(), 0, 3) # Swap axes to match y,x,lambda convention.
finally:
if lock is not None:
lock.release()
data = data.copy(order='C')
return cls(data, metadata)
[docs] @classmethod
def fromNano(cls, directory: str, metadata: pwsdtmd.PwsMetaData = None, lock: mp.Lock = None) -> PwsCube:
"""
Loads from the file format used at NC. all data and metadata is contained in a .mat file.
Args:
directory: The directory containing the data files.
metadata: The metadata object associated with this acquisition
lock: A `Lock` object used to synchronized IO in multithreading and multiprocessing applications.
Returns:
A new instance of `PwsCube`.
"""
path = os.path.join(directory, 'imageCube.mat')
if lock is not None:
lock.acquire()
try:
if metadata is None:
metadata = pwsdtmd.PwsMetaData.fromNano(directory)
with h5py.File(path, 'r') as hf:
data = np.array(hf['imageCube'])
data = data.transpose((2, 1, 0)) # Re-order axes to match the shape of ROIs and thumbnails.
data = data.copy(order='C') # Copying the data to 'C' order has shows to provide some benefit for performance in selecting out spectra.
finally:
if lock is not None:
lock.release()
return cls(data, metadata)
def _saveThumbnail(self, directory):
"""Used to save a thumbnail image called `image_bd.tif` this is useful for quickly viewing the cells without
having to load and process all the data."""
im = self.data[:, :, self.data.shape[-1] // 2]
normedIm = im - np.percentile(im, 0.01) # .01 percent saturation
normedIm[normedIm<0] = 0 #Don't allow negative values.
normedIm = normedIm / np.percentile(normedIm, 99.99)
normedIm[normedIm>1] = 1 #Keep eveything below 1
normedIm = (normedIm * 255).astype(np.uint8)
im = tf.TiffWriter(os.path.join(directory, 'image_bd.tif'))
im.save(normedIm)
im.close()
[docs] def toTiff(self, outpath: str, dtype=np.uint16):
"""Save the PwsCube to the standard TIFF file format.
Args:
outpath: The path to save the new TIFF file to.
"""
im = self.data
im = im.astype(dtype)
os.mkdir(outpath)
self._saveThumbnail(outpath)
with tf.TiffWriter(open(os.path.join(outpath, 'pws.tif'), 'wb')) as w:
w.save(np.rollaxis(im, -1, 0), metadata=self.metadata.dict)
self.metadata.metadataToJson(outpath)
[docs] def selIndex(self, start: t_.Optional[float], stop: t_.Optional[float]) -> PwsCube: # Inherit docstring
data, index = super().selIndex(start, stop)
md = copy.deepcopy(self.metadata) # We are creating a copy of the metadata object because modifying the original metadata object can cause weird issues.
assert md.dict is not self.metadata.dict
md.dict['wavelengths'] = index
return PwsCube(data, md)
[docs] @classmethod
def fromHdfDataset(cls, d: h5py.Dataset):
"""Load an PwsCube from an HDF5 dataset."""
data, index, mdDict, processingStatus = cls.decodeHdf(d)
md = pwsdtmd.PwsMetaData(mdDict, fileFormat=pwsdtmd.PwsMetaData.FileFormats.Hdf)
return cls(data, md, processingStatus=processingStatus)
[docs] def filterDust(self, kernelRadius: float, pixelSize: float = None) -> None:
"""This method blurs the data of the PwsCube along the X and Y dimensions. This is useful if the PwsCube is being
used as a reference to normalize other PwsCube. It helps blur out dust adn other unwanted small features.
Args:
kernelRadius: The `sigma` of the gaussian kernel used for blurring. A greater value results in greater
blurring. If `pixelSize` is provided then this is in units of `pixelSize`, otherwise it is in units of
pixels.
pixelSize: The size (usualy in units of microns) of each pixel in the datacube. This can generally be loaded
automatically from the metadata.
"""
if pixelSize is None:
pixelSize = self.metadata.pixelSizeUm
if pixelSize is None:
raise ValueError("PwsCube Metadata does not have a `pixelSizeUm` saved. please manually specify pixel size. use pixelSize=1 to make `kernelRadius in units of pixels.")
super().filterDust(kernelRadius, pixelSize)
[docs] def normalizeByReference(self, reference: PwsCube):
"""Normalize the raw data of this data cube by a reference cube to result in data representing
arbitrarily scaled reflectance.
Args:
reference (PwsCube): A reference acquisition (Usually a blank spot on a dish). The data of this acquisition will be divided by the data of the reference
"""
logger = logging.getLogger(__name__)
if self.processingStatus.normalizedByReference:
raise Exception("This PwsCube has already been normalized by a reference.")
if not self.processingStatus.cameraCorrected:
logger.warning("This PwsCube has not been corrected for camera effects. This is highly reccomended before performing any analysis steps.")
if not self.processingStatus.normalizedByExposure:
logger.warning("This PwsCube has not been normalized by exposure. This is highly reccomended before performing any analysis steps.")
if not reference.processingStatus.cameraCorrected:
logger.warning("The reference PwsCube has not been corrected for camera effects. This is highly reccomended before performing any analysis steps.")
if not reference.processingStatus.normalizedByExposure:
logger.warning("The reference PwsCube has not been normalized by exposure. This is highly reccomended before performing any analysis steps.")
self.data = self.data / reference.data
self.processingStatus.normalizedByReference = True
def __repr__(self):
return f"{self.__class__.__name__}(id={self.metadata.idTag})"
[docs]class KCube(ICBase):
"""A class representing an PwsCube after being transformed from being described in terms of wavelength to
wavenumber (k-space). Much of the analysis operated in terms of k-space.
Args:
data: A 3-dimensional array containing the data. The dimensions should be [Y, X, Z] where X and Y are the spatial coordinates of the image
and Z corresponds to the `index` dimension, e.g. wavelength, wavenumber, time, etc.
wavenumbers: A sequence indicating the wavenumber associated with each 2D slice along the 3rd axis of the `data` array.
metadata: The metadata object associated with this data object.
"""
_hdfTypeName = "KCube" # This is used for saving/loading from HDF. Important not to change it or old files will stop working.
def __init__(self, data: np.ndarray, wavenumbers: t_.Tuple[float], metadata: pwsdtmd.PwsMetaData = None):
self.metadata = metadata #Just saving a reference to the original PwsCube in case we want to reference it.
ICBase.__init__(self, data, wavenumbers, dtype=np.float32)
[docs] @classmethod
def fromPwsCube(cls, cube: PwsCube) -> KCube:
"""
Convert an PwsCube into a KCube. Data is converted from wavelength to wavenumber (1/lambda), interpolation is
then used to linearize the data in terms of wavenumber.
Args:
cube: The `PwsCube` object to generate a `KCube` object from.
Returns:
A new instance of `KCube`
"""
# Convert to wavenumber and reverse the order so we are ascending in order. Units of radian/micron
wavenumbers = (2 * np.pi) / (np.array(cube.wavelengths, dtype=np.float64) * 1e-3)[::-1]
data = cube.data[:, :, ::-1]
# Generate evenly spaced wavenumbers
# dk = (self.wavenumbers[-1] - self.wavenumbers[0])/(len(self.wavenumbers)-1);
evenWavenumbers = np.linspace(wavenumbers[0], wavenumbers[-1], num=len(wavenumbers), dtype=np.float64)
# Interpolate to the evenly spaced wavenumbers
interpFunc = spi.interp1d(wavenumbers, data, kind='linear', axis=2)
data = interpFunc(evenWavenumbers)
return cls(data, tuple(evenWavenumbers.astype(np.float32)), metadata=cube.metadata)
@property
def wavenumbers(self) -> t_.Tuple[float, ...]:
return self.index
[docs] def getOpd(self, useHannWindow: bool, indexOpdStop: int = None, mask: np.ndarray = None) -> t_.Tuple[np.ndarray, np.ndarray]:
"""
Calculate the Fourier transform of each spectra. This can be used to get the distance (in terms of OPD) to
objects that are reflecting light.
Args:
useHannWindow: If True, apply a Hann window to the data before the FFT. This reduces spectral resolution but
improves dynamic range and reduces "frequency leakage".
indexOpdStop: This parameter is a holdover from the original MATLAB implementation. Truncates the 3rd axis
of the OPD array.
mask: A 2D boolean numpy array indicating which pixels should be processed.
Returns:
A tuple containing: `opd`: The 3D array of values, `opdIndex`: The sequence of OPD values associated with each
2D slice along the 3rd axis of the `opd` data.
"""
opd = _FFTHelper.getFFTMagnitude(self.data, useHannWindow, normalization=_FFTHelper.Normalization.POWER)
fftSize = opd.shape[-1] # Due to FFT interpolation the FFT will be longer than the original data.
# Isolate the desired values in the OPD.
opd = opd[:, :, :indexOpdStop]
if mask is not None: # Get the average opd within the ROI of the bool array `mask`
opd = opd[mask].mean(axis=0)
# Generate the opd values for the current OPD.
dk = self.wavenumbers[1] - self.wavenumbers[0] # The interval that our linear array of wavenumbers is spaced by. Units: radians / micron
maxFrequency = 1 / (dk * 2) # Max resolvable frequency is half of the sampling frequency. Units: microns / radian
maxOpd = 2 * np.pi * maxFrequency # Convert to units of microns. This is the maximum OPD value we can get with our sample interval. tighter wavenumber spacing increases OPD range. units of microns
opdVals = np.linspace(0, maxOpd, num=fftSize)
# Above is how the old MATLAB code calculated the frequencies. IMO the code below is simpler and more understandable but we'll stick with the old code.
# opdVals = 2 * np.pi * np.fft.rfftfreq(fftSize, dk) # Units: microns
opdVals = opdVals[:indexOpdStop]
opd = opd.astype(self.data.dtype) # Make sure to upscale precision
opdVals = opdVals.astype(self.data.dtype)
return opd, opdVals
[docs] def getRMSFromOPD(self, lowerOPD: float, upperOPD: float, useHannWindow: bool = False) -> np.ndarray:
"""
Use Parseval's Theorem to calculate our signal RMS from the OPD (magnitude of fourier transform). This allows us to calculate RMS using only contributions
from certain OPD ranges which ideally are correlated with a specific depth into the sample. In practice the large frequency leakage due to our limited
bandwidth of measurement causes this assumption to break down, but it can still be useful if taken with a grain of salt.
Args:
lowerOPD: RMS will be integrated starting at this lower limit of OPD. Note for a reflectance setup like PWS `sampleDepth = OPD / (2 * meanSampleRI)`
upperOPD: RMS will be integrated up to this upper OPD limit.
useHannWindow: If False then use no windowing on the FFT to calculate OPD. If True then use and Hann window.
Returns:
A 2d numpy array of the signal RMS at each XY location in the image.
"""
opd, opdIndex = self.getOpd(useHannWindow=useHannWindow)
startOpdIdx = np.argmin(np.abs(opdIndex - lowerOPD)) # The index associated with lowerOPD
stopOpdIdx = np.argmin(np.abs(opdIndex - upperOPD)) # The index associated with upperOPD
opdSquaredSum = np.sum(opd[:, :, startOpdIdx:stopOpdIdx+1] ** 2, axis=2) # Parseval's theorem tells us that this is equivalent to the sum of the squares of our original signal
opdSquaredSum *= len(self.wavenumbers) / opd.shape[2] # If the original data and opd were of the same length then the above line would be correct. Since the fft may have been upsampled. we need to normalize.
return np.sqrt(opdSquaredSum)
[docs] @staticmethod
def fromOpd(opd: np.ndarray, xVals: np.ndarray, useHannWindow: bool):
"""WARNING: This function is untested. it almost certainly doesn't work. Create a KCube from and opd in the form returned by KCube.getOpd. This is useful if you want to do spectral manipulation and then transform back."""
assert len(xVals.shape) == 1
fftSize = int(2 ** (np.ceil(np.log2((2 * len(xVals)) - 1)))) # %This is the next size of fft that is at least 2x greater than is needed but is a power of two. Results in interpolation, helps amplitude accuracy and fft efficiency.
if useHannWindow: w = np.hanning(len(xVals))
else: w = np.ones((len(xVals)))
sig = np.fft.irfft(opd * w[None, None, :], n=fftSize, axis=2)
#I don't think we need to normalize by the number of elements like we do in getOpd
# by multiplying by Hann window we reduce the total power of signal. To account for that,
sig = np.abs(sig / np.sqrt(np.mean(w ** 2)))
maxWavenumberInterval = 2*np.pi / (xVals[1] - xVals[0])
dWavenumber = maxWavenumberInterval / len(xVals)
waveNumbers = len(xVals) / 2 * np.array(range(fftSize // 2 + 1)) * dWavenumber / (fftSize // 2 + 1)
return sig, waveNumbers
[docs] def getAutoCorrelation(self, isAutocorrMinSub: bool, stopIndex: int) -> t_.Tuple[np.ndarray, np.ndarray]:
"""The autocorrelation of a signal is the covariance of a signal with a
lagged version of itself, normalized so that the covariance at
zero-lag is equal to 1.0 (c[0] = 1.0). The same process without
normalization is the autocovariance.
A fast method for determining the autocovariance of a signal with
itself is to utilize fast-fourier transforms. In this method, the
signal is converted to the frequency domain using fft. The
frequency-domain signal is then convolved with itself. The inverse
fft is performed on this self-convolution, yielding the
autocorrelation.
In this instance, the autocorrelation is determined for a series of
lags, Z. Z is equal to [-P+1:P-1], where P is the quantity of
measurements in each signal (the quantity of wavenumbers). Thus, the
quantity of lags is equal to (2*P)-1. The fft process is fastest
when performed on signals with a length equal to a power of 2. To
take advantage of this property, a Z-point fft is performed on the
signal, where Z is a number greater than (2*P)-1 that is also a power
of 2."""
fftSize = int(2 ** (np.ceil(np.log2((2 * len(
self.wavenumbers)) - 1)))) # This is the next size of fft that is at least 2x greater than is needed but is a power of two. Results in interpolation, helps amplitude accuracy and fft efficiency.
# Determine the fft for each signal. The length of each signal's fft
# will be fftSize.
cubeFft = np.fft.rfft(self.data, n=fftSize, axis=2)
# Determine the ifft of the cubeFft. The resulting ifft of each signal
# will be of length fftSize..
cubeAutocorr = np.fft.irfft(np.abs(cubeFft) ** 2, axis=2) # This is the autocovariance.
# Obtain only the lags desired.
# Then, normalize each autocovariance so the value at zero-lags is 1.
cubeAutocorr = cubeAutocorr[:, :, :len(self.wavenumbers)]
cubeAutocorr /= cubeAutocorr[:, :, 0, np.newaxis]
# In some instances, minimum subtraction is desired. In this case,
# determine the minimum of each signal and subtract that value from
# each value in the signal.
if isAutocorrMinSub:
cubeAutocorr -= cubeAutocorr.min()
# Convert the lags from units of indices to wavenumbers.
lags = np.array(self.wavenumbers) - min(self.wavenumbers)
# Square the lags. This is how it is in the paper. I'm not sure why though.
lagsSquared = lags ** 2
# Before taking the log of the autocorrelation, zero values must be
# modified to prevent outputs of "inf" or "-inf".
cubeAutocorr[cubeAutocorr == 0] = 1e-323
# Obtain the log of the autocorrelation.
cubeAutocorrLog = np.log(cubeAutocorr)
# A first-order polynomial fit is determined between lagsSquared and
# and cubeAutocorrLog. This fit is to be performed only on the first
# linear-portion of the lagsSquared vs. cubeAutocorrLog relationship.
# The index of the last point to be used is indicated by stopIndex.
lagsSquared = lagsSquared[:stopIndex]
cubeAutocorrLog = cubeAutocorrLog[:, :, :stopIndex]
cubeAutocorrLog = np.moveaxis(cubeAutocorrLog, 2, 0)
cubeAutocorrLog = cubeAutocorrLog.reshape(
(cubeAutocorrLog.shape[0], cubeAutocorrLog.shape[1] * cubeAutocorrLog.shape[2]))
# Determine the first-order polynomial fit for each cubeAutocorrLag.
V = np.stack([np.ones(lagsSquared.shape), lagsSquared])
V = V.T
M = np.matmul(V, np.linalg.pinv(V))
cubeLinear = np.matmul(M, cubeAutocorrLog)
cubeSlope = (cubeLinear[1, :] - cubeLinear[0, :]) / (lagsSquared[1] - lagsSquared[0])
cubeSlope = cubeSlope.reshape(self.data.shape[0], self.data.shape[1])
# -- Coefficient of Determination
# Obtain the mean of the observed data
meanObserved = cubeAutocorrLog.mean(axis=0)
# Obtain the regression sum of squares.
ssReg = ((cubeLinear - meanObserved) ** 2).sum(axis=0)
# Obtain the residual sum of squares.
ssErr = ((cubeAutocorrLog - cubeLinear) ** 2).sum(axis=0)
# Obtain the total sume of squares.
ssTot = ssReg + ssErr
# Obtain rSquared.
rSquared = ssReg / ssTot
rSquared = rSquared.reshape(self.data.shape[0], self.data.shape[1])
cubeSlope = cubeSlope.astype(self.data.dtype)#Make sure to to upscale precision
rSquared = rSquared.astype(self.data.dtype)
return cubeSlope, rSquared
[docs] @classmethod
def fromHdfDataset(cls, dataset: h5py.Dataset):
"""
Load the KCube object from an `h5py.Dataset` in an HDF5 file
Args:
dataset: The `h5py.Dataset` that the KCube data is stored in.
Returns:
KCube: A new instance of this class."""
arr, index = cls.decodeHdf(dataset)
return cls(arr, index)
def __add__(self, other):
ret = self._add(other)
return KCube(ret, self.wavenumbers, metadata=self.metadata)
def __sub__(self, other):
ret = self._sub(other)
return KCube(ret, self.wavenumbers, metadata=self.metadata)
def __mul__(self, other):
ret = self._mul(other)
return KCube(ret, self.wavenumbers, metadata=self.metadata)
def __truediv__(self, other):
ret = self._truediv(other)
return KCube(ret, self.wavenumbers, metadata=self.metadata)
[docs]class FluorescenceImage:
"""
Represents a fluorescence image taken by the PWS acquisition software.
Args:
data: A 2D array of image data.
md: The metadata object associated with this image.
"""
def __init__(self, data: np.ndarray, md: pwsdtmd.FluorMetaData):
self.data = data
self.metadata = md
[docs] @classmethod
def fromTiff(cls, directory: str, acquisitionDirectory: t_.Optional[pwsdtmd.Acquisition] = None) -> FluorescenceImage:
"""
Load an image from a TIFF file.
Args:
directory: The path to the folder containing the TIFF file.
acquisitionDirectory: The `Acquisition` object associated with this acquisition.
Returns:
A new instanse of `FluorescenceImage`.
"""
md = pwsdtmd.FluorMetaData.fromTiff(directory, acquisitionDirectory) #This will raise an error if the folder isn't valid
return cls.fromMetadata(md)
[docs] def toTiff(self, directory: str):
"""
Save this object to a TIFF file.
Args:
directory: The path to the folder to save the new file to.
"""
with open(os.path.join(directory, pwsdtmd.FluorMetaData.FILENAME), 'wb') as f:
tf.imsave(f, self.data)
with open(os.path.join(directory, pwsdtmd.FluorMetaData.MDPATH), 'w') as f:
json.dump(self.metadata, f)
class _FFTHelper:
class Normalization(Enum):
POWER = 1
AMPLITUDE = 2
@staticmethod
def getFFTMagnitude(data: np.ndarray, useHannWindow: bool = False, normalization: Normalization = Normalization.POWER):
"""
Apply windowing, calculate FFT and normalize FFT for the last axis of a real-valued numpy array.
Args:
data: A numpy array. The FFT will be calculated along the last axis of the array. The values of this array must be real.
useHannWindow: If True then a Hann window will be applied to the data before the FFT and the proper normalization will be applied.
normalization: When windowing is used the FFT must be normalized using one of two normalizations. "Amplitude" normalization will maintain
the peak height of detected frequencies but will not preserve the total area under the curve which is asssociated with the energy/power of the signal.
"Power" normalization will preserve the total area under the curve (important when calculating RMS from an OPD signal) but the amplitudes of detected
frequencies will generally decrease due to the widened bandwidth associated with windowing.
Returns:
A numpy array with the same number of dimensions as `data`. The last axis of the array will be the magnitude of the FFT of the along the last
axis of the `data` array. Note that the length of the last axis will be longer than the last axis of the `data` array due to FFT interpolation.
"""
dataLength = data.shape[-1]
fftSize = int(2 ** (np.ceil(np.log2((2 * dataLength) - 1)))) # This is the next size of fft that is at least 2x greater than is needed but is a power of two. Results in interpolation, helps amplitude accuracy and fft efficiency.
fftSize *= 2 # We double the fftsize for even more iterpolation. Not sure why, but that's how it was done in the original matlab code.
if useHannWindow: # if hann window checkbox is selected, create hann window
w = np.hanning(dataLength) # Hanning window
else:
w = np.ones((dataLength)) # Create unity window
# Calculate the Fourier Transform of the signal multiplied by Hann window
fft = np.fft.rfft(data * w, n=fftSize, axis=data.ndim-1)
fft = np.abs(fft) # We're only interested in the magnitude.
# Normalize the FFT by the quantity of wavelengths.
fft /= dataLength
# by multiplying by Hann window we reduce the total power and amplitude of the signal. To account for that,
if normalization is _FFTHelper.Normalization.POWER:
fft *= np.sqrt(len(w) / np.sum(w ** 2)) # Correct the signal so the true power (area under curve) is preserved. This will prevent windowing from affecting integration of RMS but the amplitude of each frequency will be reduced https://dsp.stackexchange.com/questions/47598/does-windowing-affect-parsevals-theorem
elif normalization is _FFTHelper.Normalization.AMPLITUDE:
fft *= len(w) / np.sum(w) # Correct amplitude scaling error caused by windowing. Does not preserve energy of signal though. https://www.mathworks.com/matlabcentral/answers/372516-calculate-windowing-correction-factor
else:
raise ValueError(f"{normalization} is not a valid normalization.")
return fft