From 4710c53dcad1ebf3755f3efb9e80ac24bd72a9b2 Mon Sep 17 00:00:00 2001 From: darylm503 Date: Mon, 16 Apr 2012 22:12:42 +0000 Subject: AppPkg/Applications/Python: Add Python 2.7.2 sources since the release of Python 2.7.3 made them unavailable from the python.org web site. These files are a subset of the python-2.7.2.tgz distribution from python.org. Changed files from PyMod-2.7.2 have been copied into the corresponding directories of this tree, replacing the original files in the distribution. Signed-off-by: daryl.mcdaniel@intel.com git-svn-id: https://edk2.svn.sourceforge.net/svnroot/edk2/trunk/edk2@13197 6f19259b-4bc3-4df7-8a09-765794883524 --- .../Python/Python-2.7.2/Lib/test/test_signal.py | 493 +++++++++++++++++++++ 1 file changed, 493 insertions(+) create mode 100644 AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_signal.py (limited to 'AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_signal.py') diff --git a/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_signal.py b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_signal.py new file mode 100644 index 0000000000..4add6dae3c --- /dev/null +++ b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_signal.py @@ -0,0 +1,493 @@ +import unittest +from test import test_support +from contextlib import closing +import gc +import pickle +import select +import signal +import subprocess +import traceback +import sys, os, time, errno + +if sys.platform in ('os2', 'riscos'): + raise unittest.SkipTest("Can't test signal on %s" % sys.platform) + + +class HandlerBCalled(Exception): + pass + + +def exit_subprocess(): + """Use os._exit(0) to exit the current subprocess. + + Otherwise, the test catches the SystemExit and continues executing + in parallel with the original test, so you wind up with an + exponential number of tests running concurrently. + """ + os._exit(0) + + +def ignoring_eintr(__func, *args, **kwargs): + try: + return __func(*args, **kwargs) + except EnvironmentError as e: + if e.errno != errno.EINTR: + raise + return None + + +@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") +class InterProcessSignalTests(unittest.TestCase): + MAX_DURATION = 20 # Entire test should last at most 20 sec. + + def setUp(self): + self.using_gc = gc.isenabled() + gc.disable() + + def tearDown(self): + if self.using_gc: + gc.enable() + + def format_frame(self, frame, limit=None): + return ''.join(traceback.format_stack(frame, limit=limit)) + + def handlerA(self, signum, frame): + self.a_called = True + if test_support.verbose: + print "handlerA invoked from signal %s at:\n%s" % ( + signum, self.format_frame(frame, limit=1)) + + def handlerB(self, signum, frame): + self.b_called = True + if test_support.verbose: + print "handlerB invoked from signal %s at:\n%s" % ( + signum, self.format_frame(frame, limit=1)) + raise HandlerBCalled(signum, self.format_frame(frame)) + + def wait(self, child): + """Wait for child to finish, ignoring EINTR.""" + while True: + try: + child.wait() + return + except OSError as e: + if e.errno != errno.EINTR: + raise + + def run_test(self): + # Install handlers. This function runs in a sub-process, so we + # don't worry about re-setting the default handlers. + signal.signal(signal.SIGHUP, self.handlerA) + signal.signal(signal.SIGUSR1, self.handlerB) + signal.signal(signal.SIGUSR2, signal.SIG_IGN) + signal.signal(signal.SIGALRM, signal.default_int_handler) + + # Variables the signals will modify: + self.a_called = False + self.b_called = False + + # Let the sub-processes know who to send signals to. + pid = os.getpid() + if test_support.verbose: + print "test runner's pid is", pid + + child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)]) + if child: + self.wait(child) + if not self.a_called: + time.sleep(1) # Give the signal time to be delivered. + self.assertTrue(self.a_called) + self.assertFalse(self.b_called) + self.a_called = False + + # Make sure the signal isn't delivered while the previous + # Popen object is being destroyed, because __del__ swallows + # exceptions. + del child + try: + child = subprocess.Popen(['kill', '-USR1', str(pid)]) + # This wait should be interrupted by the signal's exception. + self.wait(child) + time.sleep(1) # Give the signal time to be delivered. + self.fail('HandlerBCalled exception not thrown') + except HandlerBCalled: + self.assertTrue(self.b_called) + self.assertFalse(self.a_called) + if test_support.verbose: + print "HandlerBCalled exception caught" + + child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)]) + if child: + self.wait(child) # Nothing should happen. + + try: + signal.alarm(1) + # The race condition in pause doesn't matter in this case, + # since alarm is going to raise a KeyboardException, which + # will skip the call. + signal.pause() + # But if another signal arrives before the alarm, pause + # may return early. + time.sleep(1) + except KeyboardInterrupt: + if test_support.verbose: + print "KeyboardInterrupt (the alarm() went off)" + except: + self.fail("Some other exception woke us from pause: %s" % + traceback.format_exc()) + else: + self.fail("pause returned of its own accord, and the signal" + " didn't arrive after another second.") + + # Issue 3864. Unknown if this affects earlier versions of freebsd also. + @unittest.skipIf(sys.platform=='freebsd6', + 'inter process signals not reliable (do not mix well with threading) ' + 'on freebsd6') + def test_main(self): + # This function spawns a child process to insulate the main + # test-running process from all the signals. It then + # communicates with that child process over a pipe and + # re-raises information about any exceptions the child + # throws. The real work happens in self.run_test(). + os_done_r, os_done_w = os.pipe() + with closing(os.fdopen(os_done_r)) as done_r, \ + closing(os.fdopen(os_done_w, 'w')) as done_w: + child = os.fork() + if child == 0: + # In the child process; run the test and report results + # through the pipe. + try: + done_r.close() + # Have to close done_w again here because + # exit_subprocess() will skip the enclosing with block. + with closing(done_w): + try: + self.run_test() + except: + pickle.dump(traceback.format_exc(), done_w) + else: + pickle.dump(None, done_w) + except: + print 'Uh oh, raised from pickle.' + traceback.print_exc() + finally: + exit_subprocess() + + done_w.close() + # Block for up to MAX_DURATION seconds for the test to finish. + r, w, x = select.select([done_r], [], [], self.MAX_DURATION) + if done_r in r: + tb = pickle.load(done_r) + if tb: + self.fail(tb) + else: + os.kill(child, signal.SIGKILL) + self.fail('Test deadlocked after %d seconds.' % + self.MAX_DURATION) + + +@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") +class BasicSignalTests(unittest.TestCase): + def trivial_signal_handler(self, *args): + pass + + def test_out_of_range_signal_number_raises_error(self): + self.assertRaises(ValueError, signal.getsignal, 4242) + + self.assertRaises(ValueError, signal.signal, 4242, + self.trivial_signal_handler) + + def test_setting_signal_handler_to_none_raises_error(self): + self.assertRaises(TypeError, signal.signal, + signal.SIGUSR1, None) + + def test_getsignal(self): + hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) + self.assertEqual(signal.getsignal(signal.SIGHUP), + self.trivial_signal_handler) + signal.signal(signal.SIGHUP, hup) + self.assertEqual(signal.getsignal(signal.SIGHUP), hup) + + +@unittest.skipUnless(sys.platform == "win32", "Windows specific") +class WindowsSignalTests(unittest.TestCase): + def test_issue9324(self): + # Updated for issue #10003, adding SIGBREAK + handler = lambda x, y: None + for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, + signal.SIGILL, signal.SIGINT, signal.SIGSEGV, + signal.SIGTERM): + # Set and then reset a handler for signals that work on windows + signal.signal(sig, signal.signal(sig, handler)) + + with self.assertRaises(ValueError): + signal.signal(-1, handler) + + with self.assertRaises(ValueError): + signal.signal(7, handler) + + +@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") +class WakeupSignalTests(unittest.TestCase): + TIMEOUT_FULL = 10 + TIMEOUT_HALF = 5 + + def test_wakeup_fd_early(self): + import select + + signal.alarm(1) + before_time = time.time() + # We attempt to get a signal during the sleep, + # before select is called + time.sleep(self.TIMEOUT_FULL) + mid_time = time.time() + self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF) + select.select([self.read], [], [], self.TIMEOUT_FULL) + after_time = time.time() + self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF) + + def test_wakeup_fd_during(self): + import select + + signal.alarm(1) + before_time = time.time() + # We attempt to get a signal during the select call + self.assertRaises(select.error, select.select, + [self.read], [], [], self.TIMEOUT_FULL) + after_time = time.time() + self.assertTrue(after_time - before_time < self.TIMEOUT_HALF) + + def setUp(self): + import fcntl + + self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None) + self.read, self.write = os.pipe() + flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0) + flags = flags | os.O_NONBLOCK + fcntl.fcntl(self.write, fcntl.F_SETFL, flags) + self.old_wakeup = signal.set_wakeup_fd(self.write) + + def tearDown(self): + signal.set_wakeup_fd(self.old_wakeup) + os.close(self.read) + os.close(self.write) + signal.signal(signal.SIGALRM, self.alrm) + +@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") +class SiginterruptTest(unittest.TestCase): + + def setUp(self): + """Install a no-op signal handler that can be set to allow + interrupts or not, and arrange for the original signal handler to be + re-installed when the test is finished. + """ + self.signum = signal.SIGUSR1 + oldhandler = signal.signal(self.signum, lambda x,y: None) + self.addCleanup(signal.signal, self.signum, oldhandler) + + def readpipe_interrupted(self): + """Perform a read during which a signal will arrive. Return True if the + read is interrupted by the signal and raises an exception. Return False + if it returns normally. + """ + # Create a pipe that can be used for the read. Also clean it up + # when the test is over, since nothing else will (but see below for + # the write end). + r, w = os.pipe() + self.addCleanup(os.close, r) + + # Create another process which can send a signal to this one to try + # to interrupt the read. + ppid = os.getpid() + pid = os.fork() + + if pid == 0: + # Child code: sleep to give the parent enough time to enter the + # read() call (there's a race here, but it's really tricky to + # eliminate it); then signal the parent process. Also, sleep + # again to make it likely that the signal is delivered to the + # parent process before the child exits. If the child exits + # first, the write end of the pipe will be closed and the test + # is invalid. + try: + time.sleep(0.2) + os.kill(ppid, self.signum) + time.sleep(0.2) + finally: + # No matter what, just exit as fast as possible now. + exit_subprocess() + else: + # Parent code. + # Make sure the child is eventually reaped, else it'll be a + # zombie for the rest of the test suite run. + self.addCleanup(os.waitpid, pid, 0) + + # Close the write end of the pipe. The child has a copy, so + # it's not really closed until the child exits. We need it to + # close when the child exits so that in the non-interrupt case + # the read eventually completes, otherwise we could just close + # it *after* the test. + os.close(w) + + # Try the read and report whether it is interrupted or not to + # the caller. + try: + d = os.read(r, 1) + return False + except OSError, err: + if err.errno != errno.EINTR: + raise + return True + + def test_without_siginterrupt(self): + """If a signal handler is installed and siginterrupt is not called + at all, when that signal arrives, it interrupts a syscall that's in + progress. + """ + i = self.readpipe_interrupted() + self.assertTrue(i) + # Arrival of the signal shouldn't have changed anything. + i = self.readpipe_interrupted() + self.assertTrue(i) + + def test_siginterrupt_on(self): + """If a signal handler is installed and siginterrupt is called with + a true value for the second argument, when that signal arrives, it + interrupts a syscall that's in progress. + """ + signal.siginterrupt(self.signum, 1) + i = self.readpipe_interrupted() + self.assertTrue(i) + # Arrival of the signal shouldn't have changed anything. + i = self.readpipe_interrupted() + self.assertTrue(i) + + def test_siginterrupt_off(self): + """If a signal handler is installed and siginterrupt is called with + a false value for the second argument, when that signal arrives, it + does not interrupt a syscall that's in progress. + """ + signal.siginterrupt(self.signum, 0) + i = self.readpipe_interrupted() + self.assertFalse(i) + # Arrival of the signal shouldn't have changed anything. + i = self.readpipe_interrupted() + self.assertFalse(i) + + +@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") +class ItimerTest(unittest.TestCase): + def setUp(self): + self.hndl_called = False + self.hndl_count = 0 + self.itimer = None + self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) + + def tearDown(self): + signal.signal(signal.SIGALRM, self.old_alarm) + if self.itimer is not None: # test_itimer_exc doesn't change this attr + # just ensure that itimer is stopped + signal.setitimer(self.itimer, 0) + + def sig_alrm(self, *args): + self.hndl_called = True + if test_support.verbose: + print("SIGALRM handler invoked", args) + + def sig_vtalrm(self, *args): + self.hndl_called = True + + if self.hndl_count > 3: + # it shouldn't be here, because it should have been disabled. + raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " + "timer.") + elif self.hndl_count == 3: + # disable ITIMER_VIRTUAL, this function shouldn't be called anymore + signal.setitimer(signal.ITIMER_VIRTUAL, 0) + if test_support.verbose: + print("last SIGVTALRM handler call") + + self.hndl_count += 1 + + if test_support.verbose: + print("SIGVTALRM handler invoked", args) + + def sig_prof(self, *args): + self.hndl_called = True + signal.setitimer(signal.ITIMER_PROF, 0) + + if test_support.verbose: + print("SIGPROF handler invoked", args) + + def test_itimer_exc(self): + # XXX I'm assuming -1 is an invalid itimer, but maybe some platform + # defines it ? + self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) + # Negative times are treated as zero on some platforms. + if 0: + self.assertRaises(signal.ItimerError, + signal.setitimer, signal.ITIMER_REAL, -1) + + def test_itimer_real(self): + self.itimer = signal.ITIMER_REAL + signal.setitimer(self.itimer, 1.0) + if test_support.verbose: + print("\ncall pause()...") + signal.pause() + + self.assertEqual(self.hndl_called, True) + + # Issue 3864. Unknown if this affects earlier versions of freebsd also. + @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'), + 'itimer not reliable (does not mix well with threading) on some BSDs.') + def test_itimer_virtual(self): + self.itimer = signal.ITIMER_VIRTUAL + signal.signal(signal.SIGVTALRM, self.sig_vtalrm) + signal.setitimer(self.itimer, 0.3, 0.2) + + start_time = time.time() + while time.time() - start_time < 60.0: + # use up some virtual time by doing real work + _ = pow(12345, 67890, 10000019) + if signal.getitimer(self.itimer) == (0.0, 0.0): + break # sig_vtalrm handler stopped this itimer + else: # Issue 8424 + self.skipTest("timeout: likely cause: machine too slow or load too " + "high") + + # virtual itimer should be (0.0, 0.0) now + self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) + # and the handler should have been called + self.assertEqual(self.hndl_called, True) + + # Issue 3864. Unknown if this affects earlier versions of freebsd also. + @unittest.skipIf(sys.platform=='freebsd6', + 'itimer not reliable (does not mix well with threading) on freebsd6') + def test_itimer_prof(self): + self.itimer = signal.ITIMER_PROF + signal.signal(signal.SIGPROF, self.sig_prof) + signal.setitimer(self.itimer, 0.2, 0.2) + + start_time = time.time() + while time.time() - start_time < 60.0: + # do some work + _ = pow(12345, 67890, 10000019) + if signal.getitimer(self.itimer) == (0.0, 0.0): + break # sig_prof handler stopped this itimer + else: # Issue 8424 + self.skipTest("timeout: likely cause: machine too slow or load too " + "high") + + # profiling itimer should be (0.0, 0.0) now + self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) + # and the handler should have been called + self.assertEqual(self.hndl_called, True) + +def test_main(): + test_support.run_unittest(BasicSignalTests, InterProcessSignalTests, + WakeupSignalTests, SiginterruptTest, + ItimerTest, WindowsSignalTests) + + +if __name__ == "__main__": + test_main() -- cgit v1.2.3