#!/usr/bin/env python
# -*- coding: latin-1 -*-
"""
Writer class for indexed gzipped files
"""
# Python mzML module - pymzml
# Copyright (C) 2010-2019 M. Kösters, C. Fufezan
# The MIT License (MIT)
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import struct
import time
import zlib
from collections import OrderedDict
class GSGW(object):
    """
    Generalized Gzip writer class with random access to indexed offsets.

    Every call to :func:`~GSGW.add_data` appends one gzip member to the
    output file and remembers, per identifier, the file offset at which the
    member's raw deflate stream starts. :func:`~GSGW.write_index` finally
    stores that table in the (pre-allocated) gzip comment field of the first
    member, so the file stays a regular multi-member gzip file.

    Keyword Arguments:
        file (string)        : not used by this class; accepted only for
                               backward compatibility. The destination is
                               taken from ``output_path``.
        max_idx (int)        : max number of indices which can be saved in
                               this file
        max_idx_len (int)    : maximal length of the index in bytes, must
                               be between 1 and 255
        max_offset_len (int) : maximal length of the offset in bytes, must
                               be between 1 and 255. Offsets are stored as
                               decimal text, so this is a number of digits.
        output_path (str)    : path to the output file
        comp_str (int)       : compression level handed to
                               ``zlib.compressobj`` (-1 = zlib default)

    Raises:
        ValueError: if ``max_idx_len`` or ``max_offset_len`` is outside
            1..255 (each is stored in a single non-zero header byte).
    """

    def __init__(
        self,
        file=None,
        max_idx=10000,
        max_idx_len=8,
        max_offset_len=8,
        output_path="./test.dat.igzip",
        comp_str=-1,
    ):
        # Both widths are packed into one byte each inside the gzip comment;
        # a zero byte there would terminate the comment prematurely.
        for label, width in (
            ("max_idx_len", max_idx_len),
            ("max_offset_len", max_offset_len),
        ):
            if not 1 <= width <= 255:
                raise ValueError(
                    "{0} must be between 1 and 255, got {1!r}".format(label, width)
                )

        self.Lock = False  # becomes True once the index has been written
        self._format_version = 1  # max 255!!!
        self.file_name = output_path
        self.max_idx_num = max_idx
        self.max_idx_len = max_idx_len
        self.max_offset_len = max_offset_len
        # Fixed 10 byte gzip member header, in on-disk order.
        self.generic_header = OrderedDict(
            [
                ("MAGIC_BYTE_1", b"\x1f"),
                ("MAGIC_BYTE_2", b"\x8b"),
                ("COMPRESSION", b"\x08"),
                ("FLAGS", b"\x00"),
                ("DATE", b"\x00\x00\x00\x00"),
                ("XFL", b"\x02"),
                ("OS", b"\x03"),
            ]
        )
        self.index = OrderedDict()  # identifier -> offset of deflate data
        self.index_offset = None  # file position of the first index slot
        self.first_header_set = False
        self._file_out = None  # opened lazily by the ``file_out`` property
        self._encoding = "latin-1"
        # magic bytes. FU+version
        self.index_magic_bytes = b"FU" + struct.pack("<B", self._format_version)
        self.crc32 = 0
        self.isize = 0
        self.comp_str = comp_str

    def __del__(self):
        """
        Close the file object properly after this object is deleted
        """
        # getattr: __init__ may have raised before the attribute existed.
        # Going through the ``file_out`` property here would *create* (and
        # truncate) the output file for a writer that never wrote anything.
        handle = getattr(self, "_file_out", None)
        if handle is not None:
            handle.close()

    def close(self):
        """
        Close the internal file object.

        Does nothing if the output file was never opened; closing twice is
        harmless.
        """
        if self._file_out is not None:
            self._file_out.close()

    @property
    def file_out(self):
        """
        Output filehandler

        The file is opened for binary writing on first access, which
        truncates an existing file of the same name.
        """
        if self._file_out is None:
            self._file_out = open(self.file_name, "wb")
        return self._file_out

    @property
    def encoding(self):
        """
        Returns the encoding used for this file
        """
        return self._encoding

    @encoding.setter
    def encoding(self, encoding):
        """
        Set the file encoding for the output file.

        Raises:
            AssertionError: if ``encoding`` is not a string.
        """
        # Raised explicitly instead of via ``assert`` so the check survives
        # ``python -O``; the exception type is kept for existing callers.
        if not isinstance(encoding, str):
            raise AssertionError("encoding must be a string")
        self._encoding = encoding

    @staticmethod
    def _pad_field(value, width, label):
        """
        Render ``value`` as a fixed width index field.

        Arguments:
            value (str or int): value to store, converted with ``str``
            width (int): size of the field in bytes
            label (str): field name used in the error message

        Returns:
            bytes: ``str(value)`` right aligned and left padded with 0xAC,
            encoded as latin-1 (one byte per character).

        Raises:
            ValueError: if the text does not fit into ``width`` bytes (it
                would spill into the neighbouring index slot) or cannot be
                encoded as latin-1 (``UnicodeEncodeError``).
        """
        text = str(value)
        if len(text) > width:
            raise ValueError(
                "{0} {1!r} does not fit into {2} index bytes".format(
                    label, text, width
                )
            )
        return text.rjust(width, "\xac").encode("latin-1")

    def _to_bytes(self, data):
        """
        Convert ``data`` to ``bytes``.

        ``str`` is encoded with the configured :attr:`encoding`; ``bytes``,
        ``bytearray`` and ``memoryview`` are accepted as they are.

        Raises:
            TypeError: for any other type.
        """
        if isinstance(data, bytes):
            return data
        if isinstance(data, str):
            return data.encode(self._encoding)
        if isinstance(data, (bytearray, memoryview)):
            return bytes(data)
        raise TypeError(
            "data must be str or bytes-like, got {0}".format(type(data).__name__)
        )

    def _write_gen_header(self, Index=False):
        """
        Write a gzip member header, optionally followed by the index area.

        NOTE(review): ``add_data`` and ``write_index`` depend on this method
        and on ``self.index_offset``, but no definition was present in the
        reviewed source. It is reconstructed here from the gzip header
        layout (RFC 1952) and from ``_allocate_index_bytes``; confirm the
        comment layout against the matching reader before relying on
        cross-version compatibility.

        Keyword Arguments:
            Index (bool): if True, set the FCOMMENT flag and write, as the
                gzip comment, the magic bytes, one byte ``max_idx_len``, one
                byte ``max_offset_len`` and the placeholder index slots
                (zero terminated). ``self.index_offset`` is set to the file
                position of the first slot.
        """
        # 0x10 is the FCOMMENT bit of the gzip FLG byte.
        self.generic_header["FLAGS"] = b"\x10" if Index else b"\x00"
        self.generic_header["DATE"] = struct.pack("<L", int(time.time()) % 2**32)
        out = self.file_out
        out.write(b"".join(self.generic_header.values()))
        if Index:
            out.write(self.index_magic_bytes)
            out.write(struct.pack("<B", self.max_idx_len))
            out.write(struct.pack("<B", self.max_offset_len))
            self.index_offset = out.tell()
            self._allocate_index_bytes()
        return

    def _allocate_index_bytes(self):
        """
        Allocate 'self.max_idx_num' slots of 'self.max_idx_len' +
        'self.max_offset_len' bytes in the header for inserting the index
        later on, followed by a single terminating zero byte.
        """
        # 0x01 filler: any non-zero byte works, a zero byte would end the
        # area early (the trailing b"\x00" below is the terminator).
        empty_slot = (self.max_idx_len + self.max_offset_len) * b"\x01"
        out = self.file_out
        for _ in range(self.max_idx_num):
            out.write(empty_slot)
        out.write(b"\x00")
        return

    def _write_data(self, data):
        """
        Write data into file-stream.

        The payload is written as a raw deflate stream followed by the gzip
        trailer (CRC32 and size modulo 2**32, both little endian). The two
        trailer values are also kept in ``self.crc32`` / ``self.isize``.

        Arguments:
            data (str or bytes): uncompressed data; ``str`` is encoded with
                the configured :attr:`encoding`.
        """
        payload = self._to_bytes(data)
        # Negative wbits -> raw deflate without zlib header/trailer; the
        # gzip framing is written by this class itself.
        compressor = zlib.compressobj(
            self.comp_str, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0
        )
        self.crc32 = zlib.crc32(payload) & 0xFFFFFFFF
        self.isize = len(payload) % 2**32
        out = self.file_out
        out.write(compressor.compress(payload) + compressor.flush())
        out.write(struct.pack("<LL", self.crc32, self.isize))
        return

    def add_data(self, data, identifier):
        """
        Create a new gzip member with compressed 'data' indexed with
        'identifier'.

        Adding the same identifier twice writes the data again and makes the
        index point to the newest copy.

        Arguments:
            data (str or bytes)     : uncompressed data to write to file
            identifier (str or int) : unique index for the data

        Returns:
            ``False`` (after printing a warning) when the index is full and
            nothing was written, otherwise ``None``.

        Raises:
            Exception: if the index has already been written.
            ValueError: if the identifier or the resulting offset does not
                fit into its index field.
            TypeError: if ``data`` is neither str nor bytes-like.
        """
        if self.Lock is not False:
            raise Exception("Cant add any more data if index is already written")
        if len(self.index) + 1 > self.max_idx_num:
            print(
                "\nWARNING: Reached maximum number of indexed data blocks\n"
                "'({0}), cannot add any more data!\n".format(self.max_idx_num)
            )
            return False

        # Validate everything that can be validated before touching the
        # file, so a rejected call leaves no partial member behind.
        self._pad_field(identifier, self.max_idx_len, "identifier")
        payload = self._to_bytes(data)

        out = self.file_out
        member_start = out.tell()
        # Only the very first member carries the index area.
        self._write_gen_header(Index=not self.first_header_set)
        data_offset = out.tell()
        try:
            self._pad_field(data_offset, self.max_offset_len, "offset")
        except ValueError:
            # Offset has more digits than the index can hold: drop the
            # header that was just written and report the problem.
            out.seek(member_start)
            out.truncate()
            raise
        self.first_header_set = True
        self.index[identifier] = data_offset
        self._write_data(payload)
        return

    def _write_identifier(self, identifier):
        """
        Convert and write the identifier into output file.

        Arguments:
            identifier (str or int): identifier to write into index

        Raises:
            ValueError: if the identifier is longer than ``max_idx_len``.
        """
        self.file_out.write(
            self._pad_field(identifier, self.max_idx_len, "identifier")
        )
        return

    def _write_offset(self, offset):
        """
        Convert and write offset to output file.

        Arguments:
            offset (int): offset which will be formatted and written
                into file index

        Raises:
            ValueError: if the offset has more than ``max_offset_len``
                decimal digits.
        """
        self.file_out.write(self._pad_field(offset, self.max_offset_len, "offset"))
        return

    def write_index(self):
        """
        Only called after all the data is written, i.e. all calls to
        :func:`~GSGW.add_data` have been done.

        Seek back to the beginning of the file and write the index into the
        allocated comment bytes (see _write_gen_header(Index=True)).
        Afterwards the writer is locked and rejects further data. If no data
        was ever added there is no index area, and nothing is written.
        """
        self.Lock = True
        if self.index_offset is None:
            return
        self.file_out.seek(self.index_offset)
        for identifier, offset in self.index.items():
            self._write_identifier(identifier)
            self._write_offset(offset)

    def __enter__(self):
        """
        Enable the with syntax for this class (entry point).
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Destructor when using this class with 'with .. as'."""
        self.close()
if __name__ == "__main__":
    # Executed as a script: there is no command line interface, so just
    # show the module description.
    print(__doc__)