about | summary | refs | log | tree | commit | diff | homepage
path: root/libs/tld/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'libs/tld/utils.py')
-rw-r--r-- libs/tld/utils.py 624
1 files changed, 624 insertions, 0 deletions
diff --git a/libs/tld/utils.py b/libs/tld/utils.py
new file mode 100644
index 000000000..6465eefaa
--- /dev/null
+++ b/libs/tld/utils.py
@@ -0,0 +1,624 @@
+from __future__ import unicode_literals
+import argparse
+from codecs import open as codecs_open
+from functools import lru_cache
+# codecs_open = open
+from os.path import isabs
+import sys
+from typing import Dict, Type, Union, Tuple, List, Optional
+from urllib.parse import urlsplit, SplitResult
+
+from .base import BaseTLDSourceParser
+from .exceptions import (
+ TldBadUrl,
+ TldDomainNotFound,
+ TldImproperlyConfigured,
+ TldIOError,
+)
+from .helpers import project_dir
+from .trie import Trie
+from .registry import Registry
+from .result import Result
+
+__author__ = 'Artur Barseghyan'
+__copyright__ = '2013-2020 Artur Barseghyan'
+__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
+__all__ = (
+ 'BaseMozillaTLDSourceParser',
+ 'get_fld',
+ 'get_tld',
+ 'get_tld_names',
+ 'get_tld_names_container',
+ 'is_tld',
+ 'MozillaTLDSourceParser',
+ 'parse_tld',
+ 'pop_tld_names_container',
+ 'process_url',
+ 'reset_tld_names',
+ 'Result',
+ 'tld_names',
+ 'update_tld_names',
+ 'update_tld_names_cli',
+ 'update_tld_names_container',
+)
+
# Module-level store of parsed TLD data: maps a parser's ``local_path`` to
# the ``Trie`` built from that file. It is read and written through the
# ``*_tld_names_container`` helpers and rebound by ``reset_tld_names``.
tld_names: Dict[str, Trie] = {}
+
+
def get_tld_names_container() -> Dict[str, Trie]:
    """Return the module-level container of loaded TLD names.

    The dict object itself is returned (not a copy), so any change made by
    the caller is visible to the rest of this module.

    :return: Mapping of TLD names file local path to the ``Trie`` built
        from that file.
    :rtype: dict
    """
    # Reading a module global needs no ``global`` declaration.
    return tld_names
+
+
def update_tld_names_container(tld_names_local_path: str,
                               trie_obj: Trie) -> None:
    """Store (or replace) the trie registered for a TLD names file.

    :param tld_names_local_path: Key under which the trie is stored in the
        module-level ``tld_names`` container.
    :param trie_obj: Trie to associate with that key.
    :type tld_names_local_path: str
    :return: None
    """
    # Item assignment mutates the existing dict in place, so the name is
    # never rebound and no ``global`` statement is required.
    tld_names[tld_names_local_path] = trie_obj
+
+
def pop_tld_names_container(tld_names_local_path: str) -> None:
    """Drop a single entry from the module-level ``tld_names`` container.

    A key that is not present is ignored silently.

    :param tld_names_local_path: Key of the entry to remove.
    :type tld_names_local_path: str
    :return: None
    """
    # The ``None`` default keeps ``pop`` from raising ``KeyError``.
    tld_names.pop(tld_names_local_path, None)
+
+
@lru_cache(maxsize=128, typed=True)
def update_tld_names(
    fail_silently: bool = False,
    parser_uid: str = None
) -> bool:
    """Ask the registered parsers to update their TLD names.

    When ``parser_uid`` is given only the parser looked up under that UID
    in ``Registry`` is considered; otherwise every registered parser is.
    Parsers without a ``source_url`` are skipped.

    Because of the ``lru_cache`` decorator, a repeated call with the same
    arguments returns the cached result without running the update again.

    :param fail_silently: Forwarded to each parser's ``update_tld_names``.
    :param parser_uid: UID of a single parser to update; falsy means all.
    :type fail_silently: bool
    :type parser_uid: str
    :return: True if every attempted update returned a truthy value. Also
        True when no parser qualified (nothing was attempted).
    :rtype: bool
    """
    if parser_uid:
        candidates = [Registry.get(parser_uid, None)]
    else:
        candidates = [registered for _uid, registered in Registry.items()]

    # Build the full list first: every qualifying parser must be updated,
    # even if an earlier one reported failure (no short-circuiting).
    outcomes: List[bool] = [
        candidate.update_tld_names(fail_silently=fail_silently)
        for candidate in candidates
        if candidate and candidate.source_url
    ]
    return all(outcomes)
