diff options
author | morpheus65535 <[email protected]> | 2021-11-29 23:07:14 -0500 |
---|---|---|
committer | morpheus65535 <[email protected]> | 2021-11-29 23:07:14 -0500 |
commit | c60c7513a5a776b2a15ac3a7b463d0ef9875cf04 (patch) | |
tree | 34a617dc4e93841387601fec4b74c75f07298d0a /libs/engineio | |
parent | a7a685491a42c2ff8327b9ac4a9328fd61012528 (diff) | |
download | bazarr-c60c7513a5a776b2a15ac3a7b463d0ef9875cf04.tar.gz bazarr-c60c7513a5a776b2a15ac3a7b463d0ef9875cf04.zip |
Upgraded engine.io module to improve socket.io connection stability. Should help to prevent #1613.v1.0.2-beta.0
Diffstat (limited to 'libs/engineio')
-rw-r--r-- | libs/engineio/__init__.py | 2 | ||||
-rw-r--r-- | libs/engineio/async_drivers/asgi.py | 37 | ||||
-rw-r--r-- | libs/engineio/async_drivers/gevent_uwsgi.py | 22 | ||||
-rw-r--r-- | libs/engineio/async_drivers/threading.py | 37 | ||||
-rw-r--r-- | libs/engineio/asyncio_client.py | 44 | ||||
-rw-r--r-- | libs/engineio/asyncio_server.py | 19 | ||||
-rw-r--r-- | libs/engineio/asyncio_socket.py | 12 | ||||
-rw-r--r-- | libs/engineio/client.py | 44 | ||||
-rw-r--r-- | libs/engineio/json.py | 16 | ||||
-rw-r--r-- | libs/engineio/middleware.py | 24 | ||||
-rw-r--r-- | libs/engineio/packet.py | 4 | ||||
-rw-r--r-- | libs/engineio/server.py | 64 | ||||
-rw-r--r-- | libs/engineio/socket.py | 12 | ||||
-rw-r--r-- | libs/engineio/static_files.py | 13 |
14 files changed, 232 insertions, 118 deletions
diff --git a/libs/engineio/__init__.py b/libs/engineio/__init__.py index b897468d2..b87baf0c0 100644 --- a/libs/engineio/__init__.py +++ b/libs/engineio/__init__.py @@ -17,7 +17,7 @@ else: # pragma: no cover get_tornado_handler = None ASGIApp = None -__version__ = '4.0.2dev' +__version__ = '4.2.1dev' __all__ = ['__version__', 'Server', 'WSGIApp', 'Middleware', 'Client'] if AsyncServer is not None: # pragma: no cover diff --git a/libs/engineio/async_drivers/asgi.py b/libs/engineio/async_drivers/asgi.py index eb3139b5e..092f4c244 100644 --- a/libs/engineio/async_drivers/asgi.py +++ b/libs/engineio/async_drivers/asgi.py @@ -43,19 +43,23 @@ class ASGIApp: on_startup=None, on_shutdown=None): self.engineio_server = engineio_server self.other_asgi_app = other_asgi_app - self.engineio_path = engineio_path.strip('/') + self.engineio_path = engineio_path + if not self.engineio_path.startswith('/'): + self.engineio_path = '/' + self.engineio_path + if not self.engineio_path.endswith('/'): + self.engineio_path += '/' self.static_files = static_files or {} self.on_startup = on_startup self.on_shutdown = on_shutdown async def __call__(self, scope, receive, send): if scope['type'] in ['http', 'websocket'] and \ - scope['path'].startswith('/{0}/'.format(self.engineio_path)): + scope['path'].startswith(self.engineio_path): await self.engineio_server.handle_request(scope, receive, send) else: static_file = get_static_file(scope['path'], self.static_files) \ if scope['type'] == 'http' and self.static_files else None - if static_file: + if static_file and os.path.exists(static_file['filename']): await self.serve_static_file(static_file, receive, send) elif self.other_asgi_app is not None: await self.other_asgi_app(scope, receive, send) @@ -68,17 +72,14 @@ class ASGIApp: send): # pragma: no cover event = await receive() if event['type'] == 'http.request': - if os.path.exists(static_file['filename']): - with open(static_file['filename'], 'rb') as f: - payload = f.read() - await send({'type': 'http.response.start', - 'status': 200, - 'headers': [(b'Content-Type', static_file[ - 'content_type'].encode('utf-8'))]}) - await send({'type': 'http.response.body', - 'body': payload}) - else: - await self.not_found(receive, send) + with open(static_file['filename'], 'rb') as f: + payload = f.read() + await send({'type': 'http.response.start', + 'status': 200, + 'headers': [(b'Content-Type', static_file[ + 'content_type'].encode('utf-8'))]}) + await send({'type': 'http.response.body', + 'body': payload}) async def lifespan(self, receive, send): while True: @@ -195,7 +196,13 @@ async def make_response(status, headers, payload, environ): await environ['asgi.send']({'type': 'websocket.accept', 'headers': headers}) else: - await environ['asgi.send']({'type': 'websocket.close'}) + if payload: + reason = payload.decode('utf-8') \ + if isinstance(payload, bytes) else str(payload) + await environ['asgi.send']({'type': 'websocket.close', + 'reason': reason}) + else: + await environ['asgi.send']({'type': 'websocket.close'}) return await environ['asgi.send']({'type': 'http.response.start', diff --git a/libs/engineio/async_drivers/gevent_uwsgi.py b/libs/engineio/async_drivers/gevent_uwsgi.py index bdee812de..43cf69868 100644 --- a/libs/engineio/async_drivers/gevent_uwsgi.py +++ b/libs/engineio/async_drivers/gevent_uwsgi.py @@ -1,8 +1,7 @@ -from __future__ import absolute_import - import gevent from gevent import queue from gevent.event import Event +from gevent import selectors import uwsgi _websocket_available = hasattr(uwsgi, 'websocket_handshake') @@ -40,21 +39,20 @@ class uWSGIWebSocket(object): # pragma: no cover self._req_ctx = uwsgi.request_context() else: # use event and queue for sending messages - from gevent.event import Event - from gevent.queue import Queue - from gevent.select import select self._event = Event() - self._send_queue = Queue() + self._send_queue = queue.Queue() # spawn a select greenlet def select_greenlet_runner(fd, event): """Sets event when data becomes available to read on fd.""" - while True: - event.set() - try: - select([fd], [], [])[0] - except ValueError: - break + sel = selectors.DefaultSelector() + sel.register(fd, selectors.EVENT_READ) + try: + while True: + sel.select() + event.set() + except gevent.GreenletExit: + sel.unregister(fd) self._select_greenlet = gevent.spawn( select_greenlet_runner, self._sock, diff --git a/libs/engineio/async_drivers/threading.py b/libs/engineio/async_drivers/threading.py index 9b5375668..2eebdf477 100644 --- a/libs/engineio/async_drivers/threading.py +++ b/libs/engineio/async_drivers/threading.py @@ -1,17 +1,48 @@ from __future__ import absolute_import +import queue import threading import time try: - import queue + from simple_websocket import Server, ConnectionClosed + _websocket_available = True except ImportError: # pragma: no cover - import Queue as queue + _websocket_available = False + + +class WebSocketWSGI(object): # pragma: no cover + """ + This wrapper class provides a threading WebSocket interface that is + compatible with eventlet's implementation. + """ + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + self.ws = Server(environ) + return self.app(self) + + def close(self): + return self.ws.close() + + def send(self, message): + try: + return self.ws.send(message) + except ConnectionClosed: + raise IOError() + + def wait(self): + try: + return self.ws.receive() + except ConnectionClosed: + raise IOError() + _async = { 'thread': threading.Thread, 'queue': queue.Queue, 'queue_empty': queue.Empty, 'event': threading.Event, - 'websocket': None, + 'websocket': WebSocketWSGI if _websocket_available else None, 'sleep': time.sleep, } diff --git a/libs/engineio/asyncio_client.py b/libs/engineio/asyncio_client.py index 4a11eb3b2..d702be1ff 100644 --- a/libs/engineio/asyncio_client.py +++ b/libs/engineio/asyncio_client.py @@ -57,6 +57,11 @@ class AsyncClient(client.Client): skip SSL certificate verification, allowing connections to servers with self signed certificates. The default is ``True``. + :param handle_sigint: Set to ``True`` to automatically handle disconnection + when the process is interrupted, or to ``False`` to + leave interrupt handling to the calling application. + Interrupt handling can only be enabled when the + client instance is created in the main thread. """ def is_asyncio_based(self): return True @@ -85,9 +90,8 @@ class AsyncClient(client.Client): await eio.connect('http://localhost:5000') """ global async_signal_handler_set - if not async_signal_handler_set and \ + if self.handle_sigint and not async_signal_handler_set and \ threading.current_thread() == threading.main_thread(): - try: asyncio.get_event_loop().add_signal_handler( signal.SIGINT, async_signal_handler) @@ -166,11 +170,7 @@ class AsyncClient(client.Client): :param args: arguments to pass to the function. :param kwargs: keyword arguments to pass to the function. - This function returns an object compatible with the `Thread` class in - the Python standard library. The `start()` method on this object is - already called by this function. - - Note: this method is a coroutine. + The return value is a ``asyncio.Task`` object. """ return asyncio.ensure_future(target(*args, **kwargs)) @@ -191,10 +191,17 @@ class AsyncClient(client.Client): """Create an event object.""" return asyncio.Event() - def _reset(self): - if self.http: # pragma: no cover - asyncio.ensure_future(self.http.close()) - super()._reset() + def __del__(self): # pragma: no cover + # try to close the aiohttp session if it is still open + if self.http and not self.http.closed: + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + loop.ensure_future(self.http.close()) + else: + loop.run_until_complete(self.http.close()) + except: + pass async def _connect_polling(self, url, headers, engineio_path): """Establish a long-polling connection to the Engine.IO server.""" @@ -207,10 +214,10 @@ class AsyncClient(client.Client): r = await self._send_request( 'GET', self.base_url + self._get_url_timestamp(), headers=headers, timeout=self.request_timeout) - if r is None: + if r is None or isinstance(r, str): self._reset() raise exceptions.ConnectionError( - 'Connection refused by the server') + r or 'Connection refused by the server') if r.status < 200 or r.status >= 300: self._reset() try: @@ -416,6 +423,7 @@ class AsyncClient(client.Client): except (aiohttp.ClientError, asyncio.TimeoutError) as exc: self.logger.info('HTTP %s request to %s failed with error %s.', method, url, exc) + return str(exc) async def _trigger_event(self, event, *args, **kwargs): """Invoke an event handler.""" @@ -462,9 +470,9 @@ class AsyncClient(client.Client): r = await self._send_request( 'GET', self.base_url + self._get_url_timestamp(), timeout=max(self.ping_interval, self.ping_timeout) + 5) - if r is None: + if r is None or isinstance(r, str): self.logger.warning( - 'Connection refused by the server, aborting') + r or 'Connection refused by the server, aborting') await self.queue.put(None) break if r.status < 200 or r.status >= 300: @@ -578,13 +586,13 @@ class AsyncClient(client.Client): p = payload.Payload(packets=packets) r = await self._send_request( 'POST', self.base_url, body=p.encode(), - headers={'Content-Type': 'application/octet-stream'}, + headers={'Content-Type': 'text/plain'}, timeout=self.request_timeout) for pkt in packets: self.queue.task_done() - if r is None: + if r is None or isinstance(r, str): self.logger.warning( - 'Connection refused by the server, aborting') + r or 'Connection refused by the server, aborting') break if r.status < 200 or r.status >= 300: self.logger.warning('Unexpected status code %s in server ' diff --git a/libs/engineio/asyncio_server.py b/libs/engineio/asyncio_server.py index 6639f26bf..706b1d6f2 100644 --- a/libs/engineio/asyncio_server.py +++ b/libs/engineio/asyncio_server.py @@ -29,7 +29,7 @@ class AsyncServer(server.Server): is a grace period added by the server. :param ping_timeout: The time in seconds that the client waits for the server to respond before disconnecting. The default - is 5 seconds. + is 20 seconds. :param max_http_buffer_size: The maximum size of a message when using the polling transport. The default is 1,000,000 bytes. @@ -63,6 +63,9 @@ class AsyncServer(server.Server): :param async_handlers: If set to ``True``, run message event handlers in non-blocking threads. To run handlers synchronously, set to ``False``. The default is ``True``. + :param transports: The list of allowed transports. Valid transports + are ``'polling'`` and ``'websocket'``. Defaults to + ``['polling', 'websocket']``. :param kwargs: Reserved for future extensions, any additional parameters given as keyword arguments will be silently ignored. """ @@ -213,6 +216,13 @@ class AsyncServer(server.Server): jsonp = False jsonp_index = None + # make sure the client uses an allowed transport + transport = query.get('transport', ['polling'])[0] + if transport not in self.transports: + self._log_error_once('Invalid transport', 'bad-transport') + return await self._make_response( + self._bad_request('Invalid transport'), environ) + # make sure the client speaks a compatible Engine.IO version sid = query['sid'][0] if 'sid' in query else None if sid is None and query.get('EIO') != ['4']: @@ -239,7 +249,6 @@ class AsyncServer(server.Server): r = self._bad_request('Invalid JSONP index number') elif method == 'GET': if sid is None: - transport = query.get('transport', ['polling'])[0] # transport must be one of 'polling' or 'websocket'. # if 'websocket', the HTTP_UPGRADE header must match. upgrade_header = environ.get('HTTP_UPGRADE').lower() \ @@ -249,9 +258,9 @@ class AsyncServer(server.Server): r = await self._handle_connect(environ, transport, jsonp_index) else: - self._log_error_once('Invalid transport ' + transport, - 'bad-transport') - r = self._bad_request('Invalid transport ' + transport) + self._log_error_once('Invalid websocket upgrade', + 'bad-upgrade') + r = self._bad_request('Invalid websocket upgrade') else: if sid not in self.sockets: self._log_error_once('Invalid session ' + sid, 'bad-sid') diff --git a/libs/engineio/asyncio_socket.py b/libs/engineio/asyncio_socket.py index 508ee3ca2..17265c201 100644 --- a/libs/engineio/asyncio_socket.py +++ b/libs/engineio/asyncio_socket.py @@ -143,12 +143,18 @@ class AsyncSocket(socket.Socket): async def _websocket_handler(self, ws): """Engine.IO handler for websocket transport.""" + async def websocket_wait(): + data = await ws.wait() + if data and len(data) > self.server.max_http_buffer_size: + raise ValueError('packet is too large') + return data + if self.connected: # the socket was already connected, so this is an upgrade self.upgrading = True # hold packet sends during the upgrade try: - pkt = await ws.wait() + pkt = await websocket_wait() except IOError: # pragma: no cover return decoded_pkt = packet.Packet(encoded_packet=pkt) @@ -162,7 +168,7 @@ class AsyncSocket(socket.Socket): await self.queue.put(packet.Packet(packet.NOOP)) # end poll try: - pkt = await ws.wait() + pkt = await websocket_wait() except IOError: # pragma: no cover self.upgrading = False return @@ -204,7 +210,7 @@ class AsyncSocket(socket.Socket): while True: p = None - wait_task = asyncio.ensure_future(ws.wait()) + wait_task = asyncio.ensure_future(websocket_wait()) try: p = await asyncio.wait_for( wait_task, diff --git a/libs/engineio/client.py b/libs/engineio/client.py index d307a5d62..4738671e8 100644 --- a/libs/engineio/client.py +++ b/libs/engineio/client.py @@ -1,10 +1,7 @@ from base64 import b64encode -from json import JSONDecodeError +from engineio.json import JSONDecodeError import logging -try: - import queue -except ImportError: # pragma: no cover - import Queue as queue +import queue import signal import ssl import threading @@ -69,17 +66,18 @@ class Client(object): skip SSL certificate verification, allowing connections to servers with self signed certificates. The default is ``True``. + :param handle_sigint: Set to ``True`` to automatically handle disconnection + when the process is interrupted, or to ``False`` to + leave interrupt handling to the calling application. + Interrupt handling can only be enabled when the + client instance is created in the main thread. """ event_names = ['connect', 'disconnect', 'message'] - def __init__(self, - logger=False, - json=None, - request_timeout=5, - http_session=None, - ssl_verify=True): + def __init__(self, logger=False, json=None, request_timeout=5, + http_session=None, ssl_verify=True, handle_sigint=True): global original_signal_handler - if original_signal_handler is None and \ + if handle_sigint and original_signal_handler is None and \ threading.current_thread() == threading.main_thread(): original_signal_handler = signal.signal(signal.SIGINT, signal_handler) @@ -92,6 +90,7 @@ class Client(object): self.ping_interval = None self.ping_timeout = None self.http = http_session + self.handle_sigint = handle_sigint self.ws = None self.read_loop_task = None self.write_loop_task = None @@ -244,9 +243,9 @@ class Client(object): :param args: arguments to pass to the function. :param kwargs: keyword arguments to pass to the function. - This function returns an object compatible with the `Thread` class in - the Python standard library. The `start()` method on this object is - already called by this function. + This function returns an object that represents the background task, + on which the ``join()`` method can be invoked to wait for the task to + complete. """ th = threading.Thread(target=target, args=args, kwargs=kwargs) th.start() @@ -282,10 +281,10 @@ class Client(object): r = self._send_request( 'GET', self.base_url + self._get_url_timestamp(), headers=headers, timeout=self.request_timeout) - if r is None: + if r is None or isinstance(r, str): self._reset() raise exceptions.ConnectionError( - 'Connection refused by the server') + r or 'Connection refused by the server') if r.status_code < 200 or r.status_code >= 300: self._reset() try: @@ -528,6 +527,7 @@ class Client(object): except requests.exceptions.RequestException as exc: self.logger.info('HTTP %s request to %s failed with error %s.', method, url, exc) + return str(exc) def _trigger_event(self, event, *args, **kwargs): """Invoke an event handler.""" @@ -574,9 +574,9 @@ class Client(object): r = self._send_request( 'GET', self.base_url + self._get_url_timestamp(), timeout=max(self.ping_interval, self.ping_timeout) + 5) - if r is None: + if r is None or isinstance(r, str): self.logger.warning( - 'Connection refused by the server, aborting') + r or 'Connection refused by the server, aborting') self.queue.put(None) break if r.status_code < 200 or r.status_code >= 300: @@ -682,13 +682,13 @@ class Client(object): p = payload.Payload(packets=packets) r = self._send_request( 'POST', self.base_url, body=p.encode(), - headers={'Content-Type': 'application/octet-stream'}, + headers={'Content-Type': 'text/plain'}, timeout=self.request_timeout) for pkt in packets: self.queue.task_done() - if r is None: + if r is None or isinstance(r, str): self.logger.warning( - 'Connection refused by the server, aborting') + r or 'Connection refused by the server, aborting') break if r.status_code < 200 or r.status_code >= 300: self.logger.warning('Unexpected status code %s in server ' diff --git a/libs/engineio/json.py b/libs/engineio/json.py new file mode 100644 index 000000000..b61255683 --- /dev/null +++ b/libs/engineio/json.py @@ -0,0 +1,16 @@ +"""JSON-compatible module with sane defaults.""" + +from json import * # noqa: F401, F403 +from json import loads as original_loads + + +def _safe_int(s): + if len(s) > 100: + raise ValueError('Integer is too large') + return int(s) + + +def loads(*args, **kwargs): + if 'parse_int' not in kwargs: # pragma: no cover + kwargs['parse_int'] = _safe_int + return original_loads(*args, **kwargs) diff --git a/libs/engineio/middleware.py b/libs/engineio/middleware.py index d0bdcc747..5d6ffddf6 100644 --- a/libs/engineio/middleware.py +++ b/libs/engineio/middleware.py @@ -35,7 +35,11 @@ class WSGIApp(object): engineio_path='engine.io'): self.engineio_app = engineio_app self.wsgi_app = wsgi_app - self.engineio_path = engineio_path.strip('/') + self.engineio_path = engineio_path + if not self.engineio_path.startswith('/'): + self.engineio_path = '/' + self.engineio_path + if not self.engineio_path.endswith('/'): + self.engineio_path += '/' self.static_files = static_files or {} def __call__(self, environ, start_response): @@ -55,21 +59,17 @@ class WSGIApp(object): environ['eventlet.input'] = Input(environ['gunicorn.socket']) path = environ['PATH_INFO'] - if path is not None and \ - path.startswith('/{0}/'.format(self.engineio_path)): + if path is not None and path.startswith(self.engineio_path): return self.engineio_app.handle_request(environ, start_response) else: static_file = get_static_file(path, self.static_files) \ if self.static_files else None - if static_file: - if os.path.exists(static_file['filename']): - start_response( - '200 OK', - [('Content-Type', static_file['content_type'])]) - with open(static_file['filename'], 'rb') as f: - return [f.read()] - else: - return self.not_found(start_response) + if static_file and os.path.exists(static_file['filename']): + start_response( + '200 OK', + [('Content-Type', static_file['content_type'])]) + with open(static_file['filename'], 'rb') as f: + return [f.read()] elif self.wsgi_app is not None: return self.wsgi_app(environ, start_response) return self.not_found(start_response) diff --git a/libs/engineio/packet.py b/libs/engineio/packet.py index 9dbd6c684..dda88acab 100644 --- a/libs/engineio/packet.py +++ b/libs/engineio/packet.py @@ -1,5 +1,5 @@ import base64 -import json as _json +from engineio import json as _json (OPEN, CLOSE, PING, PONG, MESSAGE, UPGRADE, NOOP) = (0, 1, 2, 3, 4, 5, 6) packet_names = ['OPEN', 'CLOSE', 'PING', 'PONG', 'MESSAGE', 'UPGRADE', 'NOOP'] @@ -23,7 +23,7 @@ class Packet(object): self.binary = False if self.binary and self.packet_type != MESSAGE: raise ValueError('Binary packets can only be of type MESSAGE') - if encoded_packet: + if encoded_packet is not None: self.decode(encoded_packet) def encode(self, b64=False): diff --git a/libs/engineio/server.py b/libs/engineio/server.py index 7498f3f6b..04bfffbb3 100644 --- a/libs/engineio/server.py +++ b/libs/engineio/server.py @@ -36,7 +36,7 @@ class Server(object): is a grace period added by the server. :param ping_timeout: The time in seconds that the client waits for the server to respond before disconnecting. The default - is 5 seconds. + is 20 seconds. :param max_http_buffer_size: The maximum size of a message when using the polling transport. The default is 1,000,000 bytes. @@ -78,20 +78,25 @@ class Server(object): inactive clients are closed. Set to ``False`` to disable the monitoring task (not recommended). The default is ``True``. + :param transports: The list of allowed transports. Valid transports + are ``'polling'`` and ``'websocket'``. Defaults to + ``['polling', 'websocket']``. :param kwargs: Reserved for future extensions, any additional parameters given as keyword arguments will be silently ignored. """ compression_methods = ['gzip', 'deflate'] event_names = ['connect', 'disconnect', 'message'] + valid_transports = ['polling', 'websocket'] _default_monitor_clients = True sequence_number = 0 - def __init__(self, async_mode=None, ping_interval=25, ping_timeout=5, + def __init__(self, async_mode=None, ping_interval=25, ping_timeout=20, max_http_buffer_size=1000000, allow_upgrades=True, http_compression=True, compression_threshold=1024, cookie=None, cors_allowed_origins=None, cors_credentials=True, logger=False, json=None, - async_handlers=True, monitor_clients=None, **kwargs): + async_handlers=True, monitor_clients=None, transports=None, + **kwargs): self.ping_timeout = ping_timeout if isinstance(ping_interval, tuple): self.ping_interval = ping_interval[0] @@ -152,6 +157,14 @@ class Server(object): self._async['asyncio']: # pragma: no cover raise ValueError('The selected async_mode requires asyncio and ' 'must use the AsyncServer class') + if transports is not None: + if isinstance(transports, str): + transports = [transports] + transports = [transport for transport in transports + if transport in self.valid_transports] + if not transports: + raise ValueError('No valid transports provided') + self.transports = transports or self.valid_transports self.logger.info('Server initialized for %s.', self.async_mode) def is_asyncio_based(self): @@ -333,8 +346,7 @@ class Server(object): allowed_origins: self._log_error_once( origin + ' is not an accepted origin.', 'bad-origin') - r = self._bad_request( - origin + ' is not an accepted origin.') + r = self._bad_request('Not an accepted origin.') start_response(r['status'], r['headers']) return [r['response']] @@ -343,6 +355,14 @@ class Server(object): jsonp = False jsonp_index = None + # make sure the client uses an allowed transport + transport = query.get('transport', ['polling'])[0] + if transport not in self.transports: + self._log_error_once('Invalid transport', 'bad-transport') + r = self._bad_request('Invalid transport') + start_response(r['status'], r['headers']) + return [r['response']] + # make sure the client speaks a compatible Engine.IO version sid = query['sid'][0] if 'sid' in query else None if sid is None and query.get('EIO') != ['4']: @@ -369,7 +389,6 @@ class Server(object): r = self._bad_request('Invalid JSONP index number') elif method == 'GET': if sid is None: - transport = query.get('transport', ['polling'])[0] # transport must be one of 'polling' or 'websocket'. # if 'websocket', the HTTP_UPGRADE header must match. upgrade_header = environ.get('HTTP_UPGRADE').lower() \ @@ -379,13 +398,13 @@ class Server(object): r = self._handle_connect(environ, start_response, transport, jsonp_index) else: - self._log_error_once('Invalid transport ' + transport, - 'bad-transport') - r = self._bad_request('Invalid transport ' + transport) + self._log_error_once('Invalid websocket upgrade', + 'bad-upgrade') + r = self._bad_request('Invalid websocket upgrade') else: if sid not in self.sockets: self._log_error_once('Invalid session ' + sid, 'bad-sid') - r = self._bad_request('Invalid session ' + sid) + r = self._bad_request('Invalid session') else: socket = self._get_socket(sid) try: @@ -405,7 +424,7 @@ class Server(object): if sid is None or sid not in self.sockets: self._log_error_once( 'Invalid session ' + (sid or 'None'), 'bad-sid') - r = self._bad_request('Invalid session ' + (sid or 'None')) + r = self._bad_request('Invalid session') else: socket = self._get_socket(sid) try: @@ -453,9 +472,9 @@ class Server(object): :param args: arguments to pass to the function. :param kwargs: keyword arguments to pass to the function. - This function returns an object compatible with the `Thread` class in - the Python standard library. The `start()` method on this object is - already called by this function. + This function returns an object that represents the background task, + on which the ``join()`` methond can be invoked to wait for the task to + complete. """ th = self._async['thread'](target=target, args=args, kwargs=kwargs) th.start() @@ -581,7 +600,14 @@ class Server(object): def _upgrades(self, sid, transport): """Return the list of possible upgrades for a client connection.""" if not self.allow_upgrades or self._get_socket(sid).upgraded or \ - self._async['websocket'] is None or transport == 'websocket': + transport == 'websocket': + return [] + if self._async['websocket'] is None: # pragma: no cover + self._log_error_once( + 'The WebSocket transport is not available, you must install a ' + 'WebSocket server that is compatible with your async mode to ' + 'enable it. See the documentation for details.', + 'no-websocket') return [] return ['websocket'] @@ -656,13 +682,15 @@ class Server(object): if 'wsgi.url_scheme' in environ and 'HTTP_HOST' in environ: default_origins.append('{scheme}://{host}'.format( scheme=environ['wsgi.url_scheme'], host=environ['HTTP_HOST'])) - if 'HTTP_X_FORWARDED_HOST' in environ: + if 'HTTP_X_FORWARDED_PROTO' in environ or \ + 'HTTP_X_FORWARDED_HOST' in environ: scheme = environ.get( 'HTTP_X_FORWARDED_PROTO', environ['wsgi.url_scheme']).split(',')[0].strip() default_origins.append('{scheme}://{host}'.format( - scheme=scheme, host=environ['HTTP_X_FORWARDED_HOST'].split( - ',')[0].strip())) + scheme=scheme, host=environ.get( + 'HTTP_X_FORWARDED_HOST', environ['HTTP_HOST']).split( + ',')[0].strip())) if self.cors_allowed_origins is None: allowed_origins = default_origins elif self.cors_allowed_origins == '*': diff --git a/libs/engineio/socket.py b/libs/engineio/socket.py index 1434b191d..be0c83f6b 100644 --- a/libs/engineio/socket.py +++ b/libs/engineio/socket.py @@ -159,6 +159,12 @@ class Socket(object): def _websocket_handler(self, ws): """Engine.IO handler for websocket transport.""" + def websocket_wait(): + data = ws.wait() + if data and len(data) > self.server.max_http_buffer_size: + raise ValueError('packet is too large') + return data + # try to set a socket timeout matching the configured ping interval # and timeout for attr in ['_sock', 'socket']: # pragma: no cover @@ -170,7 +176,7 @@ class Socket(object): # the socket was already connected, so this is an upgrade self.upgrading = True # hold packet sends during the upgrade - pkt = ws.wait() + pkt = websocket_wait() decoded_pkt = packet.Packet(encoded_packet=pkt) if decoded_pkt.packet_type != packet.PING or \ decoded_pkt.data != 'probe': @@ -181,7 +187,7 @@ class Socket(object): ws.send(packet.Packet(packet.PONG, data='probe').encode()) self.queue.put(packet.Packet(packet.NOOP)) # end poll - pkt = ws.wait() + pkt = websocket_wait() decoded_pkt = packet.Packet(encoded_packet=pkt) if decoded_pkt.packet_type != packet.UPGRADE: self.upgraded = False @@ -221,7 +227,7 @@ class Socket(object): while True: p = None try: - p = ws.wait() + p = websocket_wait() except Exception as e: # if the socket is already closed, we can assume this is a # downstream error of that diff --git a/libs/engineio/static_files.py b/libs/engineio/static_files.py index 3058f6ea4..77c891571 100644 --- a/libs/engineio/static_files.py +++ b/libs/engineio/static_files.py @@ -21,23 +21,28 @@ def get_static_file(path, static_files): "content_type". If the requested URL does not match any static file, the return value is None. """ + extra_path = '' if path in static_files: f = static_files[path] else: f = None - rest = '' while path != '': path, last = path.rsplit('/', 1) - rest = '/' + last + rest + extra_path = '/' + last + extra_path if path in static_files: - f = static_files[path] + rest + f = static_files[path] break elif path + '/' in static_files: - f = static_files[path + '/'] + rest[1:] + f = static_files[path + '/'] break if f: if isinstance(f, str): f = {'filename': f} + else: + f = f.copy() # in case it is mutated below + if f['filename'].endswith('/') and extra_path.startswith('/'): + extra_path = extra_path[1:] + f['filename'] += extra_path if f['filename'].endswith('/'): if '' in static_files: if isinstance(static_files[''], str): |