summaryrefslogtreecommitdiffhomepage
path: root/libs/apprise/plugins/NotifyMQTT.py
diff options
context:
space:
mode:
authormorpheus65535 <[email protected]>2021-12-01 21:19:18 -0500
committermorpheus65535 <[email protected]>2021-12-01 21:19:18 -0500
commitd51dc68ebb3910ca09bb40c33814d43b93d916b8 (patch)
tree569807abb70b41eb98dded56c17bd504e793da5b /libs/apprise/plugins/NotifyMQTT.py
parent402c82d84f7bd51353348bea7d1a876ad9ecc5b1 (diff)
downloadbazarr-d51dc68ebb3910ca09bb40c33814d43b93d916b8.tar.gz
bazarr-d51dc68ebb3910ca09bb40c33814d43b93d916b8.zip
Updated Apprise notification module to the latest providers.v1.0.2-beta.2
Diffstat (limited to 'libs/apprise/plugins/NotifyMQTT.py')
-rw-r--r--libs/apprise/plugins/NotifyMQTT.py536
1 files changed, 536 insertions, 0 deletions
diff --git a/libs/apprise/plugins/NotifyMQTT.py b/libs/apprise/plugins/NotifyMQTT.py
new file mode 100644
index 000000000..377107e68
--- /dev/null
+++ b/libs/apprise/plugins/NotifyMQTT.py
@@ -0,0 +1,536 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2021 Chris Caron <[email protected]>
+# All rights reserved.
+#
+# This code is licensed under the MIT License.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files(the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions :
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+# PAHO MQTT Documentation:
+# https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php
+#
+# Looking at the PAHO MQTT Source can help shed light on what's going on too
+# as their inline documentation is pretty good!
+# https://github.com/eclipse/paho.mqtt.python\
+# /blob/master/src/paho/mqtt/client.py
+import ssl
+import re
+import six
+from time import sleep
+from datetime import datetime
+from os.path import isfile
+from .NotifyBase import NotifyBase
+from ..URLBase import PrivacyMode
+from ..common import NotifyType
+from ..utils import parse_list
+from ..utils import parse_bool
+from ..AppriseLocale import gettext_lazy as _
+
+# Default our global support flag
+NOTIFY_MQTT_SUPPORT_ENABLED = False
+
+if six.PY2:
+ # handle Python v2.7 suport
+ class ConnectionError(Exception):
+ pass
+
+try:
+ # 3rd party modules
+ import paho.mqtt.client as mqtt
+
+ # We're good to go!
+ NOTIFY_MQTT_SUPPORT_ENABLED = True
+
+ MQTT_PROTOCOL_MAP = {
+ # v3.1.1
+ "311": mqtt.MQTTv311,
+ # v3.1
+ "31": mqtt.MQTTv31,
+ # v5.0
+ "5": mqtt.MQTTv5,
+ # v5.0 (alias)
+ "50": mqtt.MQTTv5,
+ }
+
+except ImportError:
+ # No problem; we just simply can't support this plugin because we're
+ # either using Linux, or simply do not have pywin32 installed.
+ MQTT_PROTOCOL_MAP = {}
+
+# A lookup map for relaying version to user
+HUMAN_MQTT_PROTOCOL_MAP = {
+ "v3.1.1": "311",
+ "v3.1": "31",
+ "v5.0": "5",
+}
+
+
+class NotifyMQTT(NotifyBase):
+ """
+ A wrapper for MQTT Notifications
+ """
+
+ # Set our global enabled flag
+ enabled = NOTIFY_MQTT_SUPPORT_ENABLED
+
+ requirements = {
+ # Define our required packaging in order to work
+ 'packages_required': 'paho-mqtt'
+ }
+
+ # The default descriptive name associated with the Notification
+ service_name = 'MQTT Notification'
+
+ # The default protocol
+ protocol = 'mqtt'
+
+ # Secure protocol
+ secure_protocol = 'mqtts'
+
+ # A URL that takes you to the setup/help of the specific protocol
+ setup_url = 'https://github.com/caronc/apprise/wiki/Notify_mqtt'
+
+ # MQTT does not have a title
+ title_maxlen = 0
+
+ # The maximum length a body can be set to
+ body_maxlen = 268435455
+
+ # Use a throttle; but it doesn't need to be so strict since most
+ # MQTT server hostings can handle the small bursts of packets and are
+ # locally hosted anyway
+ request_rate_per_sec = 0.5
+
+ # Port Defaults (unless otherwise specified)
+ mqtt_insecure_port = 1883
+
+ # The default secure port to use (if mqtts://)
+ mqtt_secure_port = 8883
+
+ # The default mqtt keepalive value
+ mqtt_keepalive = 30
+
+ # The default mqtt transport
+ mqtt_transport = "tcp"
+
+ # The number of seconds to wait for a publish to occur at before
+ # checking to see if it's been sent yet.
+ mqtt_block_time_sec = 0.2
+
+ # Set the maximum number of messages with QoS>0 that can be part way
+ # through their network flow at once.
+ mqtt_inflight_messages = 200
+
+ # Taken from https://golang.org/src/crypto/x509/root_linux.go
+ CA_CERTIFICATE_FILE_LOCATIONS = [
+ # Debian/Ubuntu/Gentoo etc.
+ "/etc/ssl/certs/ca-certificates.crt",
+ # Fedora/RHEL 6
+ "/etc/pki/tls/certs/ca-bundle.crt",
+ # OpenSUSE
+ "/etc/ssl/ca-bundle.pem",
+ # OpenELEC
+ "/etc/pki/tls/cacert.pem",
+ # CentOS/RHEL 7
+ "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
+ ]
+
+ # Define object templates
+ templates = (
+ '{schema}://{user}@{host}/{topic}',
+ '{schema}://{user}@{host}:{port}/{topic}',
+ '{schema}://{user}:{password}@{host}/{topic}',
+ '{schema}://{user}:{password}@{host}:{port}/{topic}',
+ )
+
+ template_tokens = dict(NotifyBase.template_tokens, **{
+ 'host': {
+ 'name': _('Hostname'),
+ 'type': 'string',
+ 'required': True,
+ },
+ 'port': {
+ 'name': _('Port'),
+ 'type': 'int',
+ 'min': 1,
+ 'max': 65535,
+ },
+ 'user': {
+ 'name': _('User Name'),
+ 'type': 'string',
+ 'required': True,
+ },
+ 'password': {
+ 'name': _('Password'),
+ 'type': 'string',
+ 'private': True,
+ 'required': True,
+ },
+ 'topic': {
+ 'name': _('Target Queue'),
+ 'type': 'string',
+ 'map_to': 'targets',
+ },
+ 'targets': {
+ 'name': _('Targets'),
+ 'type': 'list:string',
+ },
+ })
+
+ # Define our template arguments
+ template_args = dict(NotifyBase.template_args, **{
+ 'to': {
+ 'alias_of': 'targets',
+ },
+ 'qos': {
+ 'name': _('QOS'),
+ 'type': 'int',
+ 'default': 0,
+ 'min': 0,
+ 'max': 2,
+ },
+ 'version': {
+ 'name': _('Version'),
+ 'type': 'choice:string',
+ 'values': HUMAN_MQTT_PROTOCOL_MAP,
+ 'default': "v3.1.1",
+ },
+ 'client_id': {
+ 'name': _('Client ID'),
+ 'type': 'string',
+ },
+ 'session': {
+ 'name': _('Use Session'),
+ 'type': 'bool',
+ 'default': False,
+ },
+ })
+
+ def __init__(self, targets=None, version=None, qos=None,
+ client_id=None, session=None, **kwargs):
+ """
+ Initialize MQTT Object
+ """
+
+ super(NotifyMQTT, self).__init__(**kwargs)
+
+ # Initialize topics
+ self.topics = parse_list(targets)
+
+ if version is None:
+ self.version = self.template_args['version']['default']
+ else:
+ self.version = version
+
+ # Save our client id if specified
+ self.client_id = client_id
+
+ # Maintain our session (associated with our user id if set)
+ self.session = self.template_args['session']['default'] \
+ if session is None or not self.client_id \
+ else parse_bool(session)
+
+ # Set up our Quality of Service (QoS)
+ try:
+ self.qos = self.template_args['qos']['default'] \
+ if qos is None else int(qos)
+
+ if self.qos < self.template_args['qos']['min'] \
+ or self.qos > self.template_args['qos']['max']:
+ # Let error get handle on exceptio higher up
+ raise ValueError("")
+
+ except (ValueError, TypeError):
+ msg = 'An invalid MQTT QOS ({}) was specified.'.format(qos)
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ if not self.port:
+ # Assign port (if not otherwise set)
+ self.port = self.mqtt_secure_port \
+ if self.secure else self.mqtt_insecure_port
+
+ self.ca_certs = None
+ if self.secure:
+ # verify SSL key or abort
+ self.ca_certs = next(
+ (cert for cert in self.CA_CERTIFICATE_FILE_LOCATIONS
+ if isfile(cert)), None)
+
+ # Set up our MQTT Publisher
+ try:
+ # Get our protocol
+ self.mqtt_protocol = \
+ MQTT_PROTOCOL_MAP[re.sub(r'[^0-9]+', '', self.version)]
+
+ except (KeyError):
+ msg = 'An invalid MQTT Protocol version ' \
+ '({}) was specified.'.format(version)
+ self.logger.warning(msg)
+ raise TypeError(msg)
+
+ # Our MQTT Client Object
+ self.client = mqtt.Client(
+ client_id=self.client_id,
+ clean_session=not self.session, userdata=None,
+ protocol=self.mqtt_protocol, transport=self.mqtt_transport,
+ )
+
+ # Our maximum number of in-flight messages
+ self.client.max_inflight_messages_set(self.mqtt_inflight_messages)
+
+ # Toggled to False once our connection has been established at least
+ # once
+ self.__initial_connect = True
+
+ def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
+ """
+ Perform MQTT Notification
+ """
+
+ if len(self.topics) == 0:
+ # There were no services to notify
+ self.logger.warning('There were no MQTT topics to notify.')
+ return False
+
+ # For logging:
+ url = '{host}:{port}'.format(host=self.host, port=self.port)
+
+ try:
+ if self.__initial_connect:
+ # Our initial connection
+ if self.user:
+ self.client.username_pw_set(
+ self.user, password=self.password)
+
+ if self.secure:
+ if self.ca_certs is None:
+ self.logger.warning(
+ 'MQTT Secure comunication can not be verified; '
+ 'no local CA certificate file')
+ return False
+
+ self.client.tls_set(
+ ca_certs=self.ca_certs, certfile=None, keyfile=None,
+ cert_reqs=ssl.CERT_REQUIRED,
+ tls_version=ssl.PROTOCOL_TLS,
+ ciphers=None)
+
+ # Set our TLS Verify Flag
+ self.client.tls_insecure_set(self.verify_certificate)
+
+ # Establish our connection
+ if self.client.connect(
+ self.host, port=self.port,
+ keepalive=self.mqtt_keepalive) \
+ != mqtt.MQTT_ERR_SUCCESS:
+ self.logger.warning(
+ 'An MQTT connection could not be established for {}'.
+ format(url))
+ return False
+
+ # Start our client loop
+ self.client.loop_start()
+
+ # Throttle our start otherwise the starting handshaking doesnt
+ # work. I'm not sure if this is a bug or not, but with qos=0,
+ # and without this sleep(), the messages randomly fails to be
+ # delivered.
+ sleep(0.01)
+
+ # Toggle our flag since we never need to enter this area again
+ self.__initial_connect = False
+
+ # Create a copy of the subreddits list
+ topics = list(self.topics)
+
+ has_error = False
+ while len(topics) > 0 and not has_error:
+ # Retrieve our subreddit
+ topic = topics.pop()
+
+ # For logging:
+ url = '{host}:{port}/{topic}'.format(
+ host=self.host,
+ port=self.port,
+ topic=topic)
+
+ # Always call throttle before any remote server i/o is made
+ self.throttle()
+
+ # handle a re-connection
+ if not self.client.is_connected() and \
+ self.client.reconnect() != mqtt.MQTT_ERR_SUCCESS:
+ self.logger.warning(
+ 'An MQTT connection could not be sustained for {}'.
+ format(url))
+ has_error = True
+ break
+
+ # Some Debug Logging
+ self.logger.debug('MQTT POST URL: {} (cert_verify={})'.format(
+ url, self.verify_certificate))
+ self.logger.debug('MQTT Payload: %s' % str(body))
+
+ result = self.client.publish(
+ topic, payload=body, qos=self.qos, retain=False)
+
+ if result.rc != mqtt.MQTT_ERR_SUCCESS:
+ # Toggle our status
+ self.logger.warning(
+ 'An error (rc={}) occured when sending MQTT to {}'.
+ format(result.rc, url))
+ has_error = True
+ break
+
+ elif not result.is_published():
+ self.logger.debug(
+ 'Blocking until MQTT payload is published...')
+ reference = datetime.now()
+ while not has_error and not result.is_published():
+ # Throttle
+ sleep(self.mqtt_block_time_sec)
+
+ # Our own throttle so we can abort eventually....
+ elapsed = (datetime.now() - reference).total_seconds()
+ if elapsed >= self.socket_read_timeout:
+ self.logger.warning(
+ 'The MQTT message could not be delivered')
+ has_error = True
+
+ # if we reach here; we're at the bottom of our loop
+ # we loop around and do the next topic now
+
+ except ConnectionError as e:
+ self.logger.warning(
+ 'MQTT Connection Error received from {}'.format(url))
+ self.logger.debug('Socket Exception: %s' % str(e))
+ return False
+
+ except ssl.CertificateError as e:
+ self.logger.warning(
+ 'MQTT SSL Certificate Error received from {}'.format(url))
+ self.logger.debug('Socket Exception: %s' % str(e))
+ return False
+
+ except ValueError as e:
+ # ValueError's are thrown from publish() call if there is a problem
+ self.logger.warning(
+ 'MQTT Publishing error received: from {}'.format(url))
+ self.logger.debug('Socket Exception: %s' % str(e))
+ return False
+
+ return not has_error
+
+ def url(self, privacy=False, *args, **kwargs):
+ """
+ Returns the URL built dynamically based on specified arguments.
+ """
+
+ # Define any URL parameters
+ params = {
+ 'version': self.version,
+ 'qos': str(self.qos),
+ 'session': 'yes' if self.session else 'no',
+ }
+
+ if self.client_id:
+ # Our client id is set if specified
+ params['client_id'] = self.client_id
+
+ # Extend our parameters
+ params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
+
+ # Determine Authentication
+ auth = ''
+ if self.user and self.password:
+ auth = '{user}:{password}@'.format(
+ user=NotifyMQTT.quote(self.user, safe=''),
+ password=self.pprint(
+ self.password, privacy, mode=PrivacyMode.Secret, safe=''),
+ )
+ elif self.user:
+ auth = '{user}@'.format(
+ user=NotifyMQTT.quote(self.user, safe=''),
+ )
+
+ default_port = self.mqtt_secure_port \
+ if self.secure else self.mqtt_insecure_port
+
+ return '{schema}://{auth}{hostname}{port}/{targets}?{params}'.format(
+ schema=self.secure_protocol if self.secure else self.protocol,
+ auth=auth,
+ # never encode hostname since we're expecting it to be a valid one
+ hostname=self.host,
+ port='' if self.port is None or self.port == default_port
+ else ':{}'.format(self.port),
+ targets=','.join(
+ [NotifyMQTT.quote(x, safe='/') for x in self.topics]),
+ params=NotifyMQTT.urlencode(params),
+ )
+
+ @staticmethod
+ def parse_url(url):
+ """
+ There are no parameters nessisary for this protocol; simply having
+ windows:// is all you need. This function just makes sure that
+ is in place.
+
+ """
+
+ results = NotifyBase.parse_url(url)
+ if not results:
+ # We're done early as we couldn't load the results
+ return results
+
+ try:
+ # Acquire topic(s)
+ results['targets'] = parse_list(
+ NotifyMQTT.unquote(results['fullpath'].lstrip('/')))
+
+ except AttributeError:
+ # No 'fullpath' specified
+ results['targets'] = []
+
+ # The MQTT protocol version to use
+ if 'version' in results['qsd'] and len(results['qsd']['version']):
+ results['version'] = \
+ NotifyMQTT.unquote(results['qsd']['version'])
+
+ # The MQTT Client ID
+ if 'client_id' in results['qsd'] and len(results['qsd']['client_id']):
+ results['client_id'] = \
+ NotifyMQTT.unquote(results['qsd']['client_id'])
+
+ if 'session' in results['qsd'] and len(results['qsd']['session']):
+ results['session'] = parse_bool(results['qsd']['session'])
+
+ # The MQTT Quality of Service to use
+ if 'qos' in results['qsd'] and len(results['qsd']['qos']):
+ results['qos'] = \
+ NotifyMQTT.unquote(results['qsd']['qos'])
+
+ # The 'to' makes it easier to use yaml configuration
+ if 'to' in results['qsd'] and len(results['qsd']['to']):
+ results['targets'].extend(
+ NotifyMQTT.parse_list(results['qsd']['to']))
+
+ # return results
+ return results