Skip to content

Commit

Permalink
Merge pull request #19 from dnstapir/mozpsl
Browse files Browse the repository at this point in the history
Mozilla PSL helper
  • Loading branch information
jschlyter authored Nov 16, 2024
2 parents e97fc5e + 8caa7b9 commit 5352312
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 5 deletions.
Empty file added dnstapir/dns/__init__.py
Empty file.
142 changes: 142 additions & 0 deletions dnstapir/dns/mozpsl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import io
from typing import Self

import httpx


class TrieNode:
""" "Storage class for Trie"""

def __init__(self) -> None:
self.count = 0
self.icann: bool | None = None
self.children: dict[str, Self] = {}


class Trie:
    """Trie of domain labels.

    Paths are sequences of labels as passed to ``insert``; each node where a
    rule ends carries a label count and an ICANN/private flag.
    """

    def __init__(self) -> None:
        self.root = TrieNode()

    def __repr__(self) -> str:
        """Print full Trie structure"""

        def recur(node: TrieNode, indent: str) -> str:
            # One line per node: the label, its count when non-zero, then its subtree.
            return "".join(
                indent + key + (f" {child.count}" if child.count else "") + recur(child, indent + " - ")
                for key, child in node.children.items()
            )

        return recur(self.root, "\n")

    def insert(self, array: list[str], nlbl: int, icann: bool) -> None:
        """Add data to Trie.

        Args:
            array: Labels forming the path, in the order they will be searched.
            nlbl: Label count to store on the final node of the path.
            icann: Flag to store on the final node of the path.
        """
        node = self.root
        for label in array:
            child = node.children.get(label)
            if child is None:
                child = TrieNode()
                node.children[label] = child
            node = child
        # An existing node is overwritten, so the last inserted rule wins.
        node.count = nlbl
        node.icann = icann

    def search(self, key: list[str]) -> tuple[int, int]:
        """Search Trie.

        Walks ``key`` label by label and remembers the count of the deepest
        flagged node passed on the way, separately for ICANN (``icann is
        True``) and private (``icann is False``) nodes. The node reached by
        the final label of ``key`` is not itself consulted.

        Args:
            key: Labels in the same order as the paths given to ``insert``.

        Returns:
            ``(core, pcore)``: the ICANN and private counts. ``pcore`` is
            reset to 0 when it equals ``core``.

        Raises:
            KeyError: If the walk leaves the trie before any rule matched.
        """
        core = 0
        pcore = 0
        current = self.root
        for label in key:
            if current.icann is True:
                core = current.count
            elif current.icann is False:
                pcore = current.count
            # If current.icann is None, do not update core or pcore

            child = current.children.get(label)
            if child is None:
                # Falling off the trie is only an error when nothing matched.
                # A pure stepping-stone node (count 0) below an already
                # matched rule must not discard that earlier match.
                if current.count == 0 and core == 0 and pcore == 0:
                    raise KeyError(label)
                break
            current = child
        if pcore == core:
            pcore = 0
        return (core, pcore)


