You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

946 lines
33 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import re
import types
import binascii
import random
import tarfile
import io
import requests
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import gateway
from gateway import die
from envbuffer import EnvBuffer
class RootFS:
    """Describes which root filesystem slot the device booted from.

    Every field defaults to ``None`` (meaning "not detected yet") and is
    filled in by the code that parses the kernel log.
    """

    # Slot index: 0 / 1
    num = None
    # MTD device index, e.g. 10 / 11
    mtd_num = None
    # Device node, e.g. "/dev/mtd10"
    mtd_dev = None
    # Partition name: "rootfs0" / "rootfs1" / "rootfs_1"
    partition = None
class Bootloader:
    """Holds what is known about one bootloader partition image."""

    # Detected flavour: 'uboot' / 'breed' / 'pandora'
    type = None
    # Raw bytes read from the partition
    img = None
    # Size in bytes of the image found inside ``img``
    img_size = None
    # Flash address of the partition the image was read from
    addr = None
    # Flag set by the bootloader scan; False until it decides otherwise
    spi_rom = False
class BaseInfo:
    """Basic platform facts: kernel version, CPU architecture/name, SPI flag."""

    # Kernel version string taken from the kernel log
    linux_ver = None
    # CPU architecture label, e.g. 'mips' / 'armv7' / 'arm64'
    cpu_arch = None
    # CPU / SoC family name derived from the OpenWrt target
    cpu_name = None
    # True once an SPI controller line has been seen in the kernel log
    spi_rom = False
class Version:
    """Version strings collected from the device's version files."""

    # OpenWrt release, e.g. '12.09.1'
    openwrt = None
    # Firmware (ROM) version
    fw = None
    # Release channel: 'release' / 'stable'
    channel = None
    # Firmware build timestamp
    buildtime = None
    # Hardware model code, e.g. 'R3G'
    hardware = None
    # U-Boot version from /etc/uboot_version, e.g. '4.2.S.1'
    uboot1 = None
    # U-Boot version from the firmware version file
    uboot2 = None
