Skip to content

Commit

Permalink
Merge branch '46-ssh-software-version-analysis'
Browse files Browse the repository at this point in the history
Closes: #46
  • Loading branch information
c0r0n3r committed Jan 15, 2022
2 parents 4c34996 + fda8f54 commit 6f94588
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 44 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

- SSH (`ssh`)
- Versions (`versions`)
- add [software version](https://tools.ietf.org/html/rfc4253#section-4.2) related classes (\#46)

## 0.7.3 - 2021-12-26

- Generic
Expand Down
31 changes: 18 additions & 13 deletions cryptoparser/ssh/subprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
SshMacAlgorithm,
SshCompressionAlgorithm,
)
from cryptoparser.ssh.version import SshProtocolVersion
from cryptoparser.ssh.version import (
SshProtocolVersion,
SshSoftwareVersionBase,
SshSoftwareVersionParsedVariant,
SshSoftwareVersionUnparsed,
)


class SshMessageCode(enum.IntEnum):
Expand Down Expand Up @@ -74,18 +79,9 @@ def _compose_header(cls):
@attr.s
class SshProtocolMessage(ParsableBase):
protocol_version = attr.ib(validator=attr.validators.instance_of(SshProtocolVersion))
software_version = attr.ib(validator=attr.validators.instance_of(six.string_types))
software_version = attr.ib(validator=attr.validators.instance_of(SshSoftwareVersionBase))
comment = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(six.string_types)), default=None)

@software_version.validator
def software_version_validator(self, _, value): # pylint: disable=no-self-use
if '\r' in value or '\n' in value or ' ' in value:
raise InvalidValue(value, SshProtocolMessage, 'software_version')
try:
value.encode('ascii')
except UnicodeEncodeError as e:
six.raise_from(InvalidValue(value, SshProtocolMessage, 'software_version'), e)

@comment.validator
def comment_validator(self, _, value): # pylint: disable=no-self-use
if value is not None:
Expand Down Expand Up @@ -114,7 +110,12 @@ def _parse(cls, parsable):
if software_version_and_comment[-1][-1] == '\r':
software_version_and_comment[-1] = software_version_and_comment[-1][:-1]

software_version = software_version_and_comment[0]
software_version_parser = ParserText(software_version_and_comment[0].encode('ascii'))
try:
software_version_parser.parse_parsable('value', SshSoftwareVersionParsedVariant)
except InvalidValue:
software_version_parser.parse_parsable('value', SshSoftwareVersionUnparsed)

if len(software_version_and_comment) > 1:
comment = ' '.join(software_version_and_comment[1:])
else:
Expand All @@ -124,7 +125,11 @@ def _parse(cls, parsable):
if parser.parsed_length > 255:
raise TooMuchData(parser.parsed_length - 255)

return SshProtocolMessage(parser['protocol_version'], software_version, comment), parser.parsed_length
return SshProtocolMessage(
parser['protocol_version'],
software_version_parser['value'],
comment
), parser.parsed_length

def compose(self):
composer = ComposerText()
Expand Down
177 changes: 174 additions & 3 deletions cryptoparser/ssh/version.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import abc
import collections
import enum
import six

import attr

from cryptoparser.common.base import ProtocolVersionBase
from cryptoparser.common.exception import InvalidValue
from cryptoparser.common.parse import ParserText, ComposerText
from cryptoparser.common.base import ProtocolVersionBase, Serializable, VariantParsable
from cryptoparser.common.exception import InvalidType, InvalidValue
from cryptoparser.common.parse import ParsableBase, ParserText, ComposerText


class SshVersion(enum.IntEnum):
Expand Down Expand Up @@ -56,3 +58,172 @@ def supported_versions(self):
return [SshVersion.SSH1, SshVersion.SSH2]

return [self.major, ]


class SshSoftwareVersionBase(ParsableBase):
@classmethod
@abc.abstractmethod
def _parse(cls, parsable):
raise NotImplementedError()

@abc.abstractmethod
def compose(self):
raise NotImplementedError()


@attr.s
class SshSoftwareVersionUnparsed(SshSoftwareVersionBase, Serializable):
raw = attr.ib(validator=attr.validators.instance_of(six.string_types))