class PublicSuffixList:
    """Mozilla Public Suffix List"""

    def __init__(self) -> None:
        self.trie = Trie()

    def load_psl_url(self, url: str) -> None:
        """Load PSL from URL.

        Fetches ``url`` with an HTTP GET, raises on an error status via
        ``raise_for_status`` and passes the response text to ``load_psl``.
        """
        response = httpx.get(
            url,
            headers={
                "Accept-Encoding": "gzip",
            },
        )
        response.raise_for_status()
        self.load_psl(io.StringIO(response.text))

    def load_psl(self, stream: io.TextIOBase) -> None:
        """Load PSL from stream.

        Each rule is stored in the trie under its reversed, IDNA-encoded
        labels together with the number of labels in the core domain it
        implies and whether it sits in the ICANN or the private section.

        Args:
            stream: Text stream yielding the list one line at a time.
        """
        icann = False
        for raw in stream:
            line = raw.strip()

            # Section markers live inside comment lines, so check them first.
            if "===BEGIN ICANN DOMAINS===" in line:
                icann = True
            elif "===BEGIN PRIVATE DOMAINS===" in line:
                icann = False

            # Skip empty and comment lines
            if not line or line.startswith("/"):
                continue

            # The list format reads each line only up to the first whitespace.
            rule = line.split(maxsplit=1)[0]

            if rule.startswith("!"):
                # Exception rule: the named domain is itself the core domain,
                # so the core has exactly as many labels as the rule.
                name = rule[1:]
                extra = 0
            elif rule.startswith("*."):
                # Wildcard rule: any single label under the name is a suffix,
                # so the core is the name plus two more labels.
                name = rule[2:]
                extra = 2
            else:
                # Plain rule: the core is the suffix plus one more label.
                name = rule
                extra = 1

            # Convert from Unicode and store reversed
            lbls = name.encode("idna").decode().split(".")
            lbls.reverse()

            self.trie.insert(lbls, len(lbls) + extra, icann)

    def coredomain(self, domain: str) -> tuple[str, str]:
        """Find ICANN and private name cut-off for domain.

        Args:
            domain: Domain name; trailing dots are ignored.

        Returns:
            ``(core, pcore)``: the name cut at the ICANN count and at the
            private count reported by the trie; an empty string where the
            count is 0.

        Raises:
            ValueError: If ``domain`` is empty or not a string.
            KeyError: Propagated from the trie search when nothing matches.
        """
        if not domain or not isinstance(domain, str):
            raise ValueError("domain must be a non-empty string")
        lbls = domain.rstrip(".").split(".")
        lbls.reverse()
        c, p = self.trie.search(lbls)
        return (".".join(reversed(lbls[:c])), ".".join(reversed(lbls[:p])))

    def rdomain(self, rdomain: str) -> tuple[str, str]:
        """Find ICANN and private name cut-off for domain, reverse order process.

        ``rdomain`` is already in reversed label order and the results are
        returned in that same order.
        """
        lbls = rdomain.split(".")
        c, p = self.trie.search(lbls)
        return (".".join(lbls[:c]), ".".join(lbls[:p]))
10 changes: 5 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions tests/test_dns_mozpsl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import pytest

from dnstapir.dns.mozpsl import PublicSuffixList

MOZ_PSL = "https://publicsuffix.org/list/public_suffix_list.dat"


def test_mozpsl():
    """Check PublicSuffixList cut-offs against the list downloaded from MOZ_PSL."""
    suffix_list = PublicSuffixList()
    suffix_list.load_psl_url(url=MOZ_PSL)

    # (query, expected (ICANN cut-off, private cut-off)); the last row is the IDN case.
    forward_cases = [
        ("www.ck.", ("www.ck", "")),
        ("www.something.gov.ck.", ("something.gov.ck", "")),
        ("www.something.or.other.microsoft.com.", ("microsoft.com", "")),
        ("www.something.or.other.microsoft.com.br.", ("microsoft.com.br", "")),
        (
            "www.something.emrstudio-prod.us-gov-east-1.amazonaws.com.",
            (
                "amazonaws.com",
                "something.emrstudio-prod.us-gov-east-1.amazonaws.com",
            ),
        ),
        ("www.xn--mnchen-3ya.de.", ("xn--mnchen-3ya.de", "")),
    ]
    for query, expected in forward_cases:
        assert suffix_list.coredomain(query) == expected

    # Same lookup with the labels already in reversed order
    reversed_query = "com.amazonaws.us-gov-east-1.emrstudio-prod.www.something.emrstudio-prod"
    reversed_expected = (
        "com.amazonaws",
        "com.amazonaws.us-gov-east-1.emrstudio-prod.www",
    )
    assert suffix_list.rdomain(reversed_query) == reversed_expected

    # Names with no matching suffix
    for unknown in ("local.", "invalid..domain."):
        with pytest.raises(KeyError):
            suffix_list.coredomain(unknown)

    # Edge cases: empty or missing input
    for bad in ("", None):
        with pytest.raises(ValueError):
            suffix_list.coredomain(bad)

0 comments on commit 5352312

Please sign in to comment.