diff options
author | morpheus65535 <[email protected]> | 2021-05-08 10:39:00 -0400 |
---|---|---|
committer | GitHub <[email protected]> | 2021-05-08 10:39:00 -0400 |
commit | 44c51b2e2c3bffdfc0e0c447c038f6cd0bfd2cbe (patch) | |
tree | f4c16f35853afd27abb4e48fb2a72a11145360a7 /libs/signalrcore | |
parent | 72b6ab3c6a11e1c12d86563989d88d73e4e64377 (diff) | |
download | bazarr-44c51b2e2c3bffdfc0e0c447c038f6cd0bfd2cbe.tar.gz bazarr-44c51b2e2c3bffdfc0e0c447c038f6cd0bfd2cbe.zip |
Added real-time sync with Sonarr v3 and Radarr v3 by feeding from SignalR feeds. You can now reduce frequency of sync tasks to something like once a day.
Diffstat (limited to 'libs/signalrcore')
33 files changed, 1716 insertions, 0 deletions
diff --git a/libs/signalrcore/__init__.py b/libs/signalrcore/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/libs/signalrcore/__init__.py diff --git a/libs/signalrcore/helpers.py b/libs/signalrcore/helpers.py new file mode 100644 index 000000000..ab7bac4de --- /dev/null +++ b/libs/signalrcore/helpers.py @@ -0,0 +1,99 @@ +import logging +import urllib.parse as parse + + +class Helpers: + @staticmethod + def configure_logger(level=logging.INFO, handler=None): + logger = Helpers.get_logger() + if handler is None: + handler = logging.StreamHandler() + handler.setFormatter( + logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s')) + handler.setLevel(level) + logger.addHandler(handler) + logger.setLevel(level) + + @staticmethod + def get_logger(): + return logging.getLogger("SignalRCoreClient") + + @staticmethod + def has_querystring(url): + return "?" in url + + @staticmethod + def split_querystring(url): + parts = url.split("?") + return parts[0], parts[1] + + @staticmethod + def replace_scheme( + url, + root_scheme, + source, + secure_source, + destination, + secure_destination): + url_parts = parse.urlsplit(url) + + if root_scheme not in url_parts.scheme: + if url_parts.scheme == secure_source: + url_parts = url_parts._replace(scheme=secure_destination) + if url_parts.scheme == source: + url_parts = url_parts._replace(scheme=destination) + + return parse.urlunsplit(url_parts) + + @staticmethod + def websocket_to_http(url): + return Helpers.replace_scheme( + url, + "http", + "ws", + "wss", + "http", + "https") + + @staticmethod + def http_to_websocket(url): + return Helpers.replace_scheme( + url, + "ws", + "http", + "https", + "ws", + "wss" + ) + + @staticmethod + def get_negotiate_url(url): + querystring = "" + if Helpers.has_querystring(url): + url, querystring = Helpers.split_querystring(url) + + url_parts = parse.urlsplit(Helpers.websocket_to_http(url)) + + negotiate_suffix = "negotiate"\ + if url_parts.path.endswith('/')\ + else "/negotiate" + + url_parts = url_parts._replace(path=url_parts.path + negotiate_suffix) + + return parse.urlunsplit(url_parts) \ + if querystring == "" else\ + parse.urlunsplit(url_parts) + "?" + querystring + + @staticmethod + def encode_connection_id(url, id): + url_parts = parse.urlsplit(url) + query_string_parts = parse.parse_qs(url_parts.query) + query_string_parts["id"] = id + + url_parts = url_parts._replace( + query=parse.urlencode( + query_string_parts, + doseq=True)) + + return Helpers.http_to_websocket(parse.urlunsplit(url_parts)) diff --git a/libs/signalrcore/hub/__init__.py b/libs/signalrcore/hub/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/libs/signalrcore/hub/__init__.py diff --git a/libs/signalrcore/hub/auth_hub_connection.py b/libs/signalrcore/hub/auth_hub_connection.py new file mode 100644 index 000000000..9be7d0182 --- /dev/null +++ b/libs/signalrcore/hub/auth_hub_connection.py @@ -0,0 +1,22 @@ +from .base_hub_connection import BaseHubConnection +from ..helpers import Helpers + + +class AuthHubConnection(BaseHubConnection): + def __init__(self, auth_function, headers={}, **kwargs): + self.headers = headers + self.auth_function = auth_function + super(AuthHubConnection, self).__init__(**kwargs) + + def start(self): + try: + Helpers.get_logger().debug("Starting connection ...") + self.token = self.auth_function() + Helpers.get_logger()\ + .debug("auth function result {0}".format(self.token)) + self.headers["Authorization"] = "Bearer " + self.token + return super(AuthHubConnection, self).start() + except Exception as ex: + Helpers.get_logger().warning(self.__class__.__name__) + Helpers.get_logger().warning(str(ex)) + raise ex diff --git a/libs/signalrcore/hub/base_hub_connection.py b/libs/signalrcore/hub/base_hub_connection.py new file mode 100644 index 000000000..c9325bf63 --- /dev/null +++ b/libs/signalrcore/hub/base_hub_connection.py @@ -0,0 +1,238 @@ +import websocket +import threading +import requests +import traceback +import uuid +import time +import ssl +from typing import Callable +from signalrcore.messages.message_type import MessageType +from signalrcore.messages.stream_invocation_message\ + import StreamInvocationMessage +from signalrcore.messages.ping_message import PingMessage +from .errors import UnAuthorizedHubError, HubError, HubConnectionError +from signalrcore.helpers import Helpers +from .handlers import StreamHandler, InvocationHandler +from ..protocol.messagepack_protocol import MessagePackHubProtocol +from ..transport.websockets.websocket_transport import WebsocketTransport +from ..helpers import Helpers +from ..subject import Subject +from ..messages.invocation_message import InvocationMessage + +class BaseHubConnection(object): + def __init__( + self, + url, + protocol, + headers={}, + **kwargs): + self.headers = headers + self.logger = Helpers.get_logger() + self.handlers = [] + self.stream_handlers = [] + self._on_error = lambda error: self.logger.info( + "on_error not defined {0}".format(error)) + self.transport = WebsocketTransport( + url=url, + protocol=protocol, + headers=headers, + on_message=self.on_message, + **kwargs) + + def start(self): + self.logger.debug("Connection started") + return self.transport.start() + + def stop(self): + self.logger.debug("Connection stop") + return self.transport.stop() + + def on_close(self, callback): + """Configures on_close connection callback. + It will be raised on connection closed event + connection.on_close(lambda: print("connection closed")) + Args: + callback (function): function without params + """ + self.transport.on_close_callback(callback) + + def on_open(self, callback): + """Configures on_open connection callback. + It will be raised on connection open event + connection.on_open(lambda: print( + "connection opened ")) + Args: + callback (function): funciton without params + """ + self.transport.on_open_callback(callback) + + def on_error(self, callback): + """Configures on_error connection callback. It will be raised + if any hub method throws an exception. + connection.on_error(lambda data: + print(f"An exception was thrown closed{data.error}")) + Args: + callback (function): function with one parameter. + A CompletionMessage object. + """ + self._on_error = callback + + def on(self, event, callback_function: Callable): + """Register a callback on the specified event + Args: + event (string): Event name + callback_function (Function): callback function, + arguments will be binded + """ + self.logger.debug("Handler registered started {0}".format(event)) + self.handlers.append((event, callback_function)) + + def send(self, method, arguments, on_invocation=None): + """Sends a message + + Args: + method (string): Method name + arguments (list|Subject): Method parameters + on_invocation (function, optional): On invocation send callback + will be raised on send server function ends. Defaults to None. + + Raises: + HubConnectionError: If hub is not ready to send + TypeError: If arguments are invalid list or Subject + """ + if not self.transport.is_running(): + raise HubConnectionError( + "Hub is not running you cand send messages") + + if type(arguments) is not list and type(arguments) is not Subject: + raise TypeError("Arguments of a message must be a list or subject") + + if type(arguments) is list: + message = InvocationMessage( + str(uuid.uuid4()), + method, + arguments, + headers=self.headers) + + if on_invocation: + self.stream_handlers.append( + InvocationHandler( + message.invocation_id, + on_invocation)) + + self.transport.send(message) + + if type(arguments) is Subject: + arguments.connection = self + arguments.target = method + arguments.start() + + + def on_message(self, messages): + for message in messages: + if message.type == MessageType.invocation_binding_failure: + self.logger.error(message) + self._on_error(message) + continue + + if message.type == MessageType.ping: + continue + + if message.type == MessageType.invocation: + fired_handlers = list( + filter( + lambda h: h[0] == message.target, + self.handlers)) + if len(fired_handlers) == 0: + self.logger.warning( + "event '{0}' hasn't fire any handler".format( + message.target)) + for _, handler in fired_handlers: + handler(message.arguments) + + if message.type == MessageType.close: + self.logger.info("Close message received from server") + self.stop() + return + + if message.type == MessageType.completion: + if message.error is not None and len(message.error) > 0: + self._on_error(message) + + # Send callbacks + fired_handlers = list( + filter( + lambda h: h.invocation_id == message.invocation_id, + self.stream_handlers)) + + # Stream callbacks + for handler in fired_handlers: + handler.complete_callback(message) + + # unregister handler + self.stream_handlers = list( + filter( + lambda h: h.invocation_id != message.invocation_id, + self.stream_handlers)) + + if message.type == MessageType.stream_item: + fired_handlers = list( + filter( + lambda h: h.invocation_id == message.invocation_id, + self.stream_handlers)) + if len(fired_handlers) == 0: + self.logger.warning( + "id '{0}' hasn't fire any stream handler".format( + message.invocation_id)) + for handler in fired_handlers: + handler.next_callback(message.item) + + if message.type == MessageType.stream_invocation: + pass + + if message.type == MessageType.cancel_invocation: + fired_handlers = list( + filter( + lambda h: h.invocation_id == message.invocation_id, + self.stream_handlers)) + if len(fired_handlers) == 0: + self.logger.warning( + "id '{0}' hasn't fire any stream handler".format( + message.invocation_id)) + + for handler in fired_handlers: + handler.error_callback(message) + + # unregister handler + self.stream_handlers = list( + filter( + lambda h: h.invocation_id != message.invocation_id, + self.stream_handlers)) + + def stream(self, event, event_params): + """Starts server streaming + connection.stream( + "Counter", + [len(self.items), 500])\ + .subscribe({ + "next": self.on_next, + "complete": self.on_complete, + "error": self.on_error + }) + Args: + event (string): Method Name + event_params (list): Method parameters + + Returns: + [StreamHandler]: stream handler + """ + invocation_id = str(uuid.uuid4()) + stream_obj = StreamHandler(event, invocation_id) + self.stream_handlers.append(stream_obj) + self.transport.send( + StreamInvocationMessage( + invocation_id, + event, + event_params, + headers=self.headers)) + return stream_obj
\ No newline at end of file diff --git a/libs/signalrcore/hub/errors.py b/libs/signalrcore/hub/errors.py new file mode 100644 index 000000000..354c6b8ee --- /dev/null +++ b/libs/signalrcore/hub/errors.py @@ -0,0 +1,10 @@ +class HubError(OSError): + pass + +class UnAuthorizedHubError(HubError): + pass + +class HubConnectionError(ValueError): + """Hub connection error + """ + pass diff --git a/libs/signalrcore/hub/handlers.py b/libs/signalrcore/hub/handlers.py new file mode 100644 index 000000000..35e558cc7 --- /dev/null +++ b/libs/signalrcore/hub/handlers.py @@ -0,0 +1,51 @@ +import logging + +from typing import Callable +from ..helpers import Helpers +class StreamHandler(object): + def __init__(self, event: str, invocation_id: str): + self.event = event + self.invocation_id = invocation_id + self.logger = Helpers.get_logger() + self.next_callback =\ + lambda _: self.logger.warning( + "next stream handler fired, no callback configured") + self.complete_callback =\ + lambda _: self.logger.warning( + "next complete handler fired, no callback configured") + self.error_callback =\ + lambda _: self.logger.warning( + "next error handler fired, no callback configured") + + def subscribe(self, subscribe_callbacks: dict): + error =\ + " subscribe object must be a dict like {0}"\ + .format({ + "next": None, + "complete": None, + "error": None + }) + + if subscribe_callbacks is None or\ + type(subscribe_callbacks) is not dict: + raise TypeError(error) + + if "next" not in subscribe_callbacks or\ + "complete" not in subscribe_callbacks \ + or "error" not in subscribe_callbacks: + raise KeyError(error) + + if not callable(subscribe_callbacks["next"])\ + or not callable(subscribe_callbacks["next"]) \ + or not callable(subscribe_callbacks["next"]): + raise ValueError("Suscribe callbacks must be functions") + + self.next_callback = subscribe_callbacks["next"] + self.complete_callback = subscribe_callbacks["complete"] + self.error_callback = subscribe_callbacks["error"] + + +class InvocationHandler(object): + def __init__(self, invocation_id: str, complete_callback: Callable): + self.invocation_id = invocation_id + self.complete_callback = complete_callback diff --git a/libs/signalrcore/hub_connection_builder.py b/libs/signalrcore/hub_connection_builder.py new file mode 100644 index 000000000..1d38cfa8e --- /dev/null +++ b/libs/signalrcore/hub_connection_builder.py @@ -0,0 +1,245 @@ +import uuid +from .hub.base_hub_connection import BaseHubConnection +from .hub.auth_hub_connection import AuthHubConnection +from .transport.websockets.reconnection import \ + IntervalReconnectionHandler, RawReconnectionHandler, ReconnectionType +from .helpers import Helpers +from .messages.invocation_message import InvocationMessage +from .protocol.json_hub_protocol import JsonHubProtocol +from .subject import Subject + + +class HubConnectionBuilder(object): + """ + Hub connection class, manages handshake and messaging + + Args: + hub_url: SignalR core url + + Raises: + HubConnectionError: Raises an Exception if url is empty or None + """ + + def __init__(self): + self.hub_url = None + self.hub = None + self.options = { + "access_token_factory": None + } + self.token = None + self.headers = None + self.negotiate_headers = None + self.has_auth_configured = None + self.protocol = None + self.reconnection_handler = None + self.keep_alive_interval = None + self.verify_ssl = True + self.enable_trace = False # socket trace + self.skip_negotiation = False # By default do not skip negotiation + self.running = False + + def with_url( + self, + hub_url: str, + options: dict = None): + """Configure the hub url and options like negotiation and auth function + + def login(self): + response = requests.post( + self.login_url, + json={ + "username": self.email, + "password": self.password + },verify=False) + return response.json()["token"] + + self.connection = HubConnectionBuilder()\ + .with_url(self.server_url, + options={ + "verify_ssl": False, + "access_token_factory": self.login, + "headers": { + "mycustomheader": "mycustomheadervalue" + } + })\ + .configure_logging(logging.ERROR)\ + .with_automatic_reconnect({ + "type": "raw", + "keep_alive_interval": 10, + "reconnect_interval": 5, + "max_attempts": 5 + }).build() + + Args: + hub_url (string): Hub URL + options ([dict], optional): [description]. Defaults to None. + + Raises: + ValueError: If url is invalid + TypeError: If options are not a dict or auth function + is not callable + + Returns: + [HubConnectionBuilder]: configured connection + """ + if hub_url is None or hub_url.strip() == "": + raise ValueError("hub_url must be a valid url.") + + if options is not None and type(options) != dict: + raise TypeError( + "options must be a dict {0}.".format(self.options)) + + if options is not None \ + and "access_token_factory" in options.keys()\ + and not callable(options["access_token_factory"]): + raise TypeError( + "access_token_factory must be a function without params") + + if options is not None: + self.has_auth_configured = \ + "access_token_factory" in options.keys()\ + and callable(options["access_token_factory"]) + + self.skip_negotiation = "skip_negotiation" in options.keys()\ + and options["skip_negotiation"] + + self.hub_url = hub_url + self.hub = None + self.options = self.options if options is None else options + return self + + def configure_logging( + self, logging_level, socket_trace=False, handler=None): + """Configures signalr logging + + Args: + logging_level ([type]): logging.INFO | logging.DEBUG ... + from python logging class + socket_trace (bool, optional): Enables socket package trace. + Defaults to False. + handler ([type], optional): Custom logging handler. + Defaults to None. + + Returns: + [HubConnectionBuilder]: Instance hub with logging configured + """ + Helpers.configure_logger(logging_level, handler) + self.enable_trace = socket_trace + return self + + def with_hub_protocol(self, protocol): + """Changes transport protocol + from signalrcore.protocol.messagepack_protocol\ + import MessagePackHubProtocol + + HubConnectionBuilder()\ + .with_url(self.server_url, options={"verify_ssl":False})\ + ... + .with_hub_protocol(MessagePackHubProtocol())\ + ... + .build() + Args: + protocol (JsonHubProtocol|MessagePackHubProtocol): + protocol instance + + Returns: + HubConnectionBuilder: instance configured + """ + self.protocol = protocol + return self + + def build(self): + """Configures the connection hub + + Raises: + TypeError: Checks parameters an raises TypeError + if one of them is wrong + + Returns: + [HubConnectionBuilder]: [self object for fluent interface purposes] + """ + if self.protocol is None: + self.protocol = JsonHubProtocol() + self.headers = {} + + if "headers" in self.options.keys()\ + and type(self.options["headers"]) is dict: + self.headers = self.options["headers"] + + if self.has_auth_configured: + auth_function = self.options["access_token_factory"] + if auth_function is None or not callable(auth_function): + raise TypeError( + "access_token_factory is not function") + if "verify_ssl" in self.options.keys()\ + and type(self.options["verify_ssl"]) is bool: + self.verify_ssl = self.options["verify_ssl"] + + return AuthHubConnection( + headers=self.headers, + auth_function=auth_function, + url=self.hub_url, + protocol=self.protocol, + keep_alive_interval=self.keep_alive_interval, + reconnection_handler=self.reconnection_handler, + verify_ssl=self.verify_ssl, + skip_negotiation=self.skip_negotiation, + enable_trace=self.enable_trace)\ + if self.has_auth_configured else\ + BaseHubConnection( + url=self.hub_url, + protocol=self.protocol, + keep_alive_interval=self.keep_alive_interval, + reconnection_handler=self.reconnection_handler, + headers=self.headers, + verify_ssl=self.verify_ssl, + skip_negotiation=self.skip_negotiation, + enable_trace=self.enable_trace) + + def with_automatic_reconnect(self, data: dict): + """Configures automatic reconnection + https://devblogs.microsoft.com/aspnet/asp-net-core-updates-in-net-core-3-0-preview-4/ + + hub = HubConnectionBuilder()\ + .with_url(self.server_url, options={"verify_ssl":False})\ + .configure_logging(logging.ERROR)\ + .with_automatic_reconnect({ + "type": "raw", + "keep_alive_interval": 10, + "reconnect_interval": 5, + "max_attempts": 5 + })\ + .build() + + Args: + data (dict): [dict with autmatic reconnection parameters] + + Returns: + [HubConnectionBuilder]: [self object for fluent interface purposes] + """ + reconnect_type = data.get("type", "raw") + + # Infinite reconnect attempts + max_attempts = data.get("max_attempts", None) + + # 5 sec interval + reconnect_interval = data.get("reconnect_interval", 5) + + keep_alive_interval = data.get("keep_alive_interval", 15) + + intervals = data.get("intervals", []) # Reconnection intervals + + self.keep_alive_interval = keep_alive_interval + + reconnection_type = ReconnectionType[reconnect_type] + + if reconnection_type == ReconnectionType.raw: + self.reconnection_handler = RawReconnectionHandler( + reconnect_interval, + max_attempts + ) + if reconnection_type == ReconnectionType.interval: + self.reconnection_handler = IntervalReconnectionHandler( + intervals + ) + return self diff --git a/libs/signalrcore/messages/__init__.py b/libs/signalrcore/messages/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/libs/signalrcore/messages/__init__.py diff --git a/libs/signalrcore/messages/base_message.py b/libs/signalrcore/messages/base_message.py new file mode 100644 index 000000000..98e7d5082 --- /dev/null +++ b/libs/signalrcore/messages/base_message.py @@ -0,0 +1,15 @@ +from .message_type import MessageType + + +class BaseMessage(object): + def __init__(self, message_type, **kwargs): + self.type = MessageType(message_type) + + +class BaseHeadersMessage(BaseMessage): + """ + All messages expct ping can carry aditional headers + """ + def __init__(self, message_type, headers={}, **kwargs): + super(BaseHeadersMessage, self).__init__(message_type) + self.headers = headers diff --git a/libs/signalrcore/messages/cancel_invocation_message.py b/libs/signalrcore/messages/cancel_invocation_message.py new file mode 100644 index 000000000..4f5f88ff3 --- /dev/null +++ b/libs/signalrcore/messages/cancel_invocation_message.py @@ -0,0 +1,24 @@ +from .base_message import BaseHeadersMessage +""" +A `CancelInvocation` message is a JSON object with the following properties + +* `type` - A `Number` with the literal value `5`, + indicating that this message is a `CancelInvocation`. +* `invocationId` - A `String` encoding the `Invocation ID` for a message. + +Example +```json +{ + "type": 5, + "invocationId": "123" +} +""" + + +class CancelInvocationMessage(BaseHeadersMessage): + def __init__( + self, + invocation_id, + **kwargs): + super(CancelInvocationMessage, self).__init__(5, **kwargs) + self.invocation_id = invocation_id diff --git a/libs/signalrcore/messages/close_message.py b/libs/signalrcore/messages/close_message.py new file mode 100644 index 000000000..7a462e993 --- /dev/null +++ b/libs/signalrcore/messages/close_message.py @@ -0,0 +1,32 @@ +from .base_message import BaseHeadersMessage +""" +A `Close` message is a JSON object with the following properties + +* `type` - A `Number` with the literal value `7`, + indicating that this message is a `Close`. +* `error` - An optional `String` encoding the error message. + +Example - A `Close` message without an error +```json +{ + "type": 7 +} +``` + +Example - A `Close` message with an error +```json +{ + "type": 7, + "error": "Connection closed because of an error!" +} +``` +""" + + +class CloseMessage(BaseHeadersMessage): + def __init__( + self, + error, + **kwargs): + super(CloseMessage, self).__init__(7, **kwargs) + self.error = error diff --git a/libs/signalrcore/messages/completion_message.py b/libs/signalrcore/messages/completion_message.py new file mode 100644 index 000000000..7e8d9141f --- /dev/null +++ b/libs/signalrcore/messages/completion_message.py @@ -0,0 +1,77 @@ +from .base_message import BaseHeadersMessage +""" +A `Completion` message is a JSON object with the following properties + +* `type` - A `Number` with the literal value `3`, + indicating that this message is a `Completion`. +* `invocationId` - A `String` encoding the `Invocation ID` for a message. +* `result` - A `Token` encoding the result value + (see "JSON Payload Encoding" for details). + This field is **ignored** if `error` is present. +* `error` - A `String` encoding the error message. + +It is a protocol error to include both a `result` and an `error` property + in the `Completion` message. A conforming endpoint may immediately + terminate the connection upon receiving such a message. + +Example - A `Completion` message with no result or error + +```json +{ + "type": 3, + "invocationId": "123" +} +``` + +Example - A `Completion` message with a result + +```json +{ + "type": 3, + "invocationId": "123", + "result": 42 +} +``` + +Example - A `Completion` message with an error + +```json +{ + "type": 3, + "invocationId": "123", + "error": "It didn't work!" +} +``` + +Example - The following `Completion` message is a protocol error + because it has both of `result` and `error` + +```json +{ + "type": 3, + "invocationId": "123", + "result": 42, + "error": "It didn't work!" +} +``` +""" + + +class CompletionClientStreamMessage(BaseHeadersMessage): + def __init__( + self, invocation_id, **kwargs): + super(CompletionClientStreamMessage, self).__init__(3, **kwargs) + self.invocation_id = invocation_id + + +class CompletionMessage(BaseHeadersMessage): + def __init__( + self, + invocation_id, + result, + error, + **kwargs): + super(CompletionMessage, self).__init__(3, **kwargs) + self.invocation_id = invocation_id + self.result = result + self.error = error diff --git a/libs/signalrcore/messages/handshake/__init__.py b/libs/signalrcore/messages/handshake/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/libs/signalrcore/messages/handshake/__init__.py diff --git a/libs/signalrcore/messages/handshake/request.py b/libs/signalrcore/messages/handshake/request.py new file mode 100644 index 000000000..1d55110c8 --- /dev/null +++ b/libs/signalrcore/messages/handshake/request.py @@ -0,0 +1,5 @@ +class HandshakeRequestMessage(object): + + def __init__(self, protocol, version): + self.protocol = protocol + self.version = version diff --git a/libs/signalrcore/messages/handshake/response.py b/libs/signalrcore/messages/handshake/response.py new file mode 100644 index 000000000..0dc1460b0 --- /dev/null +++ b/libs/signalrcore/messages/handshake/response.py @@ -0,0 +1,4 @@ +class HandshakeResponseMessage(object): + + def __init__(self, error): + self.error = error diff --git a/libs/signalrcore/messages/invocation_message.py b/libs/signalrcore/messages/invocation_message.py new file mode 100644 index 000000000..6de5364d5 --- /dev/null +++ b/libs/signalrcore/messages/invocation_message.py @@ -0,0 +1,78 @@ +from .base_message import BaseHeadersMessage +""" + +An `Invocation` message is a JSON object with the following properties: + +* `type` - A `Number` with the literal value 1, indicating that this message + is an Invocation. +* `invocationId` - An optional `String` encoding the `Invocation ID` + for a message. +* `target` - A `String` encoding the `Target` name, as expected by the Callee's + Binder +* `arguments` - An `Array` containing arguments to apply to the method + referred to in Target. This is a sequence of JSON `Token`s, + encoded as indicated below in the "JSON Payload Encoding" section + +Example: + +```json +{ + "type": 1, + "invocationId": "123", + "target": "Send", + "arguments": [ + 42, + "Test Message" + ] +} +``` +Example (Non-Blocking): + +```json +{ + "type": 1, + "target": "Send", + "arguments": [ + 42, + "Test Message" + ] +} +``` + +""" + + +class InvocationMessage(BaseHeadersMessage): + def __init__( + self, + invocation_id, + target, + arguments, **kwargs): + super(InvocationMessage, self).__init__(1, **kwargs) + self.invocation_id = invocation_id + self.target = target + self.arguments = arguments + + def __repr__(self): + repr_str =\ + "InvocationMessage: invocation_id {0}, target {1}, arguments {2}" + return repr_str.format(self.invocation_id, self.target, self.arguments) + + +class InvocationClientStreamMessage(BaseHeadersMessage): + def __init__( + self, + stream_ids, + target, + arguments, + **kwargs): + super(InvocationClientStreamMessage, self).__init__(1, **kwargs) + self.target = target + self.arguments = arguments + self.stream_ids = stream_ids + + def __repr__(self): + repr_str =\ + "InvocationMessage: stream_ids {0}, target {1}, arguments {2}" + return repr_str.format( + self.stream_ids, self.target, self.arguments) diff --git a/libs/signalrcore/messages/message_type.py b/libs/signalrcore/messages/message_type.py new file mode 100644 index 000000000..155315365 --- /dev/null +++ b/libs/signalrcore/messages/message_type.py @@ -0,0 +1,12 @@ +from enum import Enum + + +class MessageType(Enum): + invocation = 1 + stream_item = 2 + completion = 3 + stream_invocation = 4 + cancel_invocation = 5 + ping = 6 + close = 7 + invocation_binding_failure = -1 diff --git a/libs/signalrcore/messages/ping_message.py b/libs/signalrcore/messages/ping_message.py new file mode 100644 index 000000000..c184c9d7e --- /dev/null +++ b/libs/signalrcore/messages/ping_message.py @@ -0,0 +1,20 @@ +from .base_message import BaseMessage +""" +A `Ping` message is a JSON object with the following properties: + +* `type` - A `Number` with the literal value `6`, + indicating that this message is a `Ping`. + +Example +```json +{ + "type": 6 +} +``` +""" + + +class PingMessage(BaseMessage): + def __init__( + self, **kwargs): + super(PingMessage, self).__init__(6, **kwargs) diff --git a/libs/signalrcore/messages/stream_invocation_message.py b/libs/signalrcore/messages/stream_invocation_message.py new file mode 100644 index 000000000..e78759495 --- /dev/null +++ b/libs/signalrcore/messages/stream_invocation_message.py @@ -0,0 +1,42 @@ +from .base_message import BaseHeadersMessage +""" +A `StreamInvocation` message is a JSON object with the following properties: + +* `type` - A `Number` with the literal value 4, indicating that + this message is a StreamInvocation. +* `invocationId` - A `String` encoding the `Invocation ID` for a message. +* `target` - A `String` encoding the `Target` name, as expected + by the Callee's Binder. +* `arguments` - An `Array` containing arguments to apply to + the method referred to in Target. This is a sequence of JSON + `Token`s, encoded as indicated below in the + "JSON Payload Encoding" section. + +Example: + +```json +{ + "type": 4, + "invocationId": "123", + "target": "Send", + "arguments": [ + 42, + "Test Message" + ] +} +``` +""" + + +class StreamInvocationMessage(BaseHeadersMessage): + def __init__( + self, + invocation_id, + target, + arguments, + **kwargs): + super(StreamInvocationMessage, self).__init__(4, **kwargs) + self.invocation_id = invocation_id + self.target = target + self.arguments = arguments + self.stream_ids = [] diff --git a/libs/signalrcore/messages/stream_item_message.py b/libs/signalrcore/messages/stream_item_message.py new file mode 100644 index 000000000..415090f4c --- /dev/null +++ b/libs/signalrcore/messages/stream_item_message.py @@ -0,0 +1,31 @@ +from .base_message import BaseHeadersMessage +""" +A `StreamItem` message is a JSON object with the following properties: + +* `type` - A `Number` with the literal value 2, indicating + that this message is a `StreamItem`. +* `invocationId` - A `String` encoding the `Invocation ID` for a message. +* `item` - A `Token` encoding the stream item + (see "JSON Payload Encoding" for details). + +Example + +```json +{ + "type": 2, + "invocationId": "123", + "item": 42 +} +``` +""" + + +class StreamItemMessage(BaseHeadersMessage): + def __init__( + self, + invocation_id, + item, + **kwargs): + super(StreamItemMessage, self).__init__(2, **kwargs) + self.invocation_id = invocation_id + self.item = item diff --git a/libs/signalrcore/protocol/__init__.py b/libs/signalrcore/protocol/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/libs/signalrcore/protocol/__init__.py diff --git a/libs/signalrcore/protocol/base_hub_protocol.py b/libs/signalrcore/protocol/base_hub_protocol.py new file mode 100644 index 000000000..5e2b91877 --- /dev/null +++ b/libs/signalrcore/protocol/base_hub_protocol.py @@ -0,0 +1,60 @@ +import json + +from ..messages.handshake.request import HandshakeRequestMessage +from ..messages.handshake.response import HandshakeResponseMessage +from ..messages.invocation_message import InvocationMessage # 1 +from ..messages.stream_item_message import StreamItemMessage # 2 +from ..messages.completion_message import CompletionMessage # 3 +from ..messages.stream_invocation_message import StreamInvocationMessage # 4 +from ..messages.cancel_invocation_message import CancelInvocationMessage # 5 +from ..messages.ping_message import PingMessage # 6 +from ..messages.close_message import CloseMessage # 7 +from ..messages.message_type import MessageType +from ..helpers import Helpers + +class BaseHubProtocol(object): + def __init__(self, protocol, version, transfer_format, record_separator): + self.protocol = protocol + self.version = version + self.transfer_format = transfer_format + self.record_separator = record_separator + + @staticmethod + def get_message(dict_message): + message_type = MessageType.close\ + if not "type" in dict_message.keys() else MessageType(dict_message["type"]) + + dict_message["invocation_id"] = dict_message.get("invocationId", None) + dict_message["headers"] = dict_message.get("headers", {}) + dict_message["error"] = dict_message.get("error", None) + dict_message["result"] = dict_message.get("result", None) + if message_type is MessageType.invocation: + return InvocationMessage(**dict_message) + if message_type is MessageType.stream_item: + return StreamItemMessage(**dict_message) + if message_type is MessageType.completion: + return CompletionMessage(**dict_message) + if message_type is MessageType.stream_invocation: + return StreamInvocationMessage(**dict_message) + if message_type is MessageType.cancel_invocation: + return CancelInvocationMessage(**dict_message) + if message_type is MessageType.ping: + return PingMessage() + if message_type is MessageType.close: + return CloseMessage(**dict_message) + + def decode_handshake(self, raw_message: str) -> HandshakeResponseMessage: + messages = raw_message.split(self.record_separator) + messages = list(filter(lambda x: x != "", messages)) + data = json.loads(messages[0]) + idx = raw_message.index(self.record_separator) + return HandshakeResponseMessage(data.get("error", None)), self.parse_messages(raw_message[idx + 1 :]) if len(messages) > 1 else [] + + def handshake_message(self) -> HandshakeRequestMessage: + return HandshakeRequestMessage(self.protocol, self.version) + + def parse_messages(self, raw_message: str): + raise ValueError("Protocol must implement this method") + + def write_message(self, hub_message): + raise ValueError("Protocol must implement this method") diff --git a/libs/signalrcore/protocol/handshake/__init__.py b/libs/signalrcore/protocol/handshake/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/libs/signalrcore/protocol/handshake/__init__.py diff --git a/libs/signalrcore/protocol/json_hub_protocol.py b/libs/signalrcore/protocol/json_hub_protocol.py new file mode 100644 index 000000000..bcd6f9ae4 --- /dev/null +++ b/libs/signalrcore/protocol/json_hub_protocol.py @@ -0,0 +1,50 @@ +import json + +from .base_hub_protocol import BaseHubProtocol + +from ..messages.message_type import MessageType +from json import JSONEncoder + +from signalrcore.helpers import Helpers + + +class MyEncoder(JSONEncoder): + # https://github.com/PyCQA/pylint/issues/414 + def default(self, o): + if type(o) is MessageType: + return o.value + data = o.__dict__ + if "invocation_id" in data: + data["invocationId"] = data["invocation_id"] + del data["invocation_id"] + if "stream_ids" in data: + data["streamIds"] = data["stream_ids"] + del data["stream_ids"] + return data + + +class JsonHubProtocol(BaseHubProtocol): + def __init__(self): + super(JsonHubProtocol, self).__init__("json", 1, "Text", chr(0x1E)) + self.encoder = MyEncoder() + + def parse_messages(self, raw): + Helpers.get_logger().debug("Raw message incomming: ") + Helpers.get_logger().debug(raw) + raw_messages = [ + record.replace(self.record_separator, "") + for record in raw.split(self.record_separator) + if record is not None and record != "" + and record != self.record_separator + ] + result = [] + for raw_message in raw_messages: + dict_message = json.loads(raw_message) + if len(dict_message.keys()) > 0: + result.append(self.get_message(dict_message)) + return result + + def encode(self, message): + Helpers.get_logger()\ + .debug(self.encoder.encode(message) + self.record_separator) + return self.encoder.encode(message) + self.record_separator diff --git a/libs/signalrcore/protocol/messagepack_protocol.py b/libs/signalrcore/protocol/messagepack_protocol.py new file mode 100644 index 000000000..45651e4b7 --- /dev/null +++ b/libs/signalrcore/protocol/messagepack_protocol.py @@ -0,0 +1,169 @@ +import json +import msgpack +from .base_hub_protocol import BaseHubProtocol +from ..messages.handshake.request import HandshakeRequestMessage +from ..messages.handshake.response import HandshakeResponseMessage +from ..messages.invocation_message\ + import InvocationMessage, InvocationClientStreamMessage # 1 +from ..messages.stream_item_message import StreamItemMessage # 2 +from ..messages.completion_message import CompletionMessage # 3 +from ..messages.stream_invocation_message import StreamInvocationMessage # 4 +from ..messages.cancel_invocation_message import CancelInvocationMessage # 5 +from ..messages.ping_message import PingMessage # 6 +from ..messages.close_message import CloseMessage # 7 +from ..helpers import Helpers + + +class MessagePackHubProtocol(BaseHubProtocol): + + _priority = [ + "type", + "headers", + "invocation_id", + "target", + "arguments", + "item", + "result_kind", + "result", + "stream_ids" + ] + + def __init__(self): + super(MessagePackHubProtocol, self).__init__( + "messagepack", 1, "Text", chr(0x1E)) + self.logger = Helpers.get_logger() + + def parse_messages(self, raw): + try: + messages = [] + offset = 0 + while offset < len(raw): + length = msgpack.unpackb(raw[offset: offset + 1]) + values = msgpack.unpackb(raw[offset + 1: offset + length + 1]) + offset = offset + length + 1 + message = self._decode_message(values) + messages.append(message) + except Exception as ex: + Helpers.get_logger().error("Parse messages Error {0}".format(ex)) + Helpers.get_logger().error("raw msg '{0}'".format(raw)) + return messages + + def decode_handshake(self, raw_message): + try: + has_various_messages = 0x1E in raw_message + handshake_data = raw_message[0: raw_message.index(0x1E)] if has_various_messages else raw_message + messages = self.parse_messages(raw_message[raw_message.index(0x1E) + 1:]) if has_various_messages else [] + data = json.loads(handshake_data) + return HandshakeResponseMessage(data.get("error", None)), messages + except Exception as ex: + Helpers.get_logger().error(raw_message) + Helpers.get_logger().error(ex) + raise ex + + def encode(self, message): + if type(message) is HandshakeRequestMessage: + content = json.dumps(message.__dict__) + return content + self.record_separator + + msg = self._encode_message(message) + encoded_message = msgpack.packb(msg) + varint_length = self._to_varint(len(encoded_message)) + return varint_length + encoded_message + + def _encode_message(self, message): + result = [] + + # sort attributes + for attribute in self._priority: + if hasattr(message, attribute): + if (attribute == "type"): + result.append(getattr(message, attribute).value) + else: + result.append(getattr(message, attribute)) + return result + + def _decode_message(self, raw): + # {} {"error"} + # [1, Headers, InvocationId, Target, [Arguments], [StreamIds]] + # [2, Headers, InvocationId, Item] + # [3, Headers, InvocationId, ResultKind, Result] + # [4, Headers, InvocationId, Target, [Arguments], [StreamIds]] + # [5, Headers, InvocationId] + # [6] + # [7, Error, AllowReconnect?] + + if raw[0] == 1: # InvocationMessage + if len(raw[5]) > 0: + return InvocationClientStreamMessage( + headers=raw[1], + stream_ids=raw[5], + target=raw[3], + arguments=raw[4]) + else: + return InvocationMessage( + headers=raw[1], + invocation_id=raw[2], + target=raw[3], + arguments=raw[4]) + + elif raw[0] == 2: # StreamItemMessage + return StreamItemMessage( + headers=raw[1], + invocation_id=raw[2], + item=raw[3]) + + elif raw[0] == 3: # CompletionMessage + result_kind = raw[3] + if result_kind == 1: + return CompletionMessage( + headers=raw[1], + invocation_id=raw[2], + result=None, + error=raw[4]) + + elif result_kind == 2: + return CompletionMessage( + headers=raw[1], invocation_id=raw[2], + result=None, error=None) + + elif result_kind == 3: + return CompletionMessage( + headers=raw[1], invocation_id=raw[2], + result=raw[4], error=None) + else: + raise Exception("Unknown result kind.") + + elif raw[0] == 4: # StreamInvocationMessage + return StreamInvocationMessage( + headers=raw[1], invocation_id=raw[2], + target=raw[3], arguments=raw[4]) # stream_id missing? + + elif raw[0] == 5: # CancelInvocationMessage + return CancelInvocationMessage( + headers=raw[1], invocation_id=raw[2]) + + elif raw[0] == 6: # PingMessageEncoding + return PingMessage() + + elif raw[0] == 7: # CloseMessageEncoding + return CloseMessage(error=raw[1]) # AllowReconnect is missing + print(".......................................") + print(raw) + print("---------------------------------------") + raise Exception("Unknown message type.") + + def _to_varint(self, value): + buffer = b'' + + while True: + + byte = value & 0x7f + value >>= 7 + + if value: + buffer += bytes((byte | 0x80, )) + else: + buffer += bytes((byte, )) + break + + return buffer diff --git a/libs/signalrcore/subject.py b/libs/signalrcore/subject.py new file mode 100644 index 000000000..6561cc4de --- /dev/null +++ b/libs/signalrcore/subject.py @@ -0,0 +1,68 @@ +import uuid +import threading +from typing import Any +from .messages.invocation_message import InvocationClientStreamMessage +from .messages.stream_item_message import StreamItemMessage +from .messages.completion_message import CompletionClientStreamMessage + + +class Subject(object): + """Client to server streaming + https://docs.microsoft.com/en-gb/aspnet/core/signalr/streaming?view=aspnetcore-5.0#client-to-server-streaming + items = list(range(0,10)) + subject = Subject() + connection.send("UploadStream", subject) + while(len(self.items) > 0): + subject.next(str(self.items.pop())) + subject.complete() + """ + + def __init__(self): + self.connection = None + self.target = None + self.invocation_id = str(uuid.uuid4()) + self.lock = threading.RLock() + + def check(self): + """Ensures that invocation streaming object is correct + + Raises: + ValueError: if object is not valid, exception will be raised + """ + if self.connection is None\ + or self.target is None\ + or self.invocation_id is None: + raise ValueError( + "subject must be passed as an agument to a send function. " + + "hub_connection.send([method],[subject]") + + def next(self, item: Any): + """Send next item to the server + + Args: + item (any): Item that will be streamed + """ + self.check() + with self.lock: + self.connection.transport.send(StreamItemMessage( + self.invocation_id, + item)) + + def start(self): + """Starts streaming + """ + self.check() + with self.lock: + self.connection.transport.send( + InvocationClientStreamMessage( + [self.invocation_id], + self.target, + [])) + + def complete(self): + """Finish streaming + """ + self.check() + with self.lock: + self.connection.transport.send(CompletionClientStreamMessage( + self.invocation_id)) diff --git a/libs/signalrcore/transport/__init__.py b/libs/signalrcore/transport/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/libs/signalrcore/transport/__init__.py diff --git a/libs/signalrcore/transport/base_transport.py b/libs/signalrcore/transport/base_transport.py new file mode 100644 index 000000000..c73a2c028 --- /dev/null +++ b/libs/signalrcore/transport/base_transport.py @@ -0,0 +1,29 @@ +from ..protocol.json_hub_protocol import JsonHubProtocol +from ..helpers import Helpers + +class BaseTransport(object): + def __init__(self, protocol=JsonHubProtocol(), on_message=None): + self.protocol = protocol + self._on_message= on_message + self.logger = Helpers.get_logger() + self._on_open = lambda: self.logger.info("on_connect not defined") + self._on_close = lambda: self.logger.info( + "on_disconnect not defined") + + def on_open_callback(self, callback): + self._on_open = callback + + def on_close_callback(self, callback): + self._on_close = callback + + def start(self): # pragma: no cover + raise NotImplementedError() + + def stop(self): # pragma: no cover + raise NotImplementedError() + + def is_running(self): # pragma: no cover + raise NotImplementedError() + + def send(self, message, on_invocation = None): # pragma: no cover + raise NotImplementedError()
\ No newline at end of file diff --git a/libs/signalrcore/transport/websockets/__init__.py b/libs/signalrcore/transport/websockets/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/libs/signalrcore/transport/websockets/__init__.py diff --git a/libs/signalrcore/transport/websockets/connection.py b/libs/signalrcore/transport/websockets/connection.py new file mode 100644 index 000000000..2253f0286 --- /dev/null +++ b/libs/signalrcore/transport/websockets/connection.py @@ -0,0 +1,8 @@ +from enum import Enum + + +class ConnectionState(Enum): + connecting = 0 + connected = 1 + reconnecting = 2 + disconnected = 4 diff --git a/libs/signalrcore/transport/websockets/reconnection.py b/libs/signalrcore/transport/websockets/reconnection.py new file mode 100644 index 000000000..b14b41c5b --- /dev/null +++ b/libs/signalrcore/transport/websockets/reconnection.py @@ -0,0 +1,87 @@ +import threading +import time +from enum import Enum + + +class ConnectionStateChecker(object): + def __init__( + self, + ping_function, + keep_alive_interval, + sleep=1): + self.sleep = sleep + self.keep_alive_interval = keep_alive_interval + self.last_message = time.time() + self.ping_function = ping_function + self.running = False + self._thread = None + + def start(self): + self.running = True + self._thread = threading.Thread(target=self.run) + self._thread.daemon = True + self._thread.start() + + def run(self): + while self.running: + time.sleep(self.sleep) + time_without_messages = time.time() - self.last_message + if self.keep_alive_interval < time_without_messages: + self.ping_function() + + def stop(self): + self.running = False + + +class ReconnectionType(Enum): + raw = 0 # Reconnection with max reconnections and constant sleep time + interval = 1 # variable sleep time + + +class ReconnectionHandler(object): + def __init__(self): + self.reconnecting = False + self.attempt_number = 0 + self.last_attempt = time.time() + + def next(self): + raise NotImplementedError() + + def reset(self): + self.attempt_number = 0 + self.reconnecting = False + + +class RawReconnectionHandler(ReconnectionHandler): + def __init__(self, sleep_time, max_attempts): + super(RawReconnectionHandler, self).__init__() + self.sleep_time = sleep_time + self.max_reconnection_attempts = max_attempts + + def next(self): + self.reconnecting = True + if self.max_reconnection_attempts is not None: + if self.attempt_number <= self.max_reconnection_attempts: + self.attempt_number += 1 + return self.sleep_time + else: + raise ValueError( + "Max attemps reached {0}" + .format(self.max_reconnection_attempts)) + else: # Infinite reconnect + return self.sleep_time + + +class IntervalReconnectionHandler(ReconnectionHandler): + def __init__(self, intervals): + super(IntervalReconnectionHandler, self).__init__() + self._intervals = intervals + + def next(self): + self.reconnecting = True + index = self.attempt_number + self.attempt_number += 1 + if index >= len(self._intervals): + raise ValueError( + "Max intervals reached {0}".format(self._intervals)) + return self._intervals[index] diff --git a/libs/signalrcore/transport/websockets/websocket_transport.py b/libs/signalrcore/transport/websockets/websocket_transport.py new file mode 100644 index 000000000..4c05add53 --- /dev/null +++ b/libs/signalrcore/transport/websockets/websocket_transport.py @@ -0,0 +1,240 @@ +import websocket +import threading +import requests +import traceback +import uuid +import time +import ssl +from .reconnection import ConnectionStateChecker +from .connection import ConnectionState +from ...messages.ping_message import PingMessage +from ...hub.errors import HubError, HubConnectionError, UnAuthorizedHubError +from ...protocol.messagepack_protocol import MessagePackHubProtocol +from ...protocol.json_hub_protocol import JsonHubProtocol +from ..base_transport import BaseTransport +from ...helpers import Helpers + +class WebsocketTransport(BaseTransport): + def __init__(self, + url="", + headers={}, + keep_alive_interval=15, + reconnection_handler=None, + verify_ssl=False, + skip_negotiation=False, + enable_trace=False, + **kwargs): + super(WebsocketTransport, self).__init__(**kwargs) + self._ws = None + self.enable_trace = enable_trace + self._thread = None + self.skip_negotiation = skip_negotiation + self.url = url + self.headers = headers + self.handshake_received = False + self.token = None # auth + self.state = ConnectionState.disconnected + self.connection_alive = False + self._thread = None + self._ws = None + self.verify_ssl = verify_ssl + self.connection_checker = ConnectionStateChecker( + lambda: self.send(PingMessage()), + keep_alive_interval + ) + self.reconnection_handler = reconnection_handler + + if len(self.logger.handlers) > 0: + websocket.enableTrace(self.enable_trace, self.logger.handlers[0]) + + def is_running(self): + return self.state != ConnectionState.disconnected + + def stop(self): + if self.state == ConnectionState.connected: + self.connection_checker.stop() + self._ws.close() + self.state = ConnectionState.disconnected + self.handshake_received = False + + def start(self): + if not self.skip_negotiation: + self.negotiate() + + if self.state == ConnectionState.connected: + self.logger.warning("Already connected unable to start") + return False + + self.state = ConnectionState.connecting + self.logger.debug("start url:" + self.url) + + self._ws = websocket.WebSocketApp( + self.url, + header=self.headers, + on_message=self.on_message, + on_error=self.on_socket_error, + on_close=self.on_close, + on_open=self.on_open, + ) + + self._thread = threading.Thread( + target=lambda: self._ws.run_forever( + sslopt={"cert_reqs": ssl.CERT_NONE} + if not self.verify_ssl else {} + )) + self._thread.daemon = True + self._thread.start() + return True + + def negotiate(self): + negotiate_url = Helpers.get_negotiate_url(self.url) + self.logger.debug("Negotiate url:{0}".format(negotiate_url)) + + response = requests.post( + negotiate_url, headers=self.headers, verify=self.verify_ssl) + self.logger.debug( + "Response status code{0}".format(response.status_code)) + + if response.status_code != 200: + raise HubError(response.status_code)\ + if response.status_code != 401 else UnAuthorizedHubError() + + data = response.json() + + if "connectionId" in data.keys(): + self.url = Helpers.encode_connection_id( + self.url, data["connectionId"]) + + # Azure + if 'url' in data.keys() and 'accessToken' in data.keys(): + Helpers.get_logger().debug( + "Azure url, reformat headers, token and url {0}".format(data)) + self.url = data["url"]\ + if data["url"].startswith("ws") else\ + Helpers.http_to_websocket(data["url"]) + self.token = data["accessToken"] + self.headers = {"Authorization": "Bearer " + self.token} + + + def evaluate_handshake(self, message): + self.logger.debug("Evaluating handshake {0}".format(message)) + msg, messages = self.protocol.decode_handshake(message) + if msg.error is None or msg.error == "": + self.handshake_received = True + self.state = ConnectionState.connected + if self.reconnection_handler is not None: + self.reconnection_handler.reconnecting = False + if not self.connection_checker.running: + self.connection_checker.start() + else: + self.logger.error(msg.error) + self.on_socket_error(msg.error) + self.stop() + raise ValueError("Handshake error {0}".format(msg.error)) + return messages + + def on_open(self): + self.logger.debug("-- web socket open --") + msg = self.protocol.handshake_message() + self.send(msg) + + def on_close(self): + self.logger.debug("-- web socket close --") + self.state = ConnectionState.disconnected + if self._on_close is not None and callable(self._on_close): + self._on_close() + + def on_socket_error(self, error): + """ + Throws error related on + https://github.com/websocket-client/websocket-client/issues/449 + + Args: + error ([type]): [description] + + Raises: + HubError: [description] + """ + self.logger.debug("-- web socket error --") + if (type(error) is AttributeError and + "'NoneType' object has no attribute 'connected'" + in str(error)): + url = "https://github.com/websocket-client" +\ + "/websocket-client/issues/449" + self.logger.warning( + "Websocket closing error: issue" + + url) + self._on_close() + else: + self.logger.error(traceback.format_exc(5, True)) + self.logger.error("{0} {1}".format(self, error)) + self.logger.error("{0} {1}".format(error, type(error))) + self._on_close() + raise HubError(error) + + def on_message(self, raw_message): + self.logger.debug("Message received{0}".format(raw_message)) + self.connection_checker.last_message = time.time() + if not self.handshake_received: + messages = self.evaluate_handshake(raw_message) + if self._on_open is not None and callable(self._on_open): + self.state = ConnectionState.connected + self._on_open() + + if len(messages) > 0: + return self._on_message(messages) + + return [] + + return self._on_message( + self.protocol.parse_messages(raw_message)) + + def send(self, message): + self.logger.debug("Sending message {0}".format(message)) + try: + self._ws.send( + self.protocol.encode(message), + opcode=0x2 + if type(self.protocol) == MessagePackHubProtocol else + 0x1) + self.connection_checker.last_message = time.time() + if self.reconnection_handler is not None: + self.reconnection_handler.reset() + except ( + websocket._exceptions.WebSocketConnectionClosedException, + OSError) as ex: + self.handshake_received = False + self.logger.warning("Connection closed {0}".format(ex)) + self.state = ConnectionState.disconnected + if self.reconnection_handler is None: + if self._on_close is not None and\ + callable(self._on_close): + self._on_close() + raise ValueError(str(ex)) + # Connection closed + self.handle_reconnect() + except Exception as ex: + raise ex + + def handle_reconnect(self): + self.reconnection_handler.reconnecting = True + try: + self.stop() + self.start() + except Exception as ex: + self.logger.error(ex) + sleep_time = self.reconnection_handler.next() + threading.Thread( + target=self.deferred_reconnect, + args=(sleep_time,) + ).start() + + def deferred_reconnect(self, sleep_time): + time.sleep(sleep_time) + try: + if not self.connection_alive: + self.send(PingMessage()) + except Exception as ex: + self.logger.error(ex) + self.reconnection_handler.reconnecting = False + self.connection_alive = False
\ No newline at end of file |