forked from 18F/domain-scan
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpshtt.py
137 lines (106 loc) · 4.37 KB
/
pshtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import codecs
import logging
import os
import re
from pshtt import pshtt
from utils import utils
###
# Scanner that measures a site's HTTP behavior with DHS NCATS' pshtt tool.

# Timeout applied to every internal HTTP request that pshtt makes.
pshtt_timeout = 5

# A custom user agent is sent by default; set the PSHTT_USER_AGENT
# environment variable to override it.
user_agent = os.getenv("PSHTT_USER_AGENT", "18F/domain-scan/pshtt.py")

# Module-level slot, kept here for best-effort container reuse in Lambda.
suffix_list = None

# Lambda runs use a snapshot of the PSL that is packaged with the
# environment, found at the path below.
lambda_support = True
lambda_suffix_path = "./cache/public-suffix-list.txt"
# Fetch third-party data a single time, when the scan starts.
def init(environment, options):
    """Collect the external data sets used by the pshtt scanner.

    Args:
        environment: Scan environment; its 'scan_method' entry decides
            whether the public suffix list (PSL) is loaded here.
        options: Scan options (not used by this function).

    Returns:
        A dict with the keys 'preload_list', 'preload_pending' and
        'suffix_list'. 'suffix_list' is None unless the scan method is
        "local".
    """
    logging.warning("[pshtt] Downloading third party data...")

    if environment['scan_method'] != "local":
        # Cloud scans rely on a PSL snapshot rather than fresh data, so
        # the network transit is skipped for them.
        fresh_suffix_list = None
    else:
        # Local scans load the latest PSL and keep it in memory. Only the
        # second element of the returned pair is needed here.
        _, fresh_suffix_list = pshtt.load_suffix_list()

    return {
        'preload_list': pshtt.load_preload_list(),
        'preload_pending': pshtt.load_preload_pending(),
        'suffix_list': fresh_suffix_list,
    }
# Shrink the preload and pending lists before they are shipped to Lambda:
# each one is reduced to at most the domain itself and its base domain,
# and only when those names actually appear in the full list. The
# environment is overwritten in place, which should only affect it
# per-scan.
def init_domain(domain, environment, options):
    """Trim the environment's preload data down to entries for one domain.

    Args:
        domain: The domain about to be scanned.
        environment: Scan environment; its 'preload_list' and
            'preload_pending' entries are replaced with filtered lists.
        options: Scan options; options["_"]["cache_dir"] (default
            "./cache") is handed to the base-domain lookup.

    Returns:
        The same environment dict, after modification.
    """
    cache_dir = options.get("_", {}).get("cache_dir", "./cache")
    base_domain = utils.base_domain_for(domain, cache_dir=cache_dir)

    # Names worth keeping: the domain, plus its base domain when the two
    # differ (so the same name is never listed twice).
    candidates = [domain]
    if base_domain != domain:
        candidates.append(base_domain)

    for key in ("preload_list", "preload_pending"):
        full_list = environment.get(key, [])
        environment[key] = [name for name in candidates if name in full_list]

    return environment
# Run locally or in the cloud.
# Gets third-party data passed into the environment.
def scan(domain, environment, options):
    """Inspect a single domain with pshtt and return its result.

    Args:
        domain: Domain to scan. A leading scheme and "www." prefix are
            removed via format_domain() before scanning.
        environment: Scan environment. 'scan_method' selects where the
            public suffix list comes from; 'preload_list' and
            'preload_pending' are forwarded to pshtt, and 'suffix_list'
            is read when the scan method is not "lambda".
        options: Scan options; 'debug', 'ca_file' and 'pt_int_ca_file'
            are forwarded to pshtt.

    Returns:
        The first (and only) result produced by pshtt.inspect_domains().
    """
    domain = format_domain(domain)

    if environment["scan_method"] == "lambda":
        # Read the packaged PSL snapshot into memory and close the file
        # straight away, so no open handle is left behind per scan.
        # NOTE(review): this hands pshtt a list of lines instead of an
        # open file object -- confirm pshtt accepts any line iterable.
        with codecs.open(lambda_suffix_path, encoding='utf-8') as snapshot:
            suffix_list = snapshot.readlines()
    else:  # scan_method == "local"
        suffix_list = environment["suffix_list"]

    # This should cause no network calls, either locally or the cloud.
    pshtt.initialize_external_data(
        init_preload_list=environment.get('preload_list'),
        init_preload_pending=environment.get('preload_pending'),
        init_suffix_list=suffix_list
    )

    results = pshtt.inspect_domains(
        [domain],
        {
            'timeout': pshtt_timeout,
            'user_agent': user_agent,
            'debug': options.get("debug", False),
            'ca_file': options.get("ca_file"),
            'pt_int_ca_file': options.get("pt_int_ca_file")
        }
    )

    # Actually triggers the work.
    results = list(results)

    # pshtt returns array of results, but we always send in 1.
    return results[0]
# Turn one pshtt response into CSV output.
def to_rows(data):
    """Build the CSV rows for a single pshtt result.

    Args:
        data: Mapping that contains a value for every name in `headers`.

    Returns:
        A list holding exactly one row; the row's values follow the
        order of `headers`.

    Raises:
        KeyError: If `data` has no entry for one of the header names.
    """
    return [[data[column] for column in headers]]


# CSV column names, in output order. Each name is also the key that
# to_rows() looks up in a result.
headers = [
    "Canonical URL",
    "Live",
    "Redirect",
    "Redirect To",
    "Valid HTTPS",
    "Defaults to HTTPS",
    "Downgrades HTTPS",
    "Strictly Forces HTTPS",
    "HTTPS Bad Chain",
    "HTTPS Bad Hostname",
    "HTTPS Expired Cert",
    "HTTPS Self Signed Cert",
    "HSTS",
    "HSTS Header",
    "HSTS Max Age",
    "HSTS Entire Domain",
    "HSTS Preload Ready",
    "HSTS Preload Pending",
    "HSTS Preloaded",
    "Base Domain HSTS Preloaded",
    "Domain Supports HTTPS",
    "Domain Enforces HTTPS",
    "Domain Uses Strong HSTS",
    "HTTPS Live",
    "HTTPS Full Connection",
    "HTTPS Client Auth Required",
    "HTTPS Publicly Trusted",
    "HTTPS Custom Truststore Trusted",
    "IP",
    "Server Header",
    "Server Version",
    "HTTPS Cert Chain Length",
    "HTTPS Probably Missing Intermediate Cert",
    "Notes",
    "Unknown Error",
]
def format_domain(domain):
    """Strip a leading URL scheme and "www." prefix from a domain string.

    An optional "http://" or "https://" at the very start is removed,
    followed by an optional "www." directly after it. Matching is
    case-sensitive, and the rest of the string is returned unchanged.
    """
    leading_prefix = re.compile(r"^(https?://)?(www\.)?")
    return leading_prefix.sub("", domain)