@ -66,7 +66,7 @@ class Gateway():
rom_channel = None
mac_address = None
nonce_key = None
stok = None
stok = None # HTTP session token
status = -2
ftp = None
socket = None # TCP socket for SSH
@ -130,17 +130,24 @@ class Gateway():
if not self.device_name:
die("You need to make the initial configuration in the WEB of the device!")
self.status = -1
x = -1
x = r0.text.find('a href="/cgi-bin/luci/web/init/hello')
return self.status
x = r0.text.find('a href="/cgi-bin/luci/web/init/hello')
if (x > 10):
#self.webpassword = 'admin'
die("You need to make the initial configuration in the WEB of the device!")
self.status = 1
return self.status
def web_ping(self, timeout):
res = requests.get("http://{ip_addr}/cgi-bin/luci/web".format(ip_addr = self.ip_addr), timeout = timeout)
mac_address = re.search(r'var deviceId = \'(.*?)\'', res.text)
self.mac_address = mac_address.group(1) if mac_address else None
nonce_key = re.search(r'key: \'(.*)\',', res.text)
self.nonce_key = nonce_key.group(1) if nonce_key else None
except Exception:
return False
return True
def web_login(self):
self.stok = None
if not self.nonce_key or not self.mac_address:
@ -156,9 +163,9 @@ class Gateway():
username = 'admin'
data = "username={username}&password={password}&logtype=2&nonce={nonce}".format(username = username, password = password, nonce = nonce)
requrl = "http://{ip_addr}/cgi-bin/luci/api/xqsystem/login".format(ip_addr = self.ip_addr)
r1 = requests.post(requrl, data = data, headers = get_http_headers())
res = requests.post(requrl, data = data, headers = get_http_headers())
stok = re.findall(r'"token":"(.*?)"',r1.text)[0]
stok = re.findall(r'"token":"(.*?)"', res.text)[0]
except Exception:
self.webpassword = ""
die("WEB password is not correct!")
@ -170,6 +177,58 @@ class Gateway():
def apiurl(self):
return "http://{ip_addr}/cgi-bin/luci/;stok={stok}/api/".format(ip_addr = self.ip_addr, stok = self.stok)
def get_factory_info(self, timeout = 5):
self.facinfo = {}
if not self.stok:
res = requests.get(self.apiurl + 'xqsystem/fac_info', timeout = timeout)
except Exception:
return {}
self.facinfo = json.loads(res.text)
return self.facinfo
def wait_shutdown(self, timeout, verbose = 1):
if verbose:
print('Waiting for shutdown: ', end='', flush=True)
start_time = datetime.datetime.now()
while datetime.datetime.now() - start_time <= datetime.timedelta(seconds = timeout):
if verbose:
print('.', end='', flush=True)
if self.web_ping(1) is False:
if verbose:
print('.', flush=True)
self.stok = None
return True
if verbose:
print('timedout', flush=True)
return False
def wait_reboot(self, timeout, verbose = 1):
if verbose:
print('Waiting for reboot: ', end='', flush=True)
start_time = datetime.datetime.now()
while datetime.datetime.now() - start_time <= datetime.timedelta(seconds = timeout):
if verbose:
print('.', end='', flush=True)
if self.web_ping(1) is True:
if verbose:
print('log', end='', flush=True)
self.web_login() # TODO
if verbose:
print('on', end='', flush=True)
for i in range(5):
if verbose:
print('.', end='', flush=True)
print('', flush=True)
return True
if verbose:
print('timedout', flush=True)
return False
def shutdown(self):
if self.use_ssh:
@ -324,6 +383,16 @@ class Gateway():
return None
def check_telnet(self, timeout = 2, port = 23, verbose = 0):
tn = telnetlib.Telnet(self.ip_addr, port=port, timeout=timeout)
return True
except Exception as e:
if verbose:
die("TELNET not responding (IP: {})".format(self.ip_addr))
return False
def get_telnet(self, verbose = 0):
tn = telnetlib.Telnet(self.ip_addr, timeout=4)