From b7f27ae50d3978d5d754e4a4d280ef730c63e7e0 Mon Sep 17 00:00:00 2001 From: Kyle Fazzari Date: Mon, 18 May 2020 13:40:59 -0700 Subject: [PATCH] Support authentication `vcs_base.load_url()` currently doesn't support authentication. Add support for both basic and token-based authentication by parsing netrc-formatted files. Use `appdirs` to support vcstool-specific authentication files for both the user and the system (user takes precedence). Signed-off-by: Kyle Fazzari --- .travis.yml | 2 +- test/test_base.py | 252 ++++++++++++++++++++++++++++++++++++ vcstool/clients/vcs_base.py | 118 ++++++++++++++++- 3 files changed, 370 insertions(+), 2 deletions(-) create mode 100644 test/test_base.py diff --git a/.travis.yml b/.travis.yml index 838e5019..5b3f6129 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ matrix: install: # newer versions of PyYAML dropped support for Python 3.4 - if [ $TRAVIS_PYTHON_VERSION == "3.4" ]; then pip install PyYAML==5.2; fi - - pip install coverage flake8 flake8-docstrings flake8-import-order pytest PyYAML + - pip install appdirs coverage flake8 flake8-docstrings flake8-import-order pytest PyYAML mock script: - PYTHONPATH=`pwd` pytest -s -v test notifications: diff --git a/test/test_base.py b/test/test_base.py new file mode 100644 index 00000000..7db9bb51 --- /dev/null +++ b/test/test_base.py @@ -0,0 +1,252 @@ +import os +import shutil +import tempfile +import textwrap +import unittest + +try: + from urllib.error import HTTPError +except ImportError: + from urllib2 import HTTPError + + +try: + from unittest import mock +except ImportError: + import mock + +from vcstool.clients import vcs_base + + +class TestBase(unittest.TestCase): + + def setUp(self): + self.default_auth_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.default_auth_dir) + self.user_auth_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.user_auth_dir) + self.system_auth_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.system_auth_dir) + + self._previous_home = os.getenv("HOME") + os.environ["HOME"] = self.default_auth_dir + + patcher = mock.patch( + 'vcstool.clients.vcs_base.appdirs.user_config_dir', + return_value=self.user_auth_dir) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + 'vcstool.clients.vcs_base.appdirs.site_config_dir', + return_value=self.system_auth_dir) + patcher.start() + self.addCleanup(patcher.stop) + + def tearDown(self): + if self._previous_home: + os.environ["HOME"] = self._previous_home + else: + del os.environ["HOME"] + + @mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) + @mock.patch( + 'vcstool.clients.vcs_base._authenticated_urlopen', autospec=True) + def test_load_url_calls_urlopen( + self, authenticated_urlopen_mock, urlopen_mock): + urlopen_read_mock = urlopen_mock.return_value.read + + vcs_base.load_url('example.com', timeout=123) + + urlopen_mock.assert_called_once_with('example.com', timeout=123) + urlopen_read_mock.assert_called_once_with() + self.assertFalse(authenticated_urlopen_mock.mock_calls) + + @mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) + @mock.patch( + 'vcstool.clients.vcs_base._authenticated_urlopen', autospec=True) + def test_load_url_calls_authenticated_urlopen( + self, authenticated_urlopen_mock, urlopen_mock): + for code in (401, 404): + urlopen_mock.side_effect = [ + HTTPError(None, code, None, None, None)] + urlopen_read_mock = urlopen_mock.return_value.read + + vcs_base.load_url('example.com', timeout=123) + + urlopen_mock.assert_called_once_with('example.com', timeout=123) + self.assertFalse(urlopen_read_mock.mock_calls) + + authenticated_urlopen_mock.assert_called_once_with( + 'example.com', timeout=123) + + authenticated_urlopen_mock.reset_mock() + urlopen_mock.reset_mock() + + def test_netrc_open_no_such_file(self): + try: + self.assertEqual(vcs_base._authenticated_urlopen( + 'https://example.com'), None) + except Exception: + self.fail( + 'The lack of a .netrc file should not result in an exception') + + def test_netrc_file_precedence(self): + machine = 'example.com' + + default_auth_file_path = os.path.join(self.default_auth_dir, '.netrc') + user_auth_file_path = os.path.join( + self.user_auth_dir, vcs_base._AUTHENTICATION_CONFIGURATION_FILE) + system_auth_file_path = os.path.join( + self.system_auth_dir, vcs_base._AUTHENTICATION_CONFIGURATION_FILE) + + for path in ( + default_auth_file_path, user_auth_file_path, + system_auth_file_path): + _create_netrc_file(path, textwrap.dedent('''\ + machine %s + password %s + ''' % (machine, path))) + + credentials = vcs_base._credentials_for_machine(machine) + self.assertIsNotNone(credentials) + self.assertEqual(len(credentials), 3) + self.assertEqual(credentials[2], default_auth_file_path) + + # Remove default auth file and assert that the user auth file is used + os.unlink(default_auth_file_path) + credentials = vcs_base._credentials_for_machine(machine) + self.assertIsNotNone(credentials) + self.assertEqual(len(credentials), 3) + self.assertEqual(credentials[2], user_auth_file_path) + + # Remove user auth file and assert that the system auth file is used + os.unlink(user_auth_file_path) + credentials = vcs_base._credentials_for_machine(machine) + self.assertIsNotNone(credentials) + self.assertEqual(len(credentials), 3) + self.assertEqual(credentials[2], system_auth_file_path) + + # Remove system auth file and assert that no creds are found + os.unlink(system_auth_file_path) + self.assertIsNone(vcs_base._credentials_for_machine(machine)) + + def test_netrc_file_skip_errors(self): + machine = 'example.com' + + default_auth_file_path = os.path.join(self.default_auth_dir, '.netrc') + user_auth_file_path = os.path.join( + self.user_auth_dir, vcs_base._AUTHENTICATION_CONFIGURATION_FILE) + + _create_netrc_file(default_auth_file_path, 'skip-me-invalid') + + _create_netrc_file(user_auth_file_path, textwrap.dedent('''\ + machine %s + password %s + ''' % (machine, user_auth_file_path))) + + credentials = vcs_base._credentials_for_machine(machine) + self.assertIsNotNone(credentials) + self.assertEqual(len(credentials), 3) + self.assertEqual(credentials[2], user_auth_file_path) + + def test_auth_parts(self): + user_auth_file_path = os.path.join( + self.user_auth_dir, vcs_base._AUTHENTICATION_CONFIGURATION_FILE) + user_auth_file_part_path = os.path.join( + self.user_auth_dir, + vcs_base._AUTHENTICATION_CONFIGURATION_PARTS_DIR, 'test.conf') + os.makedirs(os.path.dirname(user_auth_file_part_path)) + + auth_machine = 'auth.example.com' + parts_machine = 'parts.example.com' + + for path in (user_auth_file_path, user_auth_file_part_path): + _create_netrc_file(path, textwrap.dedent('''\ + machine %s + password %s + ''' % (auth_machine, path))) + with open(user_auth_file_part_path, 'a') as f: + f.write('machine %s\n' % parts_machine) + f.write('password %s\n' % path) + + credentials = vcs_base._credentials_for_machine(auth_machine) + self.assertIsNotNone(credentials) + self.assertEqual(len(credentials), 3) + self.assertEqual(credentials[2], user_auth_file_path) + + credentials = vcs_base._credentials_for_machine(parts_machine) + self.assertIsNotNone(credentials) + self.assertEqual(len(credentials), 3) + self.assertEqual(credentials[2], user_auth_file_part_path) + + @mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) + @mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True) + def test_authenticated_urlopen_basic_auth( + self, build_opener_mock, urlopen_mock): + open_mock = build_opener_mock.return_value.open + + machine = 'example.com' + _create_netrc_file( + os.path.join(self.default_auth_dir, '.netrc'), + textwrap.dedent('''\ + machine %s + login username + password password + ''' % machine)) + + url = 'https://%s/foo/bar' % machine + vcs_base._authenticated_urlopen(url) + + self.assertFalse(urlopen_mock.mock_calls) + + class _HTTPBasicAuthHandlerMatcher(object): + def __init__(self, test): + self.test = test + + def __eq__(self, other): + manager = other.passwd + self.test.assertEqual( + manager.find_user_password(None, 'example.com'), + ('username', 'password')) + return True + + build_opener_mock.assert_called_once_with( + _HTTPBasicAuthHandlerMatcher(self)) + open_mock.assert_called_once_with(url, timeout=None) + + @mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) + @mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True) + def test_authenticated_urlopen_token_auth( + self, build_opener_mock, urlopen_mock): + machine = 'example.com' + _create_netrc_file( + os.path.join(self.default_auth_dir, '.netrc'), + textwrap.dedent('''\ + machine %s + password password + ''' % machine)) + + url = 'https://%s/foo/bar' % machine + vcs_base._authenticated_urlopen(url) + + self.assertFalse(build_opener_mock.mock_calls) + + class _RequestMatcher(object): + def __init__(self, test): + self.test = test + + def __eq__(self, other): + self.test.assertEqual(other.get_full_url(), url) + self.test.assertEqual( + other.get_header('Private-token'), 'password') + return True + + urlopen_mock.assert_called_once_with( + _RequestMatcher(self), timeout=None) + + +def _create_netrc_file(path, contents): + with open(path, 'w') as f: + f.write(contents) + os.chmod(path, 0o600) diff --git a/vcstool/clients/vcs_base.py b/vcstool/clients/vcs_base.py index 19b6e96a..3f4f0ca7 100644 --- a/vcstool/clients/vcs_base.py +++ b/vcstool/clients/vcs_base.py @@ -1,3 +1,7 @@ +import errno +import glob +import logging +import netrc import os import socket import subprocess @@ -5,6 +9,10 @@ try: from urllib.request import Request from urllib.request import urlopen + from urllib.request import HTTPPasswordMgrWithDefaultRealm + from urllib.request import HTTPBasicAuthHandler + from urllib.request import build_opener + from urllib.parse import urlparse from urllib.error import HTTPError from urllib.error import URLError except ImportError: @@ -12,12 +20,24 @@ from urllib2 import Request from urllib2 import URLError from urllib2 import urlopen + from urllib2 import HTTPPasswordMgrWithDefaultRealm + from urllib2 import HTTPBasicAuthHandler + from urllib2 import build_opener + from urlparse import urlparse try: from shutil import which # noqa except ImportError: from vcstool.compat.shutil import which # noqa +import appdirs + +_AUTHENTICATION_CONFIGURATION_FILE = "auth.conf" +_AUTHENTICATION_CONFIGURATION_PARTS_DIR = "auth.conf.d" +_APPDIRS_PROJECT_NAME = 'vcstool' + +logger = logging.getLogger(__name__) + class VcsClientBase(object): @@ -93,7 +113,12 @@ def load_url(url, retry=2, retry_period=1, timeout=10): try: fh = urlopen(url, timeout=timeout) except HTTPError as e: - if e.code == 503 and retry: + if e.code in (401, 404): + # Try again, but with authentication + fh = _authenticated_urlopen(url, timeout=timeout) + if fh is not None: + return fh.read() + elif e.code == 503 and retry: time.sleep(retry_period) return load_url( url, retry=retry - 1, retry_period=retry_period, @@ -132,3 +157,94 @@ def test_url(url, retry=2, retry_period=1, timeout=10): timeout=timeout) raise URLError(str(e) + ' (%s)' % url) return response + + +def _authenticated_urlopen(uri, timeout=None): + machine = urlparse(uri).netloc + if not machine: + return None + + credentials = _credentials_for_machine(machine) + if credentials is None: + logger.warning('No credentials found for "%s"' % machine) + return None + + (username, account, password) = credentials + + # If we have both a username and a password, use basic auth + if username and password: + password_manager = HTTPPasswordMgrWithDefaultRealm() + password_manager.add_password(None, machine, username, password) + auth_handler = HTTPBasicAuthHandler(password_manager) + opener = build_opener(auth_handler) + return opener.open(uri, timeout=timeout) + + # If we have only a password, use token auth + elif password: + request = Request(uri) + request.add_header('PRIVATE-TOKEN', password) + return urlopen(request, timeout=timeout) + + return None + + +def _credentials_for_machine(machine): + # First check the default .netrc file, if any-- it takes precedence over + # everything else + credentials = _credentials_for_machine_in_file(None, machine) + if credentials: + return credentials + + # If that file either didn't exist or didn't match for the machine, + # check the user auth directory for vcstool + credentials = _credentials_for_machine_in_dir( + appdirs.user_config_dir(_APPDIRS_PROJECT_NAME), machine) + if credentials: + return credentials + + # Finally, check the system-wide auth directory for vcstool + return _credentials_for_machine_in_dir( + appdirs.site_config_dir(_APPDIRS_PROJECT_NAME), machine) + + +def _credentials_for_machine_in_dir(directory, machine): + # The idea here is similar to how Debian handles authenticated apt repos: + # https://manpages.debian.org/testing/apt/apt_auth.conf.5.en.html + + # Check the auth.conf file first + auth_file_path = os.path.join( + directory, _AUTHENTICATION_CONFIGURATION_FILE) + credentials = _credentials_for_machine_in_file(auth_file_path, machine) + if credentials: + return credentials + + # If that file either didn't exist or didn't match for the machine, check + # the .conf files in the configuration parts dir + configuration_parts_dir = os.path.join( + directory, _AUTHENTICATION_CONFIGURATION_PARTS_DIR) + auth_files = glob.glob(os.path.join(configuration_parts_dir, '*.conf')) + for auth_file in sorted(auth_files): + auth_file_path = os.path.join(configuration_parts_dir, auth_file) + credentials = _credentials_for_machine_in_file(auth_file_path, machine) + if credentials: + return credentials + + # Nothing matched + return None + + +def _credentials_for_machine_in_file(filename, machine): + credentials = None + try: + credentials = netrc.netrc(file=filename).authenticators(machine) + except EnvironmentError as e: + # Don't error just because the file didn't exist or we didn't have + # permission to access it. Catching this situation this way to be + # compatible with python 2 and 3. + if e.errno not in (errno.ENOENT, errno.EACCES): + raise + except netrc.NetrcParseError: + # If this file had issues, don't error out so we can try fallbacks + pass + + return credentials