diff options
Diffstat (limited to 'libs/tld/utils.py')
-rw-r--r-- | libs/tld/utils.py | 624 |
1 files changed, 624 insertions, 0 deletions
diff --git a/libs/tld/utils.py b/libs/tld/utils.py new file mode 100644 index 000000000..6465eefaa --- /dev/null +++ b/libs/tld/utils.py @@ -0,0 +1,624 @@ +from __future__ import unicode_literals +import argparse +from codecs import open as codecs_open +from functools import lru_cache +# codecs_open = open +from os.path import isabs +import sys +from typing import Dict, Type, Union, Tuple, List, Optional +from urllib.parse import urlsplit, SplitResult + +from .base import BaseTLDSourceParser +from .exceptions import ( + TldBadUrl, + TldDomainNotFound, + TldImproperlyConfigured, + TldIOError, +) +from .helpers import project_dir +from .trie import Trie +from .registry import Registry +from .result import Result + +__author__ = 'Artur Barseghyan' +__copyright__ = '2013-2020 Artur Barseghyan' +__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later' +__all__ = ( + 'BaseMozillaTLDSourceParser', + 'get_fld', + 'get_tld', + 'get_tld_names', + 'get_tld_names_container', + 'is_tld', + 'MozillaTLDSourceParser', + 'parse_tld', + 'pop_tld_names_container', + 'process_url', + 'reset_tld_names', + 'Result', + 'tld_names', + 'update_tld_names', + 'update_tld_names_cli', + 'update_tld_names_container', +) + +tld_names: Dict[str, Trie] = {} + + +def get_tld_names_container() -> Dict[str, Trie]: + """Get container of all tld names. + + :return: + :rtype dict: + """ + global tld_names + return tld_names + + +def update_tld_names_container(tld_names_local_path: str, + trie_obj: Trie) -> None: + """Update TLD Names container item. + + :param tld_names_local_path: + :param trie_obj: + :return: + """ + global tld_names + # tld_names.update({tld_names_local_path: trie_obj}) + tld_names[tld_names_local_path] = trie_obj + + +def pop_tld_names_container(tld_names_local_path: str) -> None: + """Remove TLD names container item. + + :param tld_names_local_path: + :return: + """ + global tld_names + tld_names.pop(tld_names_local_path, None) + + +@lru_cache(maxsize=128, typed=True) +def update_tld_names( + fail_silently: bool = False, + parser_uid: str = None +) -> bool: + """Update TLD names. + + :param fail_silently: + :param parser_uid: + :return: + """ + results: List[bool] = [] + results_append = results.append + if parser_uid: + parser_cls = Registry.get(parser_uid, None) + if parser_cls and parser_cls.source_url: + results_append( + parser_cls.update_tld_names(fail_silently=fail_silently) + ) + else: + for parser_uid, parser_cls in Registry.items(): + if parser_cls and parser_cls.source_url: + results_append( + parser_cls.update_tld_names(fail_silently=fail_silently) + ) + + return all(results) + + +def update_tld_names_cli() -> int: + """CLI wrapper for update_tld_names. + + Since update_tld_names returns True on success, we need to negate the + result to match CLI semantics. + """ + parser = argparse.ArgumentParser(description='Update TLD names') + parser.add_argument( + 'parser_uid', + nargs='?', + default=None, + help="UID of the parser to update TLD names for.", + ) + parser.add_argument( + '--fail-silently', + dest="fail_silently", + default=False, + action='store_true', + help="Fail silently", + ) + args = parser.parse_args(sys.argv[1:]) + parser_uid = args.parser_uid + fail_silently = args.fail_silently + return int( + not update_tld_names( + parser_uid=parser_uid, + fail_silently=fail_silently + ) + ) + + +def get_tld_names( + fail_silently: bool = False, + retry_count: int = 0, + parser_class: Type[BaseTLDSourceParser] = None +) -> Dict[str, Trie]: + """Build the ``tlds`` list if empty. Recursive. + + :param fail_silently: If set to True, no exceptions are raised and None + is returned on failure. + :param retry_count: If greater than 1, we raise an exception in order + to avoid infinite loops. + :param parser_class: + :type fail_silently: bool + :type retry_count: int + :type parser_class: BaseTLDSourceParser + :return: List of TLD names + :rtype: obj:`tld.utils.Trie` + """ + if not parser_class: + parser_class = MozillaTLDSourceParser + + return parser_class.get_tld_names( + fail_silently=fail_silently, + retry_count=retry_count + ) + + +# ************************************************************************** +# **************************** Parser classes ****************************** +# ************************************************************************** + +class BaseMozillaTLDSourceParser(BaseTLDSourceParser): + + @classmethod + def get_tld_names( + cls, + fail_silently: bool = False, + retry_count: int = 0 + ) -> Optional[Dict[str, Trie]]: + """Parse. + + :param fail_silently: + :param retry_count: + :return: + """ + if retry_count > 1: + if fail_silently: + return None + else: + raise TldIOError + + global tld_names + _tld_names = tld_names + # _tld_names = get_tld_names_container() + + # If already loaded, return + if ( + cls.local_path in _tld_names + and _tld_names[cls.local_path] is not None + ): + return _tld_names + + try: + # Load the TLD names file + if isabs(cls.local_path): + local_path = cls.local_path + else: + local_path = project_dir(cls.local_path) + local_file = codecs_open( + local_path, + 'r', + encoding='utf8' + ) + trie = Trie() + trie_add = trie.add # Performance opt + # Make a list of it all, strip all garbage + private_section = False + + for line in local_file: + if '===BEGIN PRIVATE DOMAINS===' in line: + private_section = True + + # Puny code TLD names + if '// xn--' in line: + line = line.split()[1] + + if line[0] in ('/', '\n'): + continue + + trie_add( + f'{line.strip()}', + private=private_section + ) + + update_tld_names_container(cls.local_path, trie) + + local_file.close() + except IOError as err: + # Grab the file + cls.update_tld_names( + fail_silently=fail_silently + ) + # Increment ``retry_count`` in order to avoid infinite loops + retry_count += 1 + # Run again + return cls.get_tld_names( + fail_silently=fail_silently, + retry_count=retry_count + ) + except Exception as err: + if fail_silently: + return None + else: + raise err + finally: + try: + local_file.close() + except Exception: + pass + + return _tld_names + + +class MozillaTLDSourceParser(BaseMozillaTLDSourceParser): + """Mozilla TLD source.""" + + uid: str = 'mozilla' + source_url: str = 'https://publicsuffix.org/list/public_suffix_list.dat' + local_path: str = 'res/effective_tld_names.dat.txt' + +# ************************************************************************** +# **************************** Core functions ****************************** +# ************************************************************************** + + +def process_url( + url: str, + fail_silently: bool = False, + fix_protocol: bool = False, + search_public: bool = True, + search_private: bool = True, + parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser +) -> Union[Tuple[List[str], int, SplitResult], Tuple[None, None, SplitResult]]: + """Process URL. + + :param parser_class: + :param url: + :param fail_silently: + :param fix_protocol: + :param search_public: + :param search_private: + :return: + """ + if not (search_public or search_private): + raise TldImproperlyConfigured( + "Either `search_public` or `search_private` (or both) shall be " + "set to True." + ) + + # Init + _tld_names = get_tld_names( + fail_silently=fail_silently, + parser_class=parser_class + ) + + if not isinstance(url, SplitResult): + url = url.lower() + + if ( + fix_protocol and not url.startswith(('//', 'http://', 'https://')) + ): + url = f'https://{url}' + + # Get parsed URL as we might need it later + parsed_url = urlsplit(url) + else: + parsed_url = url + + # Get (sub) domain name + domain_name = parsed_url.hostname + + if not domain_name: + if fail_silently: + return None, None, parsed_url + else: + raise TldBadUrl(url=url) + + # This will correctly handle dots at the end of domain name in URLs like + # https://github.com............/barseghyanartur/tld/ + if domain_name.endswith('.'): + domain_name = domain_name.rstrip('.') + + domain_parts = domain_name.split('.') + tld_names_local_path = parser_class.local_path + + # Now we query our Trie iterating on the domain parts in reverse order + node = _tld_names[tld_names_local_path].root + current_length = 0 + tld_length = 0 + match = None + len_domain_parts = len(domain_parts) + for i in range(len_domain_parts-1, -1, -1): + part = domain_parts[i] + + # Cannot go deeper + if node.children is None: + break + + # Exception + if part == node.exception: + break + + child = node.children.get(part) + + # Wildcards + if child is None: + child = node.children.get('*') + + # If the current part is not in current node's children, we can stop + if child is None: + break + + # Else we move deeper and increment our tld offset + current_length += 1 + node = child + + if node.leaf: + tld_length = current_length + match = node + + # Checking the node we finished on is a leaf and is one we allow + if ( + (match is None) or + (not match.leaf) or + (not search_public and not match.private) or + (not search_private and match.private) + ): + if fail_silently: + return None, None, parsed_url + else: + raise TldDomainNotFound(domain_name=domain_name) + + if len_domain_parts == tld_length: + non_zero_i = -1 # hostname = tld + else: + non_zero_i = max(1, len_domain_parts - tld_length) + + return domain_parts, non_zero_i, parsed_url + + +def get_fld( + url: str, + fail_silently: bool = False, + fix_protocol: bool = False, + search_public: bool = True, + search_private: bool = True, + parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser, + **kwargs +) -> Optional[str]: + """Extract the first level domain. + + Extract the top level domain based on the mozilla's effective TLD names + dat file. Returns a string. May throw ``TldBadUrl`` or + ``TldDomainNotFound`` exceptions if there's bad URL provided or no TLD + match found respectively. + + :param url: URL to get top level domain from. + :param fail_silently: If set to True, no exceptions are raised and None + is returned on failure. + :param fix_protocol: If set to True, missing or wrong protocol is + ignored (https is appended instead). + :param search_public: If set to True, search in public domains. + :param search_private: If set to True, search in private domains. + :param parser_class: + :type url: str + :type fail_silently: bool + :type fix_protocol: bool + :type search_public: bool + :type search_private: bool + :return: String with top level domain (if ``as_object`` argument + is set to False) or a ``tld.utils.Result`` object (if ``as_object`` + argument is set to True); returns None on failure. + :rtype: str + """ + if 'as_object' in kwargs: + raise TldImproperlyConfigured( + "`as_object` argument is deprecated for `get_fld`. Use `get_tld` " + "instead." + ) + + domain_parts, non_zero_i, parsed_url = process_url( + url=url, + fail_silently=fail_silently, + fix_protocol=fix_protocol, + search_public=search_public, + search_private=search_private, + parser_class=parser_class + ) + + if domain_parts is None: + return None + + # This should be None when domain_parts is None + # but mypy isn't quite smart enough to figure that out yet + assert non_zero_i is not None + if non_zero_i < 0: + # hostname = tld + return parsed_url.hostname + + return ".".join(domain_parts[non_zero_i-1:]) + + +def get_tld( + url: str, + fail_silently: bool = False, + as_object: bool = False, + fix_protocol: bool = False, + search_public: bool = True, + search_private: bool = True, + parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser +) -> Optional[Union[str, Result]]: + """Extract the top level domain. + + Extract the top level domain based on the mozilla's effective TLD names + dat file. Returns a string. May throw ``TldBadUrl`` or + ``TldDomainNotFound`` exceptions if there's bad URL provided or no TLD + match found respectively. + + :param url: URL to get top level domain from. + :param fail_silently: If set to True, no exceptions are raised and None + is returned on failure. + :param as_object: If set to True, ``tld.utils.Result`` object is returned, + ``domain``, ``suffix`` and ``tld`` properties. + :param fix_protocol: If set to True, missing or wrong protocol is + ignored (https is appended instead). + :param search_public: If set to True, search in public domains. + :param search_private: If set to True, search in private domains. + :param parser_class: + :type url: str + :type fail_silently: bool + :type as_object: bool + :type fix_protocol: bool + :type search_public: bool + :type search_private: bool + :return: String with top level domain (if ``as_object`` argument + is set to False) or a ``tld.utils.Result`` object (if ``as_object`` + argument is set to True); returns None on failure. + :rtype: str + """ + domain_parts, non_zero_i, parsed_url = process_url( + url=url, + fail_silently=fail_silently, + fix_protocol=fix_protocol, + search_public=search_public, + search_private=search_private, + parser_class=parser_class + ) + + if domain_parts is None: + return None + + # This should be None when domain_parts is None + # but mypy isn't quite smart enough to figure that out yet + assert non_zero_i is not None + + if not as_object: + if non_zero_i < 0: + # hostname = tld + return parsed_url.hostname + return ".".join(domain_parts[non_zero_i:]) + + if non_zero_i < 0: + # hostname = tld + subdomain = "" + domain = "" + # This is checked in process_url but the type is ambiguous (Optional[str]) + # so this assertion is just to satisfy mypy + assert parsed_url.hostname is not None, "No hostname in URL" + _tld = parsed_url.hostname + else: + subdomain = ".".join(domain_parts[:non_zero_i-1]) + domain = ".".join( + domain_parts[non_zero_i-1:non_zero_i] + ) + _tld = ".".join(domain_parts[non_zero_i:]) + + return Result( + subdomain=subdomain, + domain=domain, + tld=_tld, + parsed_url=parsed_url + ) + + +def parse_tld( + url: str, + fail_silently: bool = False, + fix_protocol: bool = False, + search_public: bool = True, + search_private: bool = True, + parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser +) -> Union[Tuple[None, None, None], Tuple[str, str, str]]: + """Parse TLD into parts. + + :param url: + :param fail_silently: + :param fix_protocol: + :param search_public: + :param search_private: + :param parser_class: + :return: Tuple (tld, domain, subdomain) + :rtype: tuple + """ + try: + obj = get_tld( + url, + fail_silently=fail_silently, + as_object=True, + fix_protocol=fix_protocol, + search_public=search_public, + search_private=search_private, + parser_class=parser_class + ) + if obj is None: + return None, None, None + + return obj.tld, obj.domain, obj.subdomain # type: ignore + + except ( + TldBadUrl, + TldDomainNotFound, + TldImproperlyConfigured, + TldIOError + ): + pass + + return None, None, None + + +def is_tld( + value: str, + search_public: bool = True, + search_private: bool = True, + parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser +) -> bool: + """Check if given URL is tld. + + :param value: URL to get top level domain from. + :param search_public: If set to True, search in public domains. + :param search_private: If set to True, search in private domains. + :param parser_class: + :type value: str + :type search_public: bool + :type search_private: bool + :return: + :rtype: bool + """ + _tld = get_tld( + url=value, + fail_silently=True, + fix_protocol=True, + search_public=search_public, + search_private=search_private, + parser_class=parser_class + ) + return value == _tld + + +def reset_tld_names(tld_names_local_path: str = None) -> None: + """Reset the ``tld_names`` to empty value. + + If ``tld_names_local_path`` is given, removes specified + entry from ``tld_names`` instead. + + :param tld_names_local_path: + :type tld_names_local_path: str + :return: + """ + + if tld_names_local_path: + pop_tld_names_container(tld_names_local_path) + else: + global tld_names + tld_names = {} |