class DevInfo:
    """Collect and cache information about a device reached through a gateway.

    The gateway object (``self.gw``) is used to run shell commands on the
    device and to download the resulting files into the local ``outdir``
    directory, where they are parsed.  ``infolevel`` controls how much is
    gathered on construction (see :meth:`update`).

    All mutable state is created per instance in ``__init__``.  The class
    attributes below only document the available fields; they are ``None``
    placeholders so that nothing mutable is shared between instances.
    """

    gw = None           # Gateway()
    verbose = 0
    syslog = None       # list of FileObject()
    dmesg = None        # text
    info = None         # BaseInfo()
    partlist = None     # list of {addr, size, name}
    kcmdline = None     # key=value
    nvram = None        # key=value
    rootfs = None       # RootFS()
    board_name = None
    model = None
    ver = None          # Version()
    bl = None           # first bootloader
    bl_list = None      # list of Bootloaders
    env_list = None     # list of EnvBuffer
    env = None          # namespace with .fw / .breed / .bdata (EnvBuffer each)

    def __init__(self, gw = None, verbose = 0, infolevel = 1):
        """Create the collector and optionally gather information right away.

        Args:
            gw: gateway object to talk to the device; a new
                ``gateway.Gateway()`` is created when ``None``.
            verbose: verbosity level used by the ``get_*`` methods by default.
            infolevel: passed to :meth:`update` when greater than zero.
        """
        self.gw = gateway.Gateway() if gw is None else gw
        self.verbose = verbose
        # Per-instance containers: previously these were class attributes,
        # so e.g. the ``env`` namespace was shared by every DevInfo object.
        self.syslog = []
        self.dmesg = None
        self.info = BaseInfo()
        self.partlist = []
        self.kcmdline = {}
        self.nvram = {}
        self.rootfs = RootFS()
        self.ver = Version()
        self.bl = Bootloader()
        self.bl_list = []
        self.env_list = []
        self.env = types.SimpleNamespace()
        self.env.fw = EnvBuffer()
        self.env.breed = EnvBuffer()
        self.env.bdata = EnvBuffer()
        os.makedirs('outdir', exist_ok = True)
        os.makedirs('tmp', exist_ok = True)
        if infolevel > 0:
            self.update(infolevel)

    def update(self, infolevel):
        """Gather information up to the requested level.

        Level 1 reads the kernel log, partition table, rootfs slot, kernel
        command line and base info; 2 adds versions; 3 adds NVRAM; 4 adds
        bootloaders; 5 adds environment partitions.  ``die`` is called when
        the partition list or the CPU architecture cannot be determined.
        """
        if infolevel >= 1:
            self.get_dmesg()
            self.get_part_table()
            if not self.partlist or len(self.partlist) <= 1:
                die("Partition list is empty!")
            self.get_rootfs()
            self.get_kernel_cmdline()
            if self.rootfs.num is None:
                # Fall back to the "firmware=" kernel parameter.
                if 'firmware' in self.kcmdline:
                    self.rootfs.num = int(self.kcmdline['firmware'])
                    print(f'rootfs.num = {self.rootfs.num}\n')
            self.get_baseinfo()
            if not self.info.cpu_arch:
                die("Can't detect CPU arch! Try to reboot device.")
        if infolevel >= 2:
            self.get_ver()
        if infolevel >= 3:
            self.get_nvram()
        if infolevel >= 4:
            self.get_bootloader()
        if infolevel >= 5:
            self.get_env_list()

    @staticmethod
    def _dd_params(size):
        """Return ``(bs, count)`` for a ``dd`` call that reads ``size`` bytes.

        128 KiB blocks are used when the size is a non-zero multiple of
        128 KiB; otherwise 1 KiB blocks are used with the count rounded up,
        so small or unaligned partitions are never read as zero blocks or
        truncated.
        """
        big_bs = 128 * 1024
        if size >= big_bs and size % big_bs == 0:
            return big_bs, size // big_bs
        small_bs = 1024
        return small_bs, (size + small_bs - 1) // small_bs

    def run_command(self, cmd, fn = None, encoding = "latin_1", binary = False, verbose = 1):
        """Run ``cmd`` on the device and return its captured stdout.

        The output is redirected to ``/tmp/<fn>`` on the device, downloaded
        to ``outdir/<fn>`` and read back.

        Args:
            cmd: shell command line to execute.
            fn: file name used on both sides; a random one is generated when
                empty.
            encoding: text encoding used when ``binary`` is false.
            binary: return ``bytes`` instead of ``str``.
            verbose: forwarded to the gateway's ``download``.

        Returns:
            The command output, or ``None`` when the command/download raised,
            or the downloaded file is missing or empty.
        """
        if not fn:
            fn = hex(random.getrandbits(64)) + '.txt'
            fn = fn[1:]     # drop the leading "0" of the "0x" prefix
        fn_local = f'outdir/{fn}'
        fn_remote = f'/tmp/{fn}'
        if os.path.exists(fn_local):
            os.remove(fn_local)     # never return a stale result
        try:
            self.gw.run_cmd(cmd + " > " + fn_remote)
            self.gw.download(fn_remote, fn_local, verbose = verbose)
            self.gw.run_cmd("rm -f " + fn_remote)
        except Exception:
            return None
        if not os.path.exists(fn_local):
            return None
        if os.path.getsize(fn_local) <= 0:
            return None
        if binary:
            # open() rejects an encoding argument in binary mode.
            with open(fn_local, 'rb') as file:
                return file.read()
        with open(fn_local, 'r', encoding = encoding) as file:
            return file.read()

    def get_dmesg(self, verbose = None):
        """Fetch the kernel log (``dmesg``) and cache it in ``self.dmesg``."""
        self.dmesg = self.run_command('dmesg', 'dmesg.log')
        return self.dmesg

    def get_part_table(self, verbose = None):
        """Build ``self.partlist`` from the MTD table printed in the kernel log.

        When the log has no "MTD partitions on" banner the table is read via
        :meth:`get_part_table2` instead.  Returns the list of
        ``{'addr', 'size', 'name'}`` dicts (empty when nothing was found).
        """
        verbose = verbose if verbose is not None else self.verbose
        self.partlist = []
        if not self.dmesg:
            return self.partlist
        x = self.dmesg.find(" MTD partitions on ")
        if x <= 0:
            return self.get_part_table2(verbose)
        parttbl = re.findall(r'0x0000(.*?)-0x0000(.*?) : "(.*?)"', self.dmesg)
        if not parttbl:
            return self.partlist
        if verbose:
            print("MTD partitions:")
        for i, (start, end, name) in enumerate(parttbl):
            addr = int(start, 16)
            size = int(end, 16) - addr
            self.partlist.append({'addr': addr, 'size': size, 'name': name})
            if verbose:
                print(' %2d > addr: 0x%08X size: 0x%08X name: "%s"' % (i, addr, size, name))
        if verbose:
            print(" ")
        return self.partlist

    def get_part_table2(self, verbose = None):
        """Build ``self.partlist`` from ``/proc/mtd`` and sysfs offsets.

        Returns the partition list, or ``[]`` (leaving ``self.partlist``
        empty) when ``/proc/mtd`` or any partition offset cannot be read.
        """
        verbose = verbose if verbose is not None else self.verbose
        self.partlist = []
        mtd_list = self.run_command('cat /proc/mtd', 'mtd_list.txt')
        if not mtd_list:
            return []
        mtdtbl = re.findall(r'mtd([0-9]+): ([0-9a-fA-F]+) ([0-9a-fA-F]+) "(.*?)"', mtd_list)
        if not mtdtbl:
            return []
        if verbose:
            print("MTD partitions :")
        for i, mtd in enumerate(mtdtbl):
            mtdid = int(mtd[0])
            size = int(mtd[1], 16)
            name = mtd[3]
            addr = self.run_command(f'cat /sys/class/mtd/mtd{mtdid}/offset', 'offset.txt', verbose = 0)
            try:
                addr = int(addr) if addr else None
            except ValueError:
                addr = None
            if addr is None:
                # Do not leave a half-filled table behind.
                self.partlist = []
                return []
            self.partlist.append({'addr': addr, 'size': size, 'name': name})
            if verbose:
                print(' %2d > addr: 0x%08X size: 0x%08X name: "%s"' % (i, addr, size, name))
        if verbose:
            print(" ")
        return self.partlist

    def get_part_num(self, name_or_addr, comptype = None):
        """Return the index of a partition in ``self.partlist``.

        Args:
            name_or_addr: ``int`` address or partition name (case-insensitive).
            comptype: for addresses, ``'#'`` matches any address inside the
                partition, otherwise the start address must be equal; for
                names, a value starting with ``'e'`` also accepts partitions
                whose name ends with ``name_or_addr``.

        Returns:
            The index, ``-1`` when nothing matches, ``-2`` when the partition
            list is empty.
        """
        if not self.partlist:
            return -2
        by_addr = isinstance(name_or_addr, int)
        wanted = name_or_addr if by_addr else name_or_addr.lower()
        for i, part in enumerate(self.partlist):
            if by_addr:
                if part['addr'] == 0 and part['size'] > 0x00800000:
                    continue    # skip "ALL" part
                if comptype and comptype == '#':    # range
                    if part['addr'] <= wanted < part['addr'] + part['size']:
                        return i
                elif wanted == part['addr']:
                    return i
            else:
                pname = part['name'].lower()
                if comptype and comptype[0] == 'e':     # endswith
                    if pname.endswith(wanted):
                        return i
                if pname == wanted:
                    return i
        return -1

    def get_part_list(self, name_or_addr_list, comptype = None):
        """Return indexes of all listed partitions that exist.

        Returns ``None`` when the partition list is empty; otherwise a list
        (possibly empty) in the order of ``name_or_addr_list``.
        """
        if not self.partlist:
            return None
        found = (self.get_part_num(val, comptype) for val in name_or_addr_list)
        return [p for p in found if p >= 0]

    def get_part(self, name_or_addr, comptype = None):
        """Return the partition dict matching ``name_or_addr`` or ``None``."""
        i = self.get_part_num(name_or_addr, comptype)
        if i < 0:
            return None
        return self.partlist[i]

    def get_part_by_addr(self, addr):
        """Return the partition whose start address equals ``addr`` or ``None``."""
        return self.get_part(addr, None)

    def get_rootfs(self, verbose = None):
        """Detect the active root filesystem from the kernel log.

        Fills and returns a fresh ``RootFS`` stored in ``self.rootfs``.
        Fields that cannot be determined stay ``None``.
        """
        verbose = verbose if verbose is not None else self.verbose
        self.rootfs = RootFS()
        rootfs = self.rootfs
        if not self.dmesg:
            return rootfs
        # flag_boot_rootfs=0 mounting /dev/mtd10
        x = re.search(r'flag_boot_rootfs=([0-9]) mounting (\S+)', self.dmesg)
        if x:
            rootfs.num = int(x.group(1))
            rootfs.mtd_dev = x.group(2)
        # UBI: attached mtd10 (name "rootfs0", size 32 MiB) to ubi0
        x = re.search(r'attached mtd([0-9]+) \(name "(.*?)", size', self.dmesg)
        if x and x.group(2).lower().startswith('rootfs'):
            rootfs.mtd_num = int(x.group(1))
            rootfs.partition = x.group(2).strip()
        # mtd: device 11 (rootfs) set to be root filesystem
        x = re.search(r'mtd: device ([0-9]+) \(rootfs\) set to be root filesystem', self.dmesg)
        if x:
            rootfs.mtd_num = int(x.group(1))
        if rootfs.num is None:
            k = re.search(r'Kernel command line:(.*?) ubi\.mtd=(\S+)', self.dmesg)
            if k:
                rootfs.partition = k.group(2)
        if rootfs.num is None:
            k = re.search(r'Kernel command line:(.*?) firmware=([0-9])', self.dmesg)
            if k:
                rootfs.num = int(k.group(2))
        if rootfs.num is None and rootfs.mtd_num is None:
            x = re.search(r'Kernel command line:(.*?) root=(\S+)', self.dmesg)
            if x and x.group(2).startswith('/dev/mtdblock'):
                rootfs.mtd_dev = x.group(2)
                rootfs.mtd_num = int(rootfs.mtd_dev.replace('/dev/mtdblock', ''))
        if rootfs.num is None and rootfs.partition:
            # Derive the slot from the partition name: "rootfs...1" -> 1, else 0.
            if rootfs.partition.lower().startswith('rootfs'):
                rootfs.num = 0
                if rootfs.partition.endswith('1'):
                    rootfs.num = 1
        if verbose:
            print('RootFS info:')
            print(' num = {}'.format(rootfs.num))
            print(' mtd_num = {}'.format(rootfs.mtd_num))
            print(' mtd_dev = "{}"'.format(rootfs.mtd_dev))
            print(' partition = "{}"'.format(rootfs.partition))
            print(" ")
        return rootfs

    def get_baseinfo(self, verbose = None):
        """Detect kernel version, CPU architecture/name and the SPI flag.

        The kernel version comes from the kernel log; the CPU data is derived
        from ``DISTRIB_TARGET`` in ``/etc/openwrt_release``.  Returns the
        ``BaseInfo`` stored in ``self.info``; undetected fields stay at their
        defaults.
        """
        verbose = verbose if verbose is not None else self.verbose
        self.info = BaseInfo()
        ret = self.info
        if verbose:
            print('Base info:')
        if self.dmesg:
            # Linux version 3.10.14 (jenkins@cefa8cf504dc) (gcc version 4.8.5 (crosstool-NG crosstool-ng-1.22.0) )
            x = re.search(r'Linux version (.*?) ', self.dmesg)
            if x:
                ret.linux_ver = x.group(1).strip()
        if verbose:
            print(' Linux version: {}'.format(ret.linux_ver))
        fn_local = 'outdir/openwrt_release.txt'
        fn_remote = '/etc/openwrt_release'
        if os.path.exists(fn_local):
            os.remove(fn_local)
        try:
            self.gw.download(fn_remote, fn_local, verbose=0)
        except Exception:
            if verbose:
                print(' File "{}" cannot download!'.format(fn_remote))
            return ret
        if not os.path.exists(fn_local):
            return ret
        if os.path.getsize(fn_local) <= 1:
            return ret
        with open(fn_local, "r", encoding="latin_1") as file:
            txt = file.read()
        x = re.search("DISTRIB_TARGET=['\"](.*?)['\"]", txt)
        if not x:
            return ret
        if verbose:
            print(" DISTRIB_TARGET =", x.group(1))
        target = x.group(1).strip().lower()
        pieces = target.split('/')
        board = pieces[0]
        # A target without "/" has no subtarget; do not crash on it.
        subtarget = pieces[1] if len(pieces) > 1 else ''
        cpu_arch = None
        cpu_name = ''
        if board == 'ramips':
            cpu_arch = 'mips'
            cpu_name = subtarget
        if board == 'mediatek':
            cpu_arch = 'arm64'
            cpu_name = subtarget[:6]
        if board.startswith('ar71'):    # Atheros
            cpu_arch = 'mips'
            cpu_name = board[:6]
        if board == 'ipq' and subtarget.startswith('ipq'):
            cpu_name = subtarget[:7]
        elif board.startswith('ipq') and len(board) >= 7:
            cpu_name = board[:7]
        if cpu_name.startswith(('ipq401', 'ipq806')):
            cpu_arch = 'armv7'
        if cpu_name.startswith(('ipq807', 'ipq50', 'ipq60')):
            cpu_arch = 'arm64'
        ret.cpu_arch = cpu_arch if cpu_arch else None
        ret.cpu_name = cpu_name if cpu_name else None
        if verbose:
            print(' CPU arch: {}'.format(ret.cpu_arch))
            print(' CPU name: {}'.format(ret.cpu_name))
        if board == 'ramips' and self.dmesg:
            # spi-mt7621 1e000b00.spi: sys_freq: 50000000
            x = re.search(r'spi-mt(.*?) (.*?).spi: sys_freq: ', self.dmesg)
            if x:
                ret.spi_rom = True
        if verbose:
            print(' SPI rom: {}'.format(ret.spi_rom))
            print(" ")
        return ret

    def get_kernel_cmdline(self, verbose = None, retdict = True):
        """Read ``/proc/cmdline`` from the device.

        Args:
            verbose: print the raw command line when truthy.
            retdict: when true, parse into a dict stored in ``self.kcmdline``
                and return it; when false, return the raw text (and
                ``self.kcmdline`` is left as ``None``).

        Returns:
            See ``retdict``.  On any failure the initial value of
            ``self.kcmdline`` (``{}`` or ``None``) is returned.
        """
        verbose = verbose if verbose is not None else self.verbose
        self.kcmdline = {} if retdict else None
        fn_local = 'outdir/kcmdline.log'
        fn_remote = '/tmp/kcmdline.log'
        if os.path.exists(fn_local):
            os.remove(fn_local)     # never parse a stale copy
        try:
            self.gw.run_cmd("cat /proc/cmdline > " + fn_remote)
            self.gw.download(fn_remote, fn_local)
            self.gw.run_cmd("rm -f " + fn_remote)
        except Exception:
            return self.kcmdline
        if not os.path.exists(fn_local):
            return self.kcmdline
        if os.path.getsize(fn_local) <= 1:
            return self.kcmdline
        with open(fn_local, "r", encoding="latin_1") as file:
            data = file.read()
        if verbose:
            print("Kernel command line:")
            print(" ", data)
        if not retdict:
            return data
        data = data.strip()
        data = data.replace("\n", ' ')
        data = data.replace("\x00", ' ')
        data = data.strip()
        env = EnvBuffer(data, ' ', crc_prefix = False, encoding = 'latin_1')
        self.kcmdline = env.var
        return self.kcmdline

    def get_nvram(self, verbose = None, retdict = True):
        """Read the output of ``nvram show`` from the device.

        Args:
            verbose: ``1`` prints only ``flag_*``, ``ipaddr`` and ``serverip``
                entries; any other truthy value prints everything.
            retdict: when true, parse into a dict stored in ``self.nvram``
                and return it; when false, return the raw text.

        Returns:
            See ``retdict``.  On any failure the initial value of
            ``self.nvram`` (``{}`` or ``None``) is returned.
        """
        verbose = verbose if verbose is not None else self.verbose
        self.nvram = {} if retdict else None
        fn_local = 'outdir/nvram.txt'
        fn_remote = '/tmp/nvram.txt'
        if os.path.exists(fn_local):
            os.remove(fn_local)     # never parse a stale copy
        try:
            self.gw.run_cmd("nvram show > " + fn_remote)
            self.gw.download(fn_remote, fn_local)
            self.gw.run_cmd("rm -f " + fn_remote)
        except Exception:
            return self.nvram
        if not os.path.exists(fn_local):
            return self.nvram
        if os.path.getsize(fn_local) <= 1:
            return self.nvram
        with open(fn_local, "r", encoding="latin_1") as file:
            data = file.read()
        if not retdict:
            return data
        if verbose:
            print("NVRam params:")
        env = EnvBuffer(data, '\n', crc_prefix = False, encoding = 'latin_1')
        self.nvram = env.var
        if verbose and self.nvram:
            for k, v in self.nvram.items():
                if verbose == 1 and not k.startswith('flag_') and k != 'ipaddr' and k != 'serverip':
                    continue
                print(" {key}{value}".format(key=k, value=('=' + v if v is not None else '')))
        if verbose:
            print(" ")
        return self.nvram

    def get_board_name(self, verbose = None):
        """Download ``/tmp/sysinfo/board_name`` and return its stripped text.

        Returns ``None`` when the downloaded file is missing or empty.
        Exceptions raised by the gateway's ``download`` are not caught.
        """
        verbose = verbose if verbose is not None else self.verbose
        self.board_name = None
        fn_local = 'outdir/board_name.txt'
        fn_remote = '/tmp/sysinfo/board_name'
        self.gw.download(fn_remote, fn_local)
        if not os.path.exists(fn_local) or os.path.getsize(fn_local) <= 0:
            return None
        with open(fn_local, "r", encoding="latin_1") as file:
            self.board_name = file.read().strip()
        if verbose:
            print("Board name: {}".format(self.board_name))
            print("")
        return self.board_name

    def get_model(self, verbose = None):
        """Download ``/tmp/sysinfo/model`` and return its stripped text.

        Returns ``None`` when the downloaded file is missing or empty.
        Exceptions raised by the gateway's ``download`` are not caught.
        """
        verbose = verbose if verbose is not None else self.verbose
        self.model = None
        fn_local = 'outdir/model.txt'
        fn_remote = '/tmp/sysinfo/model'
        self.gw.download(fn_remote, fn_local)
        if not os.path.exists(fn_local) or os.path.getsize(fn_local) <= 0:
            return None
        with open(fn_local, "r", encoding="latin_1") as file:
            self.model = file.read().strip()
        if verbose:
            print("Model: {}".format(self.model))
            print("")
        return self.model

    def get_ver(self, verbose = None):
        """Collect version strings into a fresh ``Version`` (``self.ver``).

        Reads ``/etc/uboot_version``, ``/etc/openwrt_version`` and the
        firmware version file (``/etc/xiaoqiang_version`` with a fallback to
        ``/usr/share/xiaoqiang/xiaoqiang_version``).  Files that cannot be
        fetched leave the corresponding fields ``None``.
        """
        verbose = verbose if verbose is not None else self.verbose
        self.ver = Version()
        if verbose:
            print("Version info:")
        fn_local = 'outdir/uboot_version.txt'
        fn_remote = '/etc/uboot_version'
        try:
            self.gw.download(fn_remote, fn_local, verbose = 0)
            with open(fn_local, "r", encoding="latin_1") as file:
                self.ver.uboot1 = file.read().strip()
        except Exception:
            pass    # best effort: the file is optional
        if verbose:
            print(" UBoot: {}".format(self.ver.uboot1))
        fn_local = 'outdir/openwrt_version.txt'
        fn_remote = '/etc/openwrt_version'
        try:
            self.gw.download(fn_remote, fn_local, verbose = 0)
            with open(fn_local, "r", encoding="latin_1") as file:
                self.ver.openwrt = file.read().strip()
        except Exception:
            pass    # best effort: the file is optional
        if verbose:
            print(" OpenWrt: {}".format(self.ver.openwrt))
        fn_local = 'outdir/fw_ver.txt'
        fn_remote = '/etc/xiaoqiang_version'
        try:
            self.gw.download(fn_remote, fn_local, verbose = 0)
        except Exception:
            fn_remote = None
        if not fn_remote:
            fn_remote = '/usr/share/xiaoqiang/xiaoqiang_version'
            try:
                self.gw.download(fn_remote, fn_local, verbose = 0)
            except Exception:
                fn_remote = None
        if fn_remote and os.path.exists(fn_local) and os.path.getsize(fn_local) > 0:
            with open(fn_local, "r", encoding="latin_1") as file:
                s = file.read()
            x = re.search(r"option ROM '(.*?)'", s)
            self.ver.fw = x.group(1) if x else None
            x = re.search(r"option CHANNEL '(.*?)'", s)
            self.ver.channel = x.group(1) if x else None
            x = re.search(r"option HARDWARE '(.*?)'", s)
            self.ver.hardware = x.group(1) if x else None
            x = re.search(r"option UBOOT '(.*?)'", s)
            self.ver.uboot2 = x.group(1) if x else None
            x = re.search(r"option BUILDTIME '(.*?)'", s)
            self.ver.buildtime = x.group(1) if x else None
        if verbose:
            print(" Firmware: {}".format(self.ver.fw))
            print(" Channel: {}".format(self.ver.channel))
            print(" BuildTime: {}".format(self.ver.buildtime))
            print(" Hardware: {}".format(self.ver.hardware))
            print(" UBoot(2): {}".format(self.ver.uboot2))
            print("")
        return self.ver

    def get_bootloader(self, verbose = None):
        """Dump every bootloader partition and classify its contents.

        Each readable partition becomes a ``Bootloader`` in ``self.bl_list``;
        ``self.bl`` is the first of them (or an empty ``Bootloader``).
        Returns ``self.bl``.
        """
        verbose = verbose if verbose is not None else self.verbose
        self.bl = Bootloader()
        self.bl_list = []
        ret = self.bl
        if verbose:
            print("Bootloader info:")
        plst = self.get_part_list(['bootloader', 'uboot', 'SBL1', 'APPSBL', 'SBL2', 'SBL3'], comptype = 'ends')
        if not plst:
            return ret
        for p in plst:
            part = self.partlist[p]
            bl = Bootloader()
            bl.addr = part['addr']
            size = part['size']
            name = ''.join(ch for ch in part['name'] if ch.isalnum())
            fn_local = 'outdir/mtd{id}_{name}.bin'.format(id=p, name=name)
            fn_remote = '/tmp/bl_{name}.bin'.format(name=name)
            # A fixed 128 KiB block size would give count=0 for small partitions.
            bs, cnt = self._dd_params(size)
            try:
                self.gw.run_cmd("dd if=/dev/mtd{i} of={o} bs={bs} count={cnt}".format(i=p, o=fn_remote, bs=bs, cnt=cnt))
                self.gw.download(fn_remote, fn_local)
                self.gw.run_cmd("rm -f " + fn_remote)
            except Exception:
                continue
            if verbose:
                print(" addr: 0x%08X (size: 0x%08X)" % (bl.addr, size))
            if not os.path.exists(fn_local):
                continue
            if os.path.getsize(fn_local) <= 1:
                continue
            with open(fn_local, "rb") as file:
                data = file.read()
            bl.img = data
            self.bl_list.append(bl)
            if data[0:4] == b'\x27\x05\x19\x56':
                # Header magic present: 0x40 plus the big-endian length at offset 0x0C.
                bl.img_size = 0x40 + int.from_bytes(data[0x0C:0x0C+4], byteorder='big')
            elif self.info.cpu_arch == 'mips':
                bl.spi_rom = True
            if bl.img_size is None:
                # No header: take the first long run of 0x00 or 0xFF as the end.
                x = data.find(b'\x00' * 0x240)
                if x > 0:
                    bl.img_size = x
                x = data.find(b'\xFF' * 0x240)
                if x > 0 and x < (bl.img_size if bl.img_size is not None else len(data)):
                    bl.img_size = x
            max_size = bl.img_size if bl.img_size is not None else len(data)
            if verbose:
                print(" image size: {} bytes".format(bl.img_size))
            # The first signature found inside the image decides the type.
            signatures = (
                (b'hackpascal@gmail.com', 'breed'),
                (b"PandoraBox-Boot", 'pandora'),
                (b"UBoot Version", 'uboot'),
            )
            for signature, bl_type in signatures:
                if bl.type:
                    break
                x = data.find(signature)
                if x > 0 and x < max_size:
                    bl.type = bl_type
            if verbose:
                print(" type: {}".format(bl.type))
        if self.bl_list:
            self.bl = self.bl_list[0]
        if verbose:
            print("")
        return self.bl

    def get_env_list(self, verbose = None):
        """Dump environment partitions and parse their variables.

        Reads the ``config`` / ``nvram`` / ``APPSBLENV`` / ``bdata``
        partitions plus a Breed environment expected at flash address
        ``0x60000``.  Every usable one is appended to ``self.env_list``;
        ``self.env.fw``, ``self.env.bdata`` and ``self.env.breed`` point to
        the first environment of each kind.  Returns ``self.env.fw``.
        """
        verbose = verbose if verbose is not None else self.verbose
        self.env.fw = EnvBuffer()
        self.env.breed = EnvBuffer()
        self.env.bdata = EnvBuffer()
        self.env_list = []
        ret = self.env.fw
        if verbose:
            print("ENV info:")
        plst = self.get_part_list(['config', 'nvram', 'APPSBLENV', 'bdata'], comptype = 'ends')
        if not plst:
            return ret
        env_breed_addr = 0x60000    # breed env addr for r3g
        env_breed_size = 0x20000
        pb = self.get_part_num(env_breed_addr, '#')
        if pb >= 0:
            # Indexes offset by 1000 mark "scan this partition for a Breed env".
            plst.append(1000 + pb)
        for p in plst:
            env = EnvBuffer()
            kind = ''
            if 1000 <= p < 2000:
                kind = 'breed'
                p = p - 1000
            part = self.partlist[p]
            name = ''.join(ch for ch in part['name'] if ch.isalnum())
            if kind == 'breed':
                env.addr = env_breed_addr
                data_size = part['size'] - (env.addr - part['addr'])
                if data_size < env_breed_size:
                    continue
            else:
                env.addr = part['addr']
                data_size = part['size']
            env.max_size = data_size
            fn_local = 'outdir/mtd{id}_{name}.bin'.format(id=p, name=name)
            fn_remote = '/tmp/env_{name}.bin'.format(name=name)
            bs, cnt = self._dd_params(part['size'])
            try:
                self.gw.run_cmd("dd if=/dev/mtd{i} of={o} bs={bs} count={cnt}".format(i=p, o=fn_remote, bs=bs, cnt=cnt))
                self.gw.download(fn_remote, fn_local)
                self.gw.run_cmd("rm -f " + fn_remote)
            except Exception:
                continue
            if verbose:
                print(" addr: 0x%08X (size: 0x%08X) " % (env.addr, env.max_size), kind)
            if not os.path.exists(fn_local):
                continue
            if os.path.getsize(fn_local) <= 1:
                continue
            with open(fn_local, "rb") as file:
                data = file.read()
            prefix = data[0:4]
            if prefix == b"\x00\x00\x00\x00" or prefix == b"\xFF\xFF\xFF\xFF":
                # Blank partition; only the Breed scan looks further in.
                if kind != 'breed':
                    continue
            env.data = data
            env.offset = 0
            self.env_list.append(env)
            if self.env.fw.addr is None:
                self.env.fw = env
            if self.env.bdata.addr is None and name.lower().endswith('bdata'):
                self.env.bdata = env
            if self.env.breed.addr is None and kind == 'breed':
                self.env.breed = env
            if kind == 'breed':
                env.offset = env.addr - part['addr']
                data = data[env.offset:]
                if data[0:4] != b'ENV\x00':
                    continue
                max_size = env_breed_size
                end = data.find(b"\x00\x00", 4)
                if end > max_size:
                    continue
            else:
                max_size = data.find(b"\xFF\xFF\xFF\xFF", 4)
                if max_size <= 0:
                    max_size = env.max_size
            env.max_size = max_size
            if kind != 'breed':
                # The first 4 bytes hold a little-endian CRC32 of the rest of
                # the env area; try area sizes in 1 KiB steps until it matches.
                env_crc32 = int.from_bytes(prefix, byteorder='little')
                for kib in range(1, 256):
                    size = 1024 * kib
                    if size < len(data):
                        buf = data[4:size]
                    else:
                        # Zero-pad so the checked payload is size - 4 bytes,
                        # the same length the branch above produces.
                        buf = data[4:] + (b'\x00' * (size - len(data)))
                    crc = binascii.crc32(buf)
                    if crc == env_crc32:
                        if verbose:
                            print(" CRC32: 0x%08X" % crc)
                        if size <= data_size:
                            env.max_size = size
                        break
            if verbose:
                print(" max size: 0x%X" % env.max_size)
            end = data.find(b"\x00\x00", 4)
            if end <= 4:
                continue
            data = data[4:end+1]
            env.delim = '\x00'
            env.crc_prefix = False
            env.encoding = 'ascii'
            env.var = env.parse_env_b(data, env.delim, encoding = env.encoding)
            env.crc_prefix = True
            if verbose >= 2 and env.var:
                for k, v in env.var.items():
                    v = '' if v is None else '=' + v
                    print(" " + k + v)
        if self.env_list:
            self.env.fw = self.env_list[0]
        if verbose:
            print("")
        return self.env.fw
