[install_fw] Check images after flashing and before flashing

pull/56/head
remittor 4 months ago
parent 3c4f371785
commit c2c080c349

@ -80,8 +80,7 @@ def uboot_boot_change(gw, fw_num):
cmd.append("nvram set flag_try_sys2_failed=0")
cmd.append("nvram set flag_boot_rootfs={}".format(fw_num))
cmd.append("nvram commit")
gw.run_cmd(cmd)
return True
return gw.run_cmd(';'.join(cmd))
if __name__ == "__main__":

@ -79,6 +79,7 @@ class Gateway():
self.login = 'root' # default username
def __init__(self, timeout = 4, verbose = 2, detect_device = True, detect_ssh = True, load_cfg = True):
random.seed()
self.__init_fields()
self.verbose = verbose
self.timeout = timeout
@ -798,6 +799,7 @@ class Gateway():
tn.read_until(tn.prompt, timeout = 4 if timeout is None else timeout)
if not self.use_ssh:
tn.write(b"exit\n")
ret = True
return ret
def download(self, fn_remote, fn_local, verbose = 1):
@ -826,11 +828,12 @@ class Gateway():
raise RuntimeError('FIXME')
return True
def upload(self, fn_local, fn_remote, verbose = 1):
try:
def upload(self, fn_local, fn_remote, md5chk = True, verbose = 1):
if not os.path.exists(fn_local):
die(f'File "{fn_local}" not found.')
if md5chk:
md5_local = self.get_md5_for_local_file(fn_local)
file = open(fn_local, 'rb')
except Exception:
die('File "{}" not found.'.format(fn_local))
if verbose and self.verbose:
print('Upload file: "{}" ....'.format(fn_local))
if self.use_ssh:
@ -848,8 +851,70 @@ class Gateway():
else:
raise RuntimeError('FIXME')
file.close()
if md5chk:
md5_remote = self.get_md5_for_remote_file(fn_remote)
if md5_remote != md5_local:
if md5chk == 2:
die(f'File "{fn_local}" uploaded, but MD5 incorrect!')
#if verbose:
print(f'ERROR: File "{fn_local}" uploaded, but MD5 incorrect!')
return False
return True
def get_md5_for_remote_file(self, fn_remote):
fname = os.path.basename(fn_remote)
num = str(random.randint(10000, 1000000))
md5_local_fn = f"tmp/{fname}.{num}.md5"
md5_remote_fn = f"/tmp/{fname}.{num}.md5"
cmd = f'md5sum "{fn_remote}" &> "{md5_remote_fn}" '
rc = self.run_cmd(cmd, timeout = 4)
if not rc:
return -5
os.remove(md5_local_fn) if os.path.exists(md5_local_fn) else None
self.download(md5_remote_fn, md5_local_fn)
if not os.path.exists(md5_local_fn):
return -4
with open(md5_local_fn, 'r', encoding = 'latin1') as file:
md5 = file.read()
os.remove(md5_local_fn)
if not md5:
return -3
if md5.startswith('md5sum:'):
return -2
md5 = md5.split(' ')[0]
md5 = md5.strip()
if len(md5) != 32:
return -1
return md5.lower()
def get_md5_for_local_file(self, fn_local, size = None):
hasher = hashlib.md5()
bs = 512*1024
if size is None:
with open(fn_local, 'rb') as file:
for chunk in iter(lambda: file.read(bs), b''):
hasher.update(chunk)
elif size > 0:
tail_size = 0
nsize = size
filesize = os.path.getsize(fn_local)
if size > filesize:
tail_size = size - filesize
nsize = filesize
readed = 0
with open(fn_local, 'rb') as file:
while True:
if readed + bs > nsize:
bs = nsize - readed
chunk = file.read(bs)
hasher.update(chunk)
readed += bs
if readed >= nsize:
break
if tail_size:
hasher.update(b'\0' * tail_size)
return hasher.hexdigest()
#===============================================================================
if __name__ == "__main__":

