[xmir_base] Add module "ubi-reader"

Sources: https://github.com/onekey-sec/ubi_reader
pull/24/head
remittor 1 year ago
parent 5f8eca833b
commit d36f194a34

@ -0,0 +1,53 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubifs
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
import sys
import traceback
from ubireader import settings
def log(obj, message):
if settings.logging_on or settings.logging_on_verbose:
print('{} {}'.format(obj.__name__, message))
def verbose_log(obj, message):
if settings.logging_on_verbose:
log(obj, message)
def verbose_display(displayable_obj):
if settings.logging_on_verbose:
print(displayable_obj.display('\t'))
def error(obj, level, message):
if settings.error_action == 'exit':
print('{} {}: {}'.format(obj.__name__, level, message))
if settings.fatal_traceback:
traceback.print_exc()
sys.exit(1)
else:
if level.lower() == 'warn':
print('{} {}: {}'.format(obj.__name__, level, message))
elif level.lower() == 'fatal':
print('{} {}: {}'.format(obj.__name__, level, message))
if settings.fatal_traceback:
traceback.print_exc()
sys.exit(1)
else:
print('{} {}: {}'.format(obj.__name__, level, message))

@ -0,0 +1,195 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/scripts/ubireader_display_blocks
# (c) 2019 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
#############################################################
# Search by block parameters and display information about
# matching blocks.
#############################################################
import os
import sys
import argparse
from ubireader.ubi import ubi_base
from ubireader.ubi_io import ubi_file
from ubireader import settings
from ubireader.ubi.defines import UBI_EC_HDR_MAGIC
from ubireader.ubifs.defines import UBIFS_NODE_MAGIC
from ubireader.utils import guess_filetype, guess_start_offset, guess_leb_size, guess_peb_size
def main():
description = 'Search for specified blocks and display information.'
usage = """
ubireader_display_blocks "{'block.attr': value,...}" path/to/image
Search for blocks by given parameters and display information about them.
This is block only, no volume or image information is created, which can
be used to debug file and image extraction.
Example:
"{'peb_num':[0, 1] + range(100, 102), 'ec_hdr.ec': 1, 'is_valid': True}"
This matches block.peb_num 0, 1, 100, 101, and 102
with a block.ec_hdr.ec (erase count) of 1, that are valid PEB blocks.
For a full list of parameters check ubireader.ubi.block.description.
"""
parser = argparse.ArgumentParser(usage=usage, description=description)
parser.add_argument('-l', '--log', action='store_true', dest='log',
help='Print extraction information to screen.')
parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose',
help='Prints nearly everything about anything to screen.')
parser.add_argument('-p', '--peb-size', type=int, dest='block_size',
help='Specify PEB size. (UBI Only)')
parser.add_argument('-e', '--leb-size', type=int, dest='block_size',
help='Specify LEB size. (UBIFS Only)')
parser.add_argument('-s', '--start-offset', type=int, dest='start_offset',
help='Specify offset of UBI/UBIFS data in file. (default: 0)')
parser.add_argument('-n', '--end-offset', type=int, dest='end_offset',
help='Specify end offset of UBI/UBIFS data in file.')
parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset',
help='Specify offset to start guessing where UBI data is in file. (default: 0)')
parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors',
help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)')
parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors',
help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)')
parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix',
help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)')
parser.add_argument('block_search_params',
help="""
Double quoted Dict of ubi.block.description attributes, which is run through eval().
Ex. "{\'peb_num\':[0, 1], \'ec_hdr.ec\': 1, \'is_valid\': True}"
""")
parser.add_argument('filepath', help='File with blocks of interest.')
if len(sys.argv) == 1:
parser.print_help()
args = parser.parse_args()
settings.logging_on = args.log
settings.logging_on_verbose = args.verbose
settings.warn_only_block_read_errors = args.warn_only_block_read_errors
settings.ignore_block_header_errors = args.ignore_block_header_errors
settings.uboot_fix = args.uboot_fix
if args.filepath:
path = args.filepath
if not os.path.exists(path):
parser.error("File path doesn't exist.")
else:
parser.error('File path must be provided.')
sys.exit(1)
if args.start_offset:
start_offset = args.start_offset
elif args.guess_offset:
start_offset = guess_start_offset(path, args.guess_offset)
else:
start_offset = guess_start_offset(path)
if args.end_offset:
end_offset = args.end_offset
else:
end_offset = None
filetype = guess_filetype(path, start_offset)
if not filetype:
parser.error('Could not determine file type.')
if args.block_size:
block_size = args.block_size
else:
if filetype == UBI_EC_HDR_MAGIC:
block_size = guess_peb_size(path)
elif filetype == UBIFS_NODE_MAGIC:
block_size = guess_leb_size(path)
if not block_size:
parser.error('Block size could not be determined.')
if args.block_search_params:
try:
search_params = eval(args.block_search_params)
if not isinstance(search_params, dict):
parser.error('Search Param Error: Params must be a Dict of block PEB object items:value pairs.')
except NameError as e:
parser.error('Search Param Error: Dict key block attrs must be single quoted.')
except Exception as e:
parser.error('Search Param Error: %s' % e)
else:
parser.error('No search parameters given, -b arg is required.')
ufile_obj = ubi_file(path, block_size, start_offset, end_offset)
ubi_obj = ubi_base(ufile_obj)
blocks = []
for block in ubi_obj.blocks:
match = True
for key in search_params:
b = ubi_obj.blocks[block]
for attr in key.split('.'):
if hasattr(b, attr):
b = getattr(b, attr)
if isinstance(search_params[key], list):
if isinstance(b, list):
for value in b:
if value in search_params[key]:
break
else:
match = False
elif b not in search_params[key]:
match = False
elif b != search_params[key]:
match = False
break
if match:
blocks.append(ubi_obj.blocks[block])
ufile_obj.close()
print('\nBlock matches: %s' % len(blocks))
for block in blocks:
print(block.display())
if __name__=='__main__':
main()

@ -0,0 +1,192 @@
#!/usr/bin/env python
#############################################################
# ubi_reader
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
import os
import sys
import time
import argparse
from ubireader import settings
from ubireader.ubi import ubi
from ubireader.ubi.defines import UBI_EC_HDR_MAGIC
from ubireader.ubifs import ubifs
from ubireader.ubifs.defines import UBIFS_NODE_MAGIC
from ubireader.utils import guess_filetype, guess_start_offset, guess_leb_size, guess_peb_size
from ubireader.ubi_io import ubi_file, leb_virtual_file
def main():
start = time.time()
description = 'Show information about UBI or UBIFS image.'
usage = 'ubireader_display_info [options] filepath'
parser = argparse.ArgumentParser(usage=usage, description=description)
parser.add_argument('-l', '--log', action='store_true', dest='log',
help='Print extraction information to screen.')
parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose',
help='Prints nearly everything about anything to screen.')
parser.add_argument('-u', '--ubifs-info', action='store_true', dest='ubifs_info',
help='Get UBIFS information from inside a UBI image. (default: false)')
parser.add_argument('-p', '--peb-size', type=int, dest='block_size',
help='Specify PEB size. (UBI Only)')
parser.add_argument('-e', '--leb-size', type=int, dest='block_size',
help='Specify LEB size. (UBIFS Only)')
parser.add_argument('-s', '--start-offset', type=int, dest='start_offset',
help='Specify offset of UBI/UBIFS data in file. (default: 0)')
parser.add_argument('-n', '--end-offset', type=int, dest='end_offset',
help='Specify end offset of UBI/UBIFS data in file.')
parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset',
help='Specify offset to start guessing where UBI data is in file. (default: 0)')
parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors',
help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)')
parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors',
help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)')
parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix',
help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)')
parser.add_argument('filepath', help='File to extract contents of.')
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()
settings.logging_on = args.log
settings.logging_on_verbose = args.verbose
settings.warn_only_block_read_errors = args.warn_only_block_read_errors
settings.ignore_block_header_errors = args.ignore_block_header_errors
settings.uboot_fix = args.uboot_fix
if args.filepath:
path = args.filepath
if not os.path.exists(path):
parser.error("File path doesn't exist.")
if args.start_offset:
start_offset = args.start_offset
elif args.guess_offset:
start_offset = guess_start_offset(path, args.guess_offset)
else:
start_offset = guess_start_offset(path)
if args.end_offset:
end_offset = args.end_offset
else:
end_offset = None
filetype = guess_filetype(path, start_offset)
if not filetype:
parser.error('Could not determine file type.')
ubifs_info = args.ubifs_info
if args.block_size:
block_size = args.block_size
else:
if filetype == UBI_EC_HDR_MAGIC:
block_size = guess_peb_size(path)
elif filetype == UBIFS_NODE_MAGIC:
block_size = guess_leb_size(path)
if not block_size:
parser.error('Block size could not be determined.')
# Create file object.
ufile_obj = ubi_file(path, block_size, start_offset, end_offset)
if filetype == UBI_EC_HDR_MAGIC:
# Create UBI object
ubi_obj = ubi(ufile_obj)
# Display UBI info if not UBIFS request.
if not ubifs_info:
print(ubi_obj.display())
# Loop through found images in file.
for image in ubi_obj.images:
# Display image information if not UBIFS request.
if not ubifs_info:
print('%s' % image.display('\t'))
# Loop through volumes in each image.
for volume in image.volumes:
# Show UBI or UBIFS info.
if not ubifs_info:
# Display volume information.
print(image.volumes[volume].display('\t\t'))
else:
# Get blocks associated with this volume.
vol_blocks = image.volumes[volume].get_blocks(ubi_obj.blocks)
# Skip volume if empty.
if not len(vol_blocks):
continue
# Create LEB backed virtual file with volume blocks.
# Necessary to prevent having to load entire UBI image
# into memory.
lebv_file = leb_virtual_file(ubi_obj, vol_blocks)
# Create UBIFS object and print info.
ubifs_obj = ubifs(lebv_file)
print(ubifs_obj.display())
print(ubifs_obj.superblock_node.display('\t'))
print(ubifs_obj.master_node.display('\t'))
try:
print(ubifs_obj.master_node2.display('\t'))
except:
print('Master Node Error only one valid node.')
elif filetype == UBIFS_NODE_MAGIC:
# Create UBIFS object
ubifs_obj = ubifs(ufile_obj)
print(ubifs_obj.display())
print(ubifs_obj.superblock_node.display('\t'))
print(ubifs_obj.master_node.display('\t'))
try:
print(ubifs_obj.master_node2.display('\t'))
except:
print('Master Node Error only one valid node.')
else:
print('Something went wrong to get here.')
ufile_obj.close()
if __name__=='__main__':
main()

@ -0,0 +1,205 @@
#!/usr/bin/env python
#############################################################
# ubi_reader
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
import os
import sys
import time
import argparse
from ubireader import settings
from ubireader.ubi import ubi
from ubireader.ubi.defines import UBI_EC_HDR_MAGIC
from ubireader.ubifs import ubifs
from ubireader.ubifs.output import extract_files
from ubireader.ubifs.defines import UBIFS_NODE_MAGIC
from ubireader.ubi_io import ubi_file, leb_virtual_file
from ubireader.debug import error, log
from ubireader.utils import guess_filetype, guess_start_offset, guess_leb_size, guess_peb_size
def create_output_dir(outpath):
if os.path.exists(outpath):
if os.listdir(outpath):
error(create_output_dir, 'Fatal', 'Output directory is not empty. %s' % outpath)
else:
try:
os.makedirs(outpath)
log(create_output_dir, 'Created output path: %s' % outpath)
except Exception as e:
error(create_output_dir, 'Fatal', '%s' % e)
def main():
start = time.time()
description = 'Extract contents of a UBI or UBIFS image.'
usage = 'ubireader_extract_files [options] filepath'
parser = argparse.ArgumentParser(usage=usage, description=description)
parser.add_argument('-k', '--keep-permissions', action='store_true', dest='permissions',
help='Maintain file permissions, requires running as root. (default: False)')
parser.add_argument('-l', '--log', action='store_true', dest='log',
help='Print extraction information to screen.')
parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose',
help='Prints nearly everything about anything to screen.')
parser.add_argument('-p', '--peb-size', type=int, dest='block_size',
help='Specify PEB size. (UBI Only)')
parser.add_argument('-e', '--leb-size', type=int, dest='block_size',
help='Specify LEB size. (UBIFS Only)')
parser.add_argument('-s', '--start-offset', type=int, dest='start_offset',
help='Specify offset of UBI/UBIFS data in file. (default: 0)')
parser.add_argument('-n', '--end-offset', type=int, dest='end_offset',
help='Specify end offset of UBI/UBIFS data in file.')
parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset',
help='Specify offset to start guessing where UBI data is in file. (default: 0)')
parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors',
help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)')
parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors',
help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)')
parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix',
help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)')
parser.add_argument('-o', '--output-dir', dest='outpath',
help='Specify output directory path.')
parser.add_argument('filepath', help='File to extract contents of.')
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()
settings.logging_on = args.log
settings.logging_on_verbose = args.verbose
settings.warn_only_block_read_errors = args.warn_only_block_read_errors
settings.ignore_block_header_errors = args.ignore_block_header_errors
settings.uboot_fix = args.uboot_fix
if args.filepath:
path = args.filepath
if not os.path.exists(path):
parser.error("File path doesn't exist.")
if args.start_offset:
start_offset = args.start_offset
elif args.guess_offset:
start_offset = guess_start_offset(path, args.guess_offset)
else:
start_offset = guess_start_offset(path)
if args.end_offset:
end_offset = args.end_offset
else:
end_offset = None
filetype = guess_filetype(path, start_offset)
if not filetype:
parser.error('Could not determine file type.')
if args.outpath:
outpath = args.outpath
else:
outpath = settings.output_dir
if args.block_size:
block_size = args.block_size
else:
if filetype == UBI_EC_HDR_MAGIC:
block_size = guess_peb_size(path)
elif filetype == UBIFS_NODE_MAGIC:
block_size = guess_leb_size(path)
if not block_size:
parser.error('Block size could not be determined.')
perms = args.permissions
# Create file object.
ufile_obj = ubi_file(path, block_size, start_offset, end_offset)
if filetype == UBI_EC_HDR_MAGIC:
# Create UBI object
ubi_obj = ubi(ufile_obj)
# Loop through found images in file.
for image in ubi_obj.images:
# Create path for specific image
# In case multiple images in data
img_outpath = os.path.join(outpath, '%s' % image.image_seq)
# Loop through volumes in each image.
for volume in image.volumes:
# Get blocks associated with this volume.
vol_blocks = image.volumes[volume].get_blocks(ubi_obj.blocks)
# Create volume data output path.
vol_outpath = os.path.join(img_outpath, volume)
# Create volume output path directory.
create_output_dir(vol_outpath)
# Skip volume if empty.
if not len(vol_blocks):
continue
# Create LEB backed virtual file with volume blocks.
# Necessary to prevent having to load entire UBI image
# into memory.
lebv_file = leb_virtual_file(ubi_obj, vol_blocks)
# Extract files from UBI image.
ubifs_obj = ubifs(lebv_file)
print('Extracting files to: %s' % vol_outpath)
extract_files(ubifs_obj, vol_outpath, perms)
elif filetype == UBIFS_NODE_MAGIC:
# Create UBIFS object
ubifs_obj = ubifs(ufile_obj)
# Create directory for files.
create_output_dir(outpath)
# Extract files from UBIFS image.
print('Extracting files to: %s' % outpath)
extract_files(ubifs_obj, outpath, perms)
else:
print('Something went wrong to get here.')
ufile_obj.close()
if __name__=='__main__':
main()

