Source code for dfvfs.file_io.sqlite_blob_file_io

# -*- coding: utf-8 -*-
"""The SQlite blob file-like object."""

import os

from dfvfs.file_io import file_io
from dfvfs.lib import errors
from dfvfs.lib import sqlite_database
from dfvfs.resolver import resolver


[docs] class SQLiteBlobFile(file_io.FileIO): """File input/output (IO) object using sqlite.""" _OPERATORS = frozenset(['==', '=', 'IS'])
[docs] def __init__(self, resolver_context, path_spec): """Initializes the file-like object. Args: resolver_context (Context): resolver context. path_spec (PathSpec): a path specification. """ super(SQLiteBlobFile, self).__init__(resolver_context, path_spec) self._blob = None self._current_offset = 0 self._database_object = None self._number_of_rows = None self._size = 0 self._table_name = None
def _Close(self): """Closes the file-like object.""" if self._database_object: self._database_object.Close() self._blob = None self._current_offset = 0 self._size = 0 self._table_name = None def _Open(self, mode='rb'): """Opens the file-like object defined by path specification. Args: mode (Optional[str]): file access mode. Raises: AccessError: if the access to open the file was denied. IOError: if the file-like object could not be opened. OSError: if the file-like object could not be opened. PathSpecError: if the path specification is incorrect. """ if not self._path_spec.HasParent(): raise errors.PathSpecError( 'Unsupported path specification without parent.') table_name = getattr(self._path_spec, 'table_name', None) if table_name is None: raise errors.PathSpecError('Path specification missing table name.') column_name = getattr(self._path_spec, 'column_name', None) if column_name is None: raise errors.PathSpecError('Path specification missing column name.') row_condition = getattr(self._path_spec, 'row_condition', None) if row_condition: if not isinstance(row_condition, tuple) or len(row_condition) != 3: raise errors.PathSpecError(( 'Unsupported row_condition not a tuple in the form: ' '(column_name, operator, value).')) row_index = getattr(self._path_spec, 'row_index', None) if row_index is not None and not isinstance(row_index, int): raise errors.PathSpecError('Unsupported row_index not of integer type.') if not row_condition and row_index is None: raise errors.PathSpecError( 'Path specification requires either a row_condition or row_index.') if self._database_object: raise IOError('Database file already set.') file_object = resolver.Resolver.OpenFileObject( self._path_spec.parent, resolver_context=self._resolver_context) database_object = sqlite_database.SQLiteDatabaseFile() database_object.Open(file_object) # Sanity check the table and column names. rows = [] error_string = '' if not database_object.HasTable(table_name): error_string = f'Missing table: {table_name:s}' elif not database_object.HasColumn(table_name, column_name): error_string = ( f'Missing column: {column_name:s} in table: {table_name:s}') elif not row_condition: query = (f'SELECT {column_name:s} FROM {table_name:s} LIMIT 1 ' f'OFFSET {row_index:d}') rows = database_object.Query(query) elif not database_object.HasColumn(table_name, row_condition[0]): condition_column_name = row_condition[0] error_string = ( f'Missing row condition column: {condition_column_name:s} in table: ' f'{table_name:s}') elif row_condition[1] not in self._OPERATORS: condition_operator = row_condition[1] error_string = ( f'Unsupported row condition operator: {condition_operator:s}.') else: condition_column_name = row_condition[0] condition_operator = row_condition[1] query = (f'SELECT {column_name:s} FROM {table_name:s} ' f'WHERE {condition_column_name:s} {condition_operator:s} ?') rows = database_object.Query(query, parameters=(row_condition[2], )) # Make sure the query returns a single row, using cursor.rowcount # is not reliable for this purpose. if not error_string and (len(rows) != 1 or len(rows[0]) != 1): if not row_condition: error_string = ( f'Unable to open blob in table: {table_name:s} and column: ' f'{column_name:s} for row: {row_index:d}.') else: row_condition_string = ' '.join([ f'{value!s}' for value in iter(row_condition)]) error_string = ( f'Unable to open blob in table: {table_name:s} and column: ' f'{column_name:s} where: {row_condition_string:s}.') if error_string: database_object.Close() raise IOError(error_string) self._blob = rows[0][0] self._current_offset = 0 self._database_object = database_object self._size = len(self._blob) self._table_name = table_name # TODO: remove this when there is a move this to a central temp file # manager. https://github.com/log2timeline/dfvfs/issues/92
[docs] def GetNumberOfRows(self): """Retrieves the number of rows of the table. Returns: int: number of rows. Raises: IOError: if the file-like object has not been opened. OSError: if the file-like object has not been opened. """ if not self._database_object: raise IOError('Not opened.') if self._number_of_rows is None: self._number_of_rows = self._database_object.GetNumberOfRows( self._table_name) return self._number_of_rows
# Note: that the following functions do not follow the style guide # because they are part of the file-like object interface. # pylint: disable=invalid-name
[docs] def read(self, size=None): """Reads a byte string from the file-like object at the current offset. The function will read a byte string of the specified size or all of the remaining data if no size was specified. Args: size (Optional[int]): number of bytes to read, where None is all remaining data. Returns: bytes: data read. Raises: IOError: if the read failed. OSError: if the read failed. """ if not self._database_object: raise IOError('Not opened.') if self._current_offset < 0: raise IOError('Invalid offset value out of bounds.') if size == 0 or self._current_offset >= self._size: return b'' if size is None: size = self._size if self._current_offset + size > self._size: size = self._size - self._current_offset start_offset = self._current_offset self._current_offset += size return self._blob[start_offset:self._current_offset]
[docs] def seek(self, offset, whence=os.SEEK_SET): """Seeks to an offset within the file-like object. Args: offset (int): offset to seek to. whence (Optional(int)): value that indicates whether offset is an absolute or relative position within the file. Raises: IOError: if the seek failed. OSError: if the seek failed. """ if not self._database_object: raise IOError('Not opened.') if whence == os.SEEK_CUR: offset += self._current_offset elif whence == os.SEEK_END: offset += self._size elif whence != os.SEEK_SET: raise IOError('Unsupported whence.') if offset < 0: raise IOError('Invalid offset value out of bounds.') self._current_offset = offset
[docs] def get_offset(self): """Retrieves the current offset into the file-like object. Returns: int: current offset into the file-like object. Raises: IOError: if the file-like object has not been opened. OSError: if the file-like object has not been opened. """ if not self._database_object: raise IOError('Not opened.') return self._current_offset
[docs] def get_size(self): """Retrieves the size of the file-like object. Returns: int: size of the file-like object data. Raises: IOError: if the file-like object has not been opened. OSError: if the file-like object has not been opened. """ if not self._database_object: raise IOError('Not opened.') return self._size