Source code for dfvfs.lib.raw_helper

"""Helper functions for RAW storage media image support."""

from dfvfs.lib import errors
from dfvfs.path import factory as path_spec_factory


def _RawGlobPathSpecWithAlphabeticalSchema(
    file_system,
    parent_path_spec,
    segment_format,
    location,
    segment_length,
    upper_case=False,
):
    """Globs for path specifications according to an alphabetical naming schema.

    Args:
      file_system (FileSystem): file system.
      parent_path_spec (PathSpec): parent path specification.
      segment_format (str): naming schema of the segment file location.
      location (str): the base segment file location string.
      segment_length (int): length (number of characters) of the segment
          indicator.
      upper_case (Optional[bool]): True if the segment name is in upper case.

    Returns:
      list[PathSpec]: path specifications that match the glob.
    """
    segment_number = 0
    segment_files = []

    while True:
        segment_index = segment_number
        segment_letters = []
        while len(segment_letters) < segment_length:
            segment_index, remainder = divmod(segment_index, 26)
            if upper_case:
                segment_letters.append(chr(ord("A") + remainder))
            else:
                segment_letters.append(chr(ord("a") + remainder))

        # Reverse the segment letters list to form the extension.
        segment_letters = "".join(segment_letters[::-1])
        segment_location = segment_format.format(location, segment_letters)

        # Note that we don't want to set the keyword arguments when not used
        # because the path specification base class will check for unused
        # keyword arguments and raise.
        kwargs = path_spec_factory.Factory.GetProperties(parent_path_spec)

        kwargs["location"] = segment_location
        if parent_path_spec.parent is not None:
            kwargs["parent"] = parent_path_spec.parent

        segment_path_spec = path_spec_factory.Factory.NewPathSpec(
            parent_path_spec.type_indicator, **kwargs
        )

        if not file_system.FileEntryExistsByPathSpec(segment_path_spec):
            break

        segment_files.append(segment_path_spec)

        segment_number += 1

    return segment_files


def _RawGlobPathSpecWithNumericSchema(
    file_system, parent_path_spec, segment_format, location, segment_number
):
    """Globs for path specifications according to a numeric naming schema.

    Args:
      file_system (FileSystem): file system.
      parent_path_spec (PathSpec): parent path specification.
      segment_format (str): naming schema of the segment file location.
      location (str): the base segment file location string.
      segment_number (int): first segment number.

    Returns:
      list[PathSpec]: path specifications that match the glob.
    """
    segment_files = []

    while True:
        segment_location = segment_format.format(location, segment_number)

        # Note that we don't want to set the keyword arguments when not used
        # because the path specification base class will check for unused
        # keyword arguments and raise.
        kwargs = path_spec_factory.Factory.GetProperties(parent_path_spec)

        kwargs["location"] = segment_location
        if parent_path_spec.parent is not None:
            kwargs["parent"] = parent_path_spec.parent

        segment_path_spec = path_spec_factory.Factory.NewPathSpec(
            parent_path_spec.type_indicator, **kwargs
        )

        if not file_system.FileEntryExistsByPathSpec(segment_path_spec):
            break

        segment_files.append(segment_path_spec)

        segment_number += 1

    return segment_files