@ -0,0 +1,174 @@
#!/usr/bin/env python
#############################################################
# ubi_reader
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
import os
import sys
import time
import argparse
from ubireader import settings
from ubireader.ubi import ubi
from ubireader.ubi.defines import UBI_EC_HDR_MAGIC
from ubireader.ubi_io import ubi_file
from ubireader.debug import error, log
from ubireader.utils import guess_filetype, guess_start_offset, guess_peb_size
def create_output_dir(outpath):
if not os.path.exists(outpath):
try:
os.makedirs(outpath)
log(create_output_dir, 'Created output path: %s' % outpath)
except Exception as e:
error(create_output_dir, 'Fatal', '%s' % e)
def main():
start = time.time()
description = 'Extract UBI or UBIFS images from file containing UBI data in it.'
usage = 'ubireader_extract_images [options] filepath'
parser = argparse.ArgumentParser(usage=usage, description=description)
parser.add_argument('-l', '--log', action='store_true', dest='log',
help='Print extraction information to screen.')
parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose',
help='Prints nearly everything about anything to screen.')
parser.add_argument('-p', '--peb-size', type=int, dest='block_size',
help='Specify PEB size.')
parser.add_argument('-u', '--image-type', dest='image_type',
help='Specify image type to extract UBI or UBIFS. (default: UBIFS)')
parser.add_argument('-s', '--start-offset', type=int, dest='start_offset',
help='Specify offset of UBI data in file. (default: 0)')
parser.add_argument('-n', '--end-offset', type=int, dest='end_offset',
help='Specify end offset of UBI data in file.')
parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset',
help='Specify offset to start guessing where UBI data is in file. (default: 0)')
parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors',
help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)')
parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors',
help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)')
parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix',
help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)')
parser.add_argument('-o', '--output-dir', dest='outpath',
help='Specify output directory path.')
parser.add_argument('filepath', help='File to extract contents of.')
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()
settings.logging_on = args.log
settings.logging_on_verbose = args.verbose
settings.warn_only_block_read_errors = args.warn_only_block_read_errors
settings.ignore_block_header_errors = args.ignore_block_header_errors
settings.uboot_fix = args.uboot_fix
if args.filepath:
path = args.filepath
if not os.path.exists(path):
parser.error("File path doesn't exist.")
if args.start_offset:
start_offset = args.start_offset
elif args.guess_offset:
start_offset = guess_start_offset(path, args.guess_offset)
else:
start_offset = guess_start_offset(path)
if args.end_offset:
end_offset = args.end_offset
else:
end_offset = None
filetype = guess_filetype(path, start_offset)
if filetype != UBI_EC_HDR_MAGIC:
parser.error('File does not look like UBI data.')
img_name = os.path.basename(path)
if args.outpath:
outpath = os.path.abspath(os.path.join(args.outpath, img_name))
else:
outpath = os.path.join(settings.output_dir, img_name)
if args.block_size:
block_size = args.block_size
else:
block_size = guess_peb_size(path)
if not block_size:
parser.error('Block size could not be determined.')
if args.image_type:
image_type = args.image_type.upper()
else:
image_type = 'UBIFS'
# Create file object.
ufile_obj = ubi_file(path, block_size, start_offset, end_offset)
# Create UBI object
ubi_obj = ubi(ufile_obj)
# Loop through found images in file.
for image in ubi_obj.images:
if image_type == 'UBI':
# Create output path and open file.
img_outpath = os.path.join(outpath, 'img-%s.ubi' % image.image_seq)
create_output_dir(outpath)
f = open(img_outpath, 'wb')
# Loop through UBI image blocks
for block in image.get_blocks(ubi_obj.blocks):
if ubi_obj.blocks[block].is_valid:
# Write block (PEB) to file
f.write(ubi_obj.file.read_block(ubi_obj.blocks[block]))
elif image_type == 'UBIFS':
# Loop through image volumes
for volume in image.volumes:
# Create output path and open file.
vol_outpath = os.path.join(outpath, 'img-%s_vol-%s.ubifs' % (image.image_seq, volume))
create_output_dir(outpath)
f = open(vol_outpath, 'wb')
# Loop through and write volume block data (LEB) to file.
for block in image.volumes[volume].reader(ubi_obj):
f.write(block)
ufile_obj.close()
if __name__=='__main__':
main()

@ -0,0 +1,180 @@
#!/usr/bin/env python
#############################################################
# ubi_reader
# (C) Collin Mulliner based on Jason Pruitt's ubireader_extract_images
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
import os
import sys
import time
import argparse
from ubireader import settings
from ubireader.ubi import ubi
from ubireader.ubi.defines import UBI_EC_HDR_MAGIC
from ubireader.ubifs import ubifs
from ubireader.ubifs.list import list_files, copy_file
from ubireader.ubifs.defines import UBIFS_NODE_MAGIC
from ubireader.ubi_io import ubi_file, leb_virtual_file
from ubireader.debug import error, log
from ubireader.utils import guess_filetype, guess_start_offset, guess_leb_size, guess_peb_size
def main():
start = time.time()
description = 'List and Extract files of a UBI or UBIFS image.'
usage = 'ubireader_list_files [options] filepath'
parser = argparse.ArgumentParser(usage=usage, description=description)
parser.add_argument('-l', '--log', action='store_true', dest='log',
help='Print extraction information to screen.')
parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose',
help='Prints nearly everything about anything to screen.')
parser.add_argument('-p', '--peb-size', type=int, dest='block_size',
help='Specify PEB size. (UBI Only)')
parser.add_argument('-e', '--leb-size', type=int, dest='block_size',
help='Specify LEB size. (UBIFS Only)')
parser.add_argument('-s', '--start-offset', type=int, dest='start_offset',
help='Specify offset of UBI/UBIFS data in file. (default: 0)')
parser.add_argument('-n', '--end-offset', type=int, dest='end_offset',
help='Specify end offset of UBI/UBIFS data in file.')
parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset',
help='Specify offset to start guessing where UBI data is in file. (default: 0)')
parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors',
help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)')
parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors',
help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)')
parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix',
help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)')
parser.add_argument('-P', '--path', dest='listpath',
help='Path to list.')
parser.add_argument('-C', '--copy', dest='copyfile',
help='File to Copy.')
parser.add_argument('-D', '--copy-dest', dest='copyfiledest',
help='Copy Destination.')
parser.add_argument('filepath', help='UBI/UBIFS image file.')
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()
settings.logging_on = args.log
settings.logging_on_verbose = args.verbose
settings.warn_only_block_read_errors = args.warn_only_block_read_errors
settings.ignore_block_header_errors = args.ignore_block_header_errors
settings.uboot_fix = args.uboot_fix
if args.filepath:
path = args.filepath
if not os.path.exists(path):
parser.error("File path doesn't exist.")
if args.start_offset:
start_offset = args.start_offset
elif args.guess_offset:
start_offset = guess_start_offset(path, args.guess_offset)
else:
start_offset = guess_start_offset(path)
if args.end_offset:
end_offset = args.end_offset
else:
end_offset = None
filetype = guess_filetype(path, start_offset)
if not filetype:
parser.error('Could not determine file type.')
if args.block_size:
block_size = args.block_size
else:
if filetype == UBI_EC_HDR_MAGIC:
block_size = guess_peb_size(path)
elif filetype == UBIFS_NODE_MAGIC:
block_size = guess_leb_size(path)
if not block_size:
parser.error('Block size could not be determined.')
# Create file object.
ufile_obj = ubi_file(path, block_size, start_offset, end_offset)
if filetype == UBI_EC_HDR_MAGIC:
# Create UBI object
ubi_obj = ubi(ufile_obj)
# Loop through found images in file.
for image in ubi_obj.images:
# Loop through volumes in each image.
for volume in image.volumes:
# Get blocks associated with this volume.
vol_blocks = image.volumes[volume].get_blocks(ubi_obj.blocks)
# Skip volume if empty.
if not len(vol_blocks):
continue
# Create LEB backed virtual file with volume blocks.
# Necessary to prevent having to load entire UBI image
# into memory.
lebv_file = leb_virtual_file(ubi_obj, vol_blocks)
# Create UBIFS object.
ubifs_obj = ubifs(lebv_file)
if args.listpath:
list_files(ubifs_obj, args.listpath)
if args.copyfile and args.copyfiledest:
copy_file(ubifs_obj, args.copyfile, args.copyfiledest)
elif filetype == UBIFS_NODE_MAGIC:
# Create UBIFS object
ubifs_obj = ubifs(ufile_obj)
if args.listpath:
list_files(ubifs_obj, args.listpath)
if args.copyfile and args.copyfiledest:
copy_file(ubifs_obj, args.copyfile, args.copyfiledest)
else:
print('Something went wrong to get here.')
ufile_obj.close()
if __name__=='__main__':
main()

@ -0,0 +1,350 @@
#!/usr/bin/env python
#############################################################
# ubi_reader
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
import os
import sys
import time
import argparse
if (sys.version_info > (3, 0)):
import configparser
else:
import ConfigParser as configparser
from ubireader import settings
from ubireader.ubi import ubi
from ubireader.ubi.defines import UBI_EC_HDR_MAGIC, PRINT_VOL_TYPE_LIST, UBI_VTBL_AUTORESIZE_FLG
from ubireader.ubifs import ubifs
from ubireader.ubifs.defines import PRINT_UBIFS_KEY_HASH, PRINT_UBIFS_COMPR
from ubireader.ubi_io import ubi_file, leb_virtual_file
from ubireader.debug import error, log
from ubireader.utils import guess_filetype, guess_start_offset, guess_peb_size
def create_output_dir(outpath):
if os.path.exists(outpath):
if os.listdir(outpath):
error(create_output_dir, 'Fatal', 'Output directory is not empty. %s' % outpath)
else:
try:
os.makedirs(outpath)
log(create_output_dir, 'Created output path: %s' % outpath)
except Exception as e:
error(create_output_dir, 'Fatal', '%s' % e)
def get_ubi_params(ubi_obj):
"""Get ubi_obj utils params
Arguments:
Obj:ubi -- UBI object.
Returns:
Dict -- Dict keyed to volume with Dict of args and flags.
"""
ubi_flags = {'min_io_size':'-m',
'max_bud_bytes':'-j',
'leb_size':'-e',
'default_compr':'-x',
'sub_page_size':'-s',
'fanout':'-f',
'key_hash':'-k',
'orph_lebs':'-p',
'log_lebs':'-l',
'max_leb_cnt': '-c',
'peb_size':'-p',
'sub_page_size':'-s',
'vid_hdr_offset':'-O',
'version':'-x',
'image_seq':'-Q',
'alignment':'-a',
'vol_id':'-n',
'name':'-N'}
ubi_params = {}
ubi_args = {}
ini_params = {}
for image in ubi_obj.images:
img_seq = image.image_seq
ubi_params[img_seq] = {}
ubi_args[img_seq] = {}
ini_params[img_seq] = {}
for volume in image.volumes:
ubi_args[img_seq][volume] = {}
ini_params[img_seq][volume] = {}
# Get ubinize.ini settings
ini_params[img_seq][volume]['vol_type'] = PRINT_VOL_TYPE_LIST[image.volumes[volume].vol_rec.vol_type]
if image.volumes[volume].vol_rec.flags == UBI_VTBL_AUTORESIZE_FLG:
ini_params[img_seq][volume]['vol_flags'] = 'autoresize'
else:
ini_params[img_seq][volume]['vol_flags'] = image.volumes[volume].vol_rec.flags
ini_params[img_seq][volume]['vol_id'] = image.volumes[volume].vol_id
ini_params[img_seq][volume]['vol_name'] = image.volumes[volume].name.rstrip(b'\x00').decode('utf-8')
ini_params[img_seq][volume]['vol_alignment'] = image.volumes[volume].vol_rec.alignment
ini_params[img_seq][volume]['vol_size'] = image.volumes[volume].vol_rec.reserved_pebs * ubi_obj.leb_size
# Create file object backed by UBI blocks.
lebv_file = leb_virtual_file(ubi_obj, image.volumes[volume].get_blocks(ubi_obj.blocks))
# Create UBIFS object
ubifs_obj = ubifs(lebv_file)
for key, value in ubifs_obj.superblock_node:
if key == 'key_hash':
value = PRINT_UBIFS_KEY_HASH[value]
elif key == 'default_compr':
value = PRINT_UBIFS_COMPR[value]
if key in ubi_flags:
ubi_args[img_seq][volume][key] = value
for key, value in image.volumes[volume].vol_rec:
if key == 'name':
value = value.rstrip(b'\x00').decode('utf-8')
if key in ubi_flags:
ubi_args[img_seq][volume][key] = value
ubi_args[img_seq][volume]['version'] = image.version
ubi_args[img_seq][volume]['vid_hdr_offset'] = image.vid_hdr_offset
ubi_args[img_seq][volume]['sub_page_size'] = ubi_args[img_seq][volume]['vid_hdr_offset']
ubi_args[img_seq][volume]['sub_page_size'] = ubi_args[img_seq][volume]['vid_hdr_offset']
ubi_args[img_seq][volume]['image_seq'] = image.image_seq
ubi_args[img_seq][volume]['peb_size'] = ubi_obj.peb_size
ubi_args[img_seq][volume]['vol_id'] = image.volumes[volume].vol_id
ubi_params[img_seq][volume] = {'flags':ubi_flags, 'args':ubi_args[img_seq][volume], 'ini':ini_params[img_seq][volume]}
return ubi_params
def print_ubi_params(ubi_obj):
ubi_params = get_ubi_params(ubi_obj)
for img_params in ubi_params:
for volume in ubi_params[img_params]:
ubi_flags = ubi_params[img_params][volume]['flags']
ubi_args = ubi_params[img_params][volume]['args']
ini_params = ubi_params[img_params][volume]['ini']
sorted_keys = sorted(ubi_params[img_params][volume]['args'])
print('\nVolume %s' % volume)
for key in sorted_keys:
if len(key)< 8:
name = '%s\t' % key
else:
name = key
print('\t%s\t%s %s' % (name, ubi_flags[key], ubi_args[key]))
print('\n\t#ubinize.ini#')
print('\t[%s]' % ini_params['vol_name'])
for key in ini_params:
if key != 'name':
print('\t%s=%s' % (key, ini_params[key]))
def make_files(ubi, outpath):
ubi_params = get_ubi_params(ubi)
for img_params in ubi_params:
config = configparser.ConfigParser()
img_outpath = os.path.join(outpath, 'img-%s' % img_params)
if not os.path.exists(img_outpath):
os.mkdir(img_outpath)
ini_path = os.path.join(img_outpath, 'img-%s.ini' % img_params)
ubi_file = os.path.join('img-%s.ubi' % img_params)
script_path = os.path.join(img_outpath, 'create_ubi_img-%s.sh' % img_params)
ubifs_files =[]
buf = '#!/bin/sh\n'
print('Writing to: %s' % script_path)
with open(script_path, 'w') as fscr:
with open(ini_path, 'w') as fini:
print('Writing to: %s' % ini_path)
vol_idx = 0
for volume in ubi_params[img_params]:
ubifs_files.append(os.path.join('img-%s_%s.ubifs' % (img_params, vol_idx)))
ini_params = ubi_params[img_params][volume]['ini']
ini_file = 'img-%s.ini' % img_params
config.add_section(volume)
config.set(volume, 'mode', 'ubi')
config.set(volume, 'image', ubifs_files[vol_idx])
for i in ini_params:
config.set(volume, i, str(ini_params[i]))
ubi_flags = ubi_params[img_params][volume]['flags']
ubi_args = ubi_params[img_params][volume]['args']
mkfs_flags = ['min_io_size',
'leb_size',
'max_leb_cnt',
'default_compr',
'fanout',
'key_hash',
'orph_lebs',
'log_lebs']
argstr = ''
for flag in mkfs_flags:
argstr += ' %s %s' % (ubi_flags[flag], ubi_args[flag])
#leb = '%s %s' % (ubi_flags['leb_size'], ubi_args['leb_size'])
peb = '%s %s' % (ubi_flags['peb_size'], ubi_args['peb_size'])
min_io = '%s %s' % (ubi_flags['min_io_size'], ubi_args['min_io_size'])
#leb_cnt = '%s %s' % (ubi_flags['max_leb_cnt'], ubi_args['max_leb_cnt'])
vid_hdr = '%s %s' % (ubi_flags['vid_hdr_offset'], ubi_args['vid_hdr_offset'])
sub_page = '%s %s' % (ubi_flags['sub_page_size'], ubi_args['sub_page_size'])
buf += '/usr/sbin/mkfs.ubifs%s -r $%s %s\n' % (argstr, (vol_idx+1), ubifs_files[vol_idx])
vol_idx += 1
config.write(fini)
ubinize_flags = ['peb_size',
'min_io_size',
'vid_hdr_offset',
'sub_page_size',
'version',
'image_seq']
argstr = ''
for flag in ubinize_flags:
argstr += ' %s %s' % (ubi_flags[flag], ubi_args[flag])
buf += '/usr/sbin/ubinize%s -o %s %s\n' % (argstr, ubi_file, ini_file)
fscr.write(buf)
os.chmod(script_path, 0o755)
def main():
start = time.time()
description = 'Determine settings for recreating UBI image.'
usage = 'ubireader_utils_info [options] filepath'
parser = argparse.ArgumentParser(usage=usage, description=description)
parser.add_argument('-r', '--show-only', action='store_true', dest='show_only',
help='Print parameters to screen only. (default: false)')
parser.add_argument('-l', '--log', action='store_true', dest='log',
help='Print extraction information to screen.')
parser.add_argument('-v', '--verbose-log', action='store_true', dest='verbose',
help='Prints nearly everything about anything to screen.')
parser.add_argument('-p', '--peb-size', type=int, dest='block_size',
help='Specify PEB size.')
parser.add_argument('-s', '--start-offset', type=int, dest='start_offset',
help='Specify offset of UBI data in file. (default: 0)')
parser.add_argument('-n', '--end-offset', type=int, dest='end_offset',
help='Specify end offset of UBI data in file.')
parser.add_argument('-g', '--guess-offset', type=int, dest='guess_offset',
help='Specify offset to start guessing where UBI data is in file. (default: 0)')
parser.add_argument('-w', '--warn-only-block-read-errors', action='store_true', dest='warn_only_block_read_errors',
help='Attempts to continue extracting files even with bad block reads. Some data will be missing or corrupted! (default: False)')
parser.add_argument('-i', '--ignore-block-header-errors', action='store_true', dest='ignore_block_header_errors',
help='Forces unused and error containing blocks to be included and also displayed with log/verbose. (default: False)')
parser.add_argument('-f', '--u-boot-fix', action='store_true', dest='uboot_fix',
help='Assume blocks with image_seq 0 are because of older U-boot implementations and include them. (default: False)')
parser.add_argument('-o', '--output-dir', dest='outpath',
help='Specify output directory path.')
parser.add_argument('filepath', help='File to extract contents of.')
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()
settings.logging_on = args.log
settings.logging_on_verbose = args.verbose
settings.warn_only_block_read_errors = args.warn_only_block_read_errors
settings.ignore_block_header_errors = args.ignore_block_header_errors
settings.uboot_fix = args.uboot_fix
if args.filepath:
path = args.filepath
if not os.path.exists(path):
parser.error("File path doesn't exist.")
if args.start_offset:
start_offset = args.start_offset
elif args.guess_offset:
start_offset = guess_start_offset(path, args.guess_offset)
else:
start_offset = guess_start_offset(path)
if args.end_offset:
end_offset = args.end_offset
else:
end_offset = None
filetype = guess_filetype(path, start_offset)
if filetype != UBI_EC_HDR_MAGIC:
parser.error('File does not look like UBI data.')
img_name = os.path.basename(path)
if args.outpath:
outpath = os.path.abspath(os.path.join(args.outpath, img_name))
else:
outpath = os.path.join(settings.output_dir, img_name)
if args.block_size:
block_size = args.block_size
else:
block_size = guess_peb_size(path)
if not block_size:
parser.error('Block size could not be determined.')
# Create file object.
ufile_obj = ubi_file(path, block_size, start_offset, end_offset)
# Create UBI object
ubi_obj = ubi(ufile_obj)
# Print info.
print_ubi_params(ubi_obj)
if not args.show_only:
create_output_dir(outpath)
# Create build scripts.
make_files(ubi_obj, outpath)
ufile_obj.close()
if __name__=='__main__':
main()

