Skip to content

Commit

Permalink
Merge pull request #44 from muselab-d2x/jlantz/refactor-pydantic-models
Browse files Browse the repository at this point in the history
Refactor d2x.gen and d2x.parse as Pydantic models
  • Loading branch information
jlantz authored Oct 31, 2024
2 parents ea2df94 + ae55608 commit 8462b62
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 224 deletions.
Binary file modified .coverage
Binary file not shown.
57 changes: 32 additions & 25 deletions d2x/auth/sf/auth_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@
from rich.table import Table

# Local imports
from d2x.parse.sf.auth_url import parse_sfdx_auth_url
from d2x.models.sf.auth import (
DomainType, # Add this import
DomainType,
TokenRequest,
TokenResponse,
HttpResponse,
TokenExchangeDebug,
SfdxAuthUrlModel,
)
from d2x.ux.gh.actions import summary as gha_summary, output as gha_output
from d2x.models.sf.org import SalesforceOrgInfo
from d2x.base.types import CLIOptions
from d2x.api.gh import set_environment_variable # Add this import
from d2x.api.gh import (
set_environment_variable,
get_environment_variable,
) # Ensure get_environment_variable is imported


def exchange_token(org_info: SalesforceOrgInfo, cli_options: CLIOptions):
Expand Down Expand Up @@ -125,7 +128,11 @@ def exchange_token(org_info: SalesforceOrgInfo, cli_options: CLIOptions):
console.print(success_panel)

# Store access token in GitHub Environment
set_environment_variable("salesforce", "ACCESS_TOKEN", token_response.access_token.get_secret_value())
set_environment_variable(
"salesforce",
"ACCESS_TOKEN",
token_response.access_token.get_secret_value(),
)

return token_response

Expand Down Expand Up @@ -157,26 +164,26 @@ def main(cli_options: CLIOptions):
# Remove the console.status context manager
# with console.status("[bold blue]Parsing SFDX Auth URL..."):
# org_info = parse_sfdx_auth_url(auth_url)
org_info = parse_sfdx_auth_url(auth_url)
org_info = SfdxAuthUrlModel(auth_url=auth_url).parse_sfdx_auth_url()

table = Table(title="Salesforce Org Information", box=box.ROUNDED)
table.add_column("Property", style="cyan")
table.add_column("Value", style="green")

table.add_row("Org Type", org_info.org_type)
table.add_row("Domain Type", org_info.domain_type)
table.add_row("Full Domain", org_info.full_domain)
table.add_row("Org Type", org_info["org_type"])
table.add_row("Domain Type", org_info["domain_type"])
table.add_row("Full Domain", org_info["full_domain"])

if org_info.domain_type == DomainType.POD:
table.add_row("Region", org_info.region or "Classic")
table.add_row("Pod Number", org_info.pod_number or "N/A")
table.add_row("Pod Type", org_info.pod_type or "Standard")
table.add_row("Is Classic Pod", "✓" if org_info.is_classic_pod else "✗")
table.add_row("Is Hyperforce", "✓" if org_info.is_hyperforce else "✗")
if org_info["domain_type"] == DomainType.POD:
table.add_row("Region", org_info["region"] or "Classic")
table.add_row("Pod Number", org_info["pod_number"] or "N/A")
table.add_row("Pod Type", org_info["pod_type"] or "Standard")
table.add_row("Is Classic Pod", "✓" if org_info["is_classic_pod"] else "✗")
table.add_row("Is Hyperforce", "✓" if org_info["is_hyperforce"] else "✗")
else:
table.add_row("MyDomain", org_info.mydomain or "N/A")
table.add_row("Sandbox Name", org_info.sandbox_name or "N/A")
table.add_row("Is Sandbox", "✓" if org_info.is_sandbox else "✗")
table.add_row("MyDomain", org_info["mydomain"] or "N/A")
table.add_row("Sandbox Name", org_info["sandbox_name"] or "N/A")
table.add_row("Is Sandbox", "✓" if org_info["is_sandbox"] else "✗")

console.print(table)

