From 4710c53dcad1ebf3755f3efb9e80ac24bd72a9b2 Mon Sep 17 00:00:00 2001 From: darylm503 Date: Mon, 16 Apr 2012 22:12:42 +0000 Subject: AppPkg/Applications/Python: Add Python 2.7.2 sources since the release of Python 2.7.3 made them unavailable from the python.org web site. These files are a subset of the python-2.7.2.tgz distribution from python.org. Changed files from PyMod-2.7.2 have been copied into the corresponding directories of this tree, replacing the original files in the distribution. Signed-off-by: daryl.mcdaniel@intel.com git-svn-id: https://edk2.svn.sourceforge.net/svnroot/edk2/trunk/edk2@13197 6f19259b-4bc3-4df7-8a09-765794883524 --- .../Python/Python-2.7.2/Lib/test/test_thread.py | 242 +++++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_thread.py (limited to 'AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_thread.py') diff --git a/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_thread.py b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_thread.py new file mode 100644 index 0000000000..82b8fbede0 --- /dev/null +++ b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_thread.py @@ -0,0 +1,242 @@ +import os +import unittest +import random +from test import test_support +thread = test_support.import_module('thread') +import time +import sys +import weakref + +from test import lock_tests + +NUMTASKS = 10 +NUMTRIPS = 3 + + +_print_mutex = thread.allocate_lock() + +def verbose_print(arg): + """Helper function for printing out debugging output.""" + if test_support.verbose: + with _print_mutex: + print arg + + +class BasicThreadTest(unittest.TestCase): + + def setUp(self): + self.done_mutex = thread.allocate_lock() + self.done_mutex.acquire() + self.running_mutex = thread.allocate_lock() + self.random_mutex = thread.allocate_lock() + self.created = 0 + self.running = 0 + self.next_ident = 0 + + +class ThreadRunningTests(BasicThreadTest): + + def newtask(self): + with self.running_mutex: + self.next_ident += 1 + verbose_print("creating task %s" % self.next_ident) + thread.start_new_thread(self.task, (self.next_ident,)) + self.created += 1 + self.running += 1 + + def task(self, ident): + with self.random_mutex: + delay = random.random() / 10000.0 + verbose_print("task %s will run for %sus" % (ident, round(delay*1e6))) + time.sleep(delay) + verbose_print("task %s done" % ident) + with self.running_mutex: + self.running -= 1 + if self.created == NUMTASKS and self.running == 0: + self.done_mutex.release() + + def test_starting_threads(self): + # Basic test for thread creation. + for i in range(NUMTASKS): + self.newtask() + verbose_print("waiting for tasks to complete...") + self.done_mutex.acquire() + verbose_print("all tasks done") + + def test_stack_size(self): + # Various stack size tests. + self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0") + + thread.stack_size(0) + self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") + + if os.name not in ("nt", "os2", "posix"): + return + + tss_supported = True + try: + thread.stack_size(4096) + except ValueError: + verbose_print("caught expected ValueError setting " + "stack_size(4096)") + except thread.error: + tss_supported = False + verbose_print("platform does not support changing thread stack " + "size") + + if tss_supported: + fail_msg = "stack_size(%d) failed - should succeed" + for tss in (262144, 0x100000, 0): + thread.stack_size(tss) + self.assertEqual(thread.stack_size(), tss, fail_msg % tss) + verbose_print("successfully set stack_size(%d)" % tss) + + for tss in (262144, 0x100000): + verbose_print("trying stack_size = (%d)" % tss) + self.next_ident = 0 + self.created = 0 + for i in range(NUMTASKS): + self.newtask() + + verbose_print("waiting for all tasks to complete") + self.done_mutex.acquire() + verbose_print("all tasks done") + + thread.stack_size(0) + + def test__count(self): + # Test the _count() function. + orig = thread._count() + mut = thread.allocate_lock() + mut.acquire() + started = [] + def task(): + started.append(None) + mut.acquire() + mut.release() + thread.start_new_thread(task, ()) + while not started: + time.sleep(0.01) + self.assertEqual(thread._count(), orig + 1) + # Allow the task to finish. + mut.release() + # The only reliable way to be sure that the thread ended from the + # interpreter's point of view is to wait for the function object to be + # destroyed. + done = [] + wr = weakref.ref(task, lambda _: done.append(None)) + del task + while not done: + time.sleep(0.01) + self.assertEqual(thread._count(), orig) + + +class Barrier: + def __init__(self, num_threads): + self.num_threads = num_threads + self.waiting = 0 + self.checkin_mutex = thread.allocate_lock() + self.checkout_mutex = thread.allocate_lock() + self.checkout_mutex.acquire() + + def enter(self): + self.checkin_mutex.acquire() + self.waiting = self.waiting + 1 + if self.waiting == self.num_threads: + self.waiting = self.num_threads - 1 + self.checkout_mutex.release() + return + self.checkin_mutex.release() + + self.checkout_mutex.acquire() + self.waiting = self.waiting - 1 + if self.waiting == 0: + self.checkin_mutex.release() + return + self.checkout_mutex.release() + + +class BarrierTest(BasicThreadTest): + + def test_barrier(self): + self.bar = Barrier(NUMTASKS) + self.running = NUMTASKS + for i in range(NUMTASKS): + thread.start_new_thread(self.task2, (i,)) + verbose_print("waiting for tasks to end") + self.done_mutex.acquire() + verbose_print("tasks done") + + def task2(self, ident): + for i in range(NUMTRIPS): + if ident == 0: + # give it a good chance to enter the next + # barrier before the others are all out + # of the current one + delay = 0 + else: + with self.random_mutex: + delay = random.random() / 10000.0 + verbose_print("task %s will run for %sus" % + (ident, round(delay * 1e6))) + time.sleep(delay) + verbose_print("task %s entering %s" % (ident, i)) + self.bar.enter() + verbose_print("task %s leaving barrier" % ident) + with self.running_mutex: + self.running -= 1 + # Must release mutex before releasing done, else the main thread can + # exit and set mutex to None as part of global teardown; then + # mutex.release() raises AttributeError. + finished = self.running == 0 + if finished: + self.done_mutex.release() + + +class LockTests(lock_tests.LockTests): + locktype = thread.allocate_lock + + +class TestForkInThread(unittest.TestCase): + def setUp(self): + self.read_fd, self.write_fd = os.pipe() + + @unittest.skipIf(sys.platform.startswith('win'), + "This test is only appropriate for POSIX-like systems.") + @test_support.reap_threads + def test_forkinthread(self): + def thread1(): + try: + pid = os.fork() # fork in a thread + except RuntimeError: + sys.exit(0) # exit the child + + if pid == 0: # child + os.close(self.read_fd) + os.write(self.write_fd, "OK") + sys.exit(0) + else: # parent + os.close(self.write_fd) + + thread.start_new_thread(thread1, ()) + self.assertEqual(os.read(self.read_fd, 2), "OK", + "Unable to fork() in thread") + + def tearDown(self): + try: + os.close(self.read_fd) + except OSError: + pass + + try: + os.close(self.write_fd) + except OSError: + pass + + +def test_main(): + test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests, + TestForkInThread) + +if __name__ == "__main__": + test_main() -- cgit v1.2.3