@ -0,0 +1,34 @@
#!/usr/bin/env python
#############################################################
# ubi_reader
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
output_dir = 'ubifs-root'
error_action = True # if 'exit' on any error exit program.
fatal_traceback = False # Print traceback on fatal errors.
ignore_block_header_errors = False # Ignore block errors.
warn_only_block_read_errors = False # Warning instead of Fatal error.
logging_on = False # Print debug info on.
logging_on_verbose = False # Print verbose debug info on.
use_dummy_socket_file = False # Create regular file place holder for sockets.
use_dummy_devices = False # Create regular file place holder for devices.
uboot_fix = False # Older u-boot sets image_seq to 0 on blocks it's written to.

@ -0,0 +1,226 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubi
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader.debug import error
from ubireader.ubi.block import sort, extract_blocks
from ubireader.ubi import display
from ubireader.ubi.image import description as image
from ubireader.ubi.block import layout, rm_old_blocks
class ubi_base(object):
"""UBI Base object
Arguments:
Obj:image -- UBI image object
Attributes:
ubi_file:file -- ubi_file object
Int:block_count -- Number of blocks found.
Int:first_peb_num -- Number of the first UBI PEB in file.
Int:leb_size -- Size of Logical Erase Blocks.
Int:peb_size -- Size of Physical Erase Blocks.
Int:min_io -- Size of min I/O from vid_hdr_offset.
Dict:blocks -- Dict keyed by PEB number of all blocks.
"""
def __init__(self, ubi_file):
self.__name__ = 'UBI'
self._file = ubi_file
self._first_peb_num = 0
self._blocks = extract_blocks(self)
self._block_count = len(self.blocks)
if self._block_count <= 0:
error(self, 'Fatal', 'No blocks found.')
arbitrary_block = next(iter(self.blocks.values()))
self._min_io_size = arbitrary_block.ec_hdr.vid_hdr_offset
self._leb_size = self.file.block_size - arbitrary_block.ec_hdr.data_offset
def _get_file(self):
"""UBI File object
Returns:
Obj -- UBI File object.
"""
return self._file
file = property(_get_file)
def _get_block_count(self):
"""Total amount of UBI blocks in file.
Returns:
Int -- Number of blocks
"""
return self._block_count
block_count = property(_get_block_count)
def _set_first_peb_num(self, i):
self._first_peb_num = i
def _get_first_peb_num(self):
"""First Physical Erase Block with UBI data
Returns:
Int -- Number of the first PEB.
"""
return self._first_peb_num
first_peb_num = property(_get_first_peb_num, _set_first_peb_num)
def _get_leb_size(self):
"""LEB size of UBI blocks in file.
Returns:
Int -- LEB Size.
"""
return self._leb_size
leb_size = property(_get_leb_size)
def _get_peb_size(self):
"""PEB size of UBI blocks in file.
Returns:
Int -- PEB Size.
"""
return self.file.block_size
peb_size = property(_get_peb_size)
def _get_min_io_size(self):
"""Min I/O Size
Returns:
Int -- Min I/O Size.
"""
return self._min_io_size
min_io_size = property(_get_min_io_size)
def _get_blocks(self):
"""Main Dict of UBI Blocks
Passed around for lists of indexes to be made or to be returned
filtered through a list. So there isn't multiple copies of blocks,
as there can be thousands.
"""
return self._blocks
blocks = property(_get_blocks)
class ubi(ubi_base):
"""UBI object
Arguments:
Obj:ubi_file -- UBI file object.
Attributes:
Inherits:ubi_base -- ubi_base attributes.
List:images -- List of UBI image objects.
List:data_blocks_list -- List of all data blocks in file.
List:layout_blocks_list -- List of all layout blocks in file.
List:int_vol_blocks_list -- List of internal volumes minus layout.
List:unknown_blocks_list -- List of blocks with unknown types. *
"""
def __init__(self, ubi_file):
super(ubi, self).__init__(ubi_file)
layout_list, data_list, int_vol_list, unknown_list = sort.by_type(self.blocks)
self._layout_blocks_list = layout_list
self._data_blocks_list = data_list
self._int_vol_blocks_list = int_vol_list
self._unknown_blocks_list = unknown_list
newest_layout_list = rm_old_blocks(self.blocks, self.layout_blocks_list)
if len(newest_layout_list) < 2:
error(self, 'Fatal', 'Less than 2 layout blocks found.')
layout_pairs = layout.group_pairs(self.blocks, newest_layout_list)
layout_infos = layout.associate_blocks(self.blocks, layout_pairs)
self._images = []
for i in range(0, len(layout_infos)):
self._images.append(image(self.blocks, layout_infos[i]))
def _get_images(self):
"""Get UBI images.
Returns:
List -- Of volume objects groupled by image.
"""
return self._images
images = property(_get_images)
def _get_data_blocks_list(self):
"""Get all UBI blocks found in file that are data blocks.
Returns:
List -- List of block objects.
"""
return self._data_blocks_list
data_blocks_list = property(_get_data_blocks_list)
def _get_layout_blocks_list(self):
"""Get all UBI blocks found in file that are layout volume blocks.
Returns:
List -- List of block objects.
"""
return self._layout_blocks_list
layout_blocks_list = property(_get_layout_blocks_list)
def _get_int_vol_blocks_list(self):
"""Get all UBI blocks found in file that are internal volume blocks.
Returns:
List -- List of block objects.
This does not include layout blocks.
"""
return self._int_vol_blocks_list
int_vol_blocks_list = property(_get_int_vol_blocks_list)
def _get_unknown_blocks_list(self):
"""Get all UBI blocks found in file of unknown type..
Returns:
List -- List of block objects.
"""
return self._unknown_blocks_list
unknown_blocks_list = property(_get_unknown_blocks_list)
def display(self, tab=''):
"""Print information about this object.
Argument:
Str:tab -- '\t' for spacing this object.
"""
return display.ubi(self, tab)

@ -0,0 +1,237 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubi
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from zlib import crc32
from ubireader import settings
from ubireader.debug import error, log, verbose_display, verbose_log
from ubireader.ubi import display
from ubireader.ubi.defines import UBI_EC_HDR_SZ, UBI_VID_HDR_SZ, UBI_INTERNAL_VOL_START, UBI_EC_HDR_MAGIC, UBI_CRC32_INIT
from ubireader.ubi.headers import ec_hdr, vid_hdr, vtbl_recs
class description(object):
"""UBI Block description Object
UBI Specifications:
http://www.linux-mtd.infradead.org/ -- Home page
<kernel>/drivers/mtd/ubi/ubi-media.h -- Header structs
and defines
Attributes:
Obj:ec_hdr -- Error Count Header
Obj:vid_hdr -- Volume ID Header
List:vtbl_recs -- (Optional) List of Volume Table Records.
Bool:is_vtbl -- If contains volume records table.
Bool:is_internal_vol -- If Vol ID is > UBI_INTERNAL_VOL_START
Bool:is_valid -- If ec_hdr & vid_hdr are error free.
Int:peb_num -- Physical Erase Block number.
Int:leb_num -- Logical Erase Block number.
Int:file_offset -- Address location in file of this block.
Int:size -- Size of total block data or PEB size.
Int:data_crc -- crc32 of block data.
Will print out all information when invoked as a string.
"""
def __init__(self, block_buf):
self.file_offset = -1
self.peb_num = -1
self.leb_num = -1
self.size = -1
self.vid_hdr = None
self.is_internal_vol = False
self.vtbl_recs = []
# TODO better understanding of block types/errors
self.ec_hdr = ec_hdr(block_buf[0:UBI_EC_HDR_SZ])
if not self.ec_hdr.errors or settings.ignore_block_header_errors:
self.vid_hdr = vid_hdr(block_buf[self.ec_hdr.vid_hdr_offset:self.ec_hdr.vid_hdr_offset+UBI_VID_HDR_SZ])
if not self.vid_hdr.errors or settings.ignore_block_header_errors:
self.is_internal_vol = self.vid_hdr.vol_id >= UBI_INTERNAL_VOL_START
if self.vid_hdr.vol_id >= UBI_INTERNAL_VOL_START:
self.vtbl_recs = vtbl_recs(block_buf[self.ec_hdr.data_offset:])
self.leb_num = self.vid_hdr.lnum
self.is_vtbl = bool(self.vtbl_recs) or False
self.is_valid = not self.ec_hdr.errors and not self.vid_hdr.errors or settings.ignore_block_header_errors
def __repr__(self):
return 'Block: PEB# %s: LEB# %s' % (self.peb_num, self.leb_num)
def display(self, tab=''):
return display.block(self, tab)
def get_blocks_in_list(blocks, idx_list):
"""Retrieve block objects in list of indexes
Arguments:
List:blocks -- List of block objects
List:idx_list -- List of block indexes
Returns:
Dict:blocks -- List of block objects generated
from provided list of indexes in
order of idx_list.
"""
return {i:blocks[i] for i in idx_list}
def extract_blocks(ubi):
"""Get a list of UBI block objects from file
Arguments:.
Obj:ubi -- UBI object.
Returns:
Dict -- Of block objects keyed by PEB number.
"""
blocks = {}
ubi.file.seek(ubi.file.start_offset)
peb_count = 0
cur_offset = 0
bad_blocks = []
# range instead of xrange, as xrange breaks > 4GB end_offset.
for i in range(ubi.file.start_offset, ubi.file.end_offset, ubi.file.block_size):
buf = ubi.file.read(ubi.file.block_size)
if buf.startswith(UBI_EC_HDR_MAGIC):
blk = description(buf)
blk.file_offset = i
blk.peb_num = ubi.first_peb_num + peb_count
blk.size = ubi.file.block_size
blk.data_crc = (~crc32(buf[blk.ec_hdr.data_offset:blk.ec_hdr.data_offset+blk.vid_hdr.data_size]) & UBI_CRC32_INIT)
blocks[blk.peb_num] = blk
peb_count += 1
log(extract_blocks, blk)
verbose_log(extract_blocks, 'file addr: %s' % (ubi.file.last_read_addr()))
ec_hdr_errors = ''
vid_hdr_errors = ''
if blk.ec_hdr.errors:
ec_hdr_errors = ','.join(blk.ec_hdr.errors)
if blk.vid_hdr and blk.vid_hdr.errors:
vid_hdr_errors = ','.join(blk.vid_hdr.errors)
if ec_hdr_errors or vid_hdr_errors:
if blk.peb_num not in bad_blocks:
bad_blocks.append(blk.peb_num)
log(extract_blocks, 'PEB: %s has possible issue EC_HDR [%s], VID_HDR [%s]' % (blk.peb_num, ec_hdr_errors, vid_hdr_errors))
verbose_display(blk)
else:
cur_offset += ubi.file.block_size
ubi.first_peb_num = cur_offset//ubi.file.block_size
ubi.file.start_offset = cur_offset
return blocks
def rm_old_blocks(blocks, block_list):
del_blocks = []
for i in block_list:
if i in del_blocks:
continue
if blocks[i].is_valid is not True:
del_blocks.append(i)
continue
for k in block_list:
if i == k:
continue
if k in del_blocks:
continue
if blocks[k].is_valid is not True:
del_blocks.append(k)
continue
if blocks[i].leb_num != blocks[k].leb_num:
continue
if blocks[i].ec_hdr.image_seq != blocks[k].ec_hdr.image_seq:
continue
second_newer = blocks[k].vid_hdr.sqnum > blocks[i].vid_hdr.sqnum
del_block = None
use_block = None
if second_newer:
if blocks[k].vid_hdr.copy_flag == 0:
del_block = i
use_block = k
else:
if blocks[i].vid_hdr.copy_flag == 0:
del_block = k
use_block = i
if del_block is not None:
del_blocks.append(del_block)
log(rm_old_blocks, 'Old block removed (copy_flag): PEB %s, LEB %s, Using PEB%s' % (blocks[del_block].peb_num, blocks[del_block].leb_num, use_block))
break
if second_newer:
if blocks[k].data_crc != blocks[k].vid_hdr.data_crc:
del_block = k
use_block = i
else:
del_block = i
use_block = k
else:
if blocks[i].data_crc != blocks[i].vid_hdr.data_crc:
del_block = i
use_block = k
else:
del_block = k
use_block = i
if del_block is not None:
del_blocks.append(del_block)
log(rm_old_blocks, 'Old block removed (data_crc): PEB %s, LEB %s, vid_hdr.data_crc %s / %s, Using PEB %s' % (blocks[del_block].peb_num,
blocks[del_block].leb_num,
blocks[del_block].vid_hdr.data_crc,
blocks[del_block].data_crc,
use_block))
else:
use_block = min(k, i)
del_blocks.append(use_block)
error('Warn', rm_old_blocks, 'Multiple PEB [%s] for LEB %s: Using first.' % (', '.join(i, k), blocks[i].leb_num, use_block))
break
return [j for j in block_list if j not in del_blocks]

