diff options
author | morpheus65535 <[email protected]> | 2022-11-07 13:06:49 -0500 |
---|---|---|
committer | morpheus65535 <[email protected]> | 2022-11-07 13:08:27 -0500 |
commit | bbe2483e21c2c1549ceeed16f021f9581b899f70 (patch) | |
tree | bcc2bef2f55789ec6e6c64809c07fb4f4d3d9c86 /libs/socketio | |
parent | 708fbfcd8ec0620647975be39a1f6acbbf08f767 (diff) | |
download | bazarr-bbe2483e21c2c1549ceeed16f021f9581b899f70.tar.gz bazarr-bbe2483e21c2c1549ceeed16f021f9581b899f70.zip |
Updated vendored dependencies.
Diffstat (limited to 'libs/socketio')
-rw-r--r-- | libs/socketio/asyncio_aiopika_manager.py | 2 | ||||
-rw-r--r-- | libs/socketio/asyncio_client.py | 8 | ||||
-rw-r--r-- | libs/socketio/asyncio_manager.py | 12 | ||||
-rw-r--r-- | libs/socketio/asyncio_pubsub_manager.py | 31 | ||||
-rw-r--r-- | libs/socketio/asyncio_redis_manager.py | 21 | ||||
-rw-r--r-- | libs/socketio/asyncio_server.py | 46 | ||||
-rw-r--r-- | libs/socketio/base_manager.py | 3 | ||||
-rw-r--r-- | libs/socketio/client.py | 2 | ||||
-rw-r--r-- | libs/socketio/msgpack_packet.py | 2 | ||||
-rw-r--r-- | libs/socketio/pubsub_manager.py | 25 | ||||
-rw-r--r-- | libs/socketio/server.py | 27 |
11 files changed, 122 insertions, 57 deletions
diff --git a/libs/socketio/asyncio_aiopika_manager.py b/libs/socketio/asyncio_aiopika_manager.py index 96dcec65d..eff3f8c8b 100644 --- a/libs/socketio/asyncio_aiopika_manager.py +++ b/libs/socketio/asyncio_aiopika_manager.py @@ -93,7 +93,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover async with self.listener_queue.iterator() as queue_iter: async for message in queue_iter: - with message.process(): + async with message.process(): yield pickle.loads(message.body) except Exception: self._get_logger().error('Cannot receive from rabbitmq... ' diff --git a/libs/socketio/asyncio_client.py b/libs/socketio/asyncio_client.py index 03b770de8..3c700ee02 100644 --- a/libs/socketio/asyncio_client.py +++ b/libs/socketio/asyncio_client.py @@ -50,9 +50,9 @@ class AsyncClient(client.Client): :param request_timeout: A timeout in seconds for requests. The default is 5 seconds. - :param http_session: an initialized ``requests.Session`` object to be used - when sending requests to the server. Use it if you - need to add special client options such as proxy + :param http_session: an initialized ``aiohttp.ClientSession`` object to be + used when sending requests to the server. Use it if + you need to add special client options such as proxy servers, SSL certificates, etc. :param ssl_verify: ``True`` to verify SSL certificates, or ``False`` to skip SSL certificate verification, allowing @@ -497,7 +497,7 @@ class AsyncClient(client.Client): """Handle the Engine.IO connection event.""" self.logger.info('Engine.IO connection established') self.sid = self.eio.sid - real_auth = await self._get_real_value(self.connection_auth) + real_auth = await self._get_real_value(self.connection_auth) or {} for n in self.connection_namespaces: await self._send_packet(self.packet_class( packet.CONNECT, data=real_auth, namespace=n)) diff --git a/libs/socketio/asyncio_manager.py b/libs/socketio/asyncio_manager.py index f89022c62..20a16c888 100644 --- a/libs/socketio/asyncio_manager.py +++ b/libs/socketio/asyncio_manager.py @@ -26,12 +26,20 @@ class AsyncManager(BaseManager): id = self._generate_ack_id(sid, callback) else: id = None - tasks.append(self.server._emit_internal(eio_sid, event, data, - namespace, id)) + tasks.append(asyncio.create_task( + self.server._emit_internal(eio_sid, event, data, + namespace, id))) if tasks == []: # pragma: no cover return await asyncio.wait(tasks) + async def disconnect(self, sid, namespace, **kwargs): + """Disconnect a client. + + Note: this method is a coroutine. + """ + return super().disconnect(sid, namespace, **kwargs) + async def close_room(self, room, namespace): """Remove all participants from a room. diff --git a/libs/socketio/asyncio_pubsub_manager.py b/libs/socketio/asyncio_pubsub_manager.py index ff37f2dfa..1a06889ee 100644 --- a/libs/socketio/asyncio_pubsub_manager.py +++ b/libs/socketio/asyncio_pubsub_manager.py @@ -76,7 +76,14 @@ class AsyncPubSubManager(AsyncManager): else: # client is in another server, so we post request to the queue await self._publish({'method': 'disconnect', 'sid': sid, - 'namespace': namespace or '/'}) + 'namespace': namespace or '/'}) + + async def disconnect(self, sid, namespace, **kwargs): + if kwargs.get('ignore_queue'): + return await super(AsyncPubSubManager, self).disconnect( + sid, namespace=namespace) + await self._publish({'method': 'disconnect', 'sid': sid, + 'namespace': namespace or '/'}) async def close_room(self, room, namespace=None): await self._publish({'method': 'close_room', 'room': room, @@ -166,14 +173,20 @@ class AsyncPubSubManager(AsyncManager): if data and 'method' in data: self._get_logger().info('pubsub message: {}'.format( data['method'])) - if data['method'] == 'emit': - await self._handle_emit(data) - elif data['method'] == 'callback': - await self._handle_callback(data) - elif data['method'] == 'disconnect': - await self._handle_disconnect(data) - elif data['method'] == 'close_room': - await self._handle_close_room(data) + try: + if data['method'] == 'emit': + await self._handle_emit(data) + elif data['method'] == 'callback': + await self._handle_callback(data) + elif data['method'] == 'disconnect': + await self._handle_disconnect(data) + elif data['method'] == 'close_room': + await self._handle_close_room(data) + except asyncio.CancelledError: + raise # let the outer try/except handle it + except: + self.server.logger.exception( + 'Unknown error in pubsub listening task') except asyncio.CancelledError: # pragma: no cover break except: # pragma: no cover diff --git a/libs/socketio/asyncio_redis_manager.py b/libs/socketio/asyncio_redis_manager.py index d9da5f9af..9e3d9544a 100644 --- a/libs/socketio/asyncio_redis_manager.py +++ b/libs/socketio/asyncio_redis_manager.py @@ -1,10 +1,16 @@ import asyncio import pickle -try: - import aioredis -except ImportError: - aioredis = None +try: # pragma: no cover + from redis import asyncio as aioredis + from redis.exceptions import RedisError +except ImportError: # pragma: no cover + try: + import aioredis + from aioredis.exceptions import RedisError + except ImportError: + aioredis = None + RedisError = None from .asyncio_pubsub_manager import AsyncPubSubManager @@ -39,8 +45,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover write_only=False, logger=None, redis_options=None): if aioredis is None: raise RuntimeError('Redis package is not installed ' - '(Run "pip install aioredis" in your ' - 'virtualenv).') + '(Run "pip install redis" in your virtualenv).') if not hasattr(aioredis.Redis, 'from_url'): raise RuntimeError('Version 2 of aioredis package is required.') self.redis_url = url @@ -61,7 +66,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover self._redis_connect() return await self.redis.publish( self.channel, pickle.dumps(data)) - except aioredis.exceptions.RedisError: + except RedisError: if retry: self._get_logger().error('Cannot publish to redis... ' 'retrying') @@ -82,7 +87,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover retry_sleep = 1 async for message in self.pubsub.listen(): yield message - except aioredis.exceptions.RedisError: + except RedisError: self._get_logger().error('Cannot receive from redis... ' 'retrying in ' '{} secs'.format(retry_sleep)) diff --git a/libs/socketio/asyncio_server.py b/libs/socketio/asyncio_server.py index 59fab4a22..eb708b21c 100644 --- a/libs/socketio/asyncio_server.py +++ b/libs/socketio/asyncio_server.py @@ -40,18 +40,23 @@ class AsyncServer(server.Server): connect handler and your client is confused when it receives events before the connection acceptance. In any other case use the default of ``False``. + :param namespaces: a list of namespaces that are accepted, in addition to + any namespaces for which handlers have been defined. The + default is `['/']`, which always accepts connections to + the default namespace. Set to `'*'` to accept all + namespaces. :param kwargs: Connection parameters for the underlying Engine.IO server. The Engine.IO configuration supports the following settings: :param async_mode: The asynchronous model to use. See the Deployment section in the documentation for a description of the - available options. Valid async modes are "threading", - "eventlet", "gevent" and "gevent_uwsgi". If this - argument is not given, "eventlet" is tried first, then - "gevent_uwsgi", then "gevent", and finally "threading". - The first async mode that has all its dependencies - installed is then one that is chosen. + available options. Valid async modes are "aiohttp", + "sanic", "tornado" and "asgi". If this argument is not + given, "aiohttp" is tried first, followed by "sanic", + "tornado", and finally "asgi". The first async mode that + has all its dependencies installed is the one that is + chosen. :param ping_interval: The interval in seconds at which the server pings the client. The default is 25 seconds. For advanced control, a two element tuple can be given, where @@ -97,11 +102,12 @@ class AsyncServer(server.Server): ``engineio_logger`` is ``False``. """ def __init__(self, client_manager=None, logger=False, json=None, - async_handlers=True, **kwargs): + async_handlers=True, namespaces=None, **kwargs): if client_manager is None: client_manager = asyncio_manager.AsyncManager() super().__init__(client_manager=client_manager, logger=logger, - json=json, async_handlers=async_handlers, **kwargs) + json=json, async_handlers=async_handlers, + namespaces=namespaces, **kwargs) def is_asyncio_based(self): return True @@ -378,7 +384,8 @@ class AsyncServer(server.Server): await self._send_packet(eio_sid, self.packet_class( packet.DISCONNECT, namespace=namespace)) await self._trigger_event('disconnect', namespace, sid) - self.manager.disconnect(sid, namespace=namespace) + await self.manager.disconnect(sid, namespace=namespace, + ignore_queue=True) async def handle_request(self, *args, **kwargs): """Handle an HTTP request from the client. @@ -442,7 +449,16 @@ class AsyncServer(server.Server): async def _handle_connect(self, eio_sid, namespace, data): """Handle a client connection request.""" namespace = namespace or '/' - sid = self.manager.connect(eio_sid, namespace) + sid = None + if namespace in self.handlers or namespace in self.namespace_handlers \ + or self.namespaces == '*' or namespace in self.namespaces: + sid = self.manager.connect(eio_sid, namespace) + if sid is None: + await self._send_packet(eio_sid, self.packet_class( + packet.CONNECT_ERROR, data='Unable to connect', + namespace=namespace)) + return + if self.always_connect: await self._send_packet(eio_sid, self.packet_class( packet.CONNECT, {'sid': sid}, namespace=namespace)) @@ -471,7 +487,7 @@ class AsyncServer(server.Server): await self._send_packet(eio_sid, self.packet_class( packet.CONNECT_ERROR, data=fail_reason, namespace=namespace)) - self.manager.disconnect(sid, namespace) + await self.manager.disconnect(sid, namespace, ignore_queue=True) elif not self.always_connect: await self._send_packet(eio_sid, self.packet_class( packet.CONNECT, {'sid': sid}, namespace=namespace)) @@ -484,7 +500,7 @@ class AsyncServer(server.Server): return self.manager.pre_disconnect(sid, namespace=namespace) await self._trigger_event('disconnect', namespace, sid) - self.manager.disconnect(sid, namespace) + await self.manager.disconnect(sid, namespace, ignore_queue=True) async def _handle_event(self, eio_sid, namespace, id, data): """Handle an incoming client event.""" @@ -506,7 +522,7 @@ class AsyncServer(server.Server): async def _handle_event_internal(self, server, sid, eio_sid, data, namespace, id): r = await server._trigger_event(data[0], namespace, sid, *data[1:]) - if id is not None: + if r != self.not_handled and id is not None: # send ACK packet with the response returned by the handler # tuples are expanded as multiple arguments if r is None: @@ -545,9 +561,11 @@ class AsyncServer(server.Server): else: ret = handler(*args) return ret + else: + return self.not_handled # or else, forward the event to a namepsace handler if one exists - elif namespace in self.namespace_handlers: + elif namespace in self.namespace_handlers: # pragma: no branch return await self.namespace_handlers[namespace].trigger_event( event, *args) diff --git a/libs/socketio/base_manager.py b/libs/socketio/base_manager.py index 0d6e1a9f2..87d238793 100644 --- a/libs/socketio/base_manager.py +++ b/libs/socketio/base_manager.py @@ -68,6 +68,7 @@ class BaseManager(object): return self.rooms[namespace][None][sid] is not None except KeyError: pass + return False def sid_from_eio_sid(self, eio_sid, namespace): try: @@ -94,7 +95,7 @@ class BaseManager(object): self.pending_disconnect[namespace].append(sid) return self.rooms[namespace][None].get(sid) - def disconnect(self, sid, namespace): + def disconnect(self, sid, namespace, **kwargs): """Register a client disconnect from a namespace.""" if namespace not in self.rooms: return diff --git a/libs/socketio/client.py b/libs/socketio/client.py index 5046ea860..58d381b1d 100644 --- a/libs/socketio/client.py +++ b/libs/socketio/client.py @@ -680,7 +680,7 @@ class Client(object): """Handle the Engine.IO connection event.""" self.logger.info('Engine.IO connection established') self.sid = self.eio.sid - real_auth = self._get_real_value(self.connection_auth) + real_auth = self._get_real_value(self.connection_auth) or {} for n in self.connection_namespaces: self._send_packet(self.packet_class( packet.CONNECT, data=real_auth, namespace=n)) diff --git a/libs/socketio/msgpack_packet.py b/libs/socketio/msgpack_packet.py index cb6afe838..27462634a 100644 --- a/libs/socketio/msgpack_packet.py +++ b/libs/socketio/msgpack_packet.py @@ -13,6 +13,6 @@ class MsgPackPacket(packet.Packet): """Decode a transmitted package.""" decoded = msgpack.loads(encoded_packet) self.packet_type = decoded['type'] - self.data = decoded['data'] + self.data = decoded.get('data') self.id = decoded.get('id') self.namespace = decoded['nsp'] diff --git a/libs/socketio/pubsub_manager.py b/libs/socketio/pubsub_manager.py index 9b6f36de2..51079bf22 100644 --- a/libs/socketio/pubsub_manager.py +++ b/libs/socketio/pubsub_manager.py @@ -75,7 +75,10 @@ class PubSubManager(BaseManager): self._publish({'method': 'disconnect', 'sid': sid, 'namespace': namespace or '/'}) - def disconnect(self, sid, namespace=None): + def disconnect(self, sid, namespace=None, **kwargs): + if kwargs.get('ignore_queue'): + return super(PubSubManager, self).disconnect( + sid, namespace=namespace) self._publish({'method': 'disconnect', 'sid': sid, 'namespace': namespace or '/'}) @@ -164,11 +167,15 @@ class PubSubManager(BaseManager): if data and 'method' in data: self._get_logger().info('pubsub message: {}'.format( data['method'])) - if data['method'] == 'emit': - self._handle_emit(data) - elif data['method'] == 'callback': - self._handle_callback(data) - elif data['method'] == 'disconnect': - self._handle_disconnect(data) - elif data['method'] == 'close_room': - self._handle_close_room(data) + try: + if data['method'] == 'emit': + self._handle_emit(data) + elif data['method'] == 'callback': + self._handle_callback(data) + elif data['method'] == 'disconnect': + self._handle_disconnect(data) + elif data['method'] == 'close_room': + self._handle_close_room(data) + except: + self.server.logger.exception( + 'Unknown error in pubsub listening thread') diff --git a/libs/socketio/server.py b/libs/socketio/server.py index cdf255b3f..daa1c0d4d 100644 --- a/libs/socketio/server.py +++ b/libs/socketio/server.py @@ -49,6 +49,11 @@ class Server(object): connect handler and your client is confused when it receives events before the connection acceptance. In any other case use the default of ``False``. + :param namespaces: a list of namespaces that are accepted, in addition to + any namespaces for which handlers have been defined. The + default is `['/']`, which always accepts connections to + the default namespace. Set to `'*'` to accept all + namespaces. :param kwargs: Connection parameters for the underlying Engine.IO server. The Engine.IO configuration supports the following settings: @@ -110,7 +115,7 @@ class Server(object): def __init__(self, client_manager=None, logger=False, serializer='default', json=None, async_handlers=True, always_connect=False, - **kwargs): + namespaces=None, **kwargs): engineio_options = kwargs engineio_logger = engineio_options.pop('engineio_logger', None) if engineio_logger is not None: @@ -134,6 +139,7 @@ class Server(object): self.environ = {} self.handlers = {} self.namespace_handlers = {} + self.not_handled = object() self._binary_packet = {} @@ -156,6 +162,7 @@ class Server(object): self.async_handlers = async_handlers self.always_connect = always_connect + self.namespaces = namespaces or ['/'] self.async_mode = self.eio.async_mode @@ -558,7 +565,8 @@ class Server(object): self._send_packet(eio_sid, self.packet_class( packet.DISCONNECT, namespace=namespace)) self._trigger_event('disconnect', namespace, sid) - self.manager.disconnect(sid, namespace=namespace) + self.manager.disconnect(sid, namespace=namespace, + ignore_queue=True) def transport(self, sid): """Return the name of the transport used by the client. @@ -648,7 +656,10 @@ class Server(object): def _handle_connect(self, eio_sid, namespace, data): """Handle a client connection request.""" namespace = namespace or '/' - sid = self.manager.connect(eio_sid, namespace) + sid = None + if namespace in self.handlers or namespace in self.namespace_handlers \ + or self.namespaces == '*' or namespace in self.namespaces: + sid = self.manager.connect(eio_sid, namespace) if sid is None: self._send_packet(eio_sid, self.packet_class( packet.CONNECT_ERROR, data='Unable to connect', @@ -683,7 +694,7 @@ class Server(object): self._send_packet(eio_sid, self.packet_class( packet.CONNECT_ERROR, data=fail_reason, namespace=namespace)) - self.manager.disconnect(sid, namespace) + self.manager.disconnect(sid, namespace, ignore_queue=True) elif not self.always_connect: self._send_packet(eio_sid, self.packet_class( packet.CONNECT, {'sid': sid}, namespace=namespace)) @@ -696,7 +707,7 @@ class Server(object): return self.manager.pre_disconnect(sid, namespace=namespace) self._trigger_event('disconnect', namespace, sid) - self.manager.disconnect(sid, namespace) + self.manager.disconnect(sid, namespace, ignore_queue=True) def _handle_event(self, eio_sid, namespace, id, data): """Handle an incoming client event.""" @@ -718,7 +729,7 @@ class Server(object): def _handle_event_internal(self, server, sid, eio_sid, data, namespace, id): r = server._trigger_event(data[0], namespace, sid, *data[1:]) - if id is not None: + if r != self.not_handled and id is not None: # send ACK packet with the response returned by the handler # tuples are expanded as multiple arguments if r is None: @@ -746,9 +757,11 @@ class Server(object): elif event not in self.reserved_events and \ '*' in self.handlers[namespace]: return self.handlers[namespace]['*'](event, *args) + else: + return self.not_handled # or else, forward the event to a namespace handler if one exists - elif namespace in self.namespace_handlers: + elif namespace in self.namespace_handlers: # pragma: no branch return self.namespace_handlers[namespace].trigger_event( event, *args) |