summaryrefslogtreecommitdiff
path: root/ext/testlib/handlers.py
blob: 7530f9bf1f66af8678086718d7b94f6c7c21aa8c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson

'''
Handlers for the testlib Log.


'''
from __future__ import print_function

import multiprocessing
import os
import Queue
import sys
import threading
import time
import traceback

import helper
import log
import result
import state
import test
import terminal

from config import config, constants


class _TestStreamManager(object):
    def __init__(self):
        self._writers = {}

    def open_writer(self, test_result):
        if test_result in self._writers:
            raise ValueError('Cannot have multiple writters on a single test.')
        self._writers[test_result] = _TestStreams(test_result.stdout,
                test_result.stderr)

    def get_writer(self, test_result):
        if test_result not in self._writers:
            self.open_writer(test_result)
        return self._writers[test_result]

    def close_writer(self, test_result):
        if test_result in self._writers:
            writer = self._writers.pop(test_result)
            writer.close()

    def close(self):
        for writer in self._writers.values():
            writer.close()
        self._writers.clear()

class _TestStreams(object):
    def __init__(self, stdout, stderr):
        helper.mkdir_p(os.path.dirname(stdout))
        helper.mkdir_p(os.path.dirname(stderr))
        self.stdout = open(stdout, 'w')
        self.stderr = open(stderr, 'w')

    def close(self):
        self.stdout.close()
        self.stderr.close()

class ResultHandler(log.Handler):
    '''
    Log handler which listens for test results and output saving data as
    it is reported.

    When the handler is closed it writes out test results in the python pickle
    format.
    '''
    def __init__(self, schedule, directory):
        '''
        :param schedule: The entire schedule as a :class:`LoadedLibrary`
            object.

        :param directory: Directory to save test stdout/stderr and aggregate
            results to.
        '''
        self.directory = directory
        self.internal_results = result.InternalLibraryResults(schedule,
                directory)
        self.test_stream_manager = _TestStreamManager()
        self._closed = False

        self.mapping = {
            log.LibraryStatus.type_id: self.handle_library_status,

            log.SuiteResult.type_id: self.handle_suite_result,
            log.TestResult.type_id: self.handle_test_result,

            log.TestStderr.type_id: self.handle_stderr,
            log.TestStdout.type_id: self.handle_stdout,
        }

    def handle(self, record):
        if not self._closed:
            self.mapping.get(record.type_id, lambda _:None)(record)

    def handle_library_status(self, record):
        if record['status'] in (state.Status.Complete, state.Status.Avoided):
            self.test_stream_manager.close()

    def handle_suite_result(self, record):
        suite_result = self.internal_results.get_suite_result(
                    record['metadata'].uid)
        suite_result.result = record['result']

    def handle_test_result(self, record):
        test_result = self._get_test_result(record)
        test_result.result = record['result']

    def handle_stderr(self, record):
        self.test_stream_manager.get_writer(
            self._get_test_result(record)
        ).stderr.write(record['buffer'])

    def handle_stdout(self, record):
        self.test_stream_manager.get_writer(
            self._get_test_result(record)
        ).stdout.write(record['buffer'])

    def _get_test_result(self, test_record):
        return self.internal_results.get_test_result(
                    test_record['metadata'].uid,
                    test_record['metadata'].suite_uid)

    def _save(self):
        #FIXME Hardcoded path name
        result.InternalSavedResults.save(
            self.internal_results,
            os.path.join(self.directory, constants.pickle_filename))
        result.JUnitSavedResults.save(
            self.internal_results,
            os.path.join(self.directory, constants.xml_filename))

    def close(self):
        if self._closed:
            return
        self._closed = True
        self._save()


#TODO Change from a handler to an internal post processor so it can be used
# to reprint results
class SummaryHandler(log.Handler):
    '''
    A log handler which listens to the log for test results
    and reports the aggregate results when closed.
    '''
    color = terminal.get_termcap()
    reset = color.Normal
    colormap = {
            state.Result.Errored: color.Red,
            state.Result.Failed: color.Red,
            state.Result.Passed: color.Green,
            state.Result.Skipped: color.Cyan,
    }
    sep_fmtkey = 'separator'
    sep_fmtstr = '{%s}' % sep_fmtkey

    def __init__(self):
        self.mapping = {
            log.TestResult.type_id: self.handle_testresult,
            log.LibraryStatus.type_id: self.handle_library_status,
        }
        self._timer = helper.Timer()
        self.results = []

    def handle_library_status(self, record):
        if record['status'] == state.Status.Building:
            self._timer.restart()

    def handle_testresult(self, record):
        result = record['result'].value
        if result in (state.Result.Skipped, state.Result.Failed,
                state.Result.Passed, state.Result.Errored):
            self.results.append(result)

    def handle(self, record):
        self.mapping.get(record.type_id, lambda _:None)(record)

    def close(self):
        print(self._display_summary())

    def _display_summary(self):
        most_severe_outcome = None
        outcome_fmt = ' {count} {outcome}'
        strings = []

        outcome_count = [0] * len(state.Result.enums)
        for result in self.results:
            outcome_count[result] += 1

        # Iterate over enums so they are in order of severity
        for outcome in state.Result.enums:
            outcome = getattr(state.Result, outcome)
            count  = outcome_count[outcome]
            if count:
                strings.append(outcome_fmt.format(count=count,
                        outcome=state.Result.enums[outcome]))
                most_severe_outcome = outcome
        string = ','.join(strings)
        if most_severe_outcome is None:
            string = ' No testing done'
            most_severe_outcome = state.Result.Passed
        else:
            string = ' Results:' + string + ' in {:.2} seconds '.format(
                    self._timer.active_time())
        string += ' '
        return terminal.insert_separator(
                string,
                color=self.colormap[most_severe_outcome] + self.color.Bold)