class SysLog:
    """Download the device's syslog archive via its web API and parse it.

    The archive members are kept in memory in ``self.files`` as objects with
    ``name``, ``size`` and ``data`` attributes.
    """

    gw = None           # Gateway()
    verbose = 1
    timeout = 10
    files = None        # list of archive members (per-instance, see __init__)
    skiplogs = True     # skip raw log files stored under "usr/log/"
    mtdlist = None      # list of MTD entries (per-instance, see __init__)
    bdata = None        # EnvBuffer()
    device_sn = ""

    def __init__(self, gw, timeout = 17, verbose = 1, infolevel = 1):
        """Create the reader and optionally download/parse right away.

        Args:
            gw: gateway object; a ``gateway.Gateway(detect_ssh = False)`` is
                created when ``None``.
            timeout: timeout passed to the HTTP requests.
            verbose: verbosity level for progress output.
            infolevel: passed to :meth:`update` when greater than zero.
        """
        self.gw = gateway.Gateway(detect_ssh = False) if gw is None else gw
        self.verbose = verbose
        self.timeout = timeout
        self.files = []
        self.mtdlist = []
        os.makedirs('outdir', exist_ok = True)
        os.makedirs('tmp', exist_ok = True)
        if infolevel > 0:
            self.update(infolevel)

    def update(self, infolevel):
        """Run the steps up to ``infolevel``: 1 download, 2 SN + MTD, 3 bdata."""
        if infolevel >= 1:
            self.download_syslog()
        if infolevel >= 2:
            self.parse_baseinfo(fatal_error = True)
            self.parse_mtdlist()
        if infolevel >= 3:
            self.parse_bdata(fatal_error = True)

    def download_syslog(self, timeout = None):
        """Ask the device to generate a syslog archive and load its files.

        The archive is saved as ``outdir/syslog.tar.gz``.  Regular files are
        loaded into ``self.files`` (members under ``usr/log/`` are skipped
        while ``self.skiplogs`` is true).  ``die`` is called when the device
        is not found or the archive was not generated.

        Returns:
            ``self.files``.
        """
        timeout = timeout if timeout is not None else self.timeout
        self.files = []
        if not self.gw:
            gw = gateway.Gateway(detect_ssh = False)
            gw.web_login()
        else:
            gw = self.gw
            if gw.status < 1:
                gw.detect_device()
            if not gw.stok:
                gw.web_login()
        if gw.status < 1:
            die("Xiaomi Mi Wi-Fi device not found (IP: {})".format(gw.ip_addr))
        if self.verbose > 0:
            print("Start generating syslog...")
        r2 = requests.get(gw.apiurl + "misystem/sys_log", timeout = timeout)
        if r2.text.find('"code":0') < 0:
            die("SysLog not generated!")
        try:
            path = re.search(r'"path":"(.*?)"', r2.text)
            path = path.group(1).strip()
        except Exception:
            die("SysLog not generated! (2)")
        url = "http://" + path
        if self.verbose > 0:
            print('Downloading SysLog from file "{}" ...'.format(url))
        chunks = []
        with requests.get(url, stream=True, timeout = timeout) as r3:
            r3.raise_for_status()
            for chunk in r3.iter_content(chunk_size=8192):
                chunks.append(chunk)
        fn_local = 'outdir/syslog.tar.gz'
        with open(fn_local, "wb") as file:
            file.write(b''.join(chunks))
        # The context manager closes the archive even if a member fails to read.
        with tarfile.open(fn_local, mode='r:gz') as tar:
            for member in tar.getmembers():
                if not member.isfile() or not member.name:
                    continue
                if self.skiplogs and member.name.find('usr/log/') >= 0:    # skip raw syslog files
                    continue
                item = types.SimpleNamespace()
                item.name = member.name
                item.size = member.size
                item.data = tar.extractfile(member).read()
                self.files.append(item)
                if self.verbose >= 3:
                    print('name = "{}", size = {} ({})'.format(item.name, item.size, len(item.data)))
                    if len(item.data) < 200:
                        print(item.data)
        return self.files

    def get_file_by_name(self, filename, fatal_error = False):
        """Return the loaded archive member whose base name is ``filename``.

        Returns ``None`` when it is not found; with ``fatal_error`` set,
        ``die`` is called first.
        """
        for item in self.files or []:
            if os.path.basename(item.name) == filename:
                return item
        if fatal_error:
            die('File "{}" not found in syslog!'.format(filename))
        return None

    def parse_baseinfo(self, fatal_error = False):
        """Extract the device serial number from ``xiaoqiang.log``.

        Stores it in ``self.device_sn`` and returns it.  Returns ``""`` when
        the log file or the serial number is missing; with ``fatal_error``
        set, ``die`` is called in those cases.
        """
        self.device_sn = ""
        file = self.get_file_by_name('xiaoqiang.log', fatal_error)
        if not file:
            # Non-fatal lookup miss: there is nothing to parse.
            return ""
        txt = file.data.decode('latin_1')
        sn = re.search('====SN\n(.*?)\n====', txt)
        if not sn:
            if fatal_error:
                die('Device SN not found into syslog!')
            return ""
        sn = sn.group(1).strip()
        if self.verbose >= 1:
            print('Device SN: {}'.format(sn))
        self.device_sn = sn
        return sn

    def parse_mtdlist(self):
        """Parse the MTD table from ``xiaoqiang.log`` into ``self.mtdlist``.

        Each entry has ``id``, ``size`` and ``name`` attributes.  ``die`` is
        called when the log file or the "MTD table:" section is missing.

        Returns:
            The list of entries (``[]`` when no table row matched).
        """
        self.mtdlist = []
        file = self.get_file_by_name('xiaoqiang.log', fatal_error = True)
        txt = file.data.decode('latin_1')
        x = txt.find("\nMTD table:\n")
        if x <= 0:
            die('MTD table not found into syslog!')
        mtdtbl = re.findall(r'mtd([0-9]+): ([0-9a-fA-F]+) ([0-9a-fA-F]+) "(.*?)"', txt)
        if not mtdtbl:
            return []
        mtdlist = []
        if self.verbose:
            print("SysLog MTD table:")
        for mtd in mtdtbl:
            item = types.SimpleNamespace()
            item.id = int(mtd[0])
            item.size = int(mtd[1], 16)
            item.name = mtd[3]
            mtdlist.append(item)
            if self.verbose:
                print(' %2d > size: 0x%08X name: "%s"' % (item.id, item.size, item.name))
        self.mtdlist = mtdlist
        return mtdlist

    def get_mtd_by_name(self, name):
        """Return the first MTD entry whose name ends with ``name`` (case-insensitive)."""
        if self.mtdlist:
            name = name.lower()
            for mtd in self.mtdlist:
                if mtd.name.lower().endswith(name):
                    return mtd
        return None

    def parse_bdata(self, fatal_error = False):
        """Parse ``bdata.txt`` from the archive into an ``EnvBuffer``.

        Stores the result in ``self.bdata`` and returns it; returns ``None``
        when the file is not present.
        """
        self.bdata = None
        file = self.get_file_by_name('bdata.txt', fatal_error)
        if not file:
            return None
        env = EnvBuffer(file.data.decode('ascii'), '\n')
        if self.verbose >= 2:
            print('SysLog BData List:')
            for k, v in env.var.items():
                v = '' if (v is None) else ('=' + v)
                print(" " + k + v)
        self.bdata = env
        return env