@ -0,0 +1,64 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubi
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader.debug import log
from ubireader.ubi.block import sort
def group_pairs(blocks, layout_blocks_list):
"""Sort a list of layout blocks into pairs
Arguments:
List:blocks -- List of block objects
List:layout_blocks -- List of layout block indexes
Returns:
List -- Layout block pair indexes grouped in a list
"""
image_dict={}
for block_id in layout_blocks_list:
image_seq=blocks[block_id].ec_hdr.image_seq
if image_seq not in image_dict:
image_dict[image_seq]=[block_id]
else:
image_dict[image_seq].append(block_id)
log(group_pairs, 'Layout blocks found at PEBs: %s' % list(image_dict.values()))
return list(image_dict.values())
def associate_blocks(blocks, layout_pairs):
"""Group block indexes with appropriate layout pairs
Arguments:
List:blocks -- List of block objects
List:layout_pairs -- List of grouped layout blocks
Returns:
List -- Layout block pairs grouped with associated block ranges.
"""
seq_blocks = []
for layout_pair in layout_pairs:
seq_blocks = sort.by_image_seq(blocks, blocks[layout_pair[0]].ec_hdr.image_seq)
seq_blocks = [b for b in seq_blocks if b not in layout_pair]
layout_pair.append(seq_blocks)
return layout_pairs

@ -0,0 +1,130 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubi
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader import settings
def by_image_seq(blocks, image_seq):
"""Filter blocks to return only those associated with the provided image_seq number.
If uboot_fix is set, associate blocks with an image_seq of 0 also.
Argument:
List:blocks -- List of block objects to sort.
Int:image_seq -- image_seq number found in ec_hdr.
Returns:
List -- List of block indexes matching image_seq number.
"""
if settings.uboot_fix:
return list(filter(lambda block: blocks[block].ec_hdr.image_seq == image_seq or image_seq == 0 or blocks[block].ec_hdr.image_seq == 0, blocks))
else:
return list(filter(lambda block: blocks[block].ec_hdr.image_seq == image_seq, blocks))
def by_leb(blocks):
"""Sort blocks by Logical Erase Block number.
Arguments:
List:blocks -- List of block objects to sort.
Returns:
List -- Indexes of blocks sorted by LEB.
"""
slist_len = len(blocks)
slist = ['x'] * slist_len
for block in blocks:
if blocks[block].leb_num >= slist_len:
add_elements = blocks[block].leb_num - slist_len + 1
slist += (['x'] * add_elements)
slist_len = len(slist)
slist[blocks[block].leb_num] = block
return slist
def by_vol_id(blocks, slist=None):
"""Sort blocks by volume id
Arguments:
Obj:blocks -- List of block objects.
List:slist -- (optional) List of block indexes.
Return:
Dict -- blocks grouped in lists with dict key as volume id.
"""
vol_blocks = {}
# sort block by volume
# not reliable with multiple partitions (fifo)
for i in blocks:
if slist and i not in slist:
continue
elif not blocks[i].is_valid:
continue
if blocks[i].vid_hdr.vol_id not in vol_blocks:
vol_blocks[blocks[i].vid_hdr.vol_id] = []
vol_blocks[blocks[i].vid_hdr.vol_id].append(blocks[i].peb_num)
return vol_blocks
def by_type(blocks, slist=None):
"""Sort blocks into layout, internal volume, data or unknown
Arguments:
Obj:blocks -- List of block objects.
List:slist -- (optional) List of block indexes.
Returns:
List:layout -- List of block indexes of blocks containing the
volume table records.
List:data -- List of block indexes containing filesystem data.
List:int_vol -- List of block indexes containing volume ids
greater than UBI_INTERNAL_VOL_START that are not
layout volumes.
List:unknown -- List of block indexes of blocks that failed validation
of crc in ed_hdr or vid_hdr.
"""
layout = []
data = []
int_vol = []
unknown = []
for i in blocks:
if slist and i not in slist:
continue
if blocks[i].is_vtbl and blocks[i].is_valid:
layout.append(i)
elif blocks[i].is_internal_vol and blocks[i].is_valid:
int_vol.append(i)
elif blocks[i].is_valid:
data.append(i)
else:
unknown.append(i)
return layout, data, int_vol, unknown

@ -0,0 +1,119 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#############################################################
# Adapted in part from linux-source-3.2/drivers/mtd/ubi/ubi-media.h
# for use in Python.
# Oct. 2013 by Jason Pruitt
#
# Original copyright notice.
# --------------------------
#
# Copyright (c) International Business Machines Corp., 2006
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Authors: Artem Bityutskiy (Битюцкий Артём)
# Thomas Gleixner
# Frank Haverkamp
# Oliver Lohmann
# Andreas Arnez
#
#############################################################
import struct
# Magic number
UBI_EC_HDR_MAGIC = b'\x55\x42\x49\x23'
# Initial CRC32 checksum value.
UBI_CRC32_INIT = 4294967295 #0xFFFFFFFF
# Max number of volumes allowed.
UBI_MAX_VOLUMES = 128
# Internal Volume ID start.
UBI_INTERNAL_VOL_START = 2147479551
# Error Count header.
EC_HDR_FORMAT = '>4sB3sQIII32sI'
EC_HDR_FIELDS = ['magic', # Magic string UBI#
'version', # UBI version meant to accept this image.
'padding', # Reserved for future, zeros.
'ec', # Erase counter
'vid_hdr_offset', # Where the VID header starts.
'data_offset', # Where user data starts.
'image_seq', # Image sequence number
'padding2', # Reserved for future, zeros.
'hdr_crc'] # EC header crc32 checksum.
UBI_EC_HDR_SZ = struct.calcsize(EC_HDR_FORMAT) # 64
# Volume ID header.
UBI_VID_HDR_MAGIC =b'\x55\x42\x49\x21' # UBI!
VID_HDR_FORMAT = '>4sBBBBII4sIIII4sQ12sI'
VID_HDR_FIELDS = ['magic', # Magic string UBI!
'version', # UBI version meant to accept this image.
'vol_type', # Volume type, Dynamic/Static
'copy_flag', # If this is a copied PEB b/c of wear leveling.
'compat', # Compatibility of this volume UBI_COMPAT_*
'vol_id', # ID of this volume.
'lnum', # LEB number.
'padding', # Reserved for future, zeros.
'data_size', # How many bytes of data this contains.
# Used for static types only.
'used_ebs', # Total num of used LEBs in this volume.
'data_pad', # How many bytes at end of LEB are not used.
'data_crc', # CRC32 checksum of data, static type only.
'padding2', # Reserved for future, zeros.
'sqnum', # Sequence number.
'padding3', # Reserved for future, zeros.
'hdr_crc'] # VID header CRC32 checksum.
UBI_VID_HDR_SZ = struct.calcsize(VID_HDR_FORMAT) # 64
# Volume table records.
VTBL_REC_FORMAT = '>IIIBBH128sB23sI'
VTBL_REC_FIELDS = ['reserved_pebs', # How many PEBs reserved for this volume.
'alignment', # Volume alignment.
'data_pad', # Number of unused bytes at end of PEB.
'vol_type', # Volume type, static/dynamic.
'upd_marker', # If vol update started but not finished.
'name_len', # Length of name.
'name', # Volume name.
'flags', # Volume flags
'padding', # Reserved for future, zeros.
'crc'] # Vol record CRC32 checksum.
UBI_VTBL_REC_SZ = struct.calcsize(VTBL_REC_FORMAT) # 172
# Volume Identifier Header
UBI_VID_DYNAMIC = 1 # Volume can be resized.
UBI_VID_STATIC = 2 # Volume can not be resized.
PRINT_VOL_TYPE_LIST = [0, 'dynamic', 'static']
# Volume table record
UBI_VTBL_AUTORESIZE_FLG = 1
UBI_COMPAT_DELETE = 1 # Delete this internal volume before anything written.
UBI_COMPAT_RO = 2 # Attach this device in read-only mode.
UBI_COMPAT_PRESERVE = 4 # Preserve this internal volume - touch nothing.
UBI_COMPAT_REJECT = 5 # Reject this UBI image
PRINT_COMPAT_LIST = [0, 'Delete', 'Read Only', 0, 'Preserve', 'Reject']
# File chunk size for reads.
FILE_CHUNK_SZ = 5 * 1024 * 1024

@ -0,0 +1,161 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubi
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader import settings
from ubireader.ubi.defines import PRINT_COMPAT_LIST, PRINT_VOL_TYPE_LIST, UBI_VTBL_AUTORESIZE_FLG
def ubi(ubi, tab=''):
buf = '%sUBI File\n' % (tab)
buf += '%s---------------------\n' % (tab)
buf += '\t%sMin I/O: %s\n' % (tab, ubi.min_io_size)
buf += '\t%sLEB Size: %s\n' % (tab, ubi.leb_size)
buf += '\t%sPEB Size: %s\n' % (tab, ubi.peb_size)
buf += '\t%sTotal Block Count: %s\n' % (tab, ubi.block_count)
buf += '\t%sData Block Count: %s\n' % (tab, len(ubi.data_blocks_list))
buf += '\t%sLayout Block Count: %s\n' % (tab, len(ubi.layout_blocks_list))
buf += '\t%sInternal Volume Block Count: %s\n' % (tab, len(ubi.int_vol_blocks_list))
buf += '\t%sUnknown Block Count: %s\n' % (tab, len(ubi.unknown_blocks_list))
buf += '\t%sFirst UBI PEB Number: %s\n' % (tab, ubi.first_peb_num)
return buf
def image(image, tab=''):
buf = '%s%s\n' % (tab, image)
buf += '%s---------------------\n' % (tab)
buf += '\t%sImage Sequence Num: %s\n' % (tab, image.image_seq)
for volume in image.volumes:
buf += '\t%sVolume Name:%s\n' % (tab, volume)
buf += '\t%sPEB Range: %s - %s\n' % (tab, image.peb_range[0], image.peb_range[1])
return buf
def volume(volume, tab=''):
buf = '%s%s\n' % (tab, volume)
buf += '%s---------------------\n' % (tab)
buf += '\t%sVol ID: %s\n' % (tab, volume.vol_id)
buf += '\t%sName: %s\n' % (tab, volume.name.decode('utf-8'))
buf += '\t%sBlock Count: %s\n' % (tab, volume.block_count)
buf += '\n'
buf += '\t%sVolume Record\n' % (tab)
buf += '\t%s---------------------\n' % (tab)
buf += vol_rec(volume.vol_rec, '\t\t%s' % tab)
buf += '\n'
return buf
def block(block, tab='\t'):
buf = '%s%s\n' % (tab, block)
buf += '%s---------------------\n' % (tab)
buf += '\t%sFile Offset: %s\n' % (tab, block.file_offset)
buf += '\t%sPEB #: %s\n' % (tab, block.peb_num)
buf += '\t%sLEB #: %s\n' % (tab, block.leb_num)
buf += '\t%sBlock Size: %s\n' % (tab, block.size)
buf += '\t%sInternal Volume: %s\n' % (tab, block.is_internal_vol)
buf += '\t%sIs Volume Table: %s\n' % (tab, block.is_vtbl)
buf += '\t%sIs Valid: %s\n' % (tab, block.is_valid)
if not block.ec_hdr.errors or settings.ignore_block_header_errors:
buf += '\n'
buf += '\t%sErase Count Header\n' % (tab)
buf += '\t%s---------------------\n' % (tab)
buf += ec_hdr(block.ec_hdr, '\t\t%s' % tab)
if (block.vid_hdr and not block.vid_hdr.errors) or settings.ignore_block_header_errors:
buf += '\n'
buf += '\t%sVID Header\n' % (tab)
buf += '\t%s---------------------\n' % (tab)
buf += vid_hdr(block.vid_hdr, '\t\t%s' % tab)
if block.vtbl_recs:
buf += '\n'
buf += '\t%sVolume Records\n' % (tab)
buf += '\t%s---------------------\n' % (tab)
for vol in block.vtbl_recs:
buf += vol_rec(vol, '\t\t%s' % tab)
buf += '\n'
return buf
def ec_hdr(ec_hdr, tab=''):
buf = ''
for key, value in ec_hdr:
if key == 'errors':
value = ','.join(value)
elif key == 'hdr_crc':
value = hex(value)
buf += '%s%s: %r\n' % (tab, key, value)
return buf
def vid_hdr(vid_hdr, tab=''):
buf = ''
for key, value in vid_hdr:
if key == 'errors':
value = ','.join(value)
elif key == 'hdr_crc':
value = hex(value)
elif key == 'compat':
if value in PRINT_COMPAT_LIST:
value = PRINT_COMPAT_LIST[value]
else:
value = -1
elif key == 'vol_type':
if value < len(PRINT_VOL_TYPE_LIST):
value = PRINT_VOL_TYPE_LIST[value]
else:
value = -1
buf += '%s%s: %r\n' % (tab, key, value)
return buf
def vol_rec(vol_rec, tab=''):
buf = ''
for key, value in vol_rec:
if key == 'errors':
value = ','.join(value)
elif key == 'crc':
value = hex(value)
elif key == 'vol_type':
if value < len(PRINT_VOL_TYPE_LIST):
value = PRINT_VOL_TYPE_LIST[value]
else:
value = -1
elif key == 'flags' and value == UBI_VTBL_AUTORESIZE_FLG:
value = 'autoresize'
elif key == 'name':
value = value.strip(b'\x00').decode('utf-8')
elif key == 'padding':
value = value.decode('utf-8')
buf += '%s%s: %r\n' % (tab, key, value)
return buf

@ -0,0 +1,114 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubi
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
import struct
from zlib import crc32
from ubireader.debug import log
from ubireader.ubi.defines import *
class ec_hdr(object):
def __init__(self, buf):
fields = dict(list(zip(EC_HDR_FIELDS, struct.unpack(EC_HDR_FORMAT,buf))))
for key in fields:
setattr(self, key, fields[key])
setattr(self, 'errors', [])
self._check_errors(buf[:-4])
def __repr__(self):
return 'Erase Count Header'
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def _check_errors(self, buf_crc):
crc_chk = (~crc32(buf_crc) & UBI_CRC32_INIT)
if self.hdr_crc != crc_chk:
log(vid_hdr, 'CRC Failed: expected 0x%x got 0x%x' % (crc_chk, self.hdr_crc))
self.errors.append('crc')
class vid_hdr(object):
def __init__(self, buf):
fields = dict(list(zip(VID_HDR_FIELDS, struct.unpack(VID_HDR_FORMAT,buf))))
for key in fields:
setattr(self, key, fields[key])
setattr(self, 'errors', [])
self._check_errors(buf[:-4])
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def __repr__(self):
return 'VID Header'
def _check_errors(self, buf_crc):
crc_chk = (~crc32(buf_crc) & UBI_CRC32_INIT)
if self.hdr_crc != crc_chk:
log(vid_hdr, 'CRC Failed: expected 0x%x got 0x%x' % (crc_chk, self.hdr_crc))
self.errors.append('crc')
def vtbl_recs(buf):
data_buf = buf
vtbl_recs = []
vtbl_rec_ret = ''
for i in range(0, UBI_MAX_VOLUMES):
offset = i*UBI_VTBL_REC_SZ
vtbl_rec_buf = data_buf[offset:offset+UBI_VTBL_REC_SZ]
if len(vtbl_rec_buf) == UBI_VTBL_REC_SZ:
vtbl_rec_ret = _vtbl_rec(vtbl_rec_buf)
if len(vtbl_rec_ret.errors) == 0 and vtbl_rec_ret.name_len:
vtbl_rec_ret.rec_index = i
vtbl_recs.append(vtbl_rec_ret)
return vtbl_recs
class _vtbl_rec(object):
def __init__(self, buf):
fields = dict(list(zip(VTBL_REC_FIELDS, struct.unpack(VTBL_REC_FORMAT,buf))))
for key in fields:
setattr(self, key, fields[key])
setattr(self, 'errors', [])
setattr(self, 'rec_index', -1)
self.name = self.name[0: self.name_len]
self._check_errors(buf[:-4])
def __repr__(self):
return 'Volume Table Record: %s' % getattr(self, 'name')
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def _check_errors(self, buf_crc):
if self.crc != (~crc32(buf_crc) & 0xFFFFFFFF):
self.errors.append('crc')

