diff --git a/src/twisted/cred/test/test_cramauth.py b/src/twisted/cred/test/test_cramauth.py index 1ee0871..d3c8dc6 100644 --- a/src/twisted/cred/test/test_cramauth.py +++ b/src/twisted/cred/test/test_cramauth.py @@ -30,56 +30,6 @@ class CramMD5CredentialsTests(TestCase): self.assertEqual(chal, c.getChallenge()) - def test_checkPassword(self): - """ - When a valid response (which is a hex digest of the challenge that has - been encrypted by the user's shared secret) is set on the - L{CramMD5Credentials} that created the challenge, and C{checkPassword} - is called with the user's shared secret, it will return L{True}. - """ - c = CramMD5Credentials() - chal = c.getChallenge() - c.response = hexlify(HMAC(b'secret', chal).digest()) - self.assertTrue(c.checkPassword(b'secret')) - - - def test_noResponse(self): - """ - When there is no response set, calling C{checkPassword} will return - L{False}. - """ - c = CramMD5Credentials() - self.assertFalse(c.checkPassword(b'secret')) - - - def test_wrongPassword(self): - """ - When an invalid response is set on the L{CramMD5Credentials} (one that - is not the hex digest of the challenge, encrypted with the user's shared - secret) and C{checkPassword} is called with the user's correct shared - secret, it will return L{False}. - """ - c = CramMD5Credentials() - chal = c.getChallenge() - c.response = hexlify(HMAC(b'thewrongsecret', chal).digest()) - self.assertFalse(c.checkPassword(b'secret')) - - - def test_setResponse(self): - """ - When C{setResponse} is called with a string that is the username and - the hashed challenge separated with a space, they will be set on the - L{CramMD5Credentials}. - """ - c = CramMD5Credentials() - chal = c.getChallenge() - c.setResponse(b" ".join( - (b"squirrel", - hexlify(HMAC(b'supersecret', chal).digest())))) - self.assertTrue(c.checkPassword(b'supersecret')) - self.assertEqual(c.username, b"squirrel") - - def test_interface(self): """ L{CramMD5Credentials} implements the L{IUsernameHashedPassword} diff --git a/src/twisted/mail/test/test_imap.py b/src/twisted/mail/test/test_imap.py index 231140c..5d22daf 100644 --- a/src/twisted/mail/test/test_imap.py +++ b/src/twisted/mail/test/test_imap.py @@ -3881,25 +3881,6 @@ class AuthenticatorTests(IMAP4HelperMixin, unittest.TestCase): self.assertEqual(self.server.account, self.account) - def testFailedCramMD5(self): - self.server.challengers[b'CRAM-MD5'] = CramMD5Credentials - cAuth = imap4.CramMD5ClientAuthenticator(b'testuser') - self.client.registerAuthenticator(cAuth) - - def misauth(): - return self.client.authenticate(b'not the secret') - def authed(): - self.authenticated = 1 - def misauthed(): - self.authenticated = -1 - - d1 = self.connected.addCallback(strip(misauth)) - d1.addCallbacks(strip(authed), strip(misauthed)) - d1.addCallbacks(self._cbStopClient, self._ebGeneral) - d = defer.gatherResults([self.loopback(), d1]) - return d.addCallback(self._cbTestFailedCramMD5) - - def _cbTestFailedCramMD5(self, ignored): self.assertEqual(self.authenticated, -1) self.assertEqual(self.server.account, None) diff --git a/src/twisted/mail/test/test_pop3.py b/src/twisted/mail/test/test_pop3.py index 4a59c3b..d87b666 100644 --- a/src/twisted/mail/test/test_pop3.py +++ b/src/twisted/mail/test/test_pop3.py @@ -1069,44 +1069,6 @@ class TestRealm: -class SASLTests(unittest.TestCase): - """ - Tests for L{pop3.POP3}'s SASL implementation. - """ - def test_ValidLogin(self): - """ - A CRAM-MD5-based SASL login attempt succeeds if it uses a username and - a hashed password known to the server's credentials checker. - """ - p = pop3.POP3() - p.factory = TestServerFactory() - p.factory.challengers = {b'CRAM-MD5': - cred.credentials.CramMD5Credentials} - p.portal = cred.portal.Portal(TestRealm()) - ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() - ch.addUser(b'testuser', b'testpassword') - p.portal.registerChecker(ch) - - s = BytesIO() - p.transport = internet.protocol.FileWrapper(s) - p.connectionMade() - - p.lineReceived(b"CAPA") - self.assertTrue(s.getvalue().find(b"SASL CRAM-MD5") >= 0) - - p.lineReceived(b"AUTH CRAM-MD5") - chal = s.getvalue().splitlines()[-1][2:] - chal = base64.decodestring(chal) - response = hmac.HMAC(b'testpassword', chal).hexdigest().encode("ascii") - - p.lineReceived( - base64.encodestring(b'testuser ' + response).rstrip(b'\n')) - self.assertTrue(p.mbox) - self.assertTrue(s.getvalue().splitlines()[-1].find(b"+OK") >= 0) - p.connectionLost(failure.Failure(Exception("Test harness disconnect"))) - - - class CommandMixin: """ Tests for all the commands a POP3 server is allowed to receive. diff --git a/src/twisted/mail/test/test_smtp.py b/src/twisted/mail/test/test_smtp.py index 777bc3e..7488cf2 100644 --- a/src/twisted/mail/test/test_smtp.py +++ b/src/twisted/mail/test/test_smtp.py @@ -7,34 +7,28 @@ Test cases for twisted.mail.smtp module. from __future__ import absolute_import, division -import inspect import base64 - +import inspect +import re from io import BytesIO -from zope.interface import implementer, directlyProvides - -from twisted.python.util import LineLog -from twisted.trial import unittest -from twisted.protocols import basic, loopback -from twisted.internet import defer, protocol, reactor, interfaces -from twisted.internet import address, error, task -from twisted.test.proto_helpers import MemoryReactor, StringTransport - -from twisted import cred -import twisted.cred.error -import twisted.cred.portal import twisted.cred.checkers import twisted.cred.credentials - -from twisted.cred.portal import IRealm, Portal -from twisted.cred.checkers import ICredentialsChecker, AllowAnonymousAccess +import twisted.cred.error +import twisted.cred.portal +from twisted import cred +from twisted.cred.checkers import AllowAnonymousAccess, ICredentialsChecker from twisted.cred.credentials import IAnonymous from twisted.cred.error import UnauthorizedLogin - +from twisted.cred.portal import IRealm, Portal +from twisted.internet import address, defer, error, interfaces, protocol, reactor, task from twisted.mail import smtp from twisted.mail._cred import LOGINCredentials - +from twisted.protocols import basic, loopback +from twisted.python.util import LineLog +from twisted.test.proto_helpers import MemoryReactor, StringTransport +from twisted.trial import unittest +from zope.interface import directlyProvides, implementer try: from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext @@ -43,14 +37,11 @@ except ImportError: else: sslSkip = None -import re - def spameater(*spam, **eggs): return None - @implementer(smtp.IMessage) class BrokenMessage(object): """ @@ -58,23 +49,20 @@ class BrokenMessage(object): from its C{eomReceived} method. This is useful for creating a server which can be used to test client retry behavior. """ + def __init__(self, user): pass - def lineReceived(self, line): pass - def eomReceived(self): raise RuntimeError("Some problem, delivery is failing.") - def connectionLost(self): pass - class DummyMessage(object): """ L{BrokenMessage} is an L{IMessage} which saves the message delivered to it @@ -83,78 +71,69 @@ class DummyMessage(object): @ivar domain: A L{DummyDomain} which will be used to store the message once it is received. """ + def __init__(self, domain, user): self.domain = domain self.user = user self.buffer = [] - def lineReceived(self, line): # Throw away the generated Received: header - if not re.match(b'Received: From yyy.com \(\[.*\]\) by localhost;', - line): + if not re.match(b"Received: From yyy.com \(\[.*\]\) by localhost;", line): self.buffer.append(line) - def eomReceived(self): - message = b'\n'.join(self.buffer) + b'\n' + message = b"\n".join(self.buffer) + b"\n" self.domain.messages[self.user.dest.local].append(message) deferred = defer.Deferred() deferred.callback(b"saved") return deferred - class DummyDomain(object): """ L{DummyDomain} is an L{IDomain} which keeps track of messages delivered to it in memory. """ + def __init__(self, names): self.messages = {} for name in names: self.messages[name] = [] - def exists(self, user): if user.dest.local in self.messages: return defer.succeed(lambda: DummyMessage(self, user)) return defer.fail(smtp.SMTPBadRcpt(user)) - -mail = b'''\ +mail = b"""\ Subject: hello Goodbye -''' +""" + class MyClient: def __init__(self, messageInfo=None): if messageInfo is None: - messageInfo = ( - 'moshez@foo.bar', ['moshez@foo.bar'], BytesIO(mail)) + messageInfo = ("moshez@foo.bar", ["moshez@foo.bar"], BytesIO(mail)) self._sender = messageInfo[0] self._recipient = messageInfo[1] self._data = messageInfo[2] - def getMailFrom(self): return self._sender - def getMailTo(self): return self._recipient - def getMailData(self): return self._data - def sendError(self, exc): self._error = exc - def sentMail(self, code, resp, numOk, addresses, log): # Prevent another mail from being sent. self._sender = None @@ -162,32 +141,31 @@ class MyClient: self._data = None - class MySMTPClient(MyClient, smtp.SMTPClient): def __init__(self, messageInfo=None): - smtp.SMTPClient.__init__(self, b'foo.baz') + smtp.SMTPClient.__init__(self, b"foo.baz") MyClient.__init__(self, messageInfo) - class MyESMTPClient(MyClient, smtp.ESMTPClient): - def __init__(self, secret = b'', contextFactory = None): - smtp.ESMTPClient.__init__(self, secret, contextFactory, b'foo.baz') + def __init__(self, secret=b"", contextFactory=None): + smtp.ESMTPClient.__init__(self, secret, contextFactory, b"foo.baz") MyClient.__init__(self) - class LoopbackMixin: def loopback(self, server, client): return loopback.loopbackTCP(server, client) - class FakeSMTPServer(basic.LineReceiver): clientData = [ - b'220 hello', b'250 nice to meet you', - b'250 great', b'250 great', b'354 go on, lad' + b"220 hello", + b"250 nice to meet you", + b"250 great", + b"250 great", + b"354 go on, lad", ] def connectionMade(self): @@ -196,7 +174,6 @@ class FakeSMTPServer(basic.LineReceiver): self.clientData.reverse() self.sendLine(self.clientData.pop()) - def lineReceived(self, line): self.buffer.append(line) if line == b"QUIT": @@ -211,7 +188,6 @@ class FakeSMTPServer(basic.LineReceiver): self.sendLine(self.clientData.pop()) - class SMTPClientTests(unittest.TestCase, LoopbackMixin): """ Tests for L{smtp.SMTPClient}. @@ -234,15 +210,21 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin): bytes(errors[0]), b"Timeout waiting for SMTP server response\n" b"<<< 220 hello\n" - b">>> HELO foo.baz\n") + b">>> HELO foo.baz\n", + ) expected_output = [ - b'HELO foo.baz', b'MAIL FROM:', - b'RCPT TO:', b'DATA', - b'Subject: hello', b'', b'Goodbye', b'.', b'RSET' + b"HELO foo.baz", + b"MAIL FROM:", + b"RCPT TO:", + b"DATA", + b"Subject: hello", + b"", + b"Goodbye", + b".", + b"RSET", ] - def test_messages(self): """ L{smtp.SMTPClient} sends I{HELO}, I{MAIL FROM}, I{RCPT TO}, and I{DATA} @@ -252,27 +234,26 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin): client = MySMTPClient() server = FakeSMTPServer() d = self.loopback(server, client) - d.addCallback(lambda x : - self.assertEqual(server.buffer, self.expected_output)) + d.addCallback(lambda x: self.assertEqual(server.buffer, self.expected_output)) return d - def test_transferError(self): """ If there is an error while producing the message body to the connection, the C{sendError} callback is invoked. """ client = MySMTPClient( - ('alice@example.com', ['bob@example.com'], BytesIO(b"foo"))) + ("alice@example.com", ["bob@example.com"], BytesIO(b"foo")) + ) transport = StringTransport() client.makeConnection(transport) client.dataReceived( - b'220 Ok\r\n' # Greeting - b'250 Ok\r\n' # EHLO response - b'250 Ok\r\n' # MAIL FROM response - b'250 Ok\r\n' # RCPT TO response - b'354 Ok\r\n' # DATA response - ) + b"220 Ok\r\n" # Greeting + b"250 Ok\r\n" # EHLO response + b"250 Ok\r\n" # MAIL FROM response + b"250 Ok\r\n" # RCPT TO response + b"354 Ok\r\n" # DATA response + ) # Sanity check - a pull producer should be registered now. self.assertNotIdentical(transport.producer, None) @@ -284,7 +265,6 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin): # The sendError hook should have been invoked as a result. self.assertIsInstance(client._error, Exception) - def test_sendFatalError(self): """ If L{smtp.SMTPClient.sendError} is called with an L{SMTPClientError} @@ -298,7 +278,6 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin): self.assertEqual(transport.value(), b"") self.assertTrue(transport.disconnecting) - def test_sendNonFatalError(self): """ If L{smtp.SMTPClient.sendError} is called with an L{SMTPClientError} @@ -312,7 +291,6 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin): self.assertEqual(transport.value(), b"QUIT\r\n") self.assertFalse(transport.disconnecting) - def test_sendOtherError(self): """ If L{smtp.SMTPClient.sendError} is called with an exception which is @@ -327,108 +305,115 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin): self.assertTrue(transport.disconnecting) - class DummySMTPMessage(object): - def __init__(self, protocol, users): self.protocol = protocol self.users = users self.buffer = [] - def lineReceived(self, line): self.buffer.append(line) - def eomReceived(self): - message = b'\n'.join(self.buffer) + b'\n' + message = b"\n".join(self.buffer) + b"\n" helo, origin = self.users[0].helo[0], bytes(self.users[0].orig) recipients = [] for user in self.users: recipients.append(bytes(user)) - self.protocol.message[tuple(recipients)] = (helo, origin, recipients, - message) + self.protocol.message[tuple(recipients)] = (helo, origin, recipients, message) return defer.succeed(b"saved") - class DummyProto: - def connectionMade(self): self.dummyMixinBase.connectionMade(self) self.message = {} - def receivedHeader(*spam): return None - def validateTo(self, user): self.delivery = SimpleDelivery(None) return lambda: DummySMTPMessage(self, [user]) - def validateFrom(self, helo, origin): return origin - class DummySMTP(DummyProto, smtp.SMTP): dummyMixinBase = smtp.SMTP - class DummyESMTP(DummyProto, smtp.ESMTP): dummyMixinBase = smtp.ESMTP - class AnotherTestCase: serverClass = None clientClass = None - messages = [ (b'foo.com', b'moshez@foo.com', [b'moshez@bar.com'], - b'moshez@foo.com', [b'moshez@bar.com'], b'''\ + messages = [ + ( + b"foo.com", + b"moshez@foo.com", + [b"moshez@bar.com"], + b"moshez@foo.com", + [b"moshez@bar.com"], + b"""\ From: Moshe To: Moshe Hi, how are you? -'''), - (b'foo.com', b'tttt@rrr.com', [b'uuu@ooo', b'yyy@eee'], - b'tttt@rrr.com', [b'uuu@ooo', b'yyy@eee'], b'''\ +""", + ), + ( + b"foo.com", + b"tttt@rrr.com", + [b"uuu@ooo", b"yyy@eee"], + b"tttt@rrr.com", + [b"uuu@ooo", b"yyy@eee"], + b"""\ Subject: pass ..rrrr.. -'''), - (b'foo.com', b'@this,@is,@ignored:foo@bar.com', - [b'@ignore,@this,@too:bar@foo.com'], - b'foo@bar.com', [b'bar@foo.com'], b'''\ +""", + ), + ( + b"foo.com", + b"@this,@is,@ignored:foo@bar.com", + [b"@ignore,@this,@too:bar@foo.com"], + b"foo@bar.com", + [b"bar@foo.com"], + b"""\ Subject: apa To: foo 123 . 456 -'''), - ] +""", + ), + ] data = [ - (b'', b'220.*\r\n$', None, None), - (b'HELO foo.com\r\n', b'250.*\r\n$', None, None), - (b'RSET\r\n', b'250.*\r\n$', None, None), - ] + (b"", b"220.*\r\n$", None, None), + (b"HELO foo.com\r\n", b"250.*\r\n$", None, None), + (b"RSET\r\n", b"250.*\r\n$", None, None), + ] for helo_, from_, to_, realfrom, realto, msg in messages: - data.append((b'MAIL FROM:<' + from_ + b'>\r\n', b'250.*\r\n', - None, None)) + data.append((b"MAIL FROM:<" + from_ + b">\r\n", b"250.*\r\n", None, None)) for rcpt in to_: - data.append((b'RCPT TO:<' + rcpt + b'>\r\n', b'250.*\r\n', - None, None)) - - data.append((b'DATA\r\n', b'354.*\r\n', - msg, (b'250.*\r\n', - (helo_, realfrom, realto, msg)))) - + data.append((b"RCPT TO:<" + rcpt + b">\r\n", b"250.*\r\n", None, None)) + + data.append( + ( + b"DATA\r\n", + b"354.*\r\n", + msg, + (b"250.*\r\n", (helo_, realfrom, realto, msg)), + ) + ) def test_buffer(self): """ @@ -439,8 +424,9 @@ To: foo """ transport = StringTransport() a = self.serverClass() + class fooFactory: - domain = b'foo.com' + domain = b"foo.com" a.factory = fooFactory() a.makeConnection(transport) @@ -451,12 +437,12 @@ To: foo transport.clear() if not re.match(expect, data): raise AssertionError(send, expect, data) - if data[:3] == b'354': + if data[:3] == b"354": for line in msg.splitlines(): - if line and line[0:1] == b'.': - line = b'.' + line - a.dataReceived(line + b'\r\n') - a.dataReceived(b'.\r\n') + if line and line[0:1] == b".": + line = b"." + line + a.dataReceived(line + b"\r\n") + a.dataReceived(b".\r\n") # Special case for DATA. Now we want a 250, and then # we compare the messages data = transport.value() @@ -467,34 +453,28 @@ To: foo for recip in msgdata[2]: expected = list(msgdata[:]) expected[2] = [recip] - self.assertEqual( - a.message[(recip,)], - tuple(expected) - ) + self.assertEqual(a.message[(recip,)], tuple(expected)) a.setTimeout(None) - class AnotherESMTPTests(AnotherTestCase, unittest.TestCase): serverClass = DummyESMTP clientClass = MyESMTPClient - class AnotherSMTPTests(AnotherTestCase, unittest.TestCase): serverClass = DummySMTP clientClass = MySMTPClient - @implementer(cred.checkers.ICredentialsChecker) class DummyChecker: - users = { - b'testuser': b'testpassword' - } + users = {b"testuser": b"testpassword"} - credentialInterfaces = (cred.credentials.IUsernamePassword, - cred.credentials.IUsernameHashedPassword) + credentialInterfaces = ( + cred.credentials.IUsernamePassword, + cred.credentials.IUsernameHashedPassword, + ) def requestAvatarId(self, credentials): return defer.maybeDeferred( @@ -507,59 +487,32 @@ class DummyChecker: raise cred.error.UnauthorizedLogin() - @implementer(smtp.IMessageDelivery) class SimpleDelivery(object): """ L{SimpleDelivery} is a message delivery factory with no interesting behavior. """ + def __init__(self, messageFactory): self._messageFactory = messageFactory - def receivedHeader(self, helo, origin, recipients): return None - def validateFrom(self, helo, origin): return origin - def validateTo(self, user): return lambda: self._messageFactory(user) - class DummyRealm: def requestAvatar(self, avatarId, mind, *interfaces): return smtp.IMessageDelivery, SimpleDelivery(None), lambda: None - class AuthTests(unittest.TestCase, LoopbackMixin): - def test_crammd5Auth(self): - """ - L{ESMTPClient} can authenticate using the I{CRAM-MD5} SASL mechanism. - - @see: U{http://tools.ietf.org/html/rfc2195} - """ - realm = DummyRealm() - p = cred.portal.Portal(realm) - p.registerChecker(DummyChecker()) - - server = DummyESMTP({b'CRAM-MD5': cred.credentials.CramMD5Credentials}) - server.portal = p - client = MyESMTPClient(b'testpassword') - - cAuth = smtp.CramMD5ClientAuthenticator(b'testuser') - client.registerAuthenticator(cAuth) - - d = self.loopback(server, client) - d.addCallback(lambda x: self.assertEqual(server.authenticated, 1)) - return d - - def test_loginAuth(self): """ L{ESMTPClient} can authenticate using the I{LOGIN} SASL mechanism. @@ -570,18 +523,17 @@ class AuthTests(unittest.TestCase, LoopbackMixin): p = cred.portal.Portal(realm) p.registerChecker(DummyChecker()) - server = DummyESMTP({b'LOGIN': LOGINCredentials}) + server = DummyESMTP({b"LOGIN": LOGINCredentials}) server.portal = p - client = MyESMTPClient(b'testpassword') + client = MyESMTPClient(b"testpassword") - cAuth = smtp.LOGINAuthenticator(b'testuser') + cAuth = smtp.LOGINAuthenticator(b"testuser") client.registerAuthenticator(cAuth) d = self.loopback(server, client) d.addCallback(lambda x: self.assertTrue(server.authenticated)) return d - def test_loginAgainstWeirdServer(self): """ When communicating with a server which implements the I{LOGIN} SASL @@ -593,11 +545,11 @@ class AuthTests(unittest.TestCase, LoopbackMixin): p = cred.portal.Portal(realm) p.registerChecker(DummyChecker()) - server = DummyESMTP({b'LOGIN': smtp.LOGINCredentials}) + server = DummyESMTP({b"LOGIN": smtp.LOGINCredentials}) server.portal = p - client = MyESMTPClient(b'testpassword') - cAuth = smtp.LOGINAuthenticator(b'testuser') + client = MyESMTPClient(b"testpassword") + cAuth = smtp.LOGINAuthenticator(b"testuser") client.registerAuthenticator(cAuth) d = self.loopback(server, client) @@ -605,49 +557,43 @@ class AuthTests(unittest.TestCase, LoopbackMixin): return d - class SMTPHelperTests(unittest.TestCase): def testMessageID(self): d = {} for i in range(1000): - m = smtp.messageid('testcase') + m = smtp.messageid("testcase") self.assertFalse(m in d) d[m] = None - def testQuoteAddr(self): cases = [ - [b'user@host.name', b''], - [b'"User Name" ', b''], - [smtp.Address(b'someguy@someplace'), b''], - [b'', b'<>'], - [smtp.Address(b''), b'<>'], + [b"user@host.name", b""], + [b'"User Name" ', b""], + [smtp.Address(b"someguy@someplace"), b""], + [b"", b"<>"], + [smtp.Address(b""), b"<>"], ] for (c, e) in cases: self.assertEqual(smtp.quoteaddr(c), e) - def testUser(self): - u = smtp.User(b'user@host', b'helo.host.name', None, None) - self.assertEqual(str(u), 'user@host') - + u = smtp.User(b"user@host", b"helo.host.name", None, None) + self.assertEqual(str(u), "user@host") def testXtextEncoding(self): cases = [ - (u'Hello world', b'Hello+20world'), - (u'Hello+world', b'Hello+2Bworld'), - (u'\0\1\2\3\4\5', b'+00+01+02+03+04+05'), - (u'e=mc2@example.com', b'e+3Dmc2@example.com') + (u"Hello world", b"Hello+20world"), + (u"Hello+world", b"Hello+2Bworld"), + (u"\0\1\2\3\4\5", b"+00+01+02+03+04+05"), + (u"e=mc2@example.com", b"e+3Dmc2@example.com"), ] for (case, expected) in cases: self.assertEqual(smtp.xtext_encode(case), (expected, len(case))) - self.assertEqual(case.encode('xtext'), expected) - self.assertEqual( - smtp.xtext_decode(expected), (case, len(expected))) - self.assertEqual(expected.decode('xtext'), case) - + self.assertEqual(case.encode("xtext"), expected) + self.assertEqual(smtp.xtext_decode(expected), (case, len(expected))) + self.assertEqual(expected.decode("xtext"), case) def test_encodeWithErrors(self): """ @@ -655,44 +601,35 @@ class SMTPHelperTests(unittest.TestCase): I{xtext} codec should produce the same result as not specifying the error policy. """ - text = u'Hello world' - self.assertEqual( - smtp.xtext_encode(text, 'strict'), - (text.encode('xtext'), len(text))) + text = u"Hello world" self.assertEqual( - text.encode('xtext', 'strict'), - text.encode('xtext')) - + smtp.xtext_encode(text, "strict"), (text.encode("xtext"), len(text)) + ) + self.assertEqual(text.encode("xtext", "strict"), text.encode("xtext")) def test_decodeWithErrors(self): """ Similar to L{test_encodeWithErrors}, but for C{bytes.decode}. """ - bytes = b'Hello world' - self.assertEqual( - smtp.xtext_decode(bytes, 'strict'), - (bytes.decode('xtext'), len(bytes))) + bytes = b"Hello world" self.assertEqual( - bytes.decode('xtext', 'strict'), - bytes.decode('xtext')) - + smtp.xtext_decode(bytes, "strict"), (bytes.decode("xtext"), len(bytes)) + ) + self.assertEqual(bytes.decode("xtext", "strict"), bytes.decode("xtext")) class NoticeTLSClient(MyESMTPClient): tls = False - def esmtpState_starttls(self, code, resp): MyESMTPClient.esmtpState_starttls(self, code, resp) self.tls = True - class TLSTests(unittest.TestCase, LoopbackMixin): if sslSkip is not None: skip = sslSkip - def testTLS(self): clientCTX = ClientTLSContext() serverCTX = ServerTLSContext() @@ -706,14 +643,13 @@ class TLSTests(unittest.TestCase, LoopbackMixin): return self.loopback(server, client).addCallback(check) + if not interfaces.IReactorSSL.providedBy(reactor): for case in (TLSTests,): case.skip = "Reactor doesn't support SSL" - class EmptyLineTests(unittest.TestCase): - def test_emptyLineSyntaxError(self): """ If L{smtp.SMTP} receives an empty line, it responds with a 500 error @@ -722,16 +658,15 @@ class EmptyLineTests(unittest.TestCase): proto = smtp.SMTP() transport = StringTransport() proto.makeConnection(transport) - proto.lineReceived(b'') + proto.lineReceived(b"") proto.setTimeout(None) out = transport.value().splitlines() self.assertEqual(len(out), 2) - self.assertTrue(out[0].startswith(b'220')) + self.assertTrue(out[0].startswith(b"220")) self.assertEqual(out[1], b"500 Error: bad syntax") - class TimeoutTests(unittest.TestCase, LoopbackMixin): """ Check that SMTP client factories correctly use the timeout. @@ -743,22 +678,23 @@ class TimeoutTests(unittest.TestCase, LoopbackMixin): """ clock = task.Clock() client = clientFactory.buildProtocol( - address.IPv4Address('TCP', 'example.net', 25)) + address.IPv4Address("TCP", "example.net", 25) + ) client.callLater = clock.callLater t = StringTransport() client.makeConnection(t) t.protocol = client + def check(ign): self.assertEqual(clock.seconds(), 0.5) - d = self.assertFailure(onDone, smtp.SMTPTimeoutError - ).addCallback(check) + + d = self.assertFailure(onDone, smtp.SMTPTimeoutError).addCallback(check) # The first call should not trigger the timeout clock.advance(0.1) # But this one should clock.advance(0.4) return d - def test_SMTPClientRecipientBytes(self): """ Test timeout for L{smtp.SMTPSenderFactory}: the response L{Deferred} @@ -766,36 +702,45 @@ class TimeoutTests(unittest.TestCase, LoopbackMixin): """ onDone = defer.Deferred() clientFactory = smtp.SMTPSenderFactory( - 'source@address', b'recipient@address', - BytesIO(b"Message body"), onDone, - retries=0, timeout=0.5) + "source@address", + b"recipient@address", + BytesIO(b"Message body"), + onDone, + retries=0, + timeout=0.5, + ) return self._timeoutTest(onDone, clientFactory) - def test_SMTPClientRecipientUnicode(self): """ Use a L{unicode} recipient. """ onDone = defer.Deferred() clientFactory = smtp.SMTPSenderFactory( - 'source@address', u'recipient@address', - BytesIO(b"Message body"), onDone, - retries=0, timeout=0.5) + "source@address", + u"recipient@address", + BytesIO(b"Message body"), + onDone, + retries=0, + timeout=0.5, + ) return self._timeoutTest(onDone, clientFactory) - def test_SMTPClientRecipientList(self): """ Use a L{list} of recipients. """ onDone = defer.Deferred() clientFactory = smtp.SMTPSenderFactory( - 'source@address', (u'recipient1@address', b'recipient2@address'), - BytesIO(b"Message body"), onDone, - retries=0, timeout=0.5) + "source@address", + (u"recipient1@address", b"recipient2@address"), + BytesIO(b"Message body"), + onDone, + retries=0, + timeout=0.5, + ) return self._timeoutTest(onDone, clientFactory) - def test_ESMTPClient(self): """ Test timeout for L{smtp.ESMTPSenderFactory}: the response L{Deferred} @@ -803,53 +748,65 @@ class TimeoutTests(unittest.TestCase, LoopbackMixin): """ onDone = defer.Deferred() clientFactory = smtp.ESMTPSenderFactory( - 'username', 'password', - 'source@address', 'recipient@address', - BytesIO(b"Message body"), onDone, - retries=0, timeout=0.5) + "username", + "password", + "source@address", + "recipient@address", + BytesIO(b"Message body"), + onDone, + retries=0, + timeout=0.5, + ) return self._timeoutTest(onDone, clientFactory) - def test_resetTimeoutWhileSending(self): """ The timeout is not allowed to expire after the server has accepted a DATA command and the client is actively sending data to it. """ + class SlowFile: """ A file-like which returns one byte from each read call until the specified number of bytes have been returned. """ + def __init__(self, size): self._size = size def read(self, max=None): if self._size: self._size -= 1 - return b'x' - return b'' + return b"x" + return b"" failed = [] onDone = defer.Deferred() onDone.addErrback(failed.append) clientFactory = smtp.SMTPSenderFactory( - 'source@address', 'recipient@address', - SlowFile(1), onDone, retries=0, timeout=3) + "source@address", + "recipient@address", + SlowFile(1), + onDone, + retries=0, + timeout=3, + ) clientFactory.domain = b"example.org" clock = task.Clock() client = clientFactory.buildProtocol( - address.IPv4Address('TCP', 'example.net', 25)) + address.IPv4Address("TCP", "example.net", 25) + ) client.callLater = clock.callLater transport = StringTransport() client.makeConnection(transport) client.dataReceived( - b"220 Ok\r\n" # Greet the client - b"250 Ok\r\n" # Respond to HELO - b"250 Ok\r\n" # Respond to MAIL FROM - b"250 Ok\r\n" # Respond to RCPT TO - b"354 Ok\r\n" # Respond to DATA - ) + b"220 Ok\r\n" # Greet the client + b"250 Ok\r\n" # Respond to HELO + b"250 Ok\r\n" # Respond to MAIL FROM + b"250 Ok\r\n" # Respond to RCPT TO + b"354 Ok\r\n" # Respond to DATA + ) # Now the client is producing data to the server. Any time # resumeProducing is called on the producer, the timeout should be @@ -888,8 +845,8 @@ class TimeoutTests(unittest.TestCase, LoopbackMixin): b".\r\n" # This RSET is just an implementation detail. It's nice, but this # test doesn't really care about it. - b"RSET\r\n") - + b"RSET\r\n", + ) class MultipleDeliveryFactorySMTPServerFactory(protocol.ServerFactory): @@ -899,21 +856,21 @@ class MultipleDeliveryFactorySMTPServerFactory(protocol.ServerFactory): factory is used for one connection and then discarded. Factories are used in the order they are supplied. """ + def __init__(self, messageFactories): self._messageFactories = messageFactories - def buildProtocol(self, addr): p = protocol.ServerFactory.buildProtocol(self, addr) p.delivery = SimpleDelivery(self._messageFactories.pop(0)) return p - class SMTPSenderFactoryTests(unittest.TestCase): """ Tests for L{smtp.SMTPSenderFactory}. """ + def test_removeCurrentProtocolWhenClientConnectionLost(self): """ L{smtp.SMTPSenderFactory} removes the current protocol when the client @@ -922,15 +879,13 @@ class SMTPSenderFactoryTests(unittest.TestCase): reactor = MemoryReactor() sentDeferred = defer.Deferred() clientFactory = smtp.SMTPSenderFactory( - "source@address", "recipient@address", - BytesIO(b"message"), sentDeferred) + "source@address", "recipient@address", BytesIO(b"message"), sentDeferred + ) connector = reactor.connectTCP("localhost", 25, clientFactory) clientFactory.buildProtocol(None) - clientFactory.clientConnectionLost(connector, - error.ConnectionDone("Bye.")) + clientFactory.clientConnectionLost(connector, error.ConnectionDone("Bye.")) self.assertEqual(clientFactory.currentProtocol, None) - def test_removeCurrentProtocolWhenClientConnectionFailed(self): """ L{smtp.SMTPSenderFactory} removes the current protocol when the client @@ -939,27 +894,26 @@ class SMTPSenderFactoryTests(unittest.TestCase): reactor = MemoryReactor() sentDeferred = defer.Deferred() clientFactory = smtp.SMTPSenderFactory( - "source@address", "recipient@address", - BytesIO(b"message"), sentDeferred) + "source@address", "recipient@address", BytesIO(b"message"), sentDeferred + ) connector = reactor.connectTCP("localhost", 25, clientFactory) clientFactory.buildProtocol(None) - clientFactory.clientConnectionFailed(connector, - error.ConnectionDone("Bye.")) + clientFactory.clientConnectionFailed(connector, error.ConnectionDone("Bye.")) self.assertEqual(clientFactory.currentProtocol, None) - class SMTPSenderFactoryRetryTests(unittest.TestCase): """ Tests for the retry behavior of L{smtp.SMTPSenderFactory}. """ + def test_retryAfterDisconnect(self): """ If the protocol created by L{SMTPSenderFactory} loses its connection before receiving confirmation of message delivery, it reconnects and tries to deliver the message again. """ - recipient = b'alice' + recipient = b"alice" message = b"some message text" domain = DummyDomain([recipient]) @@ -968,6 +922,7 @@ class SMTPSenderFactoryRetryTests(unittest.TestCase): An SMTP subclass which ensures that its transport will be disconnected before the test ends. """ + def makeConnection(innerSelf, transport): self.addCleanup(transport.loseConnection) smtp.SMTP.makeConnection(innerSelf, transport) @@ -975,11 +930,11 @@ class SMTPSenderFactoryRetryTests(unittest.TestCase): # Create a server which will fail the first message deliver attempt to # it with a 500 and a disconnect, but which will accept a message # delivered over the 2nd connection to it. - serverFactory = MultipleDeliveryFactorySMTPServerFactory([ - BrokenMessage, - lambda user: DummyMessage(domain, user)]) + serverFactory = MultipleDeliveryFactorySMTPServerFactory( + [BrokenMessage, lambda user: DummyMessage(domain, user)] + ) serverFactory.protocol = CleanSMTP - serverPort = reactor.listenTCP(0, serverFactory, interface='127.0.0.1') + serverPort = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") serverHost = serverPort.getHost() self.addCleanup(serverPort.stopListening) @@ -987,11 +942,15 @@ class SMTPSenderFactoryRetryTests(unittest.TestCase): # server. sentDeferred = defer.Deferred() clientFactory = smtp.SMTPSenderFactory( - b"bob@example.org", recipient + b"@example.com", - BytesIO(message), sentDeferred) + b"bob@example.org", + recipient + b"@example.com", + BytesIO(message), + sentDeferred, + ) clientFactory.domain = b"example.org" clientConnector = reactor.connectTCP( - serverHost.host, serverHost.port, clientFactory) + serverHost.host, serverHost.port, clientFactory + ) self.addCleanup(clientConnector.disconnect) def cbSent(ignored): @@ -999,58 +958,54 @@ class SMTPSenderFactoryRetryTests(unittest.TestCase): Verify that the message was successfully delivered and flush the error which caused the first attempt to fail. """ - self.assertEqual( - domain.messages, - {recipient: [b"\n" + message + b"\n"]}) + self.assertEqual(domain.messages, {recipient: [b"\n" + message + b"\n"]}) # Flush the RuntimeError that BrokenMessage caused to be logged. self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1) + sentDeferred.addCallback(cbSent) return sentDeferred - @implementer(IRealm) class SingletonRealm(object): """ Trivial realm implementation which is constructed with an interface and an avatar and returns that avatar when asked for that interface. """ + def __init__(self, interface, avatar): self.interface = interface self.avatar = avatar - def requestAvatar(self, avatarId, mind, *interfaces): for iface in interfaces: if iface is self.interface: return iface, self.avatar, lambda: None - class NotImplementedDelivery(object): """ Non-implementation of L{smtp.IMessageDelivery} which only has methods which raise L{NotImplementedError}. Subclassed by various tests to provide the particular behavior being tested. """ + def validateFrom(self, helo, origin): raise NotImplementedError("This oughtn't be called in the course of this test.") - def validateTo(self, user): raise NotImplementedError("This oughtn't be called in the course of this test.") - def receivedHeader(self, helo, origin, recipients): raise NotImplementedError("This oughtn't be called in the course of this test.") - class SMTPServerTests(unittest.TestCase): """ Test various behaviors of L{twisted.mail.smtp.SMTP} and L{twisted.mail.smtp.ESMTP}. """ + def testSMTPGreetingHost(self, serverClass=smtp.SMTP): """ Test that the specified hostname shows up in the SMTP server's @@ -1063,7 +1018,6 @@ class SMTPServerTests(unittest.TestCase): s.connectionLost(error.ConnectionDone()) self.assertIn(b"example.com", t.value()) - def testSMTPGreetingNotExtended(self): """ Test that the string "ESMTP" does not appear in the SMTP server's @@ -1076,14 +1030,12 @@ class SMTPServerTests(unittest.TestCase): s.connectionLost(error.ConnectionDone()) self.assertNotIn(b"ESMTP", t.value()) - def testESMTPGreetingHost(self): """ Similar to testSMTPGreetingHost, but for the L{smtp.ESMTP} class. """ self.testSMTPGreetingHost(smtp.ESMTP) - def testESMTPGreetingExtended(self): """ Test that the string "ESMTP" does appear in the ESMTP server's @@ -1096,7 +1048,6 @@ class SMTPServerTests(unittest.TestCase): s.connectionLost(error.ConnectionDone()) self.assertIn(b"ESMTP", t.value()) - def test_SMTPUnknownCommand(self): """ Sending an unimplemented command is responded to with a 500. @@ -1108,16 +1059,17 @@ class SMTPServerTests(unittest.TestCase): s.connectionLost(error.ConnectionDone()) self.assertIn(b"500 Command not implemented", t.value()) - def test_acceptSenderAddress(self): """ Test that a C{MAIL FROM} command with an acceptable address is responded to with the correct success code. """ + class AcceptanceDelivery(NotImplementedDelivery): """ Delivery object which accepts all senders as valid. """ + def validateFrom(self, helo, origin): return origin @@ -1129,21 +1081,18 @@ class SMTPServerTests(unittest.TestCase): proto.makeConnection(trans) # Deal with the necessary preliminaries - proto.dataReceived(b'HELO example.com\r\n') + proto.dataReceived(b"HELO example.com\r\n") trans.clear() # Try to specify our sender address - proto.dataReceived(b'MAIL FROM:\r\n') + proto.dataReceived(b"MAIL FROM:\r\n") # Clean up the protocol before doing anything that might raise an # exception. proto.connectionLost(error.ConnectionLost()) # Make sure that we received exactly the correct response - self.assertEqual( - trans.value(), - b'250 Sender address accepted\r\n') - + self.assertEqual(trans.value(), b"250 Sender address accepted\r\n") def test_deliveryRejectedSenderAddress(self): """ @@ -1151,10 +1100,12 @@ class SMTPServerTests(unittest.TestCase): L{smtp.IMessageDelivery} instance is responded to with the correct error code. """ + class RejectionDelivery(NotImplementedDelivery): """ Delivery object which rejects all senders as invalid. """ + def validateFrom(self, helo, origin): raise smtp.SMTPBadSender(origin) @@ -1166,11 +1117,11 @@ class SMTPServerTests(unittest.TestCase): proto.makeConnection(trans) # Deal with the necessary preliminaries - proto.dataReceived(b'HELO example.com\r\n') + proto.dataReceived(b"HELO example.com\r\n") trans.clear() # Try to specify our sender address - proto.dataReceived(b'MAIL FROM:\r\n') + proto.dataReceived(b"MAIL FROM:\r\n") # Clean up the protocol before doing anything that might raise an # exception. @@ -1179,9 +1130,9 @@ class SMTPServerTests(unittest.TestCase): # Make sure that we received exactly the correct response self.assertEqual( trans.value(), - b'550 Cannot receive from specified address ' - b': Sender not acceptable\r\n') - + b"550 Cannot receive from specified address " + b": Sender not acceptable\r\n", + ) @implementer(ICredentialsChecker) def test_portalRejectedSenderAddress(self): @@ -1190,10 +1141,12 @@ class SMTPServerTests(unittest.TestCase): L{smtp.SMTP} instance's portal is responded to with the correct error code. """ + class DisallowAnonymousAccess(object): """ Checker for L{IAnonymous} which rejects authentication attempts. """ + credentialInterfaces = (IAnonymous,) def requestAvatarId(self, credentials): @@ -1207,11 +1160,11 @@ class SMTPServerTests(unittest.TestCase): proto.makeConnection(trans) # Deal with the necessary preliminaries - proto.dataReceived(b'HELO example.com\r\n') + proto.dataReceived(b"HELO example.com\r\n") trans.clear() # Try to specify our sender address - proto.dataReceived(b'MAIL FROM:\r\n') + proto.dataReceived(b"MAIL FROM:\r\n") # Clean up the protocol before doing anything that might raise an # exception. @@ -1220,9 +1173,9 @@ class SMTPServerTests(unittest.TestCase): # Make sure that we received exactly the correct response self.assertEqual( trans.value(), - b'550 Cannot receive from specified address ' - b': Sender not acceptable\r\n') - + b"550 Cannot receive from specified address " + b": Sender not acceptable\r\n", + ) def test_portalRejectedAnonymousSender(self): """ @@ -1238,11 +1191,11 @@ class SMTPServerTests(unittest.TestCase): proto.makeConnection(trans) # Deal with the necessary preliminaries - proto.dataReceived(b'HELO example.com\r\n') + proto.dataReceived(b"HELO example.com\r\n") trans.clear() # Try to specify our sender address - proto.dataReceived(b'MAIL FROM:\r\n') + proto.dataReceived(b"MAIL FROM:\r\n") # Clean up the protocol before doing anything that might raise an # exception. @@ -1251,9 +1204,9 @@ class SMTPServerTests(unittest.TestCase): # Make sure that we received exactly the correct response self.assertEqual( trans.value(), - b'550 Cannot receive from specified address ' - b': Unauthenticated senders not allowed\r\n') - + b"550 Cannot receive from specified address " + b": Unauthenticated senders not allowed\r\n", + ) class ESMTPAuthenticationTests(unittest.TestCase): @@ -1267,13 +1220,11 @@ class ESMTPAuthenticationTests(unittest.TestCase): """ self.transport.clear() self.server.dataReceived(bytes) - self.assertEqual( - response, - self.transport.value().splitlines()) + self.assertEqual(response, self.transport.value().splitlines()) - - def assertServerAuthenticated(self, loginArgs, username=b"username", - password=b"password"): + def assertServerAuthenticated( + self, loginArgs, username=b"username", password=b"password" + ): """ Assert that a login attempt has been made, that the credentials and interfaces passed to it are correct, and that when the login request @@ -1286,7 +1237,9 @@ class ESMTPAuthenticationTests(unittest.TestCase): """ d, credentials, mind, interfaces = loginArgs.pop() self.assertEqual(loginArgs, []) - self.assertTrue(twisted.cred.credentials.IUsernamePassword.providedBy(credentials)) + self.assertTrue( + twisted.cred.credentials.IUsernamePassword.providedBy(credentials) + ) self.assertEqual(credentials.username, username) self.assertTrue(credentials.checkPassword(password)) self.assertIn(smtp.IMessageDeliveryFactory, interfaces) @@ -1294,54 +1247,48 @@ class ESMTPAuthenticationTests(unittest.TestCase): d.callback((smtp.IMessageDeliveryFactory, None, lambda: None)) self.assertEqual( - [b"235 Authentication successful."], - self.transport.value().splitlines()) - + [b"235 Authentication successful."], self.transport.value().splitlines() + ) def setUp(self): """ Create an ESMTP instance attached to a StringTransport. """ - self.server = smtp.ESMTP({ - b'LOGIN': LOGINCredentials}) - self.server.host = b'localhost' + self.server = smtp.ESMTP({b"LOGIN": LOGINCredentials}) + self.server.host = b"localhost" self.transport = StringTransport( - peerAddress=address.IPv4Address('TCP', '127.0.0.1', 12345)) + peerAddress=address.IPv4Address("TCP", "127.0.0.1", 12345) + ) self.server.makeConnection(self.transport) - def tearDown(self): """ Disconnect the ESMTP instance to clean up its timeout DelayedCall. """ self.server.connectionLost(error.ConnectionDone()) - def portalFactory(self, loginList): class DummyPortal: def login(self, credentials, mind, *interfaces): d = defer.Deferred() loginList.append((d, credentials, mind, interfaces)) return d - return DummyPortal() + return DummyPortal() def test_authenticationCapabilityAdvertised(self): """ Test that AUTH is advertised to clients which issue an EHLO command. """ self.transport.clear() - self.server.dataReceived(b'EHLO\r\n') + self.server.dataReceived(b"EHLO\r\n") responseLines = self.transport.value().splitlines() self.assertEqual( - responseLines[0], - b"250-localhost Hello 127.0.0.1, nice to meet you") - self.assertEqual( - responseLines[1], - b"250 AUTH LOGIN") + responseLines[0], b"250-localhost Hello 127.0.0.1, nice to meet you" + ) + self.assertEqual(responseLines[1], b"250 AUTH LOGIN") self.assertEqual(len(responseLines), 2) - def test_plainAuthentication(self): """ Test that the LOGIN authentication mechanism can be used @@ -1349,24 +1296,22 @@ class ESMTPAuthenticationTests(unittest.TestCase): loginArgs = [] self.server.portal = self.portalFactory(loginArgs) - self.server.dataReceived(b'EHLO\r\n') + self.server.dataReceived(b"EHLO\r\n") self.transport.clear() self.assertServerResponse( - b'AUTH LOGIN\r\n', - [b"334 " + base64.b64encode(b"User Name\0").strip()]) + b"AUTH LOGIN\r\n", [b"334 " + base64.b64encode(b"User Name\0").strip()] + ) self.assertServerResponse( - base64.b64encode(b'username') + b'\r\n', - [b"334 " + base64.b64encode(b"Password\0").strip()]) + base64.b64encode(b"username") + b"\r\n", + [b"334 " + base64.b64encode(b"Password\0").strip()], + ) - self.assertServerResponse( - base64.b64encode(b'password').strip() + b'\r\n', - []) + self.assertServerResponse(base64.b64encode(b"password").strip() + b"\r\n", []) self.assertServerAuthenticated(loginArgs) - def test_plainAuthenticationEmptyPassword(self): """ Test that giving an empty password for plain auth succeeds. @@ -1374,20 +1319,20 @@ class ESMTPAuthenticationTests(unittest.TestCase): loginArgs = [] self.server.portal = self.portalFactory(loginArgs) - self.server.dataReceived(b'EHLO\r\n') + self.server.dataReceived(b"EHLO\r\n") self.transport.clear() self.assertServerResponse( - b'AUTH LOGIN\r\n', - [b"334 " + base64.b64encode(b"User Name\0").strip()]) + b"AUTH LOGIN\r\n", [b"334 " + base64.b64encode(b"User Name\0").strip()] + ) self.assertServerResponse( - base64.b64encode(b'username') + b'\r\n', - [b"334 " + base64.b64encode(b"Password\0").strip()]) - - self.assertServerResponse(b'\r\n', []) - self.assertServerAuthenticated(loginArgs, password=b'') + base64.b64encode(b"username") + b"\r\n", + [b"334 " + base64.b64encode(b"Password\0").strip()], + ) + self.assertServerResponse(b"\r\n", []) + self.assertServerAuthenticated(loginArgs, password=b"") def test_plainAuthenticationInitialResponse(self): """ @@ -1397,20 +1342,18 @@ class ESMTPAuthenticationTests(unittest.TestCase): loginArgs = [] self.server.portal = self.portalFactory(loginArgs) - self.server.dataReceived(b'EHLO\r\n') + self.server.dataReceived(b"EHLO\r\n") self.transport.clear() self.assertServerResponse( - b'AUTH LOGIN ' + base64.b64encode(b"username").strip() + b'\r\n', - [b"334 " + base64.b64encode(b"Password\0").strip()]) + b"AUTH LOGIN " + base64.b64encode(b"username").strip() + b"\r\n", + [b"334 " + base64.b64encode(b"Password\0").strip()], + ) - self.assertServerResponse( - base64.b64encode(b'password').strip() + b'\r\n', - []) + self.assertServerResponse(base64.b64encode(b"password").strip() + b"\r\n", []) self.assertServerAuthenticated(loginArgs) - def test_abortAuthentication(self): """ Test that a challenge/response sequence can be aborted by the client. @@ -1418,13 +1361,10 @@ class ESMTPAuthenticationTests(unittest.TestCase): loginArgs = [] self.server.portal = self.portalFactory(loginArgs) - self.server.dataReceived(b'EHLO\r\n') - self.server.dataReceived(b'AUTH LOGIN\r\n') - - self.assertServerResponse( - b'*\r\n', - [b'501 Authentication aborted']) + self.server.dataReceived(b"EHLO\r\n") + self.server.dataReceived(b"AUTH LOGIN\r\n") + self.assertServerResponse(b"*\r\n", [b"501 Authentication aborted"]) def test_invalidBase64EncodedResponse(self): """ @@ -1434,16 +1374,15 @@ class ESMTPAuthenticationTests(unittest.TestCase): loginArgs = [] self.server.portal = self.portalFactory(loginArgs) - self.server.dataReceived(b'EHLO\r\n') - self.server.dataReceived(b'AUTH LOGIN\r\n') + self.server.dataReceived(b"EHLO\r\n") + self.server.dataReceived(b"AUTH LOGIN\r\n") self.assertServerResponse( - b'x\r\n', - [b'501 Syntax error in parameters or arguments']) + b"x\r\n", [b"501 Syntax error in parameters or arguments"] + ) self.assertEqual(loginArgs, []) - def test_invalidBase64EncodedInitialResponse(self): """ Like L{test_invalidBase64EncodedResponse} but for the case of an @@ -1452,14 +1391,13 @@ class ESMTPAuthenticationTests(unittest.TestCase): loginArgs = [] self.server.portal = self.portalFactory(loginArgs) - self.server.dataReceived(b'EHLO\r\n') + self.server.dataReceived(b"EHLO\r\n") self.assertServerResponse( - b'AUTH LOGIN x\r\n', - [b'501 Syntax error in parameters or arguments']) + b"AUTH LOGIN x\r\n", [b"501 Syntax error in parameters or arguments"] + ) self.assertEqual(loginArgs, []) - def test_unexpectedLoginFailure(self): """ If the L{Deferred} returned by L{Portal.login} fires with an @@ -1470,31 +1408,31 @@ class ESMTPAuthenticationTests(unittest.TestCase): loginArgs = [] self.server.portal = self.portalFactory(loginArgs) - self.server.dataReceived(b'EHLO\r\n') + self.server.dataReceived(b"EHLO\r\n") self.transport.clear() self.assertServerResponse( - b'AUTH LOGIN ' + base64.b64encode(b'username').strip() + b'\r\n', - [b'334 ' + base64.b64encode(b'Password\0').strip()]) - self.assertServerResponse( - base64.b64encode(b'password').strip() + b'\r\n', - []) + b"AUTH LOGIN " + base64.b64encode(b"username").strip() + b"\r\n", + [b"334 " + base64.b64encode(b"Password\0").strip()], + ) + self.assertServerResponse(base64.b64encode(b"password").strip() + b"\r\n", []) d, credentials, mind, interfaces = loginArgs.pop() d.errback(RuntimeError("Something wrong with the server")) self.assertEqual( - b'451 Requested action aborted: local error in processing\r\n', - self.transport.value()) + b"451 Requested action aborted: local error in processing\r\n", + self.transport.value(), + ) self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1) - class SMTPClientErrorTests(unittest.TestCase): """ Tests for L{smtp.SMTPClientError}. """ + def test_str(self): """ The string representation of a L{SMTPClientError} instance includes @@ -1503,7 +1441,6 @@ class SMTPClientErrorTests(unittest.TestCase): err = smtp.SMTPClientError(123, "some text") self.assertEqual(str(err), "123 some text") - def test_strWithNegativeCode(self): """ If the response code supplied to L{SMTPClientError} is negative, it @@ -1512,7 +1449,6 @@ class SMTPClientErrorTests(unittest.TestCase): err = smtp.SMTPClientError(-1, b"foo bar") self.assertEqual(str(err), "foo bar") - def test_strWithLog(self): """ If a line log is supplied to L{SMTPClientError}, its contents are @@ -1522,12 +1458,7 @@ class SMTPClientErrorTests(unittest.TestCase): log.append(b"testlog") log.append(b"secondline") err = smtp.SMTPClientError(100, "test error", log=log.str()) - self.assertEqual( - str(err), - "100 test error\n" - "testlog\n" - "secondline\n") - + self.assertEqual(str(err), "100 test error\n" "testlog\n" "secondline\n") class SenderMixinSentMailTests(unittest.TestCase): @@ -1535,39 +1466,48 @@ class SenderMixinSentMailTests(unittest.TestCase): Tests for L{smtp.SenderMixin.sentMail}, used in particular by L{smtp.SMTPSenderFactory} and L{smtp.ESMTPSenderFactory}. """ + def test_onlyLogFailedAddresses(self): """ L{smtp.SenderMixin.sentMail} adds only the addresses with failing SMTP response codes to the log passed to the factory's errback. """ onDone = self.assertFailure(defer.Deferred(), smtp.SMTPDeliveryError) - onDone.addCallback(lambda e: self.assertEqual( - e.log, b"bob@example.com: 199 Error in sending.\n")) + onDone.addCallback( + lambda e: self.assertEqual( + e.log, b"bob@example.com: 199 Error in sending.\n" + ) + ) clientFactory = smtp.SMTPSenderFactory( - 'source@address', 'recipient@address', - BytesIO(b"Message body"), onDone, - retries=0, timeout=0.5) + "source@address", + "recipient@address", + BytesIO(b"Message body"), + onDone, + retries=0, + timeout=0.5, + ) client = clientFactory.buildProtocol( - address.IPv4Address('TCP', 'example.net', 25)) + address.IPv4Address("TCP", "example.net", 25) + ) - addresses = [(b"alice@example.com", 200, b"No errors here!"), - (b"bob@example.com", 199, b"Error in sending.")] + addresses = [ + (b"alice@example.com", 200, b"No errors here!"), + (b"bob@example.com", 199, b"Error in sending."), + ] client.sentMail(199, b"Test response", 1, addresses, client.log) return onDone - class ESMTPDowngradeTestCase(unittest.TestCase): """ Tests for the ESMTP -> SMTP downgrade functionality in L{smtp.ESMTPClient}. """ - def setUp(self): - self.clientProtocol = smtp.ESMTPClient( - b"testpassword", None, b"testuser") + def setUp(self): + self.clientProtocol = smtp.ESMTPClient(b"testpassword", None, b"testuser") def test_requireHELOFallbackOperates(self): """ @@ -1585,7 +1525,6 @@ class ESMTPDowngradeTestCase(unittest.TestCase): self.clientProtocol.dataReceived(b"500 not an esmtp server\r\n") self.assertEqual(b"HELO testuser\r\n", transport.value()) - def test_requireAuthFailsHELOFallback(self): """ If authentication is required, and HELO fallback is on, HELO fallback @@ -1602,7 +1541,6 @@ class ESMTPDowngradeTestCase(unittest.TestCase): self.clientProtocol.dataReceived(b"500 not an esmtp server\r\n") self.assertEqual(b"QUIT\r\n", transport.value()) - def test_requireTLSFailsHELOFallback(self): """ If TLS is required and the connection is insecure, HELO fallback must @@ -1619,7 +1557,6 @@ class ESMTPDowngradeTestCase(unittest.TestCase): self.clientProtocol.dataReceived(b"500 not an esmtp server\r\n") self.assertEqual(b"QUIT\r\n", transport.value()) - def test_requireTLSAndHELOFallbackSucceedsIfOverTLS(self): """ If TLS is provided at the transport level, we can honour the HELO @@ -1638,11 +1575,11 @@ class ESMTPDowngradeTestCase(unittest.TestCase): self.assertEqual(b"HELO testuser\r\n", transport.value()) - class SSLTestCase(unittest.TestCase): """ Tests for the TLS negotiation done by L{smtp.ESMTPClient}. """ + if sslSkip is not None: skip = sslSkip @@ -1651,11 +1588,11 @@ class SSLTestCase(unittest.TestCase): def setUp(self): self.clientProtocol = smtp.ESMTPClient( - b"testpassword", ClientTLSContext(), b"testuser") + b"testpassword", ClientTLSContext(), b"testuser" + ) self.clientProtocol.requireTransportSecurity = True self.clientProtocol.getMailFrom = lambda: "test@example.org" - def _requireTransportSecurityOverSSLTest(self, capabilities): """ Verify that when L{smtp.ESMTPClient} connects to a server over a @@ -1685,10 +1622,7 @@ class SSLTestCase(unittest.TestCase): # The client should now try to send a message - without first trying to # negotiate TLS, since the transport is already secure. - self.assertEqual( - b"MAIL FROM:\r\n", - transport.value()) - + self.assertEqual(b"MAIL FROM:\r\n", transport.value()) def test_requireTransportSecurityOverSSL(self): """ @@ -1697,7 +1631,6 @@ class SSLTestCase(unittest.TestCase): """ self._requireTransportSecurityOverSSLTest(b"250 AUTH LOGIN\r\n") - def test_requireTransportSecurityTLSOffered(self): """ When C{requireTransportSecurity} is C{True} and the client is connected @@ -1713,14 +1646,12 @@ class SSLTestCase(unittest.TestCase): # Tell the client about the server's capabilities - including STARTTLS self.clientProtocol.dataReceived( - self.EHLO_RESPONSE + - b"250-AUTH LOGIN\r\n" - b"250 STARTTLS\r\n") + self.EHLO_RESPONSE + b"250-AUTH LOGIN\r\n" b"250 STARTTLS\r\n" + ) # The client should try to start TLS before sending the message. self.assertEqual(b"STARTTLS\r\n", transport.value()) - def test_requireTransportSecurityTLSOfferedOverSSL(self): """ When C{requireTransportSecurity} is C{True} and the client is connected @@ -1728,9 +1659,8 @@ class SSLTestCase(unittest.TestCase): extension, it is not used before mail is delivered. """ self._requireTransportSecurityOverSSLTest( - b"250-AUTH LOGIN\r\n" - b"250 STARTTLS\r\n") - + b"250-AUTH LOGIN\r\n" b"250 STARTTLS\r\n" + ) def test_requireTransportSecurityTLSNotOffered(self): """ @@ -1746,14 +1676,11 @@ class SSLTestCase(unittest.TestCase): transport.clear() # Tell the client about the server's capabilities - excluding STARTTLS - self.clientProtocol.dataReceived( - self.EHLO_RESPONSE + - b"250 AUTH LOGIN\r\n") + self.clientProtocol.dataReceived(self.EHLO_RESPONSE + b"250 AUTH LOGIN\r\n") # The client give up self.assertEqual(b"QUIT\r\n", transport.value()) - def test_esmtpClientTlsModeDeprecationGet(self): """ L{smtp.ESMTPClient.tlsMode} is deprecated. @@ -1761,24 +1688,22 @@ class SSLTestCase(unittest.TestCase): val = self.clientProtocol.tlsMode del val warningsShown = self.flushWarnings( - offendingFunctions=[self.test_esmtpClientTlsModeDeprecationGet]) + offendingFunctions=[self.test_esmtpClientTlsModeDeprecationGet] + ) self.assertEqual(len(warningsShown), 1) - self.assertIdentical( - warningsShown[0]['category'], DeprecationWarning) + self.assertIdentical(warningsShown[0]["category"], DeprecationWarning) self.assertEqual( - warningsShown[0]['message'], + warningsShown[0]["message"], "tlsMode attribute of twisted.mail.smtp.ESMTPClient " - "is deprecated since Twisted 13.0") - + "is deprecated since Twisted 13.0", + ) def test_esmtpClientTlsModeDeprecationGetAttributeError(self): """ L{smtp.ESMTPClient.__getattr__} raises an attribute error for other attribute names which do not exist. """ - self.assertRaises( - AttributeError, lambda: self.clientProtocol.doesNotExist) - + self.assertRaises(AttributeError, lambda: self.clientProtocol.doesNotExist) def test_esmtpClientTlsModeDeprecationSet(self): """ @@ -1786,25 +1711,25 @@ class SSLTestCase(unittest.TestCase): """ self.clientProtocol.tlsMode = False warningsShown = self.flushWarnings( - offendingFunctions=[self.test_esmtpClientTlsModeDeprecationSet]) + offendingFunctions=[self.test_esmtpClientTlsModeDeprecationSet] + ) self.assertEqual(len(warningsShown), 1) - self.assertIdentical( - warningsShown[0]['category'], DeprecationWarning) + self.assertIdentical(warningsShown[0]["category"], DeprecationWarning) self.assertEqual( - warningsShown[0]['message'], + warningsShown[0]["message"], "tlsMode attribute of twisted.mail.smtp.ESMTPClient " - "is deprecated since Twisted 13.0") - + "is deprecated since Twisted 13.0", + ) class AbortableStringTransport(StringTransport): """ A version of L{StringTransport} that supports C{abortConnection}. """ + # This should be replaced by a common version in #6530. aborting = False - def abortConnection(self): """ A testable version of the C{ITCPTransport.abortConnection} method. @@ -1816,11 +1741,11 @@ class AbortableStringTransport(StringTransport): self.loseConnection() - class SendmailTests(unittest.TestCase): """ Tests for L{twisted.mail.smtp.sendmail}. """ + def test_defaultReactorIsGlobalReactor(self): """ The default C{reactor} parameter of L{twisted.mail.smtp.sendmail} is @@ -1829,24 +1754,29 @@ class SendmailTests(unittest.TestCase): args, varArgs, keywords, defaults = inspect.getargspec(smtp.sendmail) self.assertEqual(reactor, defaults[2]) - def _honorsESMTPArguments(self, username, password): """ L{twisted.mail.smtp.sendmail} creates the ESMTP factory with the ESMTP arguments. """ reactor = MemoryReactor() - smtp.sendmail("localhost", "source@address", "recipient@address", - b"message", reactor=reactor, username=username, - password=password, requireTransportSecurity=True, - requireAuthentication=True) + smtp.sendmail( + "localhost", + "source@address", + "recipient@address", + b"message", + reactor=reactor, + username=username, + password=password, + requireTransportSecurity=True, + requireAuthentication=True, + ) factory = reactor.tcpClients[0][2] self.assertEqual(factory._requireTransportSecurity, True) self.assertEqual(factory._requireAuthentication, True) self.assertEqual(factory.username, b"foo") self.assertEqual(factory.password, b"bar") - def test_honorsESMTPArgumentsUnicodeUserPW(self): """ L{twisted.mail.smtp.sendmail} should accept C{username} and C{password} @@ -1854,7 +1784,6 @@ class SendmailTests(unittest.TestCase): """ return self._honorsESMTPArguments(username=u"foo", password=u"bar") - def test_honorsESMTPArgumentsBytesUserPW(self): """ L{twisted.mail.smtp.sendmail} should accept C{username} and C{password} @@ -1862,7 +1791,6 @@ class SendmailTests(unittest.TestCase): """ return self._honorsESMTPArguments(username=b"foo", password=b"bar") - def test_messageFilePassthrough(self): """ L{twisted.mail.smtp.sendmail} will pass through the message untouched @@ -1871,38 +1799,51 @@ class SendmailTests(unittest.TestCase): reactor = MemoryReactor() messageFile = BytesIO(b"File!") - smtp.sendmail("localhost", "source@address", "recipient@address", - messageFile, reactor=reactor) + smtp.sendmail( + "localhost", + "source@address", + "recipient@address", + messageFile, + reactor=reactor, + ) factory = reactor.tcpClients[0][2] self.assertIs(factory.file, messageFile) - def test_messageStringMadeFile(self): """ L{twisted.mail.smtp.sendmail} will turn non-file-like objects (eg. strings) into file-like objects before sending. """ reactor = MemoryReactor() - smtp.sendmail("localhost", "source@address", "recipient@address", - b"message", reactor=reactor) + smtp.sendmail( + "localhost", + "source@address", + "recipient@address", + b"message", + reactor=reactor, + ) factory = reactor.tcpClients[0][2] messageFile = factory.file messageFile.seek(0) self.assertEqual(messageFile.read(), b"message") - def test_senderDomainName(self): """ L{twisted.mail.smtp.sendmail} passes through the sender domain name, if provided. """ reactor = MemoryReactor() - smtp.sendmail("localhost", "source@address", "recipient@address", - b"message", reactor=reactor, senderDomainName="foo") + smtp.sendmail( + "localhost", + "source@address", + "recipient@address", + b"message", + reactor=reactor, + senderDomainName="foo", + ) factory = reactor.tcpClients[0][2] self.assertEqual(factory.domain, b"foo") - def test_cancelBeforeConnectionMade(self): """ When a user cancels L{twisted.mail.smtp.sendmail} before the connection @@ -1910,14 +1851,18 @@ class SendmailTests(unittest.TestCase): L{twisted.internet.interfaces.IConnector.disconnect}. """ reactor = MemoryReactor() - d = smtp.sendmail("localhost", "source@address", "recipient@address", - b"message", reactor=reactor) + d = smtp.sendmail( + "localhost", + "source@address", + "recipient@address", + b"message", + reactor=reactor, + ) d.cancel() self.assertEqual(reactor.connectors[0]._disconnected, True) failure = self.failureResultOf(d) failure.trap(defer.CancelledError) - def test_cancelAfterConnectionMade(self): """ When a user cancels L{twisted.mail.smtp.sendmail} after the connection @@ -1926,8 +1871,13 @@ class SendmailTests(unittest.TestCase): """ reactor = MemoryReactor() transport = AbortableStringTransport() - d = smtp.sendmail("localhost", "source@address", "recipient@address", - b"message", reactor=reactor) + d = smtp.sendmail( + "localhost", + "source@address", + "recipient@address", + b"message", + reactor=reactor, + ) factory = reactor.tcpClients[0][2] p = factory.buildProtocol(None) p.makeConnection(transport) diff --git a/src/twisted/web/test/test_http.py b/src/twisted/web/test/test_http.py index a3067f7..5188372 100644 --- a/src/twisted/web/test/test_http.py +++ b/src/twisted/web/test/test_http.py @@ -2371,20 +2371,6 @@ ok class QueryArgumentsTests(unittest.TestCase): - def testParseqs(self): - self.assertEqual( - cgi.parse_qs(b"a=b&d=c;+=f"), - http.parse_qs(b"a=b&d=c;+=f")) - self.assertRaises( - ValueError, http.parse_qs, b"blah", strict_parsing=True) - self.assertEqual( - cgi.parse_qs(b"a=&b=c", keep_blank_values=1), - http.parse_qs(b"a=&b=c", keep_blank_values=1)) - self.assertEqual( - cgi.parse_qs(b"a=&b=c"), - http.parse_qs(b"a=&b=c")) - - def test_urlparse(self): """ For a given URL, L{http.urlparse} should behave the same as L{urlparse},