Source code for dfvfs.helpers.text_file

# -*- coding: utf-8 -*-
"""A text file interface for file-like objects."""

import os

# Since this class implements the readlines file-like object interface
# the names of the interface functions are in lower case as an exception
# to the normal naming convention.


[docs] class TextFile(object): """Text file interface for file-like objects.""" # The maximum allowed size of the read buffer. _MAXIMUM_READ_BUFFER_SIZE = 16 * 1024 * 1024
[docs] def __init__( self, file_object, encoding='utf-8', encoding_errors='strict', end_of_line='\n'): """Initializes the text file. Args: file_object (FileIO): a file-like object to read from. encoding (Optional[str]): text encoding. encoding_errors (Optional[str]): text encoding errors handler. end_of_line (Optional[str]): end of line indicator. """ super(TextFile, self).__init__() self._file_object = file_object self._file_object_size = file_object.get_size() self._encoding = encoding self._encoding_errors = encoding_errors self._end_of_line = end_of_line.encode(self._encoding) self._end_of_line_length = len(self._end_of_line) self._lines = [] self._lines_buffer = b'' self._lines_buffer_offset = 0 self._current_offset = 0
[docs] def __enter__(self): """Enters a with statement.""" return self
[docs] def __exit__(self, unused_type, unused_value, unused_traceback): """Exits a with statement.""" # TODO: do we want to close the file_object here e.g. i.c.w. a flag value # to have TextFile manage the file_object? return
[docs] def __iter__(self): """Returns a line of text. Yields: str: line of text. """ line = self.readline() while line: yield line line = self.readline()
# Note: that the following functions do not follow the style guide # because they are part of the readline file-like object interface. # pylint: disable=invalid-name
[docs] def readline(self, size=None): """Reads a single line of text. The functions reads one entire line from the file-like object. A trailing end-of-line indicator (newline by default) is kept in the string (but may be absent when a file ends with an incomplete line). An empty string is returned only when end-of-file is encountered immediately. Args: size (Optional[int]): maximum byte size to read. If present and non-negative, it is a maximum byte count (including the trailing end-of-line) and an incomplete line may be returned. Returns: str: line of text. Raises: UnicodeDecodeError: if a line cannot be decoded and encoding errors is set to strict. ValueError: if the size is smaller than zero or exceeds the maximum (as defined by _MAXIMUM_READ_BUFFER_SIZE). """ if size is not None and size < 0: raise ValueError('Invalid size value smaller than zero.') if size is not None and size > self._MAXIMUM_READ_BUFFER_SIZE: raise ValueError('Invalid size value exceeds maximum.') if not self._lines: if self._lines_buffer_offset >= self._file_object_size: return '' read_size = size if not read_size: read_size = self._MAXIMUM_READ_BUFFER_SIZE if self._lines_buffer_offset + read_size > self._file_object_size: read_size = self._file_object_size - self._lines_buffer_offset self._file_object.seek(self._lines_buffer_offset, os.SEEK_SET) read_buffer = self._file_object.read(read_size) self._lines_buffer_offset += len(read_buffer) self._lines = read_buffer.split(self._end_of_line) if self._lines_buffer: self._lines[0] = b''.join([self._lines_buffer, self._lines[0]]) self._lines_buffer = b'' # Move a partial line from the lines list to the lines buffer. if read_buffer[self._end_of_line_length:] != self._end_of_line: self._lines_buffer = self._lines.pop() for index, line in enumerate(self._lines): self._lines[index] = b''.join([line, self._end_of_line]) if (self._lines_buffer and self._lines_buffer_offset >= self._file_object_size): self._lines.append(self._lines_buffer) self._lines_buffer = b'' if not self._lines: line = self._lines_buffer self._lines_buffer = b'' elif not size or size >= len(self._lines[0]): line = self._lines.pop(0) else: line = self._lines[0] self._lines[0] = line[size:] line = line[:size] last_offset = self._current_offset self._current_offset += len(line) decoded_line = line.decode(self._encoding, self._encoding_errors) # Remove a byte-order mark at the start of the file. if last_offset == 0 and decoded_line[0] == '\ufeff': decoded_line = decoded_line[1:] return decoded_line
[docs] def readlines(self, sizehint=None): """Reads lines of text. The function reads until EOF using readline() and return a list containing the lines read. Args: sizehint (Optional[int]): maximum byte size to read. If present, instead of reading up to EOF, whole lines totalling sizehint bytes are read. Returns: list[str]: lines of text. """ if sizehint is None or sizehint <= 0: sizehint = None lines = [] lines_byte_size = 0 line = self.readline() while line: lines.append(line) if sizehint is not None: lines_byte_size += len(line) if lines_byte_size >= sizehint: break line = self.readline() return lines
# get_offset() is preferred above tell() by the libbfio layer used in libyal.
[docs] def get_offset(self): """Retrieves the current offset into the file-like object. Returns: int: current offset into the file-like object. """ return self._current_offset
# Pythonesque alias for get_offset().
[docs] def tell(self): """Retrieves the current offset into the file-like object. Returns: int: current offset into the file-like object. """ return self._current_offset