class TerminalHandler(log.Handler):
    color = terminal.get_termcap()
    verbosity_mapping = {
        log.LogLevel.Warn: color.Yellow,
        log.LogLevel.Error: color.Red,
    }
    default = color.Normal

    def __init__(self, verbosity=log.LogLevel.Info, machine_only=False):
        self.stream = verbosity >= log.LogLevel.Trace
        self.verbosity = verbosity
        self.machine_only = machine_only
        self.mapping = {
            log.TestResult.type_id: self.handle_testresult,
            log.SuiteStatus.type_id: self.handle_suitestatus,
            log.TestStatus.type_id: self.handle_teststatus,
            log.TestStderr.type_id: self.handle_stderr,
            log.TestStdout.type_id: self.handle_stdout,
            log.TestMessage.type_id: self.handle_testmessage,
            log.LibraryMessage.type_id: self.handle_librarymessage,
        }

    def _display_outcome(self, name, outcome, reason=None):
        print(self.color.Bold
                 + SummaryHandler.colormap[outcome]
                 + name
                 + ' '
                 + state.Result.enums[outcome]
                 + SummaryHandler.reset)

        if reason is not None:
            log.test_log.info('')
            log.test_log.info('Reason:')
            log.test_log.info(reason)
            log.test_log.info(terminal.separator('-'))

    def handle_teststatus(self, record):
        if record['status'] == state.Status.Running:
            log.test_log.debug('Starting Test Case: %s' %\
                    record['metadata'].name)

    def handle_testresult(self, record):
        self._display_outcome(
            'Test: %s'  % record['metadata'].name,
            record['result'].value)

    def handle_suitestatus(self, record):
        if record['status'] == state.Status.Running:
              log.test_log.debug('Starting Test Suite: %s ' %\
                    record['metadata'].name)

    def handle_stderr(self, record):
        if self.stream:
            print(record.data['buffer'], file=sys.stderr, end='')

    def handle_stdout(self, record):
        if self.stream:
            print(record.data['buffer'], file=sys.stdout, end='')

    def handle_testmessage(self, record):
        if self.stream:
            print(self._colorize(record['message'], record['level']))

    def handle_librarymessage(self, record):
        if not self.machine_only or record.data.get('machine_readable', False):
            print(self._colorize(record['message'], record['level'],
                    record['bold']))

    def _colorize(self, message, level, bold=False):
        return '%s%s%s%s' % (
                self.color.Bold if bold else '',
                self.verbosity_mapping.get(level, ''),
                message,
                self.default)

    def handle(self, record):
        if record.data.get('level', self.verbosity) > self.verbosity:
            return
        self.mapping.get(record.type_id, lambda _:None)(record)

    def set_verbosity(self, verbosity):
        self.verbosity = verbosity


class PrintHandler(log.Handler):
    def __init__(self):
        pass

    def handle(self, record):
        print(str(record).rstrip())

    def close(self):
        pass


class MultiprocessingHandlerWrapper(log.Handler):
    '''
    A handler class which forwards log records to subhandlers, enabling
    logging across multiprocessing python processes.

    The 'parent' side of the handler should execute either
    :func:`async_process` or :func:`process` to forward
    log records to subhandlers.
    '''
    def __init__(self, *subhandlers):
        # Create thread to spin handing recipt of messages
        # Create queue to push onto
        self.queue = multiprocessing.Queue()
        self.queue.cancel_join_thread()
        self._shutdown = threading.Event()

        # subhandlers should be accessed with the _handler_lock
        self._handler_lock = threading.Lock()
        self._subhandlers = subhandlers

    def add_handler(self, handler):
        self._handler_lock.acquire()
        self._subhandlers = (handler, ) + self._subhandlers
        self._handler_lock.release()

    def _with_handlers(self, callback):
        exception = None
        self._handler_lock.acquire()
        for handler in self._subhandlers:
            # Prevent deadlock when using this handler by delaying
            # exception raise until we get a chance to unlock.
            try:
                callback(handler)
            except Exception as e:
                exception = e
                break
        self._handler_lock.release()

        if exception is not None:
            raise exception

    def async_process(self):
        self.thread = threading.Thread(target=self.process)
        self.thread.daemon = True
        self.thread.start()

    def process(self):
        while not self._shutdown.is_set():
            try:
                item = self.queue.get(timeout=0.1)
                self._handle(item)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                return
            except Queue.Empty:
                continue

    def _drain(self):
        while True:
            try:
                item = self.queue.get(block=False)
                self._handle(item)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                return
            except Queue.Empty:
                return

    def _handle(self, record):
        self._with_handlers(lambda handler: handler.handle(record))

    def handle(self, record):
        self.queue.put(record)

    def _close(self):
        if hasattr(self, 'thread'):
            self.thread.join()
        _wrap(self._drain)
        self._with_handlers(lambda handler: _wrap(handler.close))

        # NOTE Python2 has an known bug which causes IOErrors to be raised
        # if this shutdown doesn't go cleanly on both ends.
        # This sleep adds some time for the sender threads on this process to
        # finish pickling the object and complete shutdown after the queue is
        # closed.
        time.sleep(.2)
        self.queue.close()
        time.sleep(.2)

    def close(self):
        if not self._shutdown.is_set():
            self._shutdown.set()
            self._close()


def _wrap(callback, *args, **kwargs):
    try:
        callback(*args, **kwargs)
    except:
        traceback.print_exc()