aboutsummaryrefslogtreecommitdiffhomepage
path: root/libs/apprise/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'libs/apprise/utils.py')
-rw-r--r--libs/apprise/utils.py446
1 files changed, 425 insertions, 21 deletions
diff --git a/libs/apprise/utils.py b/libs/apprise/utils.py
index 8d0920071..27b263c34 100644
--- a/libs/apprise/utils.py
+++ b/libs/apprise/utils.py
@@ -25,6 +25,7 @@
import re
import six
+import json
import contextlib
import os
from os.path import expanduser
@@ -95,9 +96,10 @@ TIDY_NUX_TRIM_RE = re.compile(
# The handling of custom arguments passed in the URL; we treat any
# argument (which would otherwise appear in the qsd area of our parse_url()
-# function differently if they start with a + or - value
+# function differently if they start with a +, - or : value
NOTIFY_CUSTOM_ADD_TOKENS = re.compile(r'^( |\+)(?P<key>.*)\s*')
NOTIFY_CUSTOM_DEL_TOKENS = re.compile(r'^-(?P<key>.*)\s*')
+NOTIFY_CUSTOM_COLON_TOKENS = re.compile(r'^:(?P<key>.*)\s*')
# Used for attempting to acquire the schema if the URL can't be parsed.
GET_SCHEMA_RE = re.compile(r'\s*(?P<schema>[a-z0-9]{2,9})://.*$', re.I)
@@ -113,18 +115,23 @@ GET_SCHEMA_RE = re.compile(r'\s*(?P<schema>[a-z0-9]{2,9})://.*$', re.I)
GET_EMAIL_RE = re.compile(
- r'((?P<name>[^:<]+)?[:<\s]+)?'
+ r'(([\s"\']+)?(?P<name>[^:<"\']+)?[:<\s"\']+)?'
r'(?P<full_email>((?P<label>[^+]+)\+)?'
r'(?P<email>(?P<userid>[a-z0-9$%=_~-]+'
r'(?:\.[a-z0-9$%+=_~-]+)'
r'*)@(?P<domain>('
- r'(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+'
- r'[a-z0-9](?:[a-z0-9-]*[a-z0-9]))|'
- r'[a-z0-9][a-z0-9-]{5,})))'
+ r'(?:[a-z0-9](?:[a-z0-9_-]*[a-z0-9])?\.)+'
+ r'[a-z0-9](?:[a-z0-9_-]*[a-z0-9]))|'
+ r'[a-z0-9][a-z0-9_-]{5,})))'
r'\s*>?', re.IGNORECASE)
-# Regular expression used to extract a phone number
-GET_PHONE_NO_RE = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
+# A simple verification check to make sure the content specified
+# roughly conforms to a phone number before we parse it further
+IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
+
+# Regular expression used to distinguish between multiple phone numbers
+PHONE_NO_DETECTION_RE = re.compile(
+ r'\s*([+(\s]*[0-9][0-9()\s-]+[0-9])(?=$|[\s,+(]+[0-9])', re.I)
# Regular expression used to distinguish between multiple URLs
URL_DETECTION_RE = re.compile(
@@ -136,11 +143,29 @@ EMAIL_DETECTION_RE = re.compile(
r'[^@\s,]+@[^\s,]+)',
re.IGNORECASE)
+# Used to prepare our UUID regex matching
+UUID4_RE = re.compile(
+ r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}',
+ re.IGNORECASE)
+
# validate_regex() utilizes this mapping to track and re-use pre-compiled
# regular expressions
REGEX_VALIDATE_LOOKUP = {}
class TemplateType(object):
    """
    Enumerates the kinds of templates that content can be rendered into.
    """

    # JSON: the values being substituted are presumed to land inside a
    # JSON document and are therefore escaped where required (for example
    # quotes)
    JSON = 'json'

    # RAW: the values being substituted are used exactly as they were
    # provided; nothing is done to them at all
    RAW = 'raw'
+
+
def is_ipaddr(addr, ipv4=True, ipv6=True):
"""
Validates against IPV4 and IPV6 IP Addresses
@@ -191,7 +216,7 @@ def is_ipaddr(addr, ipv4=True, ipv6=True):
return False
-def is_hostname(hostname, ipv4=True, ipv6=True):
+def is_hostname(hostname, ipv4=True, ipv6=True, underscore=True):
"""
Validate hostname
"""
@@ -200,7 +225,7 @@ def is_hostname(hostname, ipv4=True, ipv6=True):
if len(hostname) > 253 or len(hostname) == 0:
return False
- # Strip trailling period on hostname (if one exists)
+ # Strip trailing period on hostname (if one exists)
if hostname[-1] == ".":
hostname = hostname[:-1]
@@ -217,9 +242,14 @@ def is_hostname(hostname, ipv4=True, ipv6=True):
# - Hostnames can ony be comprised of alpha-numeric characters and the
# hyphen (-) character.
# - Hostnames can not start with the hyphen (-) character.
+ # - as a workaround for https://github.com/docker/compose/issues/229 to
+ # being able to address services in other stacks, we also allow
+ # underscores in hostnames (if flag is set accordingly)
# - labels can not exceed 63 characters
+ # - allow single character alpha characters
allowed = re.compile(
- r'(?!-)[a-z0-9][a-z0-9-]{1,62}(?<!-)$',
+ r'^([a-z0-9][a-z0-9_-]{1,62}|[a-z_-])(?<![_-])$' if underscore else
+ r'^([a-z0-9][a-z0-9-]{1,62}|[a-z-])(?<!-)$',
re.IGNORECASE,
)
@@ -229,6 +259,119 @@ def is_hostname(hostname, ipv4=True, ipv6=True):
return hostname
def is_uuid(uuid):
    """Determine if the specified entry is a uuid v4 string

    Args:
        uuid (str): The string you want to check.

    Returns:
        bool: Returns True if the entire string is a version 4 uuid (as
              defined by UUID4_RE) and False otherwise. Content that can
              not be matched at all (such as None or an integer) also
              returns False.
    """

    try:
        match = UUID4_RE.match(uuid)

    except TypeError:
        # not parseable content
        return False

    # match() only anchors the expression to the start of the string, so we
    # additionally verify that the match consumed all of the content;
    # otherwise a valid uuid followed by junk would be accepted
    return match is not None and match.end() == len(uuid)
+
+
def is_phone_no(phone, min_len=11, max_len=14):
    """Determine if the specified entry is a phone number

    Args:
        phone (str): The string you want to check.
        min_len (int): Defines the smallest number of digits the phone
                       number may have before it's considered invalid.
        max_len (int): Defines the largest number of digits the phone
                       number may have before it's considered invalid.
                       By default the phone number can't be any larger
                       than 14 digits.

    Returns:
        bool|dict: Returns False if the content specified is not a phone
            number and a dictionary of the parsed phone number if it is as:
                {
                    'country': '1',
                    'area': '800',
                    'line': '1234567',
                    'full': '18001234567',
                    'pretty': '+1 800-123-4567',
                }

            Non conventional numbers such as 411 would look like the
            following provided that `min_len` is set to 3 or less:
                {
                    'country': '',
                    'area': '',
                    'line': '411',
                    'full': '411',
                    'pretty': '411',
                }

    """

    try:
        if not IS_PHONE_NO.match(phone):
            # not parseable content as it does not even conform closely to
            # a phone number
            return False

    except TypeError:
        # content that is not a string (such as None) is not a phone number
        return False

    # Tidy phone number up first; only the digits are of interest to us
    full = re.sub(r'[^\d]+', '', phone)
    if len(full) > max_len or len(full) < min_len:
        # Invalid phone number
        return False

    # Break apart our phone number; the line is made up of the last 7
    # digits (or less if that is all there is)
    line = full[-7:]

    # Everything ahead of the line; this is empty if we have 7 digits or less
    prefix = full[:-7]

    # The area code (if present) are the 3 digits ahead of the line
    area = prefix[-3:]

    # The country code is the leftovers
    country = prefix[:-3]

    # Prepare a nicely (consistently) formatted phone number
    pretty = ''

    if country:
        pretty += '+{} '.format(country)

    if area:
        pretty += '{}-'.format(area)

    if len(line) >= 7:
        # A complete line is presented as 123-4567
        pretty += '{}-{}'.format(line[:3], line[3:])

    else:
        # Short (non conventional) numbers are presented as is
        pretty += line

    return {
        # The line code (last 7 digits)
        'line': line,
        # Area code
        'area': area,
        # The country code (if identified)
        'country': country,

        # A nicely formatted phone no
        'pretty': pretty,

        # All digits in-line
        'full': full,
    }
+
+
def is_email(address):
"""Determine if the specified entry is an email address
@@ -236,8 +379,17 @@ def is_email(address):
address (str): The string you want to check.
Returns:
- bool: Returns True if the address specified is an email address
- and False if it isn't.
+ bool: Returns False if the address specified is not an email address
+ and a dictionary of the parsed email if it is as:
+ {
+ 'name': 'Parse Name'
+ 'email': '[email protected]'
+ 'full_email': '[email protected]'
+ 'label': 'label'
+ 'user': 'user',
+ 'domain': 'domain.com'
+ }
+
"""
try:
@@ -318,10 +470,11 @@ def parse_qsd(qs):
'qsd': {},
# Detected Entries that start with + or - are additionally stored in
- # these values (un-touched). The +/- however are stripped from their
+ # these values (un-touched). The :,+,- however are stripped from their
# name before they are stored here.
'qsd+': {},
'qsd-': {},
+ 'qsd:': {},
}
pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
@@ -361,6 +514,12 @@ def parse_qsd(qs):
# Store content 'as-is'
result['qsd-'][k.group('key')] = val
+ # Check for tokens that start with a colon symbol (:)
+ k = NOTIFY_CUSTOM_COLON_TOKENS.match(key)
+ if k is not None:
+ # Store content 'as-is'
+ result['qsd:'][k.group('key')] = val
+
return result
@@ -418,11 +577,12 @@ def parse_url(url, default_schema='http', verify_host=True):
# qsd = Query String Dictionary
'qsd': {},
- # Detected Entries that start with + or - are additionally stored in
- # these values (un-touched). The +/- however are stripped from their
- # name before they are stored here.
+ # Detected Entries that start with +, - or : are additionally stored in
+ # these values (un-touched). The +, -, and : however are stripped
+ # from their name before they are stored here.
'qsd+': {},
'qsd-': {},
+ 'qsd:': {},
}
qsdata = ''
@@ -534,10 +694,7 @@ def parse_url(url, default_schema='http', verify_host=True):
def parse_bool(arg, default=False):
"""
- NZBGet uses 'yes' and 'no' as well as other strings such as 'on' or
- 'off' etch to handle boolean operations from it's control interface.
-
- This method can just simplify checks to these variables.
+ Support string based boolean settings.
If the content could not be parsed, then the default is returned.
"""
@@ -572,9 +729,46 @@ def parse_bool(arg, default=False):
return bool(arg)
def parse_phone_no(*args, **kwargs):
    """
    Extracts the phone numbers found in the argument(s) passed in and
    returns them as a list.

    Each argument can be a string holding one or more phone numbers
    separated by commas and/or spaces, or a set, list or tuple of such
    entries (which is processed recursively). Any other type of argument
    (including an empty string) is ignored.

    The optional `store_unparseable` keyword (defaulting to True) controls
    what happens to a string that no phone number could be detected in. When
    set, the string is split on STRING_DELIMITERS and its non-empty tokens
    are added to the result so that the caller can still report them.
    """

    # Python 2.7 has no syntax for declaring a keyword-only argument after
    # *args, so the flag is pulled out of kwargs by hand instead
    store_unparseable = kwargs.get('store_unparseable', True)

    numbers = []
    for entry in args:
        if isinstance(entry, (set, list, tuple)):
            # Use recursion to flatten collections of phone numbers
            numbers.extend(
                parse_phone_no(*entry, store_unparseable=store_unparseable))
            continue

        if not (isinstance(entry, six.string_types) and entry):
            # Nothing we can work with
            continue

        detected = PHONE_NO_DETECTION_RE.findall(entry)
        if detected:
            numbers.extend(detected)

        elif store_unparseable:
            # The content was so poorly formatted that it did not come
            # close to meeting our regular expression. It is intentionally
            # kept so that parsing done at a higher level can report it to
            # the end user and give them some indication of what they may
            # have done wrong.
            numbers.extend(
                token for token in re.split(STRING_DELIMITERS, entry)
                if token)

    return numbers
+
+
def parse_emails(*args, **kwargs):
"""
- Takes a string containing URLs separated by comma's and/or spaces and
+ Takes a string containing emails separated by comma's and/or spaces and
returns a list.
"""
@@ -821,6 +1015,174 @@ def validate_regex(value, regex=r'[^\s]+', flags=re.I, strip=True, fmt=None):
return value.strip() if strip else value
def cwe312_word(word, force=False, advanced=True, threshold=5):
    """
    Masks a word that may (or may not) hold secure/private information so
    that it can be presented (in a log for example) in a way the person who
    prepared it would still recognize without revealing it to an intruder.

    For more detail see CWE-312 @
       https://cwe.mitre.org/data/definitions/312.html

    A masked word is presented as its first and last character separated by
    three periods. Content that is not a string (or is an empty/whitespace
    only string) is returned untouched. Otherwise the (stripped) word is
    masked when any of the following is true:
      - `force` is set; the word is presumed to be a secret. This keeps the
        formatting of secrets consistent and in one place.
      - it is more than one character long and is_hostname() does not
        accept it
      - it is 16 characters or longer
      - `advanced` is set and the word changes character class (upper case,
        lower case, numeric, special) `threshold` times or more while it is
        read left to right; every special character counts as a change.

    If none of the above apply, the stripped word is returned as is.
    """

    # The character classes a character can fall into
    upper, lower, special, numeric = '+', '-', 's', 'n'

    if not isinstance(word, six.string_types) or not word.strip():
        # not a password if it's not something we even support
        return word

    # Formatting
    word = word.strip()

    # What our word looks like once it has been masked
    masked = '{}...{}'.format(word[0:1], word[-1:])

    # An IP will be at most 15 characters (e.g 101.102.103.104) so anything
    # that reaches the length check below and is 16 characters or more is
    # assumed to be a password
    if force \
            or (len(word) > 1 and not is_hostname(
                word, ipv4=True, ipv6=True, underscore=False)) \
            or len(word) >= 16:
        return masked

    if not advanced:
        # No further checking requested
        return word

    #
    # Mark word a secret based on its obscurity
    #
    previous = None
    changes = 0

    for char in word:
        # Classify our character
        if char.isdigit():
            current = numeric

        elif char.isalpha() and char.isupper():
            current = upper

        elif char.isalpha() and char.islower():
            current = lower

        else:
            current = special

        if current == special or current != previous:
            changes += 1
            if changes >= threshold:
                # Too much variance; treat the word as a secret
                return masked

        previous = current

    # Otherwise we're good; return our word
    return word
+
+
def cwe312_url(url):
    """
    Masks the secure/private information that may be found in an Apprise
    URL. The structure of the URL is left intact as much as possible so
    that the person who constructed it can still interpret it, while an
    intruder reading it (from a log for example) can not.

    For more detail see CWE-312 @
       https://cwe.mitre.org/data/definitions/312.html

    For example, consider the URL: http://user:password@localhost/

    The password is always masked down to its first and last character, so
    the value returned would be expected to look like:
        http://user:p...d@localhost/

    The remainder of the URL is handled as follows:
      - the user and host are passed through cwe312_word(); the advanced
        (obscurity) check is only applied when the schema does not start
        with 'http'
      - each element of the path is passed through cwe312_word()
      - each query string value is passed through cwe312_word(); those
        with a key of password, secret, pass, token, key, id, apikey or to
        are always masked

    If the URL can not be parsed it is returned exactly as it was provided.
    """
    # Parse our URL
    results = parse_url(url)
    if not results:
        # Nothing was returned (invalid data was fed in); return our
        # information as it was fed to us (without changing it)
        return url

    # Query string keys whose values are always treated as a secret
    secret_keys = (
        'password', 'secret', 'pass', 'token', 'key', 'id', 'apikey', 'to')

    # http based schemas do not have the advanced checks applied to their
    # user and host
    advanced = not results['schema'].startswith('http')

    password = cwe312_word(results['password'], force=True)
    user = cwe312_word(results['user'], advanced=advanced)
    host = cwe312_word(results['host'], advanced=advanced)

    # Apply our full path scan in all cases
    fullpath = ''
    if results['fullpath']:
        elements = re.split(r'[\\/]+', results['fullpath'].lstrip('/'))
        fullpath = '/' + '/'.join(cwe312_word(x) for x in elements)

    #
    # Now re-assemble our URL for display purposes
    #

    # Determine Authentication
    if user and password:
        auth = '{user}:{password}@'.format(user=user, password=password)

    elif user:
        auth = '{user}@'.format(user=user)

    else:
        auth = ''

    # Determine our query string
    params = ''
    if results['qsd']:
        pairs = [
            "{}={}".format(k, cwe312_word(v, force=(k in secret_keys)))
            for k, v in results['qsd'].items()]
        params = '?{}'.format("&".join(pairs))

    # Determine our port
    port = ':{}'.format(results['port']) if results['port'] else ''

    return '{schema}://{auth}{hostname}{port}{fullpath}{params}'.format(
        schema=results['schema'],
        auth=auth,
        # never encode hostname since we're expecting it to be a valid one
        hostname=host,
        port=port,
        fullpath=fullpath,
        params=params,
    )
+
+
@contextlib.contextmanager
def environ(*remove, **update):
"""
@@ -845,3 +1207,45 @@ def environ(*remove, **update):
finally:
# Restore our snapshot
os.environ = env_orig.copy()
+
+
def apply_template(template, app_mode=TemplateType.RAW, **kwargs):
    """
    Takes a template in a str format and applies all of the keywords
    and their values to it.

    The app_mode is used to dictate any pre-processing that needs to take
    place on a value prior to it being placed into the template. The idea
    here is that elements being placed into a JSON document (for example)
    should be escaped early, in their string format:
      - TemplateType.RAW: values are used as is
      - TemplateType.JSON: string values are JSON escaped (without the
        surrounding quotes); all other values are placed using their JSON
        representation (e.g. 5, true, null)

    The template must contain keywords wrapped in double curly braces
    like {{keyword}}; whitespace is permitted around the keyword. These are
    matched (case-insensitively) to the respective kwargs passed into this
    function. A keyword that matches the case of a kwarg exactly always
    takes priority.

    If there is no match found, content is not swapped.

    Returns the template with all of the matched keywords substituted.
    """

    if not kwargs:
        # There is nothing to substitute. This is handled up front since an
        # empty list of keywords would otherwise produce an expression that
        # matches '{{}}' and has nothing to look up.
        return template

    def _escape_raw(content):
        # No escaping necessary
        return content

    def _escape_json(content):
        encoded = json.dumps(content)
        # Strings are the only thing encoded with surrounding quotes; these
        # are removed as the template is expected to provide its own. All
        # other values must be left intact or they would be corrupted.
        return encoded[1:-1] if encoded.startswith('"') else encoded

    # Our escape function
    fn = _escape_json if app_mode == TemplateType.JSON else _escape_raw

    # Our expression is case-insensitive, therefore we need a means of
    # finding our value again when the case used in the template differs
    # from that of the keyword provided
    folded = {}
    for key, value in kwargs.items():
        folded.setdefault(key.lower(), value)

    # Compile our keywords into a single expression
    mask_r = re.compile(
        re.escape('{{') + r'\s*('
        + '|'.join(re.escape(x) for x in kwargs) + r')\s*'
        + re.escape('}}'), re.IGNORECASE)

    def _substitute(match):
        # The first group is our keyword as it was found in the template
        # without the braces or surrounding whitespace
        keyword = match.group(1)
        if keyword in kwargs:
            return fn(kwargs[keyword])

        keyword = keyword.lower()
        if keyword in folded:
            return fn(folded[keyword])

        # We could not map what was matched back to a keyword; leave the
        # content as it was
        return match.group(0)

    return mask_r.sub(_substitute, template)