summaryrefslogtreecommitdiff
path: root/ext/testlib/handlers.py
diff options
context:
space:
mode:
Diffstat (limited to 'ext/testlib/handlers.py')
-rw-r--r--ext/testlib/handlers.py437
1 files changed, 437 insertions, 0 deletions
diff --git a/ext/testlib/handlers.py b/ext/testlib/handlers.py
new file mode 100644
index 000000000..7530f9bf1
--- /dev/null
+++ b/ext/testlib/handlers.py
@@ -0,0 +1,437 @@
+# Copyright (c) 2017 Mark D. Hill and David A. Wood
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met: redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer;
+# redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution;
+# neither the name of the copyright holders nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Authors: Sean Wilson
+
+'''
+Handlers for the testlib Log.
+
+
+'''
+from __future__ import print_function
+
+import multiprocessing
+import os
+import Queue
+import sys
+import threading
+import time
+import traceback
+
+import helper
+import log
+import result
+import state
+import test
+import terminal
+
+from config import config, constants
+
+
+class _TestStreamManager(object):
+ def __init__(self):
+ self._writers = {}
+
+ def open_writer(self, test_result):
+ if test_result in self._writers:
+ raise ValueError('Cannot have multiple writters on a single test.')
+ self._writers[test_result] = _TestStreams(test_result.stdout,
+ test_result.stderr)
+
+ def get_writer(self, test_result):
+ if test_result not in self._writers:
+ self.open_writer(test_result)
+ return self._writers[test_result]
+
+ def close_writer(self, test_result):
+ if test_result in self._writers:
+ writer = self._writers.pop(test_result)
+ writer.close()
+
+ def close(self):
+ for writer in self._writers.values():
+ writer.close()
+ self._writers.clear()
+
+class _TestStreams(object):
+ def __init__(self, stdout, stderr):
+ helper.mkdir_p(os.path.dirname(stdout))
+ helper.mkdir_p(os.path.dirname(stderr))
+ self.stdout = open(stdout, 'w')
+ self.stderr = open(stderr, 'w')
+
+ def close(self):
+ self.stdout.close()
+ self.stderr.close()
+
+class ResultHandler(log.Handler):
+ '''
+ Log handler which listens for test results and output saving data as
+ it is reported.
+
+ When the handler is closed it writes out test results in the python pickle
+ format.
+ '''
+ def __init__(self, schedule, directory):
+ '''
+ :param schedule: The entire schedule as a :class:`LoadedLibrary`
+ object.
+
+ :param directory: Directory to save test stdout/stderr and aggregate
+ results to.
+ '''
+ self.directory = directory
+ self.internal_results = result.InternalLibraryResults(schedule,
+ directory)
+ self.test_stream_manager = _TestStreamManager()
+ self._closed = False
+
+ self.mapping = {
+ log.LibraryStatus.type_id: self.handle_library_status,
+
+ log.SuiteResult.type_id: self.handle_suite_result,
+ log.TestResult.type_id: self.handle_test_result,
+
+ log.TestStderr.type_id: self.handle_stderr,
+ log.TestStdout.type_id: self.handle_stdout,
+ }
+
+ def handle(self, record):
+ if not self._closed:
+ self.mapping.get(record.type_id, lambda _:None)(record)
+
+ def handle_library_status(self, record):
+ if record['status'] in (state.Status.Complete, state.Status.Avoided):
+ self.test_stream_manager.close()
+
+ def handle_suite_result(self, record):
+ suite_result = self.internal_results.get_suite_result(
+ record['metadata'].uid)
+ suite_result.result = record['result']
+
+ def handle_test_result(self, record):
+ test_result = self._get_test_result(record)
+ test_result.result = record['result']
+
+ def handle_stderr(self, record):
+ self.test_stream_manager.get_writer(
+ self._get_test_result(record)
+ ).stderr.write(record['buffer'])
+
+ def handle_stdout(self, record):
+ self.test_stream_manager.get_writer(
+ self._get_test_result(record)
+ ).stdout.write(record['buffer'])
+
+ def _get_test_result(self, test_record):
+ return self.internal_results.get_test_result(
+ test_record['metadata'].uid,
+ test_record['metadata'].suite_uid)
+
+ def _save(self):
+ #FIXME Hardcoded path name
+ result.InternalSavedResults.save(
+ self.internal_results,
+ os.path.join(self.directory, constants.pickle_filename))
+ result.JUnitSavedResults.save(
+ self.internal_results,
+ os.path.join(self.directory, constants.xml_filename))
+
+ def close(self):
+ if self._closed:
+ return
+ self._closed = True
+ self._save()
+
+
+#TODO Change from a handler to an internal post processor so it can be used
+# to reprint results
+class SummaryHandler(log.Handler):
+ '''
+ A log handler which listens to the log for test results
+ and reports the aggregate results when closed.
+ '''
+ color = terminal.get_termcap()
+ reset = color.Normal
+ colormap = {
+ state.Result.Errored: color.Red,
+ state.Result.Failed: color.Red,
+ state.Result.Passed: color.Green,
+ state.Result.Skipped: color.Cyan,
+ }
+ sep_fmtkey = 'separator'
+ sep_fmtstr = '{%s}' % sep_fmtkey
+
+ def __init__(self):
+ self.mapping = {
+ log.TestResult.type_id: self.handle_testresult,
+ log.LibraryStatus.type_id: self.handle_library_status,
+ }
+ self._timer = helper.Timer()
+ self.results = []
+
+ def handle_library_status(self, record):
+ if record['status'] == state.Status.Building:
+ self._timer.restart()
+
+ def handle_testresult(self, record):
+ result = record['result'].value
+ if result in (state.Result.Skipped, state.Result.Failed,
+ state.Result.Passed, state.Result.Errored):
+ self.results.append(result)
+
+ def handle(self, record):
+ self.mapping.get(record.type_id, lambda _:None)(record)
+
+ def close(self):
+ print(self._display_summary())
+
+ def _display_summary(self):
+ most_severe_outcome = None
+ outcome_fmt = ' {count} {outcome}'
+ strings = []
+
+ outcome_count = [0] * len(state.Result.enums)
+ for result in self.results:
+ outcome_count[result] += 1
+
+ # Iterate over enums so they are in order of severity
+ for outcome in state.Result.enums:
+ outcome = getattr(state.Result, outcome)
+ count = outcome_count[outcome]
+ if count:
+ strings.append(outcome_fmt.format(count=count,
+ outcome=state.Result.enums[outcome]))
+ most_severe_outcome = outcome
+ string = ','.join(strings)
+ if most_severe_outcome is None:
+ string = ' No testing done'
+ most_severe_outcome = state.Result.Passed
+ else:
+ string = ' Results:' + string + ' in {:.2} seconds '.format(
+ self._timer.active_time())
+ string += ' '
+ return terminal.insert_separator(
+ string,
+ color=self.colormap[most_severe_outcome] + self.color.Bold)
+
+class TerminalHandler(log.Handler):
+ color = terminal.get_termcap()
+ verbosity_mapping = {
+ log.LogLevel.Warn: color.Yellow,
+ log.LogLevel.Error: color.Red,
+ }
+ default = color.Normal
+
+ def __init__(self, verbosity=log.LogLevel.Info, machine_only=False):
+ self.stream = verbosity >= log.LogLevel.Trace
+ self.verbosity = verbosity
+ self.machine_only = machine_only
+ self.mapping = {
+ log.TestResult.type_id: self.handle_testresult,
+ log.SuiteStatus.type_id: self.handle_suitestatus,
+ log.TestStatus.type_id: self.handle_teststatus,
+ log.TestStderr.type_id: self.handle_stderr,
+ log.TestStdout.type_id: self.handle_stdout,
+ log.TestMessage.type_id: self.handle_testmessage,
+ log.LibraryMessage.type_id: self.handle_librarymessage,
+ }
+
+ def _display_outcome(self, name, outcome, reason=None):
+ print(self.color.Bold
+ + SummaryHandler.colormap[outcome]
+ + name
+ + ' '
+ + state.Result.enums[outcome]
+ + SummaryHandler.reset)
+
+ if reason is not None:
+ log.test_log.info('')
+ log.test_log.info('Reason:')
+ log.test_log.info(reason)
+ log.test_log.info(terminal.separator('-'))
+
+ def handle_teststatus(self, record):
+ if record['status'] == state.Status.Running:
+ log.test_log.debug('Starting Test Case: %s' %\
+ record['metadata'].name)
+
+ def handle_testresult(self, record):
+ self._display_outcome(
+ 'Test: %s' % record['metadata'].name,
+ record['result'].value)
+
+ def handle_suitestatus(self, record):
+ if record['status'] == state.Status.Running:
+ log.test_log.debug('Starting Test Suite: %s ' %\
+ record['metadata'].name)
+
+ def handle_stderr(self, record):
+ if self.stream:
+ print(record.data['buffer'], file=sys.stderr, end='')
+
+ def handle_stdout(self, record):
+ if self.stream:
+ print(record.data['buffer'], file=sys.stdout, end='')
+
+ def handle_testmessage(self, record):
+ if self.stream:
+ print(self._colorize(record['message'], record['level']))
+
+ def handle_librarymessage(self, record):
+ if not self.machine_only or record.data.get('machine_readable', False):
+ print(self._colorize(record['message'], record['level'],
+ record['bold']))
+
+ def _colorize(self, message, level, bold=False):
+ return '%s%s%s%s' % (
+ self.color.Bold if bold else '',
+ self.verbosity_mapping.get(level, ''),
+ message,
+ self.default)
+
+ def handle(self, record):
+ if record.data.get('level', self.verbosity) > self.verbosity:
+ return
+ self.mapping.get(record.type_id, lambda _:None)(record)
+
+ def set_verbosity(self, verbosity):
+ self.verbosity = verbosity
+
+
+class PrintHandler(log.Handler):
+ def __init__(self):
+ pass
+
+ def handle(self, record):
+ print(str(record).rstrip())
+
+ def close(self):
+ pass
+
+
+class MultiprocessingHandlerWrapper(log.Handler):
+ '''
+ A handler class which forwards log records to subhandlers, enabling
+ logging across multiprocessing python processes.
+
+ The 'parent' side of the handler should execute either
+ :func:`async_process` or :func:`process` to forward
+ log records to subhandlers.
+ '''
+ def __init__(self, *subhandlers):
+ # Create thread to spin handing recipt of messages
+ # Create queue to push onto
+ self.queue = multiprocessing.Queue()
+ self.queue.cancel_join_thread()
+ self._shutdown = threading.Event()
+
+ # subhandlers should be accessed with the _handler_lock
+ self._handler_lock = threading.Lock()
+ self._subhandlers = subhandlers
+
+ def add_handler(self, handler):
+ self._handler_lock.acquire()
+ self._subhandlers = (handler, ) + self._subhandlers
+ self._handler_lock.release()
+
+ def _with_handlers(self, callback):
+ exception = None
+ self._handler_lock.acquire()
+ for handler in self._subhandlers:
+ # Prevent deadlock when using this handler by delaying
+ # exception raise until we get a chance to unlock.
+ try:
+ callback(handler)
+ except Exception as e:
+ exception = e
+ break
+ self._handler_lock.release()
+
+ if exception is not None:
+ raise exception
+
+ def async_process(self):
+ self.thread = threading.Thread(target=self.process)
+ self.thread.daemon = True
+ self.thread.start()
+
+ def process(self):
+ while not self._shutdown.is_set():
+ try:
+ item = self.queue.get(timeout=0.1)
+ self._handle(item)
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except EOFError:
+ return
+ except Queue.Empty:
+ continue
+
+ def _drain(self):
+ while True:
+ try:
+ item = self.queue.get(block=False)
+ self._handle(item)
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except EOFError:
+ return
+ except Queue.Empty:
+ return
+
+ def _handle(self, record):
+ self._with_handlers(lambda handler: handler.handle(record))
+
+ def handle(self, record):
+ self.queue.put(record)
+
+ def _close(self):
+ if hasattr(self, 'thread'):
+ self.thread.join()
+ _wrap(self._drain)
+ self._with_handlers(lambda handler: _wrap(handler.close))
+
+ # NOTE Python2 has an known bug which causes IOErrors to be raised
+ # if this shutdown doesn't go cleanly on both ends.
+ # This sleep adds some time for the sender threads on this process to
+ # finish pickling the object and complete shutdown after the queue is
+ # closed.
+ time.sleep(.2)
+ self.queue.close()
+ time.sleep(.2)
+
+ def close(self):
+ if not self._shutdown.is_set():
+ self._shutdown.set()
+ self._close()
+
+
+def _wrap(callback, *args, **kwargs):
+ try:
+ callback(*args, **kwargs)
+ except:
+ traceback.print_exc()