+
+
def update_tld_names_cli() -> int:
    """Command-line entry point around ``update_tld_names``.

    Reads ``sys.argv[1:]``: an optional positional ``parser_uid`` and an
    optional ``--fail-silently`` flag.

    :return: Process-style exit status: 0 when ``update_tld_names``
        reported success, 1 otherwise.
    :rtype: int
    """
    cli = argparse.ArgumentParser(description='Update TLD names')
    cli.add_argument(
        'parser_uid',
        nargs='?',
        default=None,
        help="UID of the parser to update TLD names for.",
    )
    cli.add_argument(
        '--fail-silently',
        dest="fail_silently",
        default=False,
        action='store_true',
        help="Fail silently",
    )
    options = cli.parse_args(sys.argv[1:])

    succeeded = update_tld_names(
        parser_uid=options.parser_uid,
        fail_silently=options.fail_silently
    )
    # Success is True in Python but 0 on the command line, hence the flip.
    return 0 if succeeded else 1
+
+
def get_tld_names(
    fail_silently: bool = False,
    retry_count: int = 0,
    parser_class: Type[BaseTLDSourceParser] = None
) -> Dict[str, Trie]:
    """Load the TLD names through a parser class and return the result.

    This is a thin dispatcher: all the work is done by the
    ``get_tld_names`` classmethod of ``parser_class``.

    :param fail_silently: Forwarded to the parser. With the Mozilla base
        parser defined in this module, True means None is returned on
        failure instead of raising.
    :param retry_count: Forwarded to the parser, which uses it to stop
        retrying (and thus avoid infinite recursion).
    :param parser_class: Parser to use; ``MozillaTLDSourceParser`` when
        omitted or falsy.
    :type fail_silently: bool
    :type retry_count: int
    :type parser_class: BaseTLDSourceParser
    :return: Whatever the parser returns. For the Mozilla base parser that
        is the container mapping local paths to ``Trie`` objects.
    :rtype: dict
    """
    # The default is resolved at call time because the class is defined
    # further down in this module.
    chosen_parser = parser_class or MozillaTLDSourceParser
    return chosen_parser.get_tld_names(
        fail_silently=fail_silently,
        retry_count=retry_count
    )
+
+
+# **************************************************************************
+# **************************** Parser classes ******************************
+# **************************************************************************
+
class BaseMozillaTLDSourceParser(BaseTLDSourceParser):
    """Parser for TLD names files in the Mozilla public suffix list format.

    The class reads ``cls.local_path``, so concrete subclasses have to
    provide it. ``cls.update_tld_names`` is called as well; it is not
    defined in this module (presumably inherited from
    ``BaseTLDSourceParser`` -- confirm there).
    """

    @classmethod
    def get_tld_names(
        cls,
        fail_silently: bool = False,
        retry_count: int = 0
    ) -> Optional[Dict[str, Trie]]:
        """Parse the local TLD names file into a trie and cache it.

        The trie is stored in the module-level ``tld_names`` container
        under ``cls.local_path``. If an entry for that path already exists
        the file is not read again.

        If reading the file raises ``IOError``, ``cls.update_tld_names``
        is called to obtain the file and parsing is retried with
        ``retry_count`` incremented.

        :param fail_silently: If True, return None instead of raising.
        :param retry_count: Number of retries done so far. Once it is
            greater than 1 no further attempt is made.
        :type fail_silently: bool
        :type retry_count: int
        :return: The whole ``tld_names`` container (not just this parser's
            trie), or None on failure when ``fail_silently`` is True.
        :rtype: dict
        :raises TldIOError: If ``retry_count`` is greater than 1 and
            ``fail_silently`` is False.
        """
        if retry_count > 1:
            if fail_silently:
                return None
            raise TldIOError

        # Look the container up on every call: ``reset_tld_names`` may
        # have rebound the module-level name to a fresh dict.
        _tld_names = tld_names

        # If already loaded, return
        if _tld_names.get(cls.local_path) is not None:
            return _tld_names

        try:
            # Relative paths are resolved through ``project_dir``.
            if isabs(cls.local_path):
                local_path = cls.local_path
            else:
                local_path = project_dir(cls.local_path)

            trie = Trie()
            trie_add = trie.add  # Performance opt: bound once for the loop
            private_section = False

            # ``with`` closes the file on every exit path, including the
            # ``IOError`` retry below.
            with codecs_open(local_path, 'r', encoding='utf8') as local_file:
                for line in local_file:
                    # Everything after this marker is flagged as private.
                    if '===BEGIN PRIVATE DOMAINS===' in line:
                        private_section = True

                    # Puny code TLD names are listed inside comments; take
                    # the token that follows the ``//``.
                    if '// xn--' in line:
                        line = line.split()[1]

                    # ``codecs.open`` does no newline translation, so strip
                    # before testing: this also skips ``\r\n`` blank
                    # lines, whitespace-only lines and indented comments,
                    # none of which may end up in the trie.
                    entry = line.strip()
                    if not entry or entry.startswith('/'):
                        continue

                    trie_add(entry, private=private_section)

            update_tld_names_container(cls.local_path, trie)
        except IOError:
            # Grab the file
            cls.update_tld_names(
                fail_silently=fail_silently
            )
            # Run again with ``retry_count`` incremented in order to avoid
            # infinite loops.
            return cls.get_tld_names(
                fail_silently=fail_silently,
                retry_count=retry_count + 1
            )
        except Exception:
            if fail_silently:
                return None
            raise

        return _tld_names
