summaryrefslogtreecommitdiffhomepage
path: root/libs/socketio
diff options
context:
space:
mode:
authormorpheus65535 <[email protected]>2022-11-07 13:06:49 -0500
committermorpheus65535 <[email protected]>2022-11-07 13:08:27 -0500
commitbbe2483e21c2c1549ceeed16f021f9581b899f70 (patch)
treebcc2bef2f55789ec6e6c64809c07fb4f4d3d9c86 /libs/socketio
parent708fbfcd8ec0620647975be39a1f6acbbf08f767 (diff)
downloadbazarr-bbe2483e21c2c1549ceeed16f021f9581b899f70.tar.gz
bazarr-bbe2483e21c2c1549ceeed16f021f9581b899f70.zip
Updated vendored dependencies.
Diffstat (limited to 'libs/socketio')
-rw-r--r--libs/socketio/asyncio_aiopika_manager.py2
-rw-r--r--libs/socketio/asyncio_client.py8
-rw-r--r--libs/socketio/asyncio_manager.py12
-rw-r--r--libs/socketio/asyncio_pubsub_manager.py31
-rw-r--r--libs/socketio/asyncio_redis_manager.py21
-rw-r--r--libs/socketio/asyncio_server.py46
-rw-r--r--libs/socketio/base_manager.py3
-rw-r--r--libs/socketio/client.py2
-rw-r--r--libs/socketio/msgpack_packet.py2
-rw-r--r--libs/socketio/pubsub_manager.py25
-rw-r--r--libs/socketio/server.py27
11 files changed, 122 insertions, 57 deletions
diff --git a/libs/socketio/asyncio_aiopika_manager.py b/libs/socketio/asyncio_aiopika_manager.py
index 96dcec65d..eff3f8c8b 100644
--- a/libs/socketio/asyncio_aiopika_manager.py
+++ b/libs/socketio/asyncio_aiopika_manager.py
@@ -93,7 +93,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
async with self.listener_queue.iterator() as queue_iter:
async for message in queue_iter:
- with message.process():
+ async with message.process():
yield pickle.loads(message.body)
except Exception:
self._get_logger().error('Cannot receive from rabbitmq... '
diff --git a/libs/socketio/asyncio_client.py b/libs/socketio/asyncio_client.py
index 03b770de8..3c700ee02 100644
--- a/libs/socketio/asyncio_client.py
+++ b/libs/socketio/asyncio_client.py
@@ -50,9 +50,9 @@ class AsyncClient(client.Client):
:param request_timeout: A timeout in seconds for requests. The default is
5 seconds.
- :param http_session: an initialized ``requests.Session`` object to be used
- when sending requests to the server. Use it if you
- need to add special client options such as proxy
+ :param http_session: an initialized ``aiohttp.ClientSession`` object to be
+ used when sending requests to the server. Use it if
+ you need to add special client options such as proxy
servers, SSL certificates, etc.
:param ssl_verify: ``True`` to verify SSL certificates, or ``False`` to
skip SSL certificate verification, allowing
@@ -497,7 +497,7 @@ class AsyncClient(client.Client):
"""Handle the Engine.IO connection event."""
self.logger.info('Engine.IO connection established')
self.sid = self.eio.sid
- real_auth = await self._get_real_value(self.connection_auth)
+ real_auth = await self._get_real_value(self.connection_auth) or {}
for n in self.connection_namespaces:
await self._send_packet(self.packet_class(
packet.CONNECT, data=real_auth, namespace=n))
diff --git a/libs/socketio/asyncio_manager.py b/libs/socketio/asyncio_manager.py
index f89022c62..20a16c888 100644
--- a/libs/socketio/asyncio_manager.py
+++ b/libs/socketio/asyncio_manager.py
@@ -26,12 +26,20 @@ class AsyncManager(BaseManager):
id = self._generate_ack_id(sid, callback)
else:
id = None
- tasks.append(self.server._emit_internal(eio_sid, event, data,
- namespace, id))
+ tasks.append(asyncio.create_task(
+ self.server._emit_internal(eio_sid, event, data,
+ namespace, id)))
if tasks == []: # pragma: no cover
return
await asyncio.wait(tasks)
+ async def disconnect(self, sid, namespace, **kwargs):
+ """Disconnect a client.
+
+ Note: this method is a coroutine.
+ """
+ return super().disconnect(sid, namespace, **kwargs)
+
async def close_room(self, room, namespace):
"""Remove all participants from a room.
diff --git a/libs/socketio/asyncio_pubsub_manager.py b/libs/socketio/asyncio_pubsub_manager.py
index ff37f2dfa..1a06889ee 100644
--- a/libs/socketio/asyncio_pubsub_manager.py
+++ b/libs/socketio/asyncio_pubsub_manager.py
@@ -76,7 +76,14 @@ class AsyncPubSubManager(AsyncManager):
else:
# client is in another server, so we post request to the queue
await self._publish({'method': 'disconnect', 'sid': sid,
- 'namespace': namespace or '/'})
+ 'namespace': namespace or '/'})
+
+ async def disconnect(self, sid, namespace, **kwargs):
+ if kwargs.get('ignore_queue'):
+ return await super(AsyncPubSubManager, self).disconnect(
+ sid, namespace=namespace)
+ await self._publish({'method': 'disconnect', 'sid': sid,
+ 'namespace': namespace or '/'})
async def close_room(self, room, namespace=None):
await self._publish({'method': 'close_room', 'room': room,
@@ -166,14 +173,20 @@ class AsyncPubSubManager(AsyncManager):
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
data['method']))
- if data['method'] == 'emit':
- await self._handle_emit(data)
- elif data['method'] == 'callback':
- await self._handle_callback(data)
- elif data['method'] == 'disconnect':
- await self._handle_disconnect(data)
- elif data['method'] == 'close_room':
- await self._handle_close_room(data)
+ try:
+ if data['method'] == 'emit':
+ await self._handle_emit(data)
+ elif data['method'] == 'callback':
+ await self._handle_callback(data)
+ elif data['method'] == 'disconnect':
+ await self._handle_disconnect(data)
+ elif data['method'] == 'close_room':
+ await self._handle_close_room(data)
+ except asyncio.CancelledError:
+ raise # let the outer try/except handle it
+ except:
+ self.server.logger.exception(
+ 'Unknown error in pubsub listening task')
except asyncio.CancelledError: # pragma: no cover
break
except: # pragma: no cover
diff --git a/libs/socketio/asyncio_redis_manager.py b/libs/socketio/asyncio_redis_manager.py
index d9da5f9af..9e3d9544a 100644
--- a/libs/socketio/asyncio_redis_manager.py
+++ b/libs/socketio/asyncio_redis_manager.py
@@ -1,10 +1,16 @@
import asyncio
import pickle
-try:
- import aioredis
-except ImportError:
- aioredis = None
+try: # pragma: no cover
+ from redis import asyncio as aioredis
+ from redis.exceptions import RedisError
+except ImportError: # pragma: no cover
+ try:
+ import aioredis
+ from aioredis.exceptions import RedisError
+ except ImportError:
+ aioredis = None
+ RedisError = None
from .asyncio_pubsub_manager import AsyncPubSubManager
@@ -39,8 +45,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
write_only=False, logger=None, redis_options=None):
if aioredis is None:
raise RuntimeError('Redis package is not installed '
- '(Run "pip install aioredis" in your '
- 'virtualenv).')
+ '(Run "pip install redis" in your virtualenv).')
if not hasattr(aioredis.Redis, 'from_url'):
raise RuntimeError('Version 2 of aioredis package is required.')
self.redis_url = url
@@ -61,7 +66,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
self._redis_connect()
return await self.redis.publish(
self.channel, pickle.dumps(data))
- except aioredis.exceptions.RedisError:
+ except RedisError:
if retry:
self._get_logger().error('Cannot publish to redis... '
'retrying')
@@ -82,7 +87,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
retry_sleep = 1
async for message in self.pubsub.listen():
yield message
- except aioredis.exceptions.RedisError:
+ except RedisError:
self._get_logger().error('Cannot receive from redis... '
'retrying in '
'{} secs'.format(retry_sleep))
diff --git a/libs/socketio/asyncio_server.py b/libs/socketio/asyncio_server.py
index 59fab4a22..eb708b21c 100644
--- a/libs/socketio/asyncio_server.py
+++ b/libs/socketio/asyncio_server.py
@@ -40,18 +40,23 @@ class AsyncServer(server.Server):
connect handler and your client is confused when it
receives events before the connection acceptance.
In any other case use the default of ``False``.
+ :param namespaces: a list of namespaces that are accepted, in addition to
+ any namespaces for which handlers have been defined. The
+ default is `['/']`, which always accepts connections to
+ the default namespace. Set to `'*'` to accept all
+ namespaces.
:param kwargs: Connection parameters for the underlying Engine.IO server.
The Engine.IO configuration supports the following settings:
:param async_mode: The asynchronous model to use. See the Deployment
section in the documentation for a description of the
- available options. Valid async modes are "threading",
- "eventlet", "gevent" and "gevent_uwsgi". If this
- argument is not given, "eventlet" is tried first, then
- "gevent_uwsgi", then "gevent", and finally "threading".
- The first async mode that has all its dependencies
- installed is then one that is chosen.
+ available options. Valid async modes are "aiohttp",
+ "sanic", "tornado" and "asgi". If this argument is not
+ given, "aiohttp" is tried first, followed by "sanic",
+ "tornado", and finally "asgi". The first async mode that
+ has all its dependencies installed is the one that is
+ chosen.
:param ping_interval: The interval in seconds at which the server pings
the client. The default is 25 seconds. For advanced
control, a two element tuple can be given, where
@@ -97,11 +102,12 @@ class AsyncServer(server.Server):
``engineio_logger`` is ``False``.
"""
def __init__(self, client_manager=None, logger=False, json=None,
- async_handlers=True, **kwargs):
+ async_handlers=True, namespaces=None, **kwargs):
if client_manager is None:
client_manager = asyncio_manager.AsyncManager()
super().__init__(client_manager=client_manager, logger=logger,
- json=json, async_handlers=async_handlers, **kwargs)
+ json=json, async_handlers=async_handlers,
+ namespaces=namespaces, **kwargs)
def is_asyncio_based(self):
return True
@@ -378,7 +384,8 @@ class AsyncServer(server.Server):
await self._send_packet(eio_sid, self.packet_class(
packet.DISCONNECT, namespace=namespace))
await self._trigger_event('disconnect', namespace, sid)
- self.manager.disconnect(sid, namespace=namespace)
+ await self.manager.disconnect(sid, namespace=namespace,
+ ignore_queue=True)
async def handle_request(self, *args, **kwargs):
"""Handle an HTTP request from the client.
@@ -442,7 +449,16 @@ class AsyncServer(server.Server):
async def _handle_connect(self, eio_sid, namespace, data):
"""Handle a client connection request."""
namespace = namespace or '/'
- sid = self.manager.connect(eio_sid, namespace)
+ sid = None
+ if namespace in self.handlers or namespace in self.namespace_handlers \
+ or self.namespaces == '*' or namespace in self.namespaces:
+ sid = self.manager.connect(eio_sid, namespace)
+ if sid is None:
+ await self._send_packet(eio_sid, self.packet_class(
+ packet.CONNECT_ERROR, data='Unable to connect',
+ namespace=namespace))
+ return
+
if self.always_connect:
await self._send_packet(eio_sid, self.packet_class(
packet.CONNECT, {'sid': sid}, namespace=namespace))
@@ -471,7 +487,7 @@ class AsyncServer(server.Server):
await self._send_packet(eio_sid, self.packet_class(
packet.CONNECT_ERROR, data=fail_reason,
namespace=namespace))
- self.manager.disconnect(sid, namespace)
+ await self.manager.disconnect(sid, namespace, ignore_queue=True)
elif not self.always_connect:
await self._send_packet(eio_sid, self.packet_class(
packet.CONNECT, {'sid': sid}, namespace=namespace))
@@ -484,7 +500,7 @@ class AsyncServer(server.Server):
return
self.manager.pre_disconnect(sid, namespace=namespace)
await self._trigger_event('disconnect', namespace, sid)
- self.manager.disconnect(sid, namespace)
+ await self.manager.disconnect(sid, namespace, ignore_queue=True)
async def _handle_event(self, eio_sid, namespace, id, data):
"""Handle an incoming client event."""
@@ -506,7 +522,7 @@ class AsyncServer(server.Server):
async def _handle_event_internal(self, server, sid, eio_sid, data,
namespace, id):
r = await server._trigger_event(data[0], namespace, sid, *data[1:])
- if id is not None:
+ if r != self.not_handled and id is not None:
# send ACK packet with the response returned by the handler
# tuples are expanded as multiple arguments
if r is None:
@@ -545,9 +561,11 @@ class AsyncServer(server.Server):
else:
ret = handler(*args)
return ret
+ else:
+ return self.not_handled
# or else, forward the event to a namepsace handler if one exists
- elif namespace in self.namespace_handlers:
+ elif namespace in self.namespace_handlers: # pragma: no branch
return await self.namespace_handlers[namespace].trigger_event(
event, *args)
diff --git a/libs/socketio/base_manager.py b/libs/socketio/base_manager.py
index 0d6e1a9f2..87d238793 100644
--- a/libs/socketio/base_manager.py
+++ b/libs/socketio/base_manager.py
@@ -68,6 +68,7 @@ class BaseManager(object):
return self.rooms[namespace][None][sid] is not None
except KeyError:
pass
+ return False
def sid_from_eio_sid(self, eio_sid, namespace):
try:
@@ -94,7 +95,7 @@ class BaseManager(object):
self.pending_disconnect[namespace].append(sid)
return self.rooms[namespace][None].get(sid)
- def disconnect(self, sid, namespace):
+ def disconnect(self, sid, namespace, **kwargs):
"""Register a client disconnect from a namespace."""
if namespace not in self.rooms:
return
diff --git a/libs/socketio/client.py b/libs/socketio/client.py
index 5046ea860..58d381b1d 100644
--- a/libs/socketio/client.py
+++ b/libs/socketio/client.py
@@ -680,7 +680,7 @@ class Client(object):
"""Handle the Engine.IO connection event."""
self.logger.info('Engine.IO connection established')
self.sid = self.eio.sid
- real_auth = self._get_real_value(self.connection_auth)
+ real_auth = self._get_real_value(self.connection_auth) or {}
for n in self.connection_namespaces:
self._send_packet(self.packet_class(
packet.CONNECT, data=real_auth, namespace=n))
diff --git a/libs/socketio/msgpack_packet.py b/libs/socketio/msgpack_packet.py
index cb6afe838..27462634a 100644
--- a/libs/socketio/msgpack_packet.py
+++ b/libs/socketio/msgpack_packet.py
@@ -13,6 +13,6 @@ class MsgPackPacket(packet.Packet):
"""Decode a transmitted package."""
decoded = msgpack.loads(encoded_packet)
self.packet_type = decoded['type']
- self.data = decoded['data']
+ self.data = decoded.get('data')
self.id = decoded.get('id')
self.namespace = decoded['nsp']
diff --git a/libs/socketio/pubsub_manager.py b/libs/socketio/pubsub_manager.py
index 9b6f36de2..51079bf22 100644
--- a/libs/socketio/pubsub_manager.py
+++ b/libs/socketio/pubsub_manager.py
@@ -75,7 +75,10 @@ class PubSubManager(BaseManager):
self._publish({'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/'})
- def disconnect(self, sid, namespace=None):
+ def disconnect(self, sid, namespace=None, **kwargs):
+ if kwargs.get('ignore_queue'):
+ return super(PubSubManager, self).disconnect(
+ sid, namespace=namespace)
self._publish({'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/'})
@@ -164,11 +167,15 @@ class PubSubManager(BaseManager):
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
data['method']))
- if data['method'] == 'emit':
- self._handle_emit(data)
- elif data['method'] == 'callback':
- self._handle_callback(data)
- elif data['method'] == 'disconnect':
- self._handle_disconnect(data)
- elif data['method'] == 'close_room':
- self._handle_close_room(data)
+ try:
+ if data['method'] == 'emit':
+ self._handle_emit(data)
+ elif data['method'] == 'callback':
+ self._handle_callback(data)
+ elif data['method'] == 'disconnect':
+ self._handle_disconnect(data)
+ elif data['method'] == 'close_room':
+ self._handle_close_room(data)
+ except:
+ self.server.logger.exception(
+ 'Unknown error in pubsub listening thread')
diff --git a/libs/socketio/server.py b/libs/socketio/server.py
index cdf255b3f..daa1c0d4d 100644
--- a/libs/socketio/server.py
+++ b/libs/socketio/server.py
@@ -49,6 +49,11 @@ class Server(object):
connect handler and your client is confused when it
receives events before the connection acceptance.
In any other case use the default of ``False``.
+ :param namespaces: a list of namespaces that are accepted, in addition to
+ any namespaces for which handlers have been defined. The
+ default is `['/']`, which always accepts connections to
+ the default namespace. Set to `'*'` to accept all
+ namespaces.
:param kwargs: Connection parameters for the underlying Engine.IO server.
The Engine.IO configuration supports the following settings:
@@ -110,7 +115,7 @@ class Server(object):
def __init__(self, client_manager=None, logger=False, serializer='default',
json=None, async_handlers=True, always_connect=False,
- **kwargs):
+ namespaces=None, **kwargs):
engineio_options = kwargs
engineio_logger = engineio_options.pop('engineio_logger', None)
if engineio_logger is not None:
@@ -134,6 +139,7 @@ class Server(object):
self.environ = {}
self.handlers = {}
self.namespace_handlers = {}
+ self.not_handled = object()
self._binary_packet = {}
@@ -156,6 +162,7 @@ class Server(object):
self.async_handlers = async_handlers
self.always_connect = always_connect
+ self.namespaces = namespaces or ['/']
self.async_mode = self.eio.async_mode
@@ -558,7 +565,8 @@ class Server(object):
self._send_packet(eio_sid, self.packet_class(
packet.DISCONNECT, namespace=namespace))
self._trigger_event('disconnect', namespace, sid)
- self.manager.disconnect(sid, namespace=namespace)
+ self.manager.disconnect(sid, namespace=namespace,
+ ignore_queue=True)
def transport(self, sid):
"""Return the name of the transport used by the client.
@@ -648,7 +656,10 @@ class Server(object):
def _handle_connect(self, eio_sid, namespace, data):
"""Handle a client connection request."""
namespace = namespace or '/'
- sid = self.manager.connect(eio_sid, namespace)
+ sid = None
+ if namespace in self.handlers or namespace in self.namespace_handlers \
+ or self.namespaces == '*' or namespace in self.namespaces:
+ sid = self.manager.connect(eio_sid, namespace)
if sid is None:
self._send_packet(eio_sid, self.packet_class(
packet.CONNECT_ERROR, data='Unable to connect',
@@ -683,7 +694,7 @@ class Server(object):
self._send_packet(eio_sid, self.packet_class(
packet.CONNECT_ERROR, data=fail_reason,
namespace=namespace))
- self.manager.disconnect(sid, namespace)
+ self.manager.disconnect(sid, namespace, ignore_queue=True)
elif not self.always_connect:
self._send_packet(eio_sid, self.packet_class(
packet.CONNECT, {'sid': sid}, namespace=namespace))
@@ -696,7 +707,7 @@ class Server(object):
return
self.manager.pre_disconnect(sid, namespace=namespace)
self._trigger_event('disconnect', namespace, sid)
- self.manager.disconnect(sid, namespace)
+ self.manager.disconnect(sid, namespace, ignore_queue=True)
def _handle_event(self, eio_sid, namespace, id, data):
"""Handle an incoming client event."""
@@ -718,7 +729,7 @@ class Server(object):
def _handle_event_internal(self, server, sid, eio_sid, data, namespace,
id):
r = server._trigger_event(data[0], namespace, sid, *data[1:])
- if id is not None:
+ if r != self.not_handled and id is not None:
# send ACK packet with the response returned by the handler
# tuples are expanded as multiple arguments
if r is None:
@@ -746,9 +757,11 @@ class Server(object):
elif event not in self.reserved_events and \
'*' in self.handlers[namespace]:
return self.handlers[namespace]['*'](event, *args)
+ else:
+ return self.not_handled
# or else, forward the event to a namespace handler if one exists
- elif namespace in self.namespace_handlers:
+ elif namespace in self.namespace_handlers: # pragma: no branch
return self.namespace_handlers[namespace].trigger_event(
event, *args)