Changes: Moved some helper methods out from test-func-launcher to utils

pull/1/head
Oskari Timperi 15 years ago
parent ae2ad85d91
commit 5b57a3febe

@ -6,3 +6,4 @@ usr/share/applauncherd-testscripts/tc_theming.rb
usr/share/applauncherd-testscripts/fala_sf.sh
usr/share/applauncherd-testscripts/fala_sf.py
usr/share/applauncherd-testscripts/test-perf.rb
usr/share/applauncherd-testscripts/utils.py

@ -2,6 +2,7 @@ install(FILES
test-perf-mbooster.py
check_pipes.py
test-func-launcher.py
utils.py
DESTINATION /usr/share/applauncherd-testscripts)
install(PROGRAMS

@ -41,8 +41,9 @@ import time
import sys
import unittest
import re
from subprocess import Popen
from utils import *
from os.path import basename
LAUNCHER_BINARY='/usr/bin/applauncherd'
DEV_NULL = file("/dev/null","w")
@ -51,26 +52,6 @@ PREFERED_APP = '/usr/bin/fala_ft_hello'
using_scratchbox = False
def debug(*msg):
"""
Debug function
"""
sys.stderr.write('[DEBUG %s] %s\n' % (time.ctime(), \
' '.join([str(s) for s in msg]),))
def error(*msg):
"""
exit when error, give proper log
"""
sys.stderr.write('ERROR %s\n' % (' '.join([str(s) for s in msg]),))
sys.exit(1)
def basename(filepath):
"""
return base name of a file
"""
return os.path.basename(filepath)
def start_launcher_daemon():
temp = basename(LAUNCHER_BINARY)
st, op = commands.getstatusoutput("pgrep %s" %temp)
@ -98,140 +79,6 @@ class launcher_tests (unittest.TestCase):
#teardown here
print "Executing TearDown"
#Other functions
def run_app_with_launcher(self, appname):
p = subprocess.Popen(appname,
shell=False,
stdout=DEV_NULL, stderr=DEV_NULL)
return p
def get_pid(self, appname):
temp = basename(appname)[:14]
st, op = commands.getstatusoutput("pgrep %s" % temp)
if st == 0:
return op
else:
return None
def kill_process(self, appname=None, apppid=None, signum=9):
if apppid and appname:
return None
else:
if apppid:
st, op = commands.getstatusoutput("kill -%s %s" % (str(signum), str(apppid)))
if appname:
temp = basename(appname)[:14]
st, op = commands.getstatusoutput("pkill -%s %s" % (str(signum), temp))
try:
os.wait()
except:
pass
def process_state(self, processid):
st, op = commands.getstatusoutput('cat /proc/%s/stat' %processid)
if st == 0:
return op
else:
debug(op)
return None
def get_creds(self, path = None, pid = None):
"""
Tries to retrieve credentials for a running application
using either the pid or path. Credentials are returned
as a string list.
"""
if path != None:
pid = self.get_pid(path)
self.assert_(pid != None, 'Invalid PID')
handle = Popen(['/usr/bin/creds-get', '-p', str(pid)],
stdout = subprocess.PIPE)
op = handle.communicate()[0].strip()
handle.wait()
self.assert_(handle.returncode == 0, "There was no such PID!")
debug("creds-get gave >>>>>\n%s\n<<<<<" % op)
creds = op.split("\n")[1:]
return creds
def launch_and_get_creds(self, path):
"""
Tries to launch an application and if successful, returns the
credentials the application has as a list.
"""
# try launch the specified application
handle = self.run_app_with_launcher(path)
# sleep for a moment to allow applauncherd to start the process
time.sleep(5)
# with luck, the process should have correct name by now
pid = self.get_pid(path)
debug("%s has PID %s" % (basename(path), pid,))
self.assert_(pid != None, "Couldn't launch %s" % basename(path))
creds = self.get_creds(pid = pid)
self.kill_process(path)
return creds
def get_file_descriptor(self, booster, type):
"""
To test that file descriptors are closed before calling application main
"""
#get fd of booster before launching application
pid = commands.getoutput("pgrep %s" %booster)
fd_info = commands.getoutput('ls -l /proc/%s/fd/' % str(pid))
fd_info = fd_info.split('\n')
init = {}
final = {}
for fd in fd_info:
if "->" in fd:
init[fd.split(" -> ")[0].split(' ')[-1]] = fd.split(" -> ")[-1]
print init
#launch application using booster
st = os.system('invoker --type=%s /usr/bin/fala_ft_hello.launch' %type)
time.sleep(2)
#get fd of booster after launching the application
if st == 0:
fd_info = commands.getoutput('ls -l /proc/%s/fd/' % str(pid))
fd_info = fd_info.split('\n')
for fd in fd_info:
if "->" in fd:
final[fd.split(" -> ")[0].split(' ')[-1]] = fd.split(" -> ")[-1]
print final
pid = commands.getoutput('pgrep fala_ft_hello')
mykeys = init.keys()
count = 0
for key in mykeys:
try:
if init[key] != final[key]:
count = count + 1
except KeyError:
print "some key in init is not in final"
time.sleep(2)
print "The number of changed file descriptors %d" %count
self.kill_process(apppid=pid)
return count
#Testcases
def test_001_launcher_exist(self):
"""
@ -251,30 +98,6 @@ class launcher_tests (unittest.TestCase):
failed_apps.append(temp)
self.assert_(failed_apps == [], "Some applications do not have the launch files, list: %s" % str(failed_apps))
def wait_for_app(self, app = None, timeout = 5, sleep = 0.5):
"""
Waits for an application to start. Checks periodically if
the app is running for a maximum wait set in timeout.
Returns the pid of the application if it was running before
the timeout finished, otherwise None is returned.
"""
pid = None
start = time.time()
while pid == None and time.time() < start + timeout:
pid = self.get_pid(app)
if pid != None:
break
print "waiting %s secs for %s" % (sleep, app)
time.sleep(sleep)
return pid
def test_003_zombie_state(self):
"""
To test that no Zombie process exist after the application is killed
@ -285,19 +108,19 @@ class launcher_tests (unittest.TestCase):
#check if pgrep appname should be nothing
#self.kill_process(LAUNCHER_BINARY)
process_handle = self.run_app_with_launcher(PREFERED_APP)
process_id = self.wait_for_app(PREFERED_APP, 5)
process_handle = run_app_with_launcher(PREFERED_APP)
process_id = wait_for_app(PREFERED_APP, 5)
print process_id
self.kill_process(PREFERED_APP)
kill_process(PREFERED_APP)
time.sleep(4)
process_handle = self.run_app_with_launcher(PREFERED_APP)
process_id1 = self.wait_for_app(PREFERED_APP, 5)
process_handle = run_app_with_launcher(PREFERED_APP)
process_id1 = wait_for_app(PREFERED_APP, 5)
print process_id1
self.kill_process(PREFERED_APP)
kill_process(PREFERED_APP)
time.sleep(4)
process_id1 = self.get_pid(PREFERED_APP)
process_id1 = get_pid(PREFERED_APP)
print process_id1
self.assert_(process_id != process_id1 , "New Process not launched")
@ -310,13 +133,13 @@ class launcher_tests (unittest.TestCase):
def kill_launched(pids):
for pid in pids:
self.kill_process(apppid = pid)
kill_process(apppid = pid)
pidlist = []
for app in LAUNCHABLE_APPS:
p = self.run_app_with_launcher(app)
pid = self.wait_for_app(app, timeout = 10, sleep = 1)
p = run_app_with_launcher(app)
pid = wait_for_app(app, timeout = 10, sleep = 1)
if pid == None:
kill_launched(pidlist)
@ -337,16 +160,16 @@ class launcher_tests (unittest.TestCase):
#check pgrep application
#y = commands.getstatusoutput(pgrep appname)
#len(y[-1].split(' ')) == 1
process_handle = self.run_app_with_launcher(PREFERED_APP)
process_id = self.wait_for_app(PREFERED_APP)
process_handle = run_app_with_launcher(PREFERED_APP)
process_id = wait_for_app(PREFERED_APP)
debug("PID of first %s" % process_id)
process_handle1 = self.run_app_with_launcher(PREFERED_APP)
process_handle1 = run_app_with_launcher(PREFERED_APP)
time.sleep(2)
process_id = self.wait_for_app(PREFERED_APP)
process_id = wait_for_app(PREFERED_APP)
debug("PID of 2nd %s" % process_id)
self.kill_process(PREFERED_APP)
kill_process(PREFERED_APP)
self.assert_( len(process_id.split(' ')) == 1, "Only one instance of app not running")
@ -356,16 +179,11 @@ class launcher_tests (unittest.TestCase):
Test that the fala_ft_creds* applications have the correct
credentials set (check aegis file included in the debian package)
"""
creds1 = self.launch_and_get_creds('/usr/bin/fala_ft_creds1')
creds2 = self.launch_and_get_creds('/usr/bin/fala_ft_creds2')
creds1 = launch_and_get_creds('/usr/bin/fala_ft_creds1')
creds2 = launch_and_get_creds('/usr/bin/fala_ft_creds2')
# filter out some unnecessary tokens
def filterfunc(x):
pattern = "^(SRC|AID)::"
return re.match(pattern, x) == None
creds1 = filter(filterfunc, creds1)
creds2 = filter(filterfunc, creds2)
self.assert_(creds1 != None, "couldn't get credentials")
self.assert_(creds2 != None, "couldn't get credentials")
debug("fala_ft_creds1 has %s" % ', '.join(creds1))
debug("fala_ft_creds2 has %s" % ', '.join(creds2))
@ -391,9 +209,11 @@ class launcher_tests (unittest.TestCase):
get any funny credentials.
"""
creds = self.launch_and_get_creds('/usr/bin/fala_ft_hello')
creds = launch_and_get_creds('/usr/bin/fala_ft_hello')
debug("fala_ft_hello has %s" % ', '.join(creds))
self.assert_(creds != None, "error retrieving credentials")
# Credentials should be dropped, but uid/gid retained
req_creds = ['UID::user', 'GID::users']
@ -416,18 +236,18 @@ class launcher_tests (unittest.TestCase):
#launching the testapp with actual invoker
st = os.system('%s --type=m %s'%(INVOKER_BINARY, Testapp))
pid = self.get_pid(Testapp.replace('.launch', ''))
pid = get_pid(Testapp.replace('.launch', ''))
self.assert_((st == 0), "Application was not launched using launcher")
self.assert_(not (pid == None), "Application was not launched using launcher: actual pid%s" %pid)
print pid
#self.kill_process(Testapp.replace('.launch', ''))
self.kill_process(apppid=pid)
pid = self.get_pid(Testapp.replace('.launch', ''))
kill_process(apppid=pid)
pid = get_pid(Testapp.replace('.launch', ''))
self.assert_((pid == None), "Application still running")
#launching the testapp with fake invoker
st = os.system('%s --type=m %s'%(FAKE_INVOKER_BINARY, Testapp))
pid = self.get_pid(Testapp.replace('.launch', ''))
pid = get_pid(Testapp.replace('.launch', ''))
self.assert_(not (st == 0), "Application was launched using fake launcher")
self.assert_((pid == None), "Application was launched using fake launcher")
@ -440,13 +260,13 @@ class launcher_tests (unittest.TestCase):
#check if the application is running
#check if p.pid is same as pgrep appname
#in a global dictionary, append the pid
process_handle = self.run_app_with_launcher(app)
process_handle = run_app_with_launcher(app)
time.sleep(8)
process_id = self.get_pid('fala_ft_hello')
process_id = get_pid('fala_ft_hello')
pid_list = process_id.split()
self.assert_(len(pid_list) == len(LAUNCHABLE_APPS), "All Applications were not launched using launcher")
for pid in pid_list:
self.kill_process(apppid=pid)
kill_process(apppid=pid)
def test_010(self):
"""
@ -465,23 +285,23 @@ class launcher_tests (unittest.TestCase):
stdout = DEV_NULL, stderr = DEV_NULL)
# Retrieve their pids
invoker_pid = self.wait_for_app('invoker')
app_pid = self.wait_for_app('fala_ft_hello')
invoker_pid = wait_for_app('invoker')
app_pid = wait_for_app('fala_ft_hello')
# Make sure that both apps started
self.assert_(invoker_pid != None, "invoker not executed?")
self.assert_(app_pid != None, "%s not launched by invoker?" % app_path)
# Send SIGTERM to invoker, the launched app should die
self.kill_process(None, invoker_pid, 15)
kill_process(None, invoker_pid, 15)
time.sleep(2)
# This should be None
app_pid2 = self.get_pid('fala_ft_hello')
app_pid2 = get_pid('fala_ft_hello')
if (app_pid2 != None):
self.kill_process(None, app_pid2)
kill_process(None, app_pid2)
self.assert_(False, "%s was not killed" % app_path)
@ -521,7 +341,8 @@ class launcher_tests (unittest.TestCase):
st, op = commands.getstatusoutput('pgrep -lf "applauncherd.bin --daemon"')
print op
# filter some cruft out from the output and see how many instances are running
# filter some cruft out from the output and see how many
# instances are running
op = filter(lambda x: x.find("sh ") == -1, op.split("\n"))
count = len(op)
@ -530,13 +351,13 @@ class launcher_tests (unittest.TestCase):
self.assert_(count == 1, "applauncherd was not daemonized (or too many instances running ..)")
# try to launch an app
self.run_app_with_launcher('/usr/bin/fala_ft_hello')
run_app_with_launcher('/usr/bin/fala_ft_hello')
time.sleep(2)
pid = self.wait_for_app('fala_ft_hello')
pid = wait_for_app('fala_ft_hello')
if pid != None:
self.kill_process(apppid = pid)
kill_process(apppid = pid)
else:
self.assert_(False, "fala_ft_hello was not launched!")
@ -599,14 +420,14 @@ class launcher_tests (unittest.TestCase):
"""
File descriptor test for booster-m
"""
count = self.get_file_descriptor("booster-m","m")
count = get_file_descriptor("booster-m","m")
self.assert_(count != 0, "None of the file descriptors were changed")
def test_015_fd_booster_q(self):
"""
File descriptor test for booster-q
"""
count = self.get_file_descriptor("booster-q","qt")
count = get_file_descriptor("booster-q","qt")
self.assert_(count != 0, "None of the file descriptors were changed")
def test_016_restart_booster(self):
@ -614,28 +435,28 @@ class launcher_tests (unittest.TestCase):
Test that booster is restarted if it is killed
"""
#get the pids of boosters and make sure they are running
qpid = self.get_pid('booster-q')
qpid = get_pid('booster-q')
print "Pid of booster-q before killing :%s" %qpid
self.assert_(qpid != None, "No booster process running")
mpid = self.get_pid('booster-m')
mpid = get_pid('booster-m')
print "Pid of booster-m before killing :%s" %mpid
self.assert_(mpid != None, "No booster process running")
#Kill the booster processes
self.kill_process(apppid=qpid)
self.kill_process(apppid=mpid)
kill_process(apppid=qpid)
kill_process(apppid=mpid)
#wait for the boosters to be restarted
time.sleep(6)
#check that the new boosters are started
qpid_new = self.get_pid('booster-q')
qpid_new = get_pid('booster-q')
print "Pid of booster-q after killing :%s" %qpid_new
self.assert_(qpid_new != None, "No booster process running")
self.assert_(qpid_new != qpid, "booster process was not killed")
mpid_new = self.get_pid('booster-m')
mpid_new = get_pid('booster-m')
print "Pid of booster-m after killing :%s" %mpid_new
self.assert_(mpid_new != None, "No booster process running")
self.assert_(mpid_new != mpid, "booster process was not killed")
@ -734,11 +555,11 @@ class launcher_tests (unittest.TestCase):
#Try to launch an application using invoker
os.system('invoker --type=m /usr/bin/fala_ft_hello.launch &')
time.sleep(3)
process_id1 = self.get_pid('fala_ft_hello')
process_id1 = get_pid('fala_ft_hello')
self.assert_(process_id1 != None , "application not launcherd running")
time.sleep(1)
self.kill_process(PREFERED_APP)
kill_process(PREFERED_APP)
os.system("initctl start xsession/applauncherd")
@ -769,8 +590,13 @@ class launcher_tests (unittest.TestCase):
print "invoker pid = %s" % invoker_pid
# get credentials
invoker_creds = self.get_creds(pid = invoker_pid)
app_creds = self.get_creds(path = 'fala_ft_hello')
invoker_creds = get_creds(pid = invoker_pid)
app_creds = get_creds(path = 'fala_ft_hello')
self.assert_(invoker_creds != None,
"error retrieving creds for invoker")
self.assert_(app_creds != None,
"error retrieving creds for fala_ft_hello")
invoker_creds.sort()
app_creds.sort()
@ -778,7 +604,7 @@ class launcher_tests (unittest.TestCase):
print "invoker creds = %s" % invoker_creds
print "app creds = %s" % app_creds
self.kill_process('fala_ft_hello')
kill_process('fala_ft_hello')
return (invoker_creds, app_creds)
@ -843,7 +669,6 @@ class launcher_tests (unittest.TestCase):
as user and root.
"""
# get credentials for the app when launched as user and root
handle = Popen(['su', '-', 'user', '-c',
'/usr/bin/fala_ft_hello'],
stdout = DEV_NULL, stderr = DEV_NULL)
@ -851,10 +676,10 @@ class launcher_tests (unittest.TestCase):
# give the application some time to launch up
time.sleep(2)
user = self.get_creds('fala_ft_hello')
self.kill_process('fala_ft_hello')
user = get_creds('fala_ft_hello')
kill_process('fala_ft_hello')
root = self.launch_and_get_creds('/usr/bin/fala_ft_hello').sort()
root = launch_and_get_creds('/usr/bin/fala_ft_hello').sort()
return (user, root)

@ -0,0 +1,190 @@
import os, os.path, glob
import subprocess
import commands
import time
import sys
import re
from subprocess import Popen
from os.path import basename
DEV_NULL = file("/dev/null","w")
def debug(*msg):
"""
Debug function
"""
sys.stderr.write('[DEBUG %s] %s\n' % (time.ctime(), \
' '.join([str(s) for s in msg]),))
def error(*msg):
"""
exit when error, give proper log
"""
sys.stderr.write('ERROR %s\n' % (' '.join([str(s) for s in msg]),))
sys.exit(1)
def run_app_with_launcher(appname):
p = subprocess.Popen(appname,
shell=False,
stdout=DEV_NULL, stderr=DEV_NULL)
return p
def get_pid(appname):
temp = basename(appname)[:14]
st, op = commands.getstatusoutput("pgrep %s" % temp)
if st == 0:
return op
else:
return None
def wait_for_app(app = None, timeout = 5, sleep = 0.5):
"""
Waits for an application to start. Checks periodically if
the app is running for a maximum wait set in timeout.
Returns the pid of the application if it was running before
the timeout finished, otherwise None is returned.
"""
pid = None
start = time.time()
while pid == None and time.time() < start + timeout:
pid = get_pid(app)
if pid != None:
break
print "waiting %s secs for %s" % (sleep, app)
time.sleep(sleep)
return pid
def kill_process(appname=None, apppid=None, signum=9):
if apppid and appname:
return None
else:
if apppid:
st, op = commands.getstatusoutput("kill -%s %s" % (str(signum), str(apppid)))
if appname:
temp = basename(appname)[:14]
st, op = commands.getstatusoutput("pkill -%s %s" % (str(signum), temp))
try:
os.wait()
except Exception as e:
print e
def process_state(processid):
st, op = commands.getstatusoutput('cat /proc/%s/stat' %processid)
if st == 0:
return op
else:
debug(op)
return None
def get_creds(path = None, pid = None):
"""
Tries to retrieve credentials for a running application
using either the pid or path. Credentials are returned
as a string list.
"""
if path != None:
pid = get_pid(path)
if pid == None:
print 'invalid PID'
return None
handle = Popen(['/usr/bin/creds-get', '-p', str(pid)],
stdout = subprocess.PIPE)
op = handle.communicate()[0].strip()
handle.wait()
if handle.returncode != 0:
print 'error retrieving credentials'
return None
#self.assert_(handle.returncode == 0, "There was no such PID!")
debug("creds-get gave >>>>>\n%s\n<<<<<" % op)
creds = op.split("\n")[1:]
return creds
def launch_and_get_creds(path):
"""
Tries to launch an application and if successful, returns the
credentials the application has as a list.
"""
# try launch the specified application
handle = run_app_with_launcher(path)
# sleep for a moment to allow applauncherd to start the process
time.sleep(5)
# with luck, the process should have correct name by now
pid = get_pid(path)
debug("%s has PID %s" % (basename(path), pid,))
if pid == None:
print "couldn't launch %s" % basename(path)
return None
#self.assert_(pid != None, "Couldn't launch %s" % basename(path))
creds = get_creds(pid = pid)
kill_process(path)
return creds
def get_file_descriptor(booster, type):
"""
To test that file descriptors are closed before calling application main
"""
#get fd of booster before launching application
pid = commands.getoutput("pgrep %s" %booster)
fd_info = commands.getoutput('ls -l /proc/%s/fd/' % str(pid))
fd_info = fd_info.split('\n')
init = {}
final = {}
for fd in fd_info:
if "->" in fd:
init[fd.split(" -> ")[0].split(' ')[-1]] = fd.split(" -> ")[-1]
print init
#launch application using booster
st = os.system('invoker --type=%s /usr/bin/fala_ft_hello.launch' %type)
time.sleep(2)
#get fd of booster after launching the application
if st == 0:
fd_info = commands.getoutput('ls -l /proc/%s/fd/' % str(pid))
fd_info = fd_info.split('\n')
for fd in fd_info:
if "->" in fd:
final[fd.split(" -> ")[0].split(' ')[-1]] = fd.split(" -> ")[-1]
print final
pid = commands.getoutput('pgrep fala_ft_hello')
mykeys = init.keys()
count = 0
for key in mykeys:
try:
if init[key] != final[key]:
count = count + 1
except KeyError:
print "some key in init is not in final"
time.sleep(2)
print "The number of changed file descriptors %d" %count
kill_process(apppid=pid)
return count
Loading…
Cancel
Save