Expand All @@ -188,10 +195,10 @@ def main(cli_options: CLIOptions):
## Salesforce Authentication Results
### Organization Details
- **Domain**: {org_info.full_domain}
- **Type**: {org_info.org_type}
{"- **Region**: " + (org_info.region or "Classic") if org_info.domain_type == DomainType.POD else ""}
{"- **Hyperforce**: " + ("Yes" if org_info.is_hyperforce else "No") if org_info.domain_type == DomainType.POD else ""}
- **Domain**: {org_info["full_domain"]}
- **Type**: {org_info["org_type"]}
{"- **Region**: " + (org_info["region"] or "Classic") if org_info["domain_type"] == DomainType.POD else ""}
{"- **Hyperforce**: " + ("Yes" if org_info["is_hyperforce"] else "No") if org_info["domain_type"] == DomainType.POD else ""}
### Authentication Status
- **Status**: ✅ Success
Expand All @@ -203,10 +210,10 @@ def main(cli_options: CLIOptions):
# Set action outputs
gha_output("access_token", token_response.access_token)
gha_output("instance_url", token_response.instance_url)
gha_output("org_type", org_info.org_type)
if org_info.domain_type == DomainType.POD:
gha_output("region", org_info.region or "classic")
gha_output("is_hyperforce", str(org_info.is_hyperforce).lower())
gha_output("org_type", org_info["org_type"])
if org_info["domain_type"] == DomainType.POD:
gha_output("region", org_info["region"] or "classic")
gha_output("is_hyperforce", str(org_info["is_hyperforce"]).lower())

sys.exit(0)

Expand Down
35 changes: 16 additions & 19 deletions d2x/auth/sf/login_url.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import sys
import os
from rich.console import Console
from d2x.gen.sf.login_url import get_login_url_and_token
from d2x.models.sf.auth import LoginUrlModel, SfdxAuthUrlModel
from d2x.ux.gh.actions import summary, output
from d2x.base.types import CLIOptions
from typing import Optional
from d2x.auth.sf.auth_url import parse_sfdx_auth_url
from d2x.api.gh import get_environment_variable
from d2x.api.gh import get_environment_variable # Add get_environment_variable import


def generate_login_url(instance_url: str, access_token: str) -> str:
"""Generate the login URL using the instance URL and access token."""
login_url, _ = get_login_url_and_token(
login_url, _ = LoginUrlModel(
access_token=access_token, login_url=instance_url
)
).get_login_url_and_token()
return login_url


Expand All @@ -28,42 +27,41 @@ def main(cli_options: CLIOptions):
"Salesforce Auth Url not found. Set the SFDX_AUTH_URL environment variable."
)

org_info = parse_sfdx_auth_url(auth_url)
org_info = SfdxAuthUrlModel(auth_url=auth_url).parse_sfdx_auth_url()

from d2x.auth.sf.auth_url import exchange_token

# Retrieve access token from GitHub Environment
try:
access_token = get_environment_variable("salesforce", "ACCESS_TOKEN")
except Exception as e:
console.print(f"[red]Error retrieving access token: {e}")
sys.exit(1)

# Generate login URL
start_url = generate_login_url(
instance_url=org_info.auth_info.instance_url,
access_token=access_token,
)

# Set outputs for GitHub Actions
output("access_token", access_token)
output("access_token", access_token) # Use access_token directly
output("instance_url", org_info.auth_info.instance_url)

output("start_url", start_url)
output("org_type", org_info.org_type)
output("org_type", org_info["org_type"])

if org_info.domain_type == "pod":
output("region", org_info.region or "classic")
output("is_hyperforce", str(org_info.is_hyperforce).lower())
if org_info["domain_type"] == "pod":
output("region", org_info["region"] or "classic")
output("is_hyperforce", str(org_info["is_hyperforce"]).lower())

# Add summary for GitHub Actions
from d2x.auth.sf.auth_url import get_full_domain

summary_md = f"""
## Salesforce Authentication Successful 🚀
### Organization Details
- **Domain**: {get_full_domain(org_info)}
- **Type**: {org_info.org_type}
{"- **Region**: " + (org_info.region or "Classic") if org_info.domain_type == 'pod' else ""}
{"- **Hyperforce**: " + ("Yes" if org_info.is_hyperforce else "No") if org_info.domain_type == 'pod' else ""}
- **Type**: {org_info["org_type"]}
{"- **Region**: " + (org_info["region"] or "Classic") if org_info["domain_type"] == 'pod' else ""}
{"- **Hyperforce**: " + ("Yes" if org_info["is_hyperforce"] else "No") if org_info["domain_type"] == 'pod' else ""}
### Authentication Status
- **Status**: ✅ Success
Expand All @@ -73,7 +71,6 @@ def main(cli_options: CLIOptions):
"""
summary(summary_md)