@ -1062,11 +1062,11 @@ class XqFlash():
gw.set_timeout(12)
if fw_img.cmd:
gw.upload(fw_img.fn_local, fw_img.fn_remote)
gw.upload(fw_img.fn_local, fw_img.fn_remote, md5chk = 2)
if kernel.cmd:
gw.upload(kernel.fn_local, kernel.fn_remote)
gw.upload(kernel.fn_local, kernel.fn_remote, md5chk = 2)
if rootfs.cmd:
gw.upload(rootfs.fn_local, rootfs.fn_remote)
gw.upload(rootfs.fn_local, rootfs.fn_remote, md5chk = 2)
if self.img_write:
cmd = [ ]
@ -1076,52 +1076,66 @@ class XqFlash():
cmd.append("nvram set ssh_en=1")
cmd.append("nvram set uart_en=1")
cmd.append("nvram commit")
gw.run_cmd(cmd, timeout = 8)
if self.install_fw_num is not None:
print("Run scripts for change NVRAM params...")
if self.img_write:
activate_boot.uboot_boot_change(gw, self.install_fw_num)
if hasattr(kernel, 'partname') and kernel.partname:
print(f'Boot from partition "{kernel.partname}" activated. ({self.install_fw_num})')
else:
print(f'Boot from firmware [{self.install_fw_num}] activated.')
rc = gw.run_cmd(';'.join(cmd), timeout = 8)
if not rc:
die(f'Cannot change nvram parameters!')
if fw_img.cmd:
print('Writing firmware image to addr {} ...'.format("0x%08X" % fw_img.addr))
print(" " + fw_img.cmd)
if self.img_write:
gw.run_cmd(fw_img.cmd, timeout = 60)
self.flash_data_to_mtd('firmware', fw_img, timeout = 60)
if kernel.cmd:
print('Writing kernel image to addr {} ...'.format("0x%08X" % kernel.addr))
print(" " + kernel.cmd)
if self.img_write:
gw.run_cmd(kernel.cmd, timeout = 34)
self.flash_data_to_mtd('kernel', kernel, timeout = 34)
if rootfs.cmd:
print('Writing rootfs image to addr {} ...'.format("0x%08X" % rootfs.addr))
print(" " + rootfs.cmd)
if self.img_write:
gw.run_cmd(rootfs.cmd, timeout = 60)
self.flash_data_to_mtd('rootfs', rootfs, timeout = 60)
if not self.img_write:
die('===== Flash TEST is over =====')
if self.install_fw_num is not None:
print("Run scripts for change NVRAM params...")
activate_boot.uboot_boot_change(gw, self.install_fw_num)
if hasattr(kernel, 'partname') and kernel.partname:
print(f'Boot from partition "{kernel.partname}" activated. [{self.install_fw_num}]')
else:
print(f'Boot from firmware [{self.install_fw_num}] activated.')
nvram = self.dev.get_nvram()
if 'flag_boot_rootfs' not in nvram:
die(f'Parameter "flag_boot_rootfs" not founeded into nvram')
flag_boot_rootfs = int(nvram['flag_boot_rootfs'])
if flag_boot_rootfs != self.install_fw_num:
die(f'Parameter flag_boot_rootfs = {flag_boot_rootfs} , but expected [{self.install_fw_num}]')
print("The firmware has been successfully flashed!")
if self.install_method == 100:
gw.run_cmd("sync ; umount -a", timeout = 5)
print("Please, reboot router!")
else:
import ssh2
print('Send command "reboot" via SSH/Telnet ...')
try:
gw.run_cmd("reboot -f")
except ssh2.exceptions.SocketRecvError as e:
pass
gw.run_cmd("reboot -f", die_on_error = False)
print("Forced REBOOT activated!")
def flash_data_to_mtd(self, img_name, img: Image, timeout, check = True):
print(f'Writing {img_name} image to addr 0x{img.addr:08X} ...')
print(f" {img.cmd}")
partname = img.partname
size = os.path.getsize(img.fn_local)
size = (size // 4096) * 4096
md5_orig = self.gw.get_md5_for_local_file(img.fn_local, size)
if not self.img_write:
return True
rc = self.gw.run_cmd(img.cmd, timeout = timeout, die_on_error = True)
if not rc:
print(f' ERROR: cannot flash data to partition "{partname}"')
return False
if check:
md5 = self.dev.get_md5_for_mtd_data(partname, offset = 0, size = size)
if md5 != md5_orig:
die(f'Flashed data corrupted! Partition "{partname}" md5: {md5}')
return False
return True
# =====================================================================
xf = XqFlash()

@ -668,6 +668,48 @@ class DevInfo():
print("")
return self.ver
def get_md5_for_mtd_data(self, partname, offset = 0, size = None):
if not self.partlist:
return -10
mtd_num = self.get_part_num(partname)
if mtd_num < 0:
return -9
mtd_part = self.partlist[mtd_num]
bs = 4096
if not size:
size = mtd_part['size']
if size > mtd_part['size']:
return -8
if size % bs != 0:
return -7
if offset % bs != 0:
return -6
skip = f'skip={offset // bs}' if offset else ''
num = str(random.randint(10000, 1000000))
md5_local_fn = f"tmp/mtd{mtd_num}_{offset}_{size}_{num}.md5"
md5_remote_fn = f"/tmp/mtd{mtd_num}_{offset}_{size}_{num}.md5"
count = size // bs
cmd = f'dd if=/dev/mtd{mtd_num} bs={bs} count={count} {skip} | md5sum > "{md5_remote_fn}" '
try:
self.gw.run_cmd(cmd)
self.gw.download(md5_remote_fn, md5_local_fn)
except Exception:
return -5
if not os.path.exists(md5_local_fn):
return -4
with open(md5_local_fn, 'r', encoding = 'latin1') as file:
md5 = file.read()
os.remove(md5_local_fn)
if not md5:
return -3
if md5.startswith('md5sum:'):
return -2
md5 = md5.split(' ')[0]
md5 = md5.strip()
if len(md5) != 32:
return -1
return md5.lower()
def get_bootloader(self, verbose = None):
verbose = verbose if verbose is not None else self.verbose
self.bl = Bootloader()

Loading…
Cancel
Save