From 9298a66f5f0e44144882e4aceb56a76f4d3d929d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Szil=C3=A1rd=20Pfeiffer?= Date: Fri, 28 Jun 2019 22:53:24 +0200 Subject: [PATCH] feat(ssh)!: Implement host key related messages (#43) --- CHANGELOG.md | 2 + cryptoparser/common/key.py | 41 ++++ cryptoparser/common/parse.py | 2 +- cryptoparser/ssh/key.py | 325 ++++++++++++++++++++++++++++++++ cryptoparser/ssh/subprotocol.py | 10 +- test/ssh/test_key.py | 196 +++++++++++++++++++ test/ssh/test_subprotocol.py | 25 ++- tox.ini | 2 +- 8 files changed, 592 insertions(+), 11 deletions(-) create mode 100644 cryptoparser/common/key.py create mode 100644 cryptoparser/ssh/key.py create mode 100644 test/ssh/test_key.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 02f022f..07b20b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog - SSH (`ssh`) + - Public Keys (`pubkeys`) + - add [public key](https://datatracker.ietf.org/doc/html/rfc4253#section-6.6) related classes (\#43) - Versions (`versions`) - add [software version](https://tools.ietf.org/html/rfc4253#section-4.2) related classes (\#46) diff --git a/cryptoparser/common/key.py b/cryptoparser/common/key.py new file mode 100644 index 0000000..fccdf62 --- /dev/null +++ b/cryptoparser/common/key.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- + +import abc +import hashlib + +import six + +from cryptoparser.common.algorithm import Hash +from cryptoparser.common.base import Serializable + + +class PublicKey(Serializable): + _HASHLIB_FUNCS = { + Hash.MD5: hashlib.md5, + Hash.SHA1: hashlib.sha1, + Hash.SHA2_256: hashlib.sha256 + } + + @property + @abc.abstractmethod + def key_type(self): + raise NotImplementedError() + + @property + @abc.abstractmethod + def key_size(self): + raise NotImplementedError() + + @property + @abc.abstractmethod + def key_bytes(self): + raise NotImplementedError() + + @classmethod + def get_digest(cls, hash_type, key_bytes): + try: + hashlib_funcs = cls._HASHLIB_FUNCS[hash_type] + except KeyError as e: + six.raise_from(NotImplementedError(), e) + + return hashlib_funcs(key_bytes).digest() diff --git a/cryptoparser/common/parse.py b/cryptoparser/common/parse.py index d102645..d4f0ec9 100644 --- a/cryptoparser/common/parse.py +++ b/cryptoparser/common/parse.py @@ -654,7 +654,7 @@ def _compose_string_array(self, values, encoding, separator): if isinstance(value, ParsableBaseNoABC): composed_str += value.compose() else: - composed_str += value.encode(encoding) + composed_str += six.text_type(value).encode(encoding) except UnicodeError as e: six.raise_from(InvalidValue(value, type(self)), e) diff --git a/cryptoparser/ssh/key.py b/cryptoparser/ssh/key.py new file mode 100644 index 0000000..112657d --- /dev/null +++ b/cryptoparser/ssh/key.py @@ -0,0 +1,325 @@ +# -*- coding: utf-8 -*- + +import abc +import base64 +import binascii +import collections +import textwrap + +from collections import OrderedDict + +import attr +import six + +from cryptoparser.common.algorithm import Hash +from cryptoparser.common.base import VariantParsable +from cryptoparser.common.exception import InvalidType, NotEnoughData +from cryptoparser.common.key import PublicKey +from cryptoparser.common.parse import ParsableBase, ParserBinary, ComposerBinary + +from cryptoparser.ssh.ciphersuite import SshHostKeyAlgorithm + + +@attr.s +class SshPublicKeyBase(PublicKey): + host_key_algorithm = attr.ib( + converter=SshHostKeyAlgorithm, + validator=attr.validators.instance_of(SshHostKeyAlgorithm) + ) + + _HEADER_SIZE = 4 + + @classmethod + def get_host_key_algorithms(cls): + raise NotImplementedError() + + @property + def key_type(self): + return self.host_key_algorithm.value.authentication + + @property + @abc.abstractmethod + def key_size(self): + raise NotImplementedError() + + @property + @abc.abstractmethod + def key_bytes(self): + raise NotImplementedError() + + @classmethod + def _fingerprint(cls, hash_type, key_bytes, prefix): + digest = cls.get_digest(hash_type, key_bytes) + + if hash_type == Hash.MD5: + fingerprint = ':'.join(textwrap.wrap(bytes.decode(binascii.hexlify(digest), 'ascii'), 2)) + else: + fingerprint = bytes.decode(base64.b64encode(digest), 'ascii') + + return ':'.join((prefix, fingerprint)) + + @property + def fingerprints(self): + key_bytes = self.key_bytes + return OrderedDict([ + (hash_type, self._fingerprint(hash_type, key_bytes, prefix)) + for hash_type, prefix in [(Hash.SHA2_256, 'SHA256'), (Hash.SHA1, 'SHA1'), (Hash.MD5, 'MD5')] + ]) + + def _host_key_asdict(self): + known_hosts = base64.b64encode(self.key_bytes).decode('ascii') + + key_dict = OrderedDict([ + ('known_hosts', known_hosts), + ('fingerprints', self.fingerprints), + ('key_type', self.key_type), + ('key_name', self.host_key_algorithm), + ('key_size', self.key_size), + ]) + + return key_dict + + def _asdict(self): + return self._host_key_asdict() + + @classmethod + def _parse_host_key_algorithm(cls, parsable): + if len(parsable) < cls._HEADER_SIZE: + raise NotEnoughData(cls._HEADER_SIZE - len(parsable)) + + parser = ParserBinary(parsable) + + parser.parse_parsable('host_key_algorithm', SshHostKeyAlgorithm, 4) + + if parser['host_key_algorithm'] not in cls.get_host_key_algorithms(): + raise InvalidType() + + return parser + + def _compose_host_key_algorithm(self): + composer = ComposerBinary() + + host_key_algorithm_bytes = self.host_key_algorithm.compose() + composer.compose_bytes(host_key_algorithm_bytes, 4) + + return composer + + +class SshHostKeyBase(SshPublicKeyBase): + @classmethod + @abc.abstractmethod + def get_host_key_algorithms(cls): + raise NotImplementedError() + + @property + @abc.abstractmethod + def key_size(self): + raise NotImplementedError() + + +class SshHostKeyParserBase(ParsableBase): + @classmethod + @abc.abstractmethod + def _parse_host_key_algorithm(cls, parsable): + raise NotImplementedError() + + @abc.abstractmethod + def _compose_host_key_algorithm(self): + raise NotImplementedError() + + @classmethod + @abc.abstractmethod + def _parse_host_key_params(cls, parser): + raise NotImplementedError() + + @abc.abstractmethod + def _compose_host_key_params(self, composer): + raise NotImplementedError() + + @classmethod + def _parse(cls, parsable): + parser = cls._parse_host_key_algorithm(parsable) + + cls._parse_host_key_params(parser) + + return cls(**parser), parser.parsed_length + + def compose(self): + composer = self._compose_host_key_algorithm() + + self._compose_host_key_params(composer) + + return composer.composed + + +@attr.s +class SshHostKeyDSSBase(SshHostKeyBase): + p = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) + g = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) + q = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) + y = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) + + @property + @abc.abstractmethod + def key_bytes(self): + raise NotImplementedError() + + @classmethod + def get_host_key_algorithms(cls): + return [ + SshHostKeyAlgorithm.SSH_DSS, + SshHostKeyAlgorithm.SSH_DSS_SHA224_SSH_COM, + SshHostKeyAlgorithm.SSH_DSS_SHA256_SSH_COM, + SshHostKeyAlgorithm.SSH_DSS_SHA384_SSH_COM, + SshHostKeyAlgorithm.SSH_DSS_SHA512_SSH_COM, + ] + + @property + def key_size(self): + return (len(self.p) - 1) * 8 + + @classmethod + def _parse_host_key_params(cls, parser): + for param_name in ['p', 'q', 'g', 'y']: + parser.parse_bytes(param_name, 4) + + def _compose_host_key_params(self, composer): + for param_name in ['p', 'q', 'g', 'y']: + value = getattr(self, param_name) + composer.compose_bytes(value, 4) + + +@attr.s +class SshHostKeyDSS(SshHostKeyDSSBase, SshHostKeyParserBase): + @property + def key_bytes(self): + return self.compose() + + +@attr.s +class SshHostKeyRSABase(SshHostKeyBase): + exponent = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) + modulus = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) + + @property + @abc.abstractmethod + def key_bytes(self): + raise NotImplementedError() + + @classmethod + def get_host_key_algorithms(cls): + return [ + SshHostKeyAlgorithm.SSH_RSA, + SshHostKeyAlgorithm.SSH_RSA_SHA224_SSH_COM, + SshHostKeyAlgorithm.SSH_RSA_SHA256_SSH_COM, + SshHostKeyAlgorithm.SSH_RSA_SHA384_SSH_COM, + SshHostKeyAlgorithm.SSH_RSA_SHA512_SSH_COM, + ] + + @property + def key_size(self): + return (len(self.modulus) - 1) * 8 + + @classmethod + def _parse_host_key_params(cls, parser): + parser.parse_bytes('exponent', 4) + parser.parse_bytes('modulus', 4) + + def _compose_host_key_params(self, composer): + composer.compose_bytes(self.exponent, 4) + composer.compose_bytes(self.modulus, 4) + + +@attr.s +class SshHostKeyRSA(SshHostKeyRSABase, SshHostKeyParserBase): + @property + def key_bytes(self): + return self.compose() + + +@attr.s +class SshHostKeyECDSABase(SshHostKeyBase): + curve_name = attr.ib(validator=attr.validators.instance_of(six.string_types)) + curve_data = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) + + @property + @abc.abstractmethod + def key_bytes(self): + raise NotImplementedError() + + @classmethod + def get_host_key_algorithms(cls): + return [ + SshHostKeyAlgorithm.ECDSA_SHA2_NISTP256, + SshHostKeyAlgorithm.ECDSA_SHA2_NISTP384, + SshHostKeyAlgorithm.ECDSA_SHA2_NISTP521, + ] + + @property + def key_size(self): + return int(self.curve_name[len('nistp'):]) + + @classmethod + def _parse_host_key_params(cls, parser): + parser.parse_string('curve_name', 4, 'ascii') + parser.parse_bytes('curve_data', 4) + + def _compose_host_key_params(self, composer): + composer.compose_string(self.curve_name, 'ascii', 4) + composer.compose_bytes(self.curve_data, 4) + + +@attr.s +class SshHostKeyECDSA(SshHostKeyECDSABase, SshHostKeyParserBase): + @property + def key_bytes(self): + return self.compose() + + +@attr.s +class SshHostKeyEDDSABase(SshHostKeyBase): + key_data = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) + + @property + @abc.abstractmethod + def key_bytes(self): + raise NotImplementedError() + + @classmethod + def get_host_key_algorithms(cls): + return [SshHostKeyAlgorithm.SSH_ED25519, ] + + @property + def key_size(self): + return len(self.key_data) * 8 + + @classmethod + def _parse_host_key_params(cls, parser): + parser.parse_bytes('key_data', 4) + + def _compose_host_key_params(self, composer): + composer.compose_bytes(self.key_data, 4) + + +@attr.s +class SshHostKeyEDDSA(SshHostKeyEDDSABase, SshHostKeyParserBase): + @property + def key_bytes(self): + return self.compose() + + +class SshHostPublicKeyVariant(VariantParsable): + _VARIANTS = collections.OrderedDict([ + (SshHostKeyAlgorithm.SSH_ED25519, (SshHostKeyEDDSA, )), + (SshHostKeyAlgorithm.SSH_RSA, (SshHostKeyRSA, )), + (SshHostKeyAlgorithm.RSA_SHA2_256, (SshHostKeyRSA, )), + (SshHostKeyAlgorithm.RSA_SHA2_512, (SshHostKeyRSA, )), + (SshHostKeyAlgorithm.SSH_DSS, (SshHostKeyDSS, )), + (SshHostKeyAlgorithm.ECDSA_SHA2_NISTP256, (SshHostKeyECDSA, )), + (SshHostKeyAlgorithm.ECDSA_SHA2_NISTP384, (SshHostKeyECDSA, )), + (SshHostKeyAlgorithm.ECDSA_SHA2_NISTP521, (SshHostKeyECDSA, )), + ]) + + @classmethod + def _get_variants(cls): + return cls._VARIANTS diff --git a/cryptoparser/ssh/subprotocol.py b/cryptoparser/ssh/subprotocol.py index 797c9a8..23de88c 100644 --- a/cryptoparser/ssh/subprotocol.py +++ b/cryptoparser/ssh/subprotocol.py @@ -1,13 +1,12 @@ -#!/usr/bin/env python # -*- coding: utf-8 -*- import abc import collections import enum import random -import six import attr +import six from cryptoparser.common.base import ( VariantParsable, @@ -25,6 +24,7 @@ SshMacAlgorithm, SshCompressionAlgorithm, ) +from cryptoparser.ssh.key import SshPublicKeyBase, SshHostPublicKeyVariant from cryptoparser.ssh.version import ( SshProtocolVersion, SshSoftwareVersionBase, @@ -424,7 +424,7 @@ def get_message_code(cls): @attr.s class SshDHKeyExchangeReplyBase(SshMessageBase): - host_public_key = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) + host_public_key = attr.ib(validator=attr.validators.instance_of(SshPublicKeyBase)) ephemeral_public_key = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) signature = attr.ib(validator=attr.validators.instance_of((bytes, bytearray))) @@ -435,7 +435,7 @@ def get_message_code(cls): @staticmethod def _parse_ssh_key(parser): - parser.parse_bytes('host_public_key', 4) + parser.parse_parsable('host_public_key', SshHostPublicKeyVariant, 4) parser.parse_bytes('ephemeral_public_key', 4) parser.parse_bytes('signature', 4) @@ -443,7 +443,7 @@ def _parse_ssh_key(parser): @staticmethod def _compose_ssh_key(composer, host_public_key, ephemeral_public_key_bytes, signature_bytes): - composer.compose_bytes(host_public_key, 4) + composer.compose_parsable(host_public_key, 4) composer.compose_bytes(ephemeral_public_key_bytes, 4) composer.compose_bytes(signature_bytes, 4) diff --git a/test/ssh/test_key.py b/test/ssh/test_key.py new file mode 100644 index 0000000..741c857 --- /dev/null +++ b/test/ssh/test_key.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import unittest + +from collections import OrderedDict + +from cryptoparser.common.algorithm import Hash, Authentication +from cryptoparser.common.exception import InvalidValue, NotEnoughData + +from cryptoparser.ssh.ciphersuite import SshHostKeyAlgorithm +from cryptoparser.ssh.key import ( + SshHostPublicKeyVariant, + SshHostKeyDSS, + SshHostKeyRSA, + SshHostKeyECDSA, + SshHostKeyEDDSA, +) + + +class TestPublicKeyBase(unittest.TestCase): + def test_error(self): + with self.assertRaises(NotEnoughData) as context_manager: + SshHostKeyRSA.parse_exact_size(b'') + self.assertEqual(context_manager.exception.bytes_needed, 4) + + with self.assertRaises(InvalidValue) as context_manager: + SshHostKeyRSA.parse_exact_size(b'\x00\x00\x00\x18' + b'non-existing-type-name') + self.assertEqual(context_manager.exception.value, b'non-existing-type-name') + + with self.assertRaises(NotImplementedError): + SshHostKeyRSA.get_digest(Hash.MD4, b'') + + +class TestHostKeyDSS(unittest.TestCase): + def setUp(self): + self.host_key_bytes = bytes( + b'\x00\x00\x00\x07' + # host_key_algorithm_length + b'ssh-dss' + # host_key_algorithm + b'\x00\x00\x00\x04' + # p_length + b'\x00\x01\x02\x03' + # p + b'\x00\x00\x00\x04' + # q_length + b'\x04\x05\x06\x07' + # q + b'\x00\x00\x00\x04' + # g_length + b'\x08\x09\x0a\x0b' + # g + b'\x00\x00\x00\x04' + # y_length + b'\x0c\x0d\x0e\x0f' + # y + b'' + ) + self.host_key = SshHostKeyDSS( + host_key_algorithm=SshHostKeyAlgorithm.SSH_DSS, + p=b'\x00\x01\x02\x03', + q=b'\x04\x05\x06\x07', + g=b'\x08\x09\x0a\x0b', + y=b'\x0c\x0d\x0e\x0f', + ) + + def test_parse(self): + host_key = SshHostPublicKeyVariant.parse_exact_size(self.host_key_bytes) + self.assertEqual(host_key.p, b'\x00\x01\x02\x03') + self.assertEqual(host_key.q, b'\x04\x05\x06\x07') + self.assertEqual(host_key.g, b'\x08\x09\x0a\x0b') + self.assertEqual(host_key.y, b'\x0c\x0d\x0e\x0f') + self.assertEqual(host_key.key_size, 24) + + def test_compose(self): + self.assertEqual(self.host_key.compose(), self.host_key_bytes) + + def test_asdict(self): + self.assertEqual(self.host_key._asdict(), OrderedDict([ + ('known_hosts', 'AAAAB3NzaC1kc3MAAAAEAAECAwAAAAQEBQYHAAAABAgJCgsAAAAEDA0ODw=='), + ('fingerprints', OrderedDict([ + (Hash.SHA2_256, 'SHA256:lEPxr9u8WS8znTORYUn96XCgk3oLL6inrDPtZtjJAIU='), + (Hash.SHA1, 'SHA1:J4w+3Z3ZXqjGIvAPiBvVVmKooMQ='), + (Hash.MD5, 'MD5:1d:dc:2f:78:65:e2:b0:eb:33:3c:5c:a1:e6:ab:d9:8b'), + ])), + ('key_type', Authentication.DSS), + ('key_name', SshHostKeyAlgorithm.SSH_DSS), + ('key_size', 24), + ])) + + +class TestHostKeyRSA(unittest.TestCase): + def setUp(self): + self.host_key_bytes = bytes( + b'\x00\x00\x00\x07' + # host_key_algorithm_length + b'ssh-rsa' + # host_key_algorithm + b'\x00\x00\x00\x04' + # exponent_length + b'\x00\x01\x02\x03' + # exponent + b'\x00\x00\x00\x04' + # modulus_length + b'\x04\x05\x06\x07' + # modulus + b'' + ) + self.host_key = SshHostKeyRSA( + host_key_algorithm=SshHostKeyAlgorithm.SSH_RSA, + exponent=b'\x00\x01\x02\x03', + modulus=b'\x04\x05\x06\x07', + ) + + def test_parse(self): + host_key = SshHostPublicKeyVariant.parse_exact_size(self.host_key_bytes) + self.assertEqual(host_key.exponent, b'\x00\x01\x02\x03') + self.assertEqual(host_key.modulus, b'\x04\x05\x06\x07') + self.assertEqual(host_key.key_size, 24) + + def test_compose(self): + self.assertEqual(self.host_key.compose(), self.host_key_bytes) + + def test_asdict(self): + self.assertEqual(self.host_key._asdict(), OrderedDict([ + ('known_hosts', 'AAAAB3NzaC1yc2EAAAAEAAECAwAAAAQEBQYH'), + ('fingerprints', OrderedDict([ + (Hash.SHA2_256, 'SHA256:FnFNj+ZfblG+vS4Nf5hGsYREPdThfPMSfrw1C5NKii0='), + (Hash.SHA1, 'SHA1:vnrbaS5XtHUQZrTFZ3c7ftyvNa4='), + (Hash.MD5, 'MD5:30:e8:0a:83:b5:9e:b5:6a:8a:4c:0c:f1:4b:58:10:1b'), + ])), + ('key_type', Authentication.RSA), + ('key_name', SshHostKeyAlgorithm.SSH_RSA), + ('key_size', 24), + ])) + + +class TestHostKeyECDSA(unittest.TestCase): + def setUp(self): + self.host_key_bytes = bytes( + b'\x00\x00\x00\x13' + # host_key_algorithm_length + b'ecdsa-sha2-nistp256' + # host_key_algorithm + b'\x00\x00\x00\x08' + # curve_name_length + b'nistp256' + # curve_name + b'\x00\x00\x00\x04' + # curve_data_length + b'\x00\x01\x02\x03' + # curve_data + b'' + ) + self.host_key = SshHostKeyECDSA( + host_key_algorithm=SshHostKeyAlgorithm.ECDSA_SHA2_NISTP256, + curve_name='nistp256', + curve_data=b'\x00\x01\x02\x03', + ) + + def test_parse(self): + host_key = SshHostPublicKeyVariant.parse_exact_size(self.host_key_bytes) + self.assertEqual(host_key.curve_name, 'nistp256') + self.assertEqual(host_key.curve_data, b'\x00\x01\x02\x03') + self.assertEqual(host_key.key_size, 256) + + def test_compose(self): + self.assertEqual(self.host_key.compose(), self.host_key_bytes) + + def test_asdict(self): + self.assertEqual(self.host_key._asdict(), OrderedDict([ + ('known_hosts', 'AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAAEAAECAw=='), + ('fingerprints', OrderedDict([ + (Hash.SHA2_256, 'SHA256:W1agtTnzki6Rcqu/dMFfzswy99uD8TsO11b5Fk6RDUo='), + (Hash.SHA1, 'SHA1:02+/4xDo1z/zl1l1QRTb5uBxnGg='), + (Hash.MD5, 'MD5:2f:b0:36:9d:54:d5:56:ce:a7:be:84:da:8c:08:f9:dc'), + ])), + ('key_type', Authentication.ECDSA), + ('key_name', SshHostKeyAlgorithm.ECDSA_SHA2_NISTP256), + ('key_size', 256), + ])) + + +class TestHostKeyEDDSA(unittest.TestCase): + def setUp(self): + self.host_key_bytes = bytes( + b'\x00\x00\x00\x0b' + # host_key_algorithm_length + b'ssh-ed25519' + # host_key_algorithm + b'\x00\x00\x00\x04' + # key_data_length + b'\x00\x01\x02\x03' + # key_data + b'' + ) + self.host_key = SshHostKeyEDDSA( + host_key_algorithm=SshHostKeyAlgorithm.SSH_ED25519, + key_data=b'\x00\x01\x02\x03', + ) + + def test_parse(self): + host_key = SshHostPublicKeyVariant.parse_exact_size(self.host_key_bytes) + self.assertEqual(host_key.key_data, b'\x00\x01\x02\x03') + self.assertEqual(host_key.key_size, 32) + + def test_compose(self): + self.assertEqual(self.host_key.compose(), self.host_key_bytes) + + def test_asdict(self): + self.assertEqual(self.host_key._asdict(), OrderedDict([ + ('known_hosts', 'AAAAC3NzaC1lZDI1NTE5AAAABAABAgM='), + ('fingerprints', OrderedDict([ + (Hash.SHA2_256, 'SHA256:tisjNupmcCLFV3HIx3sTEZMsjE8wuPrxRta6wD7P2qE='), + (Hash.SHA1, 'SHA1:LnkkIzv+iqBzToK/hB/Ou2vjbQw='), + (Hash.MD5, 'MD5:90:d1:22:82:5e:7e:e0:cc:dc:1a:74:aa:14:c8:51:b3'), + ])), + ('key_type', Authentication.EDDSA), + ('key_name', SshHostKeyAlgorithm.SSH_ED25519), + ('key_size', 32), + ])) diff --git a/test/ssh/test_subprotocol.py b/test/ssh/test_subprotocol.py index c1aef58..751d3dc 100644 --- a/test/ssh/test_subprotocol.py +++ b/test/ssh/test_subprotocol.py @@ -16,6 +16,7 @@ SshKexAlgorithm, SshMacAlgorithm, ) +from cryptoparser.ssh.key import SshHostKeyRSA from cryptoparser.ssh.subprotocol import ( SshDHGroupExchangeInit, SshDHGroupExchangeGroup, @@ -268,9 +269,13 @@ class TestDHKeyExchangeReply(unittest.TestCase): def setUp(self): self.dh_key_exchange_reply_dict = collections.OrderedDict([ ('message_code', b'\x1f'), # DH_KEX_REPLY - ('host_key_length', b'\x00\x00\x00\x10'), + ('host_key_length', b'\x00\x00\x00\x23'), ('host_public_key', ( + b'\x00\x00\x00\x07' + + b'ssh-rsa' + + b'\x00\x00\x00\x08' + b'\x00\x01\x02\x03\x04\x05\x06\x07' + + b'\x00\x00\x00\x08' + b'\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f' + b'' )), @@ -289,7 +294,11 @@ def setUp(self): ]) self.dh_key_exchange_reply_bytes = b''.join(self.dh_key_exchange_reply_dict.values()) self.dh_key_exchange_reply = SshDHKeyExchangeReply( - host_public_key=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', + host_public_key=SshHostKeyRSA( + host_key_algorithm=SshHostKeyAlgorithm.SSH_RSA, + exponent=b'\x00\x01\x02\x03\x04\x05\x06\x07', + modulus=b'\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', + ), ephemeral_public_key=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', signature=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', ) @@ -306,9 +315,13 @@ class TestDHGroupExchangeReply(unittest.TestCase): def setUp(self): self.dh_group_exchange_reply_dict = collections.OrderedDict([ ('message_code', b'\x21'), # DH_GEX_REPLY - ('host_key_length', b'\x00\x00\x00\x10'), + ('host_key_length', b'\x00\x00\x00\x23'), ('host_public_key', ( + b'\x00\x00\x00\x07' + + b'ssh-rsa' + + b'\x00\x00\x00\x08' + b'\x00\x01\x02\x03\x04\x05\x06\x07' + + b'\x00\x00\x00\x08' + b'\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f' + b'' )), @@ -327,7 +340,11 @@ def setUp(self): ]) self.dh_group_exchange_reply_bytes = b''.join(self.dh_group_exchange_reply_dict.values()) self.dh_group_exchange_reply = SshDHGroupExchangeReply( - host_public_key=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', + host_public_key=SshHostKeyRSA( + host_key_algorithm=SshHostKeyAlgorithm.SSH_RSA, + exponent=b'\x00\x01\x02\x03\x04\x05\x06\x07', + modulus=b'\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', + ), ephemeral_public_key=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', signature=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', ) diff --git a/tox.ini b/tox.ini index a5abdc2..b634bb0 100644 --- a/tox.ini +++ b/tox.ini @@ -42,4 +42,4 @@ disable = missing-docstring,too-few-public-methods,too-many-ancestors,useless-ob class-attribute-rgx = ([A-Za-z_][A-Za-z0-9_]{2,50}|(__.*__))$ method-rgx = [a-z_][a-z0-9_]{2,50}$ variable-rgx = [a-z_][a-z0-9_]{2,50}$ -good-names = e,g,p,_,H2 +good-names = e,f,g,p,q,y,_,H2