Source code for pwspy.analysis.dynamics

# Copyright 2018-2020 Nick Anthony, Backman Biophotonics Lab, Northwestern University
#
# This file is part of PWSpy.
#
# PWSpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PWSpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PWSpy.  If not, see <https://www.gnu.org/licenses/>.

"""
Classes used in the analysis of Dynamics data.

Classes
---------

.. autosummary::
    :toctree: generated/

    DynamicsAnalysisSettings
    DynamicsAnalysisResults
    DynamicsAnalysis

Inheritance
-------------
.. inheritance-diagram:: DynamicsAnalysisSettings DynamicsAnalysisResults DynamicsAnalysis
    :parts: 1

"""

from __future__ import annotations
import dataclasses
import logging
from datetime import datetime

import numpy as np
import pandas as pd
from numpy import ma
import multiprocessing as mp
import typing as t_
from . import AbstractAnalysis, warnings, AbstractAnalysisSettings, AbstractHDFAnalysisResults
from pwspy import dateTimeFormat
import pwspy.dataTypes as pwsdt
from pwspy.utility.reflection import reflectanceHelper, Material

__all__ = ['DynamicsAnalysis', 'DynamicsAnalysisSettings', 'DynamicsAnalysisResults']


[docs]class DynamicsAnalysis(AbstractAnalysis): """This class performs the analysis of RMS_t_squared and D (diffusion). It is based on a set of MATLAB scripts written by Scott Gladstein. The original scripts can be found in the `_oldMatlab` subpackage. References: "Multimodal interferometric imaging of nanoscale structure and macromolecular motion uncovers UV induced cellular paroxysm" Args: settings: The settings use for the analysis extraReflectance: the metadata object referring to a calibration file for extra reflectance. You can optionally proide the ExtraReflectanceCube rather than just the metadata object referring to it. ref: A reference acquisition to use for normalization. """ n_medium = 1.37 # The average index of refraction for chromatin? def __init__(self, settings: DynamicsAnalysisSettings, extraReflectance: t_.Optional[t_.Union[pwsdt.ERMetaData, pwsdt.ExtraReflectanceCube]], ref: pwsdt.DynCube): super().__init__() logger = logging.getLogger(__name__) if not ref.processingStatus.cameraCorrected: ref.correctCameraEffects(settings.cameraCorrection) if not ref.processingStatus.normalizedByExposure: ref.normalizeByExposure() if ref.metadata.pixelSizeUm is not None: # Only works if pixel size was saved in the metadata. ref.filterDust(.75) # Apply a blur to filter out dust particles. This is in microns. I'm not sure if this is the optimal value. if settings.referenceMaterial is None: theoryR = pd.Series(np.ones((len(ref.times),)), index=ref.times) # Having this as all ones effectively ignores it. logger.warning("DynamicsAnalysis ignoring reference material correction") else: theoryR = reflectanceHelper.getReflectance(settings.referenceMaterial, Material.Glass, wavelengths=ref.metadata.wavelength, NA=settings.numericalAperture) if isinstance(extraReflectance, pwsdt.ERMetaData): # In the case the extraReflectance is an ExtraReflectanceCube or `None` no action needs to take place. extraReflectance = pwsdt.ExtraReflectanceCube.fromMetadata(extraReflectance) if extraReflectance is None: Iextra = None logger.warning("DynamicsAnalysis ignoring extra reflection") else: if extraReflectance.metadata.numericalAperture != settings.numericalAperture: logger.warning("The numerical aperture of your analysis does not match the NA of the Extra Reflectance Calibration. Calibration File NA: {extraReflectance.metadata.numericalAperture}. PWSAnalysis NA: {settings.numericalAperture}.") idx = np.asarray(np.array(extraReflectance.wavelengths) == ref.metadata.wavelength).nonzero()[0][0] # The index of extra reflectance that matches the wavelength of our dynamics cube I0 = ref.data.mean(axis=2) / (float(theoryR) + extraReflectance.data[:, :, idx]) # I0 is the intensity of the illumination source, reconstructed in units of `counts`. this is an inversion of our assumption that reference = I0*(referenceReflectance + extraReflectance) Iextra = I0 * extraReflectance.data[:, :, idx] # Convert from reflectance to predicted counts/ms. ref.subtractExtraReflection(Iextra) # remove the extra reflection from our data# if not settings.relativeUnits: ref = ref / theoryR[None, None, :] # now when we normalize by our reference we will get a result in units of physical reflectance rather than arbitrary units. self.refMean = ref.data.mean(axis=2) ref.normalizeByReference(self.refMean) # We normalize so that the average is 1. This is for scaling purposes with the AC. Seems like the AC should be scale independent though, not sure. self.refAc = ref.getAutocorrelation()[:, :, :settings.diffusionRegressionLength+1].mean(axis=(0, 1)) # We find the average autocorrlation of the background to cut down on noise, presumably this is uniform accross the field of view any way, right? self.refTag = ref.metadata.idTag self.erTag = extraReflectance.metadata.idTag if extraReflectance is not None else None self.settings = settings self.extraReflection = Iextra
[docs] def run(self, cube: pwsdt.DynCube) -> t_.Tuple[DynamicsAnalysisResults, t_.List[warnings.AnalysisWarning]]: # Inherit docstring warns = [] if not cube.processingStatus.cameraCorrected: cube.correctCameraEffects(self.settings.cameraCorrection) if not cube.processingStatus.normalizedByExposure: cube.normalizeByExposure() if self.extraReflection is not None: cube.subtractExtraReflection(self.extraReflection) cube.normalizeByReference(self.refMean) cubeAc = cube.getAutocorrelation() cubeAc = cubeAc[:, :, :self.settings.diffusionRegressionLength+1] # We are only going to use the first few time points of the ACF, we can get rid of the rest. rms_t_squared = cubeAc[:, :, 0] - self.refAc[0] # The rms^2 noise of the reference averaged over the whole image. rms_t_squared[rms_t_squared < 0] = 0 # Sometimes the above noise subtraction can cause some of our values to be barely below 0, that's going to be a problem. # If we didn't care about noise subtraction we could get rms_t as just `cube.data.std(axis=2)` # Determine the mean-reflectance for each pixel in the cell. reflectance = cube.data.mean(axis=2) # Diffusion cubeAc = ma.array(cubeAc) # Convert to the numpy.MaskedArray type to help us mark some data as invalid. cubeAc[cubeAc.data[:, :, 0] < np.sqrt(2)*self.refAc[0]] = ma.masked # Remove pixels with low SNR. Default threshold removes values where 1st point of acf is less than sqrt(2) of background acf ac = ma.array(cubeAc - self.refAc) # Background subtracted autocorrelation function. ac = ac / ac[:, :, 0][:, :, None] # Normalize by the zero-lag value ac[np.any(ac <= 0, axis=2)] = ma.masked # Before taking the log of the autocorrelation any negative or zero values will cause problems. Remove the pixel entirely dt = (cube.times[-1] - cube.times[0]) / (len(cube.times) - 1) / 1e3 # Convert to seconds k = (self.n_medium * 2 * np.pi) / (cube.metadata.wavelength / 1e3) # expressing wavelength in microns to match up with old matlab code. val = np.log(ac) / (4 * k ** 2) # See the `theory` section of the paper for an explanation of the 4k^2. The slope of log(ac) should be equivalent to 1/t_c in the paper. d_slope = -self._maskedLinearRegression(val, dt) # Get the slope of the autocorrelation. This is related to the diffusion in the cell. The minus is here to make the number positive, the slope is really negative. results = DynamicsAnalysisResults.create(meanReflectance=reflectance, rms_t_squared=rms_t_squared, reflectance=cube, diffusion=d_slope, settings=self.settings, imCubeIdTag=cube.metadata.idTag, referenceIdTag=self.refTag, extraReflectionIdTag=self.erTag) return results, warns
@staticmethod def _maskedLinearRegression(arr: ma.MaskedArray, dt: float) -> np.ndarray: """ Takes a 3d ACF array as input and returns a 2d array indicating the slope along the 3rd dimension of the input array. The dimensions of the output array match the first two dimensions of the input array. The input array can have invalid pixels masked out, this function will exclude them from the calculation. Args: arr: The masked 3D array of the autocorrelation function of each spectra. dt: The time interval between each element of the autocorrelation function. Returns: The 2D array containing the slope of each ACF at each pixel. """ origShape = arr.shape y = np.reshape(arr, (origShape[0]*origShape[1], origShape[2])) #Convert to a 2d array [pixels, time]. This is required by the polyfit function. t = np.array([i*dt for i in range(origShape[2])]) # Generate a 1d array representing the time axis. # Y = np.reshape(y[~y.mask], (np.sum((~y.mask).sum()//y.shape[1], y.shape[1]))) # Remove all the masked pixels since the polyfit function doesn't know what to do with them. Y = np.reshape(y[~y.mask], ((~y.mask).sum()//y.shape[1], y.shape[1])) # Remove all the masked pixels since the polyfit function doesn't know what to do with them. coeff = np.polyfit(t, Y.T, deg=1) # Linear fit slope = coeff[0, :] # Get the slope and ignore the y intercept Slope = ma.zeros(y.shape[0]) # Create an empty masked array that includes all pixels again Slope.mask = y.mask[:, 0] # Copy the original mask indicating which pixels are invalid. Slope[~Slope.mask] = slope # Fill our calculated slopes into the yunmasked pixels. Slope = np.reshape(Slope, (origShape[0], origShape[1])) # Reshape back to a 2d image return Slope
[docs] def copySharedDataToSharedMemory(self): # Inherit docstring refdata = mp.RawArray('f', self.refAc.size) refdata = np.frombuffer(refdata, dtype=np.float32).reshape(self.refAc.shape) np.copyto(refdata, self.refAc) self.refAc = refdata refmdata = mp.RawArray('f', self.refMean.size) refmdata = np.frombuffer(refmdata, dtype=np.float32).reshape(self.refMean.shape) np.copyto(refmdata, self.refMean) self.refMean = refmdata if self.extraReflection is not None: iedata = mp.RawArray('f', self.extraReflection.size) iedata = np.frombuffer(iedata, dtype=np.float32).reshape(self.extraReflection.shape) np.copyto(iedata, self.extraReflection) self.extraReflection = iedata
[docs]class DynamicsAnalysisResults(AbstractHDFAnalysisResults): # Inherit docstring.
[docs] @staticmethod def fields(): # Inherit docstring. return ('meanReflectance', 'reflectance', 'rms_t_squared', 'diffusion', 'time', 'settings', 'imCubeIdTag', 'referenceIdTag', 'extraReflectionIdTag')
[docs] @staticmethod def name2FileName(name: str) -> str: # Inherit docstring. return f'dynAnalysisResults_{name}.h5'
[docs] @staticmethod def fileName2Name(fileName: str) -> str: # Inherit docstring. return fileName.split('dynAnalysisResults_')[1][:-3]
[docs] @classmethod def create(cls, settings: DynamicsAnalysisSettings, meanReflectance: np.ndarray, rms_t_squared: np.ndarray, reflectance: pwsdt.DynCube, diffusion: np.ndarray, imCubeIdTag: str, referenceIdTag: str, extraReflectionIdTag: t_.Optional[str]): # Inherit docstring. d = {'time': datetime.now().strftime(dateTimeFormat), 'meanReflectance': meanReflectance, 'reflectance': reflectance, 'diffusion': diffusion, 'rms_t_squared': rms_t_squared, 'imCubeIdTag': imCubeIdTag, 'referenceIdTag': referenceIdTag, 'extraReflectionIdTag': extraReflectionIdTag, 'settings': settings} return cls(None, d)
@AbstractHDFAnalysisResults.FieldDecorator def meanReflectance(self) -> np.ndarray: """A 2D array giving the reflectance of the image averaged over the full spectra.""" dset = self.file['meanReflectance'] return np.array(dset) @AbstractHDFAnalysisResults.FieldDecorator def rms_t_squared(self) -> np.ndarray: """A 2D array giving the spectral variance at each position in the image.""" dset = self.file['rms_t_squared'] return np.array(dset) @AbstractHDFAnalysisResults.FieldDecorator def settings(self) -> DynamicsAnalysisSettings: """The settings used to generate these results.""" return DynamicsAnalysisSettings.fromJsonString(bytes(np.array(self.file['settings'])).decode()) @AbstractHDFAnalysisResults.FieldDecorator def reflectance(self) -> pwsdt.DynCube: """A dynamics cube containing the 3D reflectance array after all corrections and analysis.""" dset = self.file['reflectance'] return pwsdt.DynCube.fromHdfDataset(dset) @AbstractHDFAnalysisResults.FieldDecorator def diffusion(self) -> np.ndarray: """A 2D array indicating the diffusion at each position in the image.""" dset = self.file['diffusion'] return np.array(dset) @AbstractHDFAnalysisResults.FieldDecorator def imCubeIdTag(self) -> str: """The idtag of the dynamics cube that was analyzed.""" return bytes(np.array(self.file['imCubeIdTag'])).decode() @AbstractHDFAnalysisResults.FieldDecorator def referenceIdTag(self) -> str: """The idtag of the dynamics cube that was used as a reference for normalization.""" return bytes(np.array(self.file['referenceIdTag'])).decode() @AbstractHDFAnalysisResults.FieldDecorator def time(self) -> str: """The time that the analysis was performed.""" return self.file['time'] @AbstractHDFAnalysisResults.FieldDecorator def extraReflectionIdTag(self) -> str: """The idtag of the extra reflection correction that was used.""" return bytes(np.array(self.file['extraReflectionIdTag'])).decode()
[docs]@dataclasses.dataclass class DynamicsAnalysisSettings(AbstractAnalysisSettings): """These settings determine the behavior of the `DynamicsAnalysis` class. Args: extraReflectanceId: The unique `IDTag` of the extraReflectance calibration that was used on this analysis. referenceMaterial: The material that was imaged in the reference image of this analysis. Found as an in pwspy.moduleConst.Material. The theoretically predicted reflectance of the reference image is used in the extraReflectance correction. numericalAperture: The illumination NA of the system. This is used for two purposes. First, we want to make sure that the NA of our data matches the NA of our extra reflectance correction cube. Second, the theoretically predicted reflectance of our reference is based not only on what our refereMaterial is but also the NA since reflectance is angle dependent. relativeUnits: If `True` then all calculation are performed such that the reflectance is 1 if it matches the reference. If `False` then we use the theoretical reflectance of the reference (based on NA and reference material) to normalize our results to the actual physical reflectance of the sample (about 0.4% for water) cameraCorrection: An object describing the dark counts and non-linearity of the camera used. If the data supplied to the DynamicsAnalysis class has already been corrected then this setting will not be used. Setting this to `None` will result in the camera correcting being automatically determined based on the image files' metadata. diffusionRegressionLength: The original matlab scripts for analysis of dynamics data determined the slope of the log(ACF) by looking only at the first two indices, (log(ACF)[1]-log(ACF)[0])/dt. This results in very noisy results. However as you at higher index value of the log(ACF) the noise becomes much worse. A middle ground is to perform linear regression on the first 4 indices to determine the slope. You can adjust that number here. """ extraReflectanceId: t_.Optional[str] referenceMaterial: Material numericalAperture: float relativeUnits: bool cameraCorrection: t_.Optional[pwsdt.CameraCorrection] diffusionRegressionLength: int = 3 FileSuffix = "dynAnalysis" # This is used for setting the filename when saving and loading to json def __post_init__(self): assert self.diffusionRegressionLength > 0 assert self.diffusionRegressionLength < 20 # Even 20 is probably way too long, unless a system is created with extremely low noise.
[docs] def asDict(self) -> dict: d = dataclasses.asdict(self) if self.referenceMaterial is None: d['referenceMaterial'] = None else: d['referenceMaterial'] = self.referenceMaterial.name # Convert from enum to string return d
@classmethod def _fromDict(cls, d: dict) -> DynamicsAnalysisSettings: if d['referenceMaterial'] is not None: d['referenceMaterial'] = Material[d['referenceMaterial']] # Convert from string to enum for newKey in ['cameraCorrection']: if newKey not in d.keys(): d[newKey] = None # For a while these settings were missing from the code. Allow us to still load old files. return cls(**d)