aboutsummaryrefslogtreecommitdiffhomepage
path: root/libs/socketio
diff options
context:
space:
mode:
authorLouis Vézina <[email protected]>2020-01-29 20:07:26 -0500
committerLouis Vézina <[email protected]>2020-01-29 20:07:26 -0500
commit83c95cc77dfd5ed18b439b1635f95bac129d0ce2 (patch)
treeea557727572cf3479a0af6d11434d0b486132eb8 /libs/socketio
parent95b8aadb239bdce8d7f7a03e4ab995e56bf4e820 (diff)
downloadbazarr-83c95cc77dfd5ed18b439b1635f95bac129d0ce2.tar.gz
bazarr-83c95cc77dfd5ed18b439b1635f95bac129d0ce2.zip
WIP
Diffstat (limited to 'libs/socketio')
-rw-r--r--libs/socketio/__init__.py38
-rw-r--r--libs/socketio/asgi.py36
-rw-r--r--libs/socketio/asyncio_aiopika_manager.py105
-rw-r--r--libs/socketio/asyncio_client.py475
-rw-r--r--libs/socketio/asyncio_manager.py58
-rw-r--r--libs/socketio/asyncio_namespace.py204
-rw-r--r--libs/socketio/asyncio_pubsub_manager.py163
-rw-r--r--libs/socketio/asyncio_redis_manager.py107
-rw-r--r--libs/socketio/asyncio_server.py526
-rw-r--r--libs/socketio/base_manager.py178
-rw-r--r--libs/socketio/client.py620
-rw-r--r--libs/socketio/exceptions.py30
-rw-r--r--libs/socketio/kafka_manager.py63
-rw-r--r--libs/socketio/kombu_manager.py122
-rw-r--r--libs/socketio/middleware.py42
-rw-r--r--libs/socketio/namespace.py191
-rw-r--r--libs/socketio/packet.py179
-rw-r--r--libs/socketio/pubsub_manager.py154
-rw-r--r--libs/socketio/redis_manager.py115
-rw-r--r--libs/socketio/server.py730
-rw-r--r--libs/socketio/tornado.py11
-rw-r--r--libs/socketio/zmq_manager.py111
22 files changed, 4258 insertions, 0 deletions
diff --git a/libs/socketio/__init__.py b/libs/socketio/__init__.py
new file mode 100644
index 000000000..d3ee7242b
--- /dev/null
+++ b/libs/socketio/__init__.py
@@ -0,0 +1,38 @@
+import sys
+
+from .client import Client
+from .base_manager import BaseManager
+from .pubsub_manager import PubSubManager
+from .kombu_manager import KombuManager
+from .redis_manager import RedisManager
+from .kafka_manager import KafkaManager
+from .zmq_manager import ZmqManager
+from .server import Server
+from .namespace import Namespace, ClientNamespace
+from .middleware import WSGIApp, Middleware
+from .tornado import get_tornado_handler
+if sys.version_info >= (3, 5): # pragma: no cover
+ from .asyncio_client import AsyncClient
+ from .asyncio_server import AsyncServer
+ from .asyncio_manager import AsyncManager
+ from .asyncio_namespace import AsyncNamespace, AsyncClientNamespace
+ from .asyncio_redis_manager import AsyncRedisManager
+ from .asyncio_aiopika_manager import AsyncAioPikaManager
+ from .asgi import ASGIApp
+else: # pragma: no cover
+ AsyncClient = None
+ AsyncServer = None
+ AsyncManager = None
+ AsyncNamespace = None
+ AsyncRedisManager = None
+ AsyncAioPikaManager = None
+
+__version__ = '4.4.0'
+
+__all__ = ['__version__', 'Client', 'Server', 'BaseManager', 'PubSubManager',
+ 'KombuManager', 'RedisManager', 'ZmqManager', 'KafkaManager',
+ 'Namespace', 'ClientNamespace', 'WSGIApp', 'Middleware']
+if AsyncServer is not None: # pragma: no cover
+ __all__ += ['AsyncClient', 'AsyncServer', 'AsyncNamespace',
+ 'AsyncClientNamespace', 'AsyncManager', 'AsyncRedisManager',
+ 'ASGIApp', 'get_tornado_handler', 'AsyncAioPikaManager']
diff --git a/libs/socketio/asgi.py b/libs/socketio/asgi.py
new file mode 100644
index 000000000..9bcdd03ba
--- /dev/null
+++ b/libs/socketio/asgi.py
@@ -0,0 +1,36 @@
+import engineio
+
+
+class ASGIApp(engineio.ASGIApp): # pragma: no cover
+ """ASGI application middleware for Socket.IO.
+
+ This middleware dispatches traffic to an Socket.IO application. It can
+ also serve a list of static files to the client, or forward unrelated
+ HTTP traffic to another ASGI application.
+
+ :param socketio_server: The Socket.IO server. Must be an instance of the
+ ``socketio.AsyncServer`` class.
+ :param static_files: A dictionary with static file mapping rules. See the
+ documentation for details on this argument.
+ :param other_asgi_app: A separate ASGI app that receives all other traffic.
+ :param socketio_path: The endpoint where the Socket.IO application should
+ be installed. The default value is appropriate for
+ most cases.
+
+ Example usage::
+
+ import socketio
+ import uvicorn
+
+ sio = socketio.AsyncServer()
+ app = engineio.ASGIApp(sio, static_files={
+ '/': 'index.html',
+ '/static': './public',
+ })
+ uvicorn.run(app, host='127.0.0.1', port=5000)
+ """
+ def __init__(self, socketio_server, other_asgi_app=None,
+ static_files=None, socketio_path='socket.io'):
+ super().__init__(socketio_server, other_asgi_app,
+ static_files=static_files,
+ engineio_path=socketio_path)
diff --git a/libs/socketio/asyncio_aiopika_manager.py b/libs/socketio/asyncio_aiopika_manager.py
new file mode 100644
index 000000000..b20d6afd9
--- /dev/null
+++ b/libs/socketio/asyncio_aiopika_manager.py
@@ -0,0 +1,105 @@
+import asyncio
+import pickle
+
+from socketio.asyncio_pubsub_manager import AsyncPubSubManager
+
+try:
+ import aio_pika
+except ImportError:
+ aio_pika = None
+
+
+class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
+ """Client manager that uses aio_pika for inter-process messaging under
+ asyncio.
+
+ This class implements a client manager backend for event sharing across
+ multiple processes, using RabbitMQ
+
+ To use a aio_pika backend, initialize the :class:`Server` instance as
+ follows::
+
+ url = 'amqp://user:password@hostname:port//'
+ server = socketio.Server(client_manager=socketio.AsyncAioPikaManager(
+ url))
+
+ :param url: The connection URL for the backend messaging queue. Example
+ connection URLs are ``'amqp://guest:guest@localhost:5672//'``
+ for RabbitMQ.
+ :param channel: The channel name on which the server sends and receives
+ notifications. Must be the same in all the servers.
+ With this manager, the channel name is the exchange name
+ in rabbitmq
+ :param write_only: If set ot ``True``, only initialize to emit events. The
+ default of ``False`` initializes the class for emitting
+ and receiving.
+ """
+
+ name = 'asyncaiopika'
+
+ def __init__(self, url='amqp://guest:guest@localhost:5672//',
+ channel='socketio', write_only=False, logger=None):
+ if aio_pika is None:
+ raise RuntimeError('aio_pika package is not installed '
+ '(Run "pip install aio_pika" in your '
+ 'virtualenv).')
+ self.url = url
+ self.listener_connection = None
+ self.listener_channel = None
+ self.listener_queue = None
+ super().__init__(channel=channel, write_only=write_only, logger=logger)
+
+ async def _connection(self):
+ return await aio_pika.connect_robust(self.url)
+
+ async def _channel(self, connection):
+ return await connection.channel()
+
+ async def _exchange(self, channel):
+ return await channel.declare_exchange(self.channel,
+ aio_pika.ExchangeType.FANOUT)
+
+ async def _queue(self, channel, exchange):
+ queue = await channel.declare_queue(durable=False,
+ arguments={'x-expires': 300000})
+ await queue.bind(exchange)
+ return queue
+
+ async def _publish(self, data):
+ connection = await self._connection()
+ channel = await self._channel(connection)
+ exchange = await self._exchange(channel)
+ await exchange.publish(
+ aio_pika.Message(body=pickle.dumps(data),
+ delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
+ routing_key='*'
+ )
+
+ async def _listen(self):
+ retry_sleep = 1
+ while True:
+ try:
+ if self.listener_connection is None:
+ self.listener_connection = await self._connection()
+ self.listener_channel = await self._channel(
+ self.listener_connection
+ )
+ await self.listener_channel.set_qos(prefetch_count=1)
+ exchange = await self._exchange(self.listener_channel)
+ self.listener_queue = await self._queue(
+ self.listener_channel, exchange
+ )
+
+ async with self.listener_queue.iterator() as queue_iter:
+ async for message in queue_iter:
+ with message.process():
+ return pickle.loads(message.body)
+ except Exception:
+ self._get_logger().error('Cannot receive from rabbitmq... '
+ 'retrying in '
+ '{} secs'.format(retry_sleep))
+ self.listener_connection = None
+ await asyncio.sleep(retry_sleep)
+ retry_sleep *= 2
+ if retry_sleep > 60:
+ retry_sleep = 60
diff --git a/libs/socketio/asyncio_client.py b/libs/socketio/asyncio_client.py
new file mode 100644
index 000000000..2b10434ae
--- /dev/null
+++ b/libs/socketio/asyncio_client.py
@@ -0,0 +1,475 @@
+import asyncio
+import logging
+import random
+
+import engineio
+import six
+
+from . import client
+from . import exceptions
+from . import packet
+
+default_logger = logging.getLogger('socketio.client')
+
+
+class AsyncClient(client.Client):
+ """A Socket.IO client for asyncio.
+
+ This class implements a fully compliant Socket.IO web client with support
+ for websocket and long-polling transports.
+
+ :param reconnection: ``True`` if the client should automatically attempt to
+ reconnect to the server after an interruption, or
+ ``False`` to not reconnect. The default is ``True``.
+ :param reconnection_attempts: How many reconnection attempts to issue
+ before giving up, or 0 for infinity attempts.
+ The default is 0.
+ :param reconnection_delay: How long to wait in seconds before the first
+ reconnection attempt. Each successive attempt
+ doubles this delay.
+ :param reconnection_delay_max: The maximum delay between reconnection
+ attempts.
+ :param randomization_factor: Randomization amount for each delay between
+ reconnection attempts. The default is 0.5,
+ which means that each delay is randomly
+ adjusted by +/- 50%.
+ :param logger: To enable logging set to ``True`` or pass a logger object to
+ use. To disable logging set to ``False``. The default is
+ ``False``.
+ :param binary: ``True`` to support binary payloads, ``False`` to treat all
+ payloads as text. On Python 2, if this is set to ``True``,
+ ``unicode`` values are treated as text, and ``str`` and
+ ``bytes`` values are treated as binary. This option has no
+ effect on Python 3, where text and binary payloads are
+ always automatically discovered.
+ :param json: An alternative json module to use for encoding and decoding
+ packets. Custom json modules must have ``dumps`` and ``loads``
+ functions that are compatible with the standard library
+ versions.
+
+ The Engine.IO configuration supports the following settings:
+
+ :param request_timeout: A timeout in seconds for requests. The default is
+ 5 seconds.
+ :param ssl_verify: ``True`` to verify SSL certificates, or ``False`` to
+ skip SSL certificate verification, allowing
+ connections to servers with self signed certificates.
+ The default is ``True``.
+ :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
+ a logger object to use. To disable logging set to
+ ``False``. The default is ``False``.
+ """
+ def is_asyncio_based(self):
+ return True
+
+ async def connect(self, url, headers={}, transports=None,
+ namespaces=None, socketio_path='socket.io'):
+ """Connect to a Socket.IO server.
+
+ :param url: The URL of the Socket.IO server. It can include custom
+ query string parameters if required by the server.
+ :param headers: A dictionary with custom headers to send with the
+ connection request.
+ :param transports: The list of allowed transports. Valid transports
+ are ``'polling'`` and ``'websocket'``. If not
+ given, the polling transport is connected first,
+ then an upgrade to websocket is attempted.
+ :param namespaces: The list of custom namespaces to connect, in
+ addition to the default namespace. If not given,
+ the namespace list is obtained from the registered
+ event handlers.
+ :param socketio_path: The endpoint where the Socket.IO server is
+ installed. The default value is appropriate for
+ most cases.
+
+ Note: this method is a coroutine.
+
+ Example usage::
+
+ sio = socketio.Client()
+ sio.connect('http://localhost:5000')
+ """
+ self.connection_url = url
+ self.connection_headers = headers
+ self.connection_transports = transports
+ self.connection_namespaces = namespaces
+ self.socketio_path = socketio_path
+
+ if namespaces is None:
+ namespaces = set(self.handlers.keys()).union(
+ set(self.namespace_handlers.keys()))
+ elif isinstance(namespaces, six.string_types):
+ namespaces = [namespaces]
+ self.connection_namespaces = namespaces
+ self.namespaces = [n for n in namespaces if n != '/']
+ try:
+ await self.eio.connect(url, headers=headers,
+ transports=transports,
+ engineio_path=socketio_path)
+ except engineio.exceptions.ConnectionError as exc:
+ six.raise_from(exceptions.ConnectionError(exc.args[0]), None)
+ self.connected = True
+
+ async def wait(self):
+ """Wait until the connection with the server ends.
+
+ Client applications can use this function to block the main thread
+ during the life of the connection.
+
+ Note: this method is a coroutine.
+ """
+ while True:
+ await self.eio.wait()
+ await self.sleep(1) # give the reconnect task time to start up
+ if not self._reconnect_task:
+ break
+ await self._reconnect_task
+ if self.eio.state != 'connected':
+ break
+
+ async def emit(self, event, data=None, namespace=None, callback=None):
+ """Emit a custom event to one or more connected clients.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param callback: If given, this function will be called to acknowledge
+ the the client has received the message. The arguments
+ that will be passed to the function are those provided
+ by the client. Callback functions can only be used
+ when addressing an individual client.
+
+ Note: this method is a coroutine.
+ """
+ namespace = namespace or '/'
+ if namespace != '/' and namespace not in self.namespaces:
+ raise exceptions.BadNamespaceError(
+ namespace + ' is not a connected namespace.')
+ self.logger.info('Emitting event "%s" [%s]', event, namespace)
+ if callback is not None:
+ id = self._generate_ack_id(namespace, callback)
+ else:
+ id = None
+ if six.PY2 and not self.binary:
+ binary = False # pragma: nocover
+ else:
+ binary = None
+ # tuples are expanded to multiple arguments, everything else is sent
+ # as a single argument
+ if isinstance(data, tuple):
+ data = list(data)
+ elif data is not None:
+ data = [data]
+ else:
+ data = []
+ await self._send_packet(packet.Packet(
+ packet.EVENT, namespace=namespace, data=[event] + data, id=id,
+ binary=binary))
+
+ async def send(self, data, namespace=None, callback=None):
+ """Send a message to one or more connected clients.
+
+ This function emits an event with the name ``'message'``. Use
+ :func:`emit` to issue custom event names.
+
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param callback: If given, this function will be called to acknowledge
+ the the client has received the message. The arguments
+ that will be passed to the function are those provided
+ by the client. Callback functions can only be used
+ when addressing an individual client.
+
+ Note: this method is a coroutine.
+ """
+ await self.emit('message', data=data, namespace=namespace,
+ callback=callback)
+
+ async def call(self, event, data=None, namespace=None, timeout=60):
+ """Emit a custom event to a client and wait for the response.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param timeout: The waiting timeout. If the timeout is reached before
+ the client acknowledges the event, then a
+ ``TimeoutError`` exception is raised.
+
+ Note: this method is a coroutine.
+ """
+ callback_event = self.eio.create_event()
+ callback_args = []
+
+ def event_callback(*args):
+ callback_args.append(args)
+ callback_event.set()
+
+ await self.emit(event, data=data, namespace=namespace,
+ callback=event_callback)
+ try:
+ await asyncio.wait_for(callback_event.wait(), timeout)
+ except asyncio.TimeoutError:
+ six.raise_from(exceptions.TimeoutError(), None)
+ return callback_args[0] if len(callback_args[0]) > 1 \
+ else callback_args[0][0] if len(callback_args[0]) == 1 \
+ else None
+
+ async def disconnect(self):
+ """Disconnect from the server.
+
+ Note: this method is a coroutine.
+ """
+ # here we just request the disconnection
+ # later in _handle_eio_disconnect we invoke the disconnect handler
+ for n in self.namespaces:
+ await self._send_packet(packet.Packet(packet.DISCONNECT,
+ namespace=n))
+ await self._send_packet(packet.Packet(
+ packet.DISCONNECT, namespace='/'))
+ self.connected = False
+ await self.eio.disconnect(abort=True)
+
+ def start_background_task(self, target, *args, **kwargs):
+ """Start a background task using the appropriate async model.
+
+ This is a utility function that applications can use to start a
+ background task using the method that is compatible with the
+ selected async mode.
+
+ :param target: the target function to execute.
+ :param args: arguments to pass to the function.
+ :param kwargs: keyword arguments to pass to the function.
+
+ This function returns an object compatible with the `Thread` class in
+ the Python standard library. The `start()` method on this object is
+ already called by this function.
+ """
+ return self.eio.start_background_task(target, *args, **kwargs)
+
+ async def sleep(self, seconds=0):
+ """Sleep for the requested amount of time using the appropriate async
+ model.
+
+ This is a utility function that applications can use to put a task to
+ sleep without having to worry about using the correct call for the
+ selected async mode.
+
+ Note: this method is a coroutine.
+ """
+ return await self.eio.sleep(seconds)
+
+ async def _send_packet(self, pkt):
+ """Send a Socket.IO packet to the server."""
+ encoded_packet = pkt.encode()
+ if isinstance(encoded_packet, list):
+ binary = False
+ for ep in encoded_packet:
+ await self.eio.send(ep, binary=binary)
+ binary = True
+ else:
+ await self.eio.send(encoded_packet, binary=False)
+
+ async def _handle_connect(self, namespace):
+ namespace = namespace or '/'
+ self.logger.info('Namespace {} is connected'.format(namespace))
+ await self._trigger_event('connect', namespace=namespace)
+ if namespace == '/':
+ for n in self.namespaces:
+ await self._send_packet(packet.Packet(packet.CONNECT,
+ namespace=n))
+ elif namespace not in self.namespaces:
+ self.namespaces.append(namespace)
+
+ async def _handle_disconnect(self, namespace):
+ if not self.connected:
+ return
+ namespace = namespace or '/'
+ if namespace == '/':
+ for n in self.namespaces:
+ await self._trigger_event('disconnect', namespace=n)
+ self.namespaces = []
+ await self._trigger_event('disconnect', namespace=namespace)
+ if namespace in self.namespaces:
+ self.namespaces.remove(namespace)
+ if namespace == '/':
+ self.connected = False
+
+ async def _handle_event(self, namespace, id, data):
+ namespace = namespace or '/'
+ self.logger.info('Received event "%s" [%s]', data[0], namespace)
+ r = await self._trigger_event(data[0], namespace, *data[1:])
+ if id is not None:
+ # send ACK packet with the response returned by the handler
+ # tuples are expanded as multiple arguments
+ if r is None:
+ data = []
+ elif isinstance(r, tuple):
+ data = list(r)
+ else:
+ data = [r]
+ if six.PY2 and not self.binary:
+ binary = False # pragma: nocover
+ else:
+ binary = None
+ await self._send_packet(packet.Packet(
+ packet.ACK, namespace=namespace, id=id, data=data,
+ binary=binary))
+
+ async def _handle_ack(self, namespace, id, data):
+ namespace = namespace or '/'
+ self.logger.info('Received ack [%s]', namespace)
+ callback = None
+ try:
+ callback = self.callbacks[namespace][id]
+ except KeyError:
+ # if we get an unknown callback we just ignore it
+ self.logger.warning('Unknown callback received, ignoring.')
+ else:
+ del self.callbacks[namespace][id]
+ if callback is not None:
+ if asyncio.iscoroutinefunction(callback):
+ await callback(*data)
+ else:
+ callback(*data)
+
+ async def _handle_error(self, namespace, data):
+ namespace = namespace or '/'
+ self.logger.info('Connection to namespace {} was rejected'.format(
+ namespace))
+ if data is None:
+ data = tuple()
+ elif not isinstance(data, (tuple, list)):
+ data = (data,)
+ await self._trigger_event('connect_error', namespace, *data)
+ if namespace in self.namespaces:
+ self.namespaces.remove(namespace)
+ if namespace == '/':
+ self.namespaces = []
+ self.connected = False
+
+ async def _trigger_event(self, event, namespace, *args):
+ """Invoke an application event handler."""
+ # first see if we have an explicit handler for the event
+ if namespace in self.handlers and event in self.handlers[namespace]:
+ if asyncio.iscoroutinefunction(self.handlers[namespace][event]):
+ try:
+ ret = await self.handlers[namespace][event](*args)
+ except asyncio.CancelledError: # pragma: no cover
+ ret = None
+ else:
+ ret = self.handlers[namespace][event](*args)
+ return ret
+
+ # or else, forward the event to a namepsace handler if one exists
+ elif namespace in self.namespace_handlers:
+ return await self.namespace_handlers[namespace].trigger_event(
+ event, *args)
+
+ async def _handle_reconnect(self):
+ self._reconnect_abort.clear()
+ client.reconnecting_clients.append(self)
+ attempt_count = 0
+ current_delay = self.reconnection_delay
+ while True:
+ delay = current_delay
+ current_delay *= 2
+ if delay > self.reconnection_delay_max:
+ delay = self.reconnection_delay_max
+ delay += self.randomization_factor * (2 * random.random() - 1)
+ self.logger.info(
+ 'Connection failed, new attempt in {:.02f} seconds'.format(
+ delay))
+ try:
+ await asyncio.wait_for(self._reconnect_abort.wait(), delay)
+ self.logger.info('Reconnect task aborted')
+ break
+ except (asyncio.TimeoutError, asyncio.CancelledError):
+ pass
+ attempt_count += 1
+ try:
+ await self.connect(self.connection_url,
+ headers=self.connection_headers,
+ transports=self.connection_transports,
+ namespaces=self.connection_namespaces,
+ socketio_path=self.socketio_path)
+ except (exceptions.ConnectionError, ValueError):
+ pass
+ else:
+ self.logger.info('Reconnection successful')
+ self._reconnect_task = None
+ break
+ if self.reconnection_attempts and \
+ attempt_count >= self.reconnection_attempts:
+ self.logger.info(
+ 'Maximum reconnection attempts reached, giving up')
+ break
+ client.reconnecting_clients.remove(self)
+
+ def _handle_eio_connect(self):
+ """Handle the Engine.IO connection event."""
+ self.logger.info('Engine.IO connection established')
+ self.sid = self.eio.sid
+
+ async def _handle_eio_message(self, data):
+ """Dispatch Engine.IO messages."""
+ if self._binary_packet:
+ pkt = self._binary_packet
+ if pkt.add_attachment(data):
+ self._binary_packet = None
+ if pkt.packet_type == packet.BINARY_EVENT:
+ await self._handle_event(pkt.namespace, pkt.id, pkt.data)
+ else:
+ await self._handle_ack(pkt.namespace, pkt.id, pkt.data)
+ else:
+ pkt = packet.Packet(encoded_packet=data)
+ if pkt.packet_type == packet.CONNECT:
+ await self._handle_connect(pkt.namespace)
+ elif pkt.packet_type == packet.DISCONNECT:
+ await self._handle_disconnect(pkt.namespace)
+ elif pkt.packet_type == packet.EVENT:
+ await self._handle_event(pkt.namespace, pkt.id, pkt.data)
+ elif pkt.packet_type == packet.ACK:
+ await self._handle_ack(pkt.namespace, pkt.id, pkt.data)
+ elif pkt.packet_type == packet.BINARY_EVENT or \
+ pkt.packet_type == packet.BINARY_ACK:
+ self._binary_packet = pkt
+ elif pkt.packet_type == packet.ERROR:
+ await self._handle_error(pkt.namespace, pkt.data)
+ else:
+ raise ValueError('Unknown packet type.')
+
+ async def _handle_eio_disconnect(self):
+ """Handle the Engine.IO disconnection event."""
+ self.logger.info('Engine.IO connection dropped')
+ self._reconnect_abort.set()
+ if self.connected:
+ for n in self.namespaces:
+ await self._trigger_event('disconnect', namespace=n)
+ await self._trigger_event('disconnect', namespace='/')
+ self.namespaces = []
+ self.connected = False
+ self.callbacks = {}
+ self._binary_packet = None
+ self.sid = None
+ if self.eio.state == 'connected' and self.reconnection:
+ self._reconnect_task = self.start_background_task(
+ self._handle_reconnect)
+
+ def _engineio_client_class(self):
+ return engineio.AsyncClient
diff --git a/libs/socketio/asyncio_manager.py b/libs/socketio/asyncio_manager.py
new file mode 100644
index 000000000..f4496ec7f
--- /dev/null
+++ b/libs/socketio/asyncio_manager.py
@@ -0,0 +1,58 @@
+import asyncio
+
+from .base_manager import BaseManager
+
+
+class AsyncManager(BaseManager):
+ """Manage a client list for an asyncio server."""
+ async def emit(self, event, data, namespace, room=None, skip_sid=None,
+ callback=None, **kwargs):
+ """Emit a message to a single client, a room, or all the clients
+ connected to the namespace.
+
+ Note: this method is a coroutine.
+ """
+ if namespace not in self.rooms or room not in self.rooms[namespace]:
+ return
+ tasks = []
+ if not isinstance(skip_sid, list):
+ skip_sid = [skip_sid]
+ for sid in self.get_participants(namespace, room):
+ if sid not in skip_sid:
+ if callback is not None:
+ id = self._generate_ack_id(sid, namespace, callback)
+ else:
+ id = None
+ tasks.append(self.server._emit_internal(sid, event, data,
+ namespace, id))
+ if tasks == []: # pragma: no cover
+ return
+ await asyncio.wait(tasks)
+
+ async def close_room(self, room, namespace):
+ """Remove all participants from a room.
+
+ Note: this method is a coroutine.
+ """
+ return super().close_room(room, namespace)
+
+ async def trigger_callback(self, sid, namespace, id, data):
+ """Invoke an application callback.
+
+ Note: this method is a coroutine.
+ """
+ callback = None
+ try:
+ callback = self.callbacks[sid][namespace][id]
+ except KeyError:
+ # if we get an unknown callback we just ignore it
+ self._get_logger().warning('Unknown callback received, ignoring.')
+ else:
+ del self.callbacks[sid][namespace][id]
+ if callback is not None:
+ ret = callback(*data)
+ if asyncio.iscoroutine(ret):
+ try:
+ await ret
+ except asyncio.CancelledError: # pragma: no cover
+ pass
diff --git a/libs/socketio/asyncio_namespace.py b/libs/socketio/asyncio_namespace.py
new file mode 100644
index 000000000..12e9c0fe6
--- /dev/null
+++ b/libs/socketio/asyncio_namespace.py
@@ -0,0 +1,204 @@
+import asyncio
+
+from socketio import namespace
+
+
+class AsyncNamespace(namespace.Namespace):
+ """Base class for asyncio server-side class-based namespaces.
+
+ A class-based namespace is a class that contains all the event handlers
+ for a Socket.IO namespace. The event handlers are methods of the class
+ with the prefix ``on_``, such as ``on_connect``, ``on_disconnect``,
+ ``on_message``, ``on_json``, and so on. These can be regular functions or
+ coroutines.
+
+ :param namespace: The Socket.IO namespace to be used with all the event
+ handlers defined in this class. If this argument is
+ omitted, the default namespace is used.
+ """
+ def is_asyncio_based(self):
+ return True
+
+ async def trigger_event(self, event, *args):
+ """Dispatch an event to the proper handler method.
+
+ In the most common usage, this method is not overloaded by subclasses,
+ as it performs the routing of events to methods. However, this
+ method can be overriden if special dispatching rules are needed, or if
+ having a single method that catches all events is desired.
+
+ Note: this method is a coroutine.
+ """
+ handler_name = 'on_' + event
+ if hasattr(self, handler_name):
+ handler = getattr(self, handler_name)
+ if asyncio.iscoroutinefunction(handler) is True:
+ try:
+ ret = await handler(*args)
+ except asyncio.CancelledError: # pragma: no cover
+ ret = None
+ else:
+ ret = handler(*args)
+ return ret
+
+ async def emit(self, event, data=None, room=None, skip_sid=None,
+ namespace=None, callback=None):
+ """Emit a custom event to one or more connected clients.
+
+ The only difference with the :func:`socketio.Server.emit` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+
+ Note: this method is a coroutine.
+ """
+ return await self.server.emit(event, data=data, room=room,
+ skip_sid=skip_sid,
+ namespace=namespace or self.namespace,
+ callback=callback)
+
+ async def send(self, data, room=None, skip_sid=None, namespace=None,
+ callback=None):
+ """Send a message to one or more connected clients.
+
+ The only difference with the :func:`socketio.Server.send` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+
+ Note: this method is a coroutine.
+ """
+ return await self.server.send(data, room=room, skip_sid=skip_sid,
+ namespace=namespace or self.namespace,
+ callback=callback)
+
+ async def close_room(self, room, namespace=None):
+ """Close a room.
+
+ The only difference with the :func:`socketio.Server.close_room` method
+ is that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+
+ Note: this method is a coroutine.
+ """
+ return await self.server.close_room(
+ room, namespace=namespace or self.namespace)
+
+ async def get_session(self, sid, namespace=None):
+ """Return the user session for a client.
+
+ The only difference with the :func:`socketio.Server.get_session`
+ method is that when the ``namespace`` argument is not given the
+ namespace associated with the class is used.
+
+ Note: this method is a coroutine.
+ """
+ return await self.server.get_session(
+ sid, namespace=namespace or self.namespace)
+
+ async def save_session(self, sid, session, namespace=None):
+ """Store the user session for a client.
+
+ The only difference with the :func:`socketio.Server.save_session`
+ method is that when the ``namespace`` argument is not given the
+ namespace associated with the class is used.
+
+ Note: this method is a coroutine.
+ """
+ return await self.server.save_session(
+ sid, session, namespace=namespace or self.namespace)
+
+ def session(self, sid, namespace=None):
+ """Return the user session for a client with context manager syntax.
+
+ The only difference with the :func:`socketio.Server.session` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.server.session(sid, namespace=namespace or self.namespace)
+
+ async def disconnect(self, sid, namespace=None):
+ """Disconnect a client.
+
+ The only difference with the :func:`socketio.Server.disconnect` method
+ is that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+
+ Note: this method is a coroutine.
+ """
+ return await self.server.disconnect(
+ sid, namespace=namespace or self.namespace)
+
+
+class AsyncClientNamespace(namespace.ClientNamespace):
+ """Base class for asyncio client-side class-based namespaces.
+
+ A class-based namespace is a class that contains all the event handlers
+ for a Socket.IO namespace. The event handlers are methods of the class
+ with the prefix ``on_``, such as ``on_connect``, ``on_disconnect``,
+ ``on_message``, ``on_json``, and so on. These can be regular functions or
+ coroutines.
+
+ :param namespace: The Socket.IO namespace to be used with all the event
+ handlers defined in this class. If this argument is
+ omitted, the default namespace is used.
+ """
+ def is_asyncio_based(self):
+ return True
+
+ async def trigger_event(self, event, *args):
+ """Dispatch an event to the proper handler method.
+
+ In the most common usage, this method is not overloaded by subclasses,
+ as it performs the routing of events to methods. However, this
+ method can be overriden if special dispatching rules are needed, or if
+ having a single method that catches all events is desired.
+
+ Note: this method is a coroutine.
+ """
+ handler_name = 'on_' + event
+ if hasattr(self, handler_name):
+ handler = getattr(self, handler_name)
+ if asyncio.iscoroutinefunction(handler) is True:
+ try:
+ ret = await handler(*args)
+ except asyncio.CancelledError: # pragma: no cover
+ ret = None
+ else:
+ ret = handler(*args)
+ return ret
+
+ async def emit(self, event, data=None, namespace=None, callback=None):
+ """Emit a custom event to the server.
+
+ The only difference with the :func:`socketio.Client.emit` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+
+ Note: this method is a coroutine.
+ """
+ return await self.client.emit(event, data=data,
+ namespace=namespace or self.namespace,
+ callback=callback)
+
+ async def send(self, data, namespace=None, callback=None):
+ """Send a message to the server.
+
+ The only difference with the :func:`socketio.Client.send` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+
+ Note: this method is a coroutine.
+ """
+ return await self.client.send(data,
+ namespace=namespace or self.namespace,
+ callback=callback)
+
+ async def disconnect(self):
+ """Disconnect a client.
+
+ The only difference with the :func:`socketio.Client.disconnect` method
+ is that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+
+ Note: this method is a coroutine.
+ """
+ return await self.client.disconnect()
diff --git a/libs/socketio/asyncio_pubsub_manager.py b/libs/socketio/asyncio_pubsub_manager.py
new file mode 100644
index 000000000..6fdba6d0c
--- /dev/null
+++ b/libs/socketio/asyncio_pubsub_manager.py
@@ -0,0 +1,163 @@
+from functools import partial
+import uuid
+
+import json
+import pickle
+import six
+
+from .asyncio_manager import AsyncManager
+
+
+class AsyncPubSubManager(AsyncManager):
+ """Manage a client list attached to a pub/sub backend under asyncio.
+
+ This is a base class that enables multiple servers to share the list of
+ clients, with the servers communicating events through a pub/sub backend.
+ The use of a pub/sub backend also allows any client connected to the
+ backend to emit events addressed to Socket.IO clients.
+
+ The actual backends must be implemented by subclasses, this class only
+ provides a pub/sub generic framework for asyncio applications.
+
+ :param channel: The channel name on which the server sends and receives
+ notifications.
+ """
+ name = 'asyncpubsub'
+
+ def __init__(self, channel='socketio', write_only=False, logger=None):
+ super().__init__()
+ self.channel = channel
+ self.write_only = write_only
+ self.host_id = uuid.uuid4().hex
+ self.logger = logger
+
+ def initialize(self):
+ super().initialize()
+ if not self.write_only:
+ self.thread = self.server.start_background_task(self._thread)
+ self._get_logger().info(self.name + ' backend initialized.')
+
+ async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
+ callback=None, **kwargs):
+ """Emit a message to a single client, a room, or all the clients
+ connected to the namespace.
+
+ This method takes care or propagating the message to all the servers
+ that are connected through the message queue.
+
+ The parameters are the same as in :meth:`.Server.emit`.
+
+ Note: this method is a coroutine.
+ """
+ if kwargs.get('ignore_queue'):
+ return await super().emit(
+ event, data, namespace=namespace, room=room, skip_sid=skip_sid,
+ callback=callback)
+ namespace = namespace or '/'
+ if callback is not None:
+ if self.server is None:
+ raise RuntimeError('Callbacks can only be issued from the '
+ 'context of a server.')
+ if room is None:
+ raise ValueError('Cannot use callback without a room set.')
+ id = self._generate_ack_id(room, namespace, callback)
+ callback = (room, namespace, id)
+ else:
+ callback = None
+ await self._publish({'method': 'emit', 'event': event, 'data': data,
+ 'namespace': namespace, 'room': room,
+ 'skip_sid': skip_sid, 'callback': callback,
+ 'host_id': self.host_id})
+
+ async def close_room(self, room, namespace=None):
+ await self._publish({'method': 'close_room', 'room': room,
+ 'namespace': namespace or '/'})
+
+ async def _publish(self, data):
+ """Publish a message on the Socket.IO channel.
+
+ This method needs to be implemented by the different subclasses that
+ support pub/sub backends.
+ """
+ raise NotImplementedError('This method must be implemented in a '
+ 'subclass.') # pragma: no cover
+
+ async def _listen(self):
+ """Return the next message published on the Socket.IO channel,
+ blocking until a message is available.
+
+ This method needs to be implemented by the different subclasses that
+ support pub/sub backends.
+ """
+ raise NotImplementedError('This method must be implemented in a '
+ 'subclass.') # pragma: no cover
+
+ async def _handle_emit(self, message):
+ # Events with callbacks are very tricky to handle across hosts
+ # Here in the receiving end we set up a local callback that preserves
+ # the callback host and id from the sender
+ remote_callback = message.get('callback')
+ remote_host_id = message.get('host_id')
+ if remote_callback is not None and len(remote_callback) == 3:
+ callback = partial(self._return_callback, remote_host_id,
+ *remote_callback)
+ else:
+ callback = None
+ await super().emit(message['event'], message['data'],
+ namespace=message.get('namespace'),
+ room=message.get('room'),
+ skip_sid=message.get('skip_sid'),
+ callback=callback)
+
+ async def _handle_callback(self, message):
+ if self.host_id == message.get('host_id'):
+ try:
+ sid = message['sid']
+ namespace = message['namespace']
+ id = message['id']
+ args = message['args']
+ except KeyError:
+ return
+ await self.trigger_callback(sid, namespace, id, args)
+
+ async def _return_callback(self, host_id, sid, namespace, callback_id,
+ *args):
+ # When an event callback is received, the callback is returned back
+ # the sender, which is identified by the host_id
+ await self._publish({'method': 'callback', 'host_id': host_id,
+ 'sid': sid, 'namespace': namespace,
+ 'id': callback_id, 'args': args})
+
+ async def _handle_close_room(self, message):
+ await super().close_room(
+ room=message.get('room'), namespace=message.get('namespace'))
+
+ async def _thread(self):
+ while True:
+ try:
+ message = await self._listen()
+ except:
+ import traceback
+ traceback.print_exc()
+ break
+ data = None
+ if isinstance(message, dict):
+ data = message
+ else:
+ if isinstance(message, six.binary_type): # pragma: no cover
+ try:
+ data = pickle.loads(message)
+ except:
+ pass
+ if data is None:
+ try:
+ data = json.loads(message)
+ except:
+ pass
+ if data and 'method' in data:
+ if data['method'] == 'emit':
+ await self._handle_emit(data)
+ elif data['method'] == 'callback':
+ await self._handle_callback(data)
+ elif data['method'] == 'close_room':
+ await self._handle_close_room(data)
diff --git a/libs/socketio/asyncio_redis_manager.py b/libs/socketio/asyncio_redis_manager.py
new file mode 100644
index 000000000..21499c26c
--- /dev/null
+++ b/libs/socketio/asyncio_redis_manager.py
@@ -0,0 +1,107 @@
+import asyncio
+import pickle
+from urllib.parse import urlparse
+
+try:
+ import aioredis
+except ImportError:
+ aioredis = None
+
+from .asyncio_pubsub_manager import AsyncPubSubManager
+
+
+def _parse_redis_url(url):
+ p = urlparse(url)
+ if p.scheme not in {'redis', 'rediss'}:
+ raise ValueError('Invalid redis url')
+ ssl = p.scheme == 'rediss'
+ host = p.hostname or 'localhost'
+ port = p.port or 6379
+ password = p.password
+ if p.path:
+ db = int(p.path[1:])
+ else:
+ db = 0
+ return host, port, password, db, ssl
+
+
+class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
+ """Redis based client manager for asyncio servers.
+
+ This class implements a Redis backend for event sharing across multiple
+ processes. Only kept here as one more example of how to build a custom
+ backend, since the kombu backend is perfectly adequate to support a Redis
+ message queue.
+
+ To use a Redis backend, initialize the :class:`Server` instance as
+ follows::
+
+ server = socketio.Server(client_manager=socketio.AsyncRedisManager(
+ 'redis://hostname:port/0'))
+
+ :param url: The connection URL for the Redis server. For a default Redis
+ store running on the same host, use ``redis://``. To use an
+ SSL connection, use ``rediss://``.
+ :param channel: The channel name on which the server sends and receives
+ notifications. Must be the same in all the servers.
+ :param write_only: If set ot ``True``, only initialize to emit events. The
+ default of ``False`` initializes the class for emitting
+ and receiving.
+ """
+ name = 'aioredis'
+
+ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
+ write_only=False, logger=None):
+ if aioredis is None:
+ raise RuntimeError('Redis package is not installed '
+ '(Run "pip install aioredis" in your '
+ 'virtualenv).')
+ (
+ self.host, self.port, self.password, self.db, self.ssl
+ ) = _parse_redis_url(url)
+ self.pub = None
+ self.sub = None
+ super().__init__(channel=channel, write_only=write_only, logger=logger)
+
+ async def _publish(self, data):
+ retry = True
+ while True:
+ try:
+ if self.pub is None:
+ self.pub = await aioredis.create_redis(
+ (self.host, self.port), db=self.db,
+ password=self.password, ssl=self.ssl
+ )
+ return await self.pub.publish(self.channel,
+ pickle.dumps(data))
+ except (aioredis.RedisError, OSError):
+ if retry:
+ self._get_logger().error('Cannot publish to redis... '
+ 'retrying')
+ self.pub = None
+ retry = False
+ else:
+ self._get_logger().error('Cannot publish to redis... '
+ 'giving up')
+ break
+
+ async def _listen(self):
+ retry_sleep = 1
+ while True:
+ try:
+ if self.sub is None:
+ self.sub = await aioredis.create_redis(
+ (self.host, self.port), db=self.db,
+ password=self.password, ssl=self.ssl
+ )
+ self.ch = (await self.sub.subscribe(self.channel))[0]
+ return await self.ch.get()
+ except (aioredis.RedisError, OSError):
+ self._get_logger().error('Cannot receive from redis... '
+ 'retrying in '
+ '{} secs'.format(retry_sleep))
+ self.sub = None
+ await asyncio.sleep(retry_sleep)
+ retry_sleep *= 2
+ if retry_sleep > 60:
+ retry_sleep = 60
diff --git a/libs/socketio/asyncio_server.py b/libs/socketio/asyncio_server.py
new file mode 100644
index 000000000..251d58180
--- /dev/null
+++ b/libs/socketio/asyncio_server.py
@@ -0,0 +1,526 @@
+import asyncio
+
+import engineio
+import six
+
+from . import asyncio_manager
+from . import exceptions
+from . import packet
+from . import server
+
+
+class AsyncServer(server.Server):
+ """A Socket.IO server for asyncio.
+
+ This class implements a fully compliant Socket.IO web server with support
+ for websocket and long-polling transports, compatible with the asyncio
+ framework on Python 3.5 or newer.
+
+ :param client_manager: The client manager instance that will manage the
+ client list. When this is omitted, the client list
+ is stored in an in-memory structure, so the use of
+ multiple connected servers is not possible.
+ :param logger: To enable logging set to ``True`` or pass a logger object to
+ use. To disable logging set to ``False``.
+ :param json: An alternative json module to use for encoding and decoding
+ packets. Custom json modules must have ``dumps`` and ``loads``
+ functions that are compatible with the standard library
+ versions.
+ :param async_handlers: If set to ``True``, event handlers are executed in
+ separate threads. To run handlers synchronously,
+ set to ``False``. The default is ``True``.
+ :param kwargs: Connection parameters for the underlying Engine.IO server.
+
+ The Engine.IO configuration supports the following settings:
+
+ :param async_mode: The asynchronous model to use. See the Deployment
+ section in the documentation for a description of the
+ available options. Valid async modes are "aiohttp". If
+ this argument is not given, an async mode is chosen
+ based on the installed packages.
+ :param ping_timeout: The time in seconds that the client waits for the
+ server to respond before disconnecting.
+ :param ping_interval: The interval in seconds at which the client pings
+ the server.
+ :param max_http_buffer_size: The maximum size of a message when using the
+ polling transport.
+ :param allow_upgrades: Whether to allow transport upgrades or not.
+ :param http_compression: Whether to compress packages when using the
+ polling transport.
+ :param compression_threshold: Only compress messages when their byte size
+ is greater than this value.
+ :param cookie: Name of the HTTP cookie that contains the client session
+ id. If set to ``None``, a cookie is not sent to the client.
+ :param cors_allowed_origins: Origin or list of origins that are allowed to
+ connect to this server. Only the same origin
+ is allowed by default. Set this argument to
+ ``'*'`` to allow all origins, or to ``[]`` to
+ disable CORS handling.
+ :param cors_credentials: Whether credentials (cookies, authentication) are
+ allowed in requests to this server.
+ :param monitor_clients: If set to ``True``, a background task will ensure
+ inactive clients are closed. Set to ``False`` to
+ disable the monitoring task (not recommended). The
+ default is ``True``.
+ :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
+ a logger object to use. To disable logging set to
+ ``False``.
+ """
+ def __init__(self, client_manager=None, logger=False, json=None,
+ async_handlers=True, **kwargs):
+ if client_manager is None:
+ client_manager = asyncio_manager.AsyncManager()
+ super().__init__(client_manager=client_manager, logger=logger,
+ binary=False, json=json,
+ async_handlers=async_handlers, **kwargs)
+
+ def is_asyncio_based(self):
+ return True
+
+ def attach(self, app, socketio_path='socket.io'):
+ """Attach the Socket.IO server to an application."""
+ self.eio.attach(app, socketio_path)
+
+ async def emit(self, event, data=None, to=None, room=None, skip_sid=None,
+ namespace=None, callback=None, **kwargs):
+ """Emit a custom event to one or more connected clients.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param to: The recipient of the message. This can be set to the
+ session ID of a client to address only that client, or to
+ to any custom room created by the application to address all
+ the clients in that room, If this argument is omitted the
+ event is broadcasted to all connected clients.
+ :param room: Alias for the ``to`` parameter.
+ :param skip_sid: The session ID of a client to skip when broadcasting
+ to a room or to all clients. This can be used to
+ prevent a message from being sent to the sender.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param callback: If given, this function will be called to acknowledge
+ the the client has received the message. The arguments
+ that will be passed to the function are those provided
+ by the client. Callback functions can only be used
+ when addressing an individual client.
+ :param ignore_queue: Only used when a message queue is configured. If
+ set to ``True``, the event is emitted to the
+ clients directly, without going through the queue.
+ This is more efficient, but only works when a
+ single server process is used. It is recommended
+ to always leave this parameter with its default
+ value of ``False``.
+
+ Note: this method is a coroutine.
+ """
+ namespace = namespace or '/'
+ room = to or room
+ self.logger.info('emitting event "%s" to %s [%s]', event,
+ room or 'all', namespace)
+ await self.manager.emit(event, data, namespace, room=room,
+ skip_sid=skip_sid, callback=callback,
+ **kwargs)
+
+ async def send(self, data, to=None, room=None, skip_sid=None,
+ namespace=None, callback=None, **kwargs):
+ """Send a message to one or more connected clients.
+
+ This function emits an event with the name ``'message'``. Use
+ :func:`emit` to issue custom event names.
+
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param to: The recipient of the message. This can be set to the
+ session ID of a client to address only that client, or to
+ to any custom room created by the application to address all
+ the clients in that room, If this argument is omitted the
+ event is broadcasted to all connected clients.
+ :param room: Alias for the ``to`` parameter.
+ :param skip_sid: The session ID of a client to skip when broadcasting
+ to a room or to all clients. This can be used to
+ prevent a message from being sent to the sender.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param callback: If given, this function will be called to acknowledge
+ the the client has received the message. The arguments
+ that will be passed to the function are those provided
+ by the client. Callback functions can only be used
+ when addressing an individual client.
+ :param ignore_queue: Only used when a message queue is configured. If
+ set to ``True``, the event is emitted to the
+ clients directly, without going through the queue.
+ This is more efficient, but only works when a
+ single server process is used. It is recommended
+ to always leave this parameter with its default
+ value of ``False``.
+
+ Note: this method is a coroutine.
+ """
+ await self.emit('message', data=data, to=to, room=room,
+ skip_sid=skip_sid, namespace=namespace,
+ callback=callback, **kwargs)
+
+ async def call(self, event, data=None, to=None, sid=None, namespace=None,
+ timeout=60, **kwargs):
+ """Emit a custom event to a client and wait for the response.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param to: The session ID of the recipient client.
+ :param sid: Alias for the ``to`` parameter.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param timeout: The waiting timeout. If the timeout is reached before
+ the client acknowledges the event, then a
+ ``TimeoutError`` exception is raised.
+ :param ignore_queue: Only used when a message queue is configured. If
+ set to ``True``, the event is emitted to the
+ client directly, without going through the queue.
+ This is more efficient, but only works when a
+ single server process is used. It is recommended
+ to always leave this parameter with its default
+ value of ``False``.
+ """
+ if not self.async_handlers:
+ raise RuntimeError(
+ 'Cannot use call() when async_handlers is False.')
+ callback_event = self.eio.create_event()
+ callback_args = []
+
+ def event_callback(*args):
+ callback_args.append(args)
+ callback_event.set()
+
+ await self.emit(event, data=data, room=to or sid, namespace=namespace,
+ callback=event_callback, **kwargs)
+ try:
+ await asyncio.wait_for(callback_event.wait(), timeout)
+ except asyncio.TimeoutError:
+ six.raise_from(exceptions.TimeoutError(), None)
+ return callback_args[0] if len(callback_args[0]) > 1 \
+ else callback_args[0][0] if len(callback_args[0]) == 1 \
+ else None
+
+ async def close_room(self, room, namespace=None):
+ """Close a room.
+
+ This function removes all the clients from the given room.
+
+ :param room: Room name.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the default namespace is used.
+
+ Note: this method is a coroutine.
+ """
+ namespace = namespace or '/'
+ self.logger.info('room %s is closing [%s]', room, namespace)
+ await self.manager.close_room(room, namespace)
+
+ async def get_session(self, sid, namespace=None):
+ """Return the user session for a client.
+
+ :param sid: The session id of the client.
+ :param namespace: The Socket.IO namespace. If this argument is omitted
+ the default namespace is used.
+
+ The return value is a dictionary. Modifications made to this
+ dictionary are not guaranteed to be preserved. If you want to modify
+ the user session, use the ``session`` context manager instead.
+ """
+ namespace = namespace or '/'
+ eio_session = await self.eio.get_session(sid)
+ return eio_session.setdefault(namespace, {})
+
+ async def save_session(self, sid, session, namespace=None):
+ """Store the user session for a client.
+
+ :param sid: The session id of the client.
+ :param session: The session dictionary.
+ :param namespace: The Socket.IO namespace. If this argument is omitted
+ the default namespace is used.
+ """
+ namespace = namespace or '/'
+ eio_session = await self.eio.get_session(sid)
+ eio_session[namespace] = session
+
+ def session(self, sid, namespace=None):
+ """Return the user session for a client with context manager syntax.
+
+ :param sid: The session id of the client.
+
+ This is a context manager that returns the user session dictionary for
+ the client. Any changes that are made to this dictionary inside the
+ context manager block are saved back to the session. Example usage::
+
+ @eio.on('connect')
+ def on_connect(sid, environ):
+ username = authenticate_user(environ)
+ if not username:
+ return False
+ with eio.session(sid) as session:
+ session['username'] = username
+
+ @eio.on('message')
+ def on_message(sid, msg):
+ async with eio.session(sid) as session:
+ print('received message from ', session['username'])
+ """
+ class _session_context_manager(object):
+ def __init__(self, server, sid, namespace):
+ self.server = server
+ self.sid = sid
+ self.namespace = namespace
+ self.session = None
+
+ async def __aenter__(self):
+ self.session = await self.server.get_session(
+ sid, namespace=self.namespace)
+ return self.session
+
+ async def __aexit__(self, *args):
+ await self.server.save_session(sid, self.session,
+ namespace=self.namespace)
+
+ return _session_context_manager(self, sid, namespace)
+
+ async def disconnect(self, sid, namespace=None):
+ """Disconnect a client.
+
+ :param sid: Session ID of the client.
+ :param namespace: The Socket.IO namespace to disconnect. If this
+ argument is omitted the default namespace is used.
+
+ Note: this method is a coroutine.
+ """
+ namespace = namespace or '/'
+ if self.manager.is_connected(sid, namespace=namespace):
+ self.logger.info('Disconnecting %s [%s]', sid, namespace)
+ self.manager.pre_disconnect(sid, namespace=namespace)
+ await self._send_packet(sid, packet.Packet(packet.DISCONNECT,
+ namespace=namespace))
+ await self._trigger_event('disconnect', namespace, sid)
+ self.manager.disconnect(sid, namespace=namespace)
+ if namespace == '/':
+ await self.eio.disconnect(sid)
+
+ async def handle_request(self, *args, **kwargs):
+ """Handle an HTTP request from the client.
+
+ This is the entry point of the Socket.IO application. This function
+ returns the HTTP response body to deliver to the client.
+
+ Note: this method is a coroutine.
+ """
+ return await self.eio.handle_request(*args, **kwargs)
+
+ def start_background_task(self, target, *args, **kwargs):
+ """Start a background task using the appropriate async model.
+
+ This is a utility function that applications can use to start a
+ background task using the method that is compatible with the
+ selected async mode.
+
+ :param target: the target function to execute. Must be a coroutine.
+ :param args: arguments to pass to the function.
+ :param kwargs: keyword arguments to pass to the function.
+
+ The return value is a ``asyncio.Task`` object.
+
+ Note: this method is a coroutine.
+ """
+ return self.eio.start_background_task(target, *args, **kwargs)
+
+ async def sleep(self, seconds=0):
+ """Sleep for the requested amount of time using the appropriate async
+ model.
+
+ This is a utility function that applications can use to put a task to
+ sleep without having to worry about using the correct call for the
+ selected async mode.
+
+ Note: this method is a coroutine.
+ """
+ return await self.eio.sleep(seconds)
+
+ async def _emit_internal(self, sid, event, data, namespace=None, id=None):
+ """Send a message to a client."""
+ # tuples are expanded to multiple arguments, everything else is sent
+ # as a single argument
+ if isinstance(data, tuple):
+ data = list(data)
+ else:
+ data = [data]
+ await self._send_packet(sid, packet.Packet(
+ packet.EVENT, namespace=namespace, data=[event] + data, id=id,
+ binary=None))
+
+ async def _send_packet(self, sid, pkt):
+ """Send a Socket.IO packet to a client."""
+ encoded_packet = pkt.encode()
+ if isinstance(encoded_packet, list):
+ binary = False
+ for ep in encoded_packet:
+ await self.eio.send(sid, ep, binary=binary)
+ binary = True
+ else:
+ await self.eio.send(sid, encoded_packet, binary=False)
+
+ async def _handle_connect(self, sid, namespace):
+ """Handle a client connection request."""
+ namespace = namespace or '/'
+ self.manager.connect(sid, namespace)
+ if self.always_connect:
+ await self._send_packet(sid, packet.Packet(packet.CONNECT,
+ namespace=namespace))
+ fail_reason = None
+ try:
+ success = await self._trigger_event('connect', namespace, sid,
+ self.environ[sid])
+ except exceptions.ConnectionRefusedError as exc:
+ fail_reason = exc.error_args
+ success = False
+
+ if success is False:
+ if self.always_connect:
+ self.manager.pre_disconnect(sid, namespace)
+ await self._send_packet(sid, packet.Packet(
+ packet.DISCONNECT, data=fail_reason, namespace=namespace))
+ self.manager.disconnect(sid, namespace)
+ if not self.always_connect:
+ await self._send_packet(sid, packet.Packet(
+ packet.ERROR, data=fail_reason, namespace=namespace))
+ if sid in self.environ: # pragma: no cover
+ del self.environ[sid]
+ elif not self.always_connect:
+ await self._send_packet(sid, packet.Packet(packet.CONNECT,
+ namespace=namespace))
+
+ async def _handle_disconnect(self, sid, namespace):
+ """Handle a client disconnect."""
+ namespace = namespace or '/'
+ if namespace == '/':
+ namespace_list = list(self.manager.get_namespaces())
+ else:
+ namespace_list = [namespace]
+ for n in namespace_list:
+ if n != '/' and self.manager.is_connected(sid, n):
+ await self._trigger_event('disconnect', n, sid)
+ self.manager.disconnect(sid, n)
+ if namespace == '/' and self.manager.is_connected(sid, namespace):
+ await self._trigger_event('disconnect', '/', sid)
+ self.manager.disconnect(sid, '/')
+
+ async def _handle_event(self, sid, namespace, id, data):
+ """Handle an incoming client event."""
+ namespace = namespace or '/'
+ self.logger.info('received event "%s" from %s [%s]', data[0], sid,
+ namespace)
+ if not self.manager.is_connected(sid, namespace):
+ self.logger.warning('%s is not connected to namespace %s',
+ sid, namespace)
+ return
+ if self.async_handlers:
+ self.start_background_task(self._handle_event_internal, self, sid,
+ data, namespace, id)
+ else:
+ await self._handle_event_internal(self, sid, data, namespace, id)
+
+ async def _handle_event_internal(self, server, sid, data, namespace, id):
+ r = await server._trigger_event(data[0], namespace, sid, *data[1:])
+ if id is not None:
+ # send ACK packet with the response returned by the handler
+ # tuples are expanded as multiple arguments
+ if r is None:
+ data = []
+ elif isinstance(r, tuple):
+ data = list(r)
+ else:
+ data = [r]
+ await server._send_packet(sid, packet.Packet(packet.ACK,
+ namespace=namespace,
+ id=id, data=data,
+ binary=None))
+
+ async def _handle_ack(self, sid, namespace, id, data):
+ """Handle ACK packets from the client."""
+ namespace = namespace or '/'
+ self.logger.info('received ack from %s [%s]', sid, namespace)
+ await self.manager.trigger_callback(sid, namespace, id, data)
+
+ async def _trigger_event(self, event, namespace, *args):
+ """Invoke an application event handler."""
+ # first see if we have an explicit handler for the event
+ if namespace in self.handlers and event in self.handlers[namespace]:
+ if asyncio.iscoroutinefunction(self.handlers[namespace][event]) \
+ is True:
+ try:
+ ret = await self.handlers[namespace][event](*args)
+ except asyncio.CancelledError: # pragma: no cover
+ ret = None
+ else:
+ ret = self.handlers[namespace][event](*args)
+ return ret
+
+ # or else, forward the event to a namepsace handler if one exists
+ elif namespace in self.namespace_handlers:
+ return await self.namespace_handlers[namespace].trigger_event(
+ event, *args)
+
+ async def _handle_eio_connect(self, sid, environ):
+ """Handle the Engine.IO connection event."""
+ if not self.manager_initialized:
+ self.manager_initialized = True
+ self.manager.initialize()
+ self.environ[sid] = environ
+ return await self._handle_connect(sid, '/')
+
+ async def _handle_eio_message(self, sid, data):
+ """Dispatch Engine.IO messages."""
+ if sid in self._binary_packet:
+ pkt = self._binary_packet[sid]
+ if pkt.add_attachment(data):
+ del self._binary_packet[sid]
+ if pkt.packet_type == packet.BINARY_EVENT:
+ await self._handle_event(sid, pkt.namespace, pkt.id,
+ pkt.data)
+ else:
+ await self._handle_ack(sid, pkt.namespace, pkt.id,
+ pkt.data)
+ else:
+ pkt = packet.Packet(encoded_packet=data)
+ if pkt.packet_type == packet.CONNECT:
+ await self._handle_connect(sid, pkt.namespace)
+ elif pkt.packet_type == packet.DISCONNECT:
+ await self._handle_disconnect(sid, pkt.namespace)
+ elif pkt.packet_type == packet.EVENT:
+ await self._handle_event(sid, pkt.namespace, pkt.id, pkt.data)
+ elif pkt.packet_type == packet.ACK:
+ await self._handle_ack(sid, pkt.namespace, pkt.id, pkt.data)
+ elif pkt.packet_type == packet.BINARY_EVENT or \
+ pkt.packet_type == packet.BINARY_ACK:
+ self._binary_packet[sid] = pkt
+ elif pkt.packet_type == packet.ERROR:
+ raise ValueError('Unexpected ERROR packet.')
+ else:
+ raise ValueError('Unknown packet type.')
+
+ async def _handle_eio_disconnect(self, sid):
+ """Handle Engine.IO disconnect event."""
+ await self._handle_disconnect(sid, '/')
+ if sid in self.environ:
+ del self.environ[sid]
+
+ def _engineio_server_class(self):
+ return engineio.AsyncServer
diff --git a/libs/socketio/base_manager.py b/libs/socketio/base_manager.py
new file mode 100644
index 000000000..3cccb8569
--- /dev/null
+++ b/libs/socketio/base_manager.py
@@ -0,0 +1,178 @@
+import itertools
+import logging
+
+import six
+
+default_logger = logging.getLogger('socketio')
+
+
+class BaseManager(object):
+ """Manage client connections.
+
+ This class keeps track of all the clients and the rooms they are in, to
+ support the broadcasting of messages. The data used by this class is
+ stored in a memory structure, making it appropriate only for single process
+ services. More sophisticated storage backends can be implemented by
+ subclasses.
+ """
+ def __init__(self):
+ self.logger = None
+ self.server = None
+ self.rooms = {}
+ self.callbacks = {}
+ self.pending_disconnect = {}
+
+ def set_server(self, server):
+ self.server = server
+
+ def initialize(self):
+ """Invoked before the first request is received. Subclasses can add
+ their initialization code here.
+ """
+ pass
+
+ def get_namespaces(self):
+ """Return an iterable with the active namespace names."""
+ return six.iterkeys(self.rooms)
+
+ def get_participants(self, namespace, room):
+ """Return an iterable with the active participants in a room."""
+ for sid, active in six.iteritems(self.rooms[namespace][room].copy()):
+ yield sid
+
+ def connect(self, sid, namespace):
+ """Register a client connection to a namespace."""
+ self.enter_room(sid, namespace, None)
+ self.enter_room(sid, namespace, sid)
+
+ def is_connected(self, sid, namespace):
+ if namespace in self.pending_disconnect and \
+ sid in self.pending_disconnect[namespace]:
+ # the client is in the process of being disconnected
+ return False
+ try:
+ return self.rooms[namespace][None][sid]
+ except KeyError:
+ pass
+
+ def pre_disconnect(self, sid, namespace):
+ """Put the client in the to-be-disconnected list.
+
+ This allows the client data structures to be present while the
+ disconnect handler is invoked, but still recognize the fact that the
+ client is soon going away.
+ """
+ if namespace not in self.pending_disconnect:
+ self.pending_disconnect[namespace] = []
+ self.pending_disconnect[namespace].append(sid)
+
+ def disconnect(self, sid, namespace):
+ """Register a client disconnect from a namespace."""
+ if namespace not in self.rooms:
+ return
+ rooms = []
+ for room_name, room in six.iteritems(self.rooms[namespace].copy()):
+ if sid in room:
+ rooms.append(room_name)
+ for room in rooms:
+ self.leave_room(sid, namespace, room)
+ if sid in self.callbacks and namespace in self.callbacks[sid]:
+ del self.callbacks[sid][namespace]
+ if len(self.callbacks[sid]) == 0:
+ del self.callbacks[sid]
+ if namespace in self.pending_disconnect and \
+ sid in self.pending_disconnect[namespace]:
+ self.pending_disconnect[namespace].remove(sid)
+ if len(self.pending_disconnect[namespace]) == 0:
+ del self.pending_disconnect[namespace]
+
+ def enter_room(self, sid, namespace, room):
+ """Add a client to a room."""
+ if namespace not in self.rooms:
+ self.rooms[namespace] = {}
+ if room not in self.rooms[namespace]:
+ self.rooms[namespace][room] = {}
+ self.rooms[namespace][room][sid] = True
+
+ def leave_room(self, sid, namespace, room):
+ """Remove a client from a room."""
+ try:
+ del self.rooms[namespace][room][sid]
+ if len(self.rooms[namespace][room]) == 0:
+ del self.rooms[namespace][room]
+ if len(self.rooms[namespace]) == 0:
+ del self.rooms[namespace]
+ except KeyError:
+ pass
+
+ def close_room(self, room, namespace):
+ """Remove all participants from a room."""
+ try:
+ for sid in self.get_participants(namespace, room):
+ self.leave_room(sid, namespace, room)
+ except KeyError:
+ pass
+
+ def get_rooms(self, sid, namespace):
+ """Return the rooms a client is in."""
+ r = []
+ try:
+ for room_name, room in six.iteritems(self.rooms[namespace]):
+ if room_name is not None and sid in room and room[sid]:
+ r.append(room_name)
+ except KeyError:
+ pass
+ return r
+
+ def emit(self, event, data, namespace, room=None, skip_sid=None,
+ callback=None, **kwargs):
+ """Emit a message to a single client, a room, or all the clients
+ connected to the namespace."""
+ if namespace not in self.rooms or room not in self.rooms[namespace]:
+ return
+ if not isinstance(skip_sid, list):
+ skip_sid = [skip_sid]
+ for sid in self.get_participants(namespace, room):
+ if sid not in skip_sid:
+ if callback is not None:
+ id = self._generate_ack_id(sid, namespace, callback)
+ else:
+ id = None
+ self.server._emit_internal(sid, event, data, namespace, id)
+
+ def trigger_callback(self, sid, namespace, id, data):
+ """Invoke an application callback."""
+ callback = None
+ try:
+ callback = self.callbacks[sid][namespace][id]
+ except KeyError:
+ # if we get an unknown callback we just ignore it
+ self._get_logger().warning('Unknown callback received, ignoring.')
+ else:
+ del self.callbacks[sid][namespace][id]
+ if callback is not None:
+ callback(*data)
+
+ def _generate_ack_id(self, sid, namespace, callback):
+ """Generate a unique identifier for an ACK packet."""
+ namespace = namespace or '/'
+ if sid not in self.callbacks:
+ self.callbacks[sid] = {}
+ if namespace not in self.callbacks[sid]:
+ self.callbacks[sid][namespace] = {0: itertools.count(1)}
+ id = six.next(self.callbacks[sid][namespace][0])
+ self.callbacks[sid][namespace][id] = callback
+ return id
+
+ def _get_logger(self):
+ """Get the appropriate logger
+
+ Prevents uninitialized servers in write-only mode from failing.
+ """
+
+ if self.logger:
+ return self.logger
+ elif self.server:
+ return self.server.logger
+ else:
+ return default_logger
diff --git a/libs/socketio/client.py b/libs/socketio/client.py
new file mode 100644
index 000000000..e917d634d
--- /dev/null
+++ b/libs/socketio/client.py
@@ -0,0 +1,620 @@
+import itertools
+import logging
+import random
+import signal
+
+import engineio
+import six
+
+from . import exceptions
+from . import namespace
+from . import packet
+
+default_logger = logging.getLogger('socketio.client')
+reconnecting_clients = []
+
+
+def signal_handler(sig, frame): # pragma: no cover
+ """SIGINT handler.
+
+ Notify any clients that are in a reconnect loop to abort. Other
+ disconnection tasks are handled at the engine.io level.
+ """
+ for client in reconnecting_clients[:]:
+ client._reconnect_abort.set()
+ return original_signal_handler(sig, frame)
+
+
+original_signal_handler = signal.signal(signal.SIGINT, signal_handler)
+
+
+class Client(object):
+ """A Socket.IO client.
+
+ This class implements a fully compliant Socket.IO web client with support
+ for websocket and long-polling transports.
+
+ :param reconnection: ``True`` if the client should automatically attempt to
+ reconnect to the server after an interruption, or
+ ``False`` to not reconnect. The default is ``True``.
+ :param reconnection_attempts: How many reconnection attempts to issue
+ before giving up, or 0 for infinity attempts.
+ The default is 0.
+ :param reconnection_delay: How long to wait in seconds before the first
+ reconnection attempt. Each successive attempt
+ doubles this delay.
+ :param reconnection_delay_max: The maximum delay between reconnection
+ attempts.
+ :param randomization_factor: Randomization amount for each delay between
+ reconnection attempts. The default is 0.5,
+ which means that each delay is randomly
+ adjusted by +/- 50%.
+ :param logger: To enable logging set to ``True`` or pass a logger object to
+ use. To disable logging set to ``False``. The default is
+ ``False``.
+ :param binary: ``True`` to support binary payloads, ``False`` to treat all
+ payloads as text. On Python 2, if this is set to ``True``,
+ ``unicode`` values are treated as text, and ``str`` and
+ ``bytes`` values are treated as binary. This option has no
+ effect on Python 3, where text and binary payloads are
+ always automatically discovered.
+ :param json: An alternative json module to use for encoding and decoding
+ packets. Custom json modules must have ``dumps`` and ``loads``
+ functions that are compatible with the standard library
+ versions.
+
+ The Engine.IO configuration supports the following settings:
+
+ :param request_timeout: A timeout in seconds for requests. The default is
+ 5 seconds.
+ :param ssl_verify: ``True`` to verify SSL certificates, or ``False`` to
+ skip SSL certificate verification, allowing
+ connections to servers with self signed certificates.
+ The default is ``True``.
+ :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
+ a logger object to use. To disable logging set to
+ ``False``. The default is ``False``.
+ """
+ def __init__(self, reconnection=True, reconnection_attempts=0,
+ reconnection_delay=1, reconnection_delay_max=5,
+ randomization_factor=0.5, logger=False, binary=False,
+ json=None, **kwargs):
+ self.reconnection = reconnection
+ self.reconnection_attempts = reconnection_attempts
+ self.reconnection_delay = reconnection_delay
+ self.reconnection_delay_max = reconnection_delay_max
+ self.randomization_factor = randomization_factor
+ self.binary = binary
+
+ engineio_options = kwargs
+ engineio_logger = engineio_options.pop('engineio_logger', None)
+ if engineio_logger is not None:
+ engineio_options['logger'] = engineio_logger
+ if json is not None:
+ packet.Packet.json = json
+ engineio_options['json'] = json
+
+ self.eio = self._engineio_client_class()(**engineio_options)
+ self.eio.on('connect', self._handle_eio_connect)
+ self.eio.on('message', self._handle_eio_message)
+ self.eio.on('disconnect', self._handle_eio_disconnect)
+
+ if not isinstance(logger, bool):
+ self.logger = logger
+ else:
+ self.logger = default_logger
+ if not logging.root.handlers and \
+ self.logger.level == logging.NOTSET:
+ if logger:
+ self.logger.setLevel(logging.INFO)
+ else:
+ self.logger.setLevel(logging.ERROR)
+ self.logger.addHandler(logging.StreamHandler())
+
+ self.connection_url = None
+ self.connection_headers = None
+ self.connection_transports = None
+ self.connection_namespaces = None
+ self.socketio_path = None
+ self.sid = None
+
+ self.connected = False
+ self.namespaces = []
+ self.handlers = {}
+ self.namespace_handlers = {}
+ self.callbacks = {}
+ self._binary_packet = None
+ self._reconnect_task = None
+ self._reconnect_abort = self.eio.create_event()
+
+ def is_asyncio_based(self):
+ return False
+
+ def on(self, event, handler=None, namespace=None):
+ """Register an event handler.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param handler: The function that should be invoked to handle the
+ event. When this parameter is not given, the method
+ acts as a decorator for the handler function.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the handler is associated with
+ the default namespace.
+
+ Example usage::
+
+ # as a decorator:
+ @sio.on('connect')
+ def connect_handler():
+ print('Connected!')
+
+ # as a method:
+ def message_handler(msg):
+ print('Received message: ', msg)
+ sio.send( 'response')
+ sio.on('message', message_handler)
+
+ The ``'connect'`` event handler receives no arguments. The
+ ``'message'`` handler and handlers for custom event names receive the
+ message payload as only argument. Any values returned from a message
+ handler will be passed to the client's acknowledgement callback
+ function if it exists. The ``'disconnect'`` handler does not take
+ arguments.
+ """
+ namespace = namespace or '/'
+
+ def set_handler(handler):
+ if namespace not in self.handlers:
+ self.handlers[namespace] = {}
+ self.handlers[namespace][event] = handler
+ return handler
+
+ if handler is None:
+ return set_handler
+ set_handler(handler)
+
+ def event(self, *args, **kwargs):
+ """Decorator to register an event handler.
+
+ This is a simplified version of the ``on()`` method that takes the
+ event name from the decorated function.
+
+ Example usage::
+
+ @sio.event
+ def my_event(data):
+ print('Received data: ', data)
+
+ The above example is equivalent to::
+
+ @sio.on('my_event')
+ def my_event(data):
+ print('Received data: ', data)
+
+ A custom namespace can be given as an argument to the decorator::
+
+ @sio.event(namespace='/test')
+ def my_event(data):
+ print('Received data: ', data)
+ """
+ if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
+ # the decorator was invoked without arguments
+ # args[0] is the decorated function
+ return self.on(args[0].__name__)(args[0])
+ else:
+ # the decorator was invoked with arguments
+ def set_handler(handler):
+ return self.on(handler.__name__, *args, **kwargs)(handler)
+
+ return set_handler
+
+ def register_namespace(self, namespace_handler):
+ """Register a namespace handler object.
+
+ :param namespace_handler: An instance of a :class:`Namespace`
+ subclass that handles all the event traffic
+ for a namespace.
+ """
+ if not isinstance(namespace_handler, namespace.ClientNamespace):
+ raise ValueError('Not a namespace instance')
+ if self.is_asyncio_based() != namespace_handler.is_asyncio_based():
+ raise ValueError('Not a valid namespace class for this client')
+ namespace_handler._set_client(self)
+ self.namespace_handlers[namespace_handler.namespace] = \
+ namespace_handler
+
+ def connect(self, url, headers={}, transports=None,
+ namespaces=None, socketio_path='socket.io'):
+ """Connect to a Socket.IO server.
+
+ :param url: The URL of the Socket.IO server. It can include custom
+ query string parameters if required by the server.
+ :param headers: A dictionary with custom headers to send with the
+ connection request.
+ :param transports: The list of allowed transports. Valid transports
+ are ``'polling'`` and ``'websocket'``. If not
+ given, the polling transport is connected first,
+ then an upgrade to websocket is attempted.
+ :param namespaces: The list of custom namespaces to connect, in
+ addition to the default namespace. If not given,
+ the namespace list is obtained from the registered
+ event handlers.
+ :param socketio_path: The endpoint where the Socket.IO server is
+ installed. The default value is appropriate for
+ most cases.
+
+ Example usage::
+
+ sio = socketio.Client()
+ sio.connect('http://localhost:5000')
+ """
+ self.connection_url = url
+ self.connection_headers = headers
+ self.connection_transports = transports
+ self.connection_namespaces = namespaces
+ self.socketio_path = socketio_path
+
+ if namespaces is None:
+ namespaces = set(self.handlers.keys()).union(
+ set(self.namespace_handlers.keys()))
+ elif isinstance(namespaces, six.string_types):
+ namespaces = [namespaces]
+ self.connection_namespaces = namespaces
+ self.namespaces = [n for n in namespaces if n != '/']
+ try:
+ self.eio.connect(url, headers=headers, transports=transports,
+ engineio_path=socketio_path)
+ except engineio.exceptions.ConnectionError as exc:
+ six.raise_from(exceptions.ConnectionError(exc.args[0]), None)
+ self.connected = True
+
+ def wait(self):
+ """Wait until the connection with the server ends.
+
+ Client applications can use this function to block the main thread
+ during the life of the connection.
+ """
+ while True:
+ self.eio.wait()
+ self.sleep(1) # give the reconnect task time to start up
+ if not self._reconnect_task:
+ break
+ self._reconnect_task.join()
+ if self.eio.state != 'connected':
+ break
+
+ def emit(self, event, data=None, namespace=None, callback=None):
+ """Emit a custom event to one or more connected clients.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param callback: If given, this function will be called to acknowledge
+ the the client has received the message. The arguments
+ that will be passed to the function are those provided
+ by the client. Callback functions can only be used
+ when addressing an individual client.
+ """
+ namespace = namespace or '/'
+ if namespace != '/' and namespace not in self.namespaces:
+ raise exceptions.BadNamespaceError(
+ namespace + ' is not a connected namespace.')
+ self.logger.info('Emitting event "%s" [%s]', event, namespace)
+ if callback is not None:
+ id = self._generate_ack_id(namespace, callback)
+ else:
+ id = None
+ if six.PY2 and not self.binary:
+ binary = False # pragma: nocover
+ else:
+ binary = None
+ # tuples are expanded to multiple arguments, everything else is sent
+ # as a single argument
+ if isinstance(data, tuple):
+ data = list(data)
+ elif data is not None:
+ data = [data]
+ else:
+ data = []
+ self._send_packet(packet.Packet(packet.EVENT, namespace=namespace,
+ data=[event] + data, id=id,
+ binary=binary))
+
+ def send(self, data, namespace=None, callback=None):
+ """Send a message to one or more connected clients.
+
+ This function emits an event with the name ``'message'``. Use
+ :func:`emit` to issue custom event names.
+
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param callback: If given, this function will be called to acknowledge
+ the the client has received the message. The arguments
+ that will be passed to the function are those provided
+ by the client. Callback functions can only be used
+ when addressing an individual client.
+ """
+ self.emit('message', data=data, namespace=namespace,
+ callback=callback)
+
+ def call(self, event, data=None, namespace=None, timeout=60):
+ """Emit a custom event to a client and wait for the response.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param timeout: The waiting timeout. If the timeout is reached before
+ the client acknowledges the event, then a
+ ``TimeoutError`` exception is raised.
+ """
+ callback_event = self.eio.create_event()
+ callback_args = []
+
+ def event_callback(*args):
+ callback_args.append(args)
+ callback_event.set()
+
+ self.emit(event, data=data, namespace=namespace,
+ callback=event_callback)
+ if not callback_event.wait(timeout=timeout):
+ raise exceptions.TimeoutError()
+ return callback_args[0] if len(callback_args[0]) > 1 \
+ else callback_args[0][0] if len(callback_args[0]) == 1 \
+ else None
+
+ def disconnect(self):
+ """Disconnect from the server."""
+ # here we just request the disconnection
+ # later in _handle_eio_disconnect we invoke the disconnect handler
+ for n in self.namespaces:
+ self._send_packet(packet.Packet(packet.DISCONNECT, namespace=n))
+ self._send_packet(packet.Packet(
+ packet.DISCONNECT, namespace='/'))
+ self.connected = False
+ self.eio.disconnect(abort=True)
+
+ def transport(self):
+ """Return the name of the transport used by the client.
+
+ The two possible values returned by this function are ``'polling'``
+ and ``'websocket'``.
+ """
+ return self.eio.transport()
+
+ def start_background_task(self, target, *args, **kwargs):
+ """Start a background task using the appropriate async model.
+
+ This is a utility function that applications can use to start a
+ background task using the method that is compatible with the
+ selected async mode.
+
+ :param target: the target function to execute.
+ :param args: arguments to pass to the function.
+ :param kwargs: keyword arguments to pass to the function.
+
+ This function returns an object compatible with the `Thread` class in
+ the Python standard library. The `start()` method on this object is
+ already called by this function.
+ """
+ return self.eio.start_background_task(target, *args, **kwargs)
+
+ def sleep(self, seconds=0):
+ """Sleep for the requested amount of time using the appropriate async
+ model.
+
+ This is a utility function that applications can use to put a task to
+ sleep without having to worry about using the correct call for the
+ selected async mode.
+ """
+ return self.eio.sleep(seconds)
+
+ def _send_packet(self, pkt):
+ """Send a Socket.IO packet to the server."""
+ encoded_packet = pkt.encode()
+ if isinstance(encoded_packet, list):
+ binary = False
+ for ep in encoded_packet:
+ self.eio.send(ep, binary=binary)
+ binary = True
+ else:
+ self.eio.send(encoded_packet, binary=False)
+
+ def _generate_ack_id(self, namespace, callback):
+ """Generate a unique identifier for an ACK packet."""
+ namespace = namespace or '/'
+ if namespace not in self.callbacks:
+ self.callbacks[namespace] = {0: itertools.count(1)}
+ id = six.next(self.callbacks[namespace][0])
+ self.callbacks[namespace][id] = callback
+ return id
+
+ def _handle_connect(self, namespace):
+ namespace = namespace or '/'
+ self.logger.info('Namespace {} is connected'.format(namespace))
+ self._trigger_event('connect', namespace=namespace)
+ if namespace == '/':
+ for n in self.namespaces:
+ self._send_packet(packet.Packet(packet.CONNECT, namespace=n))
+ elif namespace not in self.namespaces:
+ self.namespaces.append(namespace)
+
+ def _handle_disconnect(self, namespace):
+ if not self.connected:
+ return
+ namespace = namespace or '/'
+ if namespace == '/':
+ for n in self.namespaces:
+ self._trigger_event('disconnect', namespace=n)
+ self.namespaces = []
+ self._trigger_event('disconnect', namespace=namespace)
+ if namespace in self.namespaces:
+ self.namespaces.remove(namespace)
+ if namespace == '/':
+ self.connected = False
+
+ def _handle_event(self, namespace, id, data):
+ namespace = namespace or '/'
+ self.logger.info('Received event "%s" [%s]', data[0], namespace)
+ r = self._trigger_event(data[0], namespace, *data[1:])
+ if id is not None:
+ # send ACK packet with the response returned by the handler
+ # tuples are expanded as multiple arguments
+ if r is None:
+ data = []
+ elif isinstance(r, tuple):
+ data = list(r)
+ else:
+ data = [r]
+ if six.PY2 and not self.binary:
+ binary = False # pragma: nocover
+ else:
+ binary = None
+ self._send_packet(packet.Packet(packet.ACK, namespace=namespace,
+ id=id, data=data, binary=binary))
+
+ def _handle_ack(self, namespace, id, data):
+ namespace = namespace or '/'
+ self.logger.info('Received ack [%s]', namespace)
+ callback = None
+ try:
+ callback = self.callbacks[namespace][id]
+ except KeyError:
+ # if we get an unknown callback we just ignore it
+ self.logger.warning('Unknown callback received, ignoring.')
+ else:
+ del self.callbacks[namespace][id]
+ if callback is not None:
+ callback(*data)
+
+ def _handle_error(self, namespace, data):
+ namespace = namespace or '/'
+ self.logger.info('Connection to namespace {} was rejected'.format(
+ namespace))
+ if data is None:
+ data = tuple()
+ elif not isinstance(data, (tuple, list)):
+ data = (data,)
+ self._trigger_event('connect_error', namespace, *data)
+ if namespace in self.namespaces:
+ self.namespaces.remove(namespace)
+ if namespace == '/':
+ self.namespaces = []
+ self.connected = False
+
+ def _trigger_event(self, event, namespace, *args):
+ """Invoke an application event handler."""
+ # first see if we have an explicit handler for the event
+ if namespace in self.handlers and event in self.handlers[namespace]:
+ return self.handlers[namespace][event](*args)
+
+ # or else, forward the event to a namespace handler if one exists
+ elif namespace in self.namespace_handlers:
+ return self.namespace_handlers[namespace].trigger_event(
+ event, *args)
+
+ def _handle_reconnect(self):
+ self._reconnect_abort.clear()
+ reconnecting_clients.append(self)
+ attempt_count = 0
+ current_delay = self.reconnection_delay
+ while True:
+ delay = current_delay
+ current_delay *= 2
+ if delay > self.reconnection_delay_max:
+ delay = self.reconnection_delay_max
+ delay += self.randomization_factor * (2 * random.random() - 1)
+ self.logger.info(
+ 'Connection failed, new attempt in {:.02f} seconds'.format(
+ delay))
+ if self._reconnect_abort.wait(delay):
+ self.logger.info('Reconnect task aborted')
+ break
+ attempt_count += 1
+ try:
+ self.connect(self.connection_url,
+ headers=self.connection_headers,
+ transports=self.connection_transports,
+ namespaces=self.connection_namespaces,
+ socketio_path=self.socketio_path)
+ except (exceptions.ConnectionError, ValueError):
+ pass
+ else:
+ self.logger.info('Reconnection successful')
+ self._reconnect_task = None
+ break
+ if self.reconnection_attempts and \
+ attempt_count >= self.reconnection_attempts:
+ self.logger.info(
+ 'Maximum reconnection attempts reached, giving up')
+ break
+ reconnecting_clients.remove(self)
+
+ def _handle_eio_connect(self):
+ """Handle the Engine.IO connection event."""
+ self.logger.info('Engine.IO connection established')
+ self.sid = self.eio.sid
+
+ def _handle_eio_message(self, data):
+ """Dispatch Engine.IO messages."""
+ if self._binary_packet:
+ pkt = self._binary_packet
+ if pkt.add_attachment(data):
+ self._binary_packet = None
+ if pkt.packet_type == packet.BINARY_EVENT:
+ self._handle_event(pkt.namespace, pkt.id, pkt.data)
+ else:
+ self._handle_ack(pkt.namespace, pkt.id, pkt.data)
+ else:
+ pkt = packet.Packet(encoded_packet=data)
+ if pkt.packet_type == packet.CONNECT:
+ self._handle_connect(pkt.namespace)
+ elif pkt.packet_type == packet.DISCONNECT:
+ self._handle_disconnect(pkt.namespace)
+ elif pkt.packet_type == packet.EVENT:
+ self._handle_event(pkt.namespace, pkt.id, pkt.data)
+ elif pkt.packet_type == packet.ACK:
+ self._handle_ack(pkt.namespace, pkt.id, pkt.data)
+ elif pkt.packet_type == packet.BINARY_EVENT or \
+ pkt.packet_type == packet.BINARY_ACK:
+ self._binary_packet = pkt
+ elif pkt.packet_type == packet.ERROR:
+ self._handle_error(pkt.namespace, pkt.data)
+ else:
+ raise ValueError('Unknown packet type.')
+
+ def _handle_eio_disconnect(self):
+ """Handle the Engine.IO disconnection event."""
+ self.logger.info('Engine.IO connection dropped')
+ if self.connected:
+ for n in self.namespaces:
+ self._trigger_event('disconnect', namespace=n)
+ self._trigger_event('disconnect', namespace='/')
+ self.namespaces = []
+ self.connected = False
+ self.callbacks = {}
+ self._binary_packet = None
+ self.sid = None
+ if self.eio.state == 'connected' and self.reconnection:
+ self._reconnect_task = self.start_background_task(
+ self._handle_reconnect)
+
+ def _engineio_client_class(self):
+ return engineio.Client
diff --git a/libs/socketio/exceptions.py b/libs/socketio/exceptions.py
new file mode 100644
index 000000000..36dddd9fc
--- /dev/null
+++ b/libs/socketio/exceptions.py
@@ -0,0 +1,30 @@
+class SocketIOError(Exception):
+ pass
+
+
+class ConnectionError(SocketIOError):
+ pass
+
+
+class ConnectionRefusedError(ConnectionError):
+ """Connection refused exception.
+
+ This exception can be raised from a connect handler when the connection
+ is not accepted. The positional arguments provided with the exception are
+ returned with the error packet to the client.
+ """
+ def __init__(self, *args):
+ if len(args) == 0:
+ self.error_args = None
+ elif len(args) == 1 and not isinstance(args[0], list):
+ self.error_args = args[0]
+ else:
+ self.error_args = args
+
+
+class TimeoutError(SocketIOError):
+ pass
+
+
+class BadNamespaceError(SocketIOError):
+ pass
diff --git a/libs/socketio/kafka_manager.py b/libs/socketio/kafka_manager.py
new file mode 100644
index 000000000..00a2e7f05
--- /dev/null
+++ b/libs/socketio/kafka_manager.py
@@ -0,0 +1,63 @@
+import logging
+import pickle
+
+try:
+ import kafka
+except ImportError:
+ kafka = None
+
+from .pubsub_manager import PubSubManager
+
+logger = logging.getLogger('socketio')
+
+
+class KafkaManager(PubSubManager): # pragma: no cover
+ """Kafka based client manager.
+
+ This class implements a Kafka backend for event sharing across multiple
+ processes.
+
+ To use a Kafka backend, initialize the :class:`Server` instance as
+ follows::
+
+ url = 'kafka://hostname:port'
+ server = socketio.Server(client_manager=socketio.KafkaManager(url))
+
+ :param url: The connection URL for the Kafka server. For a default Kafka
+ store running on the same host, use ``kafka://``.
+ :param channel: The channel name (topic) on which the server sends and
+ receives notifications. Must be the same in all the
+ servers.
+ :param write_only: If set ot ``True``, only initialize to emit events. The
+ default of ``False`` initializes the class for emitting
+ and receiving.
+ """
+ name = 'kafka'
+
+ def __init__(self, url='kafka://localhost:9092', channel='socketio',
+ write_only=False):
+ if kafka is None:
+ raise RuntimeError('kafka-python package is not installed '
+ '(Run "pip install kafka-python" in your '
+ 'virtualenv).')
+
+ super(KafkaManager, self).__init__(channel=channel,
+ write_only=write_only)
+
+ self.kafka_url = url[8:] if url != 'kafka://' else 'localhost:9092'
+ self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url)
+ self.consumer = kafka.KafkaConsumer(self.channel,
+ bootstrap_servers=self.kafka_url)
+
+ def _publish(self, data):
+ self.producer.send(self.channel, value=pickle.dumps(data))
+ self.producer.flush()
+
+ def _kafka_listen(self):
+ for message in self.consumer:
+ yield message
+
+ def _listen(self):
+ for message in self._kafka_listen():
+ if message.topic == self.channel:
+ yield pickle.loads(message.value)
diff --git a/libs/socketio/kombu_manager.py b/libs/socketio/kombu_manager.py
new file mode 100644
index 000000000..4eb9ee498
--- /dev/null
+++ b/libs/socketio/kombu_manager.py
@@ -0,0 +1,122 @@
+import pickle
+import uuid
+
+try:
+ import kombu
+except ImportError:
+ kombu = None
+
+from .pubsub_manager import PubSubManager
+
+
+class KombuManager(PubSubManager): # pragma: no cover
+ """Client manager that uses kombu for inter-process messaging.
+
+ This class implements a client manager backend for event sharing across
+ multiple processes, using RabbitMQ, Redis or any other messaging mechanism
+ supported by `kombu <http://kombu.readthedocs.org/en/latest/>`_.
+
+ To use a kombu backend, initialize the :class:`Server` instance as
+ follows::
+
+ url = 'amqp://user:password@hostname:port//'
+ server = socketio.Server(client_manager=socketio.KombuManager(url))
+
+ :param url: The connection URL for the backend messaging queue. Example
+ connection URLs are ``'amqp://guest:guest@localhost:5672//'``
+ and ``'redis://localhost:6379/'`` for RabbitMQ and Redis
+ respectively. Consult the `kombu documentation
+ <http://kombu.readthedocs.org/en/latest/userguide\
+ /connections.html#urls>`_ for more on how to construct
+ connection URLs.
+ :param channel: The channel name on which the server sends and receives
+ notifications. Must be the same in all the servers.
+ :param write_only: If set ot ``True``, only initialize to emit events. The
+ default of ``False`` initializes the class for emitting
+ and receiving.
+ :param connection_options: additional keyword arguments to be passed to
+ ``kombu.Connection()``.
+ :param exchange_options: additional keyword arguments to be passed to
+ ``kombu.Exchange()``.
+ :param queue_options: additional keyword arguments to be passed to
+ ``kombu.Queue()``.
+ :param producer_options: additional keyword arguments to be passed to
+ ``kombu.Producer()``.
+ """
+ name = 'kombu'
+
+ def __init__(self, url='amqp://guest:guest@localhost:5672//',
+ channel='socketio', write_only=False, logger=None,
+ connection_options=None, exchange_options=None,
+ queue_options=None, producer_options=None):
+ if kombu is None:
+ raise RuntimeError('Kombu package is not installed '
+ '(Run "pip install kombu" in your '
+ 'virtualenv).')
+ super(KombuManager, self).__init__(channel=channel,
+ write_only=write_only,
+ logger=logger)
+ self.url = url
+ self.connection_options = connection_options or {}
+ self.exchange_options = exchange_options or {}
+ self.queue_options = queue_options or {}
+ self.producer_options = producer_options or {}
+ self.producer = self._producer()
+
+ def initialize(self):
+ super(KombuManager, self).initialize()
+
+ monkey_patched = True
+ if self.server.async_mode == 'eventlet':
+ from eventlet.patcher import is_monkey_patched
+ monkey_patched = is_monkey_patched('socket')
+ elif 'gevent' in self.server.async_mode:
+ from gevent.monkey import is_module_patched
+ monkey_patched = is_module_patched('socket')
+ if not monkey_patched:
+ raise RuntimeError(
+ 'Kombu requires a monkey patched socket library to work '
+ 'with ' + self.server.async_mode)
+
+ def _connection(self):
+ return kombu.Connection(self.url, **self.connection_options)
+
+ def _exchange(self):
+ options = {'type': 'fanout', 'durable': False}
+ options.update(self.exchange_options)
+ return kombu.Exchange(self.channel, **options)
+
+ def _queue(self):
+ queue_name = 'flask-socketio.' + str(uuid.uuid4())
+ options = {'durable': False, 'queue_arguments': {'x-expires': 300000}}
+ options.update(self.queue_options)
+ return kombu.Queue(queue_name, self._exchange(), **options)
+
+ def _producer(self):
+ return self._connection().Producer(exchange=self._exchange(),
+ **self.producer_options)
+
+ def __error_callback(self, exception, interval):
+ self._get_logger().exception('Sleeping {}s'.format(interval))
+
+ def _publish(self, data):
+ connection = self._connection()
+ publish = connection.ensure(self.producer, self.producer.publish,
+ errback=self.__error_callback)
+ publish(pickle.dumps(data))
+
+ def _listen(self):
+ reader_queue = self._queue()
+
+ while True:
+ connection = self._connection().ensure_connection(
+ errback=self.__error_callback)
+ try:
+ with connection.SimpleQueue(reader_queue) as queue:
+ while True:
+ message = queue.get(block=True)
+ message.ack()
+ yield message.payload
+ except connection.connection_errors:
+ self._get_logger().exception("Connection error "
+ "while reading from queue")
diff --git a/libs/socketio/middleware.py b/libs/socketio/middleware.py
new file mode 100644
index 000000000..1a6974085
--- /dev/null
+++ b/libs/socketio/middleware.py
@@ -0,0 +1,42 @@
+import engineio
+
+
+class WSGIApp(engineio.WSGIApp):
+ """WSGI middleware for Socket.IO.
+
+ This middleware dispatches traffic to a Socket.IO application. It can also
+ serve a list of static files to the client, or forward unrelated HTTP
+ traffic to another WSGI application.
+
+ :param socketio_app: The Socket.IO server. Must be an instance of the
+ ``socketio.Server`` class.
+ :param wsgi_app: The WSGI app that receives all other traffic.
+ :param static_files: A dictionary with static file mapping rules. See the
+ documentation for details on this argument.
+ :param socketio_path: The endpoint where the Socket.IO application should
+ be installed. The default value is appropriate for
+ most cases.
+
+ Example usage::
+
+ import socketio
+ import eventlet
+ from . import wsgi_app
+
+ sio = socketio.Server()
+ app = socketio.WSGIApp(sio, wsgi_app)
+ eventlet.wsgi.server(eventlet.listen(('', 8000)), app)
+ """
+ def __init__(self, socketio_app, wsgi_app=None, static_files=None,
+ socketio_path='socket.io'):
+ super(WSGIApp, self).__init__(socketio_app, wsgi_app,
+ static_files=static_files,
+ engineio_path=socketio_path)
+
+
+class Middleware(WSGIApp):
+ """This class has been renamed to WSGIApp and is now deprecated."""
+ def __init__(self, socketio_app, wsgi_app=None,
+ socketio_path='socket.io'):
+ super(Middleware, self).__init__(socketio_app, wsgi_app,
+ socketio_path=socketio_path)
diff --git a/libs/socketio/namespace.py b/libs/socketio/namespace.py
new file mode 100644
index 000000000..418615ff8
--- /dev/null
+++ b/libs/socketio/namespace.py
@@ -0,0 +1,191 @@
+class BaseNamespace(object):
+ def __init__(self, namespace=None):
+ self.namespace = namespace or '/'
+
+ def is_asyncio_based(self):
+ return False
+
+ def trigger_event(self, event, *args):
+ """Dispatch an event to the proper handler method.
+
+ In the most common usage, this method is not overloaded by subclasses,
+ as it performs the routing of events to methods. However, this
+ method can be overriden if special dispatching rules are needed, or if
+ having a single method that catches all events is desired.
+ """
+ handler_name = 'on_' + event
+ if hasattr(self, handler_name):
+ return getattr(self, handler_name)(*args)
+
+
+class Namespace(BaseNamespace):
+ """Base class for server-side class-based namespaces.
+
+ A class-based namespace is a class that contains all the event handlers
+ for a Socket.IO namespace. The event handlers are methods of the class
+ with the prefix ``on_``, such as ``on_connect``, ``on_disconnect``,
+ ``on_message``, ``on_json``, and so on.
+
+ :param namespace: The Socket.IO namespace to be used with all the event
+ handlers defined in this class. If this argument is
+ omitted, the default namespace is used.
+ """
+ def __init__(self, namespace=None):
+ super(Namespace, self).__init__(namespace=namespace)
+ self.server = None
+
+ def _set_server(self, server):
+ self.server = server
+
+ def emit(self, event, data=None, room=None, skip_sid=None, namespace=None,
+ callback=None):
+ """Emit a custom event to one or more connected clients.
+
+ The only difference with the :func:`socketio.Server.emit` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.server.emit(event, data=data, room=room, skip_sid=skip_sid,
+ namespace=namespace or self.namespace,
+ callback=callback)
+
+ def send(self, data, room=None, skip_sid=None, namespace=None,
+ callback=None):
+ """Send a message to one or more connected clients.
+
+ The only difference with the :func:`socketio.Server.send` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.server.send(data, room=room, skip_sid=skip_sid,
+ namespace=namespace or self.namespace,
+ callback=callback)
+
+ def enter_room(self, sid, room, namespace=None):
+ """Enter a room.
+
+ The only difference with the :func:`socketio.Server.enter_room` method
+ is that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.server.enter_room(sid, room,
+ namespace=namespace or self.namespace)
+
+ def leave_room(self, sid, room, namespace=None):
+ """Leave a room.
+
+ The only difference with the :func:`socketio.Server.leave_room` method
+ is that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.server.leave_room(sid, room,
+ namespace=namespace or self.namespace)
+
+ def close_room(self, room, namespace=None):
+ """Close a room.
+
+ The only difference with the :func:`socketio.Server.close_room` method
+ is that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.server.close_room(room,
+ namespace=namespace or self.namespace)
+
+ def rooms(self, sid, namespace=None):
+ """Return the rooms a client is in.
+
+ The only difference with the :func:`socketio.Server.rooms` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.server.rooms(sid, namespace=namespace or self.namespace)
+
+ def get_session(self, sid, namespace=None):
+ """Return the user session for a client.
+
+ The only difference with the :func:`socketio.Server.get_session`
+ method is that when the ``namespace`` argument is not given the
+ namespace associated with the class is used.
+ """
+ return self.server.get_session(
+ sid, namespace=namespace or self.namespace)
+
+ def save_session(self, sid, session, namespace=None):
+ """Store the user session for a client.
+
+ The only difference with the :func:`socketio.Server.save_session`
+ method is that when the ``namespace`` argument is not given the
+ namespace associated with the class is used.
+ """
+ return self.server.save_session(
+ sid, session, namespace=namespace or self.namespace)
+
+ def session(self, sid, namespace=None):
+ """Return the user session for a client with context manager syntax.
+
+ The only difference with the :func:`socketio.Server.session` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.server.session(sid, namespace=namespace or self.namespace)
+
+ def disconnect(self, sid, namespace=None):
+ """Disconnect a client.
+
+ The only difference with the :func:`socketio.Server.disconnect` method
+ is that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.server.disconnect(sid,
+ namespace=namespace or self.namespace)
+
+
+class ClientNamespace(BaseNamespace):
+ """Base class for client-side class-based namespaces.
+
+ A class-based namespace is a class that contains all the event handlers
+ for a Socket.IO namespace. The event handlers are methods of the class
+ with the prefix ``on_``, such as ``on_connect``, ``on_disconnect``,
+ ``on_message``, ``on_json``, and so on.
+
+ :param namespace: The Socket.IO namespace to be used with all the event
+ handlers defined in this class. If this argument is
+ omitted, the default namespace is used.
+ """
+ def __init__(self, namespace=None):
+ super(ClientNamespace, self).__init__(namespace=namespace)
+ self.client = None
+
+ def _set_client(self, client):
+ self.client = client
+
+ def emit(self, event, data=None, namespace=None, callback=None):
+ """Emit a custom event to the server.
+
+ The only difference with the :func:`socketio.Client.emit` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.client.emit(event, data=data,
+ namespace=namespace or self.namespace,
+ callback=callback)
+
+ def send(self, data, room=None, skip_sid=None, namespace=None,
+ callback=None):
+ """Send a message to the server.
+
+ The only difference with the :func:`socketio.Client.send` method is
+ that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.client.send(data, namespace=namespace or self.namespace,
+ callback=callback)
+
+ def disconnect(self):
+ """Disconnect from the server.
+
+ The only difference with the :func:`socketio.Client.disconnect` method
+ is that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+ """
+ return self.client.disconnect()
diff --git a/libs/socketio/packet.py b/libs/socketio/packet.py
new file mode 100644
index 000000000..73b469d6d
--- /dev/null
+++ b/libs/socketio/packet.py
@@ -0,0 +1,179 @@
+import functools
+import json as _json
+
+import six
+
+(CONNECT, DISCONNECT, EVENT, ACK, ERROR, BINARY_EVENT, BINARY_ACK) = \
+ (0, 1, 2, 3, 4, 5, 6)
+packet_names = ['CONNECT', 'DISCONNECT', 'EVENT', 'ACK', 'ERROR',
+ 'BINARY_EVENT', 'BINARY_ACK']
+
+
+class Packet(object):
+ """Socket.IO packet."""
+
+ # the format of the Socket.IO packet is as follows:
+ #
+ # packet type: 1 byte, values 0-6
+ # num_attachments: ASCII encoded, only if num_attachments != 0
+ # '-': only if num_attachments != 0
+ # namespace: only if namespace != '/'
+ # ',': only if namespace and one of id and data are defined in this packet
+ # id: ASCII encoded, only if id is not None
+ # data: JSON dump of data payload
+
+ json = _json
+
+ def __init__(self, packet_type=EVENT, data=None, namespace=None, id=None,
+ binary=None, encoded_packet=None):
+ self.packet_type = packet_type
+ self.data = data
+ self.namespace = namespace
+ self.id = id
+ if binary or (binary is None and self._data_is_binary(self.data)):
+ if self.packet_type == EVENT:
+ self.packet_type = BINARY_EVENT
+ elif self.packet_type == ACK:
+ self.packet_type = BINARY_ACK
+ else:
+ raise ValueError('Packet does not support binary payload.')
+ self.attachment_count = 0
+ self.attachments = []
+ if encoded_packet:
+ self.attachment_count = self.decode(encoded_packet)
+
+ def encode(self):
+ """Encode the packet for transmission.
+
+ If the packet contains binary elements, this function returns a list
+ of packets where the first is the original packet with placeholders for
+ the binary components and the remaining ones the binary attachments.
+ """
+ encoded_packet = six.text_type(self.packet_type)
+ if self.packet_type == BINARY_EVENT or self.packet_type == BINARY_ACK:
+ data, attachments = self._deconstruct_binary(self.data)
+ encoded_packet += six.text_type(len(attachments)) + '-'
+ else:
+ data = self.data
+ attachments = None
+ needs_comma = False
+ if self.namespace is not None and self.namespace != '/':
+ encoded_packet += self.namespace
+ needs_comma = True
+ if self.id is not None:
+ if needs_comma:
+ encoded_packet += ','
+ needs_comma = False
+ encoded_packet += six.text_type(self.id)
+ if data is not None:
+ if needs_comma:
+ encoded_packet += ','
+ encoded_packet += self.json.dumps(data, separators=(',', ':'))
+ if attachments is not None:
+ encoded_packet = [encoded_packet] + attachments
+ return encoded_packet
+
+ def decode(self, encoded_packet):
+ """Decode a transmitted package.
+
+ The return value indicates how many binary attachment packets are
+ necessary to fully decode the packet.
+ """
+ ep = encoded_packet
+ try:
+ self.packet_type = int(ep[0:1])
+ except TypeError:
+ self.packet_type = ep
+ ep = ''
+ self.namespace = None
+ self.data = None
+ ep = ep[1:]
+ dash = ep.find('-')
+ attachment_count = 0
+ if dash > 0 and ep[0:dash].isdigit():
+ attachment_count = int(ep[0:dash])
+ ep = ep[dash + 1:]
+ if ep and ep[0:1] == '/':
+ sep = ep.find(',')
+ if sep == -1:
+ self.namespace = ep
+ ep = ''
+ else:
+ self.namespace = ep[0:sep]
+ ep = ep[sep + 1:]
+ q = self.namespace.find('?')
+ if q != -1:
+ self.namespace = self.namespace[0:q]
+ if ep and ep[0].isdigit():
+ self.id = 0
+ while ep and ep[0].isdigit():
+ self.id = self.id * 10 + int(ep[0])
+ ep = ep[1:]
+ if ep:
+ self.data = self.json.loads(ep)
+ return attachment_count
+
+ def add_attachment(self, attachment):
+ if self.attachment_count <= len(self.attachments):
+ raise ValueError('Unexpected binary attachment')
+ self.attachments.append(attachment)
+ if self.attachment_count == len(self.attachments):
+ self.reconstruct_binary(self.attachments)
+ return True
+ return False
+
+ def reconstruct_binary(self, attachments):
+ """Reconstruct a decoded packet using the given list of binary
+ attachments.
+ """
+ self.data = self._reconstruct_binary_internal(self.data,
+ self.attachments)
+
+ def _reconstruct_binary_internal(self, data, attachments):
+ if isinstance(data, list):
+ return [self._reconstruct_binary_internal(item, attachments)
+ for item in data]
+ elif isinstance(data, dict):
+ if data.get('_placeholder') and 'num' in data:
+ return attachments[data['num']]
+ else:
+ return {key: self._reconstruct_binary_internal(value,
+ attachments)
+ for key, value in six.iteritems(data)}
+ else:
+ return data
+
+ def _deconstruct_binary(self, data):
+ """Extract binary components in the packet."""
+ attachments = []
+ data = self._deconstruct_binary_internal(data, attachments)
+ return data, attachments
+
+ def _deconstruct_binary_internal(self, data, attachments):
+ if isinstance(data, six.binary_type):
+ attachments.append(data)
+ return {'_placeholder': True, 'num': len(attachments) - 1}
+ elif isinstance(data, list):
+ return [self._deconstruct_binary_internal(item, attachments)
+ for item in data]
+ elif isinstance(data, dict):
+ return {key: self._deconstruct_binary_internal(value, attachments)
+ for key, value in six.iteritems(data)}
+ else:
+ return data
+
+ def _data_is_binary(self, data):
+ """Check if the data contains binary components."""
+ if isinstance(data, six.binary_type):
+ return True
+ elif isinstance(data, list):
+ return functools.reduce(
+ lambda a, b: a or b, [self._data_is_binary(item)
+ for item in data], False)
+ elif isinstance(data, dict):
+ return functools.reduce(
+ lambda a, b: a or b, [self._data_is_binary(item)
+ for item in six.itervalues(data)],
+ False)
+ else:
+ return False
diff --git a/libs/socketio/pubsub_manager.py b/libs/socketio/pubsub_manager.py
new file mode 100644
index 000000000..2905b2c32
--- /dev/null
+++ b/libs/socketio/pubsub_manager.py
@@ -0,0 +1,154 @@
+from functools import partial
+import uuid
+
+import json
+import pickle
+import six
+
+from .base_manager import BaseManager
+
+
+class PubSubManager(BaseManager):
+ """Manage a client list attached to a pub/sub backend.
+
+ This is a base class that enables multiple servers to share the list of
+ clients, with the servers communicating events through a pub/sub backend.
+ The use of a pub/sub backend also allows any client connected to the
+ backend to emit events addressed to Socket.IO clients.
+
+ The actual backends must be implemented by subclasses, this class only
+ provides a pub/sub generic framework.
+
+ :param channel: The channel name on which the server sends and receives
+ notifications.
+ """
+ name = 'pubsub'
+
+ def __init__(self, channel='socketio', write_only=False, logger=None):
+ super(PubSubManager, self).__init__()
+ self.channel = channel
+ self.write_only = write_only
+ self.host_id = uuid.uuid4().hex
+ self.logger = logger
+
+ def initialize(self):
+ super(PubSubManager, self).initialize()
+ if not self.write_only:
+ self.thread = self.server.start_background_task(self._thread)
+ self._get_logger().info(self.name + ' backend initialized.')
+
+ def emit(self, event, data, namespace=None, room=None, skip_sid=None,
+ callback=None, **kwargs):
+ """Emit a message to a single client, a room, or all the clients
+ connected to the namespace.
+
+ This method takes care or propagating the message to all the servers
+ that are connected through the message queue.
+
+ The parameters are the same as in :meth:`.Server.emit`.
+ """
+ if kwargs.get('ignore_queue'):
+ return super(PubSubManager, self).emit(
+ event, data, namespace=namespace, room=room, skip_sid=skip_sid,
+ callback=callback)
+ namespace = namespace or '/'
+ if callback is not None:
+ if self.server is None:
+ raise RuntimeError('Callbacks can only be issued from the '
+ 'context of a server.')
+ if room is None:
+ raise ValueError('Cannot use callback without a room set.')
+ id = self._generate_ack_id(room, namespace, callback)
+ callback = (room, namespace, id)
+ else:
+ callback = None
+ self._publish({'method': 'emit', 'event': event, 'data': data,
+ 'namespace': namespace, 'room': room,
+ 'skip_sid': skip_sid, 'callback': callback,
+ 'host_id': self.host_id})
+
+ def close_room(self, room, namespace=None):
+ self._publish({'method': 'close_room', 'room': room,
+ 'namespace': namespace or '/'})
+
+ def _publish(self, data):
+ """Publish a message on the Socket.IO channel.
+
+ This method needs to be implemented by the different subclasses that
+ support pub/sub backends.
+ """
+ raise NotImplementedError('This method must be implemented in a '
+ 'subclass.') # pragma: no cover
+
+ def _listen(self):
+ """Return the next message published on the Socket.IO channel,
+ blocking until a message is available.
+
+ This method needs to be implemented by the different subclasses that
+ support pub/sub backends.
+ """
+ raise NotImplementedError('This method must be implemented in a '
+ 'subclass.') # pragma: no cover
+
+ def _handle_emit(self, message):
+ # Events with callbacks are very tricky to handle across hosts
+ # Here in the receiving end we set up a local callback that preserves
+ # the callback host and id from the sender
+ remote_callback = message.get('callback')
+ remote_host_id = message.get('host_id')
+ if remote_callback is not None and len(remote_callback) == 3:
+ callback = partial(self._return_callback, remote_host_id,
+ *remote_callback)
+ else:
+ callback = None
+ super(PubSubManager, self).emit(message['event'], message['data'],
+ namespace=message.get('namespace'),
+ room=message.get('room'),
+ skip_sid=message.get('skip_sid'),
+ callback=callback)
+
+ def _handle_callback(self, message):
+ if self.host_id == message.get('host_id'):
+ try:
+ sid = message['sid']
+ namespace = message['namespace']
+ id = message['id']
+ args = message['args']
+ except KeyError:
+ return
+ self.trigger_callback(sid, namespace, id, args)
+
+ def _return_callback(self, host_id, sid, namespace, callback_id, *args):
+ # When an event callback is received, the callback is returned back
+ # the sender, which is identified by the host_id
+ self._publish({'method': 'callback', 'host_id': host_id,
+ 'sid': sid, 'namespace': namespace, 'id': callback_id,
+ 'args': args})
+
+ def _handle_close_room(self, message):
+ super(PubSubManager, self).close_room(
+ room=message.get('room'), namespace=message.get('namespace'))
+
+ def _thread(self):
+ for message in self._listen():
+ data = None
+ if isinstance(message, dict):
+ data = message
+ else:
+ if isinstance(message, six.binary_type): # pragma: no cover
+ try:
+ data = pickle.loads(message)
+ except:
+ pass
+ if data is None:
+ try:
+ data = json.loads(message)
+ except:
+ pass
+ if data and 'method' in data:
+ if data['method'] == 'emit':
+ self._handle_emit(data)
+ elif data['method'] == 'callback':
+ self._handle_callback(data)
+ elif data['method'] == 'close_room':
+ self._handle_close_room(data)
diff --git a/libs/socketio/redis_manager.py b/libs/socketio/redis_manager.py
new file mode 100644
index 000000000..ad383345e
--- /dev/null
+++ b/libs/socketio/redis_manager.py
@@ -0,0 +1,115 @@
+import logging
+import pickle
+import time
+
+try:
+ import redis
+except ImportError:
+ redis = None
+
+from .pubsub_manager import PubSubManager
+
+logger = logging.getLogger('socketio')
+
+
+class RedisManager(PubSubManager): # pragma: no cover
+ """Redis based client manager.
+
+ This class implements a Redis backend for event sharing across multiple
+ processes. Only kept here as one more example of how to build a custom
+ backend, since the kombu backend is perfectly adequate to support a Redis
+ message queue.
+
+ To use a Redis backend, initialize the :class:`Server` instance as
+ follows::
+
+ url = 'redis://hostname:port/0'
+ server = socketio.Server(client_manager=socketio.RedisManager(url))
+
+ :param url: The connection URL for the Redis server. For a default Redis
+ store running on the same host, use ``redis://``.
+ :param channel: The channel name on which the server sends and receives
+ notifications. Must be the same in all the servers.
+ :param write_only: If set ot ``True``, only initialize to emit events. The
+ default of ``False`` initializes the class for emitting
+ and receiving.
+ :param redis_options: additional keyword arguments to be passed to
+ ``Redis.from_url()``.
+ """
+ name = 'redis'
+
+ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
+ write_only=False, logger=None, redis_options=None):
+ if redis is None:
+ raise RuntimeError('Redis package is not installed '
+ '(Run "pip install redis" in your '
+ 'virtualenv).')
+ self.redis_url = url
+ self.redis_options = redis_options or {}
+ self._redis_connect()
+ super(RedisManager, self).__init__(channel=channel,
+ write_only=write_only,
+ logger=logger)
+
+ def initialize(self):
+ super(RedisManager, self).initialize()
+
+ monkey_patched = True
+ if self.server.async_mode == 'eventlet':
+ from eventlet.patcher import is_monkey_patched
+ monkey_patched = is_monkey_patched('socket')
+ elif 'gevent' in self.server.async_mode:
+ from gevent.monkey import is_module_patched
+ monkey_patched = is_module_patched('socket')
+ if not monkey_patched:
+ raise RuntimeError(
+ 'Redis requires a monkey patched socket library to work '
+ 'with ' + self.server.async_mode)
+
+ def _redis_connect(self):
+ self.redis = redis.Redis.from_url(self.redis_url,
+ **self.redis_options)
+ self.pubsub = self.redis.pubsub()
+
+ def _publish(self, data):
+ retry = True
+ while True:
+ try:
+ if not retry:
+ self._redis_connect()
+ return self.redis.publish(self.channel, pickle.dumps(data))
+ except redis.exceptions.ConnectionError:
+ if retry:
+ logger.error('Cannot publish to redis... retrying')
+ retry = False
+ else:
+ logger.error('Cannot publish to redis... giving up')
+ break
+
+ def _redis_listen_with_retries(self):
+ retry_sleep = 1
+ connect = False
+ while True:
+ try:
+ if connect:
+ self._redis_connect()
+ self.pubsub.subscribe(self.channel)
+ for message in self.pubsub.listen():
+ yield message
+ except redis.exceptions.ConnectionError:
+ logger.error('Cannot receive from redis... '
+ 'retrying in {} secs'.format(retry_sleep))
+ connect = True
+ time.sleep(retry_sleep)
+ retry_sleep *= 2
+ if retry_sleep > 60:
+ retry_sleep = 60
+
+ def _listen(self):
+ channel = self.channel.encode('utf-8')
+ self.pubsub.subscribe(self.channel)
+ for message in self._redis_listen_with_retries():
+ if message['channel'] == channel and \
+ message['type'] == 'message' and 'data' in message:
+ yield message['data']
+ self.pubsub.unsubscribe(self.channel)
diff --git a/libs/socketio/server.py b/libs/socketio/server.py
new file mode 100644
index 000000000..76b7d2e8f
--- /dev/null
+++ b/libs/socketio/server.py
@@ -0,0 +1,730 @@
+import logging
+
+import engineio
+import six
+
+from . import base_manager
+from . import exceptions
+from . import namespace
+from . import packet
+
+default_logger = logging.getLogger('socketio.server')
+
+
+class Server(object):
+ """A Socket.IO server.
+
+ This class implements a fully compliant Socket.IO web server with support
+ for websocket and long-polling transports.
+
+ :param client_manager: The client manager instance that will manage the
+ client list. When this is omitted, the client list
+ is stored in an in-memory structure, so the use of
+ multiple connected servers is not possible.
+ :param logger: To enable logging set to ``True`` or pass a logger object to
+ use. To disable logging set to ``False``. The default is
+ ``False``.
+ :param binary: ``True`` to support binary payloads, ``False`` to treat all
+ payloads as text. On Python 2, if this is set to ``True``,
+ ``unicode`` values are treated as text, and ``str`` and
+ ``bytes`` values are treated as binary. This option has no
+ effect on Python 3, where text and binary payloads are
+ always automatically discovered.
+ :param json: An alternative json module to use for encoding and decoding
+ packets. Custom json modules must have ``dumps`` and ``loads``
+ functions that are compatible with the standard library
+ versions.
+ :param async_handlers: If set to ``True``, event handlers for a client are
+ executed in separate threads. To run handlers for a
+ client synchronously, set to ``False``. The default
+ is ``True``.
+ :param always_connect: When set to ``False``, new connections are
+ provisory until the connect handler returns
+ something other than ``False``, at which point they
+ are accepted. When set to ``True``, connections are
+ immediately accepted, and then if the connect
+ handler returns ``False`` a disconnect is issued.
+ Set to ``True`` if you need to emit events from the
+ connect handler and your client is confused when it
+ receives events before the connection acceptance.
+ In any other case use the default of ``False``.
+ :param kwargs: Connection parameters for the underlying Engine.IO server.
+
+ The Engine.IO configuration supports the following settings:
+
+ :param async_mode: The asynchronous model to use. See the Deployment
+ section in the documentation for a description of the
+ available options. Valid async modes are "threading",
+ "eventlet", "gevent" and "gevent_uwsgi". If this
+ argument is not given, "eventlet" is tried first, then
+ "gevent_uwsgi", then "gevent", and finally "threading".
+ The first async mode that has all its dependencies
+ installed is then one that is chosen.
+ :param ping_timeout: The time in seconds that the client waits for the
+ server to respond before disconnecting. The default
+ is 60 seconds.
+ :param ping_interval: The interval in seconds at which the client pings
+ the server. The default is 25 seconds.
+ :param max_http_buffer_size: The maximum size of a message when using the
+ polling transport. The default is 100,000,000
+ bytes.
+ :param allow_upgrades: Whether to allow transport upgrades or not. The
+ default is ``True``.
+ :param http_compression: Whether to compress packages when using the
+ polling transport. The default is ``True``.
+ :param compression_threshold: Only compress messages when their byte size
+ is greater than this value. The default is
+ 1024 bytes.
+ :param cookie: Name of the HTTP cookie that contains the client session
+ id. If set to ``None``, a cookie is not sent to the client.
+ The default is ``'io'``.
+ :param cors_allowed_origins: Origin or list of origins that are allowed to
+ connect to this server. Only the same origin
+ is allowed by default. Set this argument to
+ ``'*'`` to allow all origins, or to ``[]`` to
+ disable CORS handling.
+ :param cors_credentials: Whether credentials (cookies, authentication) are
+ allowed in requests to this server. The default is
+ ``True``.
+ :param monitor_clients: If set to ``True``, a background task will ensure
+ inactive clients are closed. Set to ``False`` to
+ disable the monitoring task (not recommended). The
+ default is ``True``.
+ :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
+ a logger object to use. To disable logging set to
+ ``False``. The default is ``False``.
+ """
+ def __init__(self, client_manager=None, logger=False, binary=False,
+ json=None, async_handlers=True, always_connect=False,
+ **kwargs):
+ engineio_options = kwargs
+ engineio_logger = engineio_options.pop('engineio_logger', None)
+ if engineio_logger is not None:
+ engineio_options['logger'] = engineio_logger
+ if json is not None:
+ packet.Packet.json = json
+ engineio_options['json'] = json
+ engineio_options['async_handlers'] = False
+ self.eio = self._engineio_server_class()(**engineio_options)
+ self.eio.on('connect', self._handle_eio_connect)
+ self.eio.on('message', self._handle_eio_message)
+ self.eio.on('disconnect', self._handle_eio_disconnect)
+ self.binary = binary
+
+ self.environ = {}
+ self.handlers = {}
+ self.namespace_handlers = {}
+
+ self._binary_packet = {}
+
+ if not isinstance(logger, bool):
+ self.logger = logger
+ else:
+ self.logger = default_logger
+ if not logging.root.handlers and \
+ self.logger.level == logging.NOTSET:
+ if logger:
+ self.logger.setLevel(logging.INFO)
+ else:
+ self.logger.setLevel(logging.ERROR)
+ self.logger.addHandler(logging.StreamHandler())
+
+ if client_manager is None:
+ client_manager = base_manager.BaseManager()
+ self.manager = client_manager
+ self.manager.set_server(self)
+ self.manager_initialized = False
+
+ self.async_handlers = async_handlers
+ self.always_connect = always_connect
+
+ self.async_mode = self.eio.async_mode
+
+ def is_asyncio_based(self):
+ return False
+
+ def on(self, event, handler=None, namespace=None):
+ """Register an event handler.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param handler: The function that should be invoked to handle the
+ event. When this parameter is not given, the method
+ acts as a decorator for the handler function.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the handler is associated with
+ the default namespace.
+
+ Example usage::
+
+ # as a decorator:
+ @socket_io.on('connect', namespace='/chat')
+ def connect_handler(sid, environ):
+ print('Connection request')
+ if environ['REMOTE_ADDR'] in blacklisted:
+ return False # reject
+
+ # as a method:
+ def message_handler(sid, msg):
+ print('Received message: ', msg)
+ eio.send(sid, 'response')
+ socket_io.on('message', namespace='/chat', message_handler)
+
+ The handler function receives the ``sid`` (session ID) for the
+ client as first argument. The ``'connect'`` event handler receives the
+ WSGI environment as a second argument, and can return ``False`` to
+ reject the connection. The ``'message'`` handler and handlers for
+ custom event names receive the message payload as a second argument.
+ Any values returned from a message handler will be passed to the
+ client's acknowledgement callback function if it exists. The
+ ``'disconnect'`` handler does not take a second argument.
+ """
+ namespace = namespace or '/'
+
+ def set_handler(handler):
+ if namespace not in self.handlers:
+ self.handlers[namespace] = {}
+ self.handlers[namespace][event] = handler
+ return handler
+
+ if handler is None:
+ return set_handler
+ set_handler(handler)
+
+ def event(self, *args, **kwargs):
+ """Decorator to register an event handler.
+
+ This is a simplified version of the ``on()`` method that takes the
+ event name from the decorated function.
+
+ Example usage::
+
+ @sio.event
+ def my_event(data):
+ print('Received data: ', data)
+
+ The above example is equivalent to::
+
+ @sio.on('my_event')
+ def my_event(data):
+ print('Received data: ', data)
+
+ A custom namespace can be given as an argument to the decorator::
+
+ @sio.event(namespace='/test')
+ def my_event(data):
+ print('Received data: ', data)
+ """
+ if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
+ # the decorator was invoked without arguments
+ # args[0] is the decorated function
+ return self.on(args[0].__name__)(args[0])
+ else:
+ # the decorator was invoked with arguments
+ def set_handler(handler):
+ return self.on(handler.__name__, *args, **kwargs)(handler)
+
+ return set_handler
+
+ def register_namespace(self, namespace_handler):
+ """Register a namespace handler object.
+
+ :param namespace_handler: An instance of a :class:`Namespace`
+ subclass that handles all the event traffic
+ for a namespace.
+ """
+ if not isinstance(namespace_handler, namespace.Namespace):
+ raise ValueError('Not a namespace instance')
+ if self.is_asyncio_based() != namespace_handler.is_asyncio_based():
+ raise ValueError('Not a valid namespace class for this server')
+ namespace_handler._set_server(self)
+ self.namespace_handlers[namespace_handler.namespace] = \
+ namespace_handler
+
+ def emit(self, event, data=None, to=None, room=None, skip_sid=None,
+ namespace=None, callback=None, **kwargs):
+ """Emit a custom event to one or more connected clients.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param to: The recipient of the message. This can be set to the
+ session ID of a client to address only that client, or to
+ to any custom room created by the application to address all
+ the clients in that room, If this argument is omitted the
+ event is broadcasted to all connected clients.
+ :param room: Alias for the ``to`` parameter.
+ :param skip_sid: The session ID of a client to skip when broadcasting
+ to a room or to all clients. This can be used to
+ prevent a message from being sent to the sender. To
+ skip multiple sids, pass a list.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param callback: If given, this function will be called to acknowledge
+ the the client has received the message. The arguments
+ that will be passed to the function are those provided
+ by the client. Callback functions can only be used
+ when addressing an individual client.
+ :param ignore_queue: Only used when a message queue is configured. If
+ set to ``True``, the event is emitted to the
+ clients directly, without going through the queue.
+ This is more efficient, but only works when a
+ single server process is used. It is recommended
+ to always leave this parameter with its default
+ value of ``False``.
+ """
+ namespace = namespace or '/'
+ room = to or room
+ self.logger.info('emitting event "%s" to %s [%s]', event,
+ room or 'all', namespace)
+ self.manager.emit(event, data, namespace, room=room,
+ skip_sid=skip_sid, callback=callback, **kwargs)
+
+ def send(self, data, to=None, room=None, skip_sid=None, namespace=None,
+ callback=None, **kwargs):
+ """Send a message to one or more connected clients.
+
+ This function emits an event with the name ``'message'``. Use
+ :func:`emit` to issue custom event names.
+
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param to: The recipient of the message. This can be set to the
+ session ID of a client to address only that client, or to
+ to any custom room created by the application to address all
+ the clients in that room, If this argument is omitted the
+ event is broadcasted to all connected clients.
+ :param room: Alias for the ``to`` parameter.
+ :param skip_sid: The session ID of a client to skip when broadcasting
+ to a room or to all clients. This can be used to
+ prevent a message from being sent to the sender. To
+ skip multiple sids, pass a list.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param callback: If given, this function will be called to acknowledge
+ the the client has received the message. The arguments
+ that will be passed to the function are those provided
+ by the client. Callback functions can only be used
+ when addressing an individual client.
+ :param ignore_queue: Only used when a message queue is configured. If
+ set to ``True``, the event is emitted to the
+ clients directly, without going through the queue.
+ This is more efficient, but only works when a
+ single server process is used. It is recommended
+ to always leave this parameter with its default
+ value of ``False``.
+ """
+ self.emit('message', data=data, to=to, room=room, skip_sid=skip_sid,
+ namespace=namespace, callback=callback, **kwargs)
+
+ def call(self, event, data=None, to=None, sid=None, namespace=None,
+ timeout=60, **kwargs):
+ """Emit a custom event to a client and wait for the response.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param to: The session ID of the recipient client.
+ :param sid: Alias for the ``to`` parameter.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param timeout: The waiting timeout. If the timeout is reached before
+ the client acknowledges the event, then a
+ ``TimeoutError`` exception is raised.
+ :param ignore_queue: Only used when a message queue is configured. If
+ set to ``True``, the event is emitted to the
+ client directly, without going through the queue.
+ This is more efficient, but only works when a
+ single server process is used. It is recommended
+ to always leave this parameter with its default
+ value of ``False``.
+ """
+ if not self.async_handlers:
+ raise RuntimeError(
+ 'Cannot use call() when async_handlers is False.')
+ callback_event = self.eio.create_event()
+ callback_args = []
+
+ def event_callback(*args):
+ callback_args.append(args)
+ callback_event.set()
+
+ self.emit(event, data=data, room=to or sid, namespace=namespace,
+ callback=event_callback, **kwargs)
+ if not callback_event.wait(timeout=timeout):
+ raise exceptions.TimeoutError()
+ return callback_args[0] if len(callback_args[0]) > 1 \
+ else callback_args[0][0] if len(callback_args[0]) == 1 \
+ else None
+
+ def enter_room(self, sid, room, namespace=None):
+ """Enter a room.
+
+ This function adds the client to a room. The :func:`emit` and
+ :func:`send` functions can optionally broadcast events to all the
+ clients in a room.
+
+ :param sid: Session ID of the client.
+ :param room: Room name. If the room does not exist it is created.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the default namespace is used.
+ """
+ namespace = namespace or '/'
+ self.logger.info('%s is entering room %s [%s]', sid, room, namespace)
+ self.manager.enter_room(sid, namespace, room)
+
+ def leave_room(self, sid, room, namespace=None):
+ """Leave a room.
+
+ This function removes the client from a room.
+
+ :param sid: Session ID of the client.
+ :param room: Room name.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the default namespace is used.
+ """
+ namespace = namespace or '/'
+ self.logger.info('%s is leaving room %s [%s]', sid, room, namespace)
+ self.manager.leave_room(sid, namespace, room)
+
+ def close_room(self, room, namespace=None):
+ """Close a room.
+
+ This function removes all the clients from the given room.
+
+ :param room: Room name.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the default namespace is used.
+ """
+ namespace = namespace or '/'
+ self.logger.info('room %s is closing [%s]', room, namespace)
+ self.manager.close_room(room, namespace)
+
+ def rooms(self, sid, namespace=None):
+ """Return the rooms a client is in.
+
+ :param sid: Session ID of the client.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the default namespace is used.
+ """
+ namespace = namespace or '/'
+ return self.manager.get_rooms(sid, namespace)
+
+ def get_session(self, sid, namespace=None):
+ """Return the user session for a client.
+
+ :param sid: The session id of the client.
+ :param namespace: The Socket.IO namespace. If this argument is omitted
+ the default namespace is used.
+
+ The return value is a dictionary. Modifications made to this
+ dictionary are not guaranteed to be preserved unless
+ ``save_session()`` is called, or when the ``session`` context manager
+ is used.
+ """
+ namespace = namespace or '/'
+ eio_session = self.eio.get_session(sid)
+ return eio_session.setdefault(namespace, {})
+
+ def save_session(self, sid, session, namespace=None):
+ """Store the user session for a client.
+
+ :param sid: The session id of the client.
+ :param session: The session dictionary.
+ :param namespace: The Socket.IO namespace. If this argument is omitted
+ the default namespace is used.
+ """
+ namespace = namespace or '/'
+ eio_session = self.eio.get_session(sid)
+ eio_session[namespace] = session
+
+ def session(self, sid, namespace=None):
+ """Return the user session for a client with context manager syntax.
+
+ :param sid: The session id of the client.
+
+ This is a context manager that returns the user session dictionary for
+ the client. Any changes that are made to this dictionary inside the
+ context manager block are saved back to the session. Example usage::
+
+ @sio.on('connect')
+ def on_connect(sid, environ):
+ username = authenticate_user(environ)
+ if not username:
+ return False
+ with sio.session(sid) as session:
+ session['username'] = username
+
+ @sio.on('message')
+ def on_message(sid, msg):
+ with sio.session(sid) as session:
+ print('received message from ', session['username'])
+ """
+ class _session_context_manager(object):
+ def __init__(self, server, sid, namespace):
+ self.server = server
+ self.sid = sid
+ self.namespace = namespace
+ self.session = None
+
+ def __enter__(self):
+ self.session = self.server.get_session(sid,
+ namespace=namespace)
+ return self.session
+
+ def __exit__(self, *args):
+ self.server.save_session(sid, self.session,
+ namespace=namespace)
+
+ return _session_context_manager(self, sid, namespace)
+
+ def disconnect(self, sid, namespace=None):
+ """Disconnect a client.
+
+ :param sid: Session ID of the client.
+ :param namespace: The Socket.IO namespace to disconnect. If this
+ argument is omitted the default namespace is used.
+ """
+ namespace = namespace or '/'
+ if self.manager.is_connected(sid, namespace=namespace):
+ self.logger.info('Disconnecting %s [%s]', sid, namespace)
+ self.manager.pre_disconnect(sid, namespace=namespace)
+ self._send_packet(sid, packet.Packet(packet.DISCONNECT,
+ namespace=namespace))
+ self._trigger_event('disconnect', namespace, sid)
+ self.manager.disconnect(sid, namespace=namespace)
+ if namespace == '/':
+ self.eio.disconnect(sid)
+
+ def transport(self, sid):
+ """Return the name of the transport used by the client.
+
+ The two possible values returned by this function are ``'polling'``
+ and ``'websocket'``.
+
+ :param sid: The session of the client.
+ """
+ return self.eio.transport(sid)
+
+ def handle_request(self, environ, start_response):
+ """Handle an HTTP request from the client.
+
+ This is the entry point of the Socket.IO application, using the same
+ interface as a WSGI application. For the typical usage, this function
+ is invoked by the :class:`Middleware` instance, but it can be invoked
+ directly when the middleware is not used.
+
+ :param environ: The WSGI environment.
+ :param start_response: The WSGI ``start_response`` function.
+
+ This function returns the HTTP response body to deliver to the client
+ as a byte sequence.
+ """
+ return self.eio.handle_request(environ, start_response)
+
+ def start_background_task(self, target, *args, **kwargs):
+ """Start a background task using the appropriate async model.
+
+ This is a utility function that applications can use to start a
+ background task using the method that is compatible with the
+ selected async mode.
+
+ :param target: the target function to execute.
+ :param args: arguments to pass to the function.
+ :param kwargs: keyword arguments to pass to the function.
+
+ This function returns an object compatible with the `Thread` class in
+ the Python standard library. The `start()` method on this object is
+ already called by this function.
+ """
+ return self.eio.start_background_task(target, *args, **kwargs)
+
+ def sleep(self, seconds=0):
+ """Sleep for the requested amount of time using the appropriate async
+ model.
+
+ This is a utility function that applications can use to put a task to
+ sleep without having to worry about using the correct call for the
+ selected async mode.
+ """
+ return self.eio.sleep(seconds)
+
+ def _emit_internal(self, sid, event, data, namespace=None, id=None):
+ """Send a message to a client."""
+ if six.PY2 and not self.binary:
+ binary = False # pragma: nocover
+ else:
+ binary = None
+ # tuples are expanded to multiple arguments, everything else is sent
+ # as a single argument
+ if isinstance(data, tuple):
+ data = list(data)
+ else:
+ data = [data]
+ self._send_packet(sid, packet.Packet(packet.EVENT, namespace=namespace,
+ data=[event] + data, id=id,
+ binary=binary))
+
+ def _send_packet(self, sid, pkt):
+ """Send a Socket.IO packet to a client."""
+ encoded_packet = pkt.encode()
+ if isinstance(encoded_packet, list):
+ binary = False
+ for ep in encoded_packet:
+ self.eio.send(sid, ep, binary=binary)
+ binary = True
+ else:
+ self.eio.send(sid, encoded_packet, binary=False)
+
+ def _handle_connect(self, sid, namespace):
+ """Handle a client connection request."""
+ namespace = namespace or '/'
+ self.manager.connect(sid, namespace)
+ if self.always_connect:
+ self._send_packet(sid, packet.Packet(packet.CONNECT,
+ namespace=namespace))
+ fail_reason = None
+ try:
+ success = self._trigger_event('connect', namespace, sid,
+ self.environ[sid])
+ except exceptions.ConnectionRefusedError as exc:
+ fail_reason = exc.error_args
+ success = False
+
+ if success is False:
+ if self.always_connect:
+ self.manager.pre_disconnect(sid, namespace)
+ self._send_packet(sid, packet.Packet(
+ packet.DISCONNECT, data=fail_reason, namespace=namespace))
+ self.manager.disconnect(sid, namespace)
+ if not self.always_connect:
+ self._send_packet(sid, packet.Packet(
+ packet.ERROR, data=fail_reason, namespace=namespace))
+ if sid in self.environ: # pragma: no cover
+ del self.environ[sid]
+ elif not self.always_connect:
+ self._send_packet(sid, packet.Packet(packet.CONNECT,
+ namespace=namespace))
+
+ def _handle_disconnect(self, sid, namespace):
+ """Handle a client disconnect."""
+ namespace = namespace or '/'
+ if namespace == '/':
+ namespace_list = list(self.manager.get_namespaces())
+ else:
+ namespace_list = [namespace]
+ for n in namespace_list:
+ if n != '/' and self.manager.is_connected(sid, n):
+ self._trigger_event('disconnect', n, sid)
+ self.manager.disconnect(sid, n)
+ if namespace == '/' and self.manager.is_connected(sid, namespace):
+ self._trigger_event('disconnect', '/', sid)
+ self.manager.disconnect(sid, '/')
+
+ def _handle_event(self, sid, namespace, id, data):
+ """Handle an incoming client event."""
+ namespace = namespace or '/'
+ self.logger.info('received event "%s" from %s [%s]', data[0], sid,
+ namespace)
+ if not self.manager.is_connected(sid, namespace):
+ self.logger.warning('%s is not connected to namespace %s',
+ sid, namespace)
+ return
+ if self.async_handlers:
+ self.start_background_task(self._handle_event_internal, self, sid,
+ data, namespace, id)
+ else:
+ self._handle_event_internal(self, sid, data, namespace, id)
+
+ def _handle_event_internal(self, server, sid, data, namespace, id):
+ r = server._trigger_event(data[0], namespace, sid, *data[1:])
+ if id is not None:
+ # send ACK packet with the response returned by the handler
+ # tuples are expanded as multiple arguments
+ if r is None:
+ data = []
+ elif isinstance(r, tuple):
+ data = list(r)
+ else:
+ data = [r]
+ if six.PY2 and not self.binary:
+ binary = False # pragma: nocover
+ else:
+ binary = None
+ server._send_packet(sid, packet.Packet(packet.ACK,
+ namespace=namespace,
+ id=id, data=data,
+ binary=binary))
+
+ def _handle_ack(self, sid, namespace, id, data):
+ """Handle ACK packets from the client."""
+ namespace = namespace or '/'
+ self.logger.info('received ack from %s [%s]', sid, namespace)
+ self.manager.trigger_callback(sid, namespace, id, data)
+
+ def _trigger_event(self, event, namespace, *args):
+ """Invoke an application event handler."""
+ # first see if we have an explicit handler for the event
+ if namespace in self.handlers and event in self.handlers[namespace]:
+ return self.handlers[namespace][event](*args)
+
+ # or else, forward the event to a namespace handler if one exists
+ elif namespace in self.namespace_handlers:
+ return self.namespace_handlers[namespace].trigger_event(
+ event, *args)
+
+ def _handle_eio_connect(self, sid, environ):
+ """Handle the Engine.IO connection event."""
+ if not self.manager_initialized:
+ self.manager_initialized = True
+ self.manager.initialize()
+ self.environ[sid] = environ
+ return self._handle_connect(sid, '/')
+
+ def _handle_eio_message(self, sid, data):
+ """Dispatch Engine.IO messages."""
+ if sid in self._binary_packet:
+ pkt = self._binary_packet[sid]
+ if pkt.add_attachment(data):
+ del self._binary_packet[sid]
+ if pkt.packet_type == packet.BINARY_EVENT:
+ self._handle_event(sid, pkt.namespace, pkt.id, pkt.data)
+ else:
+ self._handle_ack(sid, pkt.namespace, pkt.id, pkt.data)
+ else:
+ pkt = packet.Packet(encoded_packet=data)
+ if pkt.packet_type == packet.CONNECT:
+ self._handle_connect(sid, pkt.namespace)
+ elif pkt.packet_type == packet.DISCONNECT:
+ self._handle_disconnect(sid, pkt.namespace)
+ elif pkt.packet_type == packet.EVENT:
+ self._handle_event(sid, pkt.namespace, pkt.id, pkt.data)
+ elif pkt.packet_type == packet.ACK:
+ self._handle_ack(sid, pkt.namespace, pkt.id, pkt.data)
+ elif pkt.packet_type == packet.BINARY_EVENT or \
+ pkt.packet_type == packet.BINARY_ACK:
+ self._binary_packet[sid] = pkt
+ elif pkt.packet_type == packet.ERROR:
+ raise ValueError('Unexpected ERROR packet.')
+ else:
+ raise ValueError('Unknown packet type.')
+
+ def _handle_eio_disconnect(self, sid):
+ """Handle Engine.IO disconnect event."""
+ self._handle_disconnect(sid, '/')
+ if sid in self.environ:
+ del self.environ[sid]
+
+ def _engineio_server_class(self):
+ return engineio.Server
diff --git a/libs/socketio/tornado.py b/libs/socketio/tornado.py
new file mode 100644
index 000000000..5b2e6f684
--- /dev/null
+++ b/libs/socketio/tornado.py
@@ -0,0 +1,11 @@
+import sys
+if sys.version_info >= (3, 5):
+ try:
+ from engineio.async_drivers.tornado import get_tornado_handler as \
+ get_engineio_handler
+ except ImportError: # pragma: no cover
+ get_engineio_handler = None
+
+
+def get_tornado_handler(socketio_server): # pragma: no cover
+ return get_engineio_handler(socketio_server.eio)
diff --git a/libs/socketio/zmq_manager.py b/libs/socketio/zmq_manager.py
new file mode 100644
index 000000000..f2a2ae5dc
--- /dev/null
+++ b/libs/socketio/zmq_manager.py
@@ -0,0 +1,111 @@
+import pickle
+import re
+
+try:
+ import eventlet.green.zmq as zmq
+except ImportError:
+ zmq = None
+import six
+
+from .pubsub_manager import PubSubManager
+
+
+class ZmqManager(PubSubManager): # pragma: no cover
+ """zmq based client manager.
+
+ NOTE: this zmq implementation should be considered experimental at this
+ time. At this time, eventlet is required to use zmq.
+
+ This class implements a zmq backend for event sharing across multiple
+ processes. To use a zmq backend, initialize the :class:`Server` instance as
+ follows::
+
+ url = 'zmq+tcp://hostname:port1+port2'
+ server = socketio.Server(client_manager=socketio.ZmqManager(url))
+
+ :param url: The connection URL for the zmq message broker,
+ which will need to be provided and running.
+ :param channel: The channel name on which the server sends and receives
+ notifications. Must be the same in all the servers.
+ :param write_only: If set to ``True``, only initialize to emit events. The
+ default of ``False`` initializes the class for emitting
+ and receiving.
+
+ A zmq message broker must be running for the zmq_manager to work.
+ you can write your own or adapt one from the following simple broker
+ below::
+
+ import zmq
+
+ receiver = zmq.Context().socket(zmq.PULL)
+ receiver.bind("tcp://*:5555")
+
+ publisher = zmq.Context().socket(zmq.PUB)
+ publisher.bind("tcp://*:5556")
+
+ while True:
+ publisher.send(receiver.recv())
+ """
+ name = 'zmq'
+
+ def __init__(self, url='zmq+tcp://localhost:5555+5556',
+ channel='socketio',
+ write_only=False,
+ logger=None):
+ if zmq is None:
+ raise RuntimeError('zmq package is not installed '
+ '(Run "pip install pyzmq" in your '
+ 'virtualenv).')
+
+ r = re.compile(r':\d+\+\d+$')
+ if not (url.startswith('zmq+tcp://') and r.search(url)):
+ raise RuntimeError('unexpected connection string: ' + url)
+
+ url = url.replace('zmq+', '')
+ (sink_url, sub_port) = url.split('+')
+ sink_port = sink_url.split(':')[-1]
+ sub_url = sink_url.replace(sink_port, sub_port)
+
+ sink = zmq.Context().socket(zmq.PUSH)
+ sink.connect(sink_url)
+
+ sub = zmq.Context().socket(zmq.SUB)
+ sub.setsockopt_string(zmq.SUBSCRIBE, u'')
+ sub.connect(sub_url)
+
+ self.sink = sink
+ self.sub = sub
+ self.channel = channel
+ super(ZmqManager, self).__init__(channel=channel,
+ write_only=write_only,
+ logger=logger)
+
+ def _publish(self, data):
+ pickled_data = pickle.dumps(
+ {
+ 'type': 'message',
+ 'channel': self.channel,
+ 'data': data
+ }
+ )
+ return self.sink.send(pickled_data)
+
+ def zmq_listen(self):
+ while True:
+ response = self.sub.recv()
+ if response is not None:
+ yield response
+
+ def _listen(self):
+ for message in self.zmq_listen():
+ if isinstance(message, six.binary_type):
+ try:
+ message = pickle.loads(message)
+ except Exception:
+ pass
+ if isinstance(message, dict) and \
+ message['type'] == 'message' and \
+ message['channel'] == self.channel and \
+ 'data' in message:
+ yield message['data']
+ return