#!/usr/bin/env python
# -*- coding: latin-1 -*-
"""
Reader class for indexed gzipped files
"""
# Python mzML module - pymzml
# Copyright (C) 2010-2019 M. Kösters, C. Fufezan
# The MIT License (MIT)
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import struct
import zlib
from collections import OrderedDict
class GSGR(object):
    """
    Generalized Gzip reader class which enables random access in files
    written with the :class:`~pymzml.utils.GSGW.GSGW` class.

    The block index is looked up in the gzip comment field, which is expected
    to start directly after the ten byte basic gzip header (file offset 10).

    Keyword Arguments:
        file (str): path to file to read

    Raises:
        Exception: if the file does not start with the gzip magic bytes or
            the basic header is truncated.
    """

    # Padding character (U+00AC NOT SIGN, byte 0xAC in latin-1) stripped from
    # the fixed width identifier and offset fields of the index. Written as an
    # escape so its value does not depend on the encoding of this source file.
    _PAD = "\xac"

    def __init__(self, file=None):
        self.file_in = open(file, "rb")
        self.filename = file
        self.magic_bytes = b"\x1f\x8b"
        self.indexed = True
        self.random_access = False  # initial state, until index is read
        # Defaults, so these attributes exist whatever flags the file carries.
        self.ascii_file = False
        self.fname = None
        self.index = OrderedDict()
        try:
            if not self._check_magic_bytes():
                raise Exception("not a gzip file (wrong magic bytes)")
            self._read_basic_header()

            flags = self.flg
            self.ascii_file = bool(flags & 1)  # FTEXT flag (bit 0)
            if flags & 4:  # FEXTRA flag
                xlen = struct.unpack("<H", self.file_in.read(2))[0]
                # skip the extra field relative to the current position
                self.file_in.seek(xlen, 1)
            if flags & 8:  # FNAME flag
                self.fname = self._read_until_zero()
            if flags & 16:  # FCOMMENT flag, may carry the index
                self._read_index()
            else:
                self.indexed = False
                if flags & 2:  # FHCRC flag
                    # The header CRC16 is the last optional header field;
                    # it is skipped, not verified.
                    self.file_in.read(2)
        except Exception:
            # do not leak the file handle when the header cannot be parsed
            self.file_in.close()
            raise

    def __del__(self):
        try:
            self.close()
        except Exception:
            # An exception cannot propagate out of a finalizer, so closing
            # is best effort here.
            pass

    def seek(self, offset):
        """
        Seek to byte offset in input file.

        Arguments:
            offset (int): byte offset to seek to in FileIn

        Returns:
            None
        """
        self.file_in.seek(offset)
        return

    def read_block(self, index):
        """
        Read and return the data block with the unique index `index`

        Arguments:
            index(int or str): identifier associated with a specific block

        Returns:
            data (bytes): decompressed content of the indexed block

        Raises:
            KeyError: if `index` is not part of the index
        """
        start = self.index[index]
        try:
            # fast path: blocks identified by consecutive integers
            end = self.index[int(index) + 1]
        except (KeyError, TypeError, ValueError):
            end = None
        if end is None or end <= start:
            # Fall back to the closest block start behind this one; if there
            # is none this is the last block and everything up to the end of
            # the file is read.
            later = [off for off in self.index.values() if off > start]
            end = min(later) if later else None
        self.file_in.seek(start)
        if end is None:
            comp_data = self.file_in.read()
        else:
            comp_data = self.file_in.read(end - start)
        # Raw deflate stream; bytes following the end of the stream (trailer
        # and next member header) are ignored by zlib.decompress.
        return zlib.decompress(comp_data, -zlib.MAX_WBITS)

    def _check_magic_bytes(self):
        """
        Check if file is a gzip file.

        Reads two bytes from the current position, so the file pointer has to
        be at the start of the file.

        Returns:
            bool: True if the bytes read equal the gzip magic bytes
        """
        mb = self.file_in.read(2)
        return mb == self.magic_bytes

    def _read_basic_header(self):
        """
        Read the eight fixed header bytes following the magic bytes.

        Stores compression method (`cm`), flags (`flg`), modification time
        (`mtime`), extra flags (`xfl`) and operating system (`os`) as
        integers on the instance.

        Raises:
            Exception: if fewer than eight bytes are available
        """
        fields = self.file_in.read(8)
        if len(fields) != 8:
            raise Exception("not a gzip file (truncated header)")
        (self.cm, self.flg, self.mtime, self.xfl, self.os) = struct.unpack(
            "<BBIBB", fields
        )

    def _read_until_zero(self):
        """
        Read input until a NUL byte or the end of the file is reached

        Returns:
            bytes: everything read, without the terminating NUL byte
        """
        buf = bytearray()
        while True:
            c = self.file_in.read(1)
            # an empty read means end of file: stop instead of looping forever
            if not c or c == b"\x00":
                break
            buf += c
        return bytes(buf)

    def _read_index(self):
        """
        Read and save offset dict from indexed gzip file

        Fills `self.index` with identifier -> offset pairs and rewinds the
        file to its start. If the comment field does not start with the index
        marker, `self.indexed` is set to False and the index stays empty.
        """
        self.index = OrderedDict()
        self.file_in.seek(10)  # make sure file pointer is at right position
        mb = self.file_in.read(3)
        if mb != b"FU\x01":  # All hail MK!
            print("No index in comment field found. No random access possible")
            self.indexed = False
            # an ordinary comment must not be parsed as an index
            self.file_in.seek(0)
            return
        self.idx_len, self.offset_len = struct.unpack("<BB", self.file_in.read(2))
        while True:
            id_block = self.file_in.read(self.idx_len)
            offset_block = self.file_in.read(self.offset_len)
            if b"\x00" in id_block:
                # terminator of the comment field, no more entries
                break
            try:
                offset = int(offset_block.decode("latin-1").strip(self._PAD))
            except ValueError:
                # unused index slot or end of file
                break
            id_text = id_block.decode("latin-1").strip(self._PAD)
            try:
                identifier = int(id_text)
            except ValueError:
                identifier = id_text
            self.index[identifier] = offset
        self.file_in.seek(0)

    def read(self, size=-1):
        """
        Read the content of the in File in binary mode

        Keyword Arguments:
            size (int, optional): number of bytes to read, -1 for everything

        Returns:
            data (bytes): parsed bytes from input file
        """
        return self.file_in.read(size)

    def __enter__(self):
        """
        Enable the with syntax for this class (entry point)

        Returns the underlying binary file object, not the reader itself.
        """
        return self.file_in

    def __exit__(self, exc_type, exc_value, traceback):
        """
        destructor when using this class with 'with .. as '
        """
        self.close()

    def close(self):
        """
        Close the internal Filehandler
        """
        # file_in is missing when open() failed in __init__
        file_in = getattr(self, "file_in", None)
        if file_in is not None:
            file_in.close()
if __name__ == "__main__":
    # Run as a script: just show the module description.
    print(__doc__)