@raw.validator
def raw_validator(self, _, value): # pylint: disable=no-self-use
if '\r' in value or '\n' in value or ' ' in value:
raise InvalidValue(value, SshSoftwareVersionUnparsed, 'raw')
try:
value.encode('ascii')
except UnicodeEncodeError as e:
six.raise_from(InvalidValue(value, SshSoftwareVersionUnparsed, 'raw'), e)

def _as_markdown(self, level):
return self._markdown_result(self.raw, level)

@classmethod
def _parse(cls, parsable):
parser = ParserText(parsable)

parser.parse_string_by_length('raw', len(parsable))

return SshSoftwareVersionUnparsed(parser['raw']), parser.parsed_length

def compose(self):
composer = ComposerText()

composer.compose_string(self.raw)

return composer.composed


@attr.s
class SshSoftwareVersionParsedBase(SshSoftwareVersionBase):
version = attr.ib(default=None, validator=attr.validators.optional(attr.validators.instance_of(six.string_types)))

@classmethod
@abc.abstractmethod
def _get_vendor(cls):
raise NotImplementedError()

@classmethod
@abc.abstractmethod
def _get_version_separator(cls):
raise NotImplementedError()

@property
def vendor(self):
return self._get_vendor()

def _asdict(self):
return collections.OrderedDict([
('vendor', self.vendor),
('version', self.version),
])

@classmethod
def _parse(cls, parsable):
parser = ParserText(parsable)

version_separator = cls._get_version_separator()
if version_separator is None:
parser.parse_string_by_length('vendor')
else:
parser.parse_string_until_separator_or_end('vendor', version_separator)

if parser['vendor'] != cls._get_vendor():
raise InvalidType()

if parser.unparsed_length > 0 and version_separator is not None:
parser.parse_separator(version_separator)
parser.parse_string_by_length('version')
version = parser['version']
else:
version = None

return cls(version), parser.parsed_length

def compose(self):
composer = ComposerText()

composer.compose_string(self.vendor)
if self.version is not None:
composer.compose_separator(self._get_version_separator())
composer.compose_string(self.version)

return composer.composed


@attr.s
class SshSoftwareVersionCryptlib(SshSoftwareVersionParsedBase):
@classmethod
def _get_vendor(cls):
return 'cryptlib'

@classmethod
def _get_version_separator(cls):
return None


@attr.s
class SshSoftwareVersionDropbear(SshSoftwareVersionParsedBase):
@classmethod
def _get_vendor(cls):
return 'dropbear'

@classmethod
def _get_version_separator(cls):
return '_'


@attr.s
class SshSoftwareVersionIPSSH(SshSoftwareVersionParsedBase):
@classmethod
def _get_vendor(cls):
return 'IPSSH'

@classmethod
def _get_version_separator(cls):
return '-'


@attr.s
class SshSoftwareVersionMonacaSSH(SshSoftwareVersionParsedBase):
@classmethod
def _get_vendor(cls):
return 'Monaca'

@classmethod
def _get_version_separator(cls):
return None


@attr.s
class SshSoftwareVersionOpenSSH(SshSoftwareVersionParsedBase):
@classmethod
def _get_vendor(cls):
return 'OpenSSH'

@classmethod
def _get_version_separator(cls):
return '_'


class SshSoftwareVersionParsedVariant(VariantParsable):
_VARIANTS = collections.OrderedDict([
(SshSoftwareVersionCryptlib, (SshSoftwareVersionCryptlib, )),
(SshSoftwareVersionDropbear, (SshSoftwareVersionDropbear, )),
(SshSoftwareVersionIPSSH, (SshSoftwareVersionIPSSH, )),
(SshSoftwareVersionMonacaSSH, (SshSoftwareVersionMonacaSSH, )),
(SshSoftwareVersionOpenSSH, (SshSoftwareVersionOpenSSH, )),
])

@classmethod
def _get_variants(cls):
return cls._VARIANTS
44 changes: 17 additions & 27 deletions test/ssh/test_subprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
SshProtocolMessage,
SshUnimplementedMessage,
)
from cryptoparser.ssh.version import SshProtocolVersion, SshVersion
from cryptoparser.ssh.version import SshProtocolVersion, SshSoftwareVersionUnparsed, SshVersion


