diff options
Diffstat (limited to 'libs/apprise/utils.py')
-rw-r--r-- | libs/apprise/utils.py | 446 |
1 files changed, 425 insertions, 21 deletions
diff --git a/libs/apprise/utils.py b/libs/apprise/utils.py index 8d0920071..27b263c34 100644 --- a/libs/apprise/utils.py +++ b/libs/apprise/utils.py @@ -25,6 +25,7 @@ import re import six +import json import contextlib import os from os.path import expanduser @@ -95,9 +96,10 @@ TIDY_NUX_TRIM_RE = re.compile( # The handling of custom arguments passed in the URL; we treat any # argument (which would otherwise appear in the qsd area of our parse_url() -# function differently if they start with a + or - value +# function differently if they start with a +, - or : value NOTIFY_CUSTOM_ADD_TOKENS = re.compile(r'^( |\+)(?P<key>.*)\s*') NOTIFY_CUSTOM_DEL_TOKENS = re.compile(r'^-(?P<key>.*)\s*') +NOTIFY_CUSTOM_COLON_TOKENS = re.compile(r'^:(?P<key>.*)\s*') # Used for attempting to acquire the schema if the URL can't be parsed. GET_SCHEMA_RE = re.compile(r'\s*(?P<schema>[a-z0-9]{2,9})://.*$', re.I) @@ -113,18 +115,23 @@ GET_SCHEMA_RE = re.compile(r'\s*(?P<schema>[a-z0-9]{2,9})://.*$', re.I) GET_EMAIL_RE = re.compile( - r'((?P<name>[^:<]+)?[:<\s]+)?' + r'(([\s"\']+)?(?P<name>[^:<"\']+)?[:<\s"\']+)?' r'(?P<full_email>((?P<label>[^+]+)\+)?' r'(?P<email>(?P<userid>[a-z0-9$%=_~-]+' r'(?:\.[a-z0-9$%+=_~-]+)' r'*)@(?P<domain>(' - r'(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+' - r'[a-z0-9](?:[a-z0-9-]*[a-z0-9]))|' - r'[a-z0-9][a-z0-9-]{5,})))' + r'(?:[a-z0-9](?:[a-z0-9_-]*[a-z0-9])?\.)+' + r'[a-z0-9](?:[a-z0-9_-]*[a-z0-9]))|' + r'[a-z0-9][a-z0-9_-]{5,})))' r'\s*>?', re.IGNORECASE) -# Regular expression used to extract a phone number -GET_PHONE_NO_RE = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$') +# A simple verification check to make sure the content specified +# rougly conforms to a phone number before we parse it further +IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$') + +# Regular expression used to destinguish between multiple phone numbers +PHONE_NO_DETECTION_RE = re.compile( + r'\s*([+(\s]*[0-9][0-9()\s-]+[0-9])(?=$|[\s,+(]+[0-9])', re.I) # Regular expression used to destinguish between multiple URLs URL_DETECTION_RE = re.compile( @@ -136,11 +143,29 @@ EMAIL_DETECTION_RE = re.compile( r'[^@\s,]+@[^\s,]+)', re.IGNORECASE) +# Used to prepare our UUID regex matching +UUID4_RE = re.compile( + r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}', + re.IGNORECASE) + # validate_regex() utilizes this mapping to track and re-use pre-complied # regular expressions REGEX_VALIDATE_LOOKUP = {} +class TemplateType(object): + """ + Defines the different template types we can perform parsing on + """ + # RAW does nothing at all to the content being parsed + # data is taken at it's absolute value + RAW = 'raw' + + # Data is presumed to be of type JSON and is therefore escaped + # if required to do so (such as single quotes) + JSON = 'json' + + def is_ipaddr(addr, ipv4=True, ipv6=True): """ Validates against IPV4 and IPV6 IP Addresses @@ -191,7 +216,7 @@ def is_ipaddr(addr, ipv4=True, ipv6=True): return False -def is_hostname(hostname, ipv4=True, ipv6=True): +def is_hostname(hostname, ipv4=True, ipv6=True, underscore=True): """ Validate hostname """ @@ -200,7 +225,7 @@ def is_hostname(hostname, ipv4=True, ipv6=True): if len(hostname) > 253 or len(hostname) == 0: return False - # Strip trailling period on hostname (if one exists) + # Strip trailing period on hostname (if one exists) if hostname[-1] == ".": hostname = hostname[:-1] @@ -217,9 +242,14 @@ def is_hostname(hostname, ipv4=True, ipv6=True): # - Hostnames can ony be comprised of alpha-numeric characters and the # hyphen (-) character. # - Hostnames can not start with the hyphen (-) character. + # - as a workaround for https://github.com/docker/compose/issues/229 to + # being able to address services in other stacks, we also allow + # underscores in hostnames (if flag is set accordingly) # - labels can not exceed 63 characters + # - allow single character alpha characters allowed = re.compile( - r'(?!-)[a-z0-9][a-z0-9-]{1,62}(?<!-)$', + r'^([a-z0-9][a-z0-9_-]{1,62}|[a-z_-])(?<![_-])$' if underscore else + r'^([a-z0-9][a-z0-9-]{1,62}|[a-z-])(?<!-)$', re.IGNORECASE, ) @@ -229,6 +259,119 @@ def is_hostname(hostname, ipv4=True, ipv6=True): return hostname +def is_uuid(uuid): + """Determine if the specified entry is uuid v4 string + + Args: + address (str): The string you want to check. + + Returns: + bool: Returns False if the specified element is not a uuid otherwise + it returns True + """ + + try: + match = UUID4_RE.match(uuid) + + except TypeError: + # not parseable content + return False + + return True if match else False + + +def is_phone_no(phone, min_len=11): + """Determine if the specified entry is a phone number + + Args: + phone (str): The string you want to check. + min_len (int): Defines the smallest expected length of the phone + before it's to be considered invalid. By default + the phone number can't be any larger then 14 + + Returns: + bool: Returns False if the address specified is not a phone number + and a dictionary of the parsed phone number if it is as: + { + 'country': '1', + 'area': '800', + 'line': '1234567', + 'full': '18001234567', + 'pretty': '+1 800-123-4567', + } + + Non conventional numbers such as 411 would look like provided that + `min_len` is set to at least a 3: + { + 'country': '', + 'area': '', + 'line': '411', + 'full': '411', + 'pretty': '411', + } + + """ + + try: + if not IS_PHONE_NO.match(phone): + # not parseable content as it does not even conform closely to a + # phone number) + return False + + except TypeError: + return False + + # Tidy phone number up first + phone = re.sub(r'[^\d]+', '', phone) + if len(phone) > 14 or len(phone) < min_len: + # Invalid phone number + return False + + # Full phone number without any markup is as is now + full = phone + + # Break apart our phone number + line = phone[-7:] + phone = phone[:len(phone) - 7] if len(phone) > 7 else '' + + # the area code (if present) + area = phone[-3:] if phone else '' + + # The country code is the leftovers + country = phone[:len(phone) - 3] if len(phone) > 3 else '' + + # Prepare a nicely (consistently) formatted phone number + pretty = '' + + if country: + # The leftover is the country code + pretty += '+{} '.format(country) + + if area: + pretty += '{}-'.format(area) + + if len(line) >= 7: + pretty += '{}-{}'.format(line[:3], line[3:]) + + else: + pretty += line + + return { + # The line code (last 7 digits) + 'line': line, + # Area code + 'area': area, + # The country code (if identified) + 'country': country, + + # A nicely formatted phone no + 'pretty': pretty, + + # All digits in-line + 'full': full, + } + + def is_email(address): """Determine if the specified entry is an email address @@ -236,8 +379,17 @@ def is_email(address): address (str): The string you want to check. Returns: - bool: Returns True if the address specified is an email address - and False if it isn't. + bool: Returns False if the address specified is not an email address + and a dictionary of the parsed email if it is as: + { + 'name': 'Parse Name' + 'email': '[email protected]' + 'full_email': '[email protected]' + 'label': 'label' + 'user': 'user', + 'domain': 'domain.com' + } + """ try: @@ -318,10 +470,11 @@ def parse_qsd(qs): 'qsd': {}, # Detected Entries that start with + or - are additionally stored in - # these values (un-touched). The +/- however are stripped from their + # these values (un-touched). The :,+,- however are stripped from their # name before they are stored here. 'qsd+': {}, 'qsd-': {}, + 'qsd:': {}, } pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')] @@ -361,6 +514,12 @@ def parse_qsd(qs): # Store content 'as-is' result['qsd-'][k.group('key')] = val + # Check for tokens that start with a colon symbol (:) + k = NOTIFY_CUSTOM_COLON_TOKENS.match(key) + if k is not None: + # Store content 'as-is' + result['qsd:'][k.group('key')] = val + return result @@ -418,11 +577,12 @@ def parse_url(url, default_schema='http', verify_host=True): # qsd = Query String Dictionary 'qsd': {}, - # Detected Entries that start with + or - are additionally stored in - # these values (un-touched). The +/- however are stripped from their - # name before they are stored here. + # Detected Entries that start with +, - or : are additionally stored in + # these values (un-touched). The +, -, and : however are stripped + # from their name before they are stored here. 'qsd+': {}, 'qsd-': {}, + 'qsd:': {}, } qsdata = '' @@ -534,10 +694,7 @@ def parse_url(url, default_schema='http', verify_host=True): def parse_bool(arg, default=False): """ - NZBGet uses 'yes' and 'no' as well as other strings such as 'on' or - 'off' etch to handle boolean operations from it's control interface. - - This method can just simplify checks to these variables. + Support string based boolean settings. If the content could not be parsed, then the default is returned. """ @@ -572,9 +729,46 @@ def parse_bool(arg, default=False): return bool(arg) +def parse_phone_no(*args, **kwargs): + """ + Takes a string containing phone numbers separated by comma's and/or spaces + and returns a list. + """ + + # for Python 2.7 support, store_unparsable is not in the url above + # as just parse_emails(*args, store_unparseable=True) since it is + # an invalid syntax. This is the workaround to be backards compatible: + store_unparseable = kwargs.get('store_unparseable', True) + + result = [] + for arg in args: + if isinstance(arg, six.string_types) and arg: + _result = PHONE_NO_DETECTION_RE.findall(arg) + if _result: + result += _result + + elif not _result and store_unparseable: + # we had content passed into us that was lost because it was + # so poorly formatted that it didn't even come close to + # meeting the regular expression we defined. We intentially + # keep it as part of our result set so that parsing done + # at a higher level can at least report this to the end user + # and hopefully give them some indication as to what they + # may have done wrong. + result += \ + [x for x in filter(bool, re.split(STRING_DELIMITERS, arg))] + + elif isinstance(arg, (set, list, tuple)): + # Use recursion to handle the list of phone numbers + result += parse_phone_no( + *arg, store_unparseable=store_unparseable) + + return result + + def parse_emails(*args, **kwargs): """ - Takes a string containing URLs separated by comma's and/or spaces and + Takes a string containing emails separated by comma's and/or spaces and returns a list. """ @@ -821,6 +1015,174 @@ def validate_regex(value, regex=r'[^\s]+', flags=re.I, strip=True, fmt=None): return value.strip() if strip else value +def cwe312_word(word, force=False, advanced=True, threshold=5): + """ + This function was written to help mask secure/private information that may + or may not be found within Apprise. The idea is to provide a presentable + word response that the user who prepared it would understand, yet not + reveal any private information for any potential intruder + + For more detail see CWE-312 @ + https://cwe.mitre.org/data/definitions/312.html + + The `force` is an optional argument used to keep the string formatting + consistent and in one place. If set, the content passed in is presumed + to be containing secret information and will be updated accordingly. + + If advanced is set to `True` then content is additionally checked for + upper/lower/ascii/numerical variances. If an obscurity threshold is + reached, then content is considered secret + """ + + class Variance(object): + """ + A Simple List of Possible Character Variances + """ + # An Upper Case Character (ABCDEF... etc) + ALPHA_UPPER = '+' + # An Lower Case Character (abcdef... etc) + ALPHA_LOWER = '-' + # A Special Character ($%^;... etc) + SPECIAL = 's' + # A Numerical Character (1234... etc) + NUMERIC = 'n' + + if not (isinstance(word, six.string_types) and word.strip()): + # not a password if it's not something we even support + return word + + # Formatting + word = word.strip() + if force: + # We're forcing the representation to be a secret + # We do this for consistency + return '{}...{}'.format(word[0:1], word[-1:]) + + elif len(word) > 1 and \ + not is_hostname(word, ipv4=True, ipv6=True, underscore=False): + # Verify if it is a hostname or not + return '{}...{}'.format(word[0:1], word[-1:]) + + elif len(word) >= 16: + # an IP will be 15 characters so we don't want to use a smaller + # value then 16 (e.g 101.102.103.104) + # we can assume very long words are passwords otherwise + return '{}...{}'.format(word[0:1], word[-1:]) + + if advanced: + # + # Mark word a secret based on it's obscurity + # + + # Our variances will increase depending on these variables: + last_variance = None + obscurity = 0 + + for c in word: + # Detect our variance + if c.isdigit(): + variance = Variance.NUMERIC + elif c.isalpha() and c.isupper(): + variance = Variance.ALPHA_UPPER + elif c.isalpha() and c.islower(): + variance = Variance.ALPHA_LOWER + else: + variance = Variance.SPECIAL + + if last_variance != variance or variance == Variance.SPECIAL: + obscurity += 1 + + if obscurity >= threshold: + return '{}...{}'.format(word[0:1], word[-1:]) + + last_variance = variance + + # Otherwise we're good; return our word + return word + + +def cwe312_url(url): + """ + This function was written to help mask secure/private information that may + or may not be found on an Apprise URL. The idea is to not disrupt the + structure of the previous URL too much, yet still protect the users + private information from being logged directly to screen. + + For more detail see CWE-312 @ + https://cwe.mitre.org/data/definitions/312.html + + For example, consider the URL: http://user:password@localhost/ + + When passed into this function, the return value would be: + http://user:****@localhost/ + + Since apprise allows you to put private information everywhere in it's + custom URLs, it uses this function to manipulate the content before + returning to any kind of logger. + + The idea is that the URL can still be interpreted by the person who + constructed them, but not to an intruder. + """ + # Parse our URL + results = parse_url(url) + if not results: + # Nothing was returned (invalid data was fed in); return our + # information as it was fed to us (without changing it) + return url + + # Update our URL with values + results['password'] = cwe312_word(results['password'], force=True) + if not results['schema'].startswith('http'): + results['user'] = cwe312_word(results['user']) + results['host'] = cwe312_word(results['host']) + + else: + results['host'] = cwe312_word(results['host'], advanced=False) + results['user'] = cwe312_word(results['user'], advanced=False) + + # Apply our full path scan in all cases + results['fullpath'] = '/' + \ + '/'.join([cwe312_word(x) + for x in re.split( + r'[\\/]+', + results['fullpath'].lstrip('/'))]) \ + if results['fullpath'] else '' + + # + # Now re-assemble our URL for display purposes + # + + # Determine Authentication + auth = '' + if results['user'] and results['password']: + auth = '{user}:{password}@'.format( + user=results['user'], + password=results['password'], + ) + elif results['user']: + auth = '{user}@'.format( + user=results['user'], + ) + + params = '' + if results['qsd']: + params = '?{}'.format( + "&".join(["{}={}".format(k, cwe312_word(v, force=( + k in ('password', 'secret', 'pass', 'token', 'key', + 'id', 'apikey', 'to')))) + for k, v in results['qsd'].items()])) + + return '{schema}://{auth}{hostname}{port}{fullpath}{params}'.format( + schema=results['schema'], + auth=auth, + # never encode hostname since we're expecting it to be a valid one + hostname=results['host'], + port='' if not results['port'] else ':{}'.format(results['port']), + fullpath=results['fullpath'] if results['fullpath'] else '', + params=params, + ) + + @contextlib.contextmanager def environ(*remove, **update): """ @@ -845,3 +1207,45 @@ def environ(*remove, **update): finally: # Restore our snapshot os.environ = env_orig.copy() + + +def apply_template(template, app_mode=TemplateType.RAW, **kwargs): + """ + Takes a template in a str format and applies all of the keywords + and their values to it. + + The app$mode is used to dictact any pre-processing that needs to take place + to the escaped string prior to it being placed. The idea here is for + elements to be placed in a JSON response for example should be escaped + early in their string format. + + The template must contain keywords wrapped in in double + squirly braces like {{keyword}}. These are matched to the respected + kwargs passed into this function. + + If there is no match found, content is not swapped. + + """ + + def _escape_raw(content): + # No escaping necessary + return content + + def _escape_json(content): + # remove surounding quotes + return json.dumps(content)[1:-1] + + # Our escape function + fn = _escape_json if app_mode == TemplateType.JSON else _escape_raw + + lookup = [re.escape(x) for x in kwargs.keys()] + + # Compile this into a list + mask_r = re.compile( + re.escape('{{') + r'\s*(' + '|'.join(lookup) + r')\s*' + + re.escape('}}'), re.IGNORECASE) + + # we index 2 characters off the head and 2 characters from the tail + # to drop the '{{' and '}}' surrounding our match so that we can + # re-index it back into our list + return mask_r.sub(lambda x: fn(kwargs[x.group()[2:-2].strip()]), template) |