+
+
class MozillaTLDSourceParser(BaseMozillaTLDSourceParser):
    """Parser configured for the Mozilla public suffix list.

    Used as the default ``parser_class`` throughout this module.
    """

    # Parser identifier. NOTE(review): presumably the key under which the
    # class is looked up in ``Registry`` -- confirm in ``.registry``.
    uid: str = 'mozilla'
    # Remote location of the list; ``update_tld_names`` skips parsers whose
    # ``source_url`` is falsy.
    source_url: str = 'https://publicsuffix.org/list/public_suffix_list.dat'
    # Local copy of the list; resolved through ``project_dir`` when it is
    # not an absolute path.
    local_path: str = 'res/effective_tld_names.dat.txt'
+
+# **************************************************************************
+# **************************** Core functions ******************************
+# **************************************************************************
+
+
def process_url(
    url: Union[str, SplitResult],
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Union[Tuple[List[str], int, SplitResult], Tuple[None, None, SplitResult]]:
    """Split the hostname of a URL and locate its TLD.

    :param url: URL string, or an already parsed ``SplitResult``. Strings
        are lower-cased before parsing; a ``SplitResult`` is used as is.
    :param fail_silently: If True, ``(None, None, parsed_url)`` is
        returned on failure instead of raising.
    :param fix_protocol: If True and the URL string does not start with
        ``//``, ``http://`` or ``https://``, ``https://`` is prepended.
    :param search_public: If True, public suffixes may match.
    :param search_private: If True, private suffixes may match.
    :param parser_class: Parser class providing the TLD names.
    :type fail_silently: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: Tuple ``(domain_parts, non_zero_i, parsed_url)``.
        ``domain_parts`` is the hostname split on dots (trailing dots
        removed). ``non_zero_i`` is the index in ``domain_parts`` where
        the TLD starts, or -1 when the whole hostname is the TLD.
    :rtype: tuple
    :raises TldImproperlyConfigured: If both ``search_public`` and
        ``search_private`` are False.
    :raises TldBadUrl: If the URL has no hostname.
    :raises TldDomainNotFound: If no allowed TLD matches the hostname.
    :raises TldIOError: If the parser returned no TLD names although
        ``fail_silently`` is False.
    """
    if not (search_public or search_private):
        raise TldImproperlyConfigured(
            "Either `search_public` or `search_private` (or both) shall be "
            "set to True."
        )

    # Init
    _tld_names = get_tld_names(
        fail_silently=fail_silently,
        parser_class=parser_class
    )

    if isinstance(url, SplitResult):
        parsed_url = url
    else:
        url = url.lower()

        if (
            fix_protocol and not url.startswith(('//', 'http://', 'https://'))
        ):
            url = f'https://{url}'

        # Get parsed URL as we might need it later
        parsed_url = urlsplit(url)

    # Get (sub) domain name
    domain_name = parsed_url.hostname

    if not domain_name:
        if fail_silently:
            return None, None, parsed_url
        raise TldBadUrl(url=url)

    # With ``fail_silently`` the parser returns None when the names could
    # not be loaded. Report that as an ordinary failure instead of letting
    # the subscript below blow up with ``TypeError``.
    if _tld_names is None:
        if fail_silently:
            return None, None, parsed_url
        raise TldIOError

    # This will correctly handle dots at the end of domain name in URLs like
    # https://github.com............/barseghyanartur/tld/
    domain_name = domain_name.rstrip('.')
    domain_parts = domain_name.split('.')

    # Now we query our Trie iterating on the domain parts in reverse order
    node = _tld_names[parser_class.local_path].root
    current_length = 0
    tld_length = 0
    match = None
    for part in reversed(domain_parts):
        # Cannot go deeper
        if node.children is None:
            break

        # Exception
        if part == node.exception:
            break

        child = node.children.get(part)

        # Wildcards
        if child is None:
            child = node.children.get('*')

        # If the current part is not in current node's children, we can stop
        if child is None:
            break

        # Else we move deeper and increment our tld offset
        current_length += 1
        node = child

        # Remember the deepest leaf seen so far: the walk may continue
        # through non-leaf nodes and then stop without another match.
        if node.leaf:
            tld_length = current_length
            match = node

    # Checking the node we finished on is a leaf and is one we allow
    if (
        (match is None) or
        (not match.leaf) or
        (not search_public and not match.private) or
        (not search_private and match.private)
    ):
        if fail_silently:
            return None, None, parsed_url
        raise TldDomainNotFound(domain_name=domain_name)

    len_domain_parts = len(domain_parts)
    if len_domain_parts == tld_length:
        non_zero_i = -1  # hostname = tld
    else:
        non_zero_i = max(1, len_domain_parts - tld_length)

    return domain_parts, non_zero_i, parsed_url
+
+
def get_fld(
    url: str,
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser,
    **kwargs
) -> Optional[str]:
    """Extract the first level domain.

    The first level domain is the TLD found by ``process_url`` together
    with the one hostname label in front of it. When the hostname consists
    of the TLD only, the hostname itself is returned.

    Exceptions raised by ``process_url`` (``TldBadUrl``,
    ``TldDomainNotFound``) propagate unless ``fail_silently`` is True.

    :param url: URL to get the first level domain from.
    :param fail_silently: If set to True, no exceptions are raised and None
        is returned on failure.
    :param fix_protocol: If set to True, a missing protocol is fixed by
        ``process_url`` (``https://`` is prepended).
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class providing the TLD names.
    :param kwargs: Accepted only to detect the removed ``as_object``
        argument; any other extra keyword is ignored.
    :type url: str
    :type fail_silently: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: String with the first level domain; None on failure.
    :rtype: str
    :raises TldImproperlyConfigured: If ``as_object`` is passed.
    """
    if 'as_object' in kwargs:
        raise TldImproperlyConfigured(
            "`as_object` argument is deprecated for `get_fld`. Use `get_tld` "
            "instead."
        )

    labels, suffix_start, split_url = process_url(
        url=url,
        fail_silently=fail_silently,
        fix_protocol=fix_protocol,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class
    )

    # ``process_url`` signals a silent failure with ``None`` labels.
    if labels is None:
        return None

    # Never None once ``labels`` is set; the assertion is only there to
    # narrow the type for mypy.
    assert suffix_start is not None

    # Negative index: hostname = tld
    if suffix_start < 0:
        return split_url.hostname

    # One label in front of the suffix, plus the suffix itself.
    fld_labels = labels[suffix_start-1:]
    return ".".join(fld_labels)
+
+
def get_tld(
    url: str,
    fail_silently: bool = False,
    as_object: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Optional[Union[str, Result]]:
    """Extract the top level domain.

    The URL is analysed by ``process_url``; its exceptions (``TldBadUrl``,
    ``TldDomainNotFound``) propagate unless ``fail_silently`` is True.

    :param url: URL to get top level domain from.
    :param fail_silently: If set to True, no exceptions are raised and None
        is returned on failure.
    :param as_object: If set to True, a ``tld.utils.Result`` object built
        from ``subdomain``, ``domain``, ``tld`` and ``parsed_url`` is
        returned instead of a string.
    :param fix_protocol: If set to True, a missing protocol is fixed by
        ``process_url`` (``https://`` is prepended).
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class providing the TLD names.
    :type url: str
    :type fail_silently: bool
    :type as_object: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: String with top level domain (if ``as_object`` argument
        is set to False) or a ``tld.utils.Result`` object (if ``as_object``
        argument is set to True); returns None on failure.
    :rtype: str
    """
    labels, suffix_start, split_url = process_url(
        url=url,
        fail_silently=fail_silently,
        fix_protocol=fix_protocol,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class
    )

    # ``process_url`` signals a silent failure with ``None`` labels.
    if labels is None:
        return None

    # Never None once ``labels`` is set; the assertion is only there to
    # narrow the type for mypy.
    assert suffix_start is not None

    # A negative index means the whole hostname is the TLD.
    hostname_is_tld = suffix_start < 0

    if hostname_is_tld:
        suffix = split_url.hostname
    else:
        suffix = ".".join(labels[suffix_start:])

    if not as_object:
        return suffix

    if hostname_is_tld:
        # ``process_url`` already rejected empty hostnames, but the
        # attribute is typed Optional[str]; this assertion is for mypy.
        assert split_url.hostname is not None, "No hostname in URL"
        subdomain = ""
        domain = ""
    else:
        # Everything before the registered label is the subdomain; the
        # single label right in front of the suffix is the domain.
        subdomain = ".".join(labels[:suffix_start-1])
        domain = ".".join(labels[suffix_start-1:suffix_start])

    return Result(
        subdomain=subdomain,
        domain=domain,
        tld=suffix,
        parsed_url=split_url
    )
+
+
def parse_tld(
    url: str,
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Union[Tuple[None, None, None], Tuple[str, str, str]]:
    """Parse a URL into its TLD, domain and subdomain.

    This function does not let the ``Tld*`` exceptions listed below
    escape, whatever the value of ``fail_silently``: ``TldBadUrl``,
    ``TldDomainNotFound``, ``TldImproperlyConfigured`` and ``TldIOError``
    all result in a tuple of three None values.

    :param url: URL to parse.
    :param fail_silently: Forwarded to ``get_tld``.
    :param fix_protocol: Forwarded to ``get_tld``.
    :param search_public: Forwarded to ``get_tld``.
    :param search_private: Forwarded to ``get_tld``.
    :param parser_class: Forwarded to ``get_tld``.
    :return: Tuple (tld, domain, subdomain); three None values on failure.
    :rtype: tuple
    """
    try:
        parsed = get_tld(
            url,
            fail_silently=fail_silently,
            as_object=True,
            fix_protocol=fix_protocol,
            search_public=search_public,
            search_private=search_private,
            parser_class=parser_class
        )
        if parsed is not None:
            return parsed.tld, parsed.domain, parsed.subdomain  # type: ignore
    except (
        TldBadUrl,
        TldDomainNotFound,
        TldImproperlyConfigured,
        TldIOError
    ):
        # Treated exactly like a silent failure of ``get_tld``.
        pass

    return None, None, None
+
+
def is_tld(
    value: str,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> bool:
    """Check if the given value is itself a TLD.

    The value is run through ``get_tld`` (with ``fail_silently`` and
    ``fix_protocol`` both enabled) and compared with the TLD extracted
    from it.

    :param value: Value to check.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class providing the TLD names.
    :type value: str
    :type search_public: bool
    :type search_private: bool
    :return: True if the extracted TLD equals ``value``, False otherwise
        (including when no TLD could be extracted).
    :rtype: bool
    """
    extracted = get_tld(
        url=value,
        fail_silently=True,
        fix_protocol=True,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class
    )
    # ``extracted`` is None on failure, which never equals ``value``.
    return value == extracted
+
+
def reset_tld_names(tld_names_local_path: str = None) -> None:
    """Reset the ``tld_names`` container, fully or for a single entry.

    Without an argument (or with a falsy one) the module-level
    ``tld_names`` name is rebound to a new empty dict. Otherwise only the
    entry stored under ``tld_names_local_path`` is removed.

    :param tld_names_local_path: Key of the single entry to remove.
    :type tld_names_local_path: str
    :return: None
    """
    global tld_names

    if not tld_names_local_path:
        # Rebinds the name: the previous dict object is left untouched.
        tld_names = {}
        return

    pop_tld_names_container(tld_names_local_path)