Skip to content

Commit

Permalink
add cleaned up code for Mozilla PSL from @morkrost
Browse files Browse the repository at this point in the history
  • Loading branch information
jschlyter committed Nov 15, 2024
1 parent f1b40b8 commit bc980b5
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 13 deletions.
Empty file added dnstapir/dns/__init__.py
Empty file.
135 changes: 135 additions & 0 deletions dnstapir/dns/mozpsl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import io
from typing import Self

import httpx
import punycode


class TrieNode:
    """Storage class for a single node of the Trie.

    Attributes (all assigned in ``__init__``):

    * ``count`` -- integer recorded on the node by ``Trie.insert``; stays 0
      for a node that was only created as a step on the way to a longer key.
    * ``icann`` -- flag recorded on the node by ``Trie.insert``; ``None``
      until an insert ends on this node.
    * ``children`` -- child nodes, keyed by label.
    """

    # One node is created per distinct label path, and a node only ever
    # carries these three attributes, so skip the per-instance __dict__.
    __slots__ = ("count", "icann", "children")

    def __init__(self) -> None:
        self.count = 0
        self.icann: bool | None = None
        self.children: dict[str, Self] = {}


class Trie:
    """Prefix tree keyed by sequences of labels.

    Each key is a list of labels; ``insert`` stores a label count and an
    ICANN flag on the node reached by the key, and ``search`` walks a key
    and reports the counts of the nodes it passed on the way.
    """

    def __init__(self) -> None:
        self.root = TrieNode()

    def __repr__(self) -> str:
        """Return the full Trie structure, one node per line."""

        def recur(node: TrieNode, indent: str) -> str:
            # Each nesting level extends the indent with " - "; a node's
            # count is shown only when it is non-zero.
            return "".join(
                indent + key + (f" {child.count}" if child.count else "") + recur(child, indent + " - ")
                for key, child in node.children.items()
            )

        return recur(self.root, "\n")

    def insert(self, array: list[str], nlbl: int, icann: bool) -> None:
        """Add data to Trie.

        Walks ``array`` from the root, creating missing nodes, and records
        ``nlbl`` and ``icann`` on the node reached by the last label.
        """
        node = self.root
        for label in array:
            child = node.children.get(label)
            if child is None:
                child = TrieNode()
                node.children[label] = child
            node = child
        node.count = nlbl
        node.icann = icann

    def search(self, key: list[str]) -> tuple[int, int]:
        """Search Trie.

        Walks ``key`` from the root. A node's count is picked up only when
        the walk tries to move past that node, so the node matching the very
        last label of ``key`` does not contribute.

        Returns:
            ``(core, pcore)``: the count of the last node passed whose
            ``icann`` flag is true, and the count of the last counted node
            passed whose flag is not true. Either is 0 when no such node was
            passed, and ``pcore`` is forced to 0 when it equals ``core``.

        Raises:
            KeyError: a label is missing from the Trie before any counted
                node has been passed.
        """
        current = self.root
        # Start from "nothing matched" so a key that ends before passing any
        # counted node (or an empty key) yields (0, 0) instead of failing on
        # unbound locals.
        core = 0
        pcore = 0
        for label in key:
            # Nodes with count 0 exist only as stepping stones to longer
            # keys; they must not overwrite a match made higher up.
            if current.count:
                if current.icann:
                    core = current.count
                else:
                    pcore = current.count
            child = current.children.get(label)
            if child is None:
                if not (core or pcore):
                    raise KeyError(label)
                # Ran off the Trie below an already matched node: keep the
                # match instead of rejecting the whole key.
                break
            current = child
        if pcore == core:
            pcore = 0
        return (core, pcore)