class TestProtocolMessage(unittest.TestCase):
Expand All @@ -55,67 +55,57 @@ def test_error(self):
def test_parse(self):
message = SshProtocolMessage.parse_exact_size(b'SSH-1.1-software_version\r\n')
self.assertEqual(message.protocol_version, SshProtocolVersion(SshVersion.SSH1, 1))
self.assertEqual(message.software_version, 'software_version')
self.assertEqual(message.software_version, SshSoftwareVersionUnparsed('software_version'))
self.assertEqual(message.comment, None)

message = SshProtocolMessage.parse_exact_size(b'SSH-2.2-software_version\r\n')
self.assertEqual(message.protocol_version, SshProtocolVersion(SshVersion.SSH2, 2))
self.assertEqual(message.software_version, 'software_version')
self.assertEqual(message.software_version, SshSoftwareVersionUnparsed('software_version'))
self.assertEqual(message.comment, None)

message = SshProtocolMessage.parse_exact_size(b'SSH-2.0-software_version comment\r\n')
self.assertEqual(message.protocol_version, SshProtocolVersion(SshVersion.SSH2, 0))
self.assertEqual(message.software_version, 'software_version')
self.assertEqual(message.software_version, SshSoftwareVersionUnparsed('software_version'))
self.assertEqual(message.comment, 'comment')

message = SshProtocolMessage.parse_exact_size(b'SSH-2.0-software_version comment with spaces\r\n')
self.assertEqual(message.protocol_version, SshProtocolVersion(SshVersion.SSH2, 0))
self.assertEqual(message.software_version, 'software_version')
self.assertEqual(message.software_version, SshSoftwareVersionUnparsed('software_version'))
self.assertEqual(message.comment, 'comment with spaces')

def test_software_version(self):
with self.assertRaises(InvalidValue):
SshProtocolMessage(SshProtocolVersion(SshVersion.SSH2, 2), software_version=six.ensure_text('αβγ'))
with self.assertRaises(InvalidValue):
SshProtocolMessage(
SshProtocolVersion(SshVersion.SSH2, 2), software_version=six.ensure_text('software_version ')
)
with self.assertRaises(InvalidValue):
SshProtocolMessage(
SshProtocolVersion(SshVersion.SSH2, 2), software_version=six.ensure_text('software_version\r')
)
with self.assertRaises(InvalidValue):
SshProtocolMessage(
SshProtocolVersion(SshVersion.SSH2, 2), software_version=six.ensure_text('software_version\n')
)

def test_comment(self):
with self.assertRaises(InvalidValue):
SshProtocolMessage(
SshProtocolVersion(SshVersion.SSH2, 2), 'software_version', comment=six.ensure_text('αβγ')
SshProtocolVersion(SshVersion.SSH2, 2),
SshSoftwareVersionUnparsed('software_version'),
comment=six.ensure_text('αβγ'),
)
with self.assertRaises(InvalidValue):
SshProtocolMessage(
SshProtocolVersion(SshVersion.SSH2, 2), 'software_version', comment=six.ensure_text('comment\r')
SshProtocolVersion(SshVersion.SSH2, 2),
SshSoftwareVersionUnparsed('software_version'),
comment=six.ensure_text('comment\r'),
)
with self.assertRaises(InvalidValue):
SshProtocolMessage(
SshProtocolVersion(SshVersion.SSH2, 2), 'software_version', comment=six.ensure_text('comment\n')
SshProtocolVersion(SshVersion.SSH2, 2),
SshSoftwareVersionUnparsed('software_version'),
comment=six.ensure_text('comment\n'),
)

def test_compose(self):
self.assertEqual(
SshProtocolMessage(
SshProtocolVersion(SshVersion.SSH2, 2),
'software_version'
SshSoftwareVersionUnparsed('software_version'),
).compose(),
b'SSH-2.2-software_version\r\n'
)

self.assertEqual(
SshProtocolMessage(
SshProtocolVersion(SshVersion.SSH2, 2),
'software_version',
SshSoftwareVersionUnparsed('software_version'),
'comment'
).compose(),
b'SSH-2.2-software_version comment\r\n'
Expand All @@ -124,7 +114,7 @@ def test_compose(self):
self.assertEqual(
SshProtocolMessage(
SshProtocolVersion(SshVersion.SSH2, 2),
'software_version',
SshSoftwareVersionUnparsed('software_version'),
'comment with spaces'
).compose(),
b'SSH-2.2-software_version comment with spaces\r\n'
Expand Down
Loading

0 comments on commit 6f94588

Please sign in to comment.