import os, os.path, glob import subprocess import commands import time import sys import re import signal import unittest import types from subprocess import Popen from os.path import basename DEV_NULL = file("/dev/null","w") DAEMONS_TO_BE_STOPPED = ['xsession/applifed','xsession/conndlgs'] LAUNCHER_BINARY='/usr/bin/applauncherd' DEV_NULL = file("/dev/null","w") LAUNCHABLE_APPS = ['/usr/bin/fala_ft_hello','/usr/bin/fala_ft_hello1', '/usr/bin/fala_ft_hello2'] LAUNCHABLE_APPS_QML = ['/usr/bin/fala_qml_helloworld','/usr/bin/fala_qml_helloworld1', '/usr/bin/fala_qml_helloworld2'] PREFERED_APP = '/usr/bin/fala_ft_hello' PREFERED_APP_QML = '/usr/bin/fala_qml_helloworld' GET_COORDINATE_SCRIPT = '/usr/share/applauncherd-testscripts/get-coordinates.rb' PIXELCHANHED_BINARY = '/usr/bin/fala_pixelchanged' class CustomTestCase(unittest.TestCase) : def waitForAsert(self, periodicCheck, msg="", timeout=20, sleep=1) : assert(type(periodicCheck) == types.FunctionType or type(periodicCheck) == types.LambdaType) start = time.time() stop = start + timeout debug("Waiting for '%s' to return True value in time of %.1fs." %(periodicCheck.__name__, timeout)) while stop > time.time() : if periodicCheck() : debug("'%s' has returned True after %.1fs." %(periodicCheck.__name__, time.time()-start)) return time.sleep(sleep) debug("waitForAsert has timed out after %ss." %timeout) self.assert_(periodicCheck(), msg) return def waitForAsertEqual(self, periodicCheck, expectedValue, msg="", timeout=20, sleep=1) : assert(type(periodicCheck) == types.FunctionType or type(periodicCheck) == types.LambdaType) start = time.time() stop = start + timeout debug("Waiting for '%s' to return expected value: '%s' in time of %.1fs." %(periodicCheck.__name__, expectedValue, timeout)) while stop > time.time() : if periodicCheck() == expectedValue : debug("'%s' has returned expected value: '%s' after %.1fs." %(periodicCheck.__name__, expectedValue, time.time()-start)) return time.sleep(sleep) debug("waitForAsertEqual has timed out after %ss." %timeout) value = periodicCheck() self.assertEqual(value, expectedValue, "%s\n" "Values are different, have value: %s\n" " expected value is: %s\n" %(msg, value, expectedValue)) return def waitForAssertLogFileContains(self, fileName, expContent, msg="", findCount = 1, timeout=20, sleep=1) : start = time.time() stop = start + timeout file = None # wait until file exists: while stop > time.time() : try : file = open(fileName, "r") break except IOError: time.sleep(sleep) self.assert_(file, "Failed to open log file: '%s' in time of %.1f" %(fileName, timeout)) try : if sleep < time.time()-start : debug("File '%s' was opened after a period of: %.1fs." %(fileName, time.time() - start)) debug("File '%s' is opened. Waiting for '%s' to appear in file in time of %.1fs." %(fileName, expContent, timeout)) while stop > time.time() : line = file.readline() while line : if expContent in line : findCount = findCount - 1 if findCount<=0 : line = line[:-1] #remove tailing end line character debug("String: '%s' has been found inside file: %s after %.1fs.\nDetected line contains: '%s'" %(expContent, fileName, time.time()-start, line)) return line line = file.readline() time.sleep(sleep) finally : file.close() debug("waitForAsert has timed out after %ss." %timeout) self.assert_(False, "Content '%s' was not found in log file '%s'. %s" %(expContent, fileName, msg)) return # Function to stop desired daemons. This is also done in setup function # if stop_daemons is not called before. def stop_daemons(): for daemon in DAEMONS_TO_BE_STOPPED: os.system('initctl stop %s'%(daemon)) wait_for_single_applauncherd() get_booster_pid() # Function to start desired daemons. This is also done in teardown function # if start_daemons is not called before. def start_daemons(): for daemon in DAEMONS_TO_BE_STOPPED: os.system('initctl start %s'%(daemon)) wait_for_single_applauncherd() get_booster_pid() def daemons_running(): st, op = commands.getstatusoutput('pgrep %s'%DAEMONS_TO_BE_STOPPED[0].split("/")[1]) return not(st) def debug(*msg): """ Debug function """ sys.stderr.write('[DEBUG %s] %s\n' % (time.ctime(), \ ' '.join([str(s) for s in msg]),)) def error(*msg): """ exit when error, give proper log """ sys.stderr.write('ERROR %s\n' % (' '.join([str(s) for s in msg]),)) sys.exit(1) def remove_applauncherd_runtime_files(): """ Removes files that applauncherd leaves behind after it has been stopped """ debug("Removing files that applauncherd leaves behind after it has been stopped") files = glob.glob('/tmp/boost*') for f in files: debug("removing %s" % f) try: os.remove(f) except: pass def start_applauncherd(): debug("Starting applauncherd") handle = Popen(['initctl', 'start', 'xsession/applauncherd'], stdout = DEV_NULL, stderr = DEV_NULL, shell = False, preexec_fn=permit_sigpipe) get_booster_pid() return handle.wait() == 0 def stop_applauncherd(): debug("Stoping applauncherd") handle = Popen(['initctl', 'stop', 'xsession/applauncherd'], stdout = DEV_NULL, stderr = DEV_NULL, shell = False, preexec_fn=permit_sigpipe) time.sleep(1) remove_applauncherd_runtime_files() return handle.wait() def restart_applauncherd(): debug("Restart applauncherd") stop_applauncherd() start_applauncherd() def run_app_as_user_with_invoker(appname, booster = 'm', arg = "", out = DEV_NULL, err = DEV_NULL): """ Runs the specified app as a user. """ inv_cmd = "/usr/bin/invoker --type=%s %s %s" %(booster,arg, appname) debug("run %s as user" %inv_cmd) cmd = ['su', '-', 'user', '-c'] if type(appname) == list: cmd += inv_cmd elif type(appname) == str: cmd.append(inv_cmd) else: raise TypeError("List or string expected") p = subprocess.Popen(cmd, shell = False, stdout = out, stderr = err, preexec_fn=permit_sigpipe) return p def run_cmd_as_user(cmnd, out = DEV_NULL, err = DEV_NULL): """ Runs the specified command as a user. """ debug("run %s as user" %cmnd) cmd = ['su', '-', 'user', '-c'] if type(cmnd) == list: cmd += cmnd elif type(cmnd) == str: cmd.append(cmnd) else: raise TypeError("List or string expected") p = subprocess.Popen(cmd, shell = False, stdout = out, stderr = err, preexec_fn=permit_sigpipe) return p def get_pid(appname, printdebug=True): temp = basename(appname)[:14] st, op = commands.getstatusoutput("pgrep %s" % temp) if(printdebug): debug("The Pid of %s is %s" %(appname, op)) if st == 0: return op else: return None def get_oldest_pid(appname): temp = basename(appname)[:14] st, op = commands.getstatusoutput("pgrep -o %s" % temp) debug("The Pid of %s is %s" %(appname, op)) if st == 0: return op else: return None def get_newest_pid(app): p = subprocess.Popen(['pgrep', '-n', app], shell = False, stdout = subprocess.PIPE, stderr = DEV_NULL, preexec_fn=permit_sigpipe) op = p.communicate()[0] debug("The New Pid of %s is %s" %(app, op.strip())) if p.wait() == 0: return op.strip() return None def wait_for_app(app = None, timeout = 40, sleep = 1): """ Waits for an application to start. Checks periodically if the app is running for a maximum wait set in timeout. Returns the pid of the application if it was running before the timeout finished, otherwise None is returned. """ pid = None start = time.time() debug("Waiting for '%s' to startup in %.1fs time" %(app, timeout)) while pid == None and time.time() < start + timeout: p = subprocess.Popen(['pgrep', '-n', app], shell = False, stdout = subprocess.PIPE, stderr = DEV_NULL, preexec_fn=permit_sigpipe) op = p.communicate()[0] if p.wait() == 0: pid = op.strip() break time.sleep(sleep) if (pid==None): debug("Failed to fetch PID for '%s' in %ss time" %(app, timeout)) else: debug("Application '%s' has started with PID: %s in time of %.1fs." %(app, pid, time.time()-start)) return pid def wait_for_single_applauncherd(timeout = 20, sleep = 1): pid = get_pid('applauncherd', False) count = len(pid.split("\n")) start = time.time() while count > 1 and time.time() < start + timeout: time.sleep(sleep) pid = get_pid('applauncherd', False) count = len(pid.split("\n")) if count == 1: break debug("got single applauncherd pid %s in %.1fs" % (pid, time.time()-start)) return pid def get_booster_pid(timeout = 40): boosters = ('e', 'd', 'q', 'm') pids = [None, None, None, None] start = time.time() debug("Waiting for all booster to appear in time of %.1fs" %(timeout)) while time.time()>>>>\n%s\n<<<<<" % op) creds = op.split("\n")[1:] return creds def launch_and_get_creds(path): """ Tries to launch an application and if successful, returns the credentials the application has as a list. """ # try launch the specified application handle = run_app_as_user_with_invoker(path,arg = '--no-wait') # sleep for a moment to allow applauncherd to start the process time.sleep(3) # with luck, the process should have correct name by now pid = get_pid(path) debug("%s has PID %s" % (basename(path), pid,)) if pid == None: print "couldn't launch %s" % basename(path) return None creds = get_creds(pid = pid) kill_process(path) return creds # returns the fd list of a process as a dict def get_fd_dict(pid): fd_dict = {} fd_info = commands.getoutput('ls -l /proc/%s/fd/' % pid).splitlines() for fd in fd_info: if "->" in fd: fd_dict[fd.split(" -> ")[0].split(' ')[-1]] = fd.split(" -> ")[-1] return fd_dict def get_groups_for_user(): # get supplementary groups user belongs to (doesn't return # the gid group) p = run_cmd_as_user('id -Gn', out = subprocess.PIPE) groups = p.communicate()[0].split() debug("The groups for users is :%s" %groups) p.wait() return groups #checks if there is a change in booster pids until 5 seconds # the param must be a string - output of get_pid('booster') def wait_for_new_boosters(old_booster_pids): new_booster_pids = old_booster_pids for count in range(4): new_booster_pids = get_pid('booster') if(old_booster_pids != new_booster_pids): break time.sleep(1) return new_booster_pids def send_sighup_to_applauncherd(): wait_for_single_applauncherd() (e1, d1, q1, m1) = get_booster_pid() launcher_pid1 = get_oldest_pid('applauncherd') debug("before sighup, applauncherd pid = ", launcher_pid1) boosterpids1 = get_pid('booster') # get the list of booster pids b4 sighup kill_process(None, launcher_pid1, 1) #sending sighup to applauncherd wait_for_new_boosters(boosterpids1) # give sometime for applauncherd to react wait_for_single_applauncherd() (e2, d2, q2, m2) = get_booster_pid() launcher_pid2 = get_oldest_pid('applauncherd') debug("after sighup, applauncherd pid = ", launcher_pid2) #check if applauncherd has same pid before and after sighup #check if all boosters have different pids before and after sighup return (launcher_pid1==launcher_pid2, m1!=m2 and q1!=q2 and d1!=d2 and e1!=e2) def wait_for_windows(windowName, minCount=1, timeout=20) : """ Waits 'timeout' seconds of time until at least minCount windows with windowName appears. Returns ids of those windows in list. """ debug("Searching for window with name %s (with timeout %ss)" %(windowName, timeout)) xwininfocommand = "xwininfo -root -tree | awk '/%s/ {print $1}'" %(windowName) start = time.time() while time.time() < start + timeout : st, op = commands.getstatusoutput(xwininfocommand) if op : op = op.split() if (len(op)>=minCount) : break time.sleep(1) if op : debug("Window '%s' has been found in time of %.1fs" %(op, time.time()-start)) else : debug("Command '%s' didn't detect any windows with name %s. Timeout in %ss!" %(xwininfocommand, windowName, timeout)) op = None return op def isPackageInstalled(packageName): st, op = commands.getstatusoutput("dpkg -l %s" % packageName) if(st == 0): # success, check if version is "" or a valid version m = re.search('.*%s\s*(\S*)\s*.*'%packageName, op) if(m and m.group(1).find("none")==-1): #m.group(1) contains version return True return False #trick to avoid broken pipe in Popen #use ,preexec_fn=permit_sigpipe in each Popen def permit_sigpipe(): signal.signal(signal.SIGPIPE, signal.SIG_DFL)