@ -0,0 +1,60 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubi
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader.debug import log
from ubireader.ubi import display
from ubireader.ubi.volume import get_volumes
from ubireader.ubi.block import get_blocks_in_list
class description(object):
def __init__(self, blocks, layout_info):
self._image_seq = blocks[layout_info[0]].ec_hdr.image_seq
self.vid_hdr_offset = blocks[layout_info[0]].ec_hdr.vid_hdr_offset
self.version = blocks[layout_info[0]].ec_hdr.version
self._block_list = layout_info[2]
self._start_peb = min(layout_info[2])
self._end_peb = max(layout_info[2])
self._volumes = get_volumes(blocks, layout_info)
log(description, 'Created Image: %s, Volume Cnt: %s' % (self.image_seq, len(self.volumes)))
def __repr__(self):
return 'Image: %s' % (self.image_seq)
def get_blocks(self, blocks):
return get_blocks_in_list(blocks, self._block_list)
def _get_peb_range(self):
return [self._start_peb, self._end_peb]
peb_range = property(_get_peb_range)
def _get_image_seq(self):
return self._image_seq
image_seq = property(_get_image_seq)
def _get_volumes(self):
return self._volumes
volumes = property(_get_volumes)
def display(self, tab=''):
return display.image(self, tab)

@ -0,0 +1,122 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubi
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader.debug import log
from ubireader.ubi import display
from ubireader.ubi.block import sort, get_blocks_in_list, rm_old_blocks
class description(object):
"""UBI Volume object
Attributes:
Int:vol_id -- Volume ID
Str:name -- Name of volume.
Obj:vol_rec -- Volume record object
Int:block_count -- Number of block associated with volume.
Methods:
display(tab) -- Print Volume information
Str:tab -- (optional) '\t' to preface lines with.
get_blocks(blocks) -- Returns list of block objects tied to this volume
Volume object is basically a list of block indexes and some metadata
describing a volume found in a UBI image.
"""
def __init__(self, vol_id, vol_rec, block_list):
self._vol_id = vol_id
self._vol_rec = vol_rec
self._name = self._vol_rec.name
self._block_list = block_list
log(description, 'Create Volume: %s, ID: %s, Block Cnt: %s' % (self.name, self.vol_id, len(self.block_list)))
def __repr__(self):
return 'Volume: %s' % (self.name.decode('utf-8'))
def _get_name(self):
return self._name
name = property(_get_name)
def _get_vol_id(self):
return self._vol_id
vol_id = property(_get_vol_id)
def _get_block_count(self):
return len(self._block_list)
block_count = property(_get_block_count)
def _get_vol_rec(self):
return self._vol_rec
vol_rec = property(_get_vol_rec)
def _get_block_list(self):
return self._block_list
block_list = property(_get_block_list)
def get_blocks(self, blocks):
return get_blocks_in_list(blocks, self._block_list)
def display(self, tab=''):
return display.volume(self, tab)
def reader(self, ubi):
last_leb = 0
for block in sort.by_leb(self.get_blocks(ubi.blocks)):
if block == 'x':
last_leb += 1
yield b'\xff'*ubi.leb_size
else:
last_leb += 1
yield ubi.file.read_block_data(ubi.blocks[block])
def get_volumes(blocks, layout_info):
"""Get a list of UBI volume objects from list of blocks
Arguments:
List:blocks -- List of layout block objects
List:layout_info -- Layout info (indexes of layout blocks and
associated data blocks.)
Returns:
Dict -- Of Volume objects by volume name, including any
relevant blocks.
"""
volumes = {}
vol_blocks_lists = sort.by_vol_id(blocks, layout_info[2])
for vol_rec in blocks[layout_info[0]].vtbl_recs:
vol_name = vol_rec.name.strip(b'\x00').decode('utf-8')
if vol_rec.rec_index not in vol_blocks_lists:
vol_blocks_lists[vol_rec.rec_index] = []
vol_blocks_lists[vol_rec.rec_index] = rm_old_blocks(blocks, vol_blocks_lists[vol_rec.rec_index])
volumes[vol_name] = description(vol_rec.rec_index, vol_rec, vol_blocks_lists[vol_rec.rec_index])
return volumes

@ -0,0 +1,251 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubi_io
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader.debug import error, log, verbose_log
from ubireader.ubi.block import sort
class ubi_file(object):
"""UBI image file object
Arguments:
Str:path -- Path to file to parse
Int:block_size -- Erase block size of NAND in bytes.
Int:start_offset -- (optional) Where to start looking in the file for
UBI data.
Int:end_offset -- (optional) Where to stop looking in the file.
Methods:
seek -- Put file head to specified byte offset.
Int:offset
read -- Read specified bytes from file handle.
Int:size
tell -- Returns byte offset of current file location.
read_block -- Returns complete PEB data of provided block
description.
Obj:block
read_block_data -- Returns LEB data only from provided block.
Obj:block
reader -- Generator that returns data from file.
reset -- Reset file position to start_offset.
is_valid -- If the object intialized okay.
Handles all the actual file interactions, read, seek,
extract blocks, etc.
"""
def __init__(self, path, block_size, start_offset=0, end_offset=None):
self.__name__ = 'UBI_File'
self.is_valid = False
try:
log(self, 'Open Path: %s' % path)
self._fhandle = open(path, 'rb')
except Exception as e:
error(self, 'Fatal', 'Open file: %s' % e)
self._fhandle.seek(0,2)
file_size = self.tell()
log(self, 'File Size: %s' % file_size)
self._start_offset = start_offset
log(self, 'Start Offset: %s' % (self._start_offset))
if end_offset:
self._end_offset = end_offset
else:
self._end_offset = file_size
log(self, 'End Offset: %s' % (self._end_offset))
self._block_size = block_size
log(self, 'Block Size: %s' % block_size)
if start_offset > self._end_offset:
error(self, 'Fatal', 'Start offset larger than end offset.')
if ( not end_offset is None ) and ( end_offset > file_size ):
error(self, 'Fatal', 'End offset larger than file size.')
remainder = (self._end_offset - start_offset) % block_size
if remainder != 0:
error(self, 'Warning', 'end_offset - start_offset length is not block aligned, could mean missing data.')
self._fhandle.seek(self._start_offset)
self._last_read_addr = self._fhandle.tell()
self.is_valid = True
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def _set_start(self, i):
self._start_offset = i
def _get_start(self):
return self._start_offset
start_offset = property(_get_start, _set_start)
def _get_end(self):
return self._end_offset
end_offset = property(_get_end)
def _get_block_size(self):
return self._block_size
block_size = property(_get_block_size)
def close(self):
self._fhandle.close()
def seek(self, offset):
self._fhandle.seek(offset)
def read(self, size):
self._last_read_addr = self.tell()
verbose_log(self, 'read loc: %s, size: %s' % (self._last_read_addr, size))
return self._fhandle.read(size)
def tell(self):
return self._fhandle.tell()
def last_read_addr(self):
return self._last_read_addr
def reset(self):
self._fhandle.seek(self.start_offset)
def reader(self):
self.reset()
while True:
cur_loc = self._fhandle.tell()
if self.end_offset and cur_loc > self.end_offset:
break
elif self.end_offset and self.end_offset - cur_loc < self.block_size:
chunk_size = self.end_offset - cur_loc
else:
chunk_size = self.block_size
buf = self.read(chunk_size)
if not buf:
break
yield buf
def read_block(self, block):
"""Read complete PEB data from file.
Argument:
Obj:block -- Block data is desired for.
"""
self.seek(block.file_offset)
return self._fhandle.read(block.size)
def read_block_data(self, block):
"""Read LEB data from file
Argument:
Obj:block -- Block data is desired for.
"""
self.seek(block.file_offset + block.ec_hdr.data_offset)
buf = self._fhandle.read(block.size - block.ec_hdr.data_offset - block.vid_hdr.data_pad)
return buf
class leb_virtual_file():
def __init__(self, ubi, block_list):
self.__name__ = 'leb_virtual_file'
self.is_valid = False
self._ubi = ubi
self._last_read_addr = 0
if not len(block_list):
error(self, 'Info', 'Empty block list')
else:
self._blocks = sort.by_leb(block_list)
self._seek = 0
self._last_leb = -1
self._last_buf = ''
self.is_valid = True
def read(self, size):
buf = ''
leb = int(self.tell() / self._ubi.leb_size)
offset = self.tell() % self._ubi.leb_size
try:
if size < 0:
raise Exception('Bad Read Offset Request')
self._last_read_addr = self._ubi.blocks[self._blocks[leb]].file_offset + self._ubi.blocks[self._blocks[leb]].ec_hdr.data_offset + offset
except Exception as e:
error(self.read, 'Error', 'LEB: %s is corrupted or has no data.' % (leb))
raise Exception('Bad Read Offset Request')
verbose_log(self, 'read loc: %s, size: %s' % (self._last_read_addr, size))
if leb == self._last_leb:
self.seek(self.tell() + size)
return self._last_buf[offset:offset+size]
else:
try:
buf = self._ubi.file.read_block_data(self._ubi.blocks[self._blocks[leb]])
self._last_buf = buf
self._last_leb = leb
self.seek(self.tell() + size)
return buf[offset:offset+size]
except Exception as e:
error(self, 'Fatal', 'read loc: %s, size: %s, LEB: %s, offset: %s, error: %s' % (self._last_read_addr, size, leb, offset, e))
def reset(self):
self.seek(0)
def seek(self, offset):
self._seek = offset
def tell(self):
return self._seek
def last_read_addr(self):
"""Start address of last physical file read"""
return self._last_read_addr
def reader(self):
last_leb = 0
for block in self._blocks:
while 0 != (self._ubi.blocks[block].leb_num - last_leb):
last_leb += 1
yield b'\xff'*self._ubi.leb_size
last_leb += 1
yield self._ubi.file.read_block_data(self._ubi.blocks[block])

@ -0,0 +1,149 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubifs
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader.debug import error, log, verbose_display
from ubireader.ubifs.defines import *
from ubireader.ubifs import nodes, display
class ubifs():
"""UBIFS object
Arguments:
Str:path -- File path to UBIFS image.
Attributes:
Obj:file -- File object
Int:leb_size -- Size of Logical Erase Blocks.
Int:min_io -- Size of min I/O from vid_hdr_offset.
Obj:sb_node -- Superblock node of UBIFS image LEB0
Obj:mst_node -- Master Node of UBIFS image LEB1
Obj:mst_node2 -- Master Node 2 of UBIFS image LEB2
"""
def __init__(self, ubifs_file):
self.__name__ = 'UBIFS'
self._file = ubifs_file
try:
self.file.reset()
sb_chdr = nodes.common_hdr(self.file.read(UBIFS_COMMON_HDR_SZ))
log(self , '%s file addr: %s' % (sb_chdr, self.file.last_read_addr()))
verbose_display(sb_chdr)
if sb_chdr.node_type == UBIFS_SB_NODE:
self.file.seek(UBIFS_COMMON_HDR_SZ)
buf = self.file.read(UBIFS_SB_NODE_SZ)
self._sb_node = nodes.sb_node(buf, self.file.last_read_addr())
self._min_io_size = self._sb_node.min_io_size
self._leb_size = self._sb_node.leb_size
log(self , '%s file addr: %s' % (self._sb_node, self.file.last_read_addr()))
verbose_display(self._sb_node)
else:
raise Exception('Wrong node type.')
except Exception as e:
error(self, 'Fatal', 'Super block error: %s' % e)
self._mst_nodes = [None, None]
for i in range(0, 2):
try:
mst_offset = self.leb_size * (UBIFS_MST_LNUM + i)
self.file.seek(mst_offset)
mst_chdr = nodes.common_hdr(self.file.read(UBIFS_COMMON_HDR_SZ))
log(self , '%s file addr: %s' % (mst_chdr, self.file.last_read_addr()))
verbose_display(mst_chdr)
if mst_chdr.node_type == UBIFS_MST_NODE:
self.file.seek(mst_offset + UBIFS_COMMON_HDR_SZ)
buf = self.file.read(UBIFS_MST_NODE_SZ)
self._mst_nodes[i] = nodes.mst_node(buf, self.file.last_read_addr())
log(self , '%s%s file addr: %s' % (self._mst_nodes[i], i, self.file.last_read_addr()))
verbose_display(self._mst_nodes[i])
else:
raise Exception('Wrong node type.')
except Exception as e:
error(self, 'Warn', 'Master block %s error: %s' % (i, e))
if self._mst_nodes[0] is None and self._mst_nodes[1] is None:
error(self, 'Fatal', 'No valid Master Node found.')
elif self._mst_nodes[0] is None and self._mst_nodes[1] is not None:
self._mst_nodes[0] = self._mst_nodes[1]
self._mst_nodes[1] = None
log(self , 'Swapping Master Nodes due to bad first node.')
def _get_file(self):
return self._file
file = property(_get_file)
def _get_superblock(self):
""" Superblock Node Object
Returns:
Obj:Superblock Node
"""
return self._sb_node
superblock_node = property(_get_superblock)
def _get_master_node(self):
"""Master Node Object
Returns:
Obj:Master Node
"""
return self._mst_nodes[0]
master_node = property(_get_master_node)
def _get_master_node2(self):
"""Master Node Object 2
Returns:
Obj:Master Node
"""
return self._mst_nodes[1]
master_node2 = property(_get_master_node2)
def _get_leb_size(self):
"""LEB size of UBI blocks in file.
Returns:
Int -- LEB Size.
"""
return self._leb_size
leb_size = property(_get_leb_size)
def _get_min_io_size(self):
"""Min I/O Size
Returns:
Int -- Min I/O Size.
"""
return self._min_io_size
min_io_size = property(_get_min_io_size)
def display(self, tab=''):
"""Print information about this object.
Argument:
Str:tab -- '\t' for spacing this object.
"""
return display.ubifs(self, tab)