# Script entry point.
#   "<script> syslog"  - download and parse the syslog archive, then exit.
#   no arguments       - collect everything and write it to full_info.txt
#                        (the previous report is kept as full_info_old.txt).
if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == 'syslog':
        gw = gateway.Gateway(timeout = 4, detect_ssh = False)
        if gw.status < 1:
            die("Xiaomi Mi Wi-Fi device not found (IP: {})".format(gw.ip_addr))
        SysLog(gw, timeout = 22, verbose = 1, infolevel = 2)
        sys.exit(0)

    fn_old = 'full_info_old.txt'
    fn_local = 'full_info.txt'

    # Keep exactly one previous report.
    if os.path.exists(fn_local):
        if os.path.exists(fn_old):
            os.remove(fn_old)
        os.rename(fn_local, fn_old)

    info = DevInfo(verbose = 1, infolevel = 99)

    #if not info.partlist:
    #    die("No NAND partition layout information found in the kernel log")

    # An explicit encoding keeps the report writable when NVRAM values hold
    # characters the locale's default codec cannot encode; the context
    # manager closes the file even if a write fails.
    with open(fn_local, "w", encoding = "utf-8") as file:
        file.write("_MTD_partitions_:\n")
        for i, part in enumerate(info.partlist):
            file.write(" %2d > addr: %08X size: %08X name: \"%s\" \n" % (i, part['addr'], part['size'], part['name']))
        file.write("\n")

        file.write("_Base_info_:\n")
        file.write(' Linux version: {}\n'.format(info.info.linux_ver))
        file.write(' CPU arch: {}\n'.format(info.info.cpu_arch))
        file.write(' CPU name: {}\n'.format(info.info.cpu_name))
        file.write(' SPI rom: {}\n'.format(info.info.spi_rom))
        file.write("\n")

        file.write("_Kernel_command_line_:\n")
        if info.kcmdline:
            for k, v in info.kcmdline.items():
                v = '' if (v is None) else ('=' + v)
                file.write(" " + k + v + '\n')
        file.write("\n")

        file.write("_NVRam_params_:\n")
        if info.nvram:
            for k, v in info.nvram.items():
                v = '' if (v is None) else ('=' + v)
                file.write(" " + k + v + '\n')
        file.write("\n")

        file.write("_RootFS_current_:\n")
        file.write(' num = {}\n'.format(info.rootfs.num))
        file.write(' mtd_num = {}\n'.format(info.rootfs.mtd_num))
        file.write(' mtd_dev = "{}"\n'.format(info.rootfs.mtd_dev))
        file.write(' partition = "{}"\n'.format(info.rootfs.partition))
        file.write("\n")

        #file.write('Board name: "{}" \n\n'.format(info.board_name))
        #file.write('Model: "{}" \n\n'.format(info.model))

        file.write("_Version_info_:\n")
        file.write(" UBoot: {} \n".format(info.ver.uboot1))
        file.write(" OpenWrt: {} \n".format(info.ver.openwrt))
        file.write(" Firmware: {} \n".format(info.ver.fw))
        file.write(" Channel: {} \n".format(info.ver.channel))
        file.write(" BuildTime: {} \n".format(info.ver.buildtime))
        file.write(" Hardware: {} \n".format(info.ver.hardware))
        file.write(" UBoot(2): {} \n".format(info.ver.uboot2))
        file.write("\n")

        file.write("_Bootloader_info_:\n")
        for bl in info.bl_list:
            p = info.get_part_num(bl.addr, '#')
            name = info.partlist[p]['name'] if p >= 0 else "<unknown_name>"
            file.write(" {}:\n".format(name))
            file.write(" addr: 0x%08X \n" % (bl.addr if bl.addr else 0))
            file.write(" size: 0x%08X \n" % (len(bl.img) if bl.img else 0))
            file.write(" image size: {} bytes \n".format(bl.img_size))
            file.write(" type: {} \n".format(bl.type))
        file.write("\n")

        file.write("_ENV_info_:\n")
        for env in info.env_list:
            p = info.get_part_num(env.addr, '#')
            name = info.partlist[p]['name'] if p >= 0 else "<unknown_name>"
            file.write(" {}:\n".format(name))
            file.write(" addr: 0x%08X \n" % (env.addr if env.addr else 0))
            file.write(" size: 0x%08X \n" % (env.max_size if env.max_size else 0))
            file.write(" len: %d bytes \n" % env.len)
            file.write(" prefix: {} \n".format(env.data[env.offset:env.offset+4] if env.data else None))
            if env.var:
                for k, v in env.var.items():
                    v = '' if (v is None) else ('=' + v)
                    file.write(" " + k + v + '\n')
        file.write("\n")

    print("Full device information saved to file {}".format(fn_local))