class PublicSuffixList:
    """Mozilla Public Suffix List

    Rules are stored in a Trie keyed by the rule's labels in reversed order
    (top-level label first). Each stored rule carries the number of labels
    of the core domain for names below it and whether the rule came from
    the ICANN or the PRIVATE section of the list.
    """

    def __init__(self) -> None:
        self.trie = Trie()

    def load_psl_url(self, url: str) -> None:
        """Load PSL from URL.

        Fetches ``url`` with ``httpx.get``, calls ``raise_for_status()`` on
        the response and feeds the response text to ``load_psl``.
        """
        response = httpx.get(
            url,
            headers={
                "Accept-Encoding": "gzip",
            },
        )
        response.raise_for_status()
        self.load_psl(io.StringIO(response.text))

    def load_psl(self, stream: io.StringIO) -> None:
        """Load PSL from stream.

        Iterates ``stream`` line by line and inserts every rule into the
        Trie. Blank lines and lines starting with "/" are skipped; the
        section markers inside such comment lines switch the ICANN flag.
        """
        icann = False
        for raw_line in stream:
            line = raw_line.strip()

            if "===BEGIN ICANN DOMAINS===" in line:
                # Mark ICANN domains
                icann = True
            elif "===BEGIN PRIVATE DOMAINS===" in line:
                # Mark PRIVATE domains
                icann = False

            if not line or line.startswith("/"):
                # Remove empty or comment lines
                continue

            # The PSL format reads a rule line only up to the first whitespace.
            rule = line.split(maxsplit=1)[0]

            # Number of labels in the core domain: one more than the rule.
            # The leading "*" of a wildcard rule is counted as a label here.
            labels = len(rule.split(".")) + 1

            # Wildcards: store under the parent name, drop the "*." prefix
            if rule[0] == "*":
                rule = rule[2:]

            # Exceptions (e.g. "!www.ck"): the rule name itself is the core
            # domain, so its label count is one less than for a normal rule.
            # Subtracting more makes names below the exception resolve to
            # the bare public suffix.
            if rule.startswith("!"):
                rule = rule[1:]
                labels -= 1

            # Convert from Unicode
            lbls = punycode.convert(rule).split(".")

            # Store reversed
            lbls.reverse()

            # Insert into Trie
            self.trie.insert(lbls, labels, icann)

    def coredomain(self, domain: str) -> tuple[str, str]:
        """Find ICANN and private name cut-off for domain.

        Trailing dots are ignored. The lookup is done on lower-cased labels
        because DNS names compare case-insensitively; the returned names
        keep the spelling of ``domain``.

        Returns:
            ``(core, pcore)`` as dotted names; an empty string means there
            is no cut-off of that kind.

        Raises:
            KeyError: propagated from the Trie search when the name is not
                covered by any loaded rule.
        """
        lbls = domain.rstrip(".").split(".")
        lbls.reverse()
        c, p = self.trie.search([label.lower() for label in lbls])
        core = ".".join(reversed(lbls[:c]))
        pcore = ".".join(reversed(lbls[:p]))
        return (core, pcore)

    def rdomain(self, rdomain: str) -> tuple[str, str]:
        """Find ICANN and private name cut-off for domain, reverse order process.

        ``rdomain`` is given top-level label first and the results are
        returned in that same order. As in ``coredomain`` the lookup ignores
        case while the returned names keep the caller's spelling.
        """
        lbls = rdomain.split(".")
        c, p = self.trie.search([label.lower() for label in lbls])
        return (".".join(lbls[:c]), ".".join(lbls[:p]))
49 changes: 36 additions & 13 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ httpx = { version = ">=0.27.2", optional = true }
pydantic = { version = "^2.9.2", optional = true }
pymongo = { version = "^4.10.1", optional = true }
redis = { version = "^5.1.1", optional = true }
punycode = "^0.2.1"

[tool.poetry.group.dev.dependencies]
pytest = "^8.2.0"
Expand Down
26 changes: 26 additions & 0 deletions tests/test_dns_mozpsl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest

from dnstapir.dns.mozpsl import PublicSuffixList

# URL of the Public Suffix List file that the test in this module downloads.
MOZ_PSL = "https://publicsuffix.org/list/public_suffix_list.dat"


def test_mozpsl():
    """Load the list from MOZ_PSL and check a handful of known cut-offs."""
    suffix_list = PublicSuffixList()
    suffix_list.load_psl_url(url=MOZ_PSL)

    # Input name -> expected (ICANN cut-off, private cut-off)
    coredomain_cases = {
        "www.ck.": ("www.ck", ""),
        "www.something.gov.ck.": ("something.gov.ck", ""),
        "www.something.or.other.microsoft.com.": ("microsoft.com", ""),
        "www.something.or.other.microsoft.com.br.": ("microsoft.com.br", ""),
        "www.something.emrstudio-prod.us-gov-east-1.amazonaws.com.": (
            "amazonaws.com",
            "something.emrstudio-prod.us-gov-east-1.amazonaws.com",
        ),
    }
    for name, expected in coredomain_cases.items():
        assert suffix_list.coredomain(name) == expected

    # Same kind of lookup with the labels given top-level first
    reversed_name = "com.amazonaws.us-gov-east-1.emrstudio-prod.www.something.emrstudio-prod"
    expected_reversed = (
        "com.amazonaws",
        "com.amazonaws.us-gov-east-1.emrstudio-prod.www",
    )
    assert suffix_list.rdomain(reversed_name) == expected_reversed

    # A top-level label that is not on the list is rejected
    with pytest.raises(KeyError):
        suffix_list.coredomain("local.")

0 comments on commit bc980b5

Please sign in to comment.