[docs] def RawGlobPathSpec(file_system, path_spec): """Globs for path specifications according to the split RAW naming schema. Args: file_system (FileSystem): file system. path_spec (PathSpec): path specification. Returns: list[PathSpec]: path specifications that match the glob. Raises: PathSpecError: if the path specification is invalid. RuntimeError: if the maximum number of supported segment files is reached. """ if not path_spec.HasParent(): raise errors.PathSpecError("Unsupported path specification without parent.") parent_path_spec = path_spec.parent parent_location = getattr(parent_path_spec, "location", None) if not parent_location: raise errors.PathSpecError( "Unsupported parent path specification without location." ) path_segments = file_system.SplitPath(parent_location) last_path_segment = path_segments.pop() filename_prefix, dot, segment_extension = last_path_segment.rpartition(".") if not dot: filename_prefix = segment_extension segment_extension = "" segment_extension_length = len(segment_extension) path_segments.append(filename_prefix) location = file_system.JoinPath(path_segments) if not segment_extension: filename_prefix_length = len(filename_prefix) # Check if there are muliple segment files in the form: PREFIX[a-z]+ # where [a-z]+ starts with a and consist of multiple letters, # e.g. PREFIXaa or PREFIXzz. if filename_prefix[-2:] == "aa": suffix_index = filename_prefix_length - 4 while suffix_index >= 0: if filename_prefix[suffix_index] != "a": suffix_index += 1 break suffix_index -= 1 suffix_length = filename_prefix_length - suffix_index segment_files = _RawGlobPathSpecWithAlphabeticalSchema( file_system, parent_path_spec, "{0:s}{1:s}", location[:-suffix_length], filename_prefix_length - suffix_index, upper_case=False, ) # Check if there are muliple segment files in the form: PREFIX[A-Z]+ # where [A-Z]+ starts with A and consist of multiple letters, # e.g. PREFIXAA or PREFIXZZ. elif filename_prefix[-2:] == "AA": suffix_index = filename_prefix_length - 4 while suffix_index >= 0: if filename_prefix[suffix_index] != "A": suffix_index += 1 break suffix_index -= 1 suffix_length = filename_prefix_length - suffix_index segment_files = _RawGlobPathSpecWithAlphabeticalSchema( file_system, parent_path_spec, "{0:s}{1:s}", location[:-suffix_length], filename_prefix_length - suffix_index, upper_case=True, ) # Check if there are muliple segment files in the form: PREFIX# # where # starts with either 0 or 1 and consist of multiple digits, # e.g. PREFIX1 or PREFIX000. elif filename_prefix[-1].isdigit(): suffix_index = filename_prefix_length - 2 while suffix_index >= 0: if not filename_prefix[suffix_index].isdigit(): suffix_index += 1 break suffix_index -= 1 try: segment_number = int(filename_prefix[suffix_index:], 10) except ValueError: raise errors.PathSpecError( "Unsupported path specification invalid segment file scheme." ) if segment_number not in [0, 1]: raise errors.PathSpecError( "Unsupported path specification invalid segment file scheme." ) suffix_length = filename_prefix_length - suffix_index if suffix_length == 1: segment_format = "{0:s}{1:d}" elif suffix_length == 2: segment_format = "{0:s}{1:02d}" elif suffix_length == 3: segment_format = "{0:s}{1:03d}" elif suffix_length == 4: segment_format = "{0:s}{1:04d}" else: raise errors.PathSpecError( "Unsupported path specification invalid segment file scheme." ) segment_files = _RawGlobPathSpecWithNumericSchema( file_system, parent_path_spec, segment_format, location[:-suffix_length], segment_number, ) else: segment_files = [] # Check if there is single segment file e.g. PREFIX.dd, PREFIX.dmg, # PREFIX.img, PREFIX.raw. elif segment_extension.lower() in ["dd", "dmg", "img", "raw"]: if file_system.FileEntryExistsByPathSpec(parent_path_spec): segment_files = [parent_path_spec] else: segment_files = [] # Check if there are muliple segment files in the form: PREFIX.[a-z]+ # where [a-z]+ starts with a and consist of multiple letters, # e.g. PREFIX.aa or PREFIX.aaa. elif segment_extension == "a" * segment_extension_length: segment_files = _RawGlobPathSpecWithAlphabeticalSchema( file_system, parent_path_spec, "{0:s}.{1:s}", location, segment_extension_length, upper_case=False, ) # Check if there are muliple segment files in the form: PREFIX.[A-Z]+ # where [A-Z]+ starts with A and consist of multiple letters, # e.g. PREFIX.AA or PREFIX.AAA. elif segment_extension == "A" * segment_extension_length: segment_files = _RawGlobPathSpecWithAlphabeticalSchema( file_system, parent_path_spec, "{0:s}.{1:s}", location, segment_extension_length, upper_case=True, ) # Check if there are muliple segment files in the form: PREFIX###.asb # where # starts with 1 and consist of multiple digits e.g. PREFIX001.asb. elif segment_extension == "asb": if location[-3:] == "001": segment_files = _RawGlobPathSpecWithNumericSchema( file_system, parent_path_spec, "{0:s}{1:03d}.asb", location[:-3], 1 ) else: segment_files = [] # Check if there are muliple segment files in the form: PREFIX-f###.vmdk # where # starts with 1 and consist of multiple digits, # e.g. PREFIX-f001.vmdk. elif segment_extension == "vmdk": location, _, segment_number = location.partition("-f") if segment_number == "001": segment_files = _RawGlobPathSpecWithNumericSchema( file_system, parent_path_spec, "{0:s}-f{1:03d}.vmdk", location, 1 ) else: segment_files = [] # Check if there are muliple segment files in the form: PREFIX.# # where # starts with either 0 or 1 and consist of multiple digits, # e.g. PREFIX.1 or PREFIX.000. elif segment_extension.isdigit(): try: segment_number = int(segment_extension, 10) except ValueError: raise errors.PathSpecError( ( f"Unsupported path specification invalid segment file extension: " f"{segment_extension:s}" ) ) if segment_number not in [0, 1]: raise errors.PathSpecError( ( f"Unsupported path specification invalid segment file extension: " f"{segment_extension:s}" ) ) if segment_extension_length == 1: segment_format = "{0:s}.{1:d}" elif segment_extension_length == 2: segment_format = "{0:s}.{1:02d}" elif segment_extension_length == 3: segment_format = "{0:s}.{1:03d}" elif segment_extension_length == 4: segment_format = "{0:s}.{1:04d}" else: raise errors.PathSpecError( ( f"Unsupported path specification invalid segment file extension: " f"{segment_extension:s}" ) ) segment_files = _RawGlobPathSpecWithNumericSchema( file_system, parent_path_spec, segment_format, location, segment_number ) else: segment_files = [] # Check if there are muliple segment files in the form: PREFIX.#of# # e.g. PREFIX.1of5 - PREFIX.5of5. segment_number, _, number_of_segments = segment_extension.partition("of") if segment_number.isdigit() and number_of_segments.isdigit(): try: segment_number = int(segment_number, 10) number_of_segments = int(number_of_segments, 10) except ValueError: raise errors.PathSpecError( ( f"Unsupported path specification invalid segment file " f"extension: {segment_extension:s}" ) ) if segment_number != 1: raise errors.PathSpecError( ( f"Unsupported path specification invalid segment file " f"extension: {segment_extension:s}" ) ) for segment_number in range(1, number_of_segments + 1): segment_location = ( f"{location:s}.{segment_number:d}of{number_of_segments:d}" ) # Note that we don't want to set the keyword arguments when not used # because the path specification base class will check for unused # keyword arguments and raise. kwargs = path_spec_factory.Factory.GetProperties(parent_path_spec) kwargs["location"] = segment_location if parent_path_spec.parent is not None: kwargs["parent"] = parent_path_spec.parent segment_path_spec = path_spec_factory.Factory.NewPathSpec( parent_path_spec.type_indicator, **kwargs ) if not file_system.FileEntryExistsByPathSpec(segment_path_spec): raise errors.PathSpecError( ( f"Missing segment file: {segment_number:d}of" f"{number_of_segments:d} for extension: " f"{segment_extension:s}" ) ) segment_files.append(segment_path_spec) return segment_files