Source code for dfvfs.file_io.zip_file_io

# -*- coding: utf-8 -*-
"""The ZIP extracted file-like object implementation."""

import io
import os
import zipfile

from dfvfs.file_io import file_io
from dfvfs.resolver import resolver


class ZipFile(file_io.FileIO):
    """File input/output (IO) object using zipfile.

    Data is read through a zipfile.ZipExtFile. When that object supports
    seek() it is used directly; otherwise seeking is emulated by reopening
    the archive member and reading forward, in blocks of
    _UNCOMPRESSED_DATA_BUFFER_SIZE bytes, up to the requested offset.
    """

    # The size of the uncompressed data buffer.
    _UNCOMPRESSED_DATA_BUFFER_SIZE = 64 * 1024

    def __init__(self, resolver_context, path_spec):
        """Initializes a file input/output (IO) object.

        Args:
          resolver_context (Context): resolver context.
          path_spec (PathSpec): a path specification.
        """
        super().__init__(resolver_context, path_spec)
        self._compressed_data = b''
        # Offset into the uncompressed stream as seen by the caller.
        self._current_offset = 0
        self._file_system = None
        # Set by _Open() when the ZipExtFile supports seek().
        self._is_seekable = False
        # Set when the non-seekable fallback has to reposition the stream
        # before the next read.
        self._realign_offset = True
        # Most recently read block of uncompressed data, the read position
        # within that block and the block size (non-seekable fallback only).
        self._uncompressed_data = b''
        self._uncompressed_data_offset = 0
        self._uncompressed_data_size = 0
        self._uncompressed_stream_size = None
        self._zip_ext_file = None
        self._zip_file = None
        self._zip_info = None

    def _AlignUncompressedDataOffset(self, uncompressed_data_offset):
        """Aligns the compressed file with the uncompressed data offset.

        The archive member is reopened and read forward block by block
        until the block containing the requested offset is buffered.

        Args:
          uncompressed_data_offset (int): uncompressed data offset.

        Raises:
          IOError: if the ZIP file could not be opened.
          OSError: if the ZIP file could not be opened.
        """
        if self._zip_ext_file:
            self._zip_ext_file.close()
            self._zip_ext_file = None

        try:
            # The open can fail if the file path in the local file header
            # does not use the same path segment separator as the
            # corresponding entry in the central directory.
            self._zip_ext_file = self._zip_file.open(self._zip_info, 'r')
        except zipfile.BadZipfile as exception:
            raise IOError(
                f'Unable to open ZIP file with error: {exception!s}'
            ) from exception

        self._uncompressed_data = b''
        self._uncompressed_data_size = 0
        self._uncompressed_data_offset = 0

        while uncompressed_data_offset > 0:
            self._uncompressed_data = self._zip_ext_file.read(
                self._UNCOMPRESSED_DATA_BUFFER_SIZE)
            self._uncompressed_data_size = len(self._uncompressed_data)

            if not self._uncompressed_data:
                # The stream ended before the requested offset was reached;
                # without this check the loop would never terminate since
                # the remaining offset would no longer decrease.
                break

            if uncompressed_data_offset < self._uncompressed_data_size:
                self._uncompressed_data_offset = uncompressed_data_offset
                break

            uncompressed_data_offset -= self._uncompressed_data_size

            # The block was skipped in its entirety. Drop it so that, when
            # the requested offset falls exactly on a block boundary, the
            # next read fetches the following block instead of returning
            # the start of this one.
            self._uncompressed_data = b''
            self._uncompressed_data_size = 0

    def _Close(self):
        """Closes the file-like object."""
        if self._zip_ext_file:
            self._zip_ext_file.close()
            self._zip_ext_file = None

        self._zip_file = None
        self._zip_info = None

        self._file_system = None

    def _Open(self, mode='rb'):
        """Opens the file-like object defined by path specification.

        Args:
          mode (Optional[str]): file access mode.

        Raises:
          AccessError: if the access to open the file was denied.
          IOError: if the file-like object could not be opened.
          OSError: if the file-like object could not be opened.
          PathSpecError: if the path specification is incorrect.
        """
        file_system = resolver.Resolver.OpenFileSystem(
            self._path_spec, resolver_context=self._resolver_context)

        file_entry = file_system.GetFileEntryByPathSpec(self._path_spec)
        if not file_entry:
            raise IOError('Unable to retrieve file entry.')

        if not file_entry.IsFile():
            raise IOError('Not a regular file.')

        zip_file = file_system.GetZipFile()
        zip_info = file_entry.GetZipInfo()

        try:
            # The open can fail if the file path in the local file header
            # does not use the same path segment separator as the
            # corresponding entry in the central directory.
            zip_ext_file = zip_file.open(zip_info, 'r')
        except zipfile.BadZipfile as exception:
            raise IOError(
                f'Unable to open ZIP file with error: {exception!s}'
            ) from exception

        # Only update the object state once everything has succeeded.
        self._file_system = file_system
        self._zip_file = zip_file
        self._zip_info = zip_info
        self._zip_ext_file = zip_ext_file

        self._uncompressed_stream_size = self._zip_info.file_size

        try:
            # ZipExtFile in Python 3.6 does not support seek().
            self._zip_ext_file.seek(0, os.SEEK_SET)
            self._is_seekable = True
        except io.UnsupportedOperation:
            self._is_seekable = False

    def _ReadNonSeekableZipExtFile(self, size):
        """Reads a byte string from a non-seekable file-like object.

        The function will read a byte string of the specified size or
        all of the remaining data if no size was specified.

        Args:
          size (int): number of bytes to read, where None or a negative
              value is all remaining data.

        Returns:
          bytes: data read.

        Raises:
          IOError: if the read failed.
          OSError: if the read failed.
        """
        if self._current_offset > self._uncompressed_stream_size:
            return b''

        remaining_stream_size = (
            self._uncompressed_stream_size - self._current_offset)

        # A negative size is treated as "read everything", which is what
        # ZipExtFile.read() does on the seekable code path.
        if size is None or size < 0 or size > remaining_stream_size:
            size = remaining_stream_size

        if self._realign_offset:
            self._AlignUncompressedDataOffset(self._current_offset)
            self._realign_offset = False

        data_segments = []

        # Read in full blocks of uncompressed data.
        while (self._uncompressed_data_offset + size >
               self._uncompressed_data_size):
            data_segments.append(
                self._uncompressed_data[self._uncompressed_data_offset:])

            remaining_uncompressed_data_size = (
                self._uncompressed_data_size -
                self._uncompressed_data_offset)

            self._current_offset += remaining_uncompressed_data_size
            size -= remaining_uncompressed_data_size

            self._uncompressed_data = self._zip_ext_file.read(
                self._UNCOMPRESSED_DATA_BUFFER_SIZE)
            self._uncompressed_data_size = len(self._uncompressed_data)
            self._uncompressed_data_offset = 0

            if not self._uncompressed_data:
                # The stream ended before the requested number of bytes
                # could be read; return what is available instead of
                # looping forever on empty reads.
                break

        # Read in partial block of uncompressed data.
        if (size > 0 and
                self._uncompressed_data_offset + size <=
                self._uncompressed_data_size):
            slice_start_offset = self._uncompressed_data_offset
            slice_end_offset = slice_start_offset + size

            data_segments.append(
                self._uncompressed_data[slice_start_offset:slice_end_offset])

            self._uncompressed_data_offset += size
            self._current_offset += size

        return b''.join(data_segments)

    # Note: that the following functions do not follow the style guide
    # because they are part of the file-like object interface.
    # pylint: disable=invalid-name

    def read(self, size=None):
        """Reads a byte string from the file-like object at the current offset.

        The function will read a byte string of the specified size or
        all of the remaining data if no size was specified.

        Args:
          size (Optional[int]): number of bytes to read, where None is all
              remaining data.

        Returns:
          bytes: data read.

        Raises:
          IOError: if the read failed.
          OSError: if the read failed.
        """
        if not self._is_open:
            raise IOError('Not opened.')

        if self._is_seekable:
            uncompressed_data = self._zip_ext_file.read(size)
            self._current_offset += len(uncompressed_data)
        else:
            uncompressed_data = self._ReadNonSeekableZipExtFile(size)

        return uncompressed_data

    def seek(self, offset, whence=os.SEEK_SET):
        """Seeks to an offset within the file-like object.

        Args:
          offset (int): offset to seek to.
          whence (Optional(int)): value that indicates whether offset is an
              absolute or relative position within the file.

        Raises:
          IOError: if the seek failed.
          OSError: if the seek failed.
        """
        if not self._is_open:
            raise IOError('Not opened.')

        # Convert the requested position into an absolute offset.
        if whence == os.SEEK_CUR:
            offset += self._current_offset
        elif whence == os.SEEK_END:
            offset += self._uncompressed_stream_size
        elif whence != os.SEEK_SET:
            raise IOError('Unsupported whence.')

        if offset < 0:
            raise IOError('Invalid offset value less than zero.')

        if self._is_seekable:
            self._zip_ext_file.seek(offset, os.SEEK_SET)
        elif offset != self._current_offset:
            # Defer the (expensive) realignment until the next read.
            self._realign_offset = True

        # ZipExtFile tell() is not POSIX compliant hence the current offset
        # is tracked separately.
        self._current_offset = offset

    def get_offset(self):
        """Retrieves the current offset into the file-like object.

        Returns:
          int: current offset into the file-like object.

        Raises:
          IOError: if the file-like object has not been opened.
          OSError: if the file-like object has not been opened.
        """
        if not self._is_open:
            raise IOError('Not opened.')

        return self._current_offset

    def get_size(self):
        """Retrieves the size of the file-like object.

        Returns:
          int: size of the file-like object data.

        Raises:
          IOError: if the file-like object has not been opened.
          OSError: if the file-like object has not been opened.
        """
        if not self._is_open:
            raise IOError('Not opened.')

        return self._uncompressed_stream_size