@ -0,0 +1,432 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#############################################################
# Adapted in part from linux-source-3.2/fs/ubi/ubi-media.h
# for use in Python.
# Oct. 2013 by Jason Pruitt
#
# Original copyright notice.
# --------------------------
#
# This file is part of UBIFS.
#
# Copyright (C) 2006-2008 Nokia Corporation.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 2 as published by
# the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Artem Bityutskiy (Битюцкий Артём)
# Adrian Hunter
#
#############################################################
import struct
# Constant defines
# Common Header.
UBIFS_NODE_MAGIC = b'\x31\x18\x10\x06' # Set to LSB
# Initial CRC32 value.
UBIFS_CRC32_INIT = 0xFFFFFFFF
# Do not compress data smaller than this.
UBIFS_MIN_COMPR_LEN = 128
# If difference between compressed data length and compressed data
# length, is less than this, do not compress data.
UBIFS_MIN_COMPRESS_DIFF = 64
# Root inode number
UBIFS_ROOT_INO = 1
# Lowest inode number for regular inodes, non-internal inodes.
UBIFS_FIRST_INO = 64
# Max file name and extended attr length (muptple of 8 minus 1.
UBIFS_MAX_NLEN = 255
# Max number of data journal heads.
UBIFS_MAX_JHEADS = 1
# Max data node data length/amount attached to inode node.
UBIFS_BLOCK_SIZE = 4096
UBIFS_BLOCK_SHIFT = 12
# UBIFS padding byte pattern.
UBIFS_PADDING_BYTE = b'\xCE'
# Max key length
UBIFS_MAX_KEY_LEN = 16
# Key length of simple format.
UBIFS_SK_LEN = 8
# Min index tree fanout.
UBIFS_MIN_FANOUT = 3
# Max number of levels in UBIFS indexing B-tree.
UBIFS_MAX_LEVELS = 512
# Max amount of data attached to inode in bytes.
UBIFS_MAX_INO_DATA = UBIFS_BLOCK_SIZE
# LEB Properties Tree fanout (power of 2) and fanout.
UBIFS_LPT_FANOUT = 4
UBIFS_LPT_FANOUT_SHIFT = 2
# LEB Properties Tree bit field sizes.
UBIFS_LPT_CRC_BITS = 16
UBIFS_LPT_CRC_BYTES = 2
UBIFS_LPT_TYPE_BITS = 4
# LEB Properties Tree node types.
UBIFS_LPT_PNODE = 0 # LPT leaf node (contains LEB Properties)
UBIFS_LPT_NNODE = 1 # LPT internal node
UBIFS_LPT_LTAB = 2 # LPT's own lprops table
UBIFS_LPT_LSAVE = 3 # LPT's save table (big model only)
UBIFS_LPT_NODE_CNT = 4 # count of LPT node types
UBIFS_LPT_NOT_A_NODE = (1 << UBIFS_LPT_TYPE_BITS) - 1 # 4 bits of 1
# Inode types
UBIFS_ITYPE_REG = 0 # Regular file
UBIFS_ITYPE_DIR = 1 # Directory
UBIFS_ITYPE_LNK = 2 # Soft link
UBIFS_ITYPE_BLK = 3 # Block device node
UBIFS_ITYPE_CHR = 4 # Char device node
UBIFS_ITYPE_FIFO = 5 # FIFO
UBIFS_ITYPE_SOCK = 6 # Socket
UBIFS_ITYPES_CNT = 7 # Support file type count
# Supported key has functions
UBIFS_KEY_HASH_R5 = 0 # R5 hash
UBIFS_KEY_HASH_TEST = 1 # Test hash, returns first 4 bytes of name
PRINT_UBIFS_KEY_HASH = ['r5', 'test']
# Supported key formats
UBIFS_SIMPLE_KEY_FMT = 0
# Simple key format uses 29 bits for storing UBIFS name and hash.
UBIFS_S_KEY_BLOCK_BITS = 29
UBIFS_S_KEY_BLOCK_MASK = 0x1FFFFFFF
UBIFS_S_KEY_HASH_BITS = UBIFS_S_KEY_BLOCK_BITS
UBIFS_S_KEY_HASH_MASK = UBIFS_S_KEY_BLOCK_MASK
# Key types
UBIFS_INO_KEY = 0 # Inode node key
UBIFS_DATA_KEY = 1 # Data node key
UBIFS_DENT_KEY = 2 # Directory node key
UBIFS_XENT_KEY = 3 # Extended attribute entry key
UBIFS_KEY_TYPES_CNT = 4 # Supported key count
# Number of reserved LEBs for Superblock area
UBIFS_SB_LEBS = 1
# Number of reserved LEBs for master area
UBIFS_MST_LEBS = 2
# First LEB of the Superblock area
UBIFS_SB_LNUM = 0
# First LEB of the master area
UBIFS_MST_LNUM = (UBIFS_SB_LNUM + UBIFS_SB_LEBS)
# First LEB of log area
UBIFS_LOG_LNUM = (UBIFS_MST_LNUM + UBIFS_MST_LEBS)
# On-flash inode flags
UBIFS_COMPR_FL = 1 # Use compression for this inode
UBIFS_SYNC_FL = 2 # Has to be synchronous I/O
UBIFS_IMMUTABLE_FL = 4 # Inode is immutable
UBIFS_APPEND_FL = 8 # Writes may only append data
UBIFS_DIRSYNC_FL = 16 # I/O on this directory inode must be synchronous
UBIFS_XATTR_FL = 32 # This inode is inode for extended attributes
# Inode flag bits used by UBIFS
UBIFS_FL_MASK = 0x0000001F
# Compression alogrithms.
UBIFS_COMPR_NONE = 0 # No compression
UBIFS_COMPR_LZO = 1 # LZO compression
UBIFS_COMPR_ZLIB = 2 # ZLIB compression
UBIFS_COMPR_ZSTD = 3 # ZSTD compression
UBIFS_COMPR_TYPES_CNT = 4 # Count of supported compression types
PRINT_UBIFS_COMPR = ['none','lzo','zlib', 'zstd']
# UBIFS node types
UBIFS_INO_NODE = 0 # Inode node
UBIFS_DATA_NODE = 1 # Data node
UBIFS_DENT_NODE = 2 # Directory entry node
UBIFS_XENT_NODE = 3 # Extended attribute node
UBIFS_TRUN_NODE = 4 # Truncation node
UBIFS_PAD_NODE = 5 # Padding node
UBIFS_SB_NODE = 6 # Superblock node
UBIFS_MST_NODE = 7 # Master node
UBIFS_REF_NODE = 8 # LEB reference node
UBIFS_IDX_NODE = 9 # Index node
UBIFS_CS_NODE = 10 # Commit start node
UBIFS_ORPH_NODE = 11 # Orphan node
UBIFS_AUTH_NODE = 12 # Authentication node
UBIFS_SIG_NODE = 13 # Signature node
UBIFS_NODE_TYPES_CNT = 14 # Count of supported node types
# Master node flags
UBIFS_MST_DIRTY = 1 # Rebooted uncleanly
UBIFS_MST_NO_ORPHS = 2 # No orphans present
UBIFS_MST_RCVRY = 4 # Written by recovery
PRINT_UBIFS_MST = [[UBIFS_MST_DIRTY, 'Dirty'],
[UBIFS_MST_NO_ORPHS, 'No orphans'],
[UBIFS_MST_RCVRY, 'Recovery write'],
]
# Node group type
UBIFS_NO_NODE_GROUP = 0 # This node is not part of a group
UBIFS_IN_NODE_GROUP = 1 # This node is part of a group
UBIFS_LAST_OF_NODE_GROUP = 2 # This node is the last in a group
# Superblock flags
UBIFS_FLG_BIGLPT = 2 # If 'big' LPT model is used if set.
UBIFS_FLG_SPACE_FIXUP = 4 # First-mount 'fixup' of free space within.
UBIFS_FLG_DOUBLE_HASH = 8 # Store 32bit cookie for 64bit support.
UBIFS_FLG_ENCRYPTION = 16 # If filesystem contains encrypted files.
UBIFS_FLG_AUTHENTICATION = 32 # If contains hashes for authentication.
PRINT_UBIFS_FLGS = [[UBIFS_FLG_BIGLPT, 'Big LPT'],
[UBIFS_FLG_SPACE_FIXUP, 'Space fixup'],
[UBIFS_FLG_DOUBLE_HASH, 'Double hash'],
[UBIFS_FLG_ENCRYPTION, 'Encryption'],
[UBIFS_FLG_AUTHENTICATION,'Authentication'],
]
# Struct defines
# Common header node
UBIFS_COMMON_HDR_FORMAT = '<IIQIBB2s'
UBIFS_COMMON_HDR_FIELDS = ['magic', # UBIFS node magic number.
'crc', # CRC32 checksum of header.
'sqnum', # Sequence number.
'len', # Full node length.
'node_type', # Node type.
'group_type', # Node group type.
'padding', # Reserved for future, zeros.
]
UBIFS_COMMON_HDR_SZ = struct.calcsize(UBIFS_COMMON_HDR_FORMAT)
# LEBs needed.
# Key offset in key nodes
# out of place because of ordering issues.
UBIFS_KEY_OFFSET = UBIFS_COMMON_HDR_SZ
# Device node descriptor
UBIFS_DEV_DESC_FORMAT = '<IQ'
UBIFS_DEV_DESC_FIELDS = ['new', # New type device descriptor.
'huge', # huge type device descriptor.
]
UBIFS_DEV_DESC_SZ = struct.calcsize(UBIFS_DEV_DESC_FORMAT)
# Inode node
UBIFS_INO_NODE_FORMAT = '<%ssQQQQQIIIIIIIIIII4sIH26s' % (UBIFS_MAX_KEY_LEN)
UBIFS_INO_NODE_FIELDS = ['key', # Node key.
'creat_sqnum', # Sequence number at time of creation.
'size', # Inode size in bytes (uncompressed).
'atime_sec', # Access time in seconds.
'ctime_sec', # Creation time seconds.
'mtime_sec', # Modification time in seconds.
'atime_nsec', # Access time in nanoseconds.
'ctime_nsec', # Creation time in nanoseconds.
'mtime_nsec', # Modification time in nanoseconds.
'nlink', # Number of hard links.
'uid', # Owner ID.
'gid', # Group ID.
'mode', # Access flags.
'flags', # Per-inode flags.
'data_len', # Inode data length.
'xattr_cnt', # Count of extended attr this inode has.
'xattr_size', # Summarized size of all extended
# attributes in bytes.
'padding1', # Reserved for future, zeros.
'xattr_names', # Sum of lengths of all extended
# attribute names belonging to this
# inode.
'compr_type', # Compression type used for this inode.
'padding2', # Reserved for future, zeros.
]
# 'data', no size.
UBIFS_INO_NODE_SZ = struct.calcsize(UBIFS_INO_NODE_FORMAT)
# Directory entry node
UBIFS_DENT_NODE_FORMAT = '<%ssQBBHI' % (UBIFS_MAX_KEY_LEN)
UBIFS_DENT_NODE_FIELDS = ['key', # Node key.
'inum', # Target inode number.
'padding1', # Reserved for future, zeros.
'type', # Type of target inode.
'nlen', # Name length.
'cookie', # 32bit random number, used to
# construct a 64bit identifier.
]
# 'name', no size.
UBIFS_DENT_NODE_SZ = struct.calcsize(UBIFS_DENT_NODE_FORMAT)
# Data node
UBIFS_DATA_NODE_FORMAT = '<%ssIHH' % (UBIFS_MAX_KEY_LEN)
UBIFS_DATA_NODE_FIELDS = ['key', # Node key.
'size', # Uncompressed data size.
'compr_type', # Compression type UBIFS_COMPR_*.
'compr_size', # Compressed data size in bytes
# only valid when data is encrypted.
]
# 'data', no size.
UBIFS_DATA_NODE_SZ = struct.calcsize(UBIFS_DATA_NODE_FORMAT)
# Truncation node
UBIFS_TRUN_NODE_FORMAT = '<I12sQQ'
UBIFS_TRUN_NODE_FIELDS = ['inum', # Truncated inode number.
'padding', # Reserved for future, zeros.
'old_size', # size before truncation.
'new_size', # Size after truncation.
]
UBIFS_TRUN_NODE_SZ = struct.calcsize(UBIFS_TRUN_NODE_FORMAT)
# Padding node
UBIFS_PAD_NODE_FORMAT = '<I'
UBIFS_PAD_NODE_FIELDS = ['pad_len'] # Number of bytes after this inode unused.
UBIFS_PAD_NODE_SZ = struct.calcsize(UBIFS_PAD_NODE_FORMAT)
# The maxmimum size of a hash, enough for sha512
UBIFS_MAX_HASH_LEN = 64
# The maxmimum size of a hmac, enough for hmac(sha512)
UBIFS_MAX_HMAC_LEN = 64
# Superblock node
UBIFS_SB_NODE_FORMAT = '<2sBBIIIIIQIIIIIIIH2sIIQI16sI%ss%ssH%ss3774s' % (UBIFS_MAX_HMAC_LEN, UBIFS_MAX_HMAC_LEN, UBIFS_MAX_HASH_LEN)
UBIFS_SB_NODE_FIELDS = ['padding', # Reserved for future, zeros.
'key_hash', # Type of hash func used in keys.
'key_fmt', # Format of the key.
'flags', # File system flags.
'min_io_size', # Min I/O unit size.
'leb_size', # LEB size in bytes.
'leb_cnt', # LEB count used by FS.
'max_leb_cnt', # Max count of LEBS used by FS.
'max_bud_bytes', # Max amount of data stored in buds.
'log_lebs', # Log size in LEBs.
'lpt_lebs', # Number of LEBS used for lprops
# table.
'orph_lebs', # Number of LEBS used for
# recording orphans.
'jhead_cnt', # Count of journal heads.
'fanout', # Tree fanout, max number of links
# per indexing node.
'lsave_cnt', # Number of LEB numbers in LPT's
# save table.
'fmt_version', # UBIFS on-flash format version.
'default_compr', # Default compression used.
'padding1', # Reserved for future, zeros.
'rp_uid', # Reserve pool UID.
'rp_gid', # Reserve pool GID.
'rp_size', # Reserve pool size in bytes.
'time_gran', # Time granularity in nanoseconds.
'uuid', # UUID generated when the FS image
# was created.
'ro_compat_version',# UBIFS R/O Compatibility version.
'hmac', # HAMC to authenticate the superblock node.
'hmac_wkm', # HMAC of a well known message (the string "UBIFS").
# as a convenience to the user to chek if the correct
# key is past.
'hash_algo', # The has algo used for this feilseystem.
# (one of enum hash_algo).
'hash_mst', # Hash of the master node, only valid for
# signed images in which the master node
# does not contain a hmac.
'padding2' # Reserved for future, zeros.
]
UBIFS_SB_NODE_SZ = struct.calcsize(UBIFS_SB_NODE_FORMAT)
# Master node
UBIFS_MST_NODE_FORMAT = '<QQIIIIIIIIQQQQQQIIIIIIIIIIII%ss%ss%ss152s' % (UBIFS_MAX_HASH_LEN, UBIFS_MAX_HASH_LEN, UBIFS_MAX_HMAC_LEN)
UBIFS_MST_NODE_FIELDS = ['highest_inum', # Highest inode number in the
# committed index.
'cmt_no', # Commit Number.
'flags', # Various flags.
'log_lnum', # LEB num start of log.
'root_lnum', # LEB num of root indexing node.
'root_offs', # Offset within root_lnum
'root_len', # Root indexing node length.
'gc_lnum', # LEB reserved for garbage collection.
'ihead_lnum', # LEB num of index head.
'ihead_offs', # Offset of index head.
'index_size', # Size of index on flash.
'total_free', # Total free space in bytes.
'total_dirty', # Total dirty space in bytes.
'total_used', # Total used space in bytes (data LEBs)
'total_dead', # Total dead space in bytes (data LEBs)
'total_dark', # Total dark space in bytes (data LEBs)
'lpt_lnum', # LEB num of LPT root nnode.
'lpt_offs', # Offset of LPT root nnode.
'nhead_lnum', # LEB num of LPT head.
'nhead_offs', # Offset of LPT head.
'ltab_lnum', # LEB num of LPT's own lprop table.
'ltab_offs', # Offset of LPT's own lprop table.
'lsave_lnum', # LEB num of LPT's save table.
'lsave_offs', # Offset of LPT's save table.
'lscan_lnum', # LEB num of last LPT scan.
'empty_lebs', # Number of empty LEBs.
'idx_lebs', # Number of indexing LEBs.
'leb_cnt', # Count of LEBs used by FS.
'hash_root_idx', # The hash of the root index node.
'hash_lpt', # The has of the LPT.
'hmac', # HMAC to athenticate the master node.
'padding', # Reserved for future, zeros.
]
UBIFS_MST_NODE_SZ = struct.calcsize(UBIFS_MST_NODE_FORMAT)
# LEB Reference node
UBIFS_REF_NODE_FORMAT = '<III28s'
UBIFS_REF_NODE_FIELDS = ['lnum', # Referred LEB number.
'offs', # Start offset of referred LEB.
'jhead', # Journal head number.
'padding', # Reserved for future, zeros.
]
UBIFS_REF_NODE_SZ = struct.calcsize(UBIFS_REF_NODE_FORMAT)
# Signature node
UBIFS_SIG_NODE_FORMAT = '<II32s'
UBIFS_SIG_NODE_FIELDS = ['type', # Type of the signature.
'len', # Length of signature data.
'padding', # Reserved for future, zeros.
]
# 'sig', no size.
UBIFS_SIG_NODE_SZ = struct.calcsize(UBIFS_SIG_NODE_FORMAT)
# key/reference/length branch
UBIFS_BRANCH_FORMAT = '<III%ss' % (UBIFS_SK_LEN)
UBIFS_BRANCH_FIELDS = ['lnum', # LEB number of target node.
'offs', # Offset within lnum.
'len', # Target node length.
'key', # Using UBIFS_SK_LEN as size.
]
UBIFS_BRANCH_SZ = struct.calcsize(UBIFS_BRANCH_FORMAT)
# Indexing node
UBIFS_IDX_NODE_FORMAT = '<HH'
UBIFS_IDX_NODE_FIELDS = ['child_cnt', # Number of child index nodes.
'level', # Tree level.
]
# 'branches', no size.
UBIFS_IDX_NODE_SZ = struct.calcsize(UBIFS_IDX_NODE_FORMAT)
# File chunk size for reads.
FILE_CHUNK_SZ = 5 * 1024 *1024

