diff --git a/.coverage b/.coverage index d816686..d559d53 100644 Binary files a/.coverage and b/.coverage differ diff --git a/d2x/auth/sf/auth_url.py b/d2x/auth/sf/auth_url.py index 4ab1d06..9013460 100644 --- a/d2x/auth/sf/auth_url.py +++ b/d2x/auth/sf/auth_url.py @@ -14,18 +14,21 @@ from rich.table import Table # Local imports -from d2x.parse.sf.auth_url import parse_sfdx_auth_url from d2x.models.sf.auth import ( - DomainType, # Add this import + DomainType, TokenRequest, TokenResponse, HttpResponse, TokenExchangeDebug, + SfdxAuthUrlModel, ) from d2x.ux.gh.actions import summary as gha_summary, output as gha_output from d2x.models.sf.org import SalesforceOrgInfo from d2x.base.types import CLIOptions -from d2x.api.gh import set_environment_variable # Add this import +from d2x.api.gh import ( + set_environment_variable, + get_environment_variable, +) # Ensure get_environment_variable is imported def exchange_token(org_info: SalesforceOrgInfo, cli_options: CLIOptions): @@ -125,7 +128,11 @@ def exchange_token(org_info: SalesforceOrgInfo, cli_options: CLIOptions): console.print(success_panel) # Store access token in GitHub Environment - set_environment_variable("salesforce", "ACCESS_TOKEN", token_response.access_token.get_secret_value()) + set_environment_variable( + "salesforce", + "ACCESS_TOKEN", + token_response.access_token.get_secret_value(), + ) return token_response @@ -157,26 +164,26 @@ def main(cli_options: CLIOptions): # Remove the console.status context manager # with console.status("[bold blue]Parsing SFDX Auth URL..."): # org_info = parse_sfdx_auth_url(auth_url) - org_info = parse_sfdx_auth_url(auth_url) + org_info = SfdxAuthUrlModel(auth_url=auth_url).parse_sfdx_auth_url() table = Table(title="Salesforce Org Information", box=box.ROUNDED) table.add_column("Property", style="cyan") table.add_column("Value", style="green") - table.add_row("Org Type", org_info.org_type) - table.add_row("Domain Type", org_info.domain_type) - table.add_row("Full Domain", org_info.full_domain) + table.add_row("Org Type", org_info["org_type"]) + table.add_row("Domain Type", org_info["domain_type"]) + table.add_row("Full Domain", org_info["full_domain"]) - if org_info.domain_type == DomainType.POD: - table.add_row("Region", org_info.region or "Classic") - table.add_row("Pod Number", org_info.pod_number or "N/A") - table.add_row("Pod Type", org_info.pod_type or "Standard") - table.add_row("Is Classic Pod", "✓" if org_info.is_classic_pod else "✗") - table.add_row("Is Hyperforce", "✓" if org_info.is_hyperforce else "✗") + if org_info["domain_type"] == DomainType.POD: + table.add_row("Region", org_info["region"] or "Classic") + table.add_row("Pod Number", org_info["pod_number"] or "N/A") + table.add_row("Pod Type", org_info["pod_type"] or "Standard") + table.add_row("Is Classic Pod", "✓" if org_info["is_classic_pod"] else "✗") + table.add_row("Is Hyperforce", "✓" if org_info["is_hyperforce"] else "✗") else: - table.add_row("MyDomain", org_info.mydomain or "N/A") - table.add_row("Sandbox Name", org_info.sandbox_name or "N/A") - table.add_row("Is Sandbox", "✓" if org_info.is_sandbox else "✗") + table.add_row("MyDomain", org_info["mydomain"] or "N/A") + table.add_row("Sandbox Name", org_info["sandbox_name"] or "N/A") + table.add_row("Is Sandbox", "✓" if org_info["is_sandbox"] else "✗") console.print(table) @@ -188,10 +195,10 @@ def main(cli_options: CLIOptions): ## Salesforce Authentication Results ### Organization Details -- **Domain**: {org_info.full_domain} -- **Type**: {org_info.org_type} -{"- **Region**: " + (org_info.region or "Classic") if org_info.domain_type == DomainType.POD else ""} -{"- **Hyperforce**: " + ("Yes" if org_info.is_hyperforce else "No") if org_info.domain_type == DomainType.POD else ""} +- **Domain**: {org_info["full_domain"]} +- **Type**: {org_info["org_type"]} +{"- **Region**: " + (org_info["region"] or "Classic") if org_info["domain_type"] == DomainType.POD else ""} +{"- **Hyperforce**: " + ("Yes" if org_info["is_hyperforce"] else "No") if org_info["domain_type"] == DomainType.POD else ""} ### Authentication Status - **Status**: ✅ Success @@ -203,10 +210,10 @@ def main(cli_options: CLIOptions): # Set action outputs gha_output("access_token", token_response.access_token) gha_output("instance_url", token_response.instance_url) - gha_output("org_type", org_info.org_type) - if org_info.domain_type == DomainType.POD: - gha_output("region", org_info.region or "classic") - gha_output("is_hyperforce", str(org_info.is_hyperforce).lower()) + gha_output("org_type", org_info["org_type"]) + if org_info["domain_type"] == DomainType.POD: + gha_output("region", org_info["region"] or "classic") + gha_output("is_hyperforce", str(org_info["is_hyperforce"]).lower()) sys.exit(0) diff --git a/d2x/auth/sf/login_url.py b/d2x/auth/sf/login_url.py index e34fd61..b32d91b 100644 --- a/d2x/auth/sf/login_url.py +++ b/d2x/auth/sf/login_url.py @@ -1,19 +1,18 @@ import sys import os from rich.console import Console -from d2x.gen.sf.login_url import get_login_url_and_token +from d2x.models.sf.auth import LoginUrlModel, SfdxAuthUrlModel from d2x.ux.gh.actions import summary, output from d2x.base.types import CLIOptions from typing import Optional -from d2x.auth.sf.auth_url import parse_sfdx_auth_url -from d2x.api.gh import get_environment_variable +from d2x.api.gh import get_environment_variable # Add get_environment_variable import def generate_login_url(instance_url: str, access_token: str) -> str: """Generate the login URL using the instance URL and access token.""" - login_url, _ = get_login_url_and_token( + login_url, _ = LoginUrlModel( access_token=access_token, login_url=instance_url - ) + ).get_login_url_and_token() return login_url @@ -28,32 +27,31 @@ def main(cli_options: CLIOptions): "Salesforce Auth Url not found. Set the SFDX_AUTH_URL environment variable." ) - org_info = parse_sfdx_auth_url(auth_url) + org_info = SfdxAuthUrlModel(auth_url=auth_url).parse_sfdx_auth_url() + + from d2x.auth.sf.auth_url import exchange_token - # Retrieve access token from GitHub Environment try: access_token = get_environment_variable("salesforce", "ACCESS_TOKEN") except Exception as e: console.print(f"[red]Error retrieving access token: {e}") sys.exit(1) - # Generate login URL start_url = generate_login_url( instance_url=org_info.auth_info.instance_url, access_token=access_token, ) - # Set outputs for GitHub Actions - output("access_token", access_token) + output("access_token", access_token) # Use access_token directly output("instance_url", org_info.auth_info.instance_url) + output("start_url", start_url) - output("org_type", org_info.org_type) + output("org_type", org_info["org_type"]) - if org_info.domain_type == "pod": - output("region", org_info.region or "classic") - output("is_hyperforce", str(org_info.is_hyperforce).lower()) + if org_info["domain_type"] == "pod": + output("region", org_info["region"] or "classic") + output("is_hyperforce", str(org_info["is_hyperforce"]).lower()) - # Add summary for GitHub Actions from d2x.auth.sf.auth_url import get_full_domain summary_md = f""" @@ -61,9 +59,9 @@ def main(cli_options: CLIOptions): ### Organization Details - **Domain**: {get_full_domain(org_info)} -- **Type**: {org_info.org_type} -{"- **Region**: " + (org_info.region or "Classic") if org_info.domain_type == 'pod' else ""} -{"- **Hyperforce**: " + ("Yes" if org_info.is_hyperforce else "No") if org_info.domain_type == 'pod' else ""} +- **Type**: {org_info["org_type"]} +{"- **Region**: " + (org_info["region"] or "Classic") if org_info["domain_type"] == 'pod' else ""} +{"- **Hyperforce**: " + ("Yes" if org_info["is_hyperforce"] else "No") if org_info["domain_type"] == 'pod' else ""} ### Authentication Status - **Status**: ✅ Success @@ -73,7 +71,6 @@ def main(cli_options: CLIOptions): """ summary(summary_md) - # Success output console.print("\n[green]✓ Successfully authenticated to Salesforce!") console.print(f"\n[yellow]Login URL:[/]\n{start_url}") diff --git a/d2x/cli/main.py b/d2x/cli/main.py index 08f5330..2e34dae 100644 --- a/d2x/cli/main.py +++ b/d2x/cli/main.py @@ -1,7 +1,6 @@ # cli.py import rich_click as click -from d2x.auth.sf.login_url import main as login_url_main -from d2x.auth.sf.auth_url import main as auth_url_main +from d2x.models.sf.auth import LoginUrlModel, SfdxAuthUrlModel import sys import pdb from d2x.base.types import OutputFormat, OutputFormatType, CLIOptions diff --git a/d2x/gen/sf/login_url.py b/d2x/gen/sf/login_url.py deleted file mode 100644 index f67daf1..0000000 --- a/d2x/gen/sf/login_url.py +++ /dev/null @@ -1,27 +0,0 @@ -import os -import urllib.parse - -StartJarUrl = "{login_url}/secur/frontdoor.jsp?sid={access_token}&retURL={ret_url}" - - -def get_login_url_and_token( - access_token: str | None = None, login_url: str | None = None, ret_url: str = "/" -) -> tuple[str, str]: - if not access_token: - access_token = os.getenv("ACCESS_TOKEN") - if not access_token: - raise ValueError("ACCESS_TOKEN environment variable not set") - if not login_url: - login_url = os.getenv("LOGIN_URL") - if not login_url: - raise ValueError("LOGIN_URL environment variable not set") - - # URL-encode the ret_url parameter - ret_url_encoded = urllib.parse.quote(ret_url) - - # Format the login URL - login_url_formatted = StartJarUrl.format( - login_url=login_url, access_token=access_token, ret_url=ret_url_encoded - ) - - return login_url_formatted, access_token diff --git a/d2x/models/sf/auth.py b/d2x/models/sf/auth.py index c467182..f3a2e16 100644 --- a/d2x/models/sf/auth.py +++ b/d2x/models/sf/auth.py @@ -1,5 +1,5 @@ # auth.py - +import re import urllib.parse from datetime import datetime, timedelta from enum import Enum @@ -169,3 +169,85 @@ def to_table(self) -> Table: ) return table + + +class LoginUrlModel(CommonBaseModel): + """Model to generate login URL and token""" + + access_token: str + login_url: str + ret_url: str = "/" + + def get_login_url_and_token(self) -> tuple[str, str]: + """Generate login URL and token""" + ret_url_encoded = urllib.parse.quote(self.ret_url) if self.ret_url else "%2F" + login_url_formatted = f"{self.login_url}/secur/frontdoor.jsp?sid={self.access_token}&retURL={ret_url_encoded}" + return login_url_formatted, self.access_token + + +class SfdxAuthUrlModel(CommonBaseModel): + """Model to parse SFDX auth URL""" + + auth_url: str + + def parse_sfdx_auth_url(self) -> dict: + """Parse SFDX auth URL and extract detailed org information""" + sfdx_auth_url_pattern = re.compile( + r"^force://" + r"(?P[a-zA-Z0-9]{0,64})" + r":" + r"(?P[a-zA-Z0-9._~\-]*)" + r":" + r"(?P[a-zA-Z0-9._~\-]+)" + r"@" + r"(?P" + r"(?:https?://)?" + r"(?P[a-zA-Z0-9\-]+)?" + r"(?:--(?P[a-zA-Z0-9\-]+))?" + r"(?:(?Psandbox|scratch|developer|demo)?\.my\.salesforce\.com" + r"|\.lightning\.force\.com" + r"|\.my\.salesforce.com" + r"|(?Pcs|db)" + r"|(?P(?:na|eu|ap|au|uk|in|de|jp|sg|ca|br|fr|ae|il))" + r")" + r"(?P[0-9]+)?" + r"(?:\.salesforce\.com)?" + r")$" + ) + + match = sfdx_auth_url_pattern.match(self.auth_url) + if not match: + raise ValueError("Invalid SFDX auth URL format") + + groups = match.groupdict() + + org_type = OrgType.PRODUCTION + if groups.get("org_suffix"): + org_type = OrgType(groups["org_suffix"]) + elif groups.get("sandbox_name"): + org_type = OrgType.SANDBOX + + domain_type = DomainType.POD + if ".my.salesforce.com" in groups["instance_url"]: + domain_type = DomainType.MY + elif ".lightning.force.com" in groups["instance_url"]: + domain_type = DomainType.LIGHTNING + + auth_info = AuthInfo( + client_id=groups["client_id"], + client_secret=groups["client_secret"] or "", + refresh_token=groups["refresh_token"], + instance_url=groups["instance_url"], + ) + + return { + "auth_info": auth_info, + "org_type": org_type, + "domain_type": domain_type, + "full_domain": groups["instance_url"], + "region": groups.get("region"), + "pod_number": groups.get("pod_number"), + "pod_type": groups.get("pod_type"), + "mydomain": groups.get("mydomain"), + "sandbox_name": groups.get("sandbox_name"), + } diff --git a/d2x/parse/sf/auth_url.py b/d2x/parse/sf/auth_url.py deleted file mode 100644 index 84f4927..0000000 --- a/d2x/parse/sf/auth_url.py +++ /dev/null @@ -1,148 +0,0 @@ -import re -from typing import Literal -from d2x.models.sf.org import SalesforceOrgInfo -from d2x.models.sf.auth import AuthInfo, OrgType, DomainType # Add this import - -# Remove the following Literal definitions: -# OrgType = Literal["production", "sandbox", "scratch", "developer", "demo"] -# DomainType = Literal["my", "lightning", "pod"] -# PodType and RegionType can remain if they are not defined in auth.py -PodType = Literal["cs", "db", None] -RegionType = Literal[ - "na", - "eu", - "ap", - "au", - "uk", - "in", - "de", - "jp", - "sg", - "ca", - "br", - "fr", - "ae", - "il", - None, -] - - -# Updated regex pattern for better metadata extraction -sfdx_auth_url_pattern = re.compile( - r"^force://" # Protocol prefix - r"(?P[a-zA-Z0-9]{0,64})" # Client ID: alphanumeric, 0-64 chars - r":" # Separator - r"(?P[a-zA-Z0-9._~\-]*)" # Client secret: optional - r":" # Separator - r"(?P[a-zA-Z0-9._~\-]+)" # Refresh token: required - r"@" # Separator for instance URL - r"(?P" # Instance URL group - r"(?:https?://)?" # Protocol is optional - r"(?:" # Start non-capturing group for all possible domains - r"(?:" # Domain patterns group - # MyDomain with optional sandbox/scratch org - r"(?P[a-zA-Z0-9\-]+)" # Base domain - r"(?:--(?P[a-zA-Z0-9\-]+))?" # Optional sandbox name - r"(?:" # Start non-capturing group for domain types - r"\.(?Psandbox|scratch|developer|demo)?\.my\.salesforce\.com" # .my.salesforce.com domains - r"|" - r"\.lightning\.force\.com" # lightning.force.com domains - r"|" - r"\.my\.salesforce.com" # Regular my.salesforce.com - r")" - r"|" # OR - r"(?Pcs|db)" # Classic pods (cs/db) - r"|" # OR - r"(?P(?:na|eu|ap|au|uk|in|de|jp|sg|ca|br|fr|ae|il))" # Region codes - r")" - r"(?P[0-9]+)?" # Optional pod number - r"(?:\.salesforce\.com)?" # Domain suffix for non-lightning domains - r")" - r")$" -) - - -def parse_sfdx_auth_url(auth_url: str) -> SalesforceOrgInfo: - """Parse an SFDX auth URL and extract detailed org information""" - match = sfdx_auth_url_pattern.match(auth_url) - if not match: - raise ValueError("Invalid SFDX auth URL format") - - groups = match.groupdict() - - # Determine org type - org_type: OrgType = OrgType.PRODUCTION - if groups.get("org_suffix"): - org_type = OrgType(groups["org_suffix"]) # type: ignore - elif groups.get("sandbox_name"): - org_type = OrgType.SANDBOX - - # Determine domain type - domain_type: DomainType = DomainType.POD - if ".my.salesforce.com" in groups["instance_url"]: - domain_type = DomainType.MY - elif ".lightning.force.com" in groups["instance_url"]: - domain_type = DomainType.LIGHTNING - - auth_info = AuthInfo( - client_id=groups["client_id"], - client_secret=groups["client_secret"] or "", - refresh_token=groups["refresh_token"], - instance_url=groups["instance_url"], - ) - - return SalesforceOrgInfo( - auth_info=auth_info, - org_type=org_type, - domain_type=domain_type, - full_domain=groups["instance_url"], - # Pod/Instance information - region=groups.get("region"), # type: ignore - pod_number=groups.get("pod_number"), - pod_type=groups.get("pod_type"), # type: ignore - # MyDomain information - mydomain=groups.get("mydomain"), - sandbox_name=groups.get("sandbox_name"), - ) - - -def test_sfdx_auth_url_parser(): - test_urls = [ - "force://PlatformCLI::5Aep861T.BgtJABwpkWJm7RYLcqlS4pV50Iqxf8rqKD4F09oWzHo1vYJpfDnO0YpZ5lNfgw6wqUVShF2qVS2oSh@platypus-aries-9947-dev-ed.scratch.my.salesforce.com", - "force://PlatformCLI::token123@https://mycompany.my.salesforce.com", - "force://PlatformCLI::token123@https://mycompany.lightning.force.com", - "force://PlatformCLI::token123@https://mycompany--dev.sandbox.my.salesforce.com", - "force://PlatformCLI::token123@https://cs89.salesforce.com", - "force://PlatformCLI::token123@https://na139.salesforce.com", - "force://PlatformCLI::token123@https://au5.salesforce.com", - ] - - print("\nTesting SFDX Auth URL Parser:") - print("-" * 50) - - for url in test_urls: - try: - info = parse_sfdx_auth_url(url) - print(f"\nParsed URL: {url[:50]}...") - print(f"Full Domain: {info.full_domain}") - print(f"Org Type: {info.org_type}") - print(f"Domain Type: {info.domain_type}") - if info.domain_type == DomainType.POD: - print(f"Pod Details:") - print(f" Region: {info.region or 'Classic'}") - print(f" Number: {info.pod_number or 'N/A'}") - print(f" Type: {info.pod_type or 'Standard'}") - print(f" Classic: {'Yes' if info.is_classic_pod else 'No'}") - print(f" Hyperforce: {'Yes' if info.is_hyperforce else 'No'}") - else: - print(f"MyDomain: {info.mydomain}") - if info.sandbox_name: - print(f"Sandbox Name: {info.sandbox_name}") - print(f"Is Sandbox: {'Yes' if info.is_sandbox else 'No'}") - print("-" * 30) - except ValueError as e: - print(f"Error parsing URL: {e}") - - -if __name__ == "__main__": - test_sfdx_auth_url_parser() diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..536c2d8 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,54 @@ +import re +import pytest +from pydantic import ValidationError +from d2x.models.sf.auth import LoginUrlModel, SfdxAuthUrlModel + + +def test_login_url_model(): + model = LoginUrlModel(access_token="test_token", login_url="https://example.com") + login_url, token = model.get_login_url_and_token() + assert ( + login_url == "https://example.com/secur/frontdoor.jsp?sid=test_token&retURL=/" + ) # Ensure retURL is encoded + assert token == "test_token" + + +def test_login_url_model_with_ret_url(): + model = LoginUrlModel( + access_token="test_token", login_url="https://example.com", ret_url="/home" + ) + login_url, token = model.get_login_url_and_token() + assert ( + login_url + == "https://example.com/secur/frontdoor.jsp?sid=test_token&retURL=/home" + ) + assert token == "test_token" + + +def test_login_url_model_missing_access_token(): + with pytest.raises(ValidationError): + LoginUrlModel(login_url="https://example.com") + + +def test_login_url_model_missing_login_url(): + with pytest.raises(ValidationError): + LoginUrlModel(access_token="test_token") + + +def test_sfdx_auth_url_model(): + auth_url = "force://PlatformCLI::token123@https://mycompany.my.salesforce.com" + model = SfdxAuthUrlModel(auth_url=auth_url) + org_info = model.parse_sfdx_auth_url() + assert org_info["auth_info"].client_id == "PlatformCLI" + assert org_info["auth_info"].refresh_token == "token123" + assert org_info["auth_info"].instance_url == "https://mycompany.my.salesforce.com" + assert org_info["org_type"] == "production" + assert org_info["domain_type"] == "my" + assert org_info["full_domain"] == "https://mycompany.my.salesforce.com" + + +def test_sfdx_auth_url_model_invalid_url(): + auth_url = "invalid_url" + model = SfdxAuthUrlModel(auth_url=auth_url) + with pytest.raises(ValueError): + model.parse_sfdx_auth_url() diff --git a/tests/test_login_url.py b/tests/test_login_url.py index 4ed06f1..51de459 100644 --- a/tests/test_login_url.py +++ b/tests/test_login_url.py @@ -8,7 +8,7 @@ class TestGenerateLoginUrl(unittest.TestCase): - @patch("d2x.auth.sf.login_url.get_environment_variable") + @patch("d2x.api.gh.get_environment_variable") # Updated patch target def test_generate_login_url_success(self, mock_get_env_var): # Mock the SalesforceOrgInfo org_info = SalesforceOrgInfo( @@ -39,7 +39,7 @@ def test_generate_login_url_success(self, mock_get_env_var): self.assertIn("https://test.salesforce.com", login_url) self.assertIn("test_access_token", login_url) - @patch("d2x.auth.sf.login_url.get_environment_variable") + @patch("d2x.api.gh.get_environment_variable") # Updated patch target def test_generate_login_url_failure(self, mock_get_env_var): # Mock the SalesforceOrgInfo org_info = SalesforceOrgInfo(