"""The encoded stream file-like object implementation."""
import os
from dfvfs.encoding import manager as encoding_manager
from dfvfs.file_io import file_io
from dfvfs.lib import errors
from dfvfs.resolver import resolver
class EncodedStream(file_io.FileIO):
    """File input/output (IO) object of an encoded stream.

    Encoded data is read from the parent file-like object in chunks of at most
    _ENCODED_DATA_BUFFER_SIZE bytes and passed through the decoder. Only the
    decoded data of the most recently read chunk is kept in memory, hence
    seeking to an arbitrary decoded offset requires decoding the stream again
    from the start (see _AlignDecodedDataOffset).
    """

    # The size of the encoded data buffer.
    _ENCODED_DATA_BUFFER_SIZE = 8 * 1024 * 1024

    def __init__(self, resolver_context, path_spec):
        """Initializes a file input/output (IO) object.

        Args:
            resolver_context (Context): resolver context.
            path_spec (PathSpec): a path specification.
        """
        super().__init__(resolver_context, path_spec)
        # Offset within the decoded stream, as seen by the caller.
        self._current_offset = 0
        # Decoded data of the most recently read chunk, the read position
        # within that data and its size.
        self._decoded_data = b""
        self._decoded_data_offset = 0
        self._decoded_data_size = 0
        # Total size of the decoded stream, None if not determined yet.
        self._decoded_stream_size = None
        self._decoder = None
        # Encoded data that was read but not consumed by the decoder yet.
        self._encoded_data = b""
        self._encoding_method = None
        self._file_object = None
        # True if the decoded data buffer does not correspond with
        # _current_offset and must be rebuilt before the next read.
        self._realign_offset = True

    def _Close(self):
        """Closes the file-like object.

        Releases the decoder, the buffered data and the reference to the parent
        file-like object. The parent file-like object itself is not closed
        here.
        """
        self._decoder = None
        self._decoded_data = b""
        # Keep the bookkeeping consistent with the emptied buffer, so that no
        # stale size or offset is used if the object is opened again.
        self._decoded_data_offset = 0
        self._decoded_data_size = 0
        self._encoded_data = b""
        self._file_object = None
        self._realign_offset = True

    def _GetDecoder(self):
        """Retrieves the decoder.

        Returns:
            Decoder: decoder.
        """
        return encoding_manager.EncodingManager.GetDecoder(self._encoding_method)

    def _RewindEncodedStream(self):
        """Rewinds the parent file-like object and resets the decoder state.

        A new decoder is created and both the decoded and the encoded data
        buffers are emptied. The encoded data buffer must be emptied as well,
        otherwise remainder bytes of the previous position would be prepended
        to the data read from the start of the parent file-like object.
        """
        self._file_object.seek(0, os.SEEK_SET)

        self._decoder = self._GetDecoder()
        self._decoded_data = b""
        self._decoded_data_offset = 0
        self._decoded_data_size = 0
        self._encoded_data = b""

    def _GetDecodedStreamSize(self):
        """Retrieves the decoded stream size.

        The size is determined by decoding the whole stream from the start.

        Returns:
            int: decoded stream size.
        """
        self._RewindEncodedStream()

        # This pass moves the parent file position and replaces the decoded
        # data buffer, hence the buffer no longer matches _current_offset.
        self._realign_offset = True

        encoded_data_offset = 0
        encoded_data_size = self._file_object.get_size()
        decoded_stream_size = 0

        while encoded_data_offset < encoded_data_size:
            read_count = self._ReadEncodedData(self._ENCODED_DATA_BUFFER_SIZE)
            if read_count == 0:
                break

            encoded_data_offset += read_count
            decoded_stream_size += self._decoded_data_size

        return decoded_stream_size

    def _Open(self):
        """Opens the file-like object.

        Raises:
            AccessError: if the access to open the file was denied.
            OSError: if the file-like object could not be opened.
            PathSpecError: if the path specification is incorrect.
        """
        if not self._path_spec.HasParent():
            raise errors.PathSpecError("Unsupported path specification without parent.")

        self._encoding_method = getattr(self._path_spec, "encoding_method", None)
        if self._encoding_method is None:
            raise errors.PathSpecError("Path specification missing encoding method.")

        self._file_object = resolver.Resolver.OpenFileObject(
            self._path_spec.parent, resolver_context=self._resolver_context
        )

    def _AlignDecodedDataOffset(self, decoded_data_offset):
        """Aligns the encoded file with the decoded data offset.

        Decodes the stream from the start until the chunk that contains the
        requested offset is in the decoded data buffer.

        Args:
            decoded_data_offset (int): decoded data offset.
        """
        self._RewindEncodedStream()

        encoded_data_offset = 0
        encoded_data_size = self._file_object.get_size()

        while encoded_data_offset < encoded_data_size:
            read_count = self._ReadEncodedData(self._ENCODED_DATA_BUFFER_SIZE)
            if read_count == 0:
                break

            encoded_data_offset += read_count

            if decoded_data_offset < self._decoded_data_size:
                self._decoded_data_offset = decoded_data_offset
                return

            # The offset lies beyond this chunk, make it relative to the next.
            decoded_data_offset -= self._decoded_data_size

        # The requested offset was not reached. Mark the buffered chunk as
        # fully consumed so that a read does not return unrelated data.
        self._decoded_data_offset = self._decoded_data_size

    def _ReadEncodedData(self, read_size):
        """Reads encoded data from the file-like object.

        The data read is appended to the encoded data that was not consumed
        yet and the result is passed to the decoder. The decoded data buffer is
        replaced by the newly decoded data, the second value returned by the
        decoder is kept as encoded data for the next call.

        Args:
            read_size (int): number of bytes of encoded data to read.

        Returns:
            int: number of bytes of encoded data read.
        """
        encoded_data = self._file_object.read(read_size)
        read_count = len(encoded_data)

        self._encoded_data = b"".join([self._encoded_data, encoded_data])

        self._decoded_data, self._encoded_data = self._decoder.Decode(
            self._encoded_data
        )
        self._decoded_data_size = len(self._decoded_data)

        return read_count

    def SetDecodedStreamSize(self, decoded_stream_size):
        """Sets the decoded stream size.

        This function is used to set the decoded stream size if it can be
        determined separately.

        Args:
            decoded_stream_size (int): size of the decoded stream in bytes.

        Raises:
            OSError: if the file-like object is already open.
            ValueError: if the decoded stream size is invalid.
        """
        if self._is_open:
            raise OSError("Already open.")

        if decoded_stream_size < 0:
            raise ValueError(
                f"Invalid decoded stream size: {decoded_stream_size:d} value out "
                f"of bounds."
            )

        self._decoded_stream_size = decoded_stream_size

    # Note that the following functions do not follow the style guide
    # because they are part of the file-like object interface.
    # pylint: disable=invalid-name

    def read(self, size=None):
        """Reads a byte string from the file-like object at the current offset.

        The function will read a byte string of the specified size or
        all of the remaining data if no size was specified.

        Args:
            size (Optional[int]): number of bytes to read, where None is all
                remaining data.

        Returns:
            bytes: data read.

        Raises:
            OSError: if the read failed.
        """
        if not self._is_open:
            raise OSError("Not opened.")

        if self._current_offset < 0:
            raise OSError(
                f"Invalid current offset: {self._current_offset:d} value less than "
                f"zero."
            )

        if self._decoded_stream_size is None:
            self._decoded_stream_size = self._GetDecodedStreamSize()

        if self._decoded_stream_size < 0:
            raise OSError("Invalid decoded stream size.")

        if self._current_offset >= self._decoded_stream_size:
            return b""

        if self._realign_offset:
            self._AlignDecodedDataOffset(self._current_offset)
            self._realign_offset = False

        # Never read beyond the end of the decoded stream.
        remaining_stream_size = self._decoded_stream_size - self._current_offset
        if size is None or size > remaining_stream_size:
            size = remaining_stream_size

        decoded_chunks = []
        while size > 0:
            # Only the bytes after the read position of the buffered chunk are
            # available; comparing against the full chunk size would return
            # too little data when a read crosses the end of the chunk.
            available_size = self._decoded_data_size - self._decoded_data_offset
            if available_size <= 0:
                read_count = self._ReadEncodedData(self._ENCODED_DATA_BUFFER_SIZE)
                self._decoded_data_offset = 0
                if read_count == 0 and self._decoded_data_size == 0:
                    # No more encoded data and nothing left to decode.
                    break
                continue

            chunk_size = min(size, available_size)
            chunk_start_offset = self._decoded_data_offset
            chunk_end_offset = chunk_start_offset + chunk_size
            decoded_chunks.append(
                self._decoded_data[chunk_start_offset:chunk_end_offset]
            )

            # Advance by the number of bytes actually copied.
            self._decoded_data_offset = chunk_end_offset
            self._current_offset += chunk_size
            size -= chunk_size

        return b"".join(decoded_chunks)

    def seek(self, offset, whence=os.SEEK_SET):
        """Seeks to an offset within the file-like object.

        The seek only records the new offset; the decoded data buffer is
        realigned on the next read.

        Args:
            offset (int): offset to seek to.
            whence (Optional(int)): value that indicates whether offset is an
                absolute or relative position within the file.

        Raises:
            OSError: if the seek failed.
        """
        if not self._is_open:
            raise OSError("Not opened.")

        if self._current_offset < 0:
            raise OSError(
                f"Invalid current offset: {self._current_offset:d} value less than "
                f"zero."
            )

        if whence == os.SEEK_CUR:
            offset += self._current_offset

        elif whence == os.SEEK_END:
            if self._decoded_stream_size is None:
                self._decoded_stream_size = self._GetDecodedStreamSize()
                if self._decoded_stream_size is None:
                    raise OSError("Invalid decoded stream size.")

            offset += self._decoded_stream_size

        elif whence != os.SEEK_SET:
            raise OSError("Unsupported whence.")

        if offset < 0:
            raise OSError("Invalid offset value less than zero.")

        if offset != self._current_offset:
            self._current_offset = offset
            self._realign_offset = True

    def get_offset(self):
        """Retrieves the current offset into the file-like object.

        Returns:
            int: current offset in the decoded stream.

        Raises:
            OSError: if the file-like object has not been opened.
        """
        if not self._is_open:
            raise OSError("Not opened.")

        return self._current_offset

    def get_size(self):
        """Retrieves the size of the file-like object.

        The size is determined on first use, which decodes the whole stream,
        unless it was provided with SetDecodedStreamSize.

        Returns:
            int: size of the decoded stream.

        Raises:
            OSError: if the file-like object has not been opened.
        """
        if not self._is_open:
            raise OSError("Not opened.")

        if self._decoded_stream_size is None:
            self._decoded_stream_size = self._GetDecodedStreamSize()

        return self._decoded_stream_size