Source code for pymzml.msdata

#!/usr/bin/env python3
# -*- coding: latin-1 -*-
"""
The MsData class offers a base class for mass spectrometry data.
It provides common functionality for both Spectrum and Chromatogram classes.
"""

# Python mzML module - pymzml
# Copyright (C) 2010-2019 M. Kösters, C. Fufezan
#     The MIT License (MIT)

#     Permission is hereby granted, free of charge, to any person obtaining a copy
#     of this software and associated documentation files (the "Software"), to deal
#     in the Software without restriction, including without limitation the rights
#     to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#     copies of the Software, and to permit persons to whom the Software is
#     furnished to do so, subject to the following conditions:

#     The above copyright notice and this permission notice shall be included in all
#     copies or substantial portions of the Software.

#     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#     OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#     SOFTWARE.

import math
import re
import sys
import warnings
import xml.etree.ElementTree as ElementTree
import zlib
from base64 import b64decode as b64dec
from collections import defaultdict as ddict
from struct import unpack

import numpy as np

from .obo import OboTranslator


class MsData(object):
    """
    General base class for mass spectrometry data handling.

    Provides common functionality for both Spectrum and Chromatogram classes.

    The methods below read ``self.element`` (an ElementTree element),
    ``self.ns`` (namespace prefix used when building XPath expressions),
    ``self.obo_translator`` and ``self.calling_instance``; none of these is
    assigned in this class, so subclasses have to provide them.
    """

    # cvParam names that mark a zlib compressed binary array.
    _ZLIB_COMPRESSIONS = ("zlib", "zlib compression")

    # cvParam names (short aliases and CV names) that mark MS-Numpress data.
    _NUMPRESS_COMPRESSIONS = (
        "ms-np-linear",
        "ms-np-pic",
        "ms-np-slof",
        "MS-Numpress linear prediction compression",
        "MS-Numpress positive integer compression",
        "MS-Numpress short logged float compression",
    )

    # Accession -> MSDecoder method name, checked in this order.
    _NUMPRESS_DECODERS = (
        ("MS:1002312", "decode_linear"),
        ("MS:1002313", "decode_pic"),
        ("MS:1002314", "decode_slof"),
    )

    # Binary data types probed by _get_encoding_parameters, in lookup order.
    _DATA_TYPE_NAMES = (
        "32-bit float",
        "64-bit float",
        "32-bit integer",
        "64-bit integer",
        "null-terminated ASCII string",
    )

    # Little-endian numpy dtypes / struct codes per data type name, kept in
    # sync so both decoders accept the same set of types.
    _NUMPY_DTYPES = {
        "32-bit float": "<f4",
        "64-bit float": "<f8",
        "32-bit integer": "<i4",
        "64-bit integer": "<i8",
    }
    _STRUCT_CODES = {
        "32-bit float": "f",
        "64-bit float": "d",
        "32-bit integer": "i",
        "64-bit integer": "q",
    }

    def _read_accessions(self):
        """
        Collect the accessions of all elements below ``self.element``.

        Fills ``self.accessions`` with ``name -> accession`` for every element
        that carries an ``accession`` attribute and sets ``self._profile`` to
        True if a "profile spectrum" entry was found.
        """
        self.accessions = {}
        for element in self.element.iter():
            accession = element.get("accession")
            if accession is not None:
                self.accessions[element.get("name")] = accession
        if "profile spectrum" in self.accessions:
            self._profile = True

    def get_element_by_name(self, name):
        """
        Get element from the original tree by its unit name.

        Arguments:
            name (str): unit name of the mzml element.

        Returns:
            element: first element (document order) whose ``name`` attribute
            equals ``name`` or None if there is no such element.
        """
        return next(
            (ele for ele in self.element.iter() if ele.get("name") == name),
            None,
        )

    def get_element_by_path(self, hooks):
        """
        Find elements in spectrum by its path.

        Arguments:
            hooks (list): list of parent elements for the target element.

        Returns:
            elements (list): list of XML objects found in the path, or None
            if ``hooks`` is empty.

        Example:
            To access cvParam in scanWindow tag:

            >>> spec.get_element_by_path(['scanList', 'scan', 'scanWindowList',
            ...     'scanWindow', 'cvParam'])
        """
        if len(hooks) == 0:
            return None
        path_array = ["."]
        path_array.extend(
            "{ns}{hook}".format(ns=self.ns, hook=hook) for hook in hooks
        )
        return self.element.findall("/".join(path_array))

    def _register(self, decoded_tuple):
        """
        Store a decoded array on the instance.

        Arguments:
            decoded_tuple (tuple): ``(d_type, array)`` where d_type is one of
                "mz", "i" or "time".

        Raises:
            Exception: if d_type is none of the known kinds.
        """
        d_type, array = decoded_tuple
        if d_type == "mz":
            self._mz = array
        elif d_type == "i":
            self._i = array
        elif d_type == "time":
            self._time = array
        else:
            raise Exception("Unknown data Type ({0})".format(d_type))

    def _find_data_type(self, b_data_array):
        """
        Return the name of the data type cvParam of a binaryDataArray.

        The known type names are translated to accessions via
        ``self.obo_translator`` and probed in the order of
        ``_DATA_TYPE_NAMES``; the first cvParam found wins.

        Arguments:
            b_data_array (Element): binaryDataArray element to inspect.

        Returns:
            d_type (str): value of the ``name`` attribute of the matching
            cvParam.

        Raises:
            ValueError: if none of the known data types is declared.
        """
        type_xpath = "./{ns}cvParam[@accession='{Acc}']"
        for type_name in self._DATA_TYPE_NAMES:
            try:
                accession = self.obo_translator[type_name]["id"]
            except (LookupError, TypeError):
                # term unknown to the translator, try the next type
                continue
            cv_param = b_data_array.find(
                type_xpath.format(ns=self.ns, Acc=accession)
            )
            if cv_param is not None:
                return cv_param.get("name")
        raise ValueError(
            "binaryDataArray declares none of the supported data types "
            "({0})".format(", ".join(self._DATA_TYPE_NAMES))
        )

    def _get_encoding_parameters(self, array_type):
        """
        Find the correct parameter for decoding and return them as tuple.

        The binaryDataArray is located via a cvParam whose ``name`` equals
        ``array_type``; if there is none, a cvParam whose ``value`` equals
        ``array_type`` is tried (non-standard data arrays).

        Arguments:
            array_type (str): data type of the array, e.g. m/z, time or
                intensity

        Returns:
            tuple: ``(data, d_array_length, d_type, comp)`` with

            data (bytes or str) : utf-8 encoded text of the binary element,
                "" if there is no data
            d_array_length (str or int) : ``defaultArrayLength`` attribute of
                ``self.element``, 0 if the array was not found
            d_type (str) : data type, None for numpress encoded arrays and
                "64-bit float" if the array was not found
            comp (list) : names of all cvParams containing "compression"

        Raises:
            ValueError: if the array is not numpress encoded and declares no
                supported data type.
        """
        array_xpath = (
            "./{ns}binaryDataArrayList/{ns}binaryDataArray"
            "/{ns}cvParam[@{attr}='{val}']/.."
        )
        # Compare against None explicitly: the truth value of an Element
        # reflects whether it has children, not whether it was found.
        b_data_array = self.element.find(
            array_xpath.format(ns=self.ns, attr="name", val=array_type)
        )
        if b_data_array is None:
            # non-standard data array
            b_data_array = self.element.find(
                array_xpath.format(ns=self.ns, attr="value", val=array_type)
            )

        comp = []
        if b_data_array is None:
            return ("", 0, "64-bit float", comp)

        numpress_encoding = False
        for cv_param in b_data_array.iterfind("./{ns}cvParam".format(ns=self.ns)):
            cv_name = cv_param.get("name") or ""
            if "compression" in cv_name:
                if "numpress" in cv_name.lower():
                    numpress_encoding = True
                comp.append(cv_name)
        d_array_length = self.element.get("defaultArrayLength")

        if numpress_encoding:
            # compression is numpress, dont need data type here
            d_type = None
        else:
            d_type = self._find_data_type(b_data_array)

        binary = b_data_array.find("./{ns}binary".format(ns=self.ns))
        if binary is None or binary.text is None:
            data = ""
        else:
            data = binary.text.encode("utf-8")
        return (data, d_array_length, d_type, comp)

    @property
    def measured_precision(self):
        """
        Set the measured and internal precision.

        Setting the value also updates ``self.internal_precision`` to
        ``int(round(50000.0 / (value * 1e6)))``.

        Returns:
            value (float): measured Precision (e.g. 5e-6)
        """
        return self._measured_precision

    @measured_precision.setter
    def measured_precision(self, value):
        self._measured_precision = value
        self.internal_precision = int(round(50000.0 / (value * 1e6)))

    def _decode_to_numpy(self, data, d_array_length, data_type, comp):
        """
        Decode the b64 encoded and packed strings from data as numpy arrays.

        Arguments:
            data (bytes or str): base64 encoded array
            d_array_length: unused, just for compatibility
            data_type (str): one of "32-bit float", "64-bit float",
                "32-bit integer" or "64-bit integer"; ignored for numpress
                encoded data
            comp (list): compression names of the array

        Returns:
            data (np.ndarray): the unpacked data, an empty array if there is
            no raw data.

        Raises:
            ValueError: if data is not numpress encoded and data_type is not
                supported.
        """
        out_data = b64dec(data)
        if len(out_data) == 0:
            return np.array([])

        if any(name in comp for name in self._ZLIB_COMPRESSIONS):
            out_data = zlib.decompress(out_data)

        if any(name in comp for name in self._NUMPRESS_COMPRESSIONS):
            # The numpress decoder already yields numbers, so no data type is
            # needed (and _get_encoding_parameters reports None for it).
            return np.asarray(self._decodeNumpress_to_array(out_data, comp))

        # TODO "null-terminated ASCII string" is not supported yet
        dtype = self._NUMPY_DTYPES.get(data_type)
        if dtype is None:
            raise ValueError(f"Unsupported data type: {data_type}")
        return np.frombuffer(out_data, np.dtype(dtype))

    def _decode_to_tuple(self, data, d_array_length, float_type, comp):
        """
        Decode b64 encoded and packed strings.

        Arguments:
            data (bytes or str): base64 encoded array
            d_array_length (int or str): number of values in the array
            float_type (str): one of "32-bit float", "64-bit float",
                "32-bit integer" or "64-bit integer"; ignored for numpress
                encoded data
            comp (list): compression names of the array

        Returns:
            data (tuple): Returns the unpacked data as a tuple. Returns an
            empty list if there is no raw data.

        Raises:
            ValueError: if data is not numpress encoded and float_type is not
                supported.
        """
        dec_data = b64dec(data)
        if len(dec_data) == 0:
            return []

        if any(name in comp for name in self._ZLIB_COMPRESSIONS):
            dec_data = zlib.decompress(dec_data)

        if any(name in comp for name in self._NUMPRESS_COMPRESSIONS):
            # numpress payloads cannot be unpacked with struct
            return tuple(self._decodeNumpress_to_array(dec_data, comp))

        type_code = self._STRUCT_CODES.get(float_type)
        if type_code is None:
            raise ValueError(f"Unsupported data type: {float_type}")
        fmt = "{endian}{array_length}{float_type}".format(
            endian="<", array_length=d_array_length, float_type=type_code
        )
        return unpack(fmt, dec_data)

    def _decodeNumpress_to_array(self, data, compression):
        """
        Decode golomb-rice encoded data (aka numpress encoded data).

        Every entry of ``compression`` is translated to its accession via
        ``self.calling_instance.OT``; MS:1002312, MS:1002313 and MS:1002314
        select MSDecoder.decode_linear, decode_pic and decode_slof.

        Arguments:
            data (bytes)       : Encoded data
            compression (list) : Compression names of the array

        Returns:
            array (list): Returns the unpacked data as returned by the
            decoder, an empty list if no numpress accession is present.
        """
        comp_ms_tags = [self.calling_instance.OT[comp]["id"] for comp in compression]
        data = np.frombuffer(data, dtype=np.uint8)
        for accession, decoder_name in self._NUMPRESS_DECODERS:
            if accession in comp_ms_tags:
                # imported lazily so the decoder is only required when
                # numpress data is actually encountered
                from .decoder import MSDecoder

                return getattr(MSDecoder, decoder_name)(data)
        return []

    def _median(self, data):
        """
        Compute median.

        Arguments:
            data (list): list of numeric values

        Returns:
            median (float): median of the input data
        """
        return np.median(data)

    def to_string(self, encoding="latin-1", method="xml"):
        """
        Return string representation of the xml element the
        spectrum was initialized with.

        Keyword Arguments:
            encoding (str) : text encoding of the returned string.\n
                             Default is latin-1.
            method (str)   : text format of the returned string.\n
                             Default is xml, alternatives are html and text.

        Returns:
            element (str)  : xml string representation of the spectrum.
        """
        return ElementTree.tostring(self.element, encoding=encoding, method=method)