summaryrefslogtreecommitdiffhomepage
path: root/libs/signalrcore
diff options
context:
space:
mode:
authormorpheus65535 <[email protected]>2021-05-08 10:39:00 -0400
committerGitHub <[email protected]>2021-05-08 10:39:00 -0400
commit44c51b2e2c3bffdfc0e0c447c038f6cd0bfd2cbe (patch)
treef4c16f35853afd27abb4e48fb2a72a11145360a7 /libs/signalrcore
parent72b6ab3c6a11e1c12d86563989d88d73e4e64377 (diff)
downloadbazarr-44c51b2e2c3bffdfc0e0c447c038f6cd0bfd2cbe.tar.gz
bazarr-44c51b2e2c3bffdfc0e0c447c038f6cd0bfd2cbe.zip
Added real-time sync with Sonarr v3 and Radarr v3 by feeding from SignalR feeds. You can now reduce frequency of sync tasks to something like once a day.
Diffstat (limited to 'libs/signalrcore')
-rw-r--r--libs/signalrcore/__init__.py0
-rw-r--r--libs/signalrcore/helpers.py99
-rw-r--r--libs/signalrcore/hub/__init__.py0
-rw-r--r--libs/signalrcore/hub/auth_hub_connection.py22
-rw-r--r--libs/signalrcore/hub/base_hub_connection.py238
-rw-r--r--libs/signalrcore/hub/errors.py10
-rw-r--r--libs/signalrcore/hub/handlers.py51
-rw-r--r--libs/signalrcore/hub_connection_builder.py245
-rw-r--r--libs/signalrcore/messages/__init__.py0
-rw-r--r--libs/signalrcore/messages/base_message.py15
-rw-r--r--libs/signalrcore/messages/cancel_invocation_message.py24
-rw-r--r--libs/signalrcore/messages/close_message.py32
-rw-r--r--libs/signalrcore/messages/completion_message.py77
-rw-r--r--libs/signalrcore/messages/handshake/__init__.py0
-rw-r--r--libs/signalrcore/messages/handshake/request.py5
-rw-r--r--libs/signalrcore/messages/handshake/response.py4
-rw-r--r--libs/signalrcore/messages/invocation_message.py78
-rw-r--r--libs/signalrcore/messages/message_type.py12
-rw-r--r--libs/signalrcore/messages/ping_message.py20
-rw-r--r--libs/signalrcore/messages/stream_invocation_message.py42
-rw-r--r--libs/signalrcore/messages/stream_item_message.py31
-rw-r--r--libs/signalrcore/protocol/__init__.py0
-rw-r--r--libs/signalrcore/protocol/base_hub_protocol.py60
-rw-r--r--libs/signalrcore/protocol/handshake/__init__.py0
-rw-r--r--libs/signalrcore/protocol/json_hub_protocol.py50
-rw-r--r--libs/signalrcore/protocol/messagepack_protocol.py169
-rw-r--r--libs/signalrcore/subject.py68
-rw-r--r--libs/signalrcore/transport/__init__.py0
-rw-r--r--libs/signalrcore/transport/base_transport.py29
-rw-r--r--libs/signalrcore/transport/websockets/__init__.py0
-rw-r--r--libs/signalrcore/transport/websockets/connection.py8
-rw-r--r--libs/signalrcore/transport/websockets/reconnection.py87
-rw-r--r--libs/signalrcore/transport/websockets/websocket_transport.py240
33 files changed, 1716 insertions, 0 deletions
diff --git a/libs/signalrcore/__init__.py b/libs/signalrcore/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/libs/signalrcore/__init__.py
diff --git a/libs/signalrcore/helpers.py b/libs/signalrcore/helpers.py
new file mode 100644
index 000000000..ab7bac4de
--- /dev/null
+++ b/libs/signalrcore/helpers.py
@@ -0,0 +1,99 @@
+import logging
+import urllib.parse as parse
+
+
+class Helpers:
+ @staticmethod
+ def configure_logger(level=logging.INFO, handler=None):
+ logger = Helpers.get_logger()
+ if handler is None:
+ handler = logging.StreamHandler()
+ handler.setFormatter(
+ logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+ handler.setLevel(level)
+ logger.addHandler(handler)
+ logger.setLevel(level)
+
+ @staticmethod
+ def get_logger():
+ return logging.getLogger("SignalRCoreClient")
+
+ @staticmethod
+ def has_querystring(url):
+ return "?" in url
+
+ @staticmethod
+ def split_querystring(url):
+ parts = url.split("?")
+ return parts[0], parts[1]
+
+ @staticmethod
+ def replace_scheme(
+ url,
+ root_scheme,
+ source,
+ secure_source,
+ destination,
+ secure_destination):
+ url_parts = parse.urlsplit(url)
+
+ if root_scheme not in url_parts.scheme:
+ if url_parts.scheme == secure_source:
+ url_parts = url_parts._replace(scheme=secure_destination)
+ if url_parts.scheme == source:
+ url_parts = url_parts._replace(scheme=destination)
+
+ return parse.urlunsplit(url_parts)
+
+ @staticmethod
+ def websocket_to_http(url):
+ return Helpers.replace_scheme(
+ url,
+ "http",
+ "ws",
+ "wss",
+ "http",
+ "https")
+
+ @staticmethod
+ def http_to_websocket(url):
+ return Helpers.replace_scheme(
+ url,
+ "ws",
+ "http",
+ "https",
+ "ws",
+ "wss"
+ )
+
+ @staticmethod
+ def get_negotiate_url(url):
+ querystring = ""
+ if Helpers.has_querystring(url):
+ url, querystring = Helpers.split_querystring(url)
+
+ url_parts = parse.urlsplit(Helpers.websocket_to_http(url))
+
+ negotiate_suffix = "negotiate"\
+ if url_parts.path.endswith('/')\
+ else "/negotiate"
+
+ url_parts = url_parts._replace(path=url_parts.path + negotiate_suffix)
+
+ return parse.urlunsplit(url_parts) \
+ if querystring == "" else\
+ parse.urlunsplit(url_parts) + "?" + querystring
+
+ @staticmethod
+ def encode_connection_id(url, id):
+ url_parts = parse.urlsplit(url)
+ query_string_parts = parse.parse_qs(url_parts.query)
+ query_string_parts["id"] = id
+
+ url_parts = url_parts._replace(
+ query=parse.urlencode(
+ query_string_parts,
+ doseq=True))
+
+ return Helpers.http_to_websocket(parse.urlunsplit(url_parts))
diff --git a/libs/signalrcore/hub/__init__.py b/libs/signalrcore/hub/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/libs/signalrcore/hub/__init__.py
diff --git a/libs/signalrcore/hub/auth_hub_connection.py b/libs/signalrcore/hub/auth_hub_connection.py
new file mode 100644
index 000000000..9be7d0182
--- /dev/null
+++ b/libs/signalrcore/hub/auth_hub_connection.py
@@ -0,0 +1,22 @@
+from .base_hub_connection import BaseHubConnection
+from ..helpers import Helpers
+
+
+class AuthHubConnection(BaseHubConnection):
+ def __init__(self, auth_function, headers={}, **kwargs):
+ self.headers = headers
+ self.auth_function = auth_function
+ super(AuthHubConnection, self).__init__(**kwargs)
+
+ def start(self):
+ try:
+ Helpers.get_logger().debug("Starting connection ...")
+ self.token = self.auth_function()
+ Helpers.get_logger()\
+ .debug("auth function result {0}".format(self.token))
+ self.headers["Authorization"] = "Bearer " + self.token
+ return super(AuthHubConnection, self).start()
+ except Exception as ex:
+ Helpers.get_logger().warning(self.__class__.__name__)
+ Helpers.get_logger().warning(str(ex))
+ raise ex
diff --git a/libs/signalrcore/hub/base_hub_connection.py b/libs/signalrcore/hub/base_hub_connection.py
new file mode 100644
index 000000000..c9325bf63
--- /dev/null
+++ b/libs/signalrcore/hub/base_hub_connection.py
@@ -0,0 +1,238 @@
+import websocket
+import threading
+import requests
+import traceback
+import uuid
+import time
+import ssl
+from typing import Callable
+from signalrcore.messages.message_type import MessageType
+from signalrcore.messages.stream_invocation_message\
+ import StreamInvocationMessage
+from signalrcore.messages.ping_message import PingMessage
+from .errors import UnAuthorizedHubError, HubError, HubConnectionError
+from signalrcore.helpers import Helpers
+from .handlers import StreamHandler, InvocationHandler
+from ..protocol.messagepack_protocol import MessagePackHubProtocol
+from ..transport.websockets.websocket_transport import WebsocketTransport
+from ..helpers import Helpers
+from ..subject import Subject
+from ..messages.invocation_message import InvocationMessage
+
+class BaseHubConnection(object):
+ def __init__(
+ self,
+ url,
+ protocol,
+ headers={},
+ **kwargs):
+ self.headers = headers
+ self.logger = Helpers.get_logger()
+ self.handlers = []
+ self.stream_handlers = []
+ self._on_error = lambda error: self.logger.info(
+ "on_error not defined {0}".format(error))
+ self.transport = WebsocketTransport(
+ url=url,
+ protocol=protocol,
+ headers=headers,
+ on_message=self.on_message,
+ **kwargs)
+
+ def start(self):
+ self.logger.debug("Connection started")
+ return self.transport.start()
+
+ def stop(self):
+ self.logger.debug("Connection stop")
+ return self.transport.stop()
+
+ def on_close(self, callback):
+ """Configures on_close connection callback.
+ It will be raised on connection closed event
+ connection.on_close(lambda: print("connection closed"))
+ Args:
+ callback (function): function without params
+ """
+ self.transport.on_close_callback(callback)
+
+ def on_open(self, callback):
+ """Configures on_open connection callback.
+ It will be raised on connection open event
+ connection.on_open(lambda: print(
+ "connection opened "))
+ Args:
+ callback (function): funciton without params
+ """
+ self.transport.on_open_callback(callback)
+
+ def on_error(self, callback):
+ """Configures on_error connection callback. It will be raised
+ if any hub method throws an exception.
+ connection.on_error(lambda data:
+ print(f"An exception was thrown closed{data.error}"))
+ Args:
+ callback (function): function with one parameter.
+ A CompletionMessage object.
+ """
+ self._on_error = callback
+
+ def on(self, event, callback_function: Callable):
+ """Register a callback on the specified event
+ Args:
+ event (string): Event name
+ callback_function (Function): callback function,
+ arguments will be binded
+ """
+ self.logger.debug("Handler registered started {0}".format(event))
+ self.handlers.append((event, callback_function))
+
+ def send(self, method, arguments, on_invocation=None):
+ """Sends a message
+
+ Args:
+ method (string): Method name
+ arguments (list|Subject): Method parameters
+ on_invocation (function, optional): On invocation send callback
+ will be raised on send server function ends. Defaults to None.
+
+ Raises:
+ HubConnectionError: If hub is not ready to send
+ TypeError: If arguments are invalid list or Subject
+ """
+ if not self.transport.is_running():
+ raise HubConnectionError(
+ "Hub is not running you cand send messages")
+
+ if type(arguments) is not list and type(arguments) is not Subject:
+ raise TypeError("Arguments of a message must be a list or subject")
+
+ if type(arguments) is list:
+ message = InvocationMessage(
+ str(uuid.uuid4()),
+ method,
+ arguments,
+ headers=self.headers)
+
+ if on_invocation:
+ self.stream_handlers.append(
+ InvocationHandler(
+ message.invocation_id,
+ on_invocation))
+
+ self.transport.send(message)
+
+ if type(arguments) is Subject:
+ arguments.connection = self
+ arguments.target = method
+ arguments.start()
+
+
+ def on_message(self, messages):
+ for message in messages:
+ if message.type == MessageType.invocation_binding_failure:
+ self.logger.error(message)
+ self._on_error(message)
+ continue
+
+ if message.type == MessageType.ping:
+ continue
+
+ if message.type == MessageType.invocation:
+ fired_handlers = list(
+ filter(
+ lambda h: h[0] == message.target,
+ self.handlers))
+ if len(fired_handlers) == 0:
+ self.logger.warning(
+ "event '{0}' hasn't fire any handler".format(
+ message.target))
+ for _, handler in fired_handlers:
+ handler(message.arguments)
+
+ if message.type == MessageType.close:
+ self.logger.info("Close message received from server")
+ self.stop()
+ return
+
+ if message.type == MessageType.completion:
+ if message.error is not None and len(message.error) > 0:
+ self._on_error(message)
+
+ # Send callbacks
+ fired_handlers = list(
+ filter(
+ lambda h: h.invocation_id == message.invocation_id,
+ self.stream_handlers))
+
+ # Stream callbacks
+ for handler in fired_handlers:
+ handler.complete_callback(message)
+
+ # unregister handler
+ self.stream_handlers = list(
+ filter(
+ lambda h: h.invocation_id != message.invocation_id,
+ self.stream_handlers))
+
+ if message.type == MessageType.stream_item:
+ fired_handlers = list(
+ filter(
+ lambda h: h.invocation_id == message.invocation_id,
+ self.stream_handlers))
+ if len(fired_handlers) == 0:
+ self.logger.warning(
+ "id '{0}' hasn't fire any stream handler".format(
+ message.invocation_id))
+ for handler in fired_handlers:
+ handler.next_callback(message.item)
+
+ if message.type == MessageType.stream_invocation:
+ pass
+
+ if message.type == MessageType.cancel_invocation:
+ fired_handlers = list(
+ filter(
+ lambda h: h.invocation_id == message.invocation_id,
+ self.stream_handlers))
+ if len(fired_handlers) == 0:
+ self.logger.warning(
+ "id '{0}' hasn't fire any stream handler".format(
+ message.invocation_id))
+
+ for handler in fired_handlers:
+ handler.error_callback(message)
+
+ # unregister handler
+ self.stream_handlers = list(
+ filter(
+ lambda h: h.invocation_id != message.invocation_id,
+ self.stream_handlers))
+
+ def stream(self, event, event_params):
+ """Starts server streaming
+ connection.stream(
+ "Counter",
+ [len(self.items), 500])\
+ .subscribe({
+ "next": self.on_next,
+ "complete": self.on_complete,
+ "error": self.on_error
+ })
+ Args:
+ event (string): Method Name
+ event_params (list): Method parameters
+
+ Returns:
+ [StreamHandler]: stream handler
+ """
+ invocation_id = str(uuid.uuid4())
+ stream_obj = StreamHandler(event, invocation_id)
+ self.stream_handlers.append(stream_obj)
+ self.transport.send(
+ StreamInvocationMessage(
+ invocation_id,
+ event,
+ event_params,
+ headers=self.headers))
+ return stream_obj \ No newline at end of file
diff --git a/libs/signalrcore/hub/errors.py b/libs/signalrcore/hub/errors.py
new file mode 100644
index 000000000..354c6b8ee
--- /dev/null
+++ b/libs/signalrcore/hub/errors.py
@@ -0,0 +1,10 @@
+class HubError(OSError):
+ pass
+
+class UnAuthorizedHubError(HubError):
+ pass
+
+class HubConnectionError(ValueError):
+ """Hub connection error
+ """
+ pass
diff --git a/libs/signalrcore/hub/handlers.py b/libs/signalrcore/hub/handlers.py
new file mode 100644
index 000000000..35e558cc7
--- /dev/null
+++ b/libs/signalrcore/hub/handlers.py
@@ -0,0 +1,51 @@
+import logging
+
+from typing import Callable
+from ..helpers import Helpers
+class StreamHandler(object):
+ def __init__(self, event: str, invocation_id: str):
+ self.event = event
+ self.invocation_id = invocation_id
+ self.logger = Helpers.get_logger()
+ self.next_callback =\
+ lambda _: self.logger.warning(
+ "next stream handler fired, no callback configured")
+ self.complete_callback =\
+ lambda _: self.logger.warning(
+ "next complete handler fired, no callback configured")
+ self.error_callback =\
+ lambda _: self.logger.warning(
+ "next error handler fired, no callback configured")
+
+ def subscribe(self, subscribe_callbacks: dict):
+ error =\
+ " subscribe object must be a dict like {0}"\
+ .format({
+ "next": None,
+ "complete": None,
+ "error": None
+ })
+
+ if subscribe_callbacks is None or\
+ type(subscribe_callbacks) is not dict:
+ raise TypeError(error)
+
+ if "next" not in subscribe_callbacks or\
+ "complete" not in subscribe_callbacks \
+ or "error" not in subscribe_callbacks:
+ raise KeyError(error)
+
+ if not callable(subscribe_callbacks["next"])\
+ or not callable(subscribe_callbacks["next"]) \
+ or not callable(subscribe_callbacks["next"]):
+ raise ValueError("Suscribe callbacks must be functions")
+
+ self.next_callback = subscribe_callbacks["next"]
+ self.complete_callback = subscribe_callbacks["complete"]
+ self.error_callback = subscribe_callbacks["error"]
+
+
+class InvocationHandler(object):
+ def __init__(self, invocation_id: str, complete_callback: Callable):
+ self.invocation_id = invocation_id
+ self.complete_callback = complete_callback
diff --git a/libs/signalrcore/hub_connection_builder.py b/libs/signalrcore/hub_connection_builder.py
new file mode 100644
index 000000000..1d38cfa8e
--- /dev/null
+++ b/libs/signalrcore/hub_connection_builder.py
@@ -0,0 +1,245 @@
+import uuid
+from .hub.base_hub_connection import BaseHubConnection
+from .hub.auth_hub_connection import AuthHubConnection
+from .transport.websockets.reconnection import \
+ IntervalReconnectionHandler, RawReconnectionHandler, ReconnectionType
+from .helpers import Helpers
+from .messages.invocation_message import InvocationMessage
+from .protocol.json_hub_protocol import JsonHubProtocol
+from .subject import Subject
+
+
+class HubConnectionBuilder(object):
+ """
+ Hub connection class, manages handshake and messaging
+
+ Args:
+ hub_url: SignalR core url
+
+ Raises:
+ HubConnectionError: Raises an Exception if url is empty or None
+ """
+
+ def __init__(self):
+ self.hub_url = None
+ self.hub = None
+ self.options = {
+ "access_token_factory": None
+ }
+ self.token = None
+ self.headers = None
+ self.negotiate_headers = None
+ self.has_auth_configured = None
+ self.protocol = None
+ self.reconnection_handler = None
+ self.keep_alive_interval = None
+ self.verify_ssl = True
+ self.enable_trace = False # socket trace
+ self.skip_negotiation = False # By default do not skip negotiation
+ self.running = False
+
+ def with_url(
+ self,
+ hub_url: str,
+ options: dict = None):
+ """Configure the hub url and options like negotiation and auth function
+
+ def login(self):
+ response = requests.post(
+ self.login_url,
+ json={
+ "username": self.email,
+ "password": self.password
+ },verify=False)
+ return response.json()["token"]
+
+ self.connection = HubConnectionBuilder()\
+ .with_url(self.server_url,
+ options={
+ "verify_ssl": False,
+ "access_token_factory": self.login,
+ "headers": {
+ "mycustomheader": "mycustomheadervalue"
+ }
+ })\
+ .configure_logging(logging.ERROR)\
+ .with_automatic_reconnect({
+ "type": "raw",
+ "keep_alive_interval": 10,
+ "reconnect_interval": 5,
+ "max_attempts": 5
+ }).build()
+
+ Args:
+ hub_url (string): Hub URL
+ options ([dict], optional): [description]. Defaults to None.
+
+ Raises:
+ ValueError: If url is invalid
+ TypeError: If options are not a dict or auth function
+ is not callable
+
+ Returns:
+ [HubConnectionBuilder]: configured connection
+ """
+ if hub_url is None or hub_url.strip() == "":
+ raise ValueError("hub_url must be a valid url.")
+
+ if options is not None and type(options) != dict:
+ raise TypeError(
+ "options must be a dict {0}.".format(self.options))
+
+ if options is not None \
+ and "access_token_factory" in options.keys()\
+ and not callable(options["access_token_factory"]):
+ raise TypeError(
+ "access_token_factory must be a function without params")
+
+ if options is not None:
+ self.has_auth_configured = \
+ "access_token_factory" in options.keys()\
+ and callable(options["access_token_factory"])
+
+ self.skip_negotiation = "skip_negotiation" in options.keys()\
+ and options["skip_negotiation"]
+
+ self.hub_url = hub_url
+ self.hub = None
+ self.options = self.options if options is None else options
+ return self
+
+ def configure_logging(
+ self, logging_level, socket_trace=False, handler=None):
+ """Configures signalr logging
+
+ Args:
+ logging_level ([type]): logging.INFO | logging.DEBUG ...
+ from python logging class
+ socket_trace (bool, optional): Enables socket package trace.
+ Defaults to False.
+ handler ([type], optional): Custom logging handler.
+ Defaults to None.
+
+ Returns:
+ [HubConnectionBuilder]: Instance hub with logging configured
+ """
+ Helpers.configure_logger(logging_level, handler)
+ self.enable_trace = socket_trace
+ return self
+
+ def with_hub_protocol(self, protocol):
+ """Changes transport protocol
+ from signalrcore.protocol.messagepack_protocol\
+ import MessagePackHubProtocol
+
+ HubConnectionBuilder()\
+ .with_url(self.server_url, options={"verify_ssl":False})\
+ ...
+ .with_hub_protocol(MessagePackHubProtocol())\
+ ...
+ .build()
+ Args:
+ protocol (JsonHubProtocol|MessagePackHubProtocol):
+ protocol instance
+
+ Returns:
+ HubConnectionBuilder: instance configured
+ """
+ self.protocol = protocol
+ return self
+
+ def build(self):
+ """Configures the connection hub
+
+ Raises:
+ TypeError: Checks parameters an raises TypeError
+ if one of them is wrong
+
+ Returns:
+ [HubConnectionBuilder]: [self object for fluent interface purposes]
+ """
+ if self.protocol is None:
+ self.protocol = JsonHubProtocol()
+ self.headers = {}
+
+ if "headers" in self.options.keys()\
+ and type(self.options["headers"]) is dict:
+ self.headers = self.options["headers"]
+
+ if self.has_auth_configured:
+ auth_function = self.options["access_token_factory"]
+ if auth_function is None or not callable(auth_function):
+ raise TypeError(
+ "access_token_factory is not function")
+ if "verify_ssl" in self.options.keys()\
+ and type(self.options["verify_ssl"]) is bool:
+ self.verify_ssl = self.options["verify_ssl"]
+
+ return AuthHubConnection(
+ headers=self.headers,
+ auth_function=auth_function,
+ url=self.hub_url,
+ protocol=self.protocol,
+ keep_alive_interval=self.keep_alive_interval,
+ reconnection_handler=self.reconnection_handler,
+ verify_ssl=self.verify_ssl,
+ skip_negotiation=self.skip_negotiation,
+ enable_trace=self.enable_trace)\
+ if self.has_auth_configured else\
+ BaseHubConnection(
+ url=self.hub_url,
+ protocol=self.protocol,
+ keep_alive_interval=self.keep_alive_interval,
+ reconnection_handler=self.reconnection_handler,
+ headers=self.headers,
+ verify_ssl=self.verify_ssl,
+ skip_negotiation=self.skip_negotiation,
+ enable_trace=self.enable_trace)
+
+ def with_automatic_reconnect(self, data: dict):
+ """Configures automatic reconnection
+ https://devblogs.microsoft.com/aspnet/asp-net-core-updates-in-net-core-3-0-preview-4/
+
+ hub = HubConnectionBuilder()\
+ .with_url(self.server_url, options={"verify_ssl":False})\
+ .configure_logging(logging.ERROR)\
+ .with_automatic_reconnect({
+ "type": "raw",
+ "keep_alive_interval": 10,
+ "reconnect_interval": 5,
+ "max_attempts": 5
+ })\
+ .build()
+
+ Args:
+ data (dict): [dict with autmatic reconnection parameters]
+
+ Returns:
+ [HubConnectionBuilder]: [self object for fluent interface purposes]
+ """
+ reconnect_type = data.get("type", "raw")
+
+ # Infinite reconnect attempts
+ max_attempts = data.get("max_attempts", None)
+
+ # 5 sec interval
+ reconnect_interval = data.get("reconnect_interval", 5)
+
+ keep_alive_interval = data.get("keep_alive_interval", 15)
+
+ intervals = data.get("intervals", []) # Reconnection intervals
+
+ self.keep_alive_interval = keep_alive_interval
+
+ reconnection_type = ReconnectionType[reconnect_type]
+
+ if reconnection_type == ReconnectionType.raw:
+ self.reconnection_handler = RawReconnectionHandler(
+ reconnect_interval,
+ max_attempts
+ )
+ if reconnection_type == ReconnectionType.interval:
+ self.reconnection_handler = IntervalReconnectionHandler(
+ intervals
+ )
+ return self
diff --git a/libs/signalrcore/messages/__init__.py b/libs/signalrcore/messages/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/libs/signalrcore/messages/__init__.py
diff --git a/libs/signalrcore/messages/base_message.py b/libs/signalrcore/messages/base_message.py
new file mode 100644
index 000000000..98e7d5082
--- /dev/null
+++ b/libs/signalrcore/messages/base_message.py
@@ -0,0 +1,15 @@
+from .message_type import MessageType
+
+
+class BaseMessage(object):
+ def __init__(self, message_type, **kwargs):
+ self.type = MessageType(message_type)
+
+
+class BaseHeadersMessage(BaseMessage):
+ """
+ All messages expct ping can carry aditional headers
+ """
+ def __init__(self, message_type, headers={}, **kwargs):
+ super(BaseHeadersMessage, self).__init__(message_type)
+ self.headers = headers
diff --git a/libs/signalrcore/messages/cancel_invocation_message.py b/libs/signalrcore/messages/cancel_invocation_message.py
new file mode 100644
index 000000000..4f5f88ff3
--- /dev/null
+++ b/libs/signalrcore/messages/cancel_invocation_message.py
@@ -0,0 +1,24 @@
+from .base_message import BaseHeadersMessage
+"""
+A `CancelInvocation` message is a JSON object with the following properties
+
+* `type` - A `Number` with the literal value `5`,
+ indicating that this message is a `CancelInvocation`.
+* `invocationId` - A `String` encoding the `Invocation ID` for a message.
+
+Example
+```json
+{
+ "type": 5,
+ "invocationId": "123"
+}
+"""
+
+
+class CancelInvocationMessage(BaseHeadersMessage):
+ def __init__(
+ self,
+ invocation_id,
+ **kwargs):
+ super(CancelInvocationMessage, self).__init__(5, **kwargs)
+ self.invocation_id = invocation_id
diff --git a/libs/signalrcore/messages/close_message.py b/libs/signalrcore/messages/close_message.py
new file mode 100644
index 000000000..7a462e993
--- /dev/null
+++ b/libs/signalrcore/messages/close_message.py
@@ -0,0 +1,32 @@
+from .base_message import BaseHeadersMessage
+"""
+A `Close` message is a JSON object with the following properties
+
+* `type` - A `Number` with the literal value `7`,
+ indicating that this message is a `Close`.
+* `error` - An optional `String` encoding the error message.
+
+Example - A `Close` message without an error
+```json
+{
+ "type": 7
+}
+```
+
+Example - A `Close` message with an error
+```json
+{
+ "type": 7,
+ "error": "Connection closed because of an error!"
+}
+```
+"""
+
+
+class CloseMessage(BaseHeadersMessage):
+ def __init__(
+ self,
+ error,
+ **kwargs):
+ super(CloseMessage, self).__init__(7, **kwargs)
+ self.error = error
diff --git a/libs/signalrcore/messages/completion_message.py b/libs/signalrcore/messages/completion_message.py
new file mode 100644
index 000000000..7e8d9141f
--- /dev/null
+++ b/libs/signalrcore/messages/completion_message.py
@@ -0,0 +1,77 @@
+from .base_message import BaseHeadersMessage
+"""
+A `Completion` message is a JSON object with the following properties
+
+* `type` - A `Number` with the literal value `3`,
+ indicating that this message is a `Completion`.
+* `invocationId` - A `String` encoding the `Invocation ID` for a message.
+* `result` - A `Token` encoding the result value
+ (see "JSON Payload Encoding" for details).
+ This field is **ignored** if `error` is present.
+* `error` - A `String` encoding the error message.
+
+It is a protocol error to include both a `result` and an `error` property
+ in the `Completion` message. A conforming endpoint may immediately
+ terminate the connection upon receiving such a message.
+
+Example - A `Completion` message with no result or error
+
+```json
+{
+ "type": 3,
+ "invocationId": "123"
+}
+```
+
+Example - A `Completion` message with a result
+
+```json
+{
+ "type": 3,
+ "invocationId": "123",
+ "result": 42
+}
+```
+
+Example - A `Completion` message with an error
+
+```json
+{
+ "type": 3,
+ "invocationId": "123",
+ "error": "It didn't work!"
+}
+```
+
+Example - The following `Completion` message is a protocol error
+ because it has both of `result` and `error`
+
+```json
+{
+ "type": 3,
+ "invocationId": "123",
+ "result": 42,
+ "error": "It didn't work!"
+}
+```
+"""
+
+
+class CompletionClientStreamMessage(BaseHeadersMessage):
+ def __init__(
+ self, invocation_id, **kwargs):
+ super(CompletionClientStreamMessage, self).__init__(3, **kwargs)
+ self.invocation_id = invocation_id
+
+
+class CompletionMessage(BaseHeadersMessage):
+ def __init__(
+ self,
+ invocation_id,
+ result,
+ error,
+ **kwargs):
+ super(CompletionMessage, self).__init__(3, **kwargs)
+ self.invocation_id = invocation_id
+ self.result = result
+ self.error = error
diff --git a/libs/signalrcore/messages/handshake/__init__.py b/libs/signalrcore/messages/handshake/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/libs/signalrcore/messages/handshake/__init__.py
diff --git a/libs/signalrcore/messages/handshake/request.py b/libs/signalrcore/messages/handshake/request.py
new file mode 100644
index 000000000..1d55110c8
--- /dev/null
+++ b/libs/signalrcore/messages/handshake/request.py
@@ -0,0 +1,5 @@
+class HandshakeRequestMessage(object):
+
+ def __init__(self, protocol, version):
+ self.protocol = protocol
+ self.version = version
diff --git a/libs/signalrcore/messages/handshake/response.py b/libs/signalrcore/messages/handshake/response.py
new file mode 100644
index 000000000..0dc1460b0
--- /dev/null
+++ b/libs/signalrcore/messages/handshake/response.py
@@ -0,0 +1,4 @@
+class HandshakeResponseMessage(object):
+
+ def __init__(self, error):
+ self.error = error
diff --git a/libs/signalrcore/messages/invocation_message.py b/libs/signalrcore/messages/invocation_message.py
new file mode 100644
index 000000000..6de5364d5
--- /dev/null
+++ b/libs/signalrcore/messages/invocation_message.py
@@ -0,0 +1,78 @@
+from .base_message import BaseHeadersMessage
+"""
+
+An `Invocation` message is a JSON object with the following properties:
+
+* `type` - A `Number` with the literal value 1, indicating that this message
+ is an Invocation.
+* `invocationId` - An optional `String` encoding the `Invocation ID`
+ for a message.
+* `target` - A `String` encoding the `Target` name, as expected by the Callee's
+ Binder
+* `arguments` - An `Array` containing arguments to apply to the method
+ referred to in Target. This is a sequence of JSON `Token`s,
+ encoded as indicated below in the "JSON Payload Encoding" section
+
+Example:
+
+```json
+{
+ "type": 1,
+ "invocationId": "123",
+ "target": "Send",
+ "arguments": [
+ 42,
+ "Test Message"
+ ]
+}
+```
+Example (Non-Blocking):
+
+```json
+{
+ "type": 1,
+ "target": "Send",
+ "arguments": [
+ 42,
+ "Test Message"
+ ]
+}
+```
+
+"""
+
+
+class InvocationMessage(BaseHeadersMessage):
+ def __init__(
+ self,
+ invocation_id,
+ target,
+ arguments, **kwargs):
+ super(InvocationMessage, self).__init__(1, **kwargs)
+ self.invocation_id = invocation_id
+ self.target = target
+ self.arguments = arguments
+
+ def __repr__(self):
+ repr_str =\
+ "InvocationMessage: invocation_id {0}, target {1}, arguments {2}"
+ return repr_str.format(self.invocation_id, self.target, self.arguments)
+
+
+class InvocationClientStreamMessage(BaseHeadersMessage):
+ def __init__(
+ self,
+ stream_ids,
+ target,
+ arguments,
+ **kwargs):
+ super(InvocationClientStreamMessage, self).__init__(1, **kwargs)
+ self.target = target
+ self.arguments = arguments
+ self.stream_ids = stream_ids
+
+ def __repr__(self):
+ repr_str =\
+ "InvocationMessage: stream_ids {0}, target {1}, arguments {2}"
+ return repr_str.format(
+ self.stream_ids, self.target, self.arguments)
diff --git a/libs/signalrcore/messages/message_type.py b/libs/signalrcore/messages/message_type.py
new file mode 100644
index 000000000..155315365
--- /dev/null
+++ b/libs/signalrcore/messages/message_type.py
@@ -0,0 +1,12 @@
+from enum import Enum
+
+
+class MessageType(Enum):
+ invocation = 1
+ stream_item = 2
+ completion = 3
+ stream_invocation = 4
+ cancel_invocation = 5
+ ping = 6
+ close = 7
+ invocation_binding_failure = -1
diff --git a/libs/signalrcore/messages/ping_message.py b/libs/signalrcore/messages/ping_message.py
new file mode 100644
index 000000000..c184c9d7e
--- /dev/null
+++ b/libs/signalrcore/messages/ping_message.py
@@ -0,0 +1,20 @@
+from .base_message import BaseMessage
+"""
+A `Ping` message is a JSON object with the following properties:
+
+* `type` - A `Number` with the literal value `6`,
+ indicating that this message is a `Ping`.
+
+Example
+```json
+{
+ "type": 6
+}
+```
+"""
+
+
+class PingMessage(BaseMessage):
+ def __init__(
+ self, **kwargs):
+ super(PingMessage, self).__init__(6, **kwargs)
diff --git a/libs/signalrcore/messages/stream_invocation_message.py b/libs/signalrcore/messages/stream_invocation_message.py
new file mode 100644
index 000000000..e78759495
--- /dev/null
+++ b/libs/signalrcore/messages/stream_invocation_message.py
@@ -0,0 +1,42 @@
+from .base_message import BaseHeadersMessage
+"""
+A `StreamInvocation` message is a JSON object with the following properties:
+
+* `type` - A `Number` with the literal value 4, indicating that
+ this message is a StreamInvocation.
+* `invocationId` - A `String` encoding the `Invocation ID` for a message.
+* `target` - A `String` encoding the `Target` name, as expected
+ by the Callee's Binder.
+* `arguments` - An `Array` containing arguments to apply to
+ the method referred to in Target. This is a sequence of JSON
+ `Token`s, encoded as indicated below in the
+ "JSON Payload Encoding" section.
+
+Example:
+
+```json
+{
+ "type": 4,
+ "invocationId": "123",
+ "target": "Send",
+ "arguments": [
+ 42,
+ "Test Message"
+ ]
+}
+```
+"""
+
+
+class StreamInvocationMessage(BaseHeadersMessage):
+ def __init__(
+ self,
+ invocation_id,
+ target,
+ arguments,
+ **kwargs):
+ super(StreamInvocationMessage, self).__init__(4, **kwargs)
+ self.invocation_id = invocation_id
+ self.target = target
+ self.arguments = arguments
+ self.stream_ids = []
diff --git a/libs/signalrcore/messages/stream_item_message.py b/libs/signalrcore/messages/stream_item_message.py
new file mode 100644
index 000000000..415090f4c
--- /dev/null
+++ b/libs/signalrcore/messages/stream_item_message.py
@@ -0,0 +1,31 @@
+from .base_message import BaseHeadersMessage
+"""
+A `StreamItem` message is a JSON object with the following properties:
+
+* `type` - A `Number` with the literal value 2, indicating
+ that this message is a `StreamItem`.
+* `invocationId` - A `String` encoding the `Invocation ID` for a message.
+* `item` - A `Token` encoding the stream item
+ (see "JSON Payload Encoding" for details).
+
+Example
+
+```json
+{
+ "type": 2,
+ "invocationId": "123",
+ "item": 42
+}
+```
+"""
+
+
+class StreamItemMessage(BaseHeadersMessage):
+ def __init__(
+ self,
+ invocation_id,
+ item,
+ **kwargs):
+ super(StreamItemMessage, self).__init__(2, **kwargs)
+ self.invocation_id = invocation_id
+ self.item = item
diff --git a/libs/signalrcore/protocol/__init__.py b/libs/signalrcore/protocol/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/libs/signalrcore/protocol/__init__.py
diff --git a/libs/signalrcore/protocol/base_hub_protocol.py b/libs/signalrcore/protocol/base_hub_protocol.py
new file mode 100644
index 000000000..5e2b91877
--- /dev/null
+++ b/libs/signalrcore/protocol/base_hub_protocol.py
@@ -0,0 +1,60 @@
+import json
+
+from ..messages.handshake.request import HandshakeRequestMessage
+from ..messages.handshake.response import HandshakeResponseMessage
+from ..messages.invocation_message import InvocationMessage # 1
+from ..messages.stream_item_message import StreamItemMessage # 2
+from ..messages.completion_message import CompletionMessage # 3
+from ..messages.stream_invocation_message import StreamInvocationMessage # 4
+from ..messages.cancel_invocation_message import CancelInvocationMessage # 5
+from ..messages.ping_message import PingMessage # 6
+from ..messages.close_message import CloseMessage # 7
+from ..messages.message_type import MessageType
+from ..helpers import Helpers
+
+class BaseHubProtocol(object):
+ def __init__(self, protocol, version, transfer_format, record_separator):
+ self.protocol = protocol
+ self.version = version
+ self.transfer_format = transfer_format
+ self.record_separator = record_separator
+
+ @staticmethod
+ def get_message(dict_message):
+ message_type = MessageType.close\
+ if not "type" in dict_message.keys() else MessageType(dict_message["type"])
+
+ dict_message["invocation_id"] = dict_message.get("invocationId", None)
+ dict_message["headers"] = dict_message.get("headers", {})
+ dict_message["error"] = dict_message.get("error", None)
+ dict_message["result"] = dict_message.get("result", None)
+ if message_type is MessageType.invocation:
+ return InvocationMessage(**dict_message)
+ if message_type is MessageType.stream_item:
+ return StreamItemMessage(**dict_message)
+ if message_type is MessageType.completion:
+ return CompletionMessage(**dict_message)
+ if message_type is MessageType.stream_invocation:
+ return StreamInvocationMessage(**dict_message)
+ if message_type is MessageType.cancel_invocation:
+ return CancelInvocationMessage(**dict_message)
+ if message_type is MessageType.ping:
+ return PingMessage()
+ if message_type is MessageType.close:
+ return CloseMessage(**dict_message)
+
+ def decode_handshake(self, raw_message: str) -> HandshakeResponseMessage:
+ messages = raw_message.split(self.record_separator)
+ messages = list(filter(lambda x: x != "", messages))
+ data = json.loads(messages[0])
+ idx = raw_message.index(self.record_separator)
+ return HandshakeResponseMessage(data.get("error", None)), self.parse_messages(raw_message[idx + 1 :]) if len(messages) > 1 else []
+
+ def handshake_message(self) -> HandshakeRequestMessage:
+ return HandshakeRequestMessage(self.protocol, self.version)
+
+ def parse_messages(self, raw_message: str):
+ raise ValueError("Protocol must implement this method")
+
+ def write_message(self, hub_message):
+ raise ValueError("Protocol must implement this method")
diff --git a/libs/signalrcore/protocol/handshake/__init__.py b/libs/signalrcore/protocol/handshake/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/libs/signalrcore/protocol/handshake/__init__.py
diff --git a/libs/signalrcore/protocol/json_hub_protocol.py b/libs/signalrcore/protocol/json_hub_protocol.py
new file mode 100644
index 000000000..bcd6f9ae4
--- /dev/null
+++ b/libs/signalrcore/protocol/json_hub_protocol.py
@@ -0,0 +1,50 @@
+import json
+
+from .base_hub_protocol import BaseHubProtocol
+
+from ..messages.message_type import MessageType
+from json import JSONEncoder
+
+from signalrcore.helpers import Helpers
+
+
+class MyEncoder(JSONEncoder):
+ # https://github.com/PyCQA/pylint/issues/414
+ def default(self, o):
+ if type(o) is MessageType:
+ return o.value
+ data = o.__dict__
+ if "invocation_id" in data:
+ data["invocationId"] = data["invocation_id"]
+ del data["invocation_id"]
+ if "stream_ids" in data:
+ data["streamIds"] = data["stream_ids"]
+ del data["stream_ids"]
+ return data
+
+
+class JsonHubProtocol(BaseHubProtocol):
+ def __init__(self):
+ super(JsonHubProtocol, self).__init__("json", 1, "Text", chr(0x1E))
+ self.encoder = MyEncoder()
+
+ def parse_messages(self, raw):
+ Helpers.get_logger().debug("Raw message incomming: ")
+ Helpers.get_logger().debug(raw)
+ raw_messages = [
+ record.replace(self.record_separator, "")
+ for record in raw.split(self.record_separator)
+ if record is not None and record != ""
+ and record != self.record_separator
+ ]
+ result = []
+ for raw_message in raw_messages:
+ dict_message = json.loads(raw_message)
+ if len(dict_message.keys()) > 0:
+ result.append(self.get_message(dict_message))
+ return result
+
+ def encode(self, message):
+ Helpers.get_logger()\
+ .debug(self.encoder.encode(message) + self.record_separator)
+ return self.encoder.encode(message) + self.record_separator
diff --git a/libs/signalrcore/protocol/messagepack_protocol.py b/libs/signalrcore/protocol/messagepack_protocol.py
new file mode 100644
index 000000000..45651e4b7
--- /dev/null
+++ b/libs/signalrcore/protocol/messagepack_protocol.py
@@ -0,0 +1,169 @@
+import json
+import msgpack
+from .base_hub_protocol import BaseHubProtocol
+from ..messages.handshake.request import HandshakeRequestMessage
+from ..messages.handshake.response import HandshakeResponseMessage
+from ..messages.invocation_message\
+ import InvocationMessage, InvocationClientStreamMessage # 1
+from ..messages.stream_item_message import StreamItemMessage # 2
+from ..messages.completion_message import CompletionMessage # 3
+from ..messages.stream_invocation_message import StreamInvocationMessage # 4
+from ..messages.cancel_invocation_message import CancelInvocationMessage # 5
+from ..messages.ping_message import PingMessage # 6
+from ..messages.close_message import CloseMessage # 7
+from ..helpers import Helpers
+
+
+class MessagePackHubProtocol(BaseHubProtocol):
+
+ _priority = [
+ "type",
+ "headers",
+ "invocation_id",
+ "target",
+ "arguments",
+ "item",
+ "result_kind",
+ "result",
+ "stream_ids"
+ ]
+
+ def __init__(self):
+ super(MessagePackHubProtocol, self).__init__(
+ "messagepack", 1, "Text", chr(0x1E))
+ self.logger = Helpers.get_logger()
+
+ def parse_messages(self, raw):
+ try:
+ messages = []
+ offset = 0
+ while offset < len(raw):
+ length = msgpack.unpackb(raw[offset: offset + 1])
+ values = msgpack.unpackb(raw[offset + 1: offset + length + 1])
+ offset = offset + length + 1
+ message = self._decode_message(values)
+ messages.append(message)
+ except Exception as ex:
+ Helpers.get_logger().error("Parse messages Error {0}".format(ex))
+ Helpers.get_logger().error("raw msg '{0}'".format(raw))
+ return messages
+
+ def decode_handshake(self, raw_message):
+ try:
+ has_various_messages = 0x1E in raw_message
+ handshake_data = raw_message[0: raw_message.index(0x1E)] if has_various_messages else raw_message
+ messages = self.parse_messages(raw_message[raw_message.index(0x1E) + 1:]) if has_various_messages else []
+ data = json.loads(handshake_data)
+ return HandshakeResponseMessage(data.get("error", None)), messages
+ except Exception as ex:
+ Helpers.get_logger().error(raw_message)
+ Helpers.get_logger().error(ex)
+ raise ex
+
+ def encode(self, message):
+ if type(message) is HandshakeRequestMessage:
+ content = json.dumps(message.__dict__)
+ return content + self.record_separator
+
+ msg = self._encode_message(message)
+ encoded_message = msgpack.packb(msg)
+ varint_length = self._to_varint(len(encoded_message))
+ return varint_length + encoded_message
+
+ def _encode_message(self, message):
+ result = []
+
+ # sort attributes
+ for attribute in self._priority:
+ if hasattr(message, attribute):
+ if (attribute == "type"):
+ result.append(getattr(message, attribute).value)
+ else:
+ result.append(getattr(message, attribute))
+ return result
+
+ def _decode_message(self, raw):
+ # {} {"error"}
+ # [1, Headers, InvocationId, Target, [Arguments], [StreamIds]]
+ # [2, Headers, InvocationId, Item]
+ # [3, Headers, InvocationId, ResultKind, Result]
+ # [4, Headers, InvocationId, Target, [Arguments], [StreamIds]]
+ # [5, Headers, InvocationId]
+ # [6]
+ # [7, Error, AllowReconnect?]
+
+ if raw[0] == 1: # InvocationMessage
+ if len(raw[5]) > 0:
+ return InvocationClientStreamMessage(
+ headers=raw[1],
+ stream_ids=raw[5],
+ target=raw[3],
+ arguments=raw[4])
+ else:
+ return InvocationMessage(
+ headers=raw[1],
+ invocation_id=raw[2],
+ target=raw[3],
+ arguments=raw[4])
+
+ elif raw[0] == 2: # StreamItemMessage
+ return StreamItemMessage(
+ headers=raw[1],
+ invocation_id=raw[2],
+ item=raw[3])
+
+ elif raw[0] == 3: # CompletionMessage
+ result_kind = raw[3]
+ if result_kind == 1:
+ return CompletionMessage(
+ headers=raw[1],
+ invocation_id=raw[2],
+ result=None,
+ error=raw[4])
+
+ elif result_kind == 2:
+ return CompletionMessage(
+ headers=raw[1], invocation_id=raw[2],
+ result=None, error=None)
+
+ elif result_kind == 3:
+ return CompletionMessage(
+ headers=raw[1], invocation_id=raw[2],
+ result=raw[4], error=None)
+ else:
+ raise Exception("Unknown result kind.")
+
+ elif raw[0] == 4: # StreamInvocationMessage
+ return StreamInvocationMessage(
+ headers=raw[1], invocation_id=raw[2],
+ target=raw[3], arguments=raw[4]) # stream_id missing?
+
+ elif raw[0] == 5: # CancelInvocationMessage
+ return CancelInvocationMessage(
+ headers=raw[1], invocation_id=raw[2])
+
+ elif raw[0] == 6: # PingMessageEncoding
+ return PingMessage()
+
+ elif raw[0] == 7: # CloseMessageEncoding
+ return CloseMessage(error=raw[1]) # AllowReconnect is missing
+ print(".......................................")
+ print(raw)
+ print("---------------------------------------")
+ raise Exception("Unknown message type.")
+
+ def _to_varint(self, value):
+ buffer = b''
+
+ while True:
+
+ byte = value & 0x7f
+ value >>= 7
+
+ if value:
+ buffer += bytes((byte | 0x80, ))
+ else:
+ buffer += bytes((byte, ))
+ break
+
+ return buffer
diff --git a/libs/signalrcore/subject.py b/libs/signalrcore/subject.py
new file mode 100644
index 000000000..6561cc4de
--- /dev/null
+++ b/libs/signalrcore/subject.py
@@ -0,0 +1,68 @@
+import uuid
+import threading
+from typing import Any
+from .messages.invocation_message import InvocationClientStreamMessage
+from .messages.stream_item_message import StreamItemMessage
+from .messages.completion_message import CompletionClientStreamMessage
+
+
+class Subject(object):
+ """Client to server streaming
+ https://docs.microsoft.com/en-gb/aspnet/core/signalr/streaming?view=aspnetcore-5.0#client-to-server-streaming
+ items = list(range(0,10))
+ subject = Subject()
+ connection.send("UploadStream", subject)
+ while(len(self.items) > 0):
+ subject.next(str(self.items.pop()))
+ subject.complete()
+ """
+
+ def __init__(self):
+ self.connection = None
+ self.target = None
+ self.invocation_id = str(uuid.uuid4())
+ self.lock = threading.RLock()
+
+ def check(self):
+ """Ensures that invocation streaming object is correct
+
+ Raises:
+ ValueError: if object is not valid, exception will be raised
+ """
+ if self.connection is None\
+ or self.target is None\
+ or self.invocation_id is None:
+ raise ValueError(
+ "subject must be passed as an agument to a send function. "
+ + "hub_connection.send([method],[subject]")
+
+ def next(self, item: Any):
+ """Send next item to the server
+
+ Args:
+ item (any): Item that will be streamed
+ """
+ self.check()
+ with self.lock:
+ self.connection.transport.send(StreamItemMessage(
+ self.invocation_id,
+ item))
+
+ def start(self):
+ """Starts streaming
+ """
+ self.check()
+ with self.lock:
+ self.connection.transport.send(
+ InvocationClientStreamMessage(
+ [self.invocation_id],
+ self.target,
+ []))
+
+ def complete(self):
+ """Finish streaming
+ """
+ self.check()
+ with self.lock:
+ self.connection.transport.send(CompletionClientStreamMessage(
+ self.invocation_id))
diff --git a/libs/signalrcore/transport/__init__.py b/libs/signalrcore/transport/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/libs/signalrcore/transport/__init__.py
diff --git a/libs/signalrcore/transport/base_transport.py b/libs/signalrcore/transport/base_transport.py
new file mode 100644
index 000000000..c73a2c028
--- /dev/null
+++ b/libs/signalrcore/transport/base_transport.py
@@ -0,0 +1,29 @@
+from ..protocol.json_hub_protocol import JsonHubProtocol
+from ..helpers import Helpers
+
+class BaseTransport(object):
+ def __init__(self, protocol=JsonHubProtocol(), on_message=None):
+ self.protocol = protocol
+ self._on_message= on_message
+ self.logger = Helpers.get_logger()
+ self._on_open = lambda: self.logger.info("on_connect not defined")
+ self._on_close = lambda: self.logger.info(
+ "on_disconnect not defined")
+
+ def on_open_callback(self, callback):
+ self._on_open = callback
+
+ def on_close_callback(self, callback):
+ self._on_close = callback
+
+ def start(self): # pragma: no cover
+ raise NotImplementedError()
+
+ def stop(self): # pragma: no cover
+ raise NotImplementedError()
+
+ def is_running(self): # pragma: no cover
+ raise NotImplementedError()
+
+ def send(self, message, on_invocation = None): # pragma: no cover
+ raise NotImplementedError() \ No newline at end of file
diff --git a/libs/signalrcore/transport/websockets/__init__.py b/libs/signalrcore/transport/websockets/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/libs/signalrcore/transport/websockets/__init__.py
diff --git a/libs/signalrcore/transport/websockets/connection.py b/libs/signalrcore/transport/websockets/connection.py
new file mode 100644
index 000000000..2253f0286
--- /dev/null
+++ b/libs/signalrcore/transport/websockets/connection.py
@@ -0,0 +1,8 @@
+from enum import Enum
+
+
+class ConnectionState(Enum):
+ connecting = 0
+ connected = 1
+ reconnecting = 2
+ disconnected = 4
diff --git a/libs/signalrcore/transport/websockets/reconnection.py b/libs/signalrcore/transport/websockets/reconnection.py
new file mode 100644
index 000000000..b14b41c5b
--- /dev/null
+++ b/libs/signalrcore/transport/websockets/reconnection.py
@@ -0,0 +1,87 @@
+import threading
+import time
+from enum import Enum
+
+
+class ConnectionStateChecker(object):
+ def __init__(
+ self,
+ ping_function,
+ keep_alive_interval,
+ sleep=1):
+ self.sleep = sleep
+ self.keep_alive_interval = keep_alive_interval
+ self.last_message = time.time()
+ self.ping_function = ping_function
+ self.running = False
+ self._thread = None
+
+ def start(self):
+ self.running = True
+ self._thread = threading.Thread(target=self.run)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def run(self):
+ while self.running:
+ time.sleep(self.sleep)
+ time_without_messages = time.time() - self.last_message
+ if self.keep_alive_interval < time_without_messages:
+ self.ping_function()
+
+ def stop(self):
+ self.running = False
+
+
+class ReconnectionType(Enum):
+ raw = 0 # Reconnection with max reconnections and constant sleep time
+ interval = 1 # variable sleep time
+
+
+class ReconnectionHandler(object):
+ def __init__(self):
+ self.reconnecting = False
+ self.attempt_number = 0
+ self.last_attempt = time.time()
+
+ def next(self):
+ raise NotImplementedError()
+
+ def reset(self):
+ self.attempt_number = 0
+ self.reconnecting = False
+
+
+class RawReconnectionHandler(ReconnectionHandler):
+ def __init__(self, sleep_time, max_attempts):
+ super(RawReconnectionHandler, self).__init__()
+ self.sleep_time = sleep_time
+ self.max_reconnection_attempts = max_attempts
+
+ def next(self):
+ self.reconnecting = True
+ if self.max_reconnection_attempts is not None:
+ if self.attempt_number <= self.max_reconnection_attempts:
+ self.attempt_number += 1
+ return self.sleep_time
+ else:
+ raise ValueError(
+ "Max attemps reached {0}"
+ .format(self.max_reconnection_attempts))
+ else: # Infinite reconnect
+ return self.sleep_time
+
+
+class IntervalReconnectionHandler(ReconnectionHandler):
+ def __init__(self, intervals):
+ super(IntervalReconnectionHandler, self).__init__()
+ self._intervals = intervals
+
+ def next(self):
+ self.reconnecting = True
+ index = self.attempt_number
+ self.attempt_number += 1
+ if index >= len(self._intervals):
+ raise ValueError(
+ "Max intervals reached {0}".format(self._intervals))
+ return self._intervals[index]
diff --git a/libs/signalrcore/transport/websockets/websocket_transport.py b/libs/signalrcore/transport/websockets/websocket_transport.py
new file mode 100644
index 000000000..4c05add53
--- /dev/null
+++ b/libs/signalrcore/transport/websockets/websocket_transport.py
@@ -0,0 +1,240 @@
+import websocket
+import threading
+import requests
+import traceback
+import uuid
+import time
+import ssl
+from .reconnection import ConnectionStateChecker
+from .connection import ConnectionState
+from ...messages.ping_message import PingMessage
+from ...hub.errors import HubError, HubConnectionError, UnAuthorizedHubError
+from ...protocol.messagepack_protocol import MessagePackHubProtocol
+from ...protocol.json_hub_protocol import JsonHubProtocol
+from ..base_transport import BaseTransport
+from ...helpers import Helpers
+
+class WebsocketTransport(BaseTransport):
+ def __init__(self,
+ url="",
+ headers={},
+ keep_alive_interval=15,
+ reconnection_handler=None,
+ verify_ssl=False,
+ skip_negotiation=False,
+ enable_trace=False,
+ **kwargs):
+ super(WebsocketTransport, self).__init__(**kwargs)
+ self._ws = None
+ self.enable_trace = enable_trace
+ self._thread = None
+ self.skip_negotiation = skip_negotiation
+ self.url = url
+ self.headers = headers
+ self.handshake_received = False
+ self.token = None # auth
+ self.state = ConnectionState.disconnected
+ self.connection_alive = False
+ self._thread = None
+ self._ws = None
+ self.verify_ssl = verify_ssl
+ self.connection_checker = ConnectionStateChecker(
+ lambda: self.send(PingMessage()),
+ keep_alive_interval
+ )
+ self.reconnection_handler = reconnection_handler
+
+ if len(self.logger.handlers) > 0:
+ websocket.enableTrace(self.enable_trace, self.logger.handlers[0])
+
+ def is_running(self):
+ return self.state != ConnectionState.disconnected
+
+ def stop(self):
+ if self.state == ConnectionState.connected:
+ self.connection_checker.stop()
+ self._ws.close()
+ self.state = ConnectionState.disconnected
+ self.handshake_received = False
+
+ def start(self):
+ if not self.skip_negotiation:
+ self.negotiate()
+
+ if self.state == ConnectionState.connected:
+ self.logger.warning("Already connected unable to start")
+ return False
+
+ self.state = ConnectionState.connecting
+ self.logger.debug("start url:" + self.url)
+
+ self._ws = websocket.WebSocketApp(
+ self.url,
+ header=self.headers,
+ on_message=self.on_message,
+ on_error=self.on_socket_error,
+ on_close=self.on_close,
+ on_open=self.on_open,
+ )
+
+ self._thread = threading.Thread(
+ target=lambda: self._ws.run_forever(
+ sslopt={"cert_reqs": ssl.CERT_NONE}
+ if not self.verify_ssl else {}
+ ))
+ self._thread.daemon = True
+ self._thread.start()
+ return True
+
+ def negotiate(self):
+ negotiate_url = Helpers.get_negotiate_url(self.url)
+ self.logger.debug("Negotiate url:{0}".format(negotiate_url))
+
+ response = requests.post(
+ negotiate_url, headers=self.headers, verify=self.verify_ssl)
+ self.logger.debug(
+ "Response status code{0}".format(response.status_code))
+
+ if response.status_code != 200:
+ raise HubError(response.status_code)\
+ if response.status_code != 401 else UnAuthorizedHubError()
+
+ data = response.json()
+
+ if "connectionId" in data.keys():
+ self.url = Helpers.encode_connection_id(
+ self.url, data["connectionId"])
+
+ # Azure
+ if 'url' in data.keys() and 'accessToken' in data.keys():
+ Helpers.get_logger().debug(
+ "Azure url, reformat headers, token and url {0}".format(data))
+ self.url = data["url"]\
+ if data["url"].startswith("ws") else\
+ Helpers.http_to_websocket(data["url"])
+ self.token = data["accessToken"]
+ self.headers = {"Authorization": "Bearer " + self.token}
+
+
+ def evaluate_handshake(self, message):
+ self.logger.debug("Evaluating handshake {0}".format(message))
+ msg, messages = self.protocol.decode_handshake(message)
+ if msg.error is None or msg.error == "":
+ self.handshake_received = True
+ self.state = ConnectionState.connected
+ if self.reconnection_handler is not None:
+ self.reconnection_handler.reconnecting = False
+ if not self.connection_checker.running:
+ self.connection_checker.start()
+ else:
+ self.logger.error(msg.error)
+ self.on_socket_error(msg.error)
+ self.stop()
+ raise ValueError("Handshake error {0}".format(msg.error))
+ return messages
+
+ def on_open(self):
+ self.logger.debug("-- web socket open --")
+ msg = self.protocol.handshake_message()
+ self.send(msg)
+
+ def on_close(self):
+ self.logger.debug("-- web socket close --")
+ self.state = ConnectionState.disconnected
+ if self._on_close is not None and callable(self._on_close):
+ self._on_close()
+
+ def on_socket_error(self, error):
+ """
+ Throws error related on
+ https://github.com/websocket-client/websocket-client/issues/449
+
+ Args:
+ error ([type]): [description]
+
+ Raises:
+ HubError: [description]
+ """
+ self.logger.debug("-- web socket error --")
+ if (type(error) is AttributeError and
+ "'NoneType' object has no attribute 'connected'"
+ in str(error)):
+ url = "https://github.com/websocket-client" +\
+ "/websocket-client/issues/449"
+ self.logger.warning(
+ "Websocket closing error: issue" +
+ url)
+ self._on_close()
+ else:
+ self.logger.error(traceback.format_exc(5, True))
+ self.logger.error("{0} {1}".format(self, error))
+ self.logger.error("{0} {1}".format(error, type(error)))
+ self._on_close()
+ raise HubError(error)
+
+ def on_message(self, raw_message):
+ self.logger.debug("Message received{0}".format(raw_message))
+ self.connection_checker.last_message = time.time()
+ if not self.handshake_received:
+ messages = self.evaluate_handshake(raw_message)
+ if self._on_open is not None and callable(self._on_open):
+ self.state = ConnectionState.connected
+ self._on_open()
+
+ if len(messages) > 0:
+ return self._on_message(messages)
+
+ return []
+
+ return self._on_message(
+ self.protocol.parse_messages(raw_message))
+
+ def send(self, message):
+ self.logger.debug("Sending message {0}".format(message))
+ try:
+ self._ws.send(
+ self.protocol.encode(message),
+ opcode=0x2
+ if type(self.protocol) == MessagePackHubProtocol else
+ 0x1)
+ self.connection_checker.last_message = time.time()
+ if self.reconnection_handler is not None:
+ self.reconnection_handler.reset()
+ except (
+ websocket._exceptions.WebSocketConnectionClosedException,
+ OSError) as ex:
+ self.handshake_received = False
+ self.logger.warning("Connection closed {0}".format(ex))
+ self.state = ConnectionState.disconnected
+ if self.reconnection_handler is None:
+ if self._on_close is not None and\
+ callable(self._on_close):
+ self._on_close()
+ raise ValueError(str(ex))
+ # Connection closed
+ self.handle_reconnect()
+ except Exception as ex:
+ raise ex
+
+ def handle_reconnect(self):
+ self.reconnection_handler.reconnecting = True
+ try:
+ self.stop()
+ self.start()
+ except Exception as ex:
+ self.logger.error(ex)
+ sleep_time = self.reconnection_handler.next()
+ threading.Thread(
+ target=self.deferred_reconnect,
+ args=(sleep_time,)
+ ).start()
+
+ def deferred_reconnect(self, sleep_time):
+ time.sleep(sleep_time)
+ try:
+ if not self.connection_alive:
+ self.send(PingMessage())
+ except Exception as ex:
+ self.logger.error(ex)
+ self.reconnection_handler.reconnecting = False
+ self.connection_alive = False \ No newline at end of file