@ -0,0 +1,165 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubifs/display
# (c) 2014 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader.ubifs.defines import PRINT_UBIFS_FLGS, PRINT_UBIFS_MST
def ubifs(ubifs, tab=''):
buf = '%sUBIFS Image\n' % (tab)
buf += '%s---------------------\n' % (tab)
buf += '%sMin I/O: %s\n' % (tab, ubifs.min_io_size)
buf += '%sLEB Size: %s\n' % (tab, ubifs.leb_size)
return buf
def common_hdr(chdr, tab=''):
buf = '%s%s\n' % (tab, chdr)
buf += '%s---------------------\n' % (tab)
tab += '\t'
for key, value in chdr:
if key == 'display':
continue
elif key == 'crc':
buf += '%s%s: 0x%x\n' % (tab, key, value)
elif key == 'errors':
buf += '%s%s: %s\n' % (tab, key, ','.join(value))
else:
buf += '%s%s: %r\n' % (tab, key, value)
return buf
def sb_node(node, tab=''):
buf = '%s%s\n' % (tab, node)
buf += '%sFile offset: %s\n' % (tab, node.file_offset)
buf += '%s---------------------\n' % (tab)
tab += '\t'
for key, value in node:
if key == 'display':
continue
elif key == 'errors':
buf += '%s%s: %s\n' % (tab, key, ','.join(value))
elif key == 'uuid':
buf += '%s%s: %r\n' % (tab, key, value)
elif key == 'flags':
flags = ''
for flag in PRINT_UBIFS_FLGS:
if value & flag[0]:
flags += '%s, ' % flag[1]
if flags.endswith(', '):
flags = flags[0:-2]
buf += '%s%s: %s\n' % (tab, key, flags)
else:
buf += '%s%s: %r\n' % (tab, key, value)
return buf
def mst_node(node, tab=''):
buf = '%s%s\n' % (tab, node)
buf += '%sFile offset: %s\n' % (tab, node.file_offset)
buf += '%s---------------------\n' % (tab)
tab += '\t'
for key, value in node:
if key == 'display':
continue
elif key == 'errors':
buf += '%s%s: %s\n' % (tab, key, ','.join(value))
elif key == 'flags':
flags = ''
for flag in PRINT_UBIFS_MST:
if value & flag[0]:
flags += '%s, ' % flag[1]
if flags.endswith(', '):
flags = flags[0:-2]
buf += '%s%s: %s\n' % (tab, key, flags)
else:
buf += '%s%s: %r\n' % (tab, key, value)
return buf
def dent_node(node, tab=''):
buf = '%s%s\n' % (tab, node)
buf += '%s---------------------\n' % (tab)
tab += '\t'
for key, value in node:
if key == 'display':
continue
elif key == 'errors':
buf += '%s%s: %s\n' % (tab, key, ','.join(value))
else:
buf += '%s%s: %r\n' % (tab, key, value)
return buf
def data_node(node, tab=''):
buf = '%s%s\n' % (tab, node)
buf += '%s---------------------\n' % (tab)
tab += '\t'
for key, value in node:
if key in ['display', 'data']:
continue
elif key == 'errors':
buf += '%s%s: %s\n' % (tab, key, ','.join(value))
else:
buf += '%s%s: %r\n' % (tab, key, value)
return buf
def idx_node(node, tab=''):
buf = '%s%s\n' % (tab, node)
buf += '%s---------------------\n' % (tab)
tab += '\t'
for key, value in node:
if key == 'display':
continue
elif key == 'errors':
buf += '%s%s: %s\n' % (tab, key, ','.join(value))
else:
buf += '%s%s: %r\n' % (tab, key, value)
return buf
def ino_node(node, tab=''):
buf = '%s%s\n' % (tab, node)
buf += '%s---------------------\n' % (tab)
tab += '\t'
for key, value in node:
if key == 'display':
continue
elif key == 'errors':
buf += '%s%s: %s\n' % (tab, key, ','.join(value))
else:
buf += '%s%s: %r\n' % (tab, key, value)
return buf
def branch(node, tab=''):
buf = '%s%s\n' % (tab, node)
buf += '%s---------------------\n' % (tab)
tab += '\t'
for key, value in node:
if key == 'display':
continue
elif key == 'errors':
buf += '%s%s: %s\n' % (tab, key, ','.join(value))
else:
buf += '%s%s: %r\n' % (tab, key, value)
return buf

@ -0,0 +1,177 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubifs
# (C) Collin Mulliner based on Jason Pruitt's output.py
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
import os
import time
from ubireader.ubifs.defines import *
from ubireader.ubifs import walk
from ubireader.ubifs.misc import decompress
from ubireader.debug import error, log, verbose_log
def list_files(ubifs, list_path):
pathnames = list_path.split("/")
pnames = []
for i in pathnames:
if len(i) > 0:
pnames.append(i)
try:
inodes = {}
bad_blocks = []
walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks)
if len(inodes) < 2:
raise Exception('No inodes found')
inum = find_dir(inodes, 1, pnames, 0)
if inum == None:
return
if not 'dent' in inodes[inum]:
return
for dent in inodes[inum]['dent']:
print_dent(ubifs, inodes, dent, longts=False)
if len(bad_blocks):
error(list_files, 'Warn', 'Data may be missing or corrupted, bad blocks, LEB [%s]' % ','.join(map(str, bad_blocks)))
except Exception as e:
error(list_files, 'Error', '%s' % e)
def copy_file(ubifs, filepath, destpath):
pathnames = filepath.split("/")
pnames = []
for i in pathnames:
if len(i) > 0:
pnames.append(i)
filename = pnames[len(pnames)-1]
del pnames[-1]
inodes = {}
bad_blocks = []
walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks)
if len(inodes) < 2:
return False
inum = find_dir(inodes, 1, pnames, 0)
if inum == None:
return False
if not 'dent' in inodes[inum]:
return False
for dent in inodes[inum]['dent']:
if dent.name == filename:
filedata = _process_reg_file(ubifs, inodes[dent.inum], filepath)
if os.path.isdir(destpath):
destpath = os.path.join(destpath, filename)
with open(destpath, 'wb') as f:
f.write(filedata)
return True
return False
def find_dir(inodes, inum, names, idx):
if len(names) == 0:
return 1
for dent in inodes[inum]['dent']:
if dent.name == names[idx]:
if len(names) == idx+1:
return dent.inum
else:
return find_dir(inodes, dent.inum, names, idx+1)
return None
def print_dent(ubifs, inodes, dent_node, long=True, longts=False):
inode = inodes[dent_node.inum]
if long:
fl = file_leng(ubifs, inode)
lnk = ""
if dent_node.type == UBIFS_ITYPE_LNK:
lnk = " -> " + inode['ino'].data.decode('utf-8')
if longts:
mtime = inode['ino'].mtime_sec
else:
mtime = time.strftime("%b %d %H:%M", time.gmtime(inode['ino'].mtime_sec))
print('%6o %2d %s %s %7d %s %s%s' % (inode['ino'].mode, inode['ino'].nlink, inode['ino'].uid, inode['ino'].gid, fl, mtime, dent_node.name, lnk))
else:
print(dent_node.name)
def file_leng(ubifs, inode):
fl = 0
if 'data' in inode:
compr_type = 0
sorted_data = sorted(inode['data'], key=lambda x: x.key['khash'])
last_khash = sorted_data[0].key['khash']-1
for data in sorted_data:
if data.key['khash'] - last_khash != 1:
while 1 != (data.key['khash'] - last_khash):
last_khash += 1
fl = fl + UBIFS_BLOCK_SIZE
fl = fl + data.size
return fl
return 0
def _process_reg_file(ubifs, inode, path):
try:
buf = bytearray()
if 'data' in inode:
compr_type = 0
sorted_data = sorted(inode['data'], key=lambda x: x.key['khash'])
last_khash = sorted_data[0].key['khash']-1
for data in sorted_data:
# If data nodes are missing in sequence, fill in blanks
# with \x00 * UBIFS_BLOCK_SIZE
if data.key['khash'] - last_khash != 1:
while 1 != (data.key['khash'] - last_khash):
buf += b'\x00'*UBIFS_BLOCK_SIZE
last_khash += 1
compr_type = data.compr_type
ubifs.file.seek(data.offset)
d = ubifs.file.read(data.compr_len)
buf += decompress(compr_type, data.size, d)
last_khash = data.key['khash']
verbose_log(_process_reg_file, 'ino num: %s, compression: %s, path: %s' % (inode['ino'].key['ino_num'], compr_type, path))
except Exception as e:
error(_process_reg_file, 'Warn', 'inode num:%s :%s' % (inode['ino'].key['ino_num'], e))
# Pad end of file with \x00 if needed.
if inode['ino'].size > len(buf):
buf += b'\x00' * (inode['ino'].size - len(buf))
return bytes(buf)

@ -0,0 +1,77 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubifs
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
#from lzallright import LZOCompressor, LZOError
import struct
import zlib
from ubireader.ubifs.defines import *
from ubireader.debug import error
# For happy printing
ino_types = ['file', 'dir','lnk','blk','chr','fifo','sock']
node_types = ['ino','data','dent','xent','trun','pad','sb','mst','ref','idx','cs','orph']
key_types = ['ino','data','dent','xent']
def parse_key(key):
"""Parse node key
Arguments:
Str:key -- Hex string literal of node key.
Returns:
Int:key_type -- Type of key, data, ino, dent, etc.
Int:ino_num -- Inode number.
Int:khash -- Key hash.
"""
hkey, lkey = struct.unpack('<II',key[0:UBIFS_SK_LEN])
ino_num = hkey & UBIFS_S_KEY_HASH_MASK
key_type = lkey >> UBIFS_S_KEY_BLOCK_BITS
khash = lkey
#if key_type < UBIFS_KEY_TYPES_CNT:
return {'type':key_type, 'ino_num':ino_num, 'khash': khash}
def decompress(ctype, unc_len, data):
"""Decompress data.
Arguments:
Int:ctype -- Compression type LZO, ZLIB (*currently unused*).
Int:unc_len -- Uncompressed data lenth.
Str:data -- Data to be uncompessed.
Returns:
Uncompressed Data.
"""
if ctype == UBIFS_COMPR_LZO:
from lzallright import LZOCompressor, LZOError
try:
return LZOCompressor.decompress(data, output_size_hint=unc_len)
except Exception as e:
error(decompress, 'Warn', 'LZO Error: %s' % e)
elif ctype == UBIFS_COMPR_ZLIB:
try:
return zlib.decompress(data, -11)
except Exception as e:
error(decompress, 'Warn', 'ZLib Error: %s' % e)
else:
return data

@ -0,0 +1,262 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubifs
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader.ubifs.misc import parse_key
from ubireader.ubifs.defines import *
from ubireader.ubifs import display
class common_hdr(object):
"""Get common header at given LEB number + offset.
Arguments:
Bin:buf -- Raw data to extract header information from.
See ubifs/defines.py for object attributes.
"""
def __init__(self, buf):
fields = dict(list(zip(UBIFS_COMMON_HDR_FIELDS, struct.unpack(UBIFS_COMMON_HDR_FORMAT, buf))))
for key in fields:
setattr(self, key, fields[key])
setattr(self, 'errors', [])
def __repr__(self):
return 'UBIFS Common Header'
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def display(self, tab=''):
return display.common_hdr(self, tab)
class ino_node(object):
"""Get inode node at given LEB number + offset.
Arguments:
Bin:buf -- Raw data to extract header information from.
See ubifs/defines.py for object attributes.
"""
def __init__(self, buf):
fields = dict(list(zip(UBIFS_INO_NODE_FIELDS, struct.unpack(UBIFS_INO_NODE_FORMAT, buf[0:UBIFS_INO_NODE_SZ]))))
for key in fields:
if key == 'key':
setattr(self, key, parse_key(fields[key]))
else:
setattr(self, key, fields[key])
setattr(self, 'data', buf[UBIFS_INO_NODE_SZ:])
setattr(self, 'errors', [])
def __repr__(self):
return 'UBIFS Ino Node'
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def display(self, tab=''):
return display.ino_node(self, tab)
class dent_node(object):
"""Get dir entry node at given LEB number + offset.
Arguments:
Bin:buf -- Raw data to extract header information from.
See ubifs/defines.py for object attributes.
"""
def __init__(self, buf):
fields = dict(list(zip(UBIFS_DENT_NODE_FIELDS, struct.unpack(UBIFS_DENT_NODE_FORMAT, buf[0:UBIFS_DENT_NODE_SZ]))))
for key in fields:
if key == 'key':
setattr(self, key, parse_key(fields[key]))
else:
setattr(self, key, fields[key])
setattr(self, 'name', '%s' % buf[-self.nlen-1:-1].decode('utf-8'))
setattr(self, 'errors', [])
def __repr__(self):
return 'UBIFS Directory Entry Node'
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def display(self, tab=''):
return display.dent_node(self, tab)
class data_node(object):
"""Get data node at given LEB number + offset.
Arguments:
Bin:buf -- Raw data to extract header information from.
Int:offset -- Offset in LEB of data node.
See ubifs/defines.py for object attributes.
"""
def __init__(self, buf, file_offset):
fields = dict(list(zip(UBIFS_DATA_NODE_FIELDS, struct.unpack(UBIFS_DATA_NODE_FORMAT, buf[0:UBIFS_DATA_NODE_SZ]))))
for key in fields:
if key == 'key':
setattr(self, key, parse_key(fields[key]))
else:
setattr(self, key, fields[key])
setattr(self, 'offset', file_offset)
setattr(self, 'compr_len', (len(buf) - UBIFS_DATA_NODE_SZ))
setattr(self, 'errors', [])
def __repr__(self):
return 'UBIFS Data Node'
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def display(self, tab=''):
return display.data_node(self, tab)
class idx_node(object):
"""Get index node at given LEB number + offset.
Arguments:
Bin:buf -- Raw data to extract header information from.
See ubifs/defines.py for object attributes.
"""
def __init__(self, buf):
fields = dict(list(zip(UBIFS_IDX_NODE_FIELDS, struct.unpack(UBIFS_IDX_NODE_FORMAT, buf[0:UBIFS_IDX_NODE_SZ]))))
for key in fields:
setattr(self, key, fields[key])
idxs = UBIFS_IDX_NODE_SZ
brs = UBIFS_BRANCH_SZ
setattr(self, 'branches', [branch(buf[idxs+(brs*i):idxs+(brs*i)+brs]) for i in range(0, self.child_cnt)])
setattr(self, 'errors', [])
def __repr__(self):
return 'UBIFS Index Node'
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def display(self, tab=''):
return display.idx_node(self, tab)
class branch(object):
""" Create branch from given idx_node data buf.
Arguments:
Bin:buf -- Raw data to extract header information from.
"""
def __init__(self, buf):
fields = dict(list(zip(UBIFS_BRANCH_FIELDS, struct.unpack(UBIFS_BRANCH_FORMAT, buf))))
for key in fields:
if key == 'key':
setattr(self, key, parse_key(fields[key]))
else:
setattr(self, key, fields[key])
setattr(self, 'errors', [])
def __repr__(self):
return 'UBIFS Branch'
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def display(self, tab=''):
return display.branch(self, tab)
class sb_node(object):
"""Get superblock node at given LEB number + offset.
Arguments:
Bin:buf -- Raw data to extract header information from.
Int:offset -- Offset in LEB of data node.
See ubifs/defines.py for object attributes.
"""
def __init__(self, buf, file_offset=-1):
self.file_offset = file_offset
fields = dict(list(zip(UBIFS_SB_NODE_FIELDS, struct.unpack(UBIFS_SB_NODE_FORMAT, buf))))
for key in fields:
setattr(self, key, fields[key])
setattr(self, 'errors', [])
def __repr__(self):
return 'UBIFS Super Block Node'
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def display(self, tab=''):
return display.sb_node(self, tab)
class mst_node(object):
"""Get master node at given LEB number + offset.
Arguments:
Bin:buf -- Raw data to extract header information from.
Int:offset -- Offset in LEB of data node.
See ubifs/defines.py for object attributes.
"""
def __init__(self, buf, file_offset=-1):
self.file_offset = file_offset
fields = dict(list(zip(UBIFS_MST_NODE_FIELDS, struct.unpack(UBIFS_MST_NODE_FORMAT, buf))))
for key in fields:
setattr(self, key, fields[key])
setattr(self, 'errors', [])
def __repr__(self):
return 'UBIFS Master Block Node'
def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
def display(self, tab=''):
return display.mst_node(self, tab)

