diff --git a/pyproject.toml b/pyproject.toml index 582ccaa9..e976d56d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ "rfc3986 >= 1.4.0", "rich >= 12.0.0", "packaging", + "id", ] dynamic = ["version"] diff --git a/twine/auth.py b/twine/auth.py index 59e2e21d..49ab8987 100644 --- a/twine/auth.py +++ b/twine/auth.py @@ -1,17 +1,25 @@ import functools import getpass +import json import logging from typing import TYPE_CHECKING, Callable, Optional, Type, cast +from urllib.parse import urlparse + +from id import AmbientCredentialError # type: ignore +from id import detect_credential # keyring has an indirect dependency on PyCA cryptography, which has no # pre-built wheels for ppc64le and s390x, see #1158. if TYPE_CHECKING: import keyring + from keyring.errors import NoKeyringError else: try: import keyring + from keyring.errors import NoKeyringError except ModuleNotFoundError: # pragma: no cover keyring = None + NoKeyringError = None from twine import exceptions from twine import utils @@ -28,7 +36,11 @@ def __init__( class Resolver: - def __init__(self, config: utils.RepositoryConfig, input: CredentialInput) -> None: + def __init__( + self, + config: utils.RepositoryConfig, + input: CredentialInput, + ) -> None: self.config = config self.input = input @@ -57,9 +69,65 @@ def password(self) -> Optional[str]: self.input.password, self.config, key="password", - prompt_strategy=self.password_from_keyring_or_prompt, + prompt_strategy=self.password_from_keyring_or_trusted_publishing_or_prompt, ) + def make_trusted_publishing_token(self) -> Optional[str]: + # Trusted publishing (OpenID Connect): get one token from the CI + # system, and exchange that for a PyPI token. + repository_domain = cast(str, urlparse(self.system).netloc) + session = utils.make_requests_session() + + # Indices are expected to support `https://{domain}/_/oidc/audience`, + # which tells OIDC exchange clients which audience to use. + audience_url = f"https://{repository_domain}/_/oidc/audience" + resp = session.get(audience_url, timeout=5) + resp.raise_for_status() + audience = cast(str, resp.json()["audience"]) + + try: + oidc_token = detect_credential(audience) + except AmbientCredentialError as e: + # If we get here, we're on a supported CI platform for trusted + # publishing, and we have not been given any token, so we can error. + raise exceptions.TrustedPublishingFailure( + "Unable to retrieve an OIDC token from the CI platform for " + f"trusted publishing {e}" + ) + + if oidc_token is None: + logger.debug("This environment is not supported for trusted publishing") + return None # Fall back to prompting for a token (if possible) + + logger.debug("Got OIDC token for audience %s", audience) + + token_exchange_url = f"https://{repository_domain}/_/oidc/mint-token" + + mint_token_resp = session.post( + token_exchange_url, + json={"token": oidc_token}, + timeout=5, # S113 wants a timeout + ) + try: + mint_token_payload = mint_token_resp.json() + except json.JSONDecodeError: + raise exceptions.TrustedPublishingFailure( + "The token-minting request returned invalid JSON" + ) + + if not mint_token_resp.ok: + reasons = "\n".join( + f'* `{error["code"]}`: {error["description"]}' + for error in mint_token_payload["errors"] + ) + raise exceptions.TrustedPublishingFailure( + "The token request failed; the index server gave the following " + f"reasons:\n\n{reasons}" + ) + + logger.debug("Minted upload token for trusted publishing") + return cast(str, mint_token_payload["token"]) + @property def system(self) -> Optional[str]: return self.config["repository"] @@ -90,6 +158,8 @@ def get_password_from_keyring(self) -> Optional[str]: username = cast(str, self.username) logger.info("Querying keyring for password") return cast(str, keyring.get_password(system, username)) + except NoKeyringError: + logger.info("No keyring backend found") except Exception as exc: logger.warning("Error getting password from keyring", exc_info=exc) return None @@ -102,12 +172,19 @@ def username_from_keyring_or_prompt(self) -> str: return self.prompt("username", input) - def password_from_keyring_or_prompt(self) -> str: + def password_from_keyring_or_trusted_publishing_or_prompt(self) -> str: password = self.get_password_from_keyring() if password: logger.info("password set from keyring") return password + if self.is_pypi() and self.username == "__token__": + logger.debug( + "Trying to use trusted publishing (no token was explicitly provided)" + ) + if (token := self.make_trusted_publishing_token()) is not None: + return token + # Prompt for API token when required. what = "API token" if self.is_pypi() else "password" diff --git a/twine/cli.py b/twine/cli.py index 6ff60d4d..f44426e3 100644 --- a/twine/cli.py +++ b/twine/cli.py @@ -81,6 +81,7 @@ def list_dependencies_and_versions() -> List[Tuple[str, str]]: "requests", "requests-toolbelt", "urllib3", + "id", ] if sys.version_info < (3, 10): deps.append("importlib-metadata") diff --git a/twine/exceptions.py b/twine/exceptions.py index b87b938b..29b7d8a1 100644 --- a/twine/exceptions.py +++ b/twine/exceptions.py @@ -116,6 +116,12 @@ class NonInteractive(TwineException): pass +class TrustedPublishingFailure(TwineException): + """Raised if we expected to use trusted publishing but couldn't.""" + + pass + + class InvalidPyPIUploadURL(TwineException): """Repository configuration tries to use PyPI with an incorrect URL. diff --git a/twine/repository.py b/twine/repository.py index b512bfb3..f04f9dbc 100644 --- a/twine/repository.py +++ b/twine/repository.py @@ -12,18 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Any, Dict, List, Optional, Set, Tuple, cast +from typing import Any, Dict, List, Optional, Set, Tuple import requests import requests_toolbelt import rich.progress -import urllib3 -from requests import adapters -from requests_toolbelt.utils import user_agent from rich import print -import twine from twine import package as package_file +from twine.utils import make_requests_session KEYWORDS_TO_NOT_FLATTEN = {"gpg_signature", "attestations", "content"} @@ -47,7 +44,7 @@ def __init__( ) -> None: self.url = repository_url - self.session = requests.session() + self.session = make_requests_session() # requests.Session.auth should be Union[None, Tuple[str, str], ...] # But username or password could be None # See TODO for utils.RepositoryConfig @@ -57,35 +54,10 @@ def __init__( logger.info(f"username: {username if username else ''}") logger.info(f"password: <{'hidden' if password else 'empty'}>") - self.session.headers["User-Agent"] = self._make_user_agent_string() - for scheme in ("http://", "https://"): - self.session.mount(scheme, self._make_adapter_with_retries()) - # Working around https://github.com/python/typing/issues/182 self._releases_json_data: Dict[str, Dict[str, Any]] = {} self.disable_progress_bar = disable_progress_bar - @staticmethod - def _make_adapter_with_retries() -> adapters.HTTPAdapter: - retry = urllib3.Retry( - allowed_methods=["GET"], - connect=5, - total=10, - status_forcelist=[500, 501, 502, 503], - ) - - return adapters.HTTPAdapter(max_retries=retry) - - @staticmethod - def _make_user_agent_string() -> str: - user_agent_string = ( - user_agent.UserAgentBuilder("twine", twine.__version__) - .include_implementation() - .build() - ) - - return cast(str, user_agent_string) - def close(self) -> None: self.session.close() diff --git a/twine/utils.py b/twine/utils.py index 10e374e7..2bc56d0b 100644 --- a/twine/utils.py +++ b/twine/utils.py @@ -25,7 +25,11 @@ import requests import rfc3986 +import urllib3 +from requests.adapters import HTTPAdapter +from requests_toolbelt.utils import user_agent +import twine from twine import exceptions # Shim for input to allow testing. @@ -304,6 +308,28 @@ def get_userpass_value( get_clientcert = functools.partial(get_userpass_value, key="client_cert") +def make_requests_session() -> requests.Session: + """Prepare a requests Session with retries & twine's user-agent string.""" + s = requests.Session() + + retry = urllib3.Retry( + allowed_methods=["GET"], + connect=5, + total=10, + status_forcelist=[500, 501, 502, 503], + ) + + for scheme in ("http://", "https://"): + s.mount(scheme, HTTPAdapter(max_retries=retry)) + + s.headers["User-Agent"] = ( + user_agent.UserAgentBuilder("twine", twine.__version__) + .include_implementation() + .build() + ) + return s + + class EnvironmentDefault(argparse.Action): """Get values from environment variable."""