# -*- coding: utf-8 -*-
"""The format analyzer."""
import pysigscan
from dfvfs.analyzer import specification
from dfvfs.lib import definitions
from dfvfs.resolver import resolver
class Analyzer(object):
  """Format analyzer.

  Keeps a class-level registry of analyzer helpers, keyed by type indicator.
  For every format category a specification store, a signature scanner and
  a list of helpers without a format specification are built on first use
  and cached on the class. Registering or deregistering a helper flushes
  the cached objects of the format categories that helper belongs to.
  """

  # Scan buffer size that is set on every signature scanner.
  _SCAN_BUFFER_SIZE = 33 * 1024

  # The registered analyzer helpers per type indicator.
  _analyzer_helpers = {}

  # The archive format category analyzer helpers that do not have
  # a format specification.
  _archive_remainder_list = None

  # The archive format category signature scanner.
  _archive_scanner = None

  # The archive format category specification store.
  _archive_store = None

  # The compressed stream format category analyzer helpers that do not have
  # a format specification.
  _compressed_stream_remainder_list = None

  # The compressed stream format category signature scanner.
  _compressed_stream_scanner = None

  # The compressed stream format category specification store.
  _compressed_stream_store = None

  # The file system format category analyzer helpers that do not have
  # a format specification.
  _file_system_remainder_list = None

  # The file system format category signature scanner.
  _file_system_scanner = None

  # The file system format category specification store.
  _file_system_store = None

  # The storage media image format category analyzer helpers that do not have
  # a format specification.
  _storage_media_image_remainder_list = None

  # The storage media image format category signature scanner.
  _storage_media_image_scanner = None

  # The storage media image format category specification store.
  _storage_media_image_store = None

  # The volume system format category analyzer helpers that do not have
  # a format specification.
  _volume_system_remainder_list = None

  # The volume system format category signature scanner.
  _volume_system_scanner = None

  # The volume system format category specification store.
  _volume_system_store = None

  @classmethod
  def _FlushCache(cls, format_categories):
    """Flushes the cached objects for the specified format categories.

    Args:
      format_categories (set[str]): format categories.
    """
    if definitions.FORMAT_CATEGORY_ARCHIVE in format_categories:
      cls._archive_remainder_list = None
      cls._archive_scanner = None
      cls._archive_store = None

    if definitions.FORMAT_CATEGORY_COMPRESSED_STREAM in format_categories:
      cls._compressed_stream_remainder_list = None
      cls._compressed_stream_scanner = None
      cls._compressed_stream_store = None

    if definitions.FORMAT_CATEGORY_FILE_SYSTEM in format_categories:
      cls._file_system_remainder_list = None
      cls._file_system_scanner = None
      cls._file_system_store = None

    if definitions.FORMAT_CATEGORY_STORAGE_MEDIA_IMAGE in format_categories:
      cls._storage_media_image_remainder_list = None
      cls._storage_media_image_scanner = None
      cls._storage_media_image_store = None

    if definitions.FORMAT_CATEGORY_VOLUME_SYSTEM in format_categories:
      cls._volume_system_remainder_list = None
      cls._volume_system_scanner = None
      cls._volume_system_store = None

  @classmethod
  def _GetSignatureScanner(cls, specification_store):
    """Initializes a signature scanner based on a specification store.

    Args:
      specification_store (FormatSpecificationStore): specification store.

    Returns:
      pysigscan.scanner: signature scanner.
    """
    signature_scanner = pysigscan.scanner()
    signature_scanner.set_scan_buffer_size(cls._SCAN_BUFFER_SIZE)

    for format_specification in specification_store.specifications:
      for signature in format_specification.signatures:
        pattern_offset = signature.offset

        if pattern_offset is None:
          signature_flags = pysigscan.signature_flags.NO_OFFSET
        elif pattern_offset < 0:
          # A negative offset is registered as its absolute value together
          # with the RELATIVE_FROM_END flag.
          pattern_offset = -pattern_offset
          signature_flags = pysigscan.signature_flags.RELATIVE_FROM_END
        else:
          signature_flags = pysigscan.signature_flags.RELATIVE_FROM_START

        signature_scanner.add_signature(
            signature.identifier, pattern_offset, signature.pattern,
            signature_flags)

    return signature_scanner

  @classmethod
  def _GetSpecificationStore(cls, format_category):
    """Retrieves the specification store for the specified format category.

    Analyzer helpers that are not enabled are ignored.

    Args:
      format_category (str): format category.

    Returns:
      tuple[FormatSpecificationStore, list[AnalyzerHelper]]: a format
          specification store and remaining analyzer helpers that do not have
          a format specification.
    """
    specification_store = specification.FormatSpecificationStore()
    remainder_list = []

    for analyzer_helper in cls._analyzer_helpers.values():
      if not analyzer_helper.IsEnabled():
        continue

      if format_category not in analyzer_helper.format_categories:
        continue

      format_specification = analyzer_helper.GetFormatSpecification()
      if format_specification is not None:
        specification_store.AddSpecification(format_specification)
      else:
        remainder_list.append(analyzer_helper)

    return specification_store, remainder_list

  @classmethod
  def _GetTypeIndicators(
      cls, signature_scanner, specification_store, remainder_list, path_spec,
      resolver_context=None):
    """Determines which supported format types a file contains.

    The file is first scanned for signatures, after which every remaining
    analyzer helper is asked to analyze the file object.

    Args:
      signature_scanner (pysigscan.scanner): signature scanner.
      specification_store (FormatSpecificationStore): specification store.
      remainder_list (list[AnalyzerHelper]): remaining analyzer helpers that
          do not have a format specification.
      path_spec (PathSpec): path specification.
      resolver_context (Optional[Context]): resolver context, where None
          represents the built-in context which is not multi process safe.

    Returns:
      list[str]: supported format type indicators.
    """
    type_indicator_list = []

    file_object = resolver.Resolver.OpenFileObject(
        path_spec, resolver_context=resolver_context)

    scan_state = pysigscan.scan_state()
    signature_scanner.scan_file_object(scan_state, file_object)

    for scan_result in scan_state.scan_results:
      format_specification = specification_store.GetSpecificationBySignature(
          scan_result.identifier)

      # Several signatures can map to the same format specification, report
      # every format only once.
      if format_specification.identifier not in type_indicator_list:
        type_indicator_list.append(format_specification.identifier)

    for analyzer_helper in remainder_list:
      result = analyzer_helper.AnalyzeFileObject(file_object)
      if result is not None:
        type_indicator_list.append(result)

    return type_indicator_list

  @classmethod
  def DeregisterHelper(cls, analyzer_helper):
    """Deregisters a format analyzer helper.

    Args:
      analyzer_helper (AnalyzerHelper): analyzer helper.

    Raises:
      KeyError: if analyzer helper object is not set for the corresponding
          type indicator.
    """
    type_indicator = analyzer_helper.type_indicator
    if type_indicator not in cls._analyzer_helpers:
      raise KeyError((
          f'Analyzer helper object not set for type indicator: '
          f'{type_indicator:s}.'))

    # Flush based on the format categories of the helper that is actually
    # registered, which is not necessarily the object that was passed in.
    registered_helper = cls._analyzer_helpers[type_indicator]
    cls._FlushCache(registered_helper.format_categories)

    del cls._analyzer_helpers[type_indicator]

  @classmethod
  def GetArchiveTypeIndicators(cls, path_spec, resolver_context=None):
    """Determines which supported archive types a file contains.

    Args:
      path_spec (PathSpec): path specification.
      resolver_context (Optional[Context]): resolver context, where None
          represents the built-in context which is not multi process safe.

    Returns:
      list[str]: supported format type indicators.
    """
    if (cls._archive_remainder_list is None or
        cls._archive_store is None):
      specification_store, remainder_list = cls._GetSpecificationStore(
          definitions.FORMAT_CATEGORY_ARCHIVE)
      cls._archive_remainder_list = remainder_list
      cls._archive_store = specification_store

    if cls._archive_scanner is None:
      cls._archive_scanner = cls._GetSignatureScanner(cls._archive_store)

    return cls._GetTypeIndicators(
        cls._archive_scanner, cls._archive_store,
        cls._archive_remainder_list, path_spec,
        resolver_context=resolver_context)

  @classmethod
  def GetCompressedStreamTypeIndicators(cls, path_spec, resolver_context=None):
    """Determines which supported compressed stream types a file contains.

    Args:
      path_spec (PathSpec): path specification.
      resolver_context (Optional[Context]): resolver context, where None
          represents the built-in context which is not multi process safe.

    Returns:
      list[str]: supported format type indicators.
    """
    if (cls._compressed_stream_remainder_list is None or
        cls._compressed_stream_store is None):
      specification_store, remainder_list = cls._GetSpecificationStore(
          definitions.FORMAT_CATEGORY_COMPRESSED_STREAM)
      cls._compressed_stream_remainder_list = remainder_list
      cls._compressed_stream_store = specification_store

    if cls._compressed_stream_scanner is None:
      cls._compressed_stream_scanner = cls._GetSignatureScanner(
          cls._compressed_stream_store)

    return cls._GetTypeIndicators(
        cls._compressed_stream_scanner, cls._compressed_stream_store,
        cls._compressed_stream_remainder_list, path_spec,
        resolver_context=resolver_context)

  @classmethod
  def GetFileSystemTypeIndicators(cls, path_spec, resolver_context=None):
    """Determines which supported file system types a file contains.

    Args:
      path_spec (PathSpec): path specification.
      resolver_context (Optional[Context]): resolver context, where None
          represents the built-in context which is not multi process safe.

    Returns:
      list[str]: supported format type indicators.
    """
    if (cls._file_system_remainder_list is None or
        cls._file_system_store is None):
      specification_store, remainder_list = cls._GetSpecificationStore(
          definitions.FORMAT_CATEGORY_FILE_SYSTEM)
      cls._file_system_remainder_list = remainder_list
      cls._file_system_store = specification_store

    if cls._file_system_scanner is None:
      cls._file_system_scanner = cls._GetSignatureScanner(
          cls._file_system_store)

    return cls._GetTypeIndicators(
        cls._file_system_scanner, cls._file_system_store,
        cls._file_system_remainder_list, path_spec,
        resolver_context=resolver_context)

  @classmethod
  def GetStorageMediaImageTypeIndicators(cls, path_spec, resolver_context=None):
    """Determines which supported storage media image types a file contains.

    Args:
      path_spec (PathSpec): path specification.
      resolver_context (Optional[Context]): resolver context, where None
          represents the built-in context which is not multi process safe.

    Returns:
      list[str]: supported format type indicators.
    """
    if (cls._storage_media_image_remainder_list is None or
        cls._storage_media_image_store is None):
      specification_store, remainder_list = cls._GetSpecificationStore(
          definitions.FORMAT_CATEGORY_STORAGE_MEDIA_IMAGE)
      cls._storage_media_image_remainder_list = remainder_list
      cls._storage_media_image_store = specification_store

    if cls._storage_media_image_scanner is None:
      cls._storage_media_image_scanner = cls._GetSignatureScanner(
          cls._storage_media_image_store)

    return cls._GetTypeIndicators(
        cls._storage_media_image_scanner, cls._storage_media_image_store,
        cls._storage_media_image_remainder_list, path_spec,
        resolver_context=resolver_context)

  @classmethod
  def GetVolumeSystemTypeIndicators(cls, path_spec, resolver_context=None):
    """Determines which supported volume system types a file contains.

    Args:
      path_spec (PathSpec): path specification.
      resolver_context (Optional[Context]): resolver context, where None
          represents the built-in context which is not multi process safe.

    Returns:
      list[str]: supported format type indicators.
    """
    if (cls._volume_system_remainder_list is None or
        cls._volume_system_store is None):
      specification_store, remainder_list = cls._GetSpecificationStore(
          definitions.FORMAT_CATEGORY_VOLUME_SYSTEM)
      cls._volume_system_remainder_list = remainder_list
      cls._volume_system_store = specification_store

    if cls._volume_system_scanner is None:
      cls._volume_system_scanner = cls._GetSignatureScanner(
          cls._volume_system_store)

    type_indicators = cls._GetTypeIndicators(
        cls._volume_system_scanner, cls._volume_system_store,
        cls._volume_system_remainder_list, path_spec,
        resolver_context=resolver_context)

    if (len(type_indicators) > 1 and
        definitions.TYPE_INDICATOR_TSK_PARTITION in type_indicators):
      # The TSK partition analyzer is used as a fallback, remove it if
      # an alternative analyzer detected a supported volume system.
      type_indicators.remove(definitions.TYPE_INDICATOR_TSK_PARTITION)

    return type_indicators

  @classmethod
  def RegisterHelper(cls, analyzer_helper):
    """Registers a format analyzer helper.

    Args:
      analyzer_helper (AnalyzerHelper): analyzer helper.

    Raises:
      KeyError: if analyzer helper object is already set for the corresponding
          type indicator.
    """
    type_indicator = analyzer_helper.type_indicator
    if type_indicator in cls._analyzer_helpers:
      raise KeyError((
          f'Analyzer helper object already set for type indicator: '
          f'{type_indicator:s}.'))

    # The cached stores and scanners of the affected format categories no
    # longer reflect the registered helpers and need to be rebuilt.
    cls._FlushCache(analyzer_helper.format_categories)

    cls._analyzer_helpers[type_indicator] = analyzer_helper