@ -0,0 +1,206 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubifs
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
import os
import struct
from ubireader import settings
from ubireader.ubifs.defines import *
from ubireader.ubifs import walk
from ubireader.ubifs.misc import decompress
from ubireader.debug import error, log, verbose_log
def is_safe_path(basedir, path):
basedir = os.path.realpath(basedir)
path = os.path.realpath(os.path.join(basedir, path))
return True if path.startswith(basedir) else False
def extract_files(ubifs, out_path, perms=False):
"""Extract UBIFS contents to_path/
Arguments:
Obj:ubifs -- UBIFS object.
Str:out_path -- Path to extract contents to.
"""
try:
inodes = {}
bad_blocks = []
walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks)
if len(inodes) < 2:
raise Exception('No inodes found')
for dent in inodes[1]['dent']:
extract_dents(ubifs, inodes, dent, out_path, perms)
if len(bad_blocks):
error(extract_files, 'Warn', 'Data may be missing or corrupted, bad blocks, LEB [%s]' % ','.join(map(str, bad_blocks)))
except Exception as e:
error(extract_files, 'Error', '%s' % e)
def extract_dents(ubifs, inodes, dent_node, path='', perms=False):
if dent_node.inum not in inodes:
error(extract_dents, 'Error', 'inum: %s not found in inodes' % (dent_node.inum))
return
inode = inodes[dent_node.inum]
if not is_safe_path(path, dent_node.name):
error(extract_dents, 'Warn', 'Path traversal attempt: %s, discarding.' % (dent_node.name))
return
dent_path = os.path.realpath(os.path.join(path, dent_node.name))
if dent_node.type == UBIFS_ITYPE_DIR:
try:
if not os.path.exists(dent_path):
os.mkdir(dent_path)
log(extract_dents, 'Make Dir: %s' % (dent_path))
if perms:
_set_file_perms(dent_path, inode)
except Exception as e:
error(extract_dents, 'Warn', 'DIR Fail: %s' % e)
if 'dent' in inode:
for dnode in inode['dent']:
extract_dents(ubifs, inodes, dnode, dent_path, perms)
_set_file_timestamps(dent_path, inode)
elif dent_node.type == UBIFS_ITYPE_REG:
try:
if inode['ino'].nlink > 1:
if 'hlink' not in inode:
inode['hlink'] = dent_path
buf = _process_reg_file(ubifs, inode, dent_path)
_write_reg_file(dent_path, buf)
else:
os.link(inode['hlink'], dent_path)
log(extract_dents, 'Make Link: %s > %s' % (dent_path, inode['hlink']))
else:
buf = _process_reg_file(ubifs, inode, dent_path)
_write_reg_file(dent_path, buf)
_set_file_timestamps(dent_path, inode)
if perms:
_set_file_perms(dent_path, inode)
except Exception as e:
error(extract_dents, 'Warn', 'FILE Fail: %s' % e)
elif dent_node.type == UBIFS_ITYPE_LNK:
try:
# probably will need to decompress ino data if > UBIFS_MIN_COMPR_LEN
os.symlink('%s' % inode['ino'].data.decode('utf-8'), dent_path)
log(extract_dents, 'Make Symlink: %s > %s' % (dent_path, inode['ino'].data))
except Exception as e:
error(extract_dents, 'Warn', 'SYMLINK Fail: %s' % e)
elif dent_node.type in [UBIFS_ITYPE_BLK, UBIFS_ITYPE_CHR]:
try:
dev = struct.unpack('<II', inode['ino'].data)[0]
if not settings.use_dummy_devices:
os.mknod(dent_path, inode['ino'].mode, dev)
log(extract_dents, 'Make Device Node: %s' % (dent_path))
if perms:
_set_file_perms(dent_path, inode)
else:
log(extract_dents, 'Create dummy device.')
_write_reg_file(dent_path, str(dev))
if perms:
_set_file_perms(dent_path, inode)
except Exception as e:
error(extract_dents, 'Warn', 'DEV Fail: %s' % e)
elif dent_node.type == UBIFS_ITYPE_FIFO:
try:
os.mkfifo(dent_path, inode['ino'].mode)
log(extract_dents, 'Make FIFO: %s' % (path))
if perms:
_set_file_perms(dent_path, inode)
except Exception as e:
error(extract_dents, 'Warn', 'FIFO Fail: %s : %s' % (dent_path, e))
elif dent_node.type == UBIFS_ITYPE_SOCK:
try:
if settings.use_dummy_socket_file:
_write_reg_file(dent_path, '')
if perms:
_set_file_perms(dent_path, inode)
except Exception as e:
error(extract_dents, 'Warn', 'SOCK Fail: %s : %s' % (dent_path, e))
def _set_file_perms(path, inode):
os.chown(path, inode['ino'].uid, inode['ino'].gid)
os.chmod(path, inode['ino'].mode)
verbose_log(_set_file_perms, 'perms:%s, owner: %s.%s, path: %s' % (inode['ino'].mode, inode['ino'].uid, inode['ino'].gid, path))
def _set_file_timestamps(path, inode):
os.utime(path, (inode['ino'].atime_sec, inode['ino'].mtime_sec))
verbose_log(_set_file_timestamps, 'timestamps: access: %s, modify: %s, path: %s' % (inode['ino'].atime_sec, inode['ino'].mtime_sec, path))
def _write_reg_file(path, data):
with open(path, 'wb') as f:
f.write(data)
log(_write_reg_file, 'Make File: %s' % (path))
def _process_reg_file(ubifs, inode, path):
try:
buf = bytearray()
start_key = 0x00 | (UBIFS_DATA_KEY << UBIFS_S_KEY_BLOCK_BITS)
if 'data' in inode:
compr_type = 0
sorted_data = sorted(inode['data'], key=lambda x: x.key['khash'])
last_khash = start_key - 1
for data in sorted_data:
# If data nodes are missing in sequence, fill in blanks
# with \x00 * UBIFS_BLOCK_SIZE
if data.key['khash'] - last_khash != 1:
while 1 != (data.key['khash'] - last_khash):
buf += b'\x00'*UBIFS_BLOCK_SIZE
last_khash += 1
compr_type = data.compr_type
ubifs.file.seek(data.offset)
d = ubifs.file.read(data.compr_len)
buf += decompress(compr_type, data.size, d)
last_khash = data.key['khash']
verbose_log(_process_reg_file, 'ino num: %s, compression: %s, path: %s' % (inode['ino'].key['ino_num'], compr_type, path))
except Exception as e:
error(_process_reg_file, 'Warn', 'inode num:%s path:%s :%s' % (inode['ino'].key['ino_num'], path, e))
# Pad end of file with \x00 if needed.
if inode['ino'].size > len(buf):
buf += b'\x00' * (inode['ino'].size - len(buf))
return bytes(buf)

@ -0,0 +1,160 @@
#!/usr/bin/env python
#############################################################
# ubi_reader/ubifs
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
from ubireader import settings
from ubireader.ubifs import nodes
from ubireader.ubifs.defines import *
from ubireader.debug import error, log, verbose_log, verbose_display
def index(ubifs, lnum, offset, inodes={}, bad_blocks=[]):
"""Walk the index gathering Inode, Dir Entry, and File nodes.
Arguments:
Obj:ubifs -- UBIFS object.
Int:lnum -- Logical erase block number.
Int:offset -- Offset in logical erase block.
Dict:inodes -- Dict of ino/dent/file nodes keyed to inode number.
Returns:
Dict:inodes -- Dict of ino/dent/file nodes keyed to inode number.
'ino' -- Inode node.
'data' -- List of data nodes if present.
'dent' -- List of directory entry nodes if present.
"""
if len(bad_blocks):
if lnum in bad_blocks:
return
ubifs.file.seek((ubifs.leb_size * lnum) + offset)
buf = ubifs.file.read(UBIFS_COMMON_HDR_SZ)
if len(buf) < UBIFS_COMMON_HDR_SZ:
if settings.warn_only_block_read_errors:
error(index, 'Error', 'LEB: %s, Common Hdr Size smaller than expected.' % (lnum))
return
else:
error(index, 'Fatal', 'LEB: %s, Common Hdr Size smaller than expected.' % (lnum))
chdr = nodes.common_hdr(buf)
log(index , '%s file addr: %s' % (chdr, ubifs.file.last_read_addr()))
verbose_display(chdr)
read_size = chdr.len - UBIFS_COMMON_HDR_SZ
node_buf = ubifs.file.read(read_size)
file_offset = ubifs.file.last_read_addr()
if len(node_buf) < read_size:
if settings.warn_only_block_read_errors:
error(index, 'Error', 'LEB: %s at %s, Node size smaller than expected.' % (lnum, file_offset))
return
else:
error(index, 'Fatal', 'LEB: %s at %s, Node size smaller than expected.' % (lnum, file_offset))
if chdr.node_type == UBIFS_IDX_NODE:
try:
idxn = nodes.idx_node(node_buf)
except Exception as e:
if settings.warn_only_block_read_errors:
error(index, 'Error', 'Problem at file address: %s extracting idx_node: %s' % (file_offset, e))
return
else:
error(index, 'Fatal', 'Problem at file address: %s extracting idx_node: %s' % (file_offset, e))
log(index, '%s file addr: %s' % (idxn, file_offset))
verbose_display(idxn)
branch_idx = 0
for branch in idxn.branches:
verbose_log(index, '-------------------')
log(index, '%s file addr: %s' % (branch, file_offset + UBIFS_IDX_NODE_SZ + (branch_idx * UBIFS_BRANCH_SZ)))
verbose_display(branch)
index(ubifs, branch.lnum, branch.offs, inodes, bad_blocks)
branch_idx += 1
elif chdr.node_type == UBIFS_INO_NODE:
try:
inon = nodes.ino_node(node_buf)
except Exception as e:
if settings.warn_only_block_read_errors:
error(index, 'Error', 'Problem at file address: %s extracting ino_node: %s' % (file_offset, e))
return
else:
error(index, 'Fatal', 'Problem at file address: %s extracting ino_node: %s' % (file_offset, e))
ino_num = inon.key['ino_num']
log(index, '%s file addr: %s, ino num: %s' % (inon, file_offset, ino_num))
verbose_display(inon)
if not ino_num in inodes:
inodes[ino_num] = {}
inodes[ino_num]['ino'] = inon
elif chdr.node_type == UBIFS_DATA_NODE:
try:
datn = nodes.data_node(node_buf, (ubifs.leb_size * lnum) + UBIFS_COMMON_HDR_SZ + offset + UBIFS_DATA_NODE_SZ)
except Exception as e:
if settings.warn_only_block_read_errors:
error(index, 'Error', 'Problem at file address: %s extracting data_node: %s' % (file_offset, e))
return
else:
error(index, 'Fatal', 'Problem at file address: %s extracting data_node: %s' % (file_offset, e))
ino_num = datn.key['ino_num']
log(index, '%s file addr: %s, ino num: %s' % (datn, file_offset, ino_num))
verbose_display(datn)
if not ino_num in inodes:
inodes[ino_num] = {}
if not 'data' in inodes[ino_num]:
inodes[ino_num]['data']= []
inodes[ino_num]['data'].append(datn)
elif chdr.node_type == UBIFS_DENT_NODE:
try:
dn = nodes.dent_node(node_buf)
except Exception as e:
if settings.warn_only_block_read_errors:
error(index, 'Error', 'Problem at file address: %s extracting dent_node: %s' % (file_offset, e))
return
else:
error(index, 'Fatal', 'Problem at file address: %s extracting dent_node: %s' % (file_offset, e))
ino_num = dn.key['ino_num']
log(index, '%s file addr: %s, ino num: %s' % (dn, file_offset, ino_num))
verbose_display(dn)
if not ino_num in inodes:
inodes[ino_num] = {}
if not 'dent' in inodes[ino_num]:
inodes[ino_num]['dent']= []
inodes[ino_num]['dent'].append(dn)

@ -0,0 +1,183 @@
#!/usr/bin/env python
#############################################################
# ubi_reader
# (c) 2013 Jason Pruitt (jrspruitt@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#############################################################
import re
from ubireader.debug import error, log
from ubireader.ubi.defines import UBI_EC_HDR_MAGIC, FILE_CHUNK_SZ
from ubireader.ubifs.defines import UBIFS_NODE_MAGIC, UBIFS_SB_NODE_SZ, UBIFS_SB_NODE, UBIFS_COMMON_HDR_SZ
from ubireader.ubifs import nodes
def guess_start_offset(path, guess_offset=0):
file_offset = guess_offset
f = open(path, 'rb')
f.seek(0,2)
file_size = f.tell()+1
f.seek(guess_offset)
for _ in range(0, file_size, FILE_CHUNK_SZ):
buf = f.read(FILE_CHUNK_SZ)
ubi_loc = buf.find(UBI_EC_HDR_MAGIC)
ubifs_loc = buf.find(UBIFS_NODE_MAGIC)
if ubi_loc == -1 and ubifs_loc == -1:
file_offset += FILE_CHUNK_SZ
continue
else:
if ubi_loc == -1:
ubi_loc = file_size + 1
elif ubifs_loc == -1:
ubifs_loc = file_size + 1
if ubi_loc < ubifs_loc:
log(guess_start_offset, 'Found UBI magic number at %s' % (file_offset + ubi_loc))
return file_offset + ubi_loc
elif ubifs_loc < ubi_loc:
log(guess_start_offset, 'Found UBIFS magic number at %s' % (file_offset + ubifs_loc))
return file_offset + ubifs_loc
else:
error(guess_start_offset, 'Fatal', 'Could not determine start offset.')
else:
error(guess_start_offset, 'Fatal', 'Could not determine start offset.')
f.close()
def guess_filetype(path, start_offset=0):
log(guess_filetype, 'Looking for file type at %s' % start_offset)
with open(path, 'rb') as f:
f.seek(start_offset)
buf = f.read(4)
if buf == UBI_EC_HDR_MAGIC:
ftype = UBI_EC_HDR_MAGIC
log(guess_filetype, 'File looks like a UBI image.')
elif buf == UBIFS_NODE_MAGIC:
ftype = UBIFS_NODE_MAGIC
log(guess_filetype, 'File looks like a UBIFS image.')
else:
ftype = None
error(guess_filetype, 'Fatal', 'Could not determine file type.')
return ftype
def guess_leb_size(path):
"""Get LEB size from superblock
Arguments:
Str:path -- Path to file.
Returns:
Int -- LEB size.
Searches file for superblock and retrieves leb size.
"""
f = open(path, 'rb')
f.seek(0,2)
file_size = f.tell()+1
f.seek(0)
block_size = None
for _ in range(0, file_size, FILE_CHUNK_SZ):
buf = f.read(FILE_CHUNK_SZ)
for m in re.finditer(UBIFS_NODE_MAGIC, buf):
start = m.start()
chdr = nodes.common_hdr(buf[start:start+UBIFS_COMMON_HDR_SZ])
if chdr and chdr.node_type == UBIFS_SB_NODE:
sb_start = start + UBIFS_COMMON_HDR_SZ
sb_end = sb_start + UBIFS_SB_NODE_SZ
if chdr.len != len(buf[sb_start:sb_end]):
f.seek(sb_start)
buf = f.read(UBIFS_SB_NODE_SZ)
else:
buf = buf[sb_start:sb_end]
sbn = nodes.sb_node(buf)
block_size = sbn.leb_size
f.close()
return block_size
f.close()
return block_size
def guess_peb_size(path):
"""Determine the most likely block size
Arguments:
Str:path -- Path to file.
Returns:
Int -- PEB size.
Searches file for Magic Number, picks most
common length between them.
"""
file_offset = 0
offsets = []
f = open(path, 'rb')
f.seek(0,2)
file_size = f.tell()+1
f.seek(0)
for _ in range(0, file_size, FILE_CHUNK_SZ):
buf = f.read(FILE_CHUNK_SZ)
for m in re.finditer(UBI_EC_HDR_MAGIC, buf):
start = m.start()
if not file_offset:
file_offset = start
idx = start
else:
idx = start+file_offset
offsets.append(idx)
file_offset += FILE_CHUNK_SZ
f.close()
occurrences = {}
for i in range(0, len(offsets)):
try:
diff = offsets[i] - offsets[i-1]
except:
diff = offsets[i]
if diff not in occurrences:
occurrences[diff] = 0
occurrences[diff] += 1
most_frequent = 0
block_size = None
for offset in occurrences:
if occurrences[offset] > most_frequent:
most_frequent = occurrences[offset]
block_size = offset
return block_size
Loading…
Cancel
Save