# Success output
console.print("\n[green]✓ Successfully authenticated to Salesforce!")
console.print(f"\n[yellow]Login URL:[/]\n{start_url}")

Expand Down
3 changes: 1 addition & 2 deletions d2x/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# cli.py
import rich_click as click
from d2x.auth.sf.login_url import main as login_url_main
from d2x.auth.sf.auth_url import main as auth_url_main
from d2x.models.sf.auth import LoginUrlModel, SfdxAuthUrlModel
import sys
import pdb
from d2x.base.types import OutputFormat, OutputFormatType, CLIOptions
Expand Down
27 changes: 0 additions & 27 deletions d2x/gen/sf/login_url.py

This file was deleted.

84 changes: 83 additions & 1 deletion d2x/models/sf/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# auth.py

import re
import urllib.parse
from datetime import datetime, timedelta
from enum import Enum
Expand Down Expand Up @@ -169,3 +169,85 @@ def to_table(self) -> Table:
)

return table


class LoginUrlModel(CommonBaseModel):
"""Model to generate login URL and token"""

access_token: str
login_url: str
ret_url: str = "/"

def get_login_url_and_token(self) -> tuple[str, str]:
"""Generate login URL and token"""
ret_url_encoded = urllib.parse.quote(self.ret_url) if self.ret_url else "%2F"
login_url_formatted = f"{self.login_url}/secur/frontdoor.jsp?sid={self.access_token}&retURL={ret_url_encoded}"
return login_url_formatted, self.access_token


class SfdxAuthUrlModel(CommonBaseModel):
"""Model to parse SFDX auth URL"""

auth_url: str

def parse_sfdx_auth_url(self) -> dict:
"""Parse SFDX auth URL and extract detailed org information"""
sfdx_auth_url_pattern = re.compile(
r"^force://"
r"(?P<client_id>[a-zA-Z0-9]{0,64})"
r":"
r"(?P<client_secret>[a-zA-Z0-9._~\-]*)"
r":"
r"(?P<refresh_token>[a-zA-Z0-9._~\-]+)"
r"@"
r"(?P<instance_url>"
r"(?:https?://)?"
r"(?P<mydomain>[a-zA-Z0-9\-]+)?"
r"(?:--(?P<sandbox_name>[a-zA-Z0-9\-]+))?"
r"(?:(?P<org_suffix>sandbox|scratch|developer|demo)?\.my\.salesforce\.com"
r"|\.lightning\.force\.com"
r"|\.my\.salesforce.com"
r"|(?P<pod_type>cs|db)"
r"|(?P<region>(?:na|eu|ap|au|uk|in|de|jp|sg|ca|br|fr|ae|il))"
r")"
r"(?P<pod_number>[0-9]+)?"
r"(?:\.salesforce\.com)?"
r")$"
)

match = sfdx_auth_url_pattern.match(self.auth_url)
if not match:
raise ValueError("Invalid SFDX auth URL format")

groups = match.groupdict()

org_type = OrgType.PRODUCTION
if groups.get("org_suffix"):
org_type = OrgType(groups["org_suffix"])
elif groups.get("sandbox_name"):
org_type = OrgType.SANDBOX

domain_type = DomainType.POD
if ".my.salesforce.com" in groups["instance_url"]:
domain_type = DomainType.MY
elif ".lightning.force.com" in groups["instance_url"]:
domain_type = DomainType.LIGHTNING

auth_info = AuthInfo(
client_id=groups["client_id"],
client_secret=groups["client_secret"] or "",
refresh_token=groups["refresh_token"],
instance_url=groups["instance_url"],
)

return {
"auth_info": auth_info,
"org_type": org_type,
"domain_type": domain_type,
"full_domain": groups["instance_url"],
"region": groups.get("region"),
"pod_number": groups.get("pod_number"),
"pod_type": groups.get("pod_type"),
"mydomain": groups.get("mydomain"),
"sandbox_name": groups.get("sandbox_name"),
}
Loading

0 comments on commit 8462b62

Please sign in to comment.