From 4710c53dcad1ebf3755f3efb9e80ac24bd72a9b2 Mon Sep 17 00:00:00 2001 From: darylm503 Date: Mon, 16 Apr 2012 22:12:42 +0000 Subject: AppPkg/Applications/Python: Add Python 2.7.2 sources since the release of Python 2.7.3 made them unavailable from the python.org web site. These files are a subset of the python-2.7.2.tgz distribution from python.org. Changed files from PyMod-2.7.2 have been copied into the corresponding directories of this tree, replacing the original files in the distribution. Signed-off-by: daryl.mcdaniel@intel.com git-svn-id: https://edk2.svn.sourceforge.net/svnroot/edk2/trunk/edk2@13197 6f19259b-4bc3-4df7-8a09-765794883524 --- .../Python/Python-2.7.2/Lib/test/test_ssl.py | 1360 ++++++++++++++++++++ 1 file changed, 1360 insertions(+) create mode 100644 AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_ssl.py (limited to 'AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_ssl.py') diff --git a/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_ssl.py b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_ssl.py new file mode 100644 index 0000000000..1d2fc45e5e --- /dev/null +++ b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_ssl.py @@ -0,0 +1,1360 @@ +# Test the support for SSL and sockets + +import sys +import unittest +from test import test_support +import asyncore +import socket +import select +import time +import gc +import os +import errno +import pprint +import urllib, urlparse +import traceback +import weakref +import functools +import platform + +from BaseHTTPServer import HTTPServer +from SimpleHTTPServer import SimpleHTTPRequestHandler + +ssl = test_support.import_module("ssl") + +HOST = test_support.HOST +CERTFILE = None +SVN_PYTHON_ORG_ROOT_CERT = None + +def handle_error(prefix): + exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) + if test_support.verbose: + sys.stdout.write(prefix + exc_format) + + +class BasicTests(unittest.TestCase): + + def test_sslwrap_simple(self): + # A crude test for the legacy API + try: + ssl.sslwrap_simple(socket.socket(socket.AF_INET)) + except IOError, e: + if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that + pass + else: + raise + try: + ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock) + except IOError, e: + if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that + pass + else: + raise + +# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 +def skip_if_broken_ubuntu_ssl(func): + if hasattr(ssl, 'PROTOCOL_SSLv2'): + # We need to access the lower-level wrapper in order to create an + # implicit SSL context without trying to connect or listen. + try: + import _ssl + except ImportError: + # The returned function won't get executed, just ignore the error + pass + @functools.wraps(func) + def f(*args, **kwargs): + try: + s = socket.socket(socket.AF_INET) + _ssl.sslwrap(s._sock, 0, None, None, + ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None) + except ssl.SSLError as e: + if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and + platform.linux_distribution() == ('debian', 'squeeze/sid', '') + and 'Invalid SSL protocol variant specified' in str(e)): + raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") + return func(*args, **kwargs) + return f + else: + return func + + +class BasicSocketTests(unittest.TestCase): + + def test_constants(self): + #ssl.PROTOCOL_SSLv2 + ssl.PROTOCOL_SSLv23 + ssl.PROTOCOL_SSLv3 + ssl.PROTOCOL_TLSv1 + ssl.CERT_NONE + ssl.CERT_OPTIONAL + ssl.CERT_REQUIRED + + def test_random(self): + v = ssl.RAND_status() + if test_support.verbose: + sys.stdout.write("\n RAND_status is %d (%s)\n" + % (v, (v and "sufficient randomness") or + "insufficient randomness")) + try: + ssl.RAND_egd(1) + except TypeError: + pass + else: + print "didn't raise TypeError" + ssl.RAND_add("this is a random string", 75.0) + + def test_parse_cert(self): + # note that this uses an 'unofficial' function in _ssl.c, + # provided solely for this test, to exercise the certificate + # parsing code + p = ssl._ssl._test_decode_cert(CERTFILE, False) + if test_support.verbose: + sys.stdout.write("\n" + pprint.pformat(p) + "\n") + + def test_DER_to_PEM(self): + with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f: + pem = f.read() + d1 = ssl.PEM_cert_to_DER_cert(pem) + p2 = ssl.DER_cert_to_PEM_cert(d1) + d2 = ssl.PEM_cert_to_DER_cert(p2) + self.assertEqual(d1, d2) + if not p2.startswith(ssl.PEM_HEADER + '\n'): + self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) + if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'): + self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2) + + def test_openssl_version(self): + n = ssl.OPENSSL_VERSION_NUMBER + t = ssl.OPENSSL_VERSION_INFO + s = ssl.OPENSSL_VERSION + self.assertIsInstance(n, (int, long)) + self.assertIsInstance(t, tuple) + self.assertIsInstance(s, str) + # Some sanity checks follow + # >= 0.9 + self.assertGreaterEqual(n, 0x900000) + # < 2.0 + self.assertLess(n, 0x20000000) + major, minor, fix, patch, status = t + self.assertGreaterEqual(major, 0) + self.assertLess(major, 2) + self.assertGreaterEqual(minor, 0) + self.assertLess(minor, 256) + self.assertGreaterEqual(fix, 0) + self.assertLess(fix, 256) + self.assertGreaterEqual(patch, 0) + self.assertLessEqual(patch, 26) + self.assertGreaterEqual(status, 0) + self.assertLessEqual(status, 15) + # Version string as returned by OpenSSL, the format might change + self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), + (s, t)) + + def test_ciphers(self): + if not test_support.is_resource_enabled('network'): + return + remote = ("svn.python.org", 443) + with test_support.transient_internet(remote[0]): + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="ALL") + s.connect(remote) + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") + s.connect(remote) + # Error checking occurs when connecting, because the SSL context + # isn't created before. + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") + with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): + s.connect(remote) + + @test_support.cpython_only + def test_refcycle(self): + # Issue #7943: an SSL object doesn't create reference cycles with + # itself. + s = socket.socket(socket.AF_INET) + ss = ssl.wrap_socket(s) + wr = weakref.ref(ss) + del ss + self.assertEqual(wr(), None) + + def test_wrapped_unconnected(self): + # The _delegate_methods in socket.py are correctly delegated to by an + # unconnected SSLSocket, so they will raise a socket.error rather than + # something unexpected like TypeError. + s = socket.socket(socket.AF_INET) + ss = ssl.wrap_socket(s) + self.assertRaises(socket.error, ss.recv, 1) + self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) + self.assertRaises(socket.error, ss.recvfrom, 1) + self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) + self.assertRaises(socket.error, ss.send, b'x') + self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) + + +class NetworkedTests(unittest.TestCase): + + def test_connect(self): + with test_support.transient_internet("svn.python.org"): + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE) + s.connect(("svn.python.org", 443)) + c = s.getpeercert() + if c: + self.fail("Peer cert %s shouldn't be here!") + s.close() + + # this should fail because we have no verification certs + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + try: + s.connect(("svn.python.org", 443)) + except ssl.SSLError: + pass + finally: + s.close() + + # this should succeed because we specify the root cert + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SVN_PYTHON_ORG_ROOT_CERT) + try: + s.connect(("svn.python.org", 443)) + finally: + s.close() + + def test_connect_ex(self): + # Issue #11326: check connect_ex() implementation + with test_support.transient_internet("svn.python.org"): + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SVN_PYTHON_ORG_ROOT_CERT) + try: + self.assertEqual(0, s.connect_ex(("svn.python.org", 443))) + self.assertTrue(s.getpeercert()) + finally: + s.close() + + def test_non_blocking_connect_ex(self): + # Issue #11326: non-blocking connect_ex() should allow handshake + # to proceed after the socket gets ready. + with test_support.transient_internet("svn.python.org"): + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SVN_PYTHON_ORG_ROOT_CERT, + do_handshake_on_connect=False) + try: + s.setblocking(False) + rc = s.connect_ex(('svn.python.org', 443)) + # EWOULDBLOCK under Windows, EINPROGRESS elsewhere + self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) + # Wait for connect to finish + select.select([], [s], [], 5.0) + # Non-blocking handshake + while True: + try: + s.do_handshake() + break + except ssl.SSLError as err: + if err.args[0] == ssl.SSL_ERROR_WANT_READ: + select.select([s], [], [], 5.0) + elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: + select.select([], [s], [], 5.0) + else: + raise + # SSL established + self.assertTrue(s.getpeercert()) + finally: + s.close() + + @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") + def test_makefile_close(self): + # Issue #5238: creating a file-like object with makefile() shouldn't + # delay closing the underlying "real socket" (here tested with its + # file descriptor, hence skipping the test under Windows). + with test_support.transient_internet("svn.python.org"): + ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) + ss.connect(("svn.python.org", 443)) + fd = ss.fileno() + f = ss.makefile() + f.close() + # The fd is still open + os.read(fd, 0) + # Closing the SSL socket should close the fd too + ss.close() + gc.collect() + with self.assertRaises(OSError) as e: + os.read(fd, 0) + self.assertEqual(e.exception.errno, errno.EBADF) + + def test_non_blocking_handshake(self): + with test_support.transient_internet("svn.python.org"): + s = socket.socket(socket.AF_INET) + s.connect(("svn.python.org", 443)) + s.setblocking(False) + s = ssl.wrap_socket(s, + cert_reqs=ssl.CERT_NONE, + do_handshake_on_connect=False) + count = 0 + while True: + try: + count += 1 + s.do_handshake() + break + except ssl.SSLError, err: + if err.args[0] == ssl.SSL_ERROR_WANT_READ: + select.select([s], [], []) + elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: + select.select([], [s], []) + else: + raise + s.close() + if test_support.verbose: + sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) + + def test_get_server_certificate(self): + with test_support.transient_internet("svn.python.org"): + pem = ssl.get_server_certificate(("svn.python.org", 443)) + if not pem: + self.fail("No server certificate on svn.python.org:443!") + + try: + pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) + except ssl.SSLError: + #should fail + pass + else: + self.fail("Got server certificate %s for svn.python.org!" % pem) + + pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) + if not pem: + self.fail("No server certificate on svn.python.org:443!") + if test_support.verbose: + sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) + + def test_algorithms(self): + # Issue #8484: all algorithms should be available when verifying a + # certificate. + # SHA256 was added in OpenSSL 0.9.8 + if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): + self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) + # NOTE: https://sha256.tbs-internet.com is another possible test host + remote = ("sha256.tbs-internet.com", 443) + sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") + with test_support.transient_internet("sha256.tbs-internet.com"): + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=sha256_cert,) + try: + s.connect(remote) + if test_support.verbose: + sys.stdout.write("\nCipher with %r is %r\n" % + (remote, s.cipher())) + sys.stdout.write("Certificate is:\n%s\n" % + pprint.pformat(s.getpeercert())) + finally: + s.close() + + +try: + import threading +except ImportError: + _have_threads = False +else: + _have_threads = True + + class ThreadedEchoServer(threading.Thread): + + class ConnectionHandler(threading.Thread): + + """A mildly complicated class, because we want it to work both + with and without the SSL wrapper around the socket connection, so + that we can test the STARTTLS functionality.""" + + def __init__(self, server, connsock): + self.server = server + self.running = False + self.sock = connsock + self.sock.setblocking(1) + self.sslconn = None + threading.Thread.__init__(self) + self.daemon = True + + def show_conn_details(self): + if self.server.certreqs == ssl.CERT_REQUIRED: + cert = self.sslconn.getpeercert() + if test_support.verbose and self.server.chatty: + sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") + cert_binary = self.sslconn.getpeercert(True) + if test_support.verbose and self.server.chatty: + sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") + cipher = self.sslconn.cipher() + if test_support.verbose and self.server.chatty: + sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") + + def wrap_conn(self): + try: + self.sslconn = ssl.wrap_socket(self.sock, server_side=True, + certfile=self.server.certificate, + ssl_version=self.server.protocol, + ca_certs=self.server.cacerts, + cert_reqs=self.server.certreqs, + ciphers=self.server.ciphers) + except ssl.SSLError: + # XXX Various errors can have happened here, for example + # a mismatching protocol version, an invalid certificate, + # or a low-level bug. This should be made more discriminating. + if self.server.chatty: + handle_error("\n server: bad connection attempt from " + + str(self.sock.getpeername()) + ":\n") + self.close() + self.running = False + self.server.stop() + return False + else: + return True + + def read(self): + if self.sslconn: + return self.sslconn.read() + else: + return self.sock.recv(1024) + + def write(self, bytes): + if self.sslconn: + return self.sslconn.write(bytes) + else: + return self.sock.send(bytes) + + def close(self): + if self.sslconn: + self.sslconn.close() + else: + self.sock._sock.close() + + def run(self): + self.running = True + if not self.server.starttls_server: + if isinstance(self.sock, ssl.SSLSocket): + self.sslconn = self.sock + elif not self.wrap_conn(): + return + self.show_conn_details() + while self.running: + try: + msg = self.read() + if not msg: + # eof, so quit this handler + self.running = False + self.close() + elif msg.strip() == 'over': + if test_support.verbose and self.server.connectionchatty: + sys.stdout.write(" server: client closed connection\n") + self.close() + return + elif self.server.starttls_server and msg.strip() == 'STARTTLS': + if test_support.verbose and self.server.connectionchatty: + sys.stdout.write(" server: read STARTTLS from client, sending OK...\n") + self.write("OK\n") + if not self.wrap_conn(): + return + elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS': + if test_support.verbose and self.server.connectionchatty: + sys.stdout.write(" server: read ENDTLS from client, sending OK...\n") + self.write("OK\n") + self.sslconn.unwrap() + self.sslconn = None + if test_support.verbose and self.server.connectionchatty: + sys.stdout.write(" server: connection is now unencrypted...\n") + else: + if (test_support.verbose and + self.server.connectionchatty): + ctype = (self.sslconn and "encrypted") or "unencrypted" + sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n" + % (repr(msg), ctype, repr(msg.lower()), ctype)) + self.write(msg.lower()) + except ssl.SSLError: + if self.server.chatty: + handle_error("Test server failure:\n") + self.close() + self.running = False + # normally, we'd just stop here, but for the test + # harness, we want to stop the server + self.server.stop() + + def __init__(self, certificate, ssl_version=None, + certreqs=None, cacerts=None, + chatty=True, connectionchatty=False, starttls_server=False, + wrap_accepting_socket=False, ciphers=None): + + if ssl_version is None: + ssl_version = ssl.PROTOCOL_TLSv1 + if certreqs is None: + certreqs = ssl.CERT_NONE + self.certificate = certificate + self.protocol = ssl_version + self.certreqs = certreqs + self.cacerts = cacerts + self.ciphers = ciphers + self.chatty = chatty + self.connectionchatty = connectionchatty + self.starttls_server = starttls_server + self.sock = socket.socket() + self.flag = None + if wrap_accepting_socket: + self.sock = ssl.wrap_socket(self.sock, server_side=True, + certfile=self.certificate, + cert_reqs = self.certreqs, + ca_certs = self.cacerts, + ssl_version = self.protocol, + ciphers = self.ciphers) + if test_support.verbose and self.chatty: + sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock)) + self.port = test_support.bind_port(self.sock) + self.active = False + threading.Thread.__init__(self) + self.daemon = True + + def start(self, flag=None): + self.flag = flag + threading.Thread.start(self) + + def run(self): + self.sock.settimeout(0.05) + self.sock.listen(5) + self.active = True + if self.flag: + # signal an event + self.flag.set() + while self.active: + try: + newconn, connaddr = self.sock.accept() + if test_support.verbose and self.chatty: + sys.stdout.write(' server: new connection from ' + + str(connaddr) + '\n') + handler = self.ConnectionHandler(self, newconn) + handler.start() + except socket.timeout: + pass + except KeyboardInterrupt: + self.stop() + self.sock.close() + + def stop(self): + self.active = False + + class AsyncoreEchoServer(threading.Thread): + + class EchoServer(asyncore.dispatcher): + + class ConnectionHandler(asyncore.dispatcher_with_send): + + def __init__(self, conn, certfile): + asyncore.dispatcher_with_send.__init__(self, conn) + self.socket = ssl.wrap_socket(conn, server_side=True, + certfile=certfile, + do_handshake_on_connect=False) + self._ssl_accepting = True + + def readable(self): + if isinstance(self.socket, ssl.SSLSocket): + while self.socket.pending() > 0: + self.handle_read_event() + return True + + def _do_ssl_handshake(self): + try: + self.socket.do_handshake() + except ssl.SSLError, err: + if err.args[0] in (ssl.SSL_ERROR_WANT_READ, + ssl.SSL_ERROR_WANT_WRITE): + return + elif err.args[0] == ssl.SSL_ERROR_EOF: + return self.handle_close() + raise + except socket.error, err: + if err.args[0] == errno.ECONNABORTED: + return self.handle_close() + else: + self._ssl_accepting = False + + def handle_read(self): + if self._ssl_accepting: + self._do_ssl_handshake() + else: + data = self.recv(1024) + if data and data.strip() != 'over': + self.send(data.lower()) + + def handle_close(self): + self.close() + if test_support.verbose: + sys.stdout.write(" server: closed connection %s\n" % self.socket) + + def handle_error(self): + raise + + def __init__(self, certfile): + self.certfile = certfile + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.port = test_support.bind_port(self.socket) + self.listen(5) + + def handle_accept(self): + sock_obj, addr = self.accept() + if test_support.verbose: + sys.stdout.write(" server: new connection from %s:%s\n" %addr) + self.ConnectionHandler(sock_obj, self.certfile) + + def handle_error(self): + raise + + def __init__(self, certfile): + self.flag = None + self.active = False + self.server = self.EchoServer(certfile) + self.port = self.server.port + threading.Thread.__init__(self) + self.daemon = True + + def __str__(self): + return "<%s %s>" % (self.__class__.__name__, self.server) + + def start(self, flag=None): + self.flag = flag + threading.Thread.start(self) + + def run(self): + self.active = True + if self.flag: + self.flag.set() + while self.active: + asyncore.loop(0.05) + + def stop(self): + self.active = False + self.server.close() + + class SocketServerHTTPSServer(threading.Thread): + + class HTTPSServer(HTTPServer): + + def __init__(self, server_address, RequestHandlerClass, certfile): + HTTPServer.__init__(self, server_address, RequestHandlerClass) + # we assume the certfile contains both private key and certificate + self.certfile = certfile + self.allow_reuse_address = True + + def __str__(self): + return ('<%s %s:%s>' % + (self.__class__.__name__, + self.server_name, + self.server_port)) + + def get_request(self): + # override this to wrap socket with SSL + sock, addr = self.socket.accept() + sslconn = ssl.wrap_socket(sock, server_side=True, + certfile=self.certfile) + return sslconn, addr + + class RootedHTTPRequestHandler(SimpleHTTPRequestHandler): + # need to override translate_path to get a known root, + # instead of using os.curdir, since the test could be + # run from anywhere + + server_version = "TestHTTPS/1.0" + + root = None + + def translate_path(self, path): + """Translate a /-separated PATH to the local filename syntax. + + Components that mean special things to the local file system + (e.g. drive or directory names) are ignored. (XXX They should + probably be diagnosed.) + + """ + # abandon query parameters + path = urlparse.urlparse(path)[2] + path = os.path.normpath(urllib.unquote(path)) + words = path.split('/') + words = filter(None, words) + path = self.root + for word in words: + drive, word = os.path.splitdrive(word) + head, word = os.path.split(word) + if word in self.root: continue + path = os.path.join(path, word) + return path + + def log_message(self, format, *args): + + # we override this to suppress logging unless "verbose" + + if test_support.verbose: + sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" % + (self.server.server_address, + self.server.server_port, + self.request.cipher(), + self.log_date_time_string(), + format%args)) + + + def __init__(self, certfile): + self.flag = None + self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] + self.server = self.HTTPSServer( + (HOST, 0), self.RootedHTTPRequestHandler, certfile) + self.port = self.server.server_port + threading.Thread.__init__(self) + self.daemon = True + + def __str__(self): + return "<%s %s>" % (self.__class__.__name__, self.server) + + def start(self, flag=None): + self.flag = flag + threading.Thread.start(self) + + def run(self): + if self.flag: + self.flag.set() + self.server.serve_forever(0.05) + + def stop(self): + self.server.shutdown() + + + def bad_cert_test(certfile): + """ + Launch a server with CERT_REQUIRED, and check that trying to + connect to it with the given client certificate fails. + """ + server = ThreadedEchoServer(CERTFILE, + certreqs=ssl.CERT_REQUIRED, + cacerts=CERTFILE, chatty=False) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + # try to connect + try: + try: + s = ssl.wrap_socket(socket.socket(), + certfile=certfile, + ssl_version=ssl.PROTOCOL_TLSv1) + s.connect((HOST, server.port)) + except ssl.SSLError, x: + if test_support.verbose: + sys.stdout.write("\nSSLError is %s\n" % x[1]) + except socket.error, x: + if test_support.verbose: + sys.stdout.write("\nsocket.error is %s\n" % x[1]) + else: + raise AssertionError("Use of invalid cert should have failed!") + finally: + server.stop() + server.join() + + def server_params_test(certfile, protocol, certreqs, cacertsfile, + client_certfile, client_protocol=None, indata="FOO\n", + ciphers=None, chatty=True, connectionchatty=False, + wrap_accepting_socket=False): + """ + Launch a server, connect a client to it and try various reads + and writes. + """ + server = ThreadedEchoServer(certfile, + certreqs=certreqs, + ssl_version=protocol, + cacerts=cacertsfile, + ciphers=ciphers, + chatty=chatty, + connectionchatty=connectionchatty, + wrap_accepting_socket=wrap_accepting_socket) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + # try to connect + if client_protocol is None: + client_protocol = protocol + try: + s = ssl.wrap_socket(socket.socket(), + certfile=client_certfile, + ca_certs=cacertsfile, + ciphers=ciphers, + cert_reqs=certreqs, + ssl_version=client_protocol) + s.connect((HOST, server.port)) + for arg in [indata, bytearray(indata), memoryview(indata)]: + if connectionchatty: + if test_support.verbose: + sys.stdout.write( + " client: sending %s...\n" % (repr(arg))) + s.write(arg) + outdata = s.read() + if connectionchatty: + if test_support.verbose: + sys.stdout.write(" client: read %s\n" % repr(outdata)) + if outdata != indata.lower(): + raise AssertionError( + "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" + % (outdata[:min(len(outdata),20)], len(outdata), + indata[:min(len(indata),20)].lower(), len(indata))) + s.write("over\n") + if connectionchatty: + if test_support.verbose: + sys.stdout.write(" client: closing connection.\n") + s.close() + finally: + server.stop() + server.join() + + def try_protocol_combo(server_protocol, + client_protocol, + expect_success, + certsreqs=None): + if certsreqs is None: + certsreqs = ssl.CERT_NONE + certtype = { + ssl.CERT_NONE: "CERT_NONE", + ssl.CERT_OPTIONAL: "CERT_OPTIONAL", + ssl.CERT_REQUIRED: "CERT_REQUIRED", + }[certsreqs] + if test_support.verbose: + formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" + sys.stdout.write(formatstr % + (ssl.get_protocol_name(client_protocol), + ssl.get_protocol_name(server_protocol), + certtype)) + try: + # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client + # will send an SSLv3 hello (rather than SSLv2) starting from + # OpenSSL 1.0.0 (see issue #8322). + server_params_test(CERTFILE, server_protocol, certsreqs, + CERTFILE, CERTFILE, client_protocol, + ciphers="ALL", chatty=False) + # Protocol mismatch can result in either an SSLError, or a + # "Connection reset by peer" error. + except ssl.SSLError: + if expect_success: + raise + except socket.error as e: + if expect_success or e.errno != errno.ECONNRESET: + raise + else: + if not expect_success: + raise AssertionError( + "Client protocol %s succeeded with server protocol %s!" + % (ssl.get_protocol_name(client_protocol), + ssl.get_protocol_name(server_protocol))) + + + class ThreadedTests(unittest.TestCase): + + def test_rude_shutdown(self): + """A brutal shutdown of an SSL server should raise an IOError + in the client when attempting handshake. + """ + listener_ready = threading.Event() + listener_gone = threading.Event() + + s = socket.socket() + port = test_support.bind_port(s, HOST) + + # `listener` runs in a thread. It sits in an accept() until + # the main thread connects. Then it rudely closes the socket, + # and sets Event `listener_gone` to let the main thread know + # the socket is gone. + def listener(): + s.listen(5) + listener_ready.set() + s.accept() + s.close() + listener_gone.set() + + def connector(): + listener_ready.wait() + c = socket.socket() + c.connect((HOST, port)) + listener_gone.wait() + try: + ssl_sock = ssl.wrap_socket(c) + except IOError: + pass + else: + self.fail('connecting to closed SSL socket should have failed') + + t = threading.Thread(target=listener) + t.start() + try: + connector() + finally: + t.join() + + @skip_if_broken_ubuntu_ssl + def test_echo(self): + """Basic test of an SSL client connecting to a server""" + if test_support.verbose: + sys.stdout.write("\n") + server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, + CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, + chatty=True, connectionchatty=True) + + def test_getpeercert(self): + if test_support.verbose: + sys.stdout.write("\n") + s2 = socket.socket() + server = ThreadedEchoServer(CERTFILE, + certreqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_SSLv23, + cacerts=CERTFILE, + chatty=False) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + # try to connect + try: + s = ssl.wrap_socket(socket.socket(), + certfile=CERTFILE, + ca_certs=CERTFILE, + cert_reqs=ssl.CERT_REQUIRED, + ssl_version=ssl.PROTOCOL_SSLv23) + s.connect((HOST, server.port)) + cert = s.getpeercert() + self.assertTrue(cert, "Can't get peer certificate.") + cipher = s.cipher() + if test_support.verbose: + sys.stdout.write(pprint.pformat(cert) + '\n') + sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') + if 'subject' not in cert: + self.fail("No subject field in certificate: %s." % + pprint.pformat(cert)) + if ((('organizationName', 'Python Software Foundation'),) + not in cert['subject']): + self.fail( + "Missing or invalid 'organizationName' field in certificate subject; " + "should be 'Python Software Foundation'.") + s.close() + finally: + server.stop() + server.join() + + def test_empty_cert(self): + """Connecting with an empty cert file""" + bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, + "nullcert.pem")) + def test_malformed_cert(self): + """Connecting with a badly formatted certificate (syntax error)""" + bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, + "badcert.pem")) + def test_nonexisting_cert(self): + """Connecting with a non-existing cert file""" + bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, + "wrongcert.pem")) + def test_malformed_key(self): + """Connecting with a badly formatted key (syntax error)""" + bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, + "badkey.pem")) + + @skip_if_broken_ubuntu_ssl + def test_protocol_sslv2(self): + """Connecting to an SSLv2 server with various client options""" + if test_support.verbose: + sys.stdout.write("\n") + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) + + @skip_if_broken_ubuntu_ssl + def test_protocol_sslv23(self): + """Connecting to an SSLv23 server with various client options""" + if test_support.verbose: + sys.stdout.write("\n") + try: + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) + except (ssl.SSLError, socket.error), x: + # this fails on some older versions of OpenSSL (0.9.7l, for instance) + if test_support.verbose: + sys.stdout.write( + " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n" + % str(x)) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True) + + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) + + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) + + @skip_if_broken_ubuntu_ssl + def test_protocol_sslv3(self): + """Connecting to an SSLv3 server with various client options""" + if test_support.verbose: + sys.stdout.write("\n") + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) + if hasattr(ssl, 'PROTOCOL_SSLv2'): + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False) + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) + + @skip_if_broken_ubuntu_ssl + def test_protocol_tlsv1(self): + """Connecting to a TLSv1 server with various client options""" + if test_support.verbose: + sys.stdout.write("\n") + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) + if hasattr(ssl, 'PROTOCOL_SSLv2'): + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False) + + def test_starttls(self): + """Switching from clear text to encrypted and back again.""" + msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6") + + server = ThreadedEchoServer(CERTFILE, + ssl_version=ssl.PROTOCOL_TLSv1, + starttls_server=True, + chatty=True, + connectionchatty=True) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + # try to connect + wrapped = False + try: + s = socket.socket() + s.setblocking(1) + s.connect((HOST, server.port)) + if test_support.verbose: + sys.stdout.write("\n") + for indata in msgs: + if test_support.verbose: + sys.stdout.write( + " client: sending %s...\n" % repr(indata)) + if wrapped: + conn.write(indata) + outdata = conn.read() + else: + s.send(indata) + outdata = s.recv(1024) + if (indata == "STARTTLS" and + outdata.strip().lower().startswith("ok")): + # STARTTLS ok, switch to secure mode + if test_support.verbose: + sys.stdout.write( + " client: read %s from server, starting TLS...\n" + % repr(outdata)) + conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) + wrapped = True + elif (indata == "ENDTLS" and + outdata.strip().lower().startswith("ok")): + # ENDTLS ok, switch back to clear text + if test_support.verbose: + sys.stdout.write( + " client: read %s from server, ending TLS...\n" + % repr(outdata)) + s = conn.unwrap() + wrapped = False + else: + if test_support.verbose: + sys.stdout.write( + " client: read %s from server\n" % repr(outdata)) + if test_support.verbose: + sys.stdout.write(" client: closing connection.\n") + if wrapped: + conn.write("over\n") + else: + s.send("over\n") + s.close() + finally: + server.stop() + server.join() + + def test_socketserver(self): + """Using a SocketServer to create and manage SSL connections.""" + server = SocketServerHTTPSServer(CERTFILE) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + # try to connect + try: + if test_support.verbose: + sys.stdout.write('\n') + with open(CERTFILE, 'rb') as f: + d1 = f.read() + d2 = '' + # now fetch the same data from the HTTPS server + url = 'https://127.0.0.1:%d/%s' % ( + server.port, os.path.split(CERTFILE)[1]) + with test_support.check_py3k_warnings(): + f = urllib.urlopen(url) + dlen = f.info().getheader("content-length") + if dlen and (int(dlen) > 0): + d2 = f.read(int(dlen)) + if test_support.verbose: + sys.stdout.write( + " client: read %d bytes from remote server '%s'\n" + % (len(d2), server)) + f.close() + self.assertEqual(d1, d2) + finally: + server.stop() + server.join() + + def test_wrapped_accept(self): + """Check the accept() method on SSL sockets.""" + if test_support.verbose: + sys.stdout.write("\n") + server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED, + CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23, + chatty=True, connectionchatty=True, + wrap_accepting_socket=True) + + def test_asyncore_server(self): + """Check the example asyncore integration.""" + indata = "TEST MESSAGE of mixed case\n" + + if test_support.verbose: + sys.stdout.write("\n") + server = AsyncoreEchoServer(CERTFILE) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + # try to connect + try: + s = ssl.wrap_socket(socket.socket()) + s.connect(('127.0.0.1', server.port)) + if test_support.verbose: + sys.stdout.write( + " client: sending %s...\n" % (repr(indata))) + s.write(indata) + outdata = s.read() + if test_support.verbose: + sys.stdout.write(" client: read %s\n" % repr(outdata)) + if outdata != indata.lower(): + self.fail( + "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" + % (outdata[:min(len(outdata),20)], len(outdata), + indata[:min(len(indata),20)].lower(), len(indata))) + s.write("over\n") + if test_support.verbose: + sys.stdout.write(" client: closing connection.\n") + s.close() + finally: + server.stop() + # wait for server thread to end + server.join() + + def test_recv_send(self): + """Test recv(), send() and friends.""" + if test_support.verbose: + sys.stdout.write("\n") + + server = ThreadedEchoServer(CERTFILE, + certreqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1, + cacerts=CERTFILE, + chatty=True, + connectionchatty=False) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + # try to connect + s = ssl.wrap_socket(socket.socket(), + server_side=False, + certfile=CERTFILE, + ca_certs=CERTFILE, + cert_reqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1) + s.connect((HOST, server.port)) + try: + # helper methods for standardising recv* method signatures + def _recv_into(): + b = bytearray("\0"*100) + count = s.recv_into(b) + return b[:count] + + def _recvfrom_into(): + b = bytearray("\0"*100) + count, addr = s.recvfrom_into(b) + return b[:count] + + # (name, method, whether to expect success, *args) + send_methods = [ + ('send', s.send, True, []), + ('sendto', s.sendto, False, ["some.address"]), + ('sendall', s.sendall, True, []), + ] + recv_methods = [ + ('recv', s.recv, True, []), + ('recvfrom', s.recvfrom, False, ["some.address"]), + ('recv_into', _recv_into, True, []), + ('recvfrom_into', _recvfrom_into, False, []), + ] + data_prefix = u"PREFIX_" + + for meth_name, send_meth, expect_success, args in send_methods: + indata = data_prefix + meth_name + try: + send_meth(indata.encode('ASCII', 'strict'), *args) + outdata = s.read() + outdata = outdata.decode('ASCII', 'strict') + if outdata != indata.lower(): + self.fail( + "While sending with <<%s>> bad data " + "<<%r>> (%d) received; " + "expected <<%r>> (%d)\n" % ( + meth_name, outdata[:20], len(outdata), + indata[:20], len(indata) + ) + ) + except ValueError as e: + if expect_success: + self.fail( + "Failed to send with method <<%s>>; " + "expected to succeed.\n" % (meth_name,) + ) + if not str(e).startswith(meth_name): + self.fail( + "Method <<%s>> failed with unexpected " + "exception message: %s\n" % ( + meth_name, e + ) + ) + + for meth_name, recv_meth, expect_success, args in recv_methods: + indata = data_prefix + meth_name + try: + s.send(indata.encode('ASCII', 'strict')) + outdata = recv_meth(*args) + outdata = outdata.decode('ASCII', 'strict') + if outdata != indata.lower(): + self.fail( + "While receiving with <<%s>> bad data " + "<<%r>> (%d) received; " + "expected <<%r>> (%d)\n" % ( + meth_name, outdata[:20], len(outdata), + indata[:20], len(indata) + ) + ) + except ValueError as e: + if expect_success: + self.fail( + "Failed to receive with method <<%s>>; " + "expected to succeed.\n" % (meth_name,) + ) + if not str(e).startswith(meth_name): + self.fail( + "Method <<%s>> failed with unexpected " + "exception message: %s\n" % ( + meth_name, e + ) + ) + # consume data + s.read() + + s.write("over\n".encode("ASCII", "strict")) + s.close() + finally: + server.stop() + server.join() + + def test_handshake_timeout(self): + # Issue #5103: SSL handshake must respect the socket timeout + server = socket.socket(socket.AF_INET) + host = "127.0.0.1" + port = test_support.bind_port(server) + started = threading.Event() + finish = False + + def serve(): + server.listen(5) + started.set() + conns = [] + while not finish: + r, w, e = select.select([server], [], [], 0.1) + if server in r: + # Let the socket hang around rather than having + # it closed by garbage collection. + conns.append(server.accept()[0]) + + t = threading.Thread(target=serve) + t.start() + started.wait() + + try: + try: + c = socket.socket(socket.AF_INET) + c.settimeout(0.2) + c.connect((host, port)) + # Will attempt handshake and time out + self.assertRaisesRegexp(ssl.SSLError, "timed out", + ssl.wrap_socket, c) + finally: + c.close() + try: + c = socket.socket(socket.AF_INET) + c.settimeout(0.2) + c = ssl.wrap_socket(c) + # Will attempt handshake and time out + self.assertRaisesRegexp(ssl.SSLError, "timed out", + c.connect, (host, port)) + finally: + c.close() + finally: + finish = True + t.join() + server.close() + + +def test_main(verbose=False): + global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT + CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, + "keycert.pem") + SVN_PYTHON_ORG_ROOT_CERT = os.path.join( + os.path.dirname(__file__) or os.curdir, + "https_svn_python_org_root.pem") + + if (not os.path.exists(CERTFILE) or + not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)): + raise test_support.TestFailed("Can't read certificate files!") + + tests = [BasicTests, BasicSocketTests] + + if test_support.is_resource_enabled('network'): + tests.append(NetworkedTests) + + if _have_threads: + thread_info = test_support.threading_setup() + if thread_info and test_support.is_resource_enabled('network'): + tests.append(ThreadedTests) + + try: + test_support.run_unittest(*tests) + finally: + if _have_threads: + test_support.threading_cleanup(*thread_info) + +if __name__ == "__main__": + test_main() -- cgit v1.2.3