diff --git a/dnstapir/dns/__init__.py b/dnstapir/dns/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dnstapir/dns/mozpsl.py b/dnstapir/dns/mozpsl.py new file mode 100644 index 0000000..9edd97b --- /dev/null +++ b/dnstapir/dns/mozpsl.py @@ -0,0 +1,142 @@ +import io +from typing import Self + +import httpx + + +class TrieNode: + """ "Storage class for Trie""" + + def __init__(self) -> None: + self.count = 0 + self.icann: bool | None = None + self.children: dict[str, Self] = {} + + +class Trie: + def __init__(self) -> None: + self.root = TrieNode() + + def __repr__(self) -> str: + """Print full Trie structure""" + + def recur(node: TrieNode, indent: str) -> str: + return "".join( + indent + key + (f" {child.count}" if child.count else "") + recur(child, indent + " - ") + for key, child in node.children.items() + ) + + return recur(self.root, "\n") + + def insert(self, array: list[str], nlbl: int, icann: bool) -> None: + """Add data to Trie""" + node = self.root + for x in array: + if x in node.children: + node = node.children[x] + else: + child = TrieNode() + node.children[x] = child + node = child + node.count = nlbl + node.icann = icann + + def search(self, key: list[str]) -> tuple[int, int]: + """Search Trie""" + core = 0 + pcore = 0 + current = self.root + for label in key: + if current.icann is True: + core = current.count + elif current.icann is False: + pcore = current.count + # # If current.icann is None, do not update core or pcore + if label not in current.children: + if current.count != 0: + break + else: + raise KeyError + current = current.children[label] + if pcore == core: + pcore = 0 + return (core, pcore) + + +class PublicSuffixList: + """Mozilla Public Suffix List""" + + def __init__(self) -> None: + self.trie = Trie() + + def load_psl_url(self, url: str) -> None: + """Load PSL from URL""" + response = httpx.get( + url, + headers={ + "Accept-Encoding": "gzip", + }, + ) + response.raise_for_status() + 
self.load_psl(io.StringIO(response.text)) + + def load_psl(self, stream: io.StringIO) -> None: + """Load PSL from stream""" + icann = False + for line in stream: + line = line.rstrip() + + if "===BEGIN ICANN DOMAINS===" in line: + # Mark ICANN domains + icann = True + elif "===BEGIN PRIVATE DOMAINS===" in line: + # Mark PRIVATE domains + icann = False + + if (line.strip() == "") or (line[0] == "/"): + # Remove empty or comment lines + continue + + # Set number of labels in core domain + labels = len(line.split(".")) + 1 + + # Wildcards + if line[0] == "*": + line = line[2:] + + # Exclusions... .ck and .jp, just stop + if line[0] == "!": + line = line[1:] + labels -= 2 + + # Convert from Unicode + lbls = line.encode("idna").decode().split(".") + + # Store reversed + lbls.reverse() + + # Insert into Trie + self.trie.insert(lbls, labels, icann) + + def coredomain(self, domain: str) -> tuple[str, str]: + """Find ICANN and private name cut-off for domain""" + if not domain: + raise ValueError + try: + domain = domain.rstrip(".") + except AttributeError as exc: + raise ValueError from exc + lbls = domain.split(".") + lbls.reverse() + c, p = self.trie.search(lbls) + core = lbls[0:c] + core.reverse() + pcore = lbls[0:p] + pcore.reverse() + return (".".join(core), ".".join(pcore)) + + def rdomain(self, rdomain: str) -> tuple[str, str]: + """Find ICANN and private name cut-off for domain, reverse order process""" + lbls = rdomain.split(".") + c, p = self.trie.search(lbls) + return (".".join(lbls[0:c]), ".".join(lbls[0:p])) diff --git a/poetry.lock b/poetry.lock index ada2644..2575a40 100644 --- a/poetry.lock +++ b/poetry.lock @@ -330,20 +330,20 @@ test-randomorder = ["pytest-randomly"] [[package]] name = "deprecated" -version = "1.2.14" +version = "1.2.15" description = "Python @deprecated decorator to deprecate old python classes, functions or methods." 
import pytest

from dnstapir.dns.mozpsl import PublicSuffixList

MOZ_PSL = "https://publicsuffix.org/list/public_suffix_list.dat"


def test_mozpsl():
    """Check cut-offs computed from the live Mozilla Public Suffix List.

    Downloads the list from MOZ_PSL, so this test needs network access.
    """
    psl = PublicSuffixList()
    psl.load_psl_url(url=MOZ_PSL)

    # Forward-order lookups: name -> (ICANN core domain, private core domain)
    forward_cases = [
        ("www.ck.", ("www.ck", "")),
        ("www.something.gov.ck.", ("something.gov.ck", "")),
        ("www.something.or.other.microsoft.com.", ("microsoft.com", "")),
        ("www.something.or.other.microsoft.com.br.", ("microsoft.com.br", "")),
        (
            "www.something.emrstudio-prod.us-gov-east-1.amazonaws.com.",
            (
                "amazonaws.com",
                "something.emrstudio-prod.us-gov-east-1.amazonaws.com",
            ),
        ),
        # IDN test
        ("www.xn--mnchen-3ya.de.", ("xn--mnchen-3ya.de", "")),
    ]
    for name, expected in forward_cases:
        assert psl.coredomain(name) == expected

    # Reverse-order lookup keeps the TLD-first label order in its result
    reversed_name = "com.amazonaws.us-gov-east-1.emrstudio-prod.www.something.emrstudio-prod"
    assert psl.rdomain(reversed_name) == (
        "com.amazonaws",
        "com.amazonaws.us-gov-east-1.emrstudio-prod.www",
    )

    # Names that no rule covers
    for unknown in ("local.", "invalid..domain."):
        with pytest.raises(KeyError):
            psl.coredomain(unknown)

    # Empty or missing input
    for invalid in ("", None):
        with pytest.raises(ValueError):
            psl.coredomain(invalid)