diff --git a/.travis.yml b/.travis.yml index 838e5019..c8a2f760 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ matrix: install: # newer versions of PyYAML dropped support for Python 3.4 - if [ $TRAVIS_PYTHON_VERSION == "3.4" ]; then pip install PyYAML==5.2; fi - - pip install coverage flake8 flake8-docstrings flake8-import-order pytest PyYAML + - pip install coverage flake8 flake8-docstrings flake8-import-order pytest PyYAML mock script: - PYTHONPATH=`pwd` pytest -s -v test notifications: diff --git a/test/test_base.py b/test/test_base.py new file mode 100644 index 00000000..b92816d7 --- /dev/null +++ b/test/test_base.py @@ -0,0 +1,124 @@ +import os +import shutil +import tempfile +import unittest + +try: + from urllib.error import HTTPError +except ImportError: + from urllib2 import HTTPError + + +try: + from unittest import mock +except ImportError: + import mock + +from vcstool.clients import vcs_base + + +class TestBase(unittest.TestCase): + + @mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) + @mock.patch('vcstool.clients.vcs_base._netrc_open', autospec=True) + def test_load_url_calls_urlopen(self, netrc_open_mock, urlopen_mock): + urlopen_read_mock = urlopen_mock.return_value.read + + vcs_base.load_url('example.com', timeout=123) + + urlopen_mock.assert_called_once_with('example.com', timeout=123) + urlopen_read_mock.assert_called_once_with() + self.assertFalse(netrc_open_mock.mock_calls) + + @mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) + @mock.patch('vcstool.clients.vcs_base._netrc_open', autospec=True) + def test_load_url_calls_netrc_open(self, netrc_open_mock, urlopen_mock): + for code in (401, 404): + urlopen_mock.side_effect = [ + HTTPError(None, code, None, None, None)] + urlopen_read_mock = urlopen_mock.return_value.read + + vcs_base.load_url('example.com', timeout=123) + + urlopen_mock.assert_called_once_with('example.com', timeout=123) + self.assertFalse(urlopen_read_mock.mock_calls) + + netrc_open_mock.assert_called_once_with('example.com', timeout=123) + + netrc_open_mock.reset_mock() + urlopen_mock.reset_mock() + + def test_netrc_open_no_such_file(self): + try: + self.assertEqual(vcs_base._netrc_open( + 'https://example.com', filename='/non-existent'), None) + except Exception: + self.fail( + 'The lack of a .netrc file should not result in an exception') + + @mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) + @mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True) + def test_netrc_open_basic_auth(self, build_opener_mock, urlopen_mock): + open_mock = build_opener_mock.return_value.open + + tmpdir = tempfile.mkdtemp() + netrc_file = os.path.join(tmpdir, 'netrc') + machine = 'example.com' + with open(netrc_file, 'w') as f: + f.write('machine %s\n' % machine) + f.write('login username\n') + f.write('password password') + + url = 'https://%s/foo/bar' % machine + try: + vcs_base._netrc_open(url, filename=netrc_file, timeout=123) + finally: + shutil.rmtree(tmpdir) + + self.assertFalse(urlopen_mock.mock_calls) + + class _HTTPBasicAuthHandlerMatcher(object): + def __init__(self, test): + self.test = test + + def __eq__(self, other): + manager = other.passwd + self.test.assertEqual( + manager.find_user_password(None, 'example.com'), + ('username', 'password')) + return True + + build_opener_mock.assert_called_once_with( + _HTTPBasicAuthHandlerMatcher(self)) + open_mock.assert_called_once_with(url, timeout=123) + + @mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) + @mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True) + def test_netrc_open_token_auth(self, build_opener_mock, urlopen_mock): + tmpdir = tempfile.mkdtemp() + netrc_file = os.path.join(tmpdir, 'netrc') + machine = 'example.com' + with open(netrc_file, 'w') as f: + f.write('machine %s\n' % machine) + f.write('password password') + + url = 'https://%s/foo/bar' % machine + try: + vcs_base._netrc_open(url, filename=netrc_file, timeout=123) + finally: + shutil.rmtree(tmpdir) + + self.assertFalse(build_opener_mock.mock_calls) + + class _RequestMatcher(object): + def __init__(self, test): + self.test = test + + def __eq__(self, other): + self.test.assertEqual(other.get_full_url(), url) + self.test.assertEqual( + other.get_header('Private-token'), 'password') + return True + + urlopen_mock.assert_called_once_with( + _RequestMatcher(self), timeout=123) diff --git a/vcstool/clients/vcs_base.py b/vcstool/clients/vcs_base.py index 19b6e96a..bb3db2d8 100644 --- a/vcstool/clients/vcs_base.py +++ b/vcstool/clients/vcs_base.py @@ -1,3 +1,6 @@ +import errno +import logging +import netrc import os import socket import subprocess @@ -5,6 +8,10 @@ try: from urllib.request import Request from urllib.request import urlopen + from urllib.request import HTTPPasswordMgrWithDefaultRealm + from urllib.request import HTTPBasicAuthHandler + from urllib.request import build_opener + from urllib.parse import urlparse from urllib.error import HTTPError from urllib.error import URLError except ImportError: @@ -12,6 +19,10 @@ from urllib2 import Request from urllib2 import URLError from urllib2 import urlopen + from urllib2 import HTTPPasswordMgrWithDefaultRealm + from urllib2 import HTTPBasicAuthHandler + from urllib2 import build_opener + from urlparse import urlparse try: from shutil import which # noqa @@ -91,7 +102,7 @@ def run_command(cmd, cwd, env=None): def load_url(url, retry=2, retry_period=1, timeout=10): try: - fh = urlopen(url, timeout=timeout) + fh = _urlopen_netrc(url, timeout=timeout) except HTTPError as e: if e.code == 503 and retry: time.sleep(retry_period) @@ -132,3 +143,50 @@ def test_url(url, retry=2, retry_period=1, timeout=10): timeout=timeout) raise URLError(str(e) + ' (%s)' % url) return response + + +def _urlopen_netrc(uri, timeout=None): + try: + return urlopen(uri, timeout=timeout) + except HTTPError as e: + if e.code in (401, 404): + # Try again with netrc credentials + result = _netrc_open(uri, timeout=timeout) + if result is not None: + return result + raise + + +def _netrc_open(uri, filename=None, timeout=None): + parsed_uri = urlparse(uri) + machine = parsed_uri.netloc + if not machine: + return None + + opener = None + try: + info = netrc.netrc(filename).authenticators(machine) + if info is None: + # caught below, like other netrc parse errors + raise netrc.NetrcParseError('No authenticators for "%s"' % machine) + + (username, _, password) = info + if username and password: + pass_man = HTTPPasswordMgrWithDefaultRealm() + pass_man.add_password(None, machine, username, password) + authhandler = HTTPBasicAuthHandler(pass_man) + opener = build_opener(authhandler) + return opener.open(uri, timeout=timeout) + elif password: + request = Request(uri) + request.add_header('PRIVATE-TOKEN', password) + return urlopen(request, timeout=timeout) + except EnvironmentError as e: + # Don't error just because the user doesn't have a .netrc file + if e.errno != errno.ENOENT: + raise + except netrc.NetrcParseError as neterr: + logging.getLogger(__name__).warn( + 'WARNING: parsing .netrc: %s' % str(neterr)) + + return None