diff --git a/paramiko/__init__.py b/paramiko/__init__.py index 81da0d4b5..0d0fda1ac 100644 --- a/paramiko/__init__.py +++ b/paramiko/__init__.py @@ -33,7 +33,7 @@ ProxyCommandFailure, ) from paramiko.server import ServerInterface, SubsystemHandler, InteractiveQuery -from paramiko.rsakey import RSAKey +from paramiko.rsakey import RSAKey, RSASHA256Key, RSASHA512Key from paramiko.dsskey import DSSKey from paramiko.ecdsakey import ECDSAKey from paramiko.ed25519key import Ed25519Key @@ -92,8 +92,10 @@ 'ChannelStderrFile', 'ChannelStdinFile', 'PKey', - 'RSAKey', 'DSSKey', + 'RSAKey', + 'RSASHA256Key', + 'RSASHA512Key', 'ECDSAKey', 'Ed25519Key', 'PublicBlob', diff --git a/paramiko/rsakey.py b/paramiko/rsakey.py index 2e719deb1..397bee8db 100644 --- a/paramiko/rsakey.py +++ b/paramiko/rsakey.py @@ -40,6 +40,8 @@ class RSAKey(PKey): LEGACY_TYPE = "RSA" OPENSSH_TYPE_PREFIX = "ssh-rsa" + _signature_digest_algorithm = hashes.SHA1 + _signature_algorithm_name = "ssh-rsa" def __init__(self, msg=None, data=None, filename=None, password=None, key=None, file_obj=None, _raw=None): @@ -101,7 +103,7 @@ def __hash__(self): self.public_numbers.n)) def get_name(self): - return 'ssh-rsa' + return self._signature_algorithm_name def get_bits(self): return self.size @@ -113,24 +115,27 @@ def sign_ssh_data(self, data): sig = self.key.sign( data, padding=padding.PKCS1v15(), - algorithm=hashes.SHA1(), + algorithm=self._signature_digest_algorithm(), ) - m = Message() - m.add_string('ssh-rsa') + m.add_string(self._signature_algorithm_name) m.add_string(sig) return m def verify_ssh_sig(self, data, msg): - if msg.get_text() != 'ssh-rsa': + if msg.get_text() != self._signature_algorithm_name: return False + key = self.key if isinstance(key, rsa.RSAPrivateKey): key = key.public_key() try: key.verify( - msg.get_binary(), data, padding.PKCS1v15(), hashes.SHA1() + msg.get_binary(), + data, + padding.PKCS1v15(), + self._signature_digest_algorithm(), ) except InvalidSignature: return False @@ -205,3 +210,21 @@ def _decode_key(self, data): raise SSHException("Invalid key type") self.key = key + + +class RSASHA256Key(RSAKey): + """ + A special RSAKey that uses SHA-256 digest for sign/verify. + """ + + _signature_digest_algorithm = hashes.SHA256 + _signature_algorithm_name = "rsa-sha2-256" + + +class RSASHA512Key(RSAKey): + """ + A special RSAKey that uses SHA-256 digest for sign/verify. + """ + + _signature_digest_algorithm = hashes.SHA512 + _signature_algorithm_name = "rsa-sha2-512" diff --git a/paramiko/transport.py b/paramiko/transport.py index c7013d04a..dca563b3f 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -67,7 +67,7 @@ from paramiko.packet import Packetizer, NeedRekeyException from paramiko.primes import ModulusPack from paramiko.py3compat import string_types, long, byte_ord, b, input, PY2 -from paramiko.rsakey import RSAKey +from paramiko.rsakey import RSAKey, RSASHA256Key, RSASHA512Key from paramiko.ecdsakey import ECDSAKey from paramiko.server import ServerInterface from paramiko.sftp_client import SFTPClient @@ -128,6 +128,8 @@ class Transport(threading.Thread, ClosingContextManager): 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521', + 'rsa-sha2-256', + 'rsa-sha2-512', 'ssh-rsa', ) if Ed25519Key.is_supported(): @@ -216,6 +218,10 @@ class Transport(threading.Thread, ClosingContextManager): _key_info = { 'ssh-rsa': RSAKey, 'ssh-rsa-cert-v01@openssh.com': RSAKey, + 'rsa-sha2-256': RSASHA256Key, + 'rsa-sha2-256-cert-v01@openssh.com': RSASHA256Key, + 'rsa-sha2-512': RSASHA512Key, + 'rsa-sha2-512-cert-v01@openssh.com': RSASHA512Key, 'ssh-dss': DSSKey, 'ssh-dss-cert-v01@openssh.com': DSSKey, 'ecdsa-sha2-nistp256': ECDSAKey, diff --git a/tests/test_pkey.py b/tests/test_pkey.py index fd0f29395..eac702c81 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -31,6 +31,8 @@ from paramiko import ( util, RSAKey, + RSASHA256Key, + RSASHA512Key, DSSKey, ECDSAKey, Ed25519Key, @@ -63,6 +65,8 @@ FINGER_ECDSA_384 = '384 c1:8d:a0:59:09:47:41:8e:a8:a6:07:01:29:23:b4:65' FINGER_ECDSA_521 = '521 44:58:22:52:12:33:16:0e:ce:0e:be:2c:7c:7e:cc:1e' SIGNED_RSA = '20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8' # noqa: E501 +SIGNED_RSA_SHA2_256 = "cc:06:60:e0:00:2c:ac:9e:26:bc:d5:68:64:3f:9f:a7:e5:aa:41:eb:88:4a:25:05:9c:93:84:66:ef:ef:60:f4:34:fb:f4:c8:3d:55:33:6a:77:bd:b2:ee:83:0f:71:27:41:7e:f5:07:05:00:a9:4c:07:80:6f:be:76:67:cb:58:35:b9:2b:f3:c2:d3:3c:ee:e1:3f:59:e0:fa:e4:5c:92:ed:ae:74:de:0d:d6:27:16:8f:84:a3:86:68:0c:94:90:7d:6e:cc:81:12:d8:b6:ad:aa:31:a8:13:3d:63:81:3e:bb:05:b6:38:4d:02:0d:1b:5b:70:de:83:cc:3a:cb:31" # noqa +SIGNED_RSA_SHA2_512 = "87:46:8b:75:92:33:78:a0:22:35:32:39:23:c6:ab:e1:06:92:ad:bc:7f:6e:ab:19:32:e4:78:b2:2c:8f:1d:0c:65:da:fc:a5:07:ca:b6:55:55:31:83:b1:a0:af:d1:95:c5:2e:af:56:ba:f5:41:64:0f:39:9d:af:82:43:22:8f:90:52:9d:89:e7:45:97:df:f3:f2:bc:7b:3a:db:89:0e:34:fd:18:62:25:1b:ef:77:aa:c6:6c:99:36:3a:84:d6:9c:2a:34:8c:7f:f4:bb:c9:a5:9a:6c:11:f2:cf:da:51:5e:1e:7f:90:27:34:de:b2:f3:15:4f:db:47:32:6b:a7" # noqa FINGER_RSA_2K_OPENSSH = '2048 68:d1:72:01:bf:c0:0c:66:97:78:df:ce:75:74:46:d6' FINGER_DSS_1K_OPENSSH = '1024 cf:1d:eb:d7:61:d3:12:94:c6:c0:c6:54:35:35:b0:82' FINGER_EC_384_OPENSSH = '384 72:14:df:c1:9a:c3:e6:0e:11:29:d6:32:18:7b:ea:9b' @@ -147,12 +151,6 @@ class KeyTest(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - def assert_keyfile_is_encrypted(self, keyfile): """ A quick check that filename looks like an encrypted key. @@ -251,6 +249,36 @@ def test_sign_rsa(self): pub = RSAKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg)) + def test_sign_rsa_sha2_256(self): + # verify that the rsa-sha2-256 private key can sign and verify + key = RSASHA256Key.from_private_key_file(_support("test_rsa.key")) + msg = key.sign_ssh_data(b"ice weasels") + self.assertTrue(type(msg) is Message) + msg.rewind() + self.assertEqual("rsa-sha2-256", msg.get_text()) + sig = bytes().join( + [byte_chr(int(x, 16)) for x in SIGNED_RSA_SHA2_256.split(":")] + ) + self.assertEqual(sig, msg.get_binary()) + msg.rewind() + pub = RSASHA256Key(data=key.asbytes()) + self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) + + def test_sign_rsa_sha2_512(self): + # verify that the rsa-sha2-512 private key can sign and verify + key = RSASHA512Key.from_private_key_file(_support("test_rsa.key")) + msg = key.sign_ssh_data(b"ice weasels") + self.assertTrue(type(msg) is Message) + msg.rewind() + self.assertEqual("rsa-sha2-512", msg.get_text()) + sig = bytes().join( + [byte_chr(int(x, 16)) for x in SIGNED_RSA_SHA2_512.split(":")] + ) + self.assertEqual(sig, msg.get_binary()) + msg.rewind() + pub = RSASHA512Key(data=key.asbytes()) + self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) + def test_sign_dss(self): # verify that the dss private key can sign and verify key = DSSKey.from_private_key_file(_support('test_dss.key'))