summaryrefslogtreecommitdiffhomepage
path: root/libs/apprise/config/ConfigBase.py
diff options
context:
space:
mode:
Diffstat (limited to 'libs/apprise/config/ConfigBase.py')
-rw-r--r--libs/apprise/config/ConfigBase.py284
1 files changed, 248 insertions, 36 deletions
diff --git a/libs/apprise/config/ConfigBase.py b/libs/apprise/config/ConfigBase.py
index 22efd8e29..f2b958ed8 100644
--- a/libs/apprise/config/ConfigBase.py
+++ b/libs/apprise/config/ConfigBase.py
@@ -34,13 +34,18 @@ from ..AppriseAsset import AppriseAsset
from ..URLBase import URLBase
from ..common import ConfigFormat
from ..common import CONFIG_FORMATS
-from ..common import ConfigIncludeMode
+from ..common import ContentIncludeMode
from ..utils import GET_SCHEMA_RE
from ..utils import parse_list
from ..utils import parse_bool
from ..utils import parse_urls
+from ..utils import cwe312_url
from . import SCHEMA_MAP
+# Test whether token is valid or not
+VALID_TOKEN = re.compile(
+ r'(?P<token>[a-z0-9][a-z0-9_]+)', re.I)
+
class ConfigBase(URLBase):
"""
@@ -65,7 +70,7 @@ class ConfigBase(URLBase):
# By default all configuration is not includable using the 'include'
# line found in configuration files.
- allow_cross_includes = ConfigIncludeMode.NEVER
+ allow_cross_includes = ContentIncludeMode.NEVER
# the config path manages the handling of relative include
config_path = os.getcwd()
@@ -205,8 +210,8 @@ class ConfigBase(URLBase):
# Configuration files were detected; recursively populate them
# If we have been configured to do so
for url in configs:
- if self.recursion > 0:
+ if self.recursion > 0:
# Attempt to acquire the schema at the very least to allow
# our configuration based urls.
schema = GET_SCHEMA_RE.match(url)
@@ -219,6 +224,7 @@ class ConfigBase(URLBase):
url = os.path.join(self.config_path, url)
url = '{}://{}'.format(schema, URLBase.quote(url))
+
else:
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
@@ -229,27 +235,31 @@ class ConfigBase(URLBase):
'Unsupported include schema {}.'.format(schema))
continue
+ # CWE-312 (Secure Logging) Handling
+ loggable_url = url if not asset.secure_logging \
+ else cwe312_url(url)
+
# Parse our url details of the server object as dictionary
# containing all of the information parsed from our URL
results = SCHEMA_MAP[schema].parse_url(url)
if not results:
# Failed to parse the server URL
self.logger.warning(
- 'Unparseable include URL {}'.format(url))
+ 'Unparseable include URL {}'.format(loggable_url))
continue
# Handle cross inclusion based on allow_cross_includes rules
if (SCHEMA_MAP[schema].allow_cross_includes ==
- ConfigIncludeMode.STRICT
+ ContentIncludeMode.STRICT
and schema not in self.schemas()
and not self.insecure_includes) or \
SCHEMA_MAP[schema].allow_cross_includes == \
- ConfigIncludeMode.NEVER:
+ ContentIncludeMode.NEVER:
# Prevent the loading if insecure base protocols
ConfigBase.logger.warning(
'Including {}:// based configuration is prohibited. '
- 'Ignoring URL {}'.format(schema, url))
+ 'Ignoring URL {}'.format(schema, loggable_url))
continue
# Prepare our Asset Object
@@ -275,7 +285,7 @@ class ConfigBase(URLBase):
except Exception as e:
# the arguments are invalid or can not be used.
self.logger.warning(
- 'Could not load include URL: {}'.format(url))
+ 'Could not load include URL: {}'.format(loggable_url))
self.logger.debug('Loading Exception: {}'.format(str(e)))
continue
@@ -288,16 +298,23 @@ class ConfigBase(URLBase):
del cfg_plugin
else:
+ # CWE-312 (Secure Logging) Handling
+ loggable_url = url if not asset.secure_logging \
+ else cwe312_url(url)
+
self.logger.debug(
- 'Recursion limit reached; ignoring Include URL: %s' % url)
+ 'Recursion limit reached; ignoring Include URL: %s',
+ loggable_url)
if self._cached_servers:
- self.logger.info('Loaded {} entries from {}'.format(
- len(self._cached_servers), self.url()))
+ self.logger.info(
+ 'Loaded {} entries from {}'.format(
+ len(self._cached_servers),
+ self.url(privacy=asset.secure_logging)))
else:
self.logger.warning(
'Failed to load Apprise configuration from {}'.format(
- self.url()))
+ self.url(privacy=asset.secure_logging)))
# Set the time our content was cached at
self._cached_time = time.time()
@@ -527,6 +544,9 @@ class ConfigBase(URLBase):
# the include keyword
configs = list()
+ # Prepare our Asset Object
+ asset = asset if isinstance(asset, AppriseAsset) else AppriseAsset()
+
# Define what a valid line should look like
valid_line_re = re.compile(
r'^\s*(?P<line>([;#]+(?P<comment>.*))|'
@@ -563,27 +583,37 @@ class ConfigBase(URLBase):
continue
if config:
- ConfigBase.logger.debug('Include URL: {}'.format(config))
+ # CWE-312 (Secure Logging) Handling
+ loggable_url = config if not asset.secure_logging \
+ else cwe312_url(config)
+
+ ConfigBase.logger.debug(
+ 'Include URL: {}'.format(loggable_url))
# Store our include line
configs.append(config.strip())
continue
+ # CWE-312 (Secure Logging) Handling
+ loggable_url = url if not asset.secure_logging \
+ else cwe312_url(url)
+
# Acquire our url tokens
- results = plugins.url_to_dict(url)
+ results = plugins.url_to_dict(
+ url, secure_logging=asset.secure_logging)
if results is None:
# Failed to parse the server URL
ConfigBase.logger.warning(
- 'Unparseable URL {} on line {}.'.format(url, line))
+ 'Unparseable URL {} on line {}.'.format(
+ loggable_url, line))
continue
# Build a list of tags to associate with the newly added
# notifications if any were set
results['tag'] = set(parse_list(result.group('tags')))
- # Prepare our Asset Object
- results['asset'] = \
- asset if isinstance(asset, AppriseAsset) else AppriseAsset()
+ # Set our Asset Object
+ results['asset'] = asset
try:
# Attempt to create an instance of our plugin using the
@@ -591,13 +621,14 @@ class ConfigBase(URLBase):
plugin = plugins.SCHEMA_MAP[results['schema']](**results)
# Create log entry of loaded URL
- ConfigBase.logger.debug('Loaded URL: {}'.format(plugin.url()))
+ ConfigBase.logger.debug(
+ 'Loaded URL: %s', plugin.url(privacy=asset.secure_logging))
except Exception as e:
# the arguments are invalid or can not be used.
ConfigBase.logger.warning(
'Could not load URL {} on line {}.'.format(
- url, line))
+ loggable_url, line))
ConfigBase.logger.debug('Loading Exception: %s' % str(e))
continue
@@ -633,7 +664,9 @@ class ConfigBase(URLBase):
# Load our data (safely)
result = yaml.load(content, Loader=yaml.SafeLoader)
- except (AttributeError, yaml.error.MarkedYAMLError) as e:
+ except (AttributeError,
+ yaml.parser.ParserError,
+ yaml.error.MarkedYAMLError) as e:
# Invalid content
ConfigBase.logger.error(
'Invalid Apprise YAML data specified.')
@@ -671,7 +704,9 @@ class ConfigBase(URLBase):
continue
if not (hasattr(asset, k) and
- isinstance(getattr(asset, k), six.string_types)):
+ isinstance(getattr(asset, k),
+ (bool, six.string_types))):
+
# We can't set a function or non-string set value
ConfigBase.logger.warning(
'Invalid asset key "{}".'.format(k))
@@ -681,15 +716,23 @@ class ConfigBase(URLBase):
# Convert to an empty string
v = ''
- if not isinstance(v, six.string_types):
+ if (isinstance(v, (bool, six.string_types))
+ and isinstance(getattr(asset, k), bool)):
+
+ # If the object in the Asset is a boolean, then
+ # we want to convert the specified string to
+ # match that.
+ setattr(asset, k, parse_bool(v))
+
+ elif isinstance(v, six.string_types):
+ # Set our asset object with the new value
+ setattr(asset, k, v.strip())
+
+ else:
# we must set strings with a string
ConfigBase.logger.warning(
'Invalid asset value to "{}".'.format(k))
continue
-
- # Set our asset object with the new value
- setattr(asset, k, v.strip())
-
#
# global tag root directive
#
@@ -740,6 +783,10 @@ class ConfigBase(URLBase):
# we can. Reset it to None on each iteration
results = list()
+ # CWE-312 (Secure Logging) Handling
+ loggable_url = url if not asset.secure_logging \
+ else cwe312_url(url)
+
if isinstance(url, six.string_types):
# We're just a simple URL string...
schema = GET_SCHEMA_RE.match(url)
@@ -748,16 +795,18 @@ class ConfigBase(URLBase):
# config file at least has something to take action
# with.
ConfigBase.logger.warning(
- 'Invalid URL {}, entry #{}'.format(url, no + 1))
+ 'Invalid URL {}, entry #{}'.format(
+ loggable_url, no + 1))
continue
# We found a valid schema worthy of tracking; store it's
# details:
- _results = plugins.url_to_dict(url)
+ _results = plugins.url_to_dict(
+ url, secure_logging=asset.secure_logging)
if _results is None:
ConfigBase.logger.warning(
'Unparseable URL {}, entry #{}'.format(
- url, no + 1))
+ loggable_url, no + 1))
continue
# add our results to our global set
@@ -791,19 +840,20 @@ class ConfigBase(URLBase):
.format(key, no + 1))
continue
- # Store our URL and Schema Regex
- _url = key
-
# Store our schema
schema = _schema.group('schema').lower()
+ # Store our URL and Schema Regex
+ _url = key
+
if _url is None:
# the loop above failed to match anything
ConfigBase.logger.warning(
- 'Unsupported schema in urls, entry #{}'.format(no + 1))
+ 'Unsupported URL, entry #{}'.format(no + 1))
continue
- _results = plugins.url_to_dict(_url)
+ _results = plugins.url_to_dict(
+ _url, secure_logging=asset.secure_logging)
if _results is None:
# Setup dictionary
_results = {
@@ -830,12 +880,33 @@ class ConfigBase(URLBase):
if 'schema' in entries:
del entries['schema']
+ # support our special tokens (if they're present)
+ if schema in plugins.SCHEMA_MAP:
+ entries = ConfigBase._special_token_handler(
+ schema, entries)
+
# Extend our dictionary with our new entries
r.update(entries)
# add our results to our global set
results.append(r)
+ elif isinstance(tokens, dict):
+ # support our special tokens (if they're present)
+ if schema in plugins.SCHEMA_MAP:
+ tokens = ConfigBase._special_token_handler(
+ schema, tokens)
+
+ # Copy ourselves a template of our parsed URL as a base to
+ # work with
+ r = _results.copy()
+
+ # add our result set
+ r.update(tokens)
+
+ # add our results to our global set
+ results.append(r)
+
else:
# add our results to our global set
results.append(_results)
@@ -867,6 +938,17 @@ class ConfigBase(URLBase):
# Just use the global settings
_results['tag'] = global_tags
+ for key in list(_results.keys()):
+ # Strip out any tokens we know that we can't accept and
+ # warn the user
+ match = VALID_TOKEN.match(key)
+ if not match:
+ ConfigBase.logger.warning(
+ 'Ignoring invalid token ({}) found in YAML '
+ 'configuration entry #{}, item #{}'
+ .format(key, no + 1, entry))
+ del _results[key]
+
ConfigBase.logger.trace(
'URL #{}: {} unpacked as:{}{}'
.format(no + 1, url, os.linesep, os.linesep.join(
@@ -883,7 +965,8 @@ class ConfigBase(URLBase):
# Create log entry of loaded URL
ConfigBase.logger.debug(
- 'Loaded URL: {}'.format(plugin.url()))
+ 'Loaded URL: {}'.format(
+ plugin.url(privacy=asset.secure_logging)))
except Exception as e:
# the arguments are invalid or can not be used.
@@ -913,6 +996,135 @@ class ConfigBase(URLBase):
# Pop the element off of the stack
return self._cached_servers.pop(index)
+ @staticmethod
+ def _special_token_handler(schema, tokens):
+ """
+ This function takes a list of tokens and updates them to no longer
+ include any special tokens such as +,-, and :
+
+ - schema must be a valid schema of a supported plugin type
+ - tokens must be a dictionary containing the yaml entries parsed.
+
+ The idea here is we can post process a set of tokens provided in
+ a YAML file where the user provided some of the special keywords.
+
+ We effectivley look up what these keywords map to their appropriate
+ value they're expected
+ """
+ # Create a copy of our dictionary
+ tokens = tokens.copy()
+
+ for kw, meta in plugins.SCHEMA_MAP[schema]\
+ .template_kwargs.items():
+
+ # Determine our prefix:
+ prefix = meta.get('prefix', '+')
+
+ # Detect any matches
+ matches = \
+ {k[1:]: str(v) for k, v in tokens.items()
+ if k.startswith(prefix)}
+
+ if not matches:
+ # we're done with this entry
+ continue
+
+ if not isinstance(tokens.get(kw), dict):
+ # Invalid; correct it
+ tokens[kw] = dict()
+
+ # strip out processed tokens
+ tokens = {k: v for k, v in tokens.items()
+ if not k.startswith(prefix)}
+
+ # Update our entries
+ tokens[kw].update(matches)
+
+ # Now map our tokens accordingly to the class templates defined by
+ # each service.
+ #
+ # This is specifically used for YAML file parsing. It allows a user to
+ # define an entry such as:
+ #
+ # urls:
+ # - mailto://user:pass@domain:
+ #
+ # Under the hood, the NotifyEmail() class does not parse the `to`
+ # argument. It's contents needs to be mapped to `targets`. This is
+ # defined in the class via the `template_args` and template_tokens`
+ # section.
+ #
+ # This function here allows these mappings to take place within the
+ # YAML file as independant arguments.
+ class_templates = \
+ plugins.details(plugins.SCHEMA_MAP[schema])
+
+ for key in list(tokens.keys()):
+
+ if key not in class_templates['args']:
+ # No need to handle non-arg entries
+ continue
+
+ # get our `map_to` and/or 'alias_of' value (if it exists)
+ map_to = class_templates['args'][key].get(
+ 'alias_of', class_templates['args'][key].get('map_to', ''))
+
+ if map_to == key:
+ # We're already good as we are now
+ continue
+
+ if map_to in class_templates['tokens']:
+ meta = class_templates['tokens'][map_to]
+
+ else:
+ meta = class_templates['args'].get(
+ map_to, class_templates['args'][key])
+
+ # Perform a translation/mapping if our code reaches here
+ value = tokens[key]
+ del tokens[key]
+
+ # Detect if we're dealign with a list or not
+ is_list = re.search(
+ r'^(list|choice):.*',
+ meta.get('type'),
+ re.IGNORECASE)
+
+ if map_to not in tokens:
+ tokens[map_to] = [] if is_list \
+ else meta.get('default')
+
+ elif is_list and not isinstance(tokens.get(map_to), list):
+ # Convert ourselves to a list if we aren't already
+ tokens[map_to] = [tokens[map_to]]
+
+ # Type Conversion
+ if re.search(
+ r'^(choice:)?string',
+ meta.get('type'),
+ re.IGNORECASE) \
+ and not isinstance(value, six.string_types):
+
+ # Ensure our format is as expected
+ value = str(value)
+
+ # Apply any further translations if required (absolute map)
+ # This is the case when an arg maps to a token which further
+ # maps to a different function arg on the class constructor
+ abs_map = meta.get('map_to', map_to)
+
+ # Set our token as how it was provided by the configuration
+ if isinstance(tokens.get(map_to), list):
+ tokens[abs_map].append(value)
+
+ else:
+ tokens[abs_map] = value
+
+ # Return our tokens
+ return tokens
+
def __getitem__(self, index):
"""
Returns the indexed server entry associated with the loaded