Source code for dfvfs.file_io.os_file_io

"""The operating system file-like object implementation."""

import stat
import os

import pysmdev

from dfvfs.file_io import file_io
from dfvfs.lib import definitions
from dfvfs.lib import errors


class OSFile(file_io.FileIO):
    """File input/output (IO) object that uses the operating system.

    Regular files are read through a Python file object opened in binary
    mode; device files are read through a pysmdev handle.
    """

    def __init__(self, resolver_context, path_spec):
        """Initializes a file input/output (IO) object.

        Args:
          resolver_context (Context): resolver context.
          path_spec (PathSpec): a path specification.
        """
        super().__init__(resolver_context, path_spec)
        # Either a Python binary file object or a pysmdev handle; set by
        # _Open() and reset by _Close().
        self._file_object = None
        # Size in bytes determined when the file was opened.
        self._size = 0

    def _Close(self):
        """Closes the file-like object."""
        self._file_object.close()
        self._file_object = None

    def _GetMountPath(self):
        """Resolves the operating system path of the parent mount point.

        Returns:
          str: location of the mount point without a trailing path segment
              separator.

        Raises:
          MountPointError: if the mount point specified in the path
              specification does not exist or is not supported.
          PathSpecError: if the parent path specification is not a mount
              path specification.
        """
        parent_path_spec = self._path_spec.parent
        if parent_path_spec.type_indicator != definitions.TYPE_INDICATOR_MOUNT:
            raise errors.PathSpecError(
                "Unsupported path specification with parent."
            )

        mount_path_spec = self._resolver_context.GetMountPoint(
            parent_path_spec.identifier
        )
        if not mount_path_spec:
            raise errors.MountPointError(
                f"No such mount point: {parent_path_spec.identifier:s}"
            )

        if (
            mount_path_spec.type_indicator != definitions.TYPE_INDICATOR_OS
            or mount_path_spec.parent
        ):
            raise errors.MountPointError(
                "Unsupported mount point path specification."
            )

        mount_path = mount_path_spec.location
        if mount_path is None:
            # A mount point without a location cannot be combined with the
            # location of the path specification.
            raise errors.MountPointError(
                "Unsupported mount point path specification."
            )

        # endswith() is used instead of indexing so that an empty mount
        # location does not raise IndexError.
        if mount_path.endswith(os.path.sep):
            mount_path = mount_path[:-1]

        return mount_path

    def _Open(self):
        """Opens the file-like object defined by path specification.

        Raises:
          AccessError: if the access to open the file was denied.
          MountPointError: if the mount point specified in the path
              specification does not exist.
          OSError: if the file-like object could not be opened.
          PathSpecError: if the path specification is incorrect.
        """
        location = getattr(self._path_spec, "location", None)

        mount_path = None
        if self._path_spec.HasParent():
            mount_path = self._GetMountPath()

        # Validate the location before combining it with the mount path,
        # otherwise a missing location surfaces as a TypeError from join().
        if location is None:
            raise errors.PathSpecError("Path specification missing location.")

        if mount_path is not None:
            # Cannot use os.path.join() here since location is prefixed with
            # a path segment separator.
            location = "".join([mount_path, location])

        # Windows does not support running os.stat on device files so we use
        # libsmdev to do an initial check.
        try:
            is_device = pysmdev.check_device(location)
        except OSError as exception:
            # Since os.stat() will not recognize Windows device file names and
            # will return '[Error 87] The parameter is incorrect' we check here
            # if pysmdev exception message contains ' access denied ' and
            # raise AccessError instead.
            if " access denied " in str(exception):
                raise errors.AccessError(
                    f"Access denied to file: {location:s} with error: {exception!s}"
                ) from exception
            is_device = False

        if not is_device:
            try:
                stat_info = os.stat(location)
            except OSError as exception:
                raise OSError(
                    f"Unable to open file with error: {exception!s}"
                ) from exception

            # In case the libsmdev check is not able to detect the device
            # also use the stat information.
            if stat.S_ISCHR(stat_info.st_mode) or stat.S_ISBLK(
                stat_info.st_mode
            ):
                is_device = True

        if is_device:
            smdev_handle = pysmdev.handle()
            smdev_handle.open(location, mode="r")
            self._file_object = smdev_handle
            self._size = smdev_handle.media_size
        else:
            # stat_info is always set here: the non-device branch is only
            # reached after os.stat() succeeded above.
            # pylint: disable=consider-using-with
            self._file_object = open(location, mode="rb")
            self._size = stat_info.st_size

    # Note: that the following functions do not follow the style guide
    # because they are part of the file-like object interface.
    # pylint: disable=invalid-name

    def read(self, size=None):
        """Reads a byte string from the file-like object at the current offset.

        The function will read a byte string of the specified size or
        all of the remaining data if no size was specified.

        Args:
          size (Optional[int]): number of bytes to read, where None is all
              remaining data.

        Returns:
          bytes: data read.

        Raises:
          OSError: if the read failed.
        """
        if not self._is_open:
            raise OSError("Not opened.")

        if size is None:
            # Remaining data is computed from the size recorded at open time.
            size = self._size - self._file_object.tell()

        return self._file_object.read(size)

    def seek(self, offset, whence=os.SEEK_SET):
        """Seeks to an offset within the file-like object.

        Args:
          offset (int): offset to seek to.
          whence (Optional(int)): value that indicates whether offset is an
              absolute or relative position within the file.

        Raises:
          OSError: if the seek failed.
        """
        if not self._is_open:
            raise OSError("Not opened.")

        # For a yet unknown reason a Python file-like object on Windows allows
        # for invalid whence values to be passed to the seek function. This
        # check makes sure the behavior of the function is the same on all
        # platforms.
        if whence not in (os.SEEK_CUR, os.SEEK_END, os.SEEK_SET):
            raise OSError("Unsupported whence.")

        self._file_object.seek(offset, whence)

    def get_offset(self):
        """Retrieves the current offset into the file-like object.

        Returns:
          int: current offset into the file-like object.

        Raises:
          OSError: if the file-like object has not been opened.
        """
        if not self._is_open:
            raise OSError("Not opened.")

        return self._file_object.tell()

    def get_size(self):
        """Retrieves the size of the file-like object.

        Returns:
          int: size of the file-like object data.

        Raises:
          OSError: if the file-like object has not been opened.
        """
        if not self._is_open:
            raise OSError("Not opened.")

        return self._size