# Source code for dfvfs.lib.cpio

"""Copy in and out (CPIO) archive file."""

import os

from dtfabric.runtime import fabric as dtfabric_fabric

from dfvfs.lib import data_format
from dfvfs.lib import errors


class CPIOArchiveFileEntry:
    """Single entry (record) of a CPIO archive file.

    Plain value container: every attribute starts out as None and is filled
    in by the code that parses the archive.

    Attributes:
        data_offset (int): data start offset.
        data_size (int): data size.
        group_identifier (int): group identifier (gid).
        inode_number (int): inode number.
        mode (int): mode.
        modification_time (int): modification POSIX timestamp.
        number_of_links (int): number of hard links.
        path (str): path.
        size (int): archive file entry record size.
        user_identifier (int): user identifier (uid).
    """

    def __init__(self):
        """Creates an empty CPIO archive file entry."""
        super().__init__()
        # All values are unknown until the entry has been read.
        self.data_offset = None
        self.data_size = None
        self.group_identifier = None
        self.inode_number = None
        self.mode = None
        self.modification_time = None
        self.number_of_links = None
        self.path = None
        self.size = None
        self.user_identifier = None
class CPIOArchiveFile(data_format.DataFormat):
    """CPIO archive file.

    Supports the binary (big and little endian), portable ASCII ("odc"),
    new ASCII ("newc") and new ASCII with checksum ("crc") formats, selected
    by the signature found at the start of the file.

    Attributes:
        file_format (str): CPIO file format.
    """

    # The dtFabric definitions are read once, when the class is defined.
    _DATA_TYPE_FABRIC_DEFINITION_FILE = os.path.join(
        os.path.dirname(__file__), "cpio.yaml"
    )

    with open(_DATA_TYPE_FABRIC_DEFINITION_FILE, "rb") as file_object:
        _DATA_TYPE_FABRIC_DEFINITION = file_object.read()

    _DATA_TYPE_FABRIC = dtfabric_fabric.DataTypeFabric(
        yaml_definition=_DATA_TYPE_FABRIC_DEFINITION
    )

    _CPIO_BINARY_BIG_ENDIAN_FILE_ENTRY = _DATA_TYPE_FABRIC.CreateDataTypeMap(
        "cpio_binary_big_endian_file_entry"
    )

    _CPIO_BINARY_LITTLE_ENDIAN_FILE_ENTRY = _DATA_TYPE_FABRIC.CreateDataTypeMap(
        "cpio_binary_little_endian_file_entry"
    )

    _CPIO_PORTABLE_ASCII_FILE_ENTRY = _DATA_TYPE_FABRIC.CreateDataTypeMap(
        "cpio_portable_ascii_file_entry"
    )

    _CPIO_NEW_ASCII_FILE_ENTRY = _DATA_TYPE_FABRIC.CreateDataTypeMap(
        "cpio_new_ascii_file_entry"
    )

    _CPIO_SIGNATURE_BINARY_BIG_ENDIAN = b"\x71\xc7"
    _CPIO_SIGNATURE_BINARY_LITTLE_ENDIAN = b"\xc7\x71"
    _CPIO_SIGNATURE_PORTABLE_ASCII = b"070707"
    _CPIO_SIGNATURE_NEW_ASCII = b"070701"
    _CPIO_SIGNATURE_NEW_ASCII_WITH_CHECKSUM = b"070702"

    # Header attributes that the "odc" format stores as octal strings.
    _CPIO_ATTRIBUTE_NAMES_ODC = (
        "device_number",
        "inode_number",
        "mode",
        "user_identifier",
        "group_identifier",
        "number_of_links",
        "special_device_number",
        "modification_time",
        "path_size",
        "file_size",
    )

    # Header attributes that the "crc" and "newc" formats store as
    # hexadecimal strings.
    _CPIO_ATTRIBUTE_NAMES_CRC = (
        "inode_number",
        "mode",
        "user_identifier",
        "group_identifier",
        "number_of_links",
        "modification_time",
        "path_size",
        "file_size",
        "device_major_number",
        "device_minor_number",
        "special_device_major_number",
        "special_device_minor_number",
        "checksum",
    )

    def __init__(self, encoding="utf-8"):
        """Initializes a CPIO archive file.

        Args:
            encoding (str): encoding of paths within the archive file.
        """
        super().__init__()
        self._encoding = encoding
        self._file_entries = None
        self._file_object = None
        self._file_size = 0

        self.file_format = None

    @property
    def encoding(self):
        """str: encoding of paths within the archive file."""
        return self._encoding

    def _ConvertAttributesToIntegers(self, file_entry, attribute_names, base):
        """Converts string header attributes of a file entry into integers.

        The attributes are replaced in place on the file entry.

        Args:
            file_entry (object): file entry structure read from the archive.
            attribute_names (tuple[str]): names of the attributes to convert.
            base (int): numeric base of the string representation.

        Raises:
            FileFormatError: if an attribute is missing or cannot be
                converted into an integer.
        """
        for attribute_name in attribute_names:
            value = getattr(file_entry, attribute_name, None)
            try:
                value = int(value, base)
            except (TypeError, ValueError) as exception:
                # TypeError covers a missing attribute (None), ValueError a
                # string that is not a valid number in the given base.
                raise errors.FileFormatError(
                    (
                        f"Unable to convert attribute: {attribute_name:s} into "
                        f"an integer"
                    )
                ) from exception

            setattr(file_entry, attribute_name, value)

    def _GetPaddingSize(self, file_offset):
        """Determines the padding needed to align an offset for the format.

        The binary formats align on 2 bytes, the "crc" and "newc" formats on
        4 bytes and any other format (such as "odc") uses no padding.

        Args:
            file_offset (int): offset relative from the start of the archive.

        Returns:
            int: number of padding bytes that follow the offset.
        """
        if self.file_format in ("bin-big-endian", "bin-little-endian"):
            alignment = 2
        elif self.file_format in ("crc", "newc"):
            alignment = 4
        else:
            return 0

        remainder = file_offset % alignment
        if remainder > 0:
            return alignment - remainder
        return 0

    def _ReadFileEntry(self, file_object, file_offset):
        """Reads a file entry.

        Args:
            file_object (FileIO): file-like object.
            file_offset (int): offset of the data relative from the start of
                the file-like object.

        Returns:
            CPIOArchiveFileEntry: a file entry.

        Raises:
            FileFormatError: if the file entry cannot be read.
        """
        data_type_map = None
        if self.file_format == "bin-big-endian":
            data_type_map = self._CPIO_BINARY_BIG_ENDIAN_FILE_ENTRY
        elif self.file_format == "bin-little-endian":
            data_type_map = self._CPIO_BINARY_LITTLE_ENDIAN_FILE_ENTRY
        elif self.file_format == "odc":
            data_type_map = self._CPIO_PORTABLE_ASCII_FILE_ENTRY
        elif self.file_format in ("crc", "newc"):
            data_type_map = self._CPIO_NEW_ASCII_FILE_ENTRY

        if data_type_map is None:
            # Without a known format there is no structure to read.
            raise errors.FileFormatError(
                f"Unsupported CPIO format: {self.file_format!s}"
            )

        file_entry, file_entry_data_size = self._ReadStructureFromFileObject(
            file_object, file_offset, data_type_map
        )

        file_offset += file_entry_data_size

        if self.file_format in ("bin-big-endian", "bin-little-endian"):
            # The binary formats split these values into an upper and a
            # lower part; the upper part is shifted 16 bits.
            file_entry.modification_time = (
                file_entry.modification_time.upper << 16
            ) | file_entry.modification_time.lower

            file_entry.file_size = (
                file_entry.file_size.upper << 16
            ) | file_entry.file_size.lower

        if self.file_format == "odc":
            self._ConvertAttributesToIntegers(
                file_entry, self._CPIO_ATTRIBUTE_NAMES_ODC, 8
            )
        elif self.file_format in ("crc", "newc"):
            self._ConvertAttributesToIntegers(
                file_entry, self._CPIO_ATTRIBUTE_NAMES_CRC, 16
            )

        # The path directly follows the header in the file-like object.
        path_data = file_object.read(file_entry.path_size)

        file_offset += file_entry.path_size

        path = path_data.decode(self._encoding)
        # Only keep the part of the path before the first NUL character.
        path, _, _ = path.partition("\x00")

        # Padding between the path and the start of the file data.
        padding_size = self._GetPaddingSize(file_offset)

        file_offset += padding_size

        archive_file_entry = CPIOArchiveFileEntry()

        archive_file_entry.data_offset = file_offset
        archive_file_entry.data_size = file_entry.file_size
        archive_file_entry.group_identifier = file_entry.group_identifier
        archive_file_entry.inode_number = file_entry.inode_number
        archive_file_entry.modification_time = file_entry.modification_time
        archive_file_entry.number_of_links = file_entry.number_of_links
        archive_file_entry.path = path
        archive_file_entry.mode = file_entry.mode
        archive_file_entry.size = (
            file_entry_data_size
            + file_entry.path_size
            + padding_size
            + file_entry.file_size
        )
        archive_file_entry.user_identifier = file_entry.user_identifier

        file_offset += file_entry.file_size

        # Padding after the file data also counts towards the record size.
        archive_file_entry.size += self._GetPaddingSize(file_offset)

        return archive_file_entry

    def _ReadFileEntries(self, file_object):
        """Reads the file entries from the cpio archive.

        Reading stops at the "TRAILER!!!" entry or at the end of the file.
        When several entries share a path only the first one is kept.

        Args:
            file_object (FileIO): file-like object.
        """
        self._file_entries = {}

        file_offset = 0
        # A file size of 0 is treated as unknown: keep reading until the
        # trailer entry is found.
        while file_offset < self._file_size or self._file_size == 0:
            file_entry = self._ReadFileEntry(file_object, file_offset)
            file_offset += file_entry.size
            if file_entry.path == "TRAILER!!!":
                break

            if file_entry.path in self._file_entries:
                # TODO: alert on file entries with duplicate paths?
                continue

            self._file_entries[file_entry.path] = file_entry

    def Close(self):
        """Closes the CPIO archive file."""
        self._file_entries = None
        self._file_object = None
        # Same "unknown size" value that __init__ uses.
        self._file_size = 0

    def FileEntryExistsByPath(self, path):
        """Determines if file entry for a specific path exists.

        Args:
            path (str): path of the file entry.

        Returns:
            bool: True if the file entry exists.
        """
        if self._file_entries is None:
            return False

        return path in self._file_entries

    def GetFileEntries(self, path_prefix=""):
        """Retrieves the file entries.

        Args:
            path_prefix (str): path prefix.

        Yields:
            CPIOArchiveFileEntry: a CPIO archive file entry.
        """
        if self._file_entries:
            for path, file_entry in self._file_entries.items():
                if path.startswith(path_prefix):
                    yield file_entry

    def GetFileEntryByPath(self, path):
        """Retrieves a file entry for a specific path.

        Args:
            path (str): path of the file entry.

        Returns:
            CPIOArchiveFileEntry: a CPIO archive file entry or None if not
                available.
        """
        if not self._file_entries:
            return None

        return self._file_entries.get(path)

    def Open(self, file_object):
        """Opens the CPIO archive file.

        Args:
            file_object (FileIO): a file-like object.

        Raises:
            OSError: if the file format signature is not supported.
        """
        file_object.seek(0, os.SEEK_SET)
        signature_data = file_object.read(6)

        self.file_format = None
        if len(signature_data) > 2:
            # The binary signatures are 2 bytes, the ASCII ones 6 bytes.
            if signature_data[:2] == self._CPIO_SIGNATURE_BINARY_BIG_ENDIAN:
                self.file_format = "bin-big-endian"
            elif signature_data[:2] == self._CPIO_SIGNATURE_BINARY_LITTLE_ENDIAN:
                self.file_format = "bin-little-endian"
            elif signature_data == self._CPIO_SIGNATURE_PORTABLE_ASCII:
                self.file_format = "odc"
            elif signature_data == self._CPIO_SIGNATURE_NEW_ASCII:
                self.file_format = "newc"
            elif signature_data == self._CPIO_SIGNATURE_NEW_ASCII_WITH_CHECKSUM:
                self.file_format = "crc"

        if self.file_format is None:
            raise OSError("Unsupported CPIO format.")

        self._file_object = file_object
        self._file_size = file_object.get_size()

        self._ReadFileEntries(self._file_object)

    def ReadDataAtOffset(self, file_offset, size):
        """Reads a byte string from the file-like object at a specific offset.

        Args:
            file_offset (int): file offset.
            size (int): number of bytes to read.

        Returns:
            bytes: data read.

        Raises:
            OSError: if the read failed or the archive file is not open.
        """
        if self._file_object is None:
            raise OSError("CPIO archive file is not open.")

        self._file_object.seek(file_offset, os.SEEK_SET)
        return self._file_object.read(size)