Source code for pwspy.dataTypes._metadata

# Copyright 2018-2020 Nick Anthony, Backman Biophotonics Lab, Northwestern University
#
# This file is part of PWSpy.
#
# PWSpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PWSpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PWSpy.  If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations
import json
import logging
import multiprocessing as mp
import os
import pathlib
import subprocess
import sys
import typing as t_
import abc
import warnings
from datetime import datetime
import enum
import h5py
import jsonschema
import numpy as np
import tifffile as tf
from scipy import io as spio
from pwspy.dataTypes import _jsonSchemasPath
from pwspy.dataTypes._other import CameraCorrection, Roi, RoiFile
import pwspy.dataTypes._data as pwsdtd
from pwspy import dateTimeFormat
from pwspy.utility.misc import cached_property
if t_.TYPE_CHECKING:
    from pwspy.analysis import AbstractHDFAnalysisResults


class MetaDataBase(abc.ABC):
    """
    This base class provides that basic functionality to store information about a PWS related acquisition on file.

    Args:
        metadata: A dictionary containing the metadata
        filePath: The path to the location the metadata was loaded from
        acquisitionDirectory: A reference to the `Acquisition` associated with this object.

    """

    @property
    @abc.abstractmethod
    def _jsonSchemaPath(self) -> str:
        """Each sublass should provide a path to the jsonschema file. this is needed in order to resolve jsonschema references"""
        pass

    @property
    @abc.abstractmethod
    def _jsonSchema(self) -> dict:
        """Each subclass should provide a json schema loaded from a file.
        This serves as a schematic that can be checked against when loading metadata to make sure it contains the required information."""
        pass

    def __init__(self, metadata: dict, filePath: t_.Optional[str] = None, acquisitionDirectory: t_.Optional[Acquisition] = None):
        logger = logging.getLogger(__name__)
        self.filePath = filePath
        self.acquisitionDirectory = acquisitionDirectory
        refResolver = jsonschema.RefResolver(pathlib.Path(self._jsonSchemaPath).as_uri(), None)  # This resolver is used to allow derived json schemas to refer to the base schema.
        jsonschema.validate(instance=metadata, schema=self._jsonSchema, resolver=refResolver, cls=_TupleValidator)
        self.dict: dict = metadata
        try:
            datetime.strptime(self.dict['time'], dateTimeFormat)
        except ValueError:
            try:
                logger.info("Detected a non-compliant timestamp. attempting to correct.")
                self.dict['time'] = datetime.strftime(datetime.strptime(self.dict['time'], "%d-%m-%y %H:%M:%S"), dateTimeFormat)
            except ValueError:
                logger.warning("The time stamp could not be parsed. Replacing with 1_1_1970")
                self.dict['time'] = "1-1-1990 01:01:01"
        if self.dict['system'] == "":
            logger.warning("The `system` name in the metadata is blank. Check that the PWS System is saving the proper calibration values.")
        if all([i in self.dict for i in ['darkCounts', 'linearityPoly']]):
            if self.dict['darkCounts'] == 0:
                logger.warning("Detected a darkCounts value of 0 in the pwsdtd.PwsCube Metadata. Check that the PWS System is saving the proper calibration values.")
            self.cameraCorrection = CameraCorrection(darkCounts=self.dict['darkCounts'],
                                                     linearityPolynomial=self.dict['linearityPoly'])
        else:
            self.cameraCorrection = None

    @abc.abstractmethod
    def toDataClass(self, lock: t_.Optional[mp.Lock]) -> pwsdtd.ICBase:
        """Convert the metadata class to a class that loads the data

        Args:
            lock: A `Lock` object used to synchronize IO in multithreaded and multiprocessing applications.
        """
        pass

    @property
    @abc.abstractmethod
    def idTag(self) -> str:
        """A string that uniquely identifies this data."""
        pass

    @property
    def binning(self) -> int:
        """
        The binning setting used by the camera. This is needed in order to properly correct dark counts.
        This is generally extracted from metadata saved by Micromanager
        """
        return self.dict['binning']

    @property
    def pixelSizeUm(self) -> float:
        """
        The pixelSize expressed in microns. This represents the length of each square pixel in object space. Binning
        has already been accounted for here. This is generally extracted from metadata saved my MicroManager
        """
        return self.dict['pixelSizeUm']

    @property
    def exposure(self) -> float:
        """The exposure time of the camera expressed in milliseconds."""
        return self.dict['exposure']

    @property
    def time(self) -> str:
        """The date and time that the acquisition was taken."""
        return self.dict['time']

    @property
    def systemName(self) -> str:
        """The name of the system this was acquired on. The name is set in the `PWS Acquisition Plugin` for Micromanager."""
        return self.dict['system']

    @staticmethod
    def decodeHdfMetadata(d: h5py.Dataset) -> dict:
        """Attempt to extract a dictionary of metadata from an HDF5 dataset.

        Args:
            d: The `h5py.Dataset` to load from.
        Returns:
            A dictionary containing the metadata
        """
        assert 'metadata' in d.attrs
        return json.loads(d.attrs['metadata'])

    def encodeHdfMetadata(self, d: h5py.Dataset) -> h5py.Dataset:
        """Save this metadata object as a json string in an HDF5 dataset.

        Args:
            d: The `h5py.Dataset` to save the metadata to.
        """
        d.attrs['metadata'] = np.string_(json.dumps(self.dict))
        return d


class AnalysisManager(abc.ABC):
    """Handles the functionality to save, load, etc. analysis files.

    Args:
        metadata: A dictionary containing the metadata
        filePath: The path to the location the metadata was loaded from
        acquisitionDirectory: A reference to the `Acquisition` associated with this object.

    """
    def __init__(self, filePath: str):
        self.__filePath = filePath

    @staticmethod
    @abc.abstractmethod
    def getAnalysisResultsClass() -> t_.Type[AbstractHDFAnalysisResults]:
        """

        Returns:
            The class that is used to contain analysis results for this acquisition type.
        """
        pass

    def getAnalyses(self) -> t_.List[str]:
        """

        Returns:
            A list of the names of analyses that were found.
        """
        assert self.__filePath is not None
        return self.getAnalysesAtPath(self.__filePath)

    @classmethod
    def getAnalysesAtPath(cls, path: str) -> t_.List[str]:
        """

        Args:
            path: The path to search for analysis files.

        Returns:
            A list of the names of analyses that were found.
        """
        anPath = os.path.join(path, 'analyses')
        if os.path.exists(anPath):
            files = os.listdir(os.path.join(path, 'analyses'))
            return [cls.getAnalysisResultsClass().fileName2Name(f) for f in files]
        else:
            return []

    def saveAnalysis(self, analysis: AbstractHDFAnalysisResults, name: str, overwrite: bool = False):
        """

        Args:
            analysis: An AnalysisResults object to be saved.
            name: The name to save the analysis as
            overwrite: If `True` then any existing file of the same name will be replaced. If `False` an exception will be raised.
        """
        path = os.path.join(self.__filePath, 'analyses')
        if not os.path.exists(path):
            os.mkdir(path)
        analysis.toHDF(path, name, overwrite=overwrite)

    def loadAnalysis(self, name: str) -> AbstractHDFAnalysisResults:
        """

        Args:
            name: The name of the analysis to load.

        Returns:
            A new instance of an AnalysisResults object.
        """
        return self.getAnalysisResultsClass().load(os.path.join(self.__filePath, 'analyses'), name)

    def removeAnalysis(self, name: str):
        """

        Args:
            name: The name of the analysis to be deleted
        """
        os.remove(os.path.join(self.__filePath, 'analyses', self.getAnalysisResultsClass().name2FileName(name)))


[docs]class DynMetaData(MetaDataBase, AnalysisManager): """A class that represents the metadata of a Dynamics acquisition."""
[docs] class FileFormats(enum.Enum): """An enumerator identifying the types of file formats that this class can be loaded from.""" Tiff = enum.auto() RawBinary = enum.auto() Hdf = enum.auto()
[docs] @staticmethod def getAnalysisResultsClass() -> t_.Type[AbstractHDFAnalysisResults]: from pwspy.analysis.dynamics import DynamicsAnalysisResults return DynamicsAnalysisResults
_jsonSchemaPath = os.path.join(_jsonSchemasPath, 'DynMetaData.json') with open(_jsonSchemaPath) as f: _jsonSchema = json.load(f) def __init__(self, metadata: dict, filePath: t_.Optional[str] = None, fileFormat: t_.Optional[FileFormats] = None, acquisitionDirectory: t_.Optional[Acquisition] = None): self.fileFormat = fileFormat MetaDataBase.__init__(self, metadata, filePath, acquisitionDirectory=acquisitionDirectory) AnalysisManager.__init__(self, filePath)
[docs] def toDataClass(self, lock: mp.Lock = None) -> pwsdtd.DynCube: """ Args: lock (mp.Lock): A multiprocessing `lock` that can prevent help us synchronize demands on the hard drive when loading many files in parallel. Probably not needed. Returns: pwsdtmd.DynCube: The data object associated with this metadata object. """ # from pwspy.dataTypes.data import DynCube return pwsdtd.DynCube.fromMetadata(self, lock)
@property def idTag(self) -> str: """ Returns: str: A unique string identifying this acquisition. """ return f"DynCube_{self.dict['system']}_{self.dict['time']}" @property def wavelength(self) -> int: """The wavelength that this acquisition was acquired at.""" return self.dict['wavelength'] @property def times(self) -> t_.Tuple[float, ...]: """A sequence indicatin the time associated with each 2D slice of the 3rd axis of the `data` array""" return self.dict['times']
[docs] @classmethod def fromOldPWS(cls, directory: str, lock: mp.Lock = None, acquisitionDirectory: t_.Optional[Acquisition] = None) -> DynMetaData: """Loads old dynamics cubes which were saved the same as old pws cubes. a raw binary file with some metadata saved in random .mat files. Does not support automatic detection of binning, pixel size, camera dark counts, system name. Args: directory: The path to the folder containing the data files load the metadata from. Returns: A new instance of `DynMetaData`. """ if lock is not None: lock.acquire() try: #While the info2 file exists for old dynamics acquisitions, it is just garbage char. info3 = list(spio.loadmat(os.path.join(directory, 'info3.mat'))['info3'].squeeze()) wv = list(spio.loadmat(os.path.join(directory, 'WV.mat'))['WV'].squeeze()) wv = [int(i) for i in wv] # We will have issues saving later if these are numpy int types. assert all([i == wv[0] for i in wv]), "The wavelengths of the dynamics cube are not all identical." md = { #RequiredMetadata 'exposure': info3[1], 'time': '{:d}-{:d}-{:d} {:d}:{:d}:{:d}'.format( *[int(i) for i in [info3[8], info3[7], info3[6], info3[9], info3[10], info3[11]]]), 'system': str(info3[0]), 'binning': None, 'pixelSizeUm': None, 'wavelength': wv[0], 'times': [i*info3[1] for i in range(len(wv))], # We don't have any record of the times so we just have to assume it matches exactly with the exposure time, this is in milliseconds. #Extra metadata 'systemId': info3[0], 'imgHeight': int(info3[2]), 'imgWidth': int(info3[3]), 'wavelengths': wv } finally: if lock is not None: lock.release() return cls(md, filePath=directory, fileFormat=DynMetaData.FileFormats.RawBinary, acquisitionDirectory=acquisitionDirectory)
[docs] @classmethod def fromTiff(cls, directory, lock: mp.Lock = None, acquisitionDirectory: t_.Optional[Acquisition] = None) -> DynMetaData: """ Args: directory: The path to the folder containing the data files load the metadata from. Returns: A new instance of `DynMetaData` loaded from file. """ if lock is not None: lock.acquire() try: if os.path.exists(os.path.join(directory, 'dyn.tif')): path = os.path.join(directory, 'dyn.tif') else: raise OSError("No Tiff file was found at:", directory) if os.path.exists(os.path.join(directory, 'dynmetadata.json')): metadata = json.load(open(os.path.join(directory, 'dynmetadata.json'), 'r')) else: with tf.TiffFile(path) as tif: metadata = json.loads(tif.imagej_metadata['Info']) # The micromanager plugin saves metadata as the info property of the imagej imageplus object. finally: if lock is not None: lock.release() metadata['binning'] = metadata['MicroManagerMetadata']['Binning']['scalar'] # Get binning from the micromanager metadata metadata['pixelSizeUm'] = metadata['MicroManagerMetadata']['PixelSizeUm']['scalar'] # Get the pixel size from the micromanager metadata if metadata['pixelSizeUm'] == 0: metadata['pixelSizeUm'] = None return cls(metadata, filePath=directory, fileFormat=cls.FileFormats.Tiff, acquisitionDirectory=acquisitionDirectory)
[docs] def getThumbnail(self) -> np.ndarray: """Return the image used for quick viewing of the acquisition. Has no numeric significance.""" with tf.TiffFile(os.path.join(self.filePath, 'image_bd.tif')) as f: return f.asarray()
[docs]class ERMetaData: """A class representing the extra information related to an ExtraReflectanceCube file. This can be useful as a object to keep track of a ExtraReflectanceCube without having to have the data from the file loaded into memory. Args: inheritedMetadata: The metadata dictionary will often just be inherited information from one of the `PwsCubes` that was used to create this ER Cube. While this data can be useful it should be taken with a grain of salt. E.G. the metadata will contain an `exposure` field. In reality this ER Cube will have been created from pwsdtd.PwsCubes at a variety of exposures. numericalAperture: The numerical aperture that the PwsCubes used to generate this Extra reflection cube were imaged at. filePath: The path to the file that this object is stored in. """ _jsonSchema = {"$schema": "http://json-schema.org/schema#", '$id': 'extraReflectionMetadataSchema', 'title': 'extraReflectionMetadataSchema', 'required': ['system', 'time', 'wavelengths', 'pixelSizeUm', 'binning', 'numericalAperture'], 'type': 'object', 'properties': { 'system': {'type': 'string'}, 'time': {'type': 'string'}, 'wavelengths': {'type': 'array', 'items': {'type': 'number'} }, 'pixelSizeUm': {'type': ['number', 'null']}, 'binning': {'type': ['integer', 'null']}, 'numericalAperture': {'type': ['number']} } } FILESUFFIX = '_eReflectance.h5' DATASETTAG = 'extraReflection' _MDTAG = 'metadata' def __init__(self, inheritedMetadata: dict, numericalAperture: float, filePath: str = None): self.inheritedMetadata = inheritedMetadata self.inheritedMetadata['numericalAperture'] = numericalAperture jsonschema.validate(instance=inheritedMetadata, schema=self._jsonSchema, cls=_TupleValidator) self.filePath = filePath @property def idTag(self) -> str: """A unique tag to identify this acquisition by.""" return f"ExtraReflection_{self.inheritedMetadata['system']}_{self.inheritedMetadata['time']}" @property def numericalAperture(self) -> float: """The numerical aperture that this cube was imaged at.""" return self.inheritedMetadata['numericalAperture'] @property def systemName(self) -> str: """The name of the system that this image was acquired on.""" return self.inheritedMetadata['system']
[docs] @classmethod def validPath(cls, path: str) -> t_.Tuple[bool, t_.Union[str, bytes], t_.Union[str, bytes]]: """ Args: path: The file path to the file to search for valid ExtraReflectance files. Returns: A tuple containing: validPath: True if the path is valid, directory: The directory the file is in, name: The name that the object was saved as. """ if cls.FILESUFFIX in path: directory, name = cls.directory2dirName(path) with h5py.File(os.path.join(directory, f'{name}{cls.FILESUFFIX}'), 'r') as hf: valid = cls._MDTAG in hf[cls.DATASETTAG].attrs return valid, directory, name else: return False, '', ''
[docs] @classmethod def fromHdfFile(cls, directory: str, name: str) -> ERMetaData: """ Args: directory: The directory the file is saved in. name: The name the object was saved as. Returns: A new instance of `ERMetaData` object """ filePath = cls.dirName2Directory(directory, name) with h5py.File(filePath, 'r') as hf: dset = hf[cls.DATASETTAG] return cls.fromHdfDataset(dset, filePath=filePath)
[docs] @classmethod def fromHdfDataset(cls, d: h5py.Dataset, filePath: str = None) -> ERMetaData: """ Args: d: The `h5py.Dataset` to load the object from. Returns: A new instance of `ERMetaData` object """ mdDict = json.loads(d.attrs[cls._MDTAG]) return cls(mdDict, mdDict['numericalAperture'], filePath=filePath)
[docs] def toHdfDataset(self, g: h5py.Group) -> h5py.Group: """ Args: g: The `h5py.Group` to save the new dataset into. """ g[self.DATASETTAG].attrs[self._MDTAG] = np.string_(json.dumps(self.inheritedMetadata)) return g
[docs] @classmethod def directory2dirName(cls, path: str) -> t_.Tuple[t_.Union[bytes, str], t_.Union[bytes, str]]: """ Args: path: The path to the file that stores an `ExtraReflectanceCube` object. Returns: A tuple containing: directory: The directory path, name: The name that the file was saved as. """ directory, fileName = os.path.split(path) if not fileName.endswith(cls.FILESUFFIX): raise ValueError(f"The file name \"{fileName}\" is not recognized as a {cls.__name__} file. Should end with \"{cls.FILESUFFIX}\".") name = fileName.split(cls.FILESUFFIX)[0] return directory, name
[docs] @classmethod def dirName2Directory(cls, directory: str, name: str) -> str: """ This is the inverse of `directory2dirName` """ return os.path.join(directory, f'{name}{cls.FILESUFFIX}')
[docs]class FluorMetaData(MetaDataBase): """ Metadata for a fluorescence image. Args: md: A dictionary containing the metadata """ FILENAME = 'fluor.tif' MDPATH = 'fluorMetadata.json' _jsonSchemaPath = os.path.join(_jsonSchemasPath, 'MetaDataBase.json') with open(_jsonSchemaPath) as f: _jsonSchema = json.load(f) def __init__(self, md: dict, filePath: t_.Optional[str] = None, acquisitionDirectory: t_.Optional[Acquisition] = None): super().__init__(md, filePath, acquisitionDirectory)
[docs] def toDataClass(self, lock: mp.Lock = None) -> pwsdtd.FluorescenceImage: return pwsdtd.FluorescenceImage.fromMetadata(self, lock)
@property def idTag(self): return f"Fluor_{self.dict['system']}_{self.dict['time']}"
[docs] @classmethod def fromTiff(cls, directory: str, acquisitionDirectory: t_.Optional[Acquisition]) -> FluorMetaData: """Load from a TIFF file. Args: directory: The path to the folder to load from. Returns: A new instance of `FluorMetaData` loaded from file. """ if not FluorMetaData.isValidPath(directory): raise ValueError(f"Fluorescence image not found in {directory}.") with open(os.path.join(directory, FluorMetaData.MDPATH), 'r') as f: dic = json.load(f) # Get binning from the micromanager metadata binning = dic['MicroManagerMetadata']['Binning']['scalar'] dic['binning'] = binning # Get the pixel size from the micromanager metadata try: dic['pixelSizeUm'] = dic['MicroManagerMetadata']['PixelSizeUm']['scalar'] except KeyError: dic['pixelSizeUm'] = None if dic['pixelSizeUm'] == 0: dic['pixelSizeUm'] = None return FluorMetaData(dic, directory, acquisitionDirectory)
[docs] @classmethod def isValidPath(cls, directory: str): """ Args: directory: The path to search for valid files. Returns: True if a valid file was found. """ path = os.path.join(directory, cls.FILENAME) path2 = os.path.join(directory, cls.MDPATH) return os.path.exists(path) and os.path.exists(path2)
[docs] def getThumbnail(self) -> np.ndarray: """ Returns: An image for quick viewing of the acquisition. No numerical significance. """ with tf.TiffFile(os.path.join(self.filePath, 'image_bd.tif')) as f: return f.asarray()
[docs]class PwsMetaData(MetaDataBase, AnalysisManager): """A class that represents the metadata of a PWS acquisition. Args: metadata: The dictionary containing the metadata. """
[docs] class FileFormats(enum.Enum): RawBinary = enum.auto() Tiff = enum.auto() Hdf = enum.auto() NanoMat = enum.auto()
[docs] @staticmethod def getAnalysisResultsClass() -> t_.Type[AbstractHDFAnalysisResults]: from pwspy.analysis.pws import PWSAnalysisResults return PWSAnalysisResults
_jsonSchemaPath = os.path.join(_jsonSchemasPath, 'PwsMetaData.json') with open(_jsonSchemaPath) as f: _jsonSchema = json.load(f) def __init__(self, metadata: dict, filePath: t_.Optional[str] = None, fileFormat: PwsMetaData.FileFormats = None, acquisitionDirectory: t_.Optional[Acquisition] = None): MetaDataBase.__init__(self, metadata, filePath, acquisitionDirectory=acquisitionDirectory) AnalysisManager.__init__(self, filePath) self.fileFormat: PwsMetaData.FileFormats = fileFormat self.dict['wavelengths'] = tuple(np.array(self.dict['wavelengths']).astype(float))
[docs] def toDataClass(self, lock: mp.Lock = None) -> pwsdtd.PwsCube: return pwsdtd.PwsCube.fromMetadata(self, lock)
@cached_property def idTag(self) -> str: return f"PwsCube_{self.dict['system']}_{self.dict['time']}" @property def wavelengths(self) -> t_.Tuple[float, ...]: return self.dict['wavelengths']
[docs] @classmethod def loadAny(cls, directory, lock: mp.Lock = None, acquisitionDirectory: t_.Optional[Acquisition] = None) -> PwsMetaData: """ Attempt to load from any file format. Args: directory: The file path to load the metadata from. Returns: A new instance of `PwsMetaData` loaded from file """ try: return PwsMetaData.fromTiff(directory, lock=lock, acquisitionDirectory=acquisitionDirectory) except: try: return PwsMetaData.fromOldPWS(directory, lock=lock, acquisitionDirectory=acquisitionDirectory) except: try: return PwsMetaData.fromNano(directory, lock=lock, acquisitionDirectory=acquisitionDirectory) except: raise OSError(f"Could not find a valid PWS image cube file at {directory}.")
[docs] @classmethod def fromOldPWS(cls, directory, lock: mp.Lock = None, acquisitionDirectory: t_.Optional[Acquisition] = None) -> PwsMetaData: """ Attempt to load from the old .mat file format. Args: directory: The file path to load the metadata from. Returns: A new instance of `PwsMetaData` loaded from file """ if lock is not None: lock.acquire() try: try: md = json.load(open(os.path.join(directory, 'pwsmetadata.txt'))) except: # have to use the old metadata info2 = list(spio.loadmat(os.path.join(directory, 'info2.mat'))['info2'].squeeze()) info3 = list(spio.loadmat(os.path.join(directory, 'info3.mat'))['info3'].squeeze()) logging.getLogger(__name__).info("Json metadata not found. Using backup metadata.") wv = list(spio.loadmat(os.path.join(directory, 'WV.mat'))['WV'].squeeze()) wv = [int(i) for i in wv] # We will have issues saving later if these are numpy int types. md = {'startWv': info2[0], 'stepWv': info2[1], 'stopWv': info2[2], 'exposure': info2[3], 'time': '{:d}-{:d}-{:d} {:d}:{:d}:{:d}'.format( *[int(i) for i in [info3[8], info3[7], info3[6], info3[9], info3[10], info3[11]]]), 'systemId': info3[0], 'system': str(info3[0]), 'imgHeight': int(info3[2]), 'imgWidth': int(info3[3]), 'wavelengths': wv, 'binning': None, 'pixelSizeUm': None} finally: if lock is not None: lock.release() return cls(md, filePath=directory, fileFormat=PwsMetaData.FileFormats.RawBinary, acquisitionDirectory=acquisitionDirectory)
[docs] @classmethod def fromNano(cls, directory: str, lock: mp.Lock = None, acquisitionDirectory: t_.Optional[Acquisition] = None) -> PwsMetaData: """ Attempt to load from NanoCytomic .mat file format Args: directory: The file path to load the metadata from. Returns: A new instance of `PwsMetaData` loaded from file """ if lock is not None: lock.acquire() try: with h5py.File(os.path.join(directory, 'imageCube.mat'), 'r') as hf: cubeParams = hf['cubeParameters'] lam = cubeParams['lambda'] exp = cubeParams['exposure'] #we don't support adaptive exposure. md = {'startWv': lam['start'][0, 0], 'stepWv': lam['step'][0, 0], 'stopWv': lam['stop'][0, 0], 'exposure': exp['base'][0, 0], 'time': datetime.strptime(np.string_(cubeParams['metadata']['date'][()].astype(np.uint8)).decode(), '%Y%m%dT%H%M%S').strftime(dateTimeFormat), 'system': np.string_(cubeParams['metadata']['hardware']['system']['id'][()].astype(np.uint8)).decode(), 'wavelengths': list(lam['sequence'][0]), 'binning': None, 'pixelSizeUm': None} finally: if lock is not None: lock.release() return cls(md, filePath=directory, fileFormat=PwsMetaData.FileFormats.NanoMat, acquisitionDirectory=acquisitionDirectory)
[docs] @classmethod def fromTiff(cls, directory, lock: mp.Lock = None, acquisitionDirectory: t_.Optional[Acquisition] = None) -> PwsMetaData: """ Attempt to load from the standard TIFF file format. Args: directory: The file path to load the metadata from. Returns: A new instance of `PwsMetaData` loaded from file """ if lock is not None: lock.acquire() try: if os.path.exists(os.path.join(directory, 'MMStack.ome.tif')): path = os.path.join(directory, 'MMStack.ome.tif') elif os.path.exists(os.path.join(directory, 'pws.tif')): path = os.path.join(directory, 'pws.tif') else: raise OSError("No Tiff file was found at:", directory) if os.path.exists(os.path.join(directory, 'pwsmetadata.json')): metadata = json.load(open(os.path.join(directory, 'pwsmetadata.json'), 'r')) else: with tf.TiffFile(path) as tif: try: metadata = json.loads(tif.pages[0].description) except: metadata = json.loads(tif.imagej_metadata['Info']) # The micromanager plugin saves metadata as the info property of the imagej imageplus object. metadata['time'] = tif.pages[0].tags['DateTime'].value finally: if lock is not None: lock.release() if 'MicroManagerMetadata' not in metadata: # Data saved by something other than the Micro-Manager acquisition plugin won't have this. I.E. NC data saved by `toTiff` binning = metadata['binning'] pixelSize = metadata['pixelSizeUm'] else: #For a while the micromanager metadata was getting saved weird this fixes it. if 'major_version' in metadata['MicroManagerMetadata']: metadata['MicroManagerMetadata'] = metadata['MicroManagerMetadata']['map'] # Get binning from the micromanager metadata binning = metadata['MicroManagerMetadata']['Binning'] if isinstance(binning, dict): # This is due to a property map change from beta to gamma binning = binning['scalar'] # Get the pixel size from the micromanager metadata try: pixelSize = metadata['MicroManagerMetadata']['PixelSizeUm']['scalar'] except KeyError: pixelSize = None metadata['binning'] = binning metadata['pixelSizeUm'] = pixelSize if metadata['pixelSizeUm'] == 0: metadata['pixelSizeUm'] = None if 'waveLengths' in metadata: # Fix an old naming issue metadata['wavelengths'] = metadata['waveLengths'] del metadata['waveLengths'] return cls(metadata, filePath=directory, fileFormat=PwsMetaData.FileFormats.Tiff, acquisitionDirectory=acquisitionDirectory)
[docs] def metadataToJson(self, directory): """ Save the metadata to a JSON file. Args: directory: The folder path to save the new file to. """ with open(os.path.join(directory, 'pwsmetadata.json'), 'w') as f: json.dump(self.dict, f)
[docs] def getThumbnail(self) -> np.ndarray: """ Returns: An image for quick viewing of the acquisition. No numerical significance. """ if self.fileFormat == PwsMetaData.FileFormats.NanoMat: with h5py.File(os.path.join(self.filePath, 'image_bd.mat'), 'r') as hf: return np.array(hf['image_bd']).T.copy() # For some reason these are saved transposed? else: with tf.TiffFile(os.path.join(self.filePath, 'image_bd.tif')) as f: return f.asarray()
[docs]class Acquisition: """This class handles the file structure of a single acquisition. this can include a PWS acquisition as well as colocalized Dynamics and fluorescence. Args: directory: the file path the root directory of the acquisition """ def __init__(self, directory: t_.Union[str, os.PathLike]): self.filePath = os.path.abspath(directory) # Forcing an absolute path helps by: A: normalizing the path so string comparisons work. B: making sure that if the object is pickled and then unpickled from a different working direcotory the path will still be valid if the data is unmoved. if (self.pws is None) and (self.dynamics is None) and (len(self.fluorescence) == 0): raise OSError(f"Could not find a valid PWS or Dynamics Acquisition at {directory}.") def __repr__(self): if len(self.filePath) > 20: path = ("%.15s" % self.filePath[::-1])[::-1] # This is confusing. We cut everything but the last characters to make long paths readable. path = "..." + path # Add ellipsis else: path = self.filePath return f"{self.__class__.__name__}({path})" @cached_property def pws(self) -> t_.Optional[PwsMetaData]: """PwsMetaData: Returns None if no PWS acquisition was found.""" try: return PwsMetaData.loadAny(os.path.join(self.filePath, 'PWS'), acquisitionDirectory=self) except: try: return PwsMetaData.loadAny(os.path.join(self.filePath), acquisitionDirectory=self) #Many of the old files are saved here in the root directory. except: return None @cached_property def dynamics(self) -> t_.Optional[DynMetaData]: """DynMetaData: Returns None if no dynamics acquisition was found.""" try: return DynMetaData.fromTiff(os.path.join(self.filePath, 'Dynamics'), acquisitionDirectory=self) except: try: return DynMetaData.fromOldPWS(self.filePath, acquisitionDirectory=self) #This is just for old acquisitions where they were saved in their own folder that was indistinguishable from a PWS acquisitison. except: return None @cached_property def fluorescence(self) -> t_.List[FluorMetaData]: """List[FluorMetaData]: Newer acquisitions allow for multiple fluorescence images saved to numbered subfolders""" i = 0 imgs = [] while True: path = os.path.join(self.filePath, f"Fluorescence_{i}") if not os.path.exists(path): break try: imgs.append(FluorMetaData.fromTiff(path, acquisitionDirectory=self)) except ValueError: logging.getLogger(__name__).info(f"Failed to load fluorescence metadata at {path}") i += 1 if len(imgs) == 0: # No files were found. # Old files only had a single fluorescence image with no number on the folder name. path = os.path.join(self.filePath, 'Fluorescence') if os.path.exists(path): return [FluorMetaData.fromTiff(path, acquisitionDirectory=self)] else: return [] else: return imgs @property def idTag(self): if self.pws is not None: return self.pws.idTag else: #We must have one of these two items. return self.dynamics.idTag
[docs] def getRois(self) -> t_.List[t_.Tuple[str, int, RoiFile.FileFormats]]: """Return information about the Rois found in the acquisition's file path. See documentation for Roi.getValidRoisInPath()""" assert self.filePath is not None return RoiFile.getValidRoisInPath(self.filePath)
[docs] def loadRoi(self, name: str, num: int, fformat: RoiFile.FileFormats = None) -> RoiFile: """Load a Roi that has been saved to file in the acquisition's file path.""" assert isinstance(name, str), f"The ROI name must be a string. Got a {type(name)}" assert isinstance(num, int), f"The ROI number must be an integer. Got a {type(num)}" if fformat == RoiFile.FileFormats.MAT: return RoiFile.fromMat(self.filePath, name, num, acquisition=self) elif fformat == RoiFile.FileFormats.HDF3: return RoiFile.fromHDF(self.filePath, name, num, acquisition=self) elif fformat == RoiFile.FileFormats.HDF2: return RoiFile.fromHDF_legacy(self.filePath, name, num, acquisition=self) elif fformat == RoiFile.FileFormats.HDF: return RoiFile.fromHDF_legacy_legacy(self.filePath, name, num, acquisition=self) else: return RoiFile.loadAny(self.filePath, name, num, acquisition=self)
[docs] def saveRoi(self, roiName: str, roiNumber: int, roi: Roi, overwrite: bool = False) -> RoiFile: """ Save a Roi to file in the acquisition's file path. Args: roiName: The name to identify this ROI roiNumber: The number to identify this ROI roi: The ROI object defining the ROI geometry overwrite: If True then any existing ROIFile matching this name and number will be overwritten. Otherwise an OSError is raised. Returns: A reference to the new ROIFile Raises: OSError: If `overwrite` is False and an ROI of the same name and number already exists then an OSError will be raised. """ return RoiFile.toHDF(roi, roiName, roiNumber, self.filePath, overwrite=overwrite, acquisition=self)
def deleteRoi(self, name: str, num: int, fformat: t_.Optional[RoiFile.FileFormats] = None): RoiFile.deleteRoi(self.filePath, name, num, fformat=fformat)
[docs] def editNotes(self): """Create a `notes.txt` file if it doesn't already exists and open it in a text editor.""" filepath = os.path.join(self.filePath, 'notes.txt') filepath = os.path.normpath(filepath) # Sometime there can be an error if we don't clean the file path like this. if not os.path.exists(filepath): with open(filepath, 'w') as f: pass if sys.platform.startswith('darwin'): subprocess.call(('open', filepath)) elif os.name == 'nt': # For Windows os.startfile(filepath) elif os.name == 'posix': # For Linux, Mac, etc. subprocess.call(('xdg-open', filepath))
[docs] def hasNotes(self) -> bool: """Indicates whether or not a `notes.txt` file was found.""" return os.path.exists(os.path.join(self.filePath, 'notes.txt'))
[docs] def getNotes(self) -> str: """Return the contents of `notes.txt` as a string.""" if self.hasNotes(): with open(os.path.join(self.filePath, 'notes.txt'), 'r') as f: return '\n'.join(f.readlines()) else: return ''
[docs] def getThumbnail(self) -> np.ndarray: """Return a thumbnail from any of the available acquisitions. Should be an 8bit normalized image.""" if self.pws is not None: return self.pws.getThumbnail() elif self.dynamics is not None: return self.dynamics.getThumbnail() elif len(self.fluorescence) != 0: return self.fluorescence[0].getThumbnail()
def getNumber(self) -> int: return int(self.filePath.split("Cell")[-1]) def __setstate__(self, state): """When the object is unpickled this will check if the file path is still valid.""" if not os.path.exists(state['filePath']): warnings.warn(f"{self.__class__.__name__} file path cannot be found. {state['filePath']}") self.__dict__ = state # This is the default thing to do to unpicle the object. def __hash__(self) -> int: return hash(self.filePath) # Since the class is intimiately tied to the filesystem this is all that makes it unique def __eq__(self, other: Acquisition) -> bool: if not isinstance(other, Acquisition): return False return self.filePath == other.filePath
_TupleValidator = jsonschema.validators.extend( # All of this is just so that jsonschema will allow a tuple as a 'array' schema member. jsonschema.Draft7Validator, type_checker=jsonschema.Draft7Validator.TYPE_CHECKER.redefine( 'array', lambda checker, instance: jsonschema.Draft7Validator.TYPE_CHECKER.is_type(instance, 'array') or isinstance(instance, tuple) )) if __name__ == '__main__': md = PwsMetaData.fromNano(r'C:\Users\nicke\Desktop\LTL20b_Tracking cells in 50%EtOH,95%EtOH,Water\95% ethanol\Cell1')