summaryrefslogtreecommitdiffhomepage
path: root/libs/apprise/plugins/NotifyFCM/oauth.py
diff options
context:
space:
mode:
Diffstat (limited to 'libs/apprise/plugins/NotifyFCM/oauth.py')
-rw-r--r--libs/apprise/plugins/NotifyFCM/oauth.py319
1 files changed, 319 insertions, 0 deletions
diff --git a/libs/apprise/plugins/NotifyFCM/oauth.py b/libs/apprise/plugins/NotifyFCM/oauth.py
new file mode 100644
index 000000000..fbde3ccf7
--- /dev/null
+++ b/libs/apprise/plugins/NotifyFCM/oauth.py
@@ -0,0 +1,319 @@
+# -*- coding: utf-8 -*-
+# BSD 2-Clause License
+#
+# Apprise - Push Notification Library.
+# Copyright (c) 2024, Chris Caron <[email protected]>
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# To generate a private key file for your service account:
+#
+# 1. In the Firebase console, open Settings > Service Accounts.
+# 2. Click Generate New Private Key, then confirm by clicking Generate Key.
+# 3. Securely store the JSON file containing the key.
+
+import requests
+import base64
+import json
+import calendar
+from cryptography.hazmat import backends
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives import asymmetric
+from cryptography.exceptions import UnsupportedAlgorithm
+from datetime import datetime
+from datetime import timedelta
+from datetime import timezone
+from json.decoder import JSONDecodeError
+from urllib.parse import urlencode as _urlencode
+
+from ...logger import logger
+
+
+class GoogleOAuth:
+ """
+ A OAuth simplified implimentation to Google's Firebase Cloud Messaging
+
+ """
+ scopes = [
+ 'https://www.googleapis.com/auth/firebase.messaging',
+ ]
+
+ # 1 hour in seconds (the lifetime of our token)
+ access_token_lifetime_sec = timedelta(seconds=3600)
+
+ # The default URI to use if one is not found
+ default_token_uri = 'https://oauth2.googleapis.com/token'
+
+ # Taken right from google.auth.helpers:
+ clock_skew = timedelta(seconds=10)
+
+ def __init__(self, user_agent=None, timeout=(5, 4),
+ verify_certificate=True):
+ """
+ Initialize our OAuth object
+ """
+
+ # Wether or not to verify ssl
+ self.verify_certificate = verify_certificate
+
+ # Our (connect, read) timeout
+ self.request_timeout = timeout
+
+ # assign our user-agent if defined
+ self.user_agent = user_agent
+
+ # initialize our other object variables
+ self.__reset()
+
+ def __reset(self):
+ """
+ Reset object internal variables
+ """
+
+ # Google Keyfile Encoding
+ self.encoding = 'utf-8'
+
+ # Our retrieved JSON content (unmangled)
+ self.content = None
+
+ # Our generated key information we cache once loaded
+ self.private_key = None
+
+ # Our keys we build using the provided content
+ self.__refresh_token = None
+ self.__access_token = None
+ self.__access_token_expiry = datetime.now(timezone.utc)
+
+ def load(self, path):
+ """
+ Generate our SSL details
+ """
+
+ # Reset our objects
+ self.content = None
+ self.private_key = None
+ self.__access_token = None
+ self.__access_token_expiry = datetime.now(timezone.utc)
+
+ try:
+ with open(path, mode="r", encoding=self.encoding) as fp:
+ self.content = json.loads(fp.read())
+
+ except (OSError, IOError):
+ logger.debug('FCM keyfile {} could not be accessed'.format(path))
+ return False
+
+ except JSONDecodeError as e:
+ logger.debug(
+ 'FCM keyfile {} generated a JSONDecodeError: {}'.format(
+ path, e))
+ return False
+
+ if not isinstance(self.content, dict):
+ logger.debug(
+ 'FCM keyfile {} is incorrectly structured'.format(path))
+ self.__reset()
+ return False
+
+ # Verify we've got the correct tokens in our content to work with
+ is_valid = next((False for k in (
+ 'client_email', 'private_key_id', 'private_key',
+ 'type', 'project_id') if not self.content.get(k)), True)
+
+ if not is_valid:
+ logger.debug(
+ 'FCM keyfile {} is missing required information'.format(path))
+ self.__reset()
+ return False
+
+ # Verify our service_account type
+ if self.content.get('type') != 'service_account':
+ logger.debug(
+ 'FCM keyfile {} is not of type service_account'.format(path))
+ self.__reset()
+ return False
+
+ # Prepare our private key which is in PKCS8 PEM format
+ try:
+ self.private_key = serialization.load_pem_private_key(
+ self.content.get('private_key').encode(self.encoding),
+ password=None, backend=backends.default_backend())
+
+ except (TypeError, ValueError):
+ # ValueError: If the PEM data could not be decrypted or if its
+ # structure could not be decoded successfully.
+ # TypeError: If a password was given and the private key was
+ # not encrypted. Or if the key was encrypted but
+ # no password was supplied.
+ logger.error('FCM provided private key is invalid.')
+ self.__reset()
+ return False
+
+ except UnsupportedAlgorithm:
+ # If the serialized key is of a type that is not supported by
+ # the backend.
+ logger.error('FCM provided private key is not supported')
+ self.__reset()
+ return False
+
+ # We've done enough validation to move on
+ return True
+
+ @property
+ def access_token(self):
+ """
+ Returns our access token (if it hasn't expired yet)
+ - if we do not have one we'll fetch one.
+ - if it expired, we'll renew it
+ - if a key simply can't be acquired, then we return None
+ """
+
+ if not self.private_key or not self.content:
+ # invalid content (or not loaded)
+ logger.error(
+ 'No FCM JSON keyfile content loaded to generate a access '
+ 'token with.')
+ return None
+
+ if self.__access_token_expiry > datetime.now(timezone.utc):
+ # Return our no-expired key
+ return self.__access_token
+
+ # If we reach here we need to prepare our payload
+ token_uri = self.content.get('token_uri', self.default_token_uri)
+ service_email = self.content.get('client_email')
+ key_identifier = self.content.get('private_key_id')
+
+ # Generate our Assertion
+ now = datetime.now(timezone.utc)
+ expiry = now + self.access_token_lifetime_sec
+
+ payload = {
+ # The number of seconds since the UNIX epoch.
+ "iat": calendar.timegm(now.utctimetuple()),
+ "exp": calendar.timegm(expiry.utctimetuple()),
+ # The issuer must be the service account email.
+ "iss": service_email,
+ # The audience must be the auth token endpoint's URI
+ "aud": token_uri,
+ # Our token scopes
+ "scope": " ".join(self.scopes),
+ }
+
+ # JWT Details
+ header = {
+ 'typ': 'JWT',
+ 'alg': 'RS256' if isinstance(
+ self.private_key, asymmetric.rsa.RSAPrivateKey) else 'ES256',
+
+ # Key Identifier
+ 'kid': key_identifier,
+ }
+
+ # Encodes base64 strings removing any padding characters.
+ segments = [
+ base64.urlsafe_b64encode(
+ json.dumps(header).encode(self.encoding)).rstrip(b"="),
+ base64.urlsafe_b64encode(
+ json.dumps(payload).encode(self.encoding)).rstrip(b"="),
+ ]
+
+ signing_input = b".".join(segments)
+ signature = self.private_key.sign(
+ signing_input,
+ asymmetric.padding.PKCS1v15(),
+ hashes.SHA256(),
+ )
+
+ # Finally append our segment
+ segments.append(base64.urlsafe_b64encode(signature).rstrip(b"="))
+ assertion = b".".join(segments)
+
+ http_payload = _urlencode({
+ 'assertion': assertion,
+ 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
+ })
+
+ http_headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ }
+ if self.user_agent:
+ http_headers['User-Agent'] = self.user_agent
+
+ logger.info('Refreshing FCM Access Token')
+ try:
+ r = requests.post(
+ token_uri,
+ data=http_payload,
+ headers=http_headers,
+ verify=self.verify_certificate,
+ timeout=self.request_timeout,
+ )
+ if r.status_code != requests.codes.ok:
+ # We had a problem
+ logger.warning(
+ 'Failed to update FCM Access Token error={}.'
+ .format(r.status_code))
+
+ logger.debug(
+ 'Response Details:\r\n%s', r.content)
+ return None
+
+ except requests.RequestException as e:
+ logger.warning(
+ 'A Connection error occurred refreshing FCM '
+ 'Access Token.'
+ )
+ logger.debug('Socket Exception: %s', str(e))
+ return None
+
+ # If we get here, we made our request successfully, now we need
+ # to parse out the data
+ response = json.loads(r.content)
+ self.__access_token = response['access_token']
+ self.__refresh_token = response.get(
+ 'refresh_token', self.__refresh_token)
+
+ if 'expires_in' in response:
+ delta = timedelta(seconds=int(response['expires_in']))
+ self.__access_token_expiry = \
+ delta + datetime.now(timezone.utc) - self.clock_skew
+
+ else:
+ # Allow some grace before we expire
+ self.__access_token_expiry = expiry - self.clock_skew
+
+ logger.debug(
+ 'Access Token successfully acquired: %s', self.__access_token)
+
+ # Return our token
+ return self.__access_token
+
+ @property
+ def project_id(self):
+ """
+ Returns the project id found in the file
+ """
+ return None if not self.content \
+ else self.content.get('project_id')