From d36f194a34e3073da696e8902efc421dfa0fbedf Mon Sep 17 00:00:00 2001 From: remittor Date: Sun, 22 Oct 2023 00:31:13 +0300 Subject: [PATCH] [xmir_base] Add module "ubi-reader" Sources: https://github.com/onekey-sec/ubi_reader --- xmir_base/ubireader/__init__.py | 0 xmir_base/ubireader/debug.py | 53 +++ xmir_base/ubireader/scripts/__init__.py | 1 + .../scripts/ubireader_display_blocks.py | 195 ++++++++ .../scripts/ubireader_display_info.py | 192 ++++++++ .../scripts/ubireader_extract_files.py | 205 +++++++++ .../scripts/ubireader_extract_images.py | 174 +++++++ .../ubireader/scripts/ubireader_list_files.py | 180 ++++++++ .../ubireader/scripts/ubireader_utils_info.py | 350 ++++++++++++++ xmir_base/ubireader/settings.py | 34 ++ xmir_base/ubireader/ubi/__init__.py | 226 +++++++++ xmir_base/ubireader/ubi/block/__init__.py | 237 ++++++++++ xmir_base/ubireader/ubi/block/layout.py | 64 +++ xmir_base/ubireader/ubi/block/sort.py | 130 ++++++ xmir_base/ubireader/ubi/defines.py | 119 +++++ xmir_base/ubireader/ubi/display.py | 161 +++++++ xmir_base/ubireader/ubi/headers.py | 114 +++++ xmir_base/ubireader/ubi/image.py | 60 +++ xmir_base/ubireader/ubi/volume.py | 122 +++++ xmir_base/ubireader/ubi_io.py | 251 ++++++++++ xmir_base/ubireader/ubifs/__init__.py | 149 ++++++ xmir_base/ubireader/ubifs/defines.py | 432 ++++++++++++++++++ xmir_base/ubireader/ubifs/display.py | 165 +++++++ xmir_base/ubireader/ubifs/list.py | 177 +++++++ xmir_base/ubireader/ubifs/misc.py | 77 ++++ xmir_base/ubireader/ubifs/nodes.py | 262 +++++++++++ xmir_base/ubireader/ubifs/output.py | 206 +++++++++ xmir_base/ubireader/ubifs/walk.py | 160 +++++++ xmir_base/ubireader/utils.py | 183 ++++++++ 29 files changed, 4679 insertions(+) create mode 100644 xmir_base/ubireader/__init__.py create mode 100644 xmir_base/ubireader/debug.py create mode 100644 xmir_base/ubireader/scripts/__init__.py create mode 100644 xmir_base/ubireader/scripts/ubireader_display_blocks.py create mode 100644 xmir_base/ubireader/scripts/ubireader_display_info.py create mode 100644 xmir_base/ubireader/scripts/ubireader_extract_files.py create mode 100644 xmir_base/ubireader/scripts/ubireader_extract_images.py create mode 100644 xmir_base/ubireader/scripts/ubireader_list_files.py create mode 100644 xmir_base/ubireader/scripts/ubireader_utils_info.py create mode 100644 xmir_base/ubireader/settings.py create mode 100644 xmir_base/ubireader/ubi/__init__.py create mode 100644 xmir_base/ubireader/ubi/block/__init__.py create mode 100644 xmir_base/ubireader/ubi/block/layout.py create mode 100644 xmir_base/ubireader/ubi/block/sort.py create mode 100644 xmir_base/ubireader/ubi/defines.py create mode 100644 xmir_base/ubireader/ubi/display.py create mode 100644 xmir_base/ubireader/ubi/headers.py create mode 100644 xmir_base/ubireader/ubi/image.py create mode 100644 xmir_base/ubireader/ubi/volume.py create mode 100644 xmir_base/ubireader/ubi_io.py create mode 100644 xmir_base/ubireader/ubifs/__init__.py create mode 100644 xmir_base/ubireader/ubifs/defines.py create mode 100644 xmir_base/ubireader/ubifs/display.py create mode 100644 xmir_base/ubireader/ubifs/list.py create mode 100644 xmir_base/ubireader/ubifs/misc.py create mode 100644 xmir_base/ubireader/ubifs/nodes.py create mode 100644 xmir_base/ubireader/ubifs/output.py create mode 100644 xmir_base/ubireader/ubifs/walk.py create mode 100644 xmir_base/ubireader/utils.py diff --git a/xmir_base/ubireader/__init__.py b/xmir_base/ubireader/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/xmir_base/ubireader/debug.py b/xmir_base/ubireader/debug.py new file mode 100644 index 0000000..7201fe9 --- /dev/null +++ b/xmir_base/ubireader/debug.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubifs +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +import sys +import traceback +from ubireader import settings + +def log(obj, message): + if settings.logging_on or settings.logging_on_verbose: + print('{} {}'.format(obj.__name__, message)) + +def verbose_log(obj, message): + if settings.logging_on_verbose: + log(obj, message) + +def verbose_display(displayable_obj): + if settings.logging_on_verbose: + print(displayable_obj.display('\t')) + +def error(obj, level, message): + if settings.error_action == 'exit': + print('{} {}: {}'.format(obj.__name__, level, message)) + if settings.fatal_traceback: + traceback.print_exc() + sys.exit(1) + + else: + if level.lower() == 'warn': + print('{} {}: {}'.format(obj.__name__, level, message)) + elif level.lower() == 'fatal': + print('{} {}: {}'.format(obj.__name__, level, message)) + if settings.fatal_traceback: + traceback.print_exc() + sys.exit(1) + else: + print('{} {}: {}'.format(obj.__name__, level, message)) + diff --git a/xmir_base/ubireader/scripts/__init__.py b/xmir_base/ubireader/scripts/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/xmir_base/ubireader/scripts/__init__.py @@ -0,0 +1 @@ + diff --git a/xmir_base/ubireader/scripts/ubireader_display_blocks.py b/xmir_base/ubireader/scripts/ubireader_display_blocks.py new file mode 100644 index 0000000..8273219 --- /dev/null +++ b/xmir_base/ubireader/scripts/ubireader_display_blocks.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/scripts/ubireader_display_blocks +# (c) 2019 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +############################################################# +# Search by block parameters and display information about +# matching blocks. +############################################################# + +import os +import sys +import argparse +from ubireader.ubi import ubi_base +from ubireader.ubi_io import ubi_file +from ubireader import settings +from ubireader.ubi.defines import UBI_EC_HDR_MAGIC +from ubireader.ubifs.defines import UBIFS_NODE_MAGIC +from ubireader.utils import guess_filetype, guess_start_offset, guess_leb_size, guess_peb_size + + +def main(): + description = 'Search for specified blocks and display information.' + usage = """ + ubireader_display_blocks "{'block.attr': value,...}" path/to/image + Search for blocks by given parameters and display information about them. + This is block only, no volume or image information is created, which can + be used to debug file and image extraction. + Example: + "{'peb_num':[0, 1] + range(100, 102), 'ec_hdr.ec': 1, 'is_valid': True}" + This matches block.peb_num 0, 1, 100, 101, and 102 + with a block.ec_hdr.ec (erase count) of 1, that are valid PEB blocks. + For a full list of parameters check ubireader.ubi.block.description. + """ + parser = argparse.ArgumentParser(usage=usage, description=description) + + parser.add_argument('-l', '--log', action='store_true', dest='log', + help='Print extraction information to screen.') + + parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose', + help='Prints nearly everything about anything to screen.') + + parser.add_argument('-p', '--peb-size', type=int, dest='block_size', + help='Specify PEB size. (UBI Only)') + + parser.add_argument('-e', '--leb-size', type=int, dest='block_size', + help='Specify LEB size. (UBIFS Only)') + + parser.add_argument('-s', '--start-offset', type=int, dest='start_offset', + help='Specify offset of UBI/UBIFS data in file. (default: 0)') + + parser.add_argument('-n', '--end-offset', type=int, dest='end_offset', + help='Specify end offset of UBI/UBIFS data in file.') + + parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset', + help='Specify offset to start guessing where UBI data is in file. (default: 0)') + + parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors', + help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)') + + parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors', + help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)') + + parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix', + help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)') + + parser.add_argument('block_search_params', + help=""" + Double quoted Dict of ubi.block.description attributes, which is run through eval(). + Ex. "{\'peb_num\':[0, 1], \'ec_hdr.ec\': 1, \'is_valid\': True}" + """) + + parser.add_argument('filepath', help='File with blocks of interest.') + + if len(sys.argv) == 1: + parser.print_help() + + args = parser.parse_args() + + settings.logging_on = args.log + + settings.logging_on_verbose = args.verbose + + settings.warn_only_block_read_errors = args.warn_only_block_read_errors + + settings.ignore_block_header_errors = args.ignore_block_header_errors + + settings.uboot_fix = args.uboot_fix + + if args.filepath: + path = args.filepath + if not os.path.exists(path): + parser.error("File path doesn't exist.") + else: + parser.error('File path must be provided.') + sys.exit(1) + + if args.start_offset: + start_offset = args.start_offset + elif args.guess_offset: + start_offset = guess_start_offset(path, args.guess_offset) + else: + start_offset = guess_start_offset(path) + + if args.end_offset: + end_offset = args.end_offset + else: + end_offset = None + + filetype = guess_filetype(path, start_offset) + if not filetype: + parser.error('Could not determine file type.') + + if args.block_size: + block_size = args.block_size + else: + if filetype == UBI_EC_HDR_MAGIC: + block_size = guess_peb_size(path) + elif filetype == UBIFS_NODE_MAGIC: + block_size = guess_leb_size(path) + + if not block_size: + parser.error('Block size could not be determined.') + + if args.block_search_params: + try: + search_params = eval(args.block_search_params) + + if not isinstance(search_params, dict): + parser.error('Search Param Error: Params must be a Dict of block PEB object items:value pairs.') + + except NameError as e: + parser.error('Search Param Error: Dict key block attrs must be single quoted.') + + except Exception as e: + parser.error('Search Param Error: %s' % e) + + else: + parser.error('No search parameters given, -b arg is required.') + + + ufile_obj = ubi_file(path, block_size, start_offset, end_offset) + ubi_obj = ubi_base(ufile_obj) + blocks = [] + + for block in ubi_obj.blocks: + match = True + + for key in search_params: + b = ubi_obj.blocks[block] + + for attr in key.split('.'): + if hasattr(b, attr): + b = getattr(b, attr) + + if isinstance(search_params[key], list): + if isinstance(b, list): + for value in b: + if value in search_params[key]: + break + else: + match = False + elif b not in search_params[key]: + match = False + + elif b != search_params[key]: + match = False + break + + if match: + blocks.append(ubi_obj.blocks[block]) + + ufile_obj.close() + + print('\nBlock matches: %s' % len(blocks)) + + for block in blocks: + print(block.display()) + +if __name__=='__main__': + main() diff --git a/xmir_base/ubireader/scripts/ubireader_display_info.py b/xmir_base/ubireader/scripts/ubireader_display_info.py new file mode 100644 index 0000000..b5291aa --- /dev/null +++ b/xmir_base/ubireader/scripts/ubireader_display_info.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python + +############################################################# +# ubi_reader +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +import os +import sys +import time +import argparse + +from ubireader import settings +from ubireader.ubi import ubi +from ubireader.ubi.defines import UBI_EC_HDR_MAGIC +from ubireader.ubifs import ubifs +from ubireader.ubifs.defines import UBIFS_NODE_MAGIC +from ubireader.utils import guess_filetype, guess_start_offset, guess_leb_size, guess_peb_size +from ubireader.ubi_io import ubi_file, leb_virtual_file + +def main(): + start = time.time() + description = 'Show information about UBI or UBIFS image.' + usage = 'ubireader_display_info [options] filepath' + parser = argparse.ArgumentParser(usage=usage, description=description) + + parser.add_argument('-l', '--log', action='store_true', dest='log', + help='Print extraction information to screen.') + + parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose', + help='Prints nearly everything about anything to screen.') + + parser.add_argument('-u', '--ubifs-info', action='store_true', dest='ubifs_info', + help='Get UBIFS information from inside a UBI image. (default: false)') + + parser.add_argument('-p', '--peb-size', type=int, dest='block_size', + help='Specify PEB size. (UBI Only)') + + parser.add_argument('-e', '--leb-size', type=int, dest='block_size', + help='Specify LEB size. (UBIFS Only)') + + parser.add_argument('-s', '--start-offset', type=int, dest='start_offset', + help='Specify offset of UBI/UBIFS data in file. (default: 0)') + + parser.add_argument('-n', '--end-offset', type=int, dest='end_offset', + help='Specify end offset of UBI/UBIFS data in file.') + + parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset', + help='Specify offset to start guessing where UBI data is in file. (default: 0)') + + parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors', + help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)') + + parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors', + help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)') + + parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix', + help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)') + + parser.add_argument('filepath', help='File to extract contents of.') + + if len(sys.argv) == 1: + parser.print_help() + sys.exit(1) + + args = parser.parse_args() + + settings.logging_on = args.log + + settings.logging_on_verbose = args.verbose + + settings.warn_only_block_read_errors = args.warn_only_block_read_errors + + settings.ignore_block_header_errors = args.ignore_block_header_errors + + settings.uboot_fix = args.uboot_fix + + if args.filepath: + path = args.filepath + if not os.path.exists(path): + parser.error("File path doesn't exist.") + + if args.start_offset: + start_offset = args.start_offset + elif args.guess_offset: + start_offset = guess_start_offset(path, args.guess_offset) + else: + start_offset = guess_start_offset(path) + + if args.end_offset: + end_offset = args.end_offset + else: + end_offset = None + + filetype = guess_filetype(path, start_offset) + if not filetype: + parser.error('Could not determine file type.') + + ubifs_info = args.ubifs_info + + if args.block_size: + block_size = args.block_size + else: + if filetype == UBI_EC_HDR_MAGIC: + block_size = guess_peb_size(path) + elif filetype == UBIFS_NODE_MAGIC: + block_size = guess_leb_size(path) + + if not block_size: + parser.error('Block size could not be determined.') + + + # Create file object. + ufile_obj = ubi_file(path, block_size, start_offset, end_offset) + + if filetype == UBI_EC_HDR_MAGIC: + # Create UBI object + ubi_obj = ubi(ufile_obj) + + # Display UBI info if not UBIFS request. + if not ubifs_info: + print(ubi_obj.display()) + + # Loop through found images in file. + for image in ubi_obj.images: + # Display image information if not UBIFS request. + if not ubifs_info: + print('%s' % image.display('\t')) + + # Loop through volumes in each image. + for volume in image.volumes: + # Show UBI or UBIFS info. + if not ubifs_info: + + # Display volume information. + print(image.volumes[volume].display('\t\t')) + + else: + # Get blocks associated with this volume. + vol_blocks = image.volumes[volume].get_blocks(ubi_obj.blocks) + + # Skip volume if empty. + if not len(vol_blocks): + continue + + # Create LEB backed virtual file with volume blocks. + # Necessary to prevent having to load entire UBI image + # into memory. + lebv_file = leb_virtual_file(ubi_obj, vol_blocks) + + # Create UBIFS object and print info. + ubifs_obj = ubifs(lebv_file) + print(ubifs_obj.display()) + print(ubifs_obj.superblock_node.display('\t')) + print(ubifs_obj.master_node.display('\t')) + try: + print(ubifs_obj.master_node2.display('\t')) + except: + print('Master Node Error only one valid node.') + + elif filetype == UBIFS_NODE_MAGIC: + # Create UBIFS object + ubifs_obj = ubifs(ufile_obj) + print(ubifs_obj.display()) + print(ubifs_obj.superblock_node.display('\t')) + print(ubifs_obj.master_node.display('\t')) + try: + print(ubifs_obj.master_node2.display('\t')) + except: + print('Master Node Error only one valid node.') + + else: + print('Something went wrong to get here.') + + ufile_obj.close() + + +if __name__=='__main__': + main() diff --git a/xmir_base/ubireader/scripts/ubireader_extract_files.py b/xmir_base/ubireader/scripts/ubireader_extract_files.py new file mode 100644 index 0000000..a18a214 --- /dev/null +++ b/xmir_base/ubireader/scripts/ubireader_extract_files.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python + +############################################################# +# ubi_reader +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +import os +import sys +import time +import argparse + +from ubireader import settings +from ubireader.ubi import ubi +from ubireader.ubi.defines import UBI_EC_HDR_MAGIC +from ubireader.ubifs import ubifs +from ubireader.ubifs.output import extract_files +from ubireader.ubifs.defines import UBIFS_NODE_MAGIC +from ubireader.ubi_io import ubi_file, leb_virtual_file +from ubireader.debug import error, log +from ubireader.utils import guess_filetype, guess_start_offset, guess_leb_size, guess_peb_size + +def create_output_dir(outpath): + if os.path.exists(outpath): + if os.listdir(outpath): + error(create_output_dir, 'Fatal', 'Output directory is not empty. %s' % outpath) + else: + try: + os.makedirs(outpath) + log(create_output_dir, 'Created output path: %s' % outpath) + except Exception as e: + error(create_output_dir, 'Fatal', '%s' % e) + +def main(): + start = time.time() + description = 'Extract contents of a UBI or UBIFS image.' + usage = 'ubireader_extract_files [options] filepath' + parser = argparse.ArgumentParser(usage=usage, description=description) + + parser.add_argument('-k', '--keep-permissions', action='store_true', dest='permissions', + help='Maintain file permissions, requires running as root. (default: False)') + + parser.add_argument('-l', '--log', action='store_true', dest='log', + help='Print extraction information to screen.') + + parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose', + help='Prints nearly everything about anything to screen.') + + parser.add_argument('-p', '--peb-size', type=int, dest='block_size', + help='Specify PEB size. (UBI Only)') + + parser.add_argument('-e', '--leb-size', type=int, dest='block_size', + help='Specify LEB size. (UBIFS Only)') + + parser.add_argument('-s', '--start-offset', type=int, dest='start_offset', + help='Specify offset of UBI/UBIFS data in file. (default: 0)') + + parser.add_argument('-n', '--end-offset', type=int, dest='end_offset', + help='Specify end offset of UBI/UBIFS data in file.') + + parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset', + help='Specify offset to start guessing where UBI data is in file. (default: 0)') + + parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors', + help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)') + + parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors', + help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)') + + parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix', + help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)') + + parser.add_argument('-o', '--output-dir', dest='outpath', + help='Specify output directory path.') + + parser.add_argument('filepath', help='File to extract contents of.') + + if len(sys.argv) == 1: + parser.print_help() + sys.exit(1) + + args = parser.parse_args() + + settings.logging_on = args.log + + settings.logging_on_verbose = args.verbose + + settings.warn_only_block_read_errors = args.warn_only_block_read_errors + + settings.ignore_block_header_errors = args.ignore_block_header_errors + + settings.uboot_fix = args.uboot_fix + + if args.filepath: + path = args.filepath + if not os.path.exists(path): + parser.error("File path doesn't exist.") + + if args.start_offset: + start_offset = args.start_offset + elif args.guess_offset: + start_offset = guess_start_offset(path, args.guess_offset) + else: + start_offset = guess_start_offset(path) + + if args.end_offset: + end_offset = args.end_offset + else: + end_offset = None + + filetype = guess_filetype(path, start_offset) + if not filetype: + parser.error('Could not determine file type.') + + if args.outpath: + outpath = args.outpath + else: + outpath = settings.output_dir + + if args.block_size: + block_size = args.block_size + else: + if filetype == UBI_EC_HDR_MAGIC: + block_size = guess_peb_size(path) + elif filetype == UBIFS_NODE_MAGIC: + block_size = guess_leb_size(path) + + if not block_size: + parser.error('Block size could not be determined.') + + perms = args.permissions + + # Create file object. + ufile_obj = ubi_file(path, block_size, start_offset, end_offset) + + if filetype == UBI_EC_HDR_MAGIC: + # Create UBI object + ubi_obj = ubi(ufile_obj) + + # Loop through found images in file. + for image in ubi_obj.images: + + # Create path for specific image + # In case multiple images in data + img_outpath = os.path.join(outpath, '%s' % image.image_seq) + + # Loop through volumes in each image. + for volume in image.volumes: + + # Get blocks associated with this volume. + vol_blocks = image.volumes[volume].get_blocks(ubi_obj.blocks) + + # Create volume data output path. + vol_outpath = os.path.join(img_outpath, volume) + + # Create volume output path directory. + create_output_dir(vol_outpath) + + # Skip volume if empty. + if not len(vol_blocks): + continue + + # Create LEB backed virtual file with volume blocks. + # Necessary to prevent having to load entire UBI image + # into memory. + lebv_file = leb_virtual_file(ubi_obj, vol_blocks) + + # Extract files from UBI image. + ubifs_obj = ubifs(lebv_file) + print('Extracting files to: %s' % vol_outpath) + extract_files(ubifs_obj, vol_outpath, perms) + + + elif filetype == UBIFS_NODE_MAGIC: + # Create UBIFS object + ubifs_obj = ubifs(ufile_obj) + + # Create directory for files. + create_output_dir(outpath) + + # Extract files from UBIFS image. + print('Extracting files to: %s' % outpath) + extract_files(ubifs_obj, outpath, perms) + + else: + print('Something went wrong to get here.') + + ufile_obj.close() + + +if __name__=='__main__': + main() diff --git a/xmir_base/ubireader/scripts/ubireader_extract_images.py b/xmir_base/ubireader/scripts/ubireader_extract_images.py new file mode 100644 index 0000000..3d8cf2e --- /dev/null +++ b/xmir_base/ubireader/scripts/ubireader_extract_images.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python + +############################################################# +# ubi_reader +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +import os +import sys +import time +import argparse + +from ubireader import settings +from ubireader.ubi import ubi +from ubireader.ubi.defines import UBI_EC_HDR_MAGIC +from ubireader.ubi_io import ubi_file +from ubireader.debug import error, log +from ubireader.utils import guess_filetype, guess_start_offset, guess_peb_size + +def create_output_dir(outpath): + if not os.path.exists(outpath): + try: + os.makedirs(outpath) + log(create_output_dir, 'Created output path: %s' % outpath) + except Exception as e: + error(create_output_dir, 'Fatal', '%s' % e) + +def main(): + start = time.time() + description = 'Extract UBI or UBIFS images from file containing UBI data in it.' + usage = 'ubireader_extract_images [options] filepath' + parser = argparse.ArgumentParser(usage=usage, description=description) + + parser.add_argument('-l', '--log', action='store_true', dest='log', + help='Print extraction information to screen.') + + parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose', + help='Prints nearly everything about anything to screen.') + + parser.add_argument('-p', '--peb-size', type=int, dest='block_size', + help='Specify PEB size.') + + parser.add_argument('-u', '--image-type', dest='image_type', + help='Specify image type to extract UBI or UBIFS. (default: UBIFS)') + + parser.add_argument('-s', '--start-offset', type=int, dest='start_offset', + help='Specify offset of UBI data in file. (default: 0)') + + parser.add_argument('-n', '--end-offset', type=int, dest='end_offset', + help='Specify end offset of UBI data in file.') + + parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset', + help='Specify offset to start guessing where UBI data is in file. (default: 0)') + + parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors', + help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)') + + parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors', + help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)') + + parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix', + help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)') + + parser.add_argument('-o', '--output-dir', dest='outpath', + help='Specify output directory path.') + + parser.add_argument('filepath', help='File to extract contents of.') + + if len(sys.argv) == 1: + parser.print_help() + sys.exit(1) + + args = parser.parse_args() + + settings.logging_on = args.log + + settings.logging_on_verbose = args.verbose + + settings.warn_only_block_read_errors = args.warn_only_block_read_errors + + settings.ignore_block_header_errors = args.ignore_block_header_errors + + settings.uboot_fix = args.uboot_fix + + if args.filepath: + path = args.filepath + if not os.path.exists(path): + parser.error("File path doesn't exist.") + + if args.start_offset: + start_offset = args.start_offset + elif args.guess_offset: + start_offset = guess_start_offset(path, args.guess_offset) + else: + start_offset = guess_start_offset(path) + + if args.end_offset: + end_offset = args.end_offset + else: + end_offset = None + + filetype = guess_filetype(path, start_offset) + if filetype != UBI_EC_HDR_MAGIC: + parser.error('File does not look like UBI data.') + + img_name = os.path.basename(path) + if args.outpath: + outpath = os.path.abspath(os.path.join(args.outpath, img_name)) + else: + outpath = os.path.join(settings.output_dir, img_name) + + if args.block_size: + block_size = args.block_size + else: + block_size = guess_peb_size(path) + + if not block_size: + parser.error('Block size could not be determined.') + + if args.image_type: + image_type = args.image_type.upper() + else: + image_type = 'UBIFS' + + # Create file object. + ufile_obj = ubi_file(path, block_size, start_offset, end_offset) + + # Create UBI object + ubi_obj = ubi(ufile_obj) + + # Loop through found images in file. + for image in ubi_obj.images: + if image_type == 'UBI': + # Create output path and open file. + img_outpath = os.path.join(outpath, 'img-%s.ubi' % image.image_seq) + create_output_dir(outpath) + f = open(img_outpath, 'wb') + + # Loop through UBI image blocks + for block in image.get_blocks(ubi_obj.blocks): + if ubi_obj.blocks[block].is_valid: + # Write block (PEB) to file + f.write(ubi_obj.file.read_block(ubi_obj.blocks[block])) + + elif image_type == 'UBIFS': + # Loop through image volumes + for volume in image.volumes: + # Create output path and open file. + vol_outpath = os.path.join(outpath, 'img-%s_vol-%s.ubifs' % (image.image_seq, volume)) + create_output_dir(outpath) + f = open(vol_outpath, 'wb') + + # Loop through and write volume block data (LEB) to file. + for block in image.volumes[volume].reader(ubi_obj): + f.write(block) + + ufile_obj.close() + + +if __name__=='__main__': + main() diff --git a/xmir_base/ubireader/scripts/ubireader_list_files.py b/xmir_base/ubireader/scripts/ubireader_list_files.py new file mode 100644 index 0000000..1ac00be --- /dev/null +++ b/xmir_base/ubireader/scripts/ubireader_list_files.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python + +############################################################# +# ubi_reader +# (C) Collin Mulliner based on Jason Pruitt's ubireader_extract_images +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +import os +import sys +import time +import argparse + +from ubireader import settings +from ubireader.ubi import ubi +from ubireader.ubi.defines import UBI_EC_HDR_MAGIC +from ubireader.ubifs import ubifs +from ubireader.ubifs.list import list_files, copy_file +from ubireader.ubifs.defines import UBIFS_NODE_MAGIC +from ubireader.ubi_io import ubi_file, leb_virtual_file +from ubireader.debug import error, log +from ubireader.utils import guess_filetype, guess_start_offset, guess_leb_size, guess_peb_size + +def main(): + start = time.time() + description = 'List and Extract files of a UBI or UBIFS image.' + usage = 'ubireader_list_files [options] filepath' + parser = argparse.ArgumentParser(usage=usage, description=description) + + parser.add_argument('-l', '--log', action='store_true', dest='log', + help='Print extraction information to screen.') + + parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose', + help='Prints nearly everything about anything to screen.') + + parser.add_argument('-p', '--peb-size', type=int, dest='block_size', + help='Specify PEB size. (UBI Only)') + + parser.add_argument('-e', '--leb-size', type=int, dest='block_size', + help='Specify LEB size. (UBIFS Only)') + + parser.add_argument('-s', '--start-offset', type=int, dest='start_offset', + help='Specify offset of UBI/UBIFS data in file. (default: 0)') + + parser.add_argument('-n', '--end-offset', type=int, dest='end_offset', + help='Specify end offset of UBI/UBIFS data in file.') + + parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset', + help='Specify offset to start guessing where UBI data is in file. (default: 0)') + + parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors', + help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)') + + parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors', + help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)') + + parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix', + help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)') + + parser.add_argument('-P', '--path', dest='listpath', + help='Path to list.') + + parser.add_argument('-C', '--copy', dest='copyfile', + help='File to Copy.') + + parser.add_argument('-D', '--copy-dest', dest='copyfiledest', + help='Copy Destination.') + + parser.add_argument('filepath', help='UBI/UBIFS image file.') + + if len(sys.argv) == 1: + parser.print_help() + sys.exit(1) + + args = parser.parse_args() + + settings.logging_on = args.log + + settings.logging_on_verbose = args.verbose + + settings.warn_only_block_read_errors = args.warn_only_block_read_errors + + settings.ignore_block_header_errors = args.ignore_block_header_errors + + settings.uboot_fix = args.uboot_fix + + if args.filepath: + path = args.filepath + if not os.path.exists(path): + parser.error("File path doesn't exist.") + + if args.start_offset: + start_offset = args.start_offset + elif args.guess_offset: + start_offset = guess_start_offset(path, args.guess_offset) + else: + start_offset = guess_start_offset(path) + + if args.end_offset: + end_offset = args.end_offset + else: + end_offset = None + + filetype = guess_filetype(path, start_offset) + if not filetype: + parser.error('Could not determine file type.') + + if args.block_size: + block_size = args.block_size + else: + if filetype == UBI_EC_HDR_MAGIC: + block_size = guess_peb_size(path) + elif filetype == UBIFS_NODE_MAGIC: + block_size = guess_leb_size(path) + + if not block_size: + parser.error('Block size could not be determined.') + + # Create file object. + ufile_obj = ubi_file(path, block_size, start_offset, end_offset) + + if filetype == UBI_EC_HDR_MAGIC: + # Create UBI object + ubi_obj = ubi(ufile_obj) + + # Loop through found images in file. + for image in ubi_obj.images: + + # Loop through volumes in each image. + for volume in image.volumes: + + # Get blocks associated with this volume. + vol_blocks = image.volumes[volume].get_blocks(ubi_obj.blocks) + + # Skip volume if empty. + if not len(vol_blocks): + continue + + # Create LEB backed virtual file with volume blocks. + # Necessary to prevent having to load entire UBI image + # into memory. + lebv_file = leb_virtual_file(ubi_obj, vol_blocks) + + # Create UBIFS object. + ubifs_obj = ubifs(lebv_file) + + if args.listpath: + list_files(ubifs_obj, args.listpath) + if args.copyfile and args.copyfiledest: + copy_file(ubifs_obj, args.copyfile, args.copyfiledest) + + elif filetype == UBIFS_NODE_MAGIC: + # Create UBIFS object + ubifs_obj = ubifs(ufile_obj) + + if args.listpath: + list_files(ubifs_obj, args.listpath) + if args.copyfile and args.copyfiledest: + copy_file(ubifs_obj, args.copyfile, args.copyfiledest) + + else: + print('Something went wrong to get here.') + + ufile_obj.close() + + +if __name__=='__main__': + main() diff --git a/xmir_base/ubireader/scripts/ubireader_utils_info.py b/xmir_base/ubireader/scripts/ubireader_utils_info.py new file mode 100644 index 0000000..b8fafe5 --- /dev/null +++ b/xmir_base/ubireader/scripts/ubireader_utils_info.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python + +############################################################# +# ubi_reader +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +import os +import sys +import time +import argparse +if (sys.version_info > (3, 0)): + import configparser +else: + import ConfigParser as configparser +from ubireader import settings +from ubireader.ubi import ubi +from ubireader.ubi.defines import UBI_EC_HDR_MAGIC, PRINT_VOL_TYPE_LIST, UBI_VTBL_AUTORESIZE_FLG +from ubireader.ubifs import ubifs +from ubireader.ubifs.defines import PRINT_UBIFS_KEY_HASH, PRINT_UBIFS_COMPR +from ubireader.ubi_io import ubi_file, leb_virtual_file +from ubireader.debug import error, log +from ubireader.utils import guess_filetype, guess_start_offset, guess_peb_size + +def create_output_dir(outpath): + if os.path.exists(outpath): + if os.listdir(outpath): + error(create_output_dir, 'Fatal', 'Output directory is not empty. %s' % outpath) + else: + try: + os.makedirs(outpath) + log(create_output_dir, 'Created output path: %s' % outpath) + except Exception as e: + error(create_output_dir, 'Fatal', '%s' % e) + + +def get_ubi_params(ubi_obj): + """Get ubi_obj utils params + + Arguments: + Obj:ubi -- UBI object. + + Returns: + Dict -- Dict keyed to volume with Dict of args and flags. + """ + ubi_flags = {'min_io_size':'-m', + 'max_bud_bytes':'-j', + 'leb_size':'-e', + 'default_compr':'-x', + 'sub_page_size':'-s', + 'fanout':'-f', + 'key_hash':'-k', + 'orph_lebs':'-p', + 'log_lebs':'-l', + 'max_leb_cnt': '-c', + 'peb_size':'-p', + 'sub_page_size':'-s', + 'vid_hdr_offset':'-O', + 'version':'-x', + 'image_seq':'-Q', + 'alignment':'-a', + 'vol_id':'-n', + 'name':'-N'} + + ubi_params = {} + ubi_args = {} + ini_params = {} + + for image in ubi_obj.images: + img_seq = image.image_seq + ubi_params[img_seq] = {} + ubi_args[img_seq] = {} + ini_params[img_seq] = {} + + for volume in image.volumes: + ubi_args[img_seq][volume] = {} + ini_params[img_seq][volume] = {} + + # Get ubinize.ini settings + ini_params[img_seq][volume]['vol_type'] = PRINT_VOL_TYPE_LIST[image.volumes[volume].vol_rec.vol_type] + + if image.volumes[volume].vol_rec.flags == UBI_VTBL_AUTORESIZE_FLG: + ini_params[img_seq][volume]['vol_flags'] = 'autoresize' + else: + ini_params[img_seq][volume]['vol_flags'] = image.volumes[volume].vol_rec.flags + + ini_params[img_seq][volume]['vol_id'] = image.volumes[volume].vol_id + ini_params[img_seq][volume]['vol_name'] = image.volumes[volume].name.rstrip(b'\x00').decode('utf-8') + ini_params[img_seq][volume]['vol_alignment'] = image.volumes[volume].vol_rec.alignment + + ini_params[img_seq][volume]['vol_size'] = image.volumes[volume].vol_rec.reserved_pebs * ubi_obj.leb_size + + # Create file object backed by UBI blocks. + lebv_file = leb_virtual_file(ubi_obj, image.volumes[volume].get_blocks(ubi_obj.blocks)) + # Create UBIFS object + ubifs_obj = ubifs(lebv_file) + + for key, value in ubifs_obj.superblock_node: + if key == 'key_hash': + value = PRINT_UBIFS_KEY_HASH[value] + elif key == 'default_compr': + value = PRINT_UBIFS_COMPR[value] + + if key in ubi_flags: + ubi_args[img_seq][volume][key] = value + + for key, value in image.volumes[volume].vol_rec: + if key == 'name': + value = value.rstrip(b'\x00').decode('utf-8') + + if key in ubi_flags: + ubi_args[img_seq][volume][key] = value + + ubi_args[img_seq][volume]['version'] = image.version + ubi_args[img_seq][volume]['vid_hdr_offset'] = image.vid_hdr_offset + ubi_args[img_seq][volume]['sub_page_size'] = ubi_args[img_seq][volume]['vid_hdr_offset'] + ubi_args[img_seq][volume]['sub_page_size'] = ubi_args[img_seq][volume]['vid_hdr_offset'] + ubi_args[img_seq][volume]['image_seq'] = image.image_seq + ubi_args[img_seq][volume]['peb_size'] = ubi_obj.peb_size + ubi_args[img_seq][volume]['vol_id'] = image.volumes[volume].vol_id + + ubi_params[img_seq][volume] = {'flags':ubi_flags, 'args':ubi_args[img_seq][volume], 'ini':ini_params[img_seq][volume]} + + return ubi_params + + +def print_ubi_params(ubi_obj): + ubi_params = get_ubi_params(ubi_obj) + for img_params in ubi_params: + for volume in ubi_params[img_params]: + ubi_flags = ubi_params[img_params][volume]['flags'] + ubi_args = ubi_params[img_params][volume]['args'] + ini_params = ubi_params[img_params][volume]['ini'] + sorted_keys = sorted(ubi_params[img_params][volume]['args']) + + print('\nVolume %s' % volume) + for key in sorted_keys: + if len(key)< 8: + name = '%s\t' % key + else: + name = key + print('\t%s\t%s %s' % (name, ubi_flags[key], ubi_args[key])) + + print('\n\t#ubinize.ini#') + print('\t[%s]' % ini_params['vol_name']) + for key in ini_params: + if key != 'name': + print('\t%s=%s' % (key, ini_params[key])) + + +def make_files(ubi, outpath): + ubi_params = get_ubi_params(ubi) + + for img_params in ubi_params: + config = configparser.ConfigParser() + img_outpath = os.path.join(outpath, 'img-%s' % img_params) + + if not os.path.exists(img_outpath): + os.mkdir(img_outpath) + + ini_path = os.path.join(img_outpath, 'img-%s.ini' % img_params) + ubi_file = os.path.join('img-%s.ubi' % img_params) + script_path = os.path.join(img_outpath, 'create_ubi_img-%s.sh' % img_params) + ubifs_files =[] + buf = '#!/bin/sh\n' + print('Writing to: %s' % script_path) + + with open(script_path, 'w') as fscr: + with open(ini_path, 'w') as fini: + print('Writing to: %s' % ini_path) + vol_idx = 0 + + for volume in ubi_params[img_params]: + ubifs_files.append(os.path.join('img-%s_%s.ubifs' % (img_params, vol_idx))) + ini_params = ubi_params[img_params][volume]['ini'] + ini_file = 'img-%s.ini' % img_params + config.add_section(volume) + config.set(volume, 'mode', 'ubi') + config.set(volume, 'image', ubifs_files[vol_idx]) + + for i in ini_params: + config.set(volume, i, str(ini_params[i])) + + ubi_flags = ubi_params[img_params][volume]['flags'] + ubi_args = ubi_params[img_params][volume]['args'] + mkfs_flags = ['min_io_size', + 'leb_size', + 'max_leb_cnt', + 'default_compr', + 'fanout', + 'key_hash', + 'orph_lebs', + 'log_lebs'] + + argstr = '' + for flag in mkfs_flags: + argstr += ' %s %s' % (ubi_flags[flag], ubi_args[flag]) + + #leb = '%s %s' % (ubi_flags['leb_size'], ubi_args['leb_size']) + peb = '%s %s' % (ubi_flags['peb_size'], ubi_args['peb_size']) + min_io = '%s %s' % (ubi_flags['min_io_size'], ubi_args['min_io_size']) + #leb_cnt = '%s %s' % (ubi_flags['max_leb_cnt'], ubi_args['max_leb_cnt']) + vid_hdr = '%s %s' % (ubi_flags['vid_hdr_offset'], ubi_args['vid_hdr_offset']) + sub_page = '%s %s' % (ubi_flags['sub_page_size'], ubi_args['sub_page_size']) + + buf += '/usr/sbin/mkfs.ubifs%s -r $%s %s\n' % (argstr, (vol_idx+1), ubifs_files[vol_idx]) + + vol_idx += 1 + + config.write(fini) + + ubinize_flags = ['peb_size', + 'min_io_size', + 'vid_hdr_offset', + 'sub_page_size', + 'version', + 'image_seq'] + + argstr = '' + for flag in ubinize_flags: + argstr += ' %s %s' % (ubi_flags[flag], ubi_args[flag]) + + buf += '/usr/sbin/ubinize%s -o %s %s\n' % (argstr, ubi_file, ini_file) + fscr.write(buf) + os.chmod(script_path, 0o755) + +def main(): + start = time.time() + description = 'Determine settings for recreating UBI image.' + usage = 'ubireader_utils_info [options] filepath' + parser = argparse.ArgumentParser(usage=usage, description=description) + + parser.add_argument('-r', '--show-only', action='store_true', dest='show_only', + help='Print parameters to screen only. (default: false)') + + parser.add_argument('-l', '--log', action='store_true', dest='log', + help='Print extraction information to screen.') + + parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose', + help='Prints nearly everything about anything to screen.') + + parser.add_argument('-p', '--peb-size', type=int, dest='block_size', + help='Specify PEB size.') + + parser.add_argument('-s', '--start-offset', type=int, dest='start_offset', + help='Specify offset of UBI data in file. (default: 0)') + + parser.add_argument('-n', '--end-offset', type=int, dest='end_offset', + help='Specify end offset of UBI data in file.') + + parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset', + help='Specify offset to start guessing where UBI data is in file. (default: 0)') + + parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors', + help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)') + + parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors', + help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)') + + parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix', + help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)') + + parser.add_argument('-o', '--output-dir', dest='outpath', + help='Specify output directory path.') + + parser.add_argument('filepath', help='File to extract contents of.') + + if len(sys.argv) == 1: + parser.print_help() + sys.exit(1) + + args = parser.parse_args() + + settings.logging_on = args.log + + settings.logging_on_verbose = args.verbose + + settings.warn_only_block_read_errors = args.warn_only_block_read_errors + + settings.ignore_block_header_errors = args.ignore_block_header_errors + + settings.uboot_fix = args.uboot_fix + + if args.filepath: + path = args.filepath + if not os.path.exists(path): + parser.error("File path doesn't exist.") + + if args.start_offset: + start_offset = args.start_offset + elif args.guess_offset: + start_offset = guess_start_offset(path, args.guess_offset) + else: + start_offset = guess_start_offset(path) + + if args.end_offset: + end_offset = args.end_offset + else: + end_offset = None + + filetype = guess_filetype(path, start_offset) + if filetype != UBI_EC_HDR_MAGIC: + parser.error('File does not look like UBI data.') + + img_name = os.path.basename(path) + if args.outpath: + outpath = os.path.abspath(os.path.join(args.outpath, img_name)) + else: + outpath = os.path.join(settings.output_dir, img_name) + + if args.block_size: + block_size = args.block_size + else: + block_size = guess_peb_size(path) + + if not block_size: + parser.error('Block size could not be determined.') + + # Create file object. + ufile_obj = ubi_file(path, block_size, start_offset, end_offset) + + # Create UBI object + ubi_obj = ubi(ufile_obj) + + # Print info. + print_ubi_params(ubi_obj) + + if not args.show_only: + create_output_dir(outpath) + # Create build scripts. + make_files(ubi_obj, outpath) + + ufile_obj.close() + + +if __name__=='__main__': + main() diff --git a/xmir_base/ubireader/settings.py b/xmir_base/ubireader/settings.py new file mode 100644 index 0000000..3ad9726 --- /dev/null +++ b/xmir_base/ubireader/settings.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +output_dir = 'ubifs-root' + +error_action = True # if 'exit' on any error exit program. +fatal_traceback = False # Print traceback on fatal errors. + +ignore_block_header_errors = False # Ignore block errors. +warn_only_block_read_errors = False # Warning instead of Fatal error. + +logging_on = False # Print debug info on. +logging_on_verbose = False # Print verbose debug info on. + +use_dummy_socket_file = False # Create regular file place holder for sockets. +use_dummy_devices = False # Create regular file place holder for devices. + +uboot_fix = False # Older u-boot sets image_seq to 0 on blocks it's written to. diff --git a/xmir_base/ubireader/ubi/__init__.py b/xmir_base/ubireader/ubi/__init__.py new file mode 100644 index 0000000..cf2cb88 --- /dev/null +++ b/xmir_base/ubireader/ubi/__init__.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubi +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from ubireader.debug import error +from ubireader.ubi.block import sort, extract_blocks +from ubireader.ubi import display +from ubireader.ubi.image import description as image +from ubireader.ubi.block import layout, rm_old_blocks + +class ubi_base(object): + """UBI Base object + + Arguments: + Obj:image -- UBI image object + + Attributes: + ubi_file:file -- ubi_file object + Int:block_count -- Number of blocks found. + Int:first_peb_num -- Number of the first UBI PEB in file. + Int:leb_size -- Size of Logical Erase Blocks. + Int:peb_size -- Size of Physical Erase Blocks. + Int:min_io -- Size of min I/O from vid_hdr_offset. + Dict:blocks -- Dict keyed by PEB number of all blocks. + """ + + def __init__(self, ubi_file): + self.__name__ = 'UBI' + self._file = ubi_file + self._first_peb_num = 0 + self._blocks = extract_blocks(self) + self._block_count = len(self.blocks) + + if self._block_count <= 0: + error(self, 'Fatal', 'No blocks found.') + + arbitrary_block = next(iter(self.blocks.values())) + self._min_io_size = arbitrary_block.ec_hdr.vid_hdr_offset + self._leb_size = self.file.block_size - arbitrary_block.ec_hdr.data_offset + + + def _get_file(self): + """UBI File object + + Returns: + Obj -- UBI File object. + """ + return self._file + file = property(_get_file) + + + def _get_block_count(self): + """Total amount of UBI blocks in file. + + Returns: + Int -- Number of blocks + """ + return self._block_count + block_count = property(_get_block_count) + + + def _set_first_peb_num(self, i): + self._first_peb_num = i + def _get_first_peb_num(self): + """First Physical Erase Block with UBI data + + Returns: + Int -- Number of the first PEB. + """ + return self._first_peb_num + first_peb_num = property(_get_first_peb_num, _set_first_peb_num) + + + def _get_leb_size(self): + """LEB size of UBI blocks in file. + + Returns: + Int -- LEB Size. + """ + return self._leb_size + leb_size = property(_get_leb_size) + + + def _get_peb_size(self): + """PEB size of UBI blocks in file. + + Returns: + Int -- PEB Size. + """ + return self.file.block_size + peb_size = property(_get_peb_size) + + + def _get_min_io_size(self): + """Min I/O Size + + Returns: + Int -- Min I/O Size. + """ + return self._min_io_size + min_io_size = property(_get_min_io_size) + + + def _get_blocks(self): + """Main Dict of UBI Blocks + + Passed around for lists of indexes to be made or to be returned + filtered through a list. So there isn't multiple copies of blocks, + as there can be thousands. + """ + return self._blocks + blocks = property(_get_blocks) + + +class ubi(ubi_base): + """UBI object + + Arguments: + Obj:ubi_file -- UBI file object. + + Attributes: + Inherits:ubi_base -- ubi_base attributes. + List:images -- List of UBI image objects. + List:data_blocks_list -- List of all data blocks in file. + List:layout_blocks_list -- List of all layout blocks in file. + List:int_vol_blocks_list -- List of internal volumes minus layout. + List:unknown_blocks_list -- List of blocks with unknown types. * + """ + + def __init__(self, ubi_file): + super(ubi, self).__init__(ubi_file) + + layout_list, data_list, int_vol_list, unknown_list = sort.by_type(self.blocks) + + self._layout_blocks_list = layout_list + self._data_blocks_list = data_list + self._int_vol_blocks_list = int_vol_list + self._unknown_blocks_list = unknown_list + + newest_layout_list = rm_old_blocks(self.blocks, self.layout_blocks_list) + + if len(newest_layout_list) < 2: + error(self, 'Fatal', 'Less than 2 layout blocks found.') + + layout_pairs = layout.group_pairs(self.blocks, newest_layout_list) + + layout_infos = layout.associate_blocks(self.blocks, layout_pairs) + + self._images = [] + for i in range(0, len(layout_infos)): + self._images.append(image(self.blocks, layout_infos[i])) + + + def _get_images(self): + """Get UBI images. + + Returns: + List -- Of volume objects groupled by image. + """ + return self._images + images = property(_get_images) + + + def _get_data_blocks_list(self): + """Get all UBI blocks found in file that are data blocks. + + Returns: + List -- List of block objects. + """ + return self._data_blocks_list + data_blocks_list = property(_get_data_blocks_list) + + + def _get_layout_blocks_list(self): + """Get all UBI blocks found in file that are layout volume blocks. + + Returns: + List -- List of block objects. + """ + return self._layout_blocks_list + layout_blocks_list = property(_get_layout_blocks_list) + + + def _get_int_vol_blocks_list(self): + """Get all UBI blocks found in file that are internal volume blocks. + + Returns: + List -- List of block objects. + + This does not include layout blocks. + """ + return self._int_vol_blocks_list + int_vol_blocks_list = property(_get_int_vol_blocks_list) + + + def _get_unknown_blocks_list(self): + """Get all UBI blocks found in file of unknown type.. + + Returns: + List -- List of block objects. + """ + return self._unknown_blocks_list + unknown_blocks_list = property(_get_unknown_blocks_list) + + def display(self, tab=''): + """Print information about this object. + + Argument: + Str:tab -- '\t' for spacing this object. + """ + return display.ubi(self, tab) diff --git a/xmir_base/ubireader/ubi/block/__init__.py b/xmir_base/ubireader/ubi/block/__init__.py new file mode 100644 index 0000000..ce80f96 --- /dev/null +++ b/xmir_base/ubireader/ubi/block/__init__.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubi +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from zlib import crc32 +from ubireader import settings +from ubireader.debug import error, log, verbose_display, verbose_log +from ubireader.ubi import display +from ubireader.ubi.defines import UBI_EC_HDR_SZ, UBI_VID_HDR_SZ, UBI_INTERNAL_VOL_START, UBI_EC_HDR_MAGIC, UBI_CRC32_INIT +from ubireader.ubi.headers import ec_hdr, vid_hdr, vtbl_recs + + +class description(object): + """UBI Block description Object + + UBI Specifications: + http://www.linux-mtd.infradead.org/ -- Home page + /drivers/mtd/ubi/ubi-media.h -- Header structs + and defines + + Attributes: + Obj:ec_hdr -- Error Count Header + Obj:vid_hdr -- Volume ID Header + List:vtbl_recs -- (Optional) List of Volume Table Records. + Bool:is_vtbl -- If contains volume records table. + Bool:is_internal_vol -- If Vol ID is > UBI_INTERNAL_VOL_START + Bool:is_valid -- If ec_hdr & vid_hdr are error free. + Int:peb_num -- Physical Erase Block number. + Int:leb_num -- Logical Erase Block number. + Int:file_offset -- Address location in file of this block. + Int:size -- Size of total block data or PEB size. + Int:data_crc -- crc32 of block data. + Will print out all information when invoked as a string. + """ + + def __init__(self, block_buf): + + self.file_offset = -1 + self.peb_num = -1 + self.leb_num = -1 + self.size = -1 + + self.vid_hdr = None + self.is_internal_vol = False + self.vtbl_recs = [] + + # TODO better understanding of block types/errors + self.ec_hdr = ec_hdr(block_buf[0:UBI_EC_HDR_SZ]) + + if not self.ec_hdr.errors or settings.ignore_block_header_errors: + self.vid_hdr = vid_hdr(block_buf[self.ec_hdr.vid_hdr_offset:self.ec_hdr.vid_hdr_offset+UBI_VID_HDR_SZ]) + + if not self.vid_hdr.errors or settings.ignore_block_header_errors: + self.is_internal_vol = self.vid_hdr.vol_id >= UBI_INTERNAL_VOL_START + + if self.vid_hdr.vol_id >= UBI_INTERNAL_VOL_START: + self.vtbl_recs = vtbl_recs(block_buf[self.ec_hdr.data_offset:]) + + self.leb_num = self.vid_hdr.lnum + + self.is_vtbl = bool(self.vtbl_recs) or False + self.is_valid = not self.ec_hdr.errors and not self.vid_hdr.errors or settings.ignore_block_header_errors + + + def __repr__(self): + return 'Block: PEB# %s: LEB# %s' % (self.peb_num, self.leb_num) + + + def display(self, tab=''): + return display.block(self, tab) + + + +def get_blocks_in_list(blocks, idx_list): + """Retrieve block objects in list of indexes + + Arguments: + List:blocks -- List of block objects + List:idx_list -- List of block indexes + + Returns: + Dict:blocks -- List of block objects generated + from provided list of indexes in + order of idx_list. + """ + + return {i:blocks[i] for i in idx_list} + + + +def extract_blocks(ubi): + """Get a list of UBI block objects from file + + Arguments:. + Obj:ubi -- UBI object. + + Returns: + Dict -- Of block objects keyed by PEB number. + """ + + blocks = {} + ubi.file.seek(ubi.file.start_offset) + peb_count = 0 + cur_offset = 0 + bad_blocks = [] + + # range instead of xrange, as xrange breaks > 4GB end_offset. + for i in range(ubi.file.start_offset, ubi.file.end_offset, ubi.file.block_size): + buf = ubi.file.read(ubi.file.block_size) + + if buf.startswith(UBI_EC_HDR_MAGIC): + blk = description(buf) + blk.file_offset = i + blk.peb_num = ubi.first_peb_num + peb_count + blk.size = ubi.file.block_size + blk.data_crc = (~crc32(buf[blk.ec_hdr.data_offset:blk.ec_hdr.data_offset+blk.vid_hdr.data_size]) & UBI_CRC32_INIT) + blocks[blk.peb_num] = blk + peb_count += 1 + log(extract_blocks, blk) + verbose_log(extract_blocks, 'file addr: %s' % (ubi.file.last_read_addr())) + ec_hdr_errors = '' + vid_hdr_errors = '' + + if blk.ec_hdr.errors: + ec_hdr_errors = ','.join(blk.ec_hdr.errors) + + if blk.vid_hdr and blk.vid_hdr.errors: + vid_hdr_errors = ','.join(blk.vid_hdr.errors) + + if ec_hdr_errors or vid_hdr_errors: + if blk.peb_num not in bad_blocks: + bad_blocks.append(blk.peb_num) + log(extract_blocks, 'PEB: %s has possible issue EC_HDR [%s], VID_HDR [%s]' % (blk.peb_num, ec_hdr_errors, vid_hdr_errors)) + + verbose_display(blk) + + else: + cur_offset += ubi.file.block_size + ubi.first_peb_num = cur_offset//ubi.file.block_size + ubi.file.start_offset = cur_offset + + return blocks + + +def rm_old_blocks(blocks, block_list): + del_blocks = [] + + for i in block_list: + if i in del_blocks: + continue + + if blocks[i].is_valid is not True: + del_blocks.append(i) + continue + + for k in block_list: + if i == k: + continue + + if k in del_blocks: + continue + + if blocks[k].is_valid is not True: + del_blocks.append(k) + continue + + if blocks[i].leb_num != blocks[k].leb_num: + continue + + if blocks[i].ec_hdr.image_seq != blocks[k].ec_hdr.image_seq: + continue + + second_newer = blocks[k].vid_hdr.sqnum > blocks[i].vid_hdr.sqnum + del_block = None + use_block = None + + if second_newer: + if blocks[k].vid_hdr.copy_flag == 0: + del_block = i + use_block = k + + else: + if blocks[i].vid_hdr.copy_flag == 0: + del_block = k + use_block = i + + if del_block is not None: + del_blocks.append(del_block) + log(rm_old_blocks, 'Old block removed (copy_flag): PEB %s, LEB %s, Using PEB%s' % (blocks[del_block].peb_num, blocks[del_block].leb_num, use_block)) + break + + if second_newer: + if blocks[k].data_crc != blocks[k].vid_hdr.data_crc: + del_block = k + use_block = i + else: + del_block = i + use_block = k + else: + if blocks[i].data_crc != blocks[i].vid_hdr.data_crc: + del_block = i + use_block = k + else: + del_block = k + use_block = i + + if del_block is not None: + del_blocks.append(del_block) + log(rm_old_blocks, 'Old block removed (data_crc): PEB %s, LEB %s, vid_hdr.data_crc %s / %s, Using PEB %s' % (blocks[del_block].peb_num, + blocks[del_block].leb_num, + blocks[del_block].vid_hdr.data_crc, + blocks[del_block].data_crc, + use_block)) + + else: + use_block = min(k, i) + del_blocks.append(use_block) + error('Warn', rm_old_blocks, 'Multiple PEB [%s] for LEB %s: Using first.' % (', '.join(i, k), blocks[i].leb_num, use_block)) + + break + + return [j for j in block_list if j not in del_blocks] diff --git a/xmir_base/ubireader/ubi/block/layout.py b/xmir_base/ubireader/ubi/block/layout.py new file mode 100644 index 0000000..67388d9 --- /dev/null +++ b/xmir_base/ubireader/ubi/block/layout.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubi +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from ubireader.debug import log +from ubireader.ubi.block import sort + +def group_pairs(blocks, layout_blocks_list): + """Sort a list of layout blocks into pairs + + Arguments: + List:blocks -- List of block objects + List:layout_blocks -- List of layout block indexes + + Returns: + List -- Layout block pair indexes grouped in a list + """ + + image_dict={} + for block_id in layout_blocks_list: + image_seq=blocks[block_id].ec_hdr.image_seq + if image_seq not in image_dict: + image_dict[image_seq]=[block_id] + else: + image_dict[image_seq].append(block_id) + + log(group_pairs, 'Layout blocks found at PEBs: %s' % list(image_dict.values())) + + return list(image_dict.values()) + + +def associate_blocks(blocks, layout_pairs): + """Group block indexes with appropriate layout pairs + + Arguments: + List:blocks -- List of block objects + List:layout_pairs -- List of grouped layout blocks + + Returns: + List -- Layout block pairs grouped with associated block ranges. + """ + + seq_blocks = [] + for layout_pair in layout_pairs: + seq_blocks = sort.by_image_seq(blocks, blocks[layout_pair[0]].ec_hdr.image_seq) + seq_blocks = [b for b in seq_blocks if b not in layout_pair] + layout_pair.append(seq_blocks) + + return layout_pairs diff --git a/xmir_base/ubireader/ubi/block/sort.py b/xmir_base/ubireader/ubi/block/sort.py new file mode 100644 index 0000000..e9e115b --- /dev/null +++ b/xmir_base/ubireader/ubi/block/sort.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubi +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from ubireader import settings + +def by_image_seq(blocks, image_seq): + """Filter blocks to return only those associated with the provided image_seq number. + If uboot_fix is set, associate blocks with an image_seq of 0 also. + + Argument: + List:blocks -- List of block objects to sort. + Int:image_seq -- image_seq number found in ec_hdr. + + Returns: + List -- List of block indexes matching image_seq number. + """ + if settings.uboot_fix: + return list(filter(lambda block: blocks[block].ec_hdr.image_seq == image_seq or image_seq == 0 or blocks[block].ec_hdr.image_seq == 0, blocks)) + + else: + return list(filter(lambda block: blocks[block].ec_hdr.image_seq == image_seq, blocks)) + +def by_leb(blocks): + """Sort blocks by Logical Erase Block number. + + Arguments: + List:blocks -- List of block objects to sort. + + Returns: + List -- Indexes of blocks sorted by LEB. + """ + slist_len = len(blocks) + slist = ['x'] * slist_len + + for block in blocks: + if blocks[block].leb_num >= slist_len: + add_elements = blocks[block].leb_num - slist_len + 1 + slist += (['x'] * add_elements) + slist_len = len(slist) + + slist[blocks[block].leb_num] = block + + return slist + + +def by_vol_id(blocks, slist=None): + """Sort blocks by volume id + + Arguments: + Obj:blocks -- List of block objects. + List:slist -- (optional) List of block indexes. + + Return: + Dict -- blocks grouped in lists with dict key as volume id. + """ + + vol_blocks = {} + + # sort block by volume + # not reliable with multiple partitions (fifo) + + for i in blocks: + if slist and i not in slist: + continue + elif not blocks[i].is_valid: + continue + + if blocks[i].vid_hdr.vol_id not in vol_blocks: + vol_blocks[blocks[i].vid_hdr.vol_id] = [] + + vol_blocks[blocks[i].vid_hdr.vol_id].append(blocks[i].peb_num) + + return vol_blocks + +def by_type(blocks, slist=None): + """Sort blocks into layout, internal volume, data or unknown + + Arguments: + Obj:blocks -- List of block objects. + List:slist -- (optional) List of block indexes. + + Returns: + List:layout -- List of block indexes of blocks containing the + volume table records. + List:data -- List of block indexes containing filesystem data. + List:int_vol -- List of block indexes containing volume ids + greater than UBI_INTERNAL_VOL_START that are not + layout volumes. + List:unknown -- List of block indexes of blocks that failed validation + of crc in ed_hdr or vid_hdr. + """ + + layout = [] + data = [] + int_vol = [] + unknown = [] + + for i in blocks: + if slist and i not in slist: + continue + + if blocks[i].is_vtbl and blocks[i].is_valid: + layout.append(i) + + elif blocks[i].is_internal_vol and blocks[i].is_valid: + int_vol.append(i) + + elif blocks[i].is_valid: + data.append(i) + + else: + unknown.append(i) + + return layout, data, int_vol, unknown diff --git a/xmir_base/ubireader/ubi/defines.py b/xmir_base/ubireader/ubi/defines.py new file mode 100644 index 0000000..9a26bc0 --- /dev/null +++ b/xmir_base/ubireader/ubi/defines.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +############################################################# +# Adapted in part from linux-source-3.2/drivers/mtd/ubi/ubi-media.h +# for use in Python. +# Oct. 2013 by Jason Pruitt +# +# Original copyright notice. +# -------------------------- +# +# Copyright (c) International Business Machines Corp., 2006 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See +# the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Authors: Artem Bityutskiy (Битюцкий Артём) +# Thomas Gleixner +# Frank Haverkamp +# Oliver Lohmann +# Andreas Arnez +# +############################################################# + +import struct + +# Magic number +UBI_EC_HDR_MAGIC = b'\x55\x42\x49\x23' + +# Initial CRC32 checksum value. +UBI_CRC32_INIT = 4294967295 #0xFFFFFFFF + +# Max number of volumes allowed. +UBI_MAX_VOLUMES = 128 + +# Internal Volume ID start. +UBI_INTERNAL_VOL_START = 2147479551 + +# Error Count header. +EC_HDR_FORMAT = '>4sB3sQIII32sI' +EC_HDR_FIELDS = ['magic', # Magic string UBI# + 'version', # UBI version meant to accept this image. + 'padding', # Reserved for future, zeros. + 'ec', # Erase counter + 'vid_hdr_offset', # Where the VID header starts. + 'data_offset', # Where user data starts. + 'image_seq', # Image sequence number + 'padding2', # Reserved for future, zeros. + 'hdr_crc'] # EC header crc32 checksum. + + +UBI_EC_HDR_SZ = struct.calcsize(EC_HDR_FORMAT) # 64 + +# Volume ID header. +UBI_VID_HDR_MAGIC =b'\x55\x42\x49\x21' # UBI! +VID_HDR_FORMAT = '>4sBBBBII4sIIII4sQ12sI' +VID_HDR_FIELDS = ['magic', # Magic string UBI! + 'version', # UBI version meant to accept this image. + 'vol_type', # Volume type, Dynamic/Static + 'copy_flag', # If this is a copied PEB b/c of wear leveling. + 'compat', # Compatibility of this volume UBI_COMPAT_* + 'vol_id', # ID of this volume. + 'lnum', # LEB number. + 'padding', # Reserved for future, zeros. + 'data_size', # How many bytes of data this contains. + # Used for static types only. + 'used_ebs', # Total num of used LEBs in this volume. + 'data_pad', # How many bytes at end of LEB are not used. + 'data_crc', # CRC32 checksum of data, static type only. + 'padding2', # Reserved for future, zeros. + 'sqnum', # Sequence number. + 'padding3', # Reserved for future, zeros. + 'hdr_crc'] # VID header CRC32 checksum. + + +UBI_VID_HDR_SZ = struct.calcsize(VID_HDR_FORMAT) # 64 + +# Volume table records. +VTBL_REC_FORMAT = '>IIIBBH128sB23sI' +VTBL_REC_FIELDS = ['reserved_pebs', # How many PEBs reserved for this volume. + 'alignment', # Volume alignment. + 'data_pad', # Number of unused bytes at end of PEB. + 'vol_type', # Volume type, static/dynamic. + 'upd_marker', # If vol update started but not finished. + 'name_len', # Length of name. + 'name', # Volume name. + 'flags', # Volume flags + 'padding', # Reserved for future, zeros. + 'crc'] # Vol record CRC32 checksum. + + +UBI_VTBL_REC_SZ = struct.calcsize(VTBL_REC_FORMAT) # 172 + +# Volume Identifier Header +UBI_VID_DYNAMIC = 1 # Volume can be resized. +UBI_VID_STATIC = 2 # Volume can not be resized. +PRINT_VOL_TYPE_LIST = [0, 'dynamic', 'static'] + +# Volume table record +UBI_VTBL_AUTORESIZE_FLG = 1 + +UBI_COMPAT_DELETE = 1 # Delete this internal volume before anything written. +UBI_COMPAT_RO = 2 # Attach this device in read-only mode. +UBI_COMPAT_PRESERVE = 4 # Preserve this internal volume - touch nothing. +UBI_COMPAT_REJECT = 5 # Reject this UBI image +PRINT_COMPAT_LIST = [0, 'Delete', 'Read Only', 0, 'Preserve', 'Reject'] + +# File chunk size for reads. +FILE_CHUNK_SZ = 5 * 1024 * 1024 \ No newline at end of file diff --git a/xmir_base/ubireader/ubi/display.py b/xmir_base/ubireader/ubi/display.py new file mode 100644 index 0000000..9efbc6a --- /dev/null +++ b/xmir_base/ubireader/ubi/display.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubi +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from ubireader import settings +from ubireader.ubi.defines import PRINT_COMPAT_LIST, PRINT_VOL_TYPE_LIST, UBI_VTBL_AUTORESIZE_FLG + +def ubi(ubi, tab=''): + buf = '%sUBI File\n' % (tab) + buf += '%s---------------------\n' % (tab) + buf += '\t%sMin I/O: %s\n' % (tab, ubi.min_io_size) + buf += '\t%sLEB Size: %s\n' % (tab, ubi.leb_size) + buf += '\t%sPEB Size: %s\n' % (tab, ubi.peb_size) + buf += '\t%sTotal Block Count: %s\n' % (tab, ubi.block_count) + buf += '\t%sData Block Count: %s\n' % (tab, len(ubi.data_blocks_list)) + buf += '\t%sLayout Block Count: %s\n' % (tab, len(ubi.layout_blocks_list)) + buf += '\t%sInternal Volume Block Count: %s\n' % (tab, len(ubi.int_vol_blocks_list)) + buf += '\t%sUnknown Block Count: %s\n' % (tab, len(ubi.unknown_blocks_list)) + buf += '\t%sFirst UBI PEB Number: %s\n' % (tab, ubi.first_peb_num) + return buf + + +def image(image, tab=''): + buf = '%s%s\n' % (tab, image) + buf += '%s---------------------\n' % (tab) + buf += '\t%sImage Sequence Num: %s\n' % (tab, image.image_seq) + for volume in image.volumes: + buf += '\t%sVolume Name:%s\n' % (tab, volume) + buf += '\t%sPEB Range: %s - %s\n' % (tab, image.peb_range[0], image.peb_range[1]) + return buf + + +def volume(volume, tab=''): + buf = '%s%s\n' % (tab, volume) + buf += '%s---------------------\n' % (tab) + buf += '\t%sVol ID: %s\n' % (tab, volume.vol_id) + buf += '\t%sName: %s\n' % (tab, volume.name.decode('utf-8')) + buf += '\t%sBlock Count: %s\n' % (tab, volume.block_count) + + buf += '\n' + buf += '\t%sVolume Record\n' % (tab) + buf += '\t%s---------------------\n' % (tab) + buf += vol_rec(volume.vol_rec, '\t\t%s' % tab) + + buf += '\n' + return buf + + +def block(block, tab='\t'): + buf = '%s%s\n' % (tab, block) + buf += '%s---------------------\n' % (tab) + buf += '\t%sFile Offset: %s\n' % (tab, block.file_offset) + buf += '\t%sPEB #: %s\n' % (tab, block.peb_num) + buf += '\t%sLEB #: %s\n' % (tab, block.leb_num) + buf += '\t%sBlock Size: %s\n' % (tab, block.size) + buf += '\t%sInternal Volume: %s\n' % (tab, block.is_internal_vol) + buf += '\t%sIs Volume Table: %s\n' % (tab, block.is_vtbl) + buf += '\t%sIs Valid: %s\n' % (tab, block.is_valid) + + if not block.ec_hdr.errors or settings.ignore_block_header_errors: + buf += '\n' + buf += '\t%sErase Count Header\n' % (tab) + buf += '\t%s---------------------\n' % (tab) + buf += ec_hdr(block.ec_hdr, '\t\t%s' % tab) + + if (block.vid_hdr and not block.vid_hdr.errors) or settings.ignore_block_header_errors: + buf += '\n' + buf += '\t%sVID Header\n' % (tab) + buf += '\t%s---------------------\n' % (tab) + buf += vid_hdr(block.vid_hdr, '\t\t%s' % tab) + + if block.vtbl_recs: + buf += '\n' + buf += '\t%sVolume Records\n' % (tab) + buf += '\t%s---------------------\n' % (tab) + for vol in block.vtbl_recs: + buf += vol_rec(vol, '\t\t%s' % tab) + + buf += '\n' + return buf + + +def ec_hdr(ec_hdr, tab=''): + buf = '' + for key, value in ec_hdr: + if key == 'errors': + value = ','.join(value) + + elif key == 'hdr_crc': + value = hex(value) + + buf += '%s%s: %r\n' % (tab, key, value) + return buf + + +def vid_hdr(vid_hdr, tab=''): + buf = '' + for key, value in vid_hdr: + if key == 'errors': + value = ','.join(value) + + elif key == 'hdr_crc': + value = hex(value) + + elif key == 'compat': + if value in PRINT_COMPAT_LIST: + value = PRINT_COMPAT_LIST[value] + else: + value = -1 + + elif key == 'vol_type': + if value < len(PRINT_VOL_TYPE_LIST): + value = PRINT_VOL_TYPE_LIST[value] + else: + value = -1 + + buf += '%s%s: %r\n' % (tab, key, value) + return buf + + +def vol_rec(vol_rec, tab=''): + buf = '' + for key, value in vol_rec: + if key == 'errors': + value = ','.join(value) + + elif key == 'crc': + value = hex(value) + + elif key == 'vol_type': + if value < len(PRINT_VOL_TYPE_LIST): + value = PRINT_VOL_TYPE_LIST[value] + else: + value = -1 + + elif key == 'flags' and value == UBI_VTBL_AUTORESIZE_FLG: + value = 'autoresize' + + elif key == 'name': + value = value.strip(b'\x00').decode('utf-8') + + elif key == 'padding': + value = value.decode('utf-8') + + buf += '%s%s: %r\n' % (tab, key, value) + return buf diff --git a/xmir_base/ubireader/ubi/headers.py b/xmir_base/ubireader/ubi/headers.py new file mode 100644 index 0000000..5fa3905 --- /dev/null +++ b/xmir_base/ubireader/ubi/headers.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubi +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +import struct +from zlib import crc32 + +from ubireader.debug import log +from ubireader.ubi.defines import * + +class ec_hdr(object): + def __init__(self, buf): + fields = dict(list(zip(EC_HDR_FIELDS, struct.unpack(EC_HDR_FORMAT,buf)))) + for key in fields: + setattr(self, key, fields[key]) + setattr(self, 'errors', []) + + self._check_errors(buf[:-4]) + + def __repr__(self): + return 'Erase Count Header' + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def _check_errors(self, buf_crc): + crc_chk = (~crc32(buf_crc) & UBI_CRC32_INIT) + if self.hdr_crc != crc_chk: + log(vid_hdr, 'CRC Failed: expected 0x%x got 0x%x' % (crc_chk, self.hdr_crc)) + self.errors.append('crc') + + +class vid_hdr(object): + def __init__(self, buf): + fields = dict(list(zip(VID_HDR_FIELDS, struct.unpack(VID_HDR_FORMAT,buf)))) + for key in fields: + setattr(self, key, fields[key]) + setattr(self, 'errors', []) + + self._check_errors(buf[:-4]) + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def __repr__(self): + return 'VID Header' + + def _check_errors(self, buf_crc): + crc_chk = (~crc32(buf_crc) & UBI_CRC32_INIT) + if self.hdr_crc != crc_chk: + log(vid_hdr, 'CRC Failed: expected 0x%x got 0x%x' % (crc_chk, self.hdr_crc)) + self.errors.append('crc') + + +def vtbl_recs(buf): + data_buf = buf + vtbl_recs = [] + vtbl_rec_ret = '' + + for i in range(0, UBI_MAX_VOLUMES): + offset = i*UBI_VTBL_REC_SZ + vtbl_rec_buf = data_buf[offset:offset+UBI_VTBL_REC_SZ] + + if len(vtbl_rec_buf) == UBI_VTBL_REC_SZ: + vtbl_rec_ret = _vtbl_rec(vtbl_rec_buf) + + if len(vtbl_rec_ret.errors) == 0 and vtbl_rec_ret.name_len: + vtbl_rec_ret.rec_index = i + vtbl_recs.append(vtbl_rec_ret) + + return vtbl_recs + + +class _vtbl_rec(object): + def __init__(self, buf): + fields = dict(list(zip(VTBL_REC_FIELDS, struct.unpack(VTBL_REC_FORMAT,buf)))) + for key in fields: + setattr(self, key, fields[key]) + setattr(self, 'errors', []) + setattr(self, 'rec_index', -1) + + self.name = self.name[0: self.name_len] + self._check_errors(buf[:-4]) + + def __repr__(self): + return 'Volume Table Record: %s' % getattr(self, 'name') + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def _check_errors(self, buf_crc): + if self.crc != (~crc32(buf_crc) & 0xFFFFFFFF): + self.errors.append('crc') diff --git a/xmir_base/ubireader/ubi/image.py b/xmir_base/ubireader/ubi/image.py new file mode 100644 index 0000000..f634b28 --- /dev/null +++ b/xmir_base/ubireader/ubi/image.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubi +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from ubireader.debug import log +from ubireader.ubi import display +from ubireader.ubi.volume import get_volumes +from ubireader.ubi.block import get_blocks_in_list + +class description(object): + def __init__(self, blocks, layout_info): + self._image_seq = blocks[layout_info[0]].ec_hdr.image_seq + self.vid_hdr_offset = blocks[layout_info[0]].ec_hdr.vid_hdr_offset + self.version = blocks[layout_info[0]].ec_hdr.version + self._block_list = layout_info[2] + self._start_peb = min(layout_info[2]) + self._end_peb = max(layout_info[2]) + self._volumes = get_volumes(blocks, layout_info) + log(description, 'Created Image: %s, Volume Cnt: %s' % (self.image_seq, len(self.volumes))) + + def __repr__(self): + return 'Image: %s' % (self.image_seq) + + + def get_blocks(self, blocks): + return get_blocks_in_list(blocks, self._block_list) + + + def _get_peb_range(self): + return [self._start_peb, self._end_peb] + peb_range = property(_get_peb_range) + + + def _get_image_seq(self): + return self._image_seq + image_seq = property(_get_image_seq) + + + def _get_volumes(self): + return self._volumes + volumes = property(_get_volumes) + + + def display(self, tab=''): + return display.image(self, tab) diff --git a/xmir_base/ubireader/ubi/volume.py b/xmir_base/ubireader/ubi/volume.py new file mode 100644 index 0000000..f02834b --- /dev/null +++ b/xmir_base/ubireader/ubi/volume.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubi +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from ubireader.debug import log +from ubireader.ubi import display +from ubireader.ubi.block import sort, get_blocks_in_list, rm_old_blocks + +class description(object): + """UBI Volume object + + Attributes: + Int:vol_id -- Volume ID + Str:name -- Name of volume. + Obj:vol_rec -- Volume record object + Int:block_count -- Number of block associated with volume. + + Methods: + display(tab) -- Print Volume information + Str:tab -- (optional) '\t' to preface lines with. + + get_blocks(blocks) -- Returns list of block objects tied to this volume + + Volume object is basically a list of block indexes and some metadata + describing a volume found in a UBI image. + """ + def __init__(self, vol_id, vol_rec, block_list): + self._vol_id = vol_id + self._vol_rec = vol_rec + self._name = self._vol_rec.name + self._block_list = block_list + log(description, 'Create Volume: %s, ID: %s, Block Cnt: %s' % (self.name, self.vol_id, len(self.block_list))) + + + def __repr__(self): + return 'Volume: %s' % (self.name.decode('utf-8')) + + + def _get_name(self): + return self._name + name = property(_get_name) + + + def _get_vol_id(self): + return self._vol_id + vol_id = property(_get_vol_id) + + + def _get_block_count(self): + return len(self._block_list) + block_count = property(_get_block_count) + + + def _get_vol_rec(self): + return self._vol_rec + vol_rec = property(_get_vol_rec) + + + def _get_block_list(self): + return self._block_list + block_list = property(_get_block_list) + + + def get_blocks(self, blocks): + return get_blocks_in_list(blocks, self._block_list) + + + def display(self, tab=''): + return display.volume(self, tab) + + + def reader(self, ubi): + last_leb = 0 + for block in sort.by_leb(self.get_blocks(ubi.blocks)): + if block == 'x': + last_leb += 1 + yield b'\xff'*ubi.leb_size + else: + last_leb += 1 + yield ubi.file.read_block_data(ubi.blocks[block]) + + +def get_volumes(blocks, layout_info): + """Get a list of UBI volume objects from list of blocks + + Arguments: + List:blocks -- List of layout block objects + List:layout_info -- Layout info (indexes of layout blocks and + associated data blocks.) + + Returns: + Dict -- Of Volume objects by volume name, including any + relevant blocks. + """ + volumes = {} + + vol_blocks_lists = sort.by_vol_id(blocks, layout_info[2]) + for vol_rec in blocks[layout_info[0]].vtbl_recs: + vol_name = vol_rec.name.strip(b'\x00').decode('utf-8') + if vol_rec.rec_index not in vol_blocks_lists: + vol_blocks_lists[vol_rec.rec_index] = [] + + vol_blocks_lists[vol_rec.rec_index] = rm_old_blocks(blocks, vol_blocks_lists[vol_rec.rec_index]) + volumes[vol_name] = description(vol_rec.rec_index, vol_rec, vol_blocks_lists[vol_rec.rec_index]) + + return volumes + diff --git a/xmir_base/ubireader/ubi_io.py b/xmir_base/ubireader/ubi_io.py new file mode 100644 index 0000000..d6f80db --- /dev/null +++ b/xmir_base/ubireader/ubi_io.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubi_io +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from ubireader.debug import error, log, verbose_log +from ubireader.ubi.block import sort + +class ubi_file(object): + """UBI image file object + + Arguments: + Str:path -- Path to file to parse + Int:block_size -- Erase block size of NAND in bytes. + Int:start_offset -- (optional) Where to start looking in the file for + UBI data. + Int:end_offset -- (optional) Where to stop looking in the file. + + Methods: + seek -- Put file head to specified byte offset. + Int:offset + read -- Read specified bytes from file handle. + Int:size + tell -- Returns byte offset of current file location. + read_block -- Returns complete PEB data of provided block + description. + Obj:block + read_block_data -- Returns LEB data only from provided block. + Obj:block + reader -- Generator that returns data from file. + reset -- Reset file position to start_offset. + is_valid -- If the object intialized okay. + + Handles all the actual file interactions, read, seek, + extract blocks, etc. + """ + + def __init__(self, path, block_size, start_offset=0, end_offset=None): + self.__name__ = 'UBI_File' + self.is_valid = False + try: + log(self, 'Open Path: %s' % path) + self._fhandle = open(path, 'rb') + except Exception as e: + error(self, 'Fatal', 'Open file: %s' % e) + + self._fhandle.seek(0,2) + file_size = self.tell() + log(self, 'File Size: %s' % file_size) + + self._start_offset = start_offset + log(self, 'Start Offset: %s' % (self._start_offset)) + + if end_offset: + self._end_offset = end_offset + else: + self._end_offset = file_size + log(self, 'End Offset: %s' % (self._end_offset)) + + self._block_size = block_size + log(self, 'Block Size: %s' % block_size) + + if start_offset > self._end_offset: + error(self, 'Fatal', 'Start offset larger than end offset.') + + if ( not end_offset is None ) and ( end_offset > file_size ): + error(self, 'Fatal', 'End offset larger than file size.') + + remainder = (self._end_offset - start_offset) % block_size + if remainder != 0: + error(self, 'Warning', 'end_offset - start_offset length is not block aligned, could mean missing data.') + + self._fhandle.seek(self._start_offset) + self._last_read_addr = self._fhandle.tell() + self.is_valid = True + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def _set_start(self, i): + self._start_offset = i + def _get_start(self): + return self._start_offset + start_offset = property(_get_start, _set_start) + + + def _get_end(self): + return self._end_offset + end_offset = property(_get_end) + + + def _get_block_size(self): + return self._block_size + block_size = property(_get_block_size) + + def close(self): + self._fhandle.close() + + def seek(self, offset): + self._fhandle.seek(offset) + + + def read(self, size): + self._last_read_addr = self.tell() + verbose_log(self, 'read loc: %s, size: %s' % (self._last_read_addr, size)) + return self._fhandle.read(size) + + + def tell(self): + return self._fhandle.tell() + + + def last_read_addr(self): + return self._last_read_addr + + + def reset(self): + self._fhandle.seek(self.start_offset) + + + def reader(self): + self.reset() + while True: + cur_loc = self._fhandle.tell() + if self.end_offset and cur_loc > self.end_offset: + break + elif self.end_offset and self.end_offset - cur_loc < self.block_size: + chunk_size = self.end_offset - cur_loc + else: + chunk_size = self.block_size + + buf = self.read(chunk_size) + + if not buf: + break + yield buf + + + def read_block(self, block): + """Read complete PEB data from file. + + Argument: + Obj:block -- Block data is desired for. + """ + self.seek(block.file_offset) + return self._fhandle.read(block.size) + + + def read_block_data(self, block): + """Read LEB data from file + + Argument: + Obj:block -- Block data is desired for. + """ + self.seek(block.file_offset + block.ec_hdr.data_offset) + buf = self._fhandle.read(block.size - block.ec_hdr.data_offset - block.vid_hdr.data_pad) + return buf + + + +class leb_virtual_file(): + def __init__(self, ubi, block_list): + self.__name__ = 'leb_virtual_file' + self.is_valid = False + self._ubi = ubi + self._last_read_addr = 0 + + if not len(block_list): + error(self, 'Info', 'Empty block list') + else: + self._blocks = sort.by_leb(block_list) + self._seek = 0 + self._last_leb = -1 + self._last_buf = '' + self.is_valid = True + + + def read(self, size): + buf = '' + leb = int(self.tell() / self._ubi.leb_size) + offset = self.tell() % self._ubi.leb_size + + try: + if size < 0: + raise Exception('Bad Read Offset Request') + + self._last_read_addr = self._ubi.blocks[self._blocks[leb]].file_offset + self._ubi.blocks[self._blocks[leb]].ec_hdr.data_offset + offset + + except Exception as e: + error(self.read, 'Error', 'LEB: %s is corrupted or has no data.' % (leb)) + raise Exception('Bad Read Offset Request') + + verbose_log(self, 'read loc: %s, size: %s' % (self._last_read_addr, size)) + + if leb == self._last_leb: + self.seek(self.tell() + size) + return self._last_buf[offset:offset+size] + else: + try: + buf = self._ubi.file.read_block_data(self._ubi.blocks[self._blocks[leb]]) + self._last_buf = buf + self._last_leb = leb + self.seek(self.tell() + size) + return buf[offset:offset+size] + except Exception as e: + error(self, 'Fatal', 'read loc: %s, size: %s, LEB: %s, offset: %s, error: %s' % (self._last_read_addr, size, leb, offset, e)) + + + def reset(self): + self.seek(0) + + + def seek(self, offset): + self._seek = offset + + + def tell(self): + return self._seek + + + def last_read_addr(self): + """Start address of last physical file read""" + return self._last_read_addr + + + def reader(self): + last_leb = 0 + for block in self._blocks: + while 0 != (self._ubi.blocks[block].leb_num - last_leb): + last_leb += 1 + yield b'\xff'*self._ubi.leb_size + + last_leb += 1 + yield self._ubi.file.read_block_data(self._ubi.blocks[block]) diff --git a/xmir_base/ubireader/ubifs/__init__.py b/xmir_base/ubireader/ubifs/__init__.py new file mode 100644 index 0000000..3e2bbca --- /dev/null +++ b/xmir_base/ubireader/ubifs/__init__.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubifs +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from ubireader.debug import error, log, verbose_display +from ubireader.ubifs.defines import * +from ubireader.ubifs import nodes, display + +class ubifs(): + """UBIFS object + + Arguments: + Str:path -- File path to UBIFS image. + + Attributes: + Obj:file -- File object + Int:leb_size -- Size of Logical Erase Blocks. + Int:min_io -- Size of min I/O from vid_hdr_offset. + Obj:sb_node -- Superblock node of UBIFS image LEB0 + Obj:mst_node -- Master Node of UBIFS image LEB1 + Obj:mst_node2 -- Master Node 2 of UBIFS image LEB2 + """ + def __init__(self, ubifs_file): + self.__name__ = 'UBIFS' + self._file = ubifs_file + try: + self.file.reset() + sb_chdr = nodes.common_hdr(self.file.read(UBIFS_COMMON_HDR_SZ)) + log(self , '%s file addr: %s' % (sb_chdr, self.file.last_read_addr())) + verbose_display(sb_chdr) + + if sb_chdr.node_type == UBIFS_SB_NODE: + self.file.seek(UBIFS_COMMON_HDR_SZ) + buf = self.file.read(UBIFS_SB_NODE_SZ) + self._sb_node = nodes.sb_node(buf, self.file.last_read_addr()) + self._min_io_size = self._sb_node.min_io_size + self._leb_size = self._sb_node.leb_size + log(self , '%s file addr: %s' % (self._sb_node, self.file.last_read_addr())) + verbose_display(self._sb_node) + else: + raise Exception('Wrong node type.') + except Exception as e: + error(self, 'Fatal', 'Super block error: %s' % e) + + self._mst_nodes = [None, None] + for i in range(0, 2): + try: + mst_offset = self.leb_size * (UBIFS_MST_LNUM + i) + self.file.seek(mst_offset) + mst_chdr = nodes.common_hdr(self.file.read(UBIFS_COMMON_HDR_SZ)) + log(self , '%s file addr: %s' % (mst_chdr, self.file.last_read_addr())) + verbose_display(mst_chdr) + + if mst_chdr.node_type == UBIFS_MST_NODE: + self.file.seek(mst_offset + UBIFS_COMMON_HDR_SZ) + buf = self.file.read(UBIFS_MST_NODE_SZ) + self._mst_nodes[i] = nodes.mst_node(buf, self.file.last_read_addr()) + log(self , '%s%s file addr: %s' % (self._mst_nodes[i], i, self.file.last_read_addr())) + verbose_display(self._mst_nodes[i]) + else: + raise Exception('Wrong node type.') + except Exception as e: + error(self, 'Warn', 'Master block %s error: %s' % (i, e)) + + if self._mst_nodes[0] is None and self._mst_nodes[1] is None: + error(self, 'Fatal', 'No valid Master Node found.') + + elif self._mst_nodes[0] is None and self._mst_nodes[1] is not None: + self._mst_nodes[0] = self._mst_nodes[1] + self._mst_nodes[1] = None + log(self , 'Swapping Master Nodes due to bad first node.') + + + def _get_file(self): + return self._file + file = property(_get_file) + + + def _get_superblock(self): + """ Superblock Node Object + + Returns: + Obj:Superblock Node + """ + return self._sb_node + superblock_node = property(_get_superblock) + + + def _get_master_node(self): + """Master Node Object + + Returns: + Obj:Master Node + """ + return self._mst_nodes[0] + master_node = property(_get_master_node) + + + def _get_master_node2(self): + """Master Node Object 2 + + Returns: + Obj:Master Node + """ + return self._mst_nodes[1] + master_node2 = property(_get_master_node2) + + + def _get_leb_size(self): + """LEB size of UBI blocks in file. + + Returns: + Int -- LEB Size. + """ + return self._leb_size + leb_size = property(_get_leb_size) + + + def _get_min_io_size(self): + """Min I/O Size + + Returns: + Int -- Min I/O Size. + """ + return self._min_io_size + min_io_size = property(_get_min_io_size) + + def display(self, tab=''): + """Print information about this object. + + Argument: + Str:tab -- '\t' for spacing this object. + """ + return display.ubifs(self, tab) diff --git a/xmir_base/ubireader/ubifs/defines.py b/xmir_base/ubireader/ubifs/defines.py new file mode 100644 index 0000000..1520706 --- /dev/null +++ b/xmir_base/ubireader/ubifs/defines.py @@ -0,0 +1,432 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +############################################################# +# Adapted in part from linux-source-3.2/fs/ubi/ubi-media.h +# for use in Python. +# Oct. 2013 by Jason Pruitt +# +# Original copyright notice. +# -------------------------- +# +# This file is part of UBIFS. +# +# Copyright (C) 2006-2008 Nokia Corporation. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License version 2 as published by +# the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., 51 +# Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Artem Bityutskiy (Битюцкий Артём) +# Adrian Hunter +# +############################################################# + +import struct + +# Constant defines + +# Common Header. +UBIFS_NODE_MAGIC = b'\x31\x18\x10\x06' # Set to LSB + +# Initial CRC32 value. +UBIFS_CRC32_INIT = 0xFFFFFFFF + +# Do not compress data smaller than this. +UBIFS_MIN_COMPR_LEN = 128 + +# If difference between compressed data length and compressed data +# length, is less than this, do not compress data. +UBIFS_MIN_COMPRESS_DIFF = 64 + +# Root inode number +UBIFS_ROOT_INO = 1 + +# Lowest inode number for regular inodes, non-internal inodes. +UBIFS_FIRST_INO = 64 + +# Max file name and extended attr length (muptple of 8 minus 1. +UBIFS_MAX_NLEN = 255 + +# Max number of data journal heads. +UBIFS_MAX_JHEADS = 1 + +# Max data node data length/amount attached to inode node. +UBIFS_BLOCK_SIZE = 4096 +UBIFS_BLOCK_SHIFT = 12 + +# UBIFS padding byte pattern. +UBIFS_PADDING_BYTE = b'\xCE' + +# Max key length +UBIFS_MAX_KEY_LEN = 16 + +# Key length of simple format. +UBIFS_SK_LEN = 8 + +# Min index tree fanout. +UBIFS_MIN_FANOUT = 3 + +# Max number of levels in UBIFS indexing B-tree. +UBIFS_MAX_LEVELS = 512 + +# Max amount of data attached to inode in bytes. +UBIFS_MAX_INO_DATA = UBIFS_BLOCK_SIZE + +# LEB Properties Tree fanout (power of 2) and fanout. +UBIFS_LPT_FANOUT = 4 +UBIFS_LPT_FANOUT_SHIFT = 2 + +# LEB Properties Tree bit field sizes. +UBIFS_LPT_CRC_BITS = 16 +UBIFS_LPT_CRC_BYTES = 2 +UBIFS_LPT_TYPE_BITS = 4 + +# LEB Properties Tree node types. +UBIFS_LPT_PNODE = 0 # LPT leaf node (contains LEB Properties) +UBIFS_LPT_NNODE = 1 # LPT internal node +UBIFS_LPT_LTAB = 2 # LPT's own lprops table +UBIFS_LPT_LSAVE = 3 # LPT's save table (big model only) +UBIFS_LPT_NODE_CNT = 4 # count of LPT node types +UBIFS_LPT_NOT_A_NODE = (1 << UBIFS_LPT_TYPE_BITS) - 1 # 4 bits of 1 + +# Inode types +UBIFS_ITYPE_REG = 0 # Regular file +UBIFS_ITYPE_DIR = 1 # Directory +UBIFS_ITYPE_LNK = 2 # Soft link +UBIFS_ITYPE_BLK = 3 # Block device node +UBIFS_ITYPE_CHR = 4 # Char device node +UBIFS_ITYPE_FIFO = 5 # FIFO +UBIFS_ITYPE_SOCK = 6 # Socket +UBIFS_ITYPES_CNT = 7 # Support file type count + +# Supported key has functions +UBIFS_KEY_HASH_R5 = 0 # R5 hash +UBIFS_KEY_HASH_TEST = 1 # Test hash, returns first 4 bytes of name +PRINT_UBIFS_KEY_HASH = ['r5', 'test'] + +# Supported key formats +UBIFS_SIMPLE_KEY_FMT = 0 + +# Simple key format uses 29 bits for storing UBIFS name and hash. +UBIFS_S_KEY_BLOCK_BITS = 29 +UBIFS_S_KEY_BLOCK_MASK = 0x1FFFFFFF +UBIFS_S_KEY_HASH_BITS = UBIFS_S_KEY_BLOCK_BITS +UBIFS_S_KEY_HASH_MASK = UBIFS_S_KEY_BLOCK_MASK + +# Key types +UBIFS_INO_KEY = 0 # Inode node key +UBIFS_DATA_KEY = 1 # Data node key +UBIFS_DENT_KEY = 2 # Directory node key +UBIFS_XENT_KEY = 3 # Extended attribute entry key +UBIFS_KEY_TYPES_CNT = 4 # Supported key count + +# Number of reserved LEBs for Superblock area +UBIFS_SB_LEBS = 1 + +# Number of reserved LEBs for master area +UBIFS_MST_LEBS = 2 + +# First LEB of the Superblock area +UBIFS_SB_LNUM = 0 + +# First LEB of the master area +UBIFS_MST_LNUM = (UBIFS_SB_LNUM + UBIFS_SB_LEBS) + +# First LEB of log area +UBIFS_LOG_LNUM = (UBIFS_MST_LNUM + UBIFS_MST_LEBS) + +# On-flash inode flags +UBIFS_COMPR_FL = 1 # Use compression for this inode +UBIFS_SYNC_FL = 2 # Has to be synchronous I/O +UBIFS_IMMUTABLE_FL = 4 # Inode is immutable +UBIFS_APPEND_FL = 8 # Writes may only append data +UBIFS_DIRSYNC_FL = 16 # I/O on this directory inode must be synchronous +UBIFS_XATTR_FL = 32 # This inode is inode for extended attributes + +# Inode flag bits used by UBIFS +UBIFS_FL_MASK = 0x0000001F + +# Compression alogrithms. +UBIFS_COMPR_NONE = 0 # No compression +UBIFS_COMPR_LZO = 1 # LZO compression +UBIFS_COMPR_ZLIB = 2 # ZLIB compression +UBIFS_COMPR_ZSTD = 3 # ZSTD compression +UBIFS_COMPR_TYPES_CNT = 4 # Count of supported compression types +PRINT_UBIFS_COMPR = ['none','lzo','zlib', 'zstd'] + +# UBIFS node types +UBIFS_INO_NODE = 0 # Inode node +UBIFS_DATA_NODE = 1 # Data node +UBIFS_DENT_NODE = 2 # Directory entry node +UBIFS_XENT_NODE = 3 # Extended attribute node +UBIFS_TRUN_NODE = 4 # Truncation node +UBIFS_PAD_NODE = 5 # Padding node +UBIFS_SB_NODE = 6 # Superblock node +UBIFS_MST_NODE = 7 # Master node +UBIFS_REF_NODE = 8 # LEB reference node +UBIFS_IDX_NODE = 9 # Index node +UBIFS_CS_NODE = 10 # Commit start node +UBIFS_ORPH_NODE = 11 # Orphan node +UBIFS_AUTH_NODE = 12 # Authentication node +UBIFS_SIG_NODE = 13 # Signature node +UBIFS_NODE_TYPES_CNT = 14 # Count of supported node types + +# Master node flags +UBIFS_MST_DIRTY = 1 # Rebooted uncleanly +UBIFS_MST_NO_ORPHS = 2 # No orphans present +UBIFS_MST_RCVRY = 4 # Written by recovery +PRINT_UBIFS_MST = [[UBIFS_MST_DIRTY, 'Dirty'], + [UBIFS_MST_NO_ORPHS, 'No orphans'], + [UBIFS_MST_RCVRY, 'Recovery write'], + ] + +# Node group type +UBIFS_NO_NODE_GROUP = 0 # This node is not part of a group +UBIFS_IN_NODE_GROUP = 1 # This node is part of a group +UBIFS_LAST_OF_NODE_GROUP = 2 # This node is the last in a group + +# Superblock flags +UBIFS_FLG_BIGLPT = 2 # If 'big' LPT model is used if set. +UBIFS_FLG_SPACE_FIXUP = 4 # First-mount 'fixup' of free space within. +UBIFS_FLG_DOUBLE_HASH = 8 # Store 32bit cookie for 64bit support. +UBIFS_FLG_ENCRYPTION = 16 # If filesystem contains encrypted files. +UBIFS_FLG_AUTHENTICATION = 32 # If contains hashes for authentication. +PRINT_UBIFS_FLGS = [[UBIFS_FLG_BIGLPT, 'Big LPT'], + [UBIFS_FLG_SPACE_FIXUP, 'Space fixup'], + [UBIFS_FLG_DOUBLE_HASH, 'Double hash'], + [UBIFS_FLG_ENCRYPTION, 'Encryption'], + [UBIFS_FLG_AUTHENTICATION,'Authentication'], + ] + +# Struct defines + +# Common header node +UBIFS_COMMON_HDR_FORMAT = '. +############################################################# + +from ubireader.ubifs.defines import PRINT_UBIFS_FLGS, PRINT_UBIFS_MST + +def ubifs(ubifs, tab=''): + buf = '%sUBIFS Image\n' % (tab) + buf += '%s---------------------\n' % (tab) + buf += '%sMin I/O: %s\n' % (tab, ubifs.min_io_size) + buf += '%sLEB Size: %s\n' % (tab, ubifs.leb_size) + return buf + +def common_hdr(chdr, tab=''): + buf = '%s%s\n' % (tab, chdr) + buf += '%s---------------------\n' % (tab) + tab += '\t' + for key, value in chdr: + if key == 'display': + continue + elif key == 'crc': + buf += '%s%s: 0x%x\n' % (tab, key, value) + elif key == 'errors': + buf += '%s%s: %s\n' % (tab, key, ','.join(value)) + else: + buf += '%s%s: %r\n' % (tab, key, value) + return buf + +def sb_node(node, tab=''): + buf = '%s%s\n' % (tab, node) + buf += '%sFile offset: %s\n' % (tab, node.file_offset) + buf += '%s---------------------\n' % (tab) + tab += '\t' + for key, value in node: + if key == 'display': + continue + elif key == 'errors': + buf += '%s%s: %s\n' % (tab, key, ','.join(value)) + elif key == 'uuid': + buf += '%s%s: %r\n' % (tab, key, value) + elif key == 'flags': + flags = '' + for flag in PRINT_UBIFS_FLGS: + if value & flag[0]: + flags += '%s, ' % flag[1] + + if flags.endswith(', '): + flags = flags[0:-2] + + buf += '%s%s: %s\n' % (tab, key, flags) + + else: + buf += '%s%s: %r\n' % (tab, key, value) + return buf + + +def mst_node(node, tab=''): + buf = '%s%s\n' % (tab, node) + buf += '%sFile offset: %s\n' % (tab, node.file_offset) + buf += '%s---------------------\n' % (tab) + tab += '\t' + for key, value in node: + if key == 'display': + continue + elif key == 'errors': + buf += '%s%s: %s\n' % (tab, key, ','.join(value)) + elif key == 'flags': + flags = '' + for flag in PRINT_UBIFS_MST: + if value & flag[0]: + flags += '%s, ' % flag[1] + + if flags.endswith(', '): + flags = flags[0:-2] + + buf += '%s%s: %s\n' % (tab, key, flags) + else: + buf += '%s%s: %r\n' % (tab, key, value) + return buf + + +def dent_node(node, tab=''): + buf = '%s%s\n' % (tab, node) + buf += '%s---------------------\n' % (tab) + tab += '\t' + for key, value in node: + if key == 'display': + continue + elif key == 'errors': + buf += '%s%s: %s\n' % (tab, key, ','.join(value)) + else: + buf += '%s%s: %r\n' % (tab, key, value) + return buf + + +def data_node(node, tab=''): + buf = '%s%s\n' % (tab, node) + buf += '%s---------------------\n' % (tab) + tab += '\t' + for key, value in node: + if key in ['display', 'data']: + continue + elif key == 'errors': + buf += '%s%s: %s\n' % (tab, key, ','.join(value)) + else: + buf += '%s%s: %r\n' % (tab, key, value) + return buf + + +def idx_node(node, tab=''): + buf = '%s%s\n' % (tab, node) + buf += '%s---------------------\n' % (tab) + tab += '\t' + for key, value in node: + if key == 'display': + continue + elif key == 'errors': + buf += '%s%s: %s\n' % (tab, key, ','.join(value)) + else: + buf += '%s%s: %r\n' % (tab, key, value) + return buf + + +def ino_node(node, tab=''): + buf = '%s%s\n' % (tab, node) + buf += '%s---------------------\n' % (tab) + tab += '\t' + for key, value in node: + if key == 'display': + continue + elif key == 'errors': + buf += '%s%s: %s\n' % (tab, key, ','.join(value)) + else: + buf += '%s%s: %r\n' % (tab, key, value) + return buf + + +def branch(node, tab=''): + buf = '%s%s\n' % (tab, node) + buf += '%s---------------------\n' % (tab) + tab += '\t' + for key, value in node: + if key == 'display': + continue + elif key == 'errors': + buf += '%s%s: %s\n' % (tab, key, ','.join(value)) + else: + buf += '%s%s: %r\n' % (tab, key, value) + return buf + diff --git a/xmir_base/ubireader/ubifs/list.py b/xmir_base/ubireader/ubifs/list.py new file mode 100644 index 0000000..166ea83 --- /dev/null +++ b/xmir_base/ubireader/ubifs/list.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubifs +# (C) Collin Mulliner based on Jason Pruitt's output.py + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +import os +import time +from ubireader.ubifs.defines import * +from ubireader.ubifs import walk +from ubireader.ubifs.misc import decompress +from ubireader.debug import error, log, verbose_log + + +def list_files(ubifs, list_path): + pathnames = list_path.split("/") + pnames = [] + for i in pathnames: + if len(i) > 0: + pnames.append(i) + try: + inodes = {} + bad_blocks = [] + + walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks) + + if len(inodes) < 2: + raise Exception('No inodes found') + + inum = find_dir(inodes, 1, pnames, 0) + + if inum == None: + return + + if not 'dent' in inodes[inum]: + return + + for dent in inodes[inum]['dent']: + print_dent(ubifs, inodes, dent, longts=False) + + if len(bad_blocks): + error(list_files, 'Warn', 'Data may be missing or corrupted, bad blocks, LEB [%s]' % ','.join(map(str, bad_blocks))) + + except Exception as e: + error(list_files, 'Error', '%s' % e) + + +def copy_file(ubifs, filepath, destpath): + pathnames = filepath.split("/") + pnames = [] + for i in pathnames: + if len(i) > 0: + pnames.append(i) + + filename = pnames[len(pnames)-1] + del pnames[-1] + + inodes = {} + bad_blocks = [] + + walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks) + + if len(inodes) < 2: + return False + + inum = find_dir(inodes, 1, pnames, 0) + + if inum == None: + return False + + if not 'dent' in inodes[inum]: + return False + + for dent in inodes[inum]['dent']: + if dent.name == filename: + filedata = _process_reg_file(ubifs, inodes[dent.inum], filepath) + if os.path.isdir(destpath): + destpath = os.path.join(destpath, filename) + with open(destpath, 'wb') as f: + f.write(filedata) + return True + return False + + +def find_dir(inodes, inum, names, idx): + if len(names) == 0: + return 1 + for dent in inodes[inum]['dent']: + if dent.name == names[idx]: + if len(names) == idx+1: + return dent.inum + else: + return find_dir(inodes, dent.inum, names, idx+1) + return None + + +def print_dent(ubifs, inodes, dent_node, long=True, longts=False): + inode = inodes[dent_node.inum] + if long: + fl = file_leng(ubifs, inode) + + lnk = "" + if dent_node.type == UBIFS_ITYPE_LNK: + lnk = " -> " + inode['ino'].data.decode('utf-8') + + if longts: + mtime = inode['ino'].mtime_sec + else: + mtime = time.strftime("%b %d %H:%M", time.gmtime(inode['ino'].mtime_sec)) + + print('%6o %2d %s %s %7d %s %s%s' % (inode['ino'].mode, inode['ino'].nlink, inode['ino'].uid, inode['ino'].gid, fl, mtime, dent_node.name, lnk)) + else: + print(dent_node.name) + + +def file_leng(ubifs, inode): + fl = 0 + if 'data' in inode: + compr_type = 0 + sorted_data = sorted(inode['data'], key=lambda x: x.key['khash']) + last_khash = sorted_data[0].key['khash']-1 + + for data in sorted_data: + if data.key['khash'] - last_khash != 1: + while 1 != (data.key['khash'] - last_khash): + last_khash += 1 + fl = fl + UBIFS_BLOCK_SIZE + fl = fl + data.size + return fl + return 0 + + +def _process_reg_file(ubifs, inode, path): + try: + buf = bytearray() + if 'data' in inode: + compr_type = 0 + sorted_data = sorted(inode['data'], key=lambda x: x.key['khash']) + last_khash = sorted_data[0].key['khash']-1 + + for data in sorted_data: + + # If data nodes are missing in sequence, fill in blanks + # with \x00 * UBIFS_BLOCK_SIZE + if data.key['khash'] - last_khash != 1: + while 1 != (data.key['khash'] - last_khash): + buf += b'\x00'*UBIFS_BLOCK_SIZE + last_khash += 1 + + compr_type = data.compr_type + ubifs.file.seek(data.offset) + d = ubifs.file.read(data.compr_len) + buf += decompress(compr_type, data.size, d) + last_khash = data.key['khash'] + verbose_log(_process_reg_file, 'ino num: %s, compression: %s, path: %s' % (inode['ino'].key['ino_num'], compr_type, path)) + + except Exception as e: + error(_process_reg_file, 'Warn', 'inode num:%s :%s' % (inode['ino'].key['ino_num'], e)) + + # Pad end of file with \x00 if needed. + if inode['ino'].size > len(buf): + buf += b'\x00' * (inode['ino'].size - len(buf)) + + return bytes(buf) diff --git a/xmir_base/ubireader/ubifs/misc.py b/xmir_base/ubireader/ubifs/misc.py new file mode 100644 index 0000000..7d63d34 --- /dev/null +++ b/xmir_base/ubireader/ubifs/misc.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubifs +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +#from lzallright import LZOCompressor, LZOError +import struct +import zlib +from ubireader.ubifs.defines import * +from ubireader.debug import error + +# For happy printing +ino_types = ['file', 'dir','lnk','blk','chr','fifo','sock'] +node_types = ['ino','data','dent','xent','trun','pad','sb','mst','ref','idx','cs','orph'] +key_types = ['ino','data','dent','xent'] + + +def parse_key(key): + """Parse node key + + Arguments: + Str:key -- Hex string literal of node key. + + Returns: + Int:key_type -- Type of key, data, ino, dent, etc. + Int:ino_num -- Inode number. + Int:khash -- Key hash. + """ + hkey, lkey = struct.unpack('> UBIFS_S_KEY_BLOCK_BITS + khash = lkey + + #if key_type < UBIFS_KEY_TYPES_CNT: + return {'type':key_type, 'ino_num':ino_num, 'khash': khash} + + +def decompress(ctype, unc_len, data): + """Decompress data. + + Arguments: + Int:ctype -- Compression type LZO, ZLIB (*currently unused*). + Int:unc_len -- Uncompressed data lenth. + Str:data -- Data to be uncompessed. + + Returns: + Uncompressed Data. + """ + if ctype == UBIFS_COMPR_LZO: + from lzallright import LZOCompressor, LZOError + try: + return LZOCompressor.decompress(data, output_size_hint=unc_len) + except Exception as e: + error(decompress, 'Warn', 'LZO Error: %s' % e) + elif ctype == UBIFS_COMPR_ZLIB: + try: + return zlib.decompress(data, -11) + except Exception as e: + error(decompress, 'Warn', 'ZLib Error: %s' % e) + else: + return data + + diff --git a/xmir_base/ubireader/ubifs/nodes.py b/xmir_base/ubireader/ubifs/nodes.py new file mode 100644 index 0000000..fd2bcb7 --- /dev/null +++ b/xmir_base/ubireader/ubifs/nodes.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubifs +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from ubireader.ubifs.misc import parse_key +from ubireader.ubifs.defines import * +from ubireader.ubifs import display + +class common_hdr(object): + """Get common header at given LEB number + offset. + + Arguments: + Bin:buf -- Raw data to extract header information from. + + See ubifs/defines.py for object attributes. + """ + def __init__(self, buf): + + fields = dict(list(zip(UBIFS_COMMON_HDR_FIELDS, struct.unpack(UBIFS_COMMON_HDR_FORMAT, buf)))) + for key in fields: + setattr(self, key, fields[key]) + + setattr(self, 'errors', []) + + def __repr__(self): + return 'UBIFS Common Header' + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def display(self, tab=''): + return display.common_hdr(self, tab) + + +class ino_node(object): + """Get inode node at given LEB number + offset. + + Arguments: + Bin:buf -- Raw data to extract header information from. + + See ubifs/defines.py for object attributes. + """ + def __init__(self, buf): + + fields = dict(list(zip(UBIFS_INO_NODE_FIELDS, struct.unpack(UBIFS_INO_NODE_FORMAT, buf[0:UBIFS_INO_NODE_SZ])))) + for key in fields: + if key == 'key': + setattr(self, key, parse_key(fields[key])) + else: + setattr(self, key, fields[key]) + + setattr(self, 'data', buf[UBIFS_INO_NODE_SZ:]) + setattr(self, 'errors', []) + + def __repr__(self): + return 'UBIFS Ino Node' + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def display(self, tab=''): + return display.ino_node(self, tab) + + +class dent_node(object): + """Get dir entry node at given LEB number + offset. + + Arguments: + Bin:buf -- Raw data to extract header information from. + + See ubifs/defines.py for object attributes. + """ + def __init__(self, buf): + fields = dict(list(zip(UBIFS_DENT_NODE_FIELDS, struct.unpack(UBIFS_DENT_NODE_FORMAT, buf[0:UBIFS_DENT_NODE_SZ])))) + for key in fields: + if key == 'key': + setattr(self, key, parse_key(fields[key])) + else: + setattr(self, key, fields[key]) + + setattr(self, 'name', '%s' % buf[-self.nlen-1:-1].decode('utf-8')) + setattr(self, 'errors', []) + + def __repr__(self): + return 'UBIFS Directory Entry Node' + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def display(self, tab=''): + return display.dent_node(self, tab) + + +class data_node(object): + """Get data node at given LEB number + offset. + + Arguments: + Bin:buf -- Raw data to extract header information from. + Int:offset -- Offset in LEB of data node. + + See ubifs/defines.py for object attributes. + """ + def __init__(self, buf, file_offset): + + fields = dict(list(zip(UBIFS_DATA_NODE_FIELDS, struct.unpack(UBIFS_DATA_NODE_FORMAT, buf[0:UBIFS_DATA_NODE_SZ])))) + for key in fields: + if key == 'key': + setattr(self, key, parse_key(fields[key])) + else: + setattr(self, key, fields[key]) + + setattr(self, 'offset', file_offset) + setattr(self, 'compr_len', (len(buf) - UBIFS_DATA_NODE_SZ)) + setattr(self, 'errors', []) + + def __repr__(self): + return 'UBIFS Data Node' + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def display(self, tab=''): + return display.data_node(self, tab) + + +class idx_node(object): + """Get index node at given LEB number + offset. + + Arguments: + Bin:buf -- Raw data to extract header information from. + + See ubifs/defines.py for object attributes. + """ + def __init__(self, buf): + fields = dict(list(zip(UBIFS_IDX_NODE_FIELDS, struct.unpack(UBIFS_IDX_NODE_FORMAT, buf[0:UBIFS_IDX_NODE_SZ])))) + for key in fields: + setattr(self, key, fields[key]) + + idxs = UBIFS_IDX_NODE_SZ + brs = UBIFS_BRANCH_SZ + setattr(self, 'branches', [branch(buf[idxs+(brs*i):idxs+(brs*i)+brs]) for i in range(0, self.child_cnt)]) + setattr(self, 'errors', []) + + def __repr__(self): + return 'UBIFS Index Node' + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def display(self, tab=''): + return display.idx_node(self, tab) + + +class branch(object): + """ Create branch from given idx_node data buf. + + Arguments: + Bin:buf -- Raw data to extract header information from. + """ + def __init__(self, buf): + fields = dict(list(zip(UBIFS_BRANCH_FIELDS, struct.unpack(UBIFS_BRANCH_FORMAT, buf)))) + for key in fields: + if key == 'key': + setattr(self, key, parse_key(fields[key])) + else: + setattr(self, key, fields[key]) + + setattr(self, 'errors', []) + + def __repr__(self): + return 'UBIFS Branch' + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def display(self, tab=''): + return display.branch(self, tab) + + +class sb_node(object): + """Get superblock node at given LEB number + offset. + + Arguments: + Bin:buf -- Raw data to extract header information from. + Int:offset -- Offset in LEB of data node. + + See ubifs/defines.py for object attributes. + """ + def __init__(self, buf, file_offset=-1): + self.file_offset = file_offset + fields = dict(list(zip(UBIFS_SB_NODE_FIELDS, struct.unpack(UBIFS_SB_NODE_FORMAT, buf)))) + for key in fields: + setattr(self, key, fields[key]) + + setattr(self, 'errors', []) + + def __repr__(self): + return 'UBIFS Super Block Node' + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def display(self, tab=''): + return display.sb_node(self, tab) + + +class mst_node(object): + """Get master node at given LEB number + offset. + + Arguments: + Bin:buf -- Raw data to extract header information from. + Int:offset -- Offset in LEB of data node. + + See ubifs/defines.py for object attributes. + """ + def __init__(self, buf, file_offset=-1): + self.file_offset = file_offset + fields = dict(list(zip(UBIFS_MST_NODE_FIELDS, struct.unpack(UBIFS_MST_NODE_FORMAT, buf)))) + for key in fields: + setattr(self, key, fields[key]) + + setattr(self, 'errors', []) + + def __repr__(self): + return 'UBIFS Master Block Node' + + def __iter__(self): + for key in dir(self): + if not key.startswith('_'): + yield key, getattr(self, key) + + def display(self, tab=''): + return display.mst_node(self, tab) diff --git a/xmir_base/ubireader/ubifs/output.py b/xmir_base/ubireader/ubifs/output.py new file mode 100644 index 0000000..b4aa653 --- /dev/null +++ b/xmir_base/ubireader/ubifs/output.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubifs +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +import os +import struct + +from ubireader import settings +from ubireader.ubifs.defines import * +from ubireader.ubifs import walk +from ubireader.ubifs.misc import decompress +from ubireader.debug import error, log, verbose_log + +def is_safe_path(basedir, path): + basedir = os.path.realpath(basedir) + path = os.path.realpath(os.path.join(basedir, path)) + return True if path.startswith(basedir) else False + + +def extract_files(ubifs, out_path, perms=False): + """Extract UBIFS contents to_path/ + + Arguments: + Obj:ubifs -- UBIFS object. + Str:out_path -- Path to extract contents to. + """ + try: + inodes = {} + bad_blocks = [] + + walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks) + + if len(inodes) < 2: + raise Exception('No inodes found') + + for dent in inodes[1]['dent']: + extract_dents(ubifs, inodes, dent, out_path, perms) + + if len(bad_blocks): + error(extract_files, 'Warn', 'Data may be missing or corrupted, bad blocks, LEB [%s]' % ','.join(map(str, bad_blocks))) + + except Exception as e: + error(extract_files, 'Error', '%s' % e) + + +def extract_dents(ubifs, inodes, dent_node, path='', perms=False): + if dent_node.inum not in inodes: + error(extract_dents, 'Error', 'inum: %s not found in inodes' % (dent_node.inum)) + return + + inode = inodes[dent_node.inum] + + if not is_safe_path(path, dent_node.name): + error(extract_dents, 'Warn', 'Path traversal attempt: %s, discarding.' % (dent_node.name)) + return + dent_path = os.path.realpath(os.path.join(path, dent_node.name)) + + if dent_node.type == UBIFS_ITYPE_DIR: + try: + if not os.path.exists(dent_path): + os.mkdir(dent_path) + log(extract_dents, 'Make Dir: %s' % (dent_path)) + + if perms: + _set_file_perms(dent_path, inode) + except Exception as e: + error(extract_dents, 'Warn', 'DIR Fail: %s' % e) + + if 'dent' in inode: + for dnode in inode['dent']: + extract_dents(ubifs, inodes, dnode, dent_path, perms) + + _set_file_timestamps(dent_path, inode) + + elif dent_node.type == UBIFS_ITYPE_REG: + try: + if inode['ino'].nlink > 1: + if 'hlink' not in inode: + inode['hlink'] = dent_path + buf = _process_reg_file(ubifs, inode, dent_path) + _write_reg_file(dent_path, buf) + else: + os.link(inode['hlink'], dent_path) + log(extract_dents, 'Make Link: %s > %s' % (dent_path, inode['hlink'])) + else: + buf = _process_reg_file(ubifs, inode, dent_path) + _write_reg_file(dent_path, buf) + + _set_file_timestamps(dent_path, inode) + + if perms: + _set_file_perms(dent_path, inode) + + except Exception as e: + error(extract_dents, 'Warn', 'FILE Fail: %s' % e) + + elif dent_node.type == UBIFS_ITYPE_LNK: + try: + # probably will need to decompress ino data if > UBIFS_MIN_COMPR_LEN + os.symlink('%s' % inode['ino'].data.decode('utf-8'), dent_path) + log(extract_dents, 'Make Symlink: %s > %s' % (dent_path, inode['ino'].data)) + + except Exception as e: + error(extract_dents, 'Warn', 'SYMLINK Fail: %s' % e) + + elif dent_node.type in [UBIFS_ITYPE_BLK, UBIFS_ITYPE_CHR]: + try: + dev = struct.unpack(' len(buf): + buf += b'\x00' * (inode['ino'].size - len(buf)) + + return bytes(buf) diff --git a/xmir_base/ubireader/ubifs/walk.py b/xmir_base/ubireader/ubifs/walk.py new file mode 100644 index 0000000..370caf5 --- /dev/null +++ b/xmir_base/ubireader/ubifs/walk.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader/ubifs +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +from ubireader import settings +from ubireader.ubifs import nodes +from ubireader.ubifs.defines import * +from ubireader.debug import error, log, verbose_log, verbose_display + +def index(ubifs, lnum, offset, inodes={}, bad_blocks=[]): + """Walk the index gathering Inode, Dir Entry, and File nodes. + + Arguments: + Obj:ubifs -- UBIFS object. + Int:lnum -- Logical erase block number. + Int:offset -- Offset in logical erase block. + Dict:inodes -- Dict of ino/dent/file nodes keyed to inode number. + + Returns: + Dict:inodes -- Dict of ino/dent/file nodes keyed to inode number. + 'ino' -- Inode node. + 'data' -- List of data nodes if present. + 'dent' -- List of directory entry nodes if present. + """ + if len(bad_blocks): + if lnum in bad_blocks: + return + + ubifs.file.seek((ubifs.leb_size * lnum) + offset) + buf = ubifs.file.read(UBIFS_COMMON_HDR_SZ) + + if len(buf) < UBIFS_COMMON_HDR_SZ: + if settings.warn_only_block_read_errors: + error(index, 'Error', 'LEB: %s, Common Hdr Size smaller than expected.' % (lnum)) + return + + else: + error(index, 'Fatal', 'LEB: %s, Common Hdr Size smaller than expected.' % (lnum)) + + chdr = nodes.common_hdr(buf) + log(index , '%s file addr: %s' % (chdr, ubifs.file.last_read_addr())) + verbose_display(chdr) + read_size = chdr.len - UBIFS_COMMON_HDR_SZ + node_buf = ubifs.file.read(read_size) + file_offset = ubifs.file.last_read_addr() + + if len(node_buf) < read_size: + if settings.warn_only_block_read_errors: + error(index, 'Error', 'LEB: %s at %s, Node size smaller than expected.' % (lnum, file_offset)) + return + + else: + error(index, 'Fatal', 'LEB: %s at %s, Node size smaller than expected.' % (lnum, file_offset)) + + if chdr.node_type == UBIFS_IDX_NODE: + try: + idxn = nodes.idx_node(node_buf) + + except Exception as e: + if settings.warn_only_block_read_errors: + error(index, 'Error', 'Problem at file address: %s extracting idx_node: %s' % (file_offset, e)) + return + + else: + error(index, 'Fatal', 'Problem at file address: %s extracting idx_node: %s' % (file_offset, e)) + + log(index, '%s file addr: %s' % (idxn, file_offset)) + verbose_display(idxn) + branch_idx = 0 + + for branch in idxn.branches: + verbose_log(index, '-------------------') + log(index, '%s file addr: %s' % (branch, file_offset + UBIFS_IDX_NODE_SZ + (branch_idx * UBIFS_BRANCH_SZ))) + verbose_display(branch) + index(ubifs, branch.lnum, branch.offs, inodes, bad_blocks) + branch_idx += 1 + + elif chdr.node_type == UBIFS_INO_NODE: + try: + inon = nodes.ino_node(node_buf) + + except Exception as e: + if settings.warn_only_block_read_errors: + error(index, 'Error', 'Problem at file address: %s extracting ino_node: %s' % (file_offset, e)) + return + + else: + error(index, 'Fatal', 'Problem at file address: %s extracting ino_node: %s' % (file_offset, e)) + + ino_num = inon.key['ino_num'] + log(index, '%s file addr: %s, ino num: %s' % (inon, file_offset, ino_num)) + verbose_display(inon) + + if not ino_num in inodes: + inodes[ino_num] = {} + + inodes[ino_num]['ino'] = inon + + elif chdr.node_type == UBIFS_DATA_NODE: + try: + datn = nodes.data_node(node_buf, (ubifs.leb_size * lnum) + UBIFS_COMMON_HDR_SZ + offset + UBIFS_DATA_NODE_SZ) + + except Exception as e: + if settings.warn_only_block_read_errors: + error(index, 'Error', 'Problem at file address: %s extracting data_node: %s' % (file_offset, e)) + return + + else: + error(index, 'Fatal', 'Problem at file address: %s extracting data_node: %s' % (file_offset, e)) + + ino_num = datn.key['ino_num'] + log(index, '%s file addr: %s, ino num: %s' % (datn, file_offset, ino_num)) + verbose_display(datn) + + if not ino_num in inodes: + inodes[ino_num] = {} + + if not 'data' in inodes[ino_num]: + inodes[ino_num]['data']= [] + + inodes[ino_num]['data'].append(datn) + + elif chdr.node_type == UBIFS_DENT_NODE: + try: + dn = nodes.dent_node(node_buf) + + except Exception as e: + if settings.warn_only_block_read_errors: + error(index, 'Error', 'Problem at file address: %s extracting dent_node: %s' % (file_offset, e)) + return + + else: + error(index, 'Fatal', 'Problem at file address: %s extracting dent_node: %s' % (file_offset, e)) + + ino_num = dn.key['ino_num'] + log(index, '%s file addr: %s, ino num: %s' % (dn, file_offset, ino_num)) + verbose_display(dn) + + if not ino_num in inodes: + inodes[ino_num] = {} + + if not 'dent' in inodes[ino_num]: + inodes[ino_num]['dent']= [] + + inodes[ino_num]['dent'].append(dn) diff --git a/xmir_base/ubireader/utils.py b/xmir_base/ubireader/utils.py new file mode 100644 index 0000000..1793285 --- /dev/null +++ b/xmir_base/ubireader/utils.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python +############################################################# +# ubi_reader +# (c) 2013 Jason Pruitt (jrspruitt@gmail.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +############################################################# + +import re +from ubireader.debug import error, log +from ubireader.ubi.defines import UBI_EC_HDR_MAGIC, FILE_CHUNK_SZ +from ubireader.ubifs.defines import UBIFS_NODE_MAGIC, UBIFS_SB_NODE_SZ, UBIFS_SB_NODE, UBIFS_COMMON_HDR_SZ +from ubireader.ubifs import nodes + +def guess_start_offset(path, guess_offset=0): + file_offset = guess_offset + + f = open(path, 'rb') + f.seek(0,2) + file_size = f.tell()+1 + f.seek(guess_offset) + + for _ in range(0, file_size, FILE_CHUNK_SZ): + buf = f.read(FILE_CHUNK_SZ) + ubi_loc = buf.find(UBI_EC_HDR_MAGIC) + ubifs_loc = buf.find(UBIFS_NODE_MAGIC) + + if ubi_loc == -1 and ubifs_loc == -1: + file_offset += FILE_CHUNK_SZ + continue + else: + if ubi_loc == -1: + ubi_loc = file_size + 1 + elif ubifs_loc == -1: + ubifs_loc = file_size + 1 + + if ubi_loc < ubifs_loc: + log(guess_start_offset, 'Found UBI magic number at %s' % (file_offset + ubi_loc)) + return file_offset + ubi_loc + + elif ubifs_loc < ubi_loc: + log(guess_start_offset, 'Found UBIFS magic number at %s' % (file_offset + ubifs_loc)) + return file_offset + ubifs_loc + else: + error(guess_start_offset, 'Fatal', 'Could not determine start offset.') + else: + error(guess_start_offset, 'Fatal', 'Could not determine start offset.') + + f.close() + + +def guess_filetype(path, start_offset=0): + log(guess_filetype, 'Looking for file type at %s' % start_offset) + + with open(path, 'rb') as f: + f.seek(start_offset) + buf = f.read(4) + + if buf == UBI_EC_HDR_MAGIC: + ftype = UBI_EC_HDR_MAGIC + log(guess_filetype, 'File looks like a UBI image.') + + elif buf == UBIFS_NODE_MAGIC: + ftype = UBIFS_NODE_MAGIC + log(guess_filetype, 'File looks like a UBIFS image.') + else: + ftype = None + error(guess_filetype, 'Fatal', 'Could not determine file type.') + + return ftype + + +def guess_leb_size(path): + """Get LEB size from superblock + + Arguments: + Str:path -- Path to file. + + Returns: + Int -- LEB size. + + Searches file for superblock and retrieves leb size. + """ + + f = open(path, 'rb') + f.seek(0,2) + file_size = f.tell()+1 + f.seek(0) + block_size = None + + for _ in range(0, file_size, FILE_CHUNK_SZ): + buf = f.read(FILE_CHUNK_SZ) + + for m in re.finditer(UBIFS_NODE_MAGIC, buf): + start = m.start() + chdr = nodes.common_hdr(buf[start:start+UBIFS_COMMON_HDR_SZ]) + + if chdr and chdr.node_type == UBIFS_SB_NODE: + sb_start = start + UBIFS_COMMON_HDR_SZ + sb_end = sb_start + UBIFS_SB_NODE_SZ + + if chdr.len != len(buf[sb_start:sb_end]): + f.seek(sb_start) + buf = f.read(UBIFS_SB_NODE_SZ) + else: + buf = buf[sb_start:sb_end] + + sbn = nodes.sb_node(buf) + block_size = sbn.leb_size + f.close() + return block_size + + f.close() + return block_size + + +def guess_peb_size(path): + """Determine the most likely block size + + Arguments: + Str:path -- Path to file. + + Returns: + Int -- PEB size. + + Searches file for Magic Number, picks most + common length between them. + """ + file_offset = 0 + offsets = [] + f = open(path, 'rb') + f.seek(0,2) + file_size = f.tell()+1 + f.seek(0) + + for _ in range(0, file_size, FILE_CHUNK_SZ): + buf = f.read(FILE_CHUNK_SZ) + for m in re.finditer(UBI_EC_HDR_MAGIC, buf): + start = m.start() + + if not file_offset: + file_offset = start + idx = start + else: + idx = start+file_offset + + offsets.append(idx) + + file_offset += FILE_CHUNK_SZ + f.close() + + occurrences = {} + for i in range(0, len(offsets)): + try: + diff = offsets[i] - offsets[i-1] + except: + diff = offsets[i] + + if diff not in occurrences: + occurrences[diff] = 0 + + occurrences[diff] += 1 + + most_frequent = 0 + block_size = None + + for offset in occurrences: + if occurrences[offset] > most_frequent: + most_frequent = occurrences[offset] + block_size = offset + + return block_size \ No newline at end of file