1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
|
import asyncio
from socketio import AsyncClient
from socketio.exceptions import SocketIOError, TimeoutError, DisconnectedError
class AsyncSimpleClient:
"""A Socket.IO client.
This class implements a simple, yet fully compliant Socket.IO web client
with support for websocket and long-polling transports.
The positional and keyword arguments given in the constructor are passed
to the underlying :func:`socketio.AsyncClient` object.
"""
def __init__(self, *args, **kwargs):
self.client_args = args
self.client_kwargs = kwargs
self.client = None
self.namespace = '/'
self.connected_event = asyncio.Event()
self.connected = False
self.input_event = asyncio.Event()
self.input_buffer = []
async def connect(self, url, headers={}, auth=None, transports=None,
namespace='/', socketio_path='socket.io',
wait_timeout=5):
"""Connect to a Socket.IO server.
:param url: The URL of the Socket.IO server. It can include custom
query string parameters if required by the server. If a
function is provided, the client will invoke it to obtain
the URL each time a connection or reconnection is
attempted.
:param headers: A dictionary with custom headers to send with the
connection request. If a function is provided, the
client will invoke it to obtain the headers dictionary
each time a connection or reconnection is attempted.
:param auth: Authentication data passed to the server with the
connection request, normally a dictionary with one or
more string key/value pairs. If a function is provided,
the client will invoke it to obtain the authentication
data each time a connection or reconnection is attempted.
:param transports: The list of allowed transports. Valid transports
are ``'polling'`` and ``'websocket'``. If not
given, the polling transport is connected first,
then an upgrade to websocket is attempted.
:param namespace: The namespace to connect to as a string. If not
given, the default namespace ``/`` is used.
:param socketio_path: The endpoint where the Socket.IO server is
installed. The default value is appropriate for
most cases.
:param wait_timeout: How long the client should wait for the
connection. The default is 5 seconds.
Note: this method is a coroutine.
"""
if self.connected:
raise RuntimeError('Already connected')
self.namespace = namespace
self.input_buffer = []
self.input_event.clear()
self.client = AsyncClient(*self.client_args, **self.client_kwargs)
@self.client.event(namespace=self.namespace)
def connect(): # pragma: no cover
self.connected = True
self.connected_event.set()
@self.client.event(namespace=self.namespace)
def disconnect(): # pragma: no cover
self.connected_event.clear()
@self.client.event(namespace=self.namespace)
def __disconnect_final(): # pragma: no cover
self.connected = False
self.connected_event.set()
@self.client.on('*', namespace=self.namespace)
def on_event(event, *args): # pragma: no cover
self.input_buffer.append([event, *args])
self.input_event.set()
await self.client.connect(
url, headers=headers, auth=auth, transports=transports,
namespaces=[namespace], socketio_path=socketio_path,
wait_timeout=wait_timeout)
@property
def sid(self):
"""The session ID received from the server.
The session ID is not guaranteed to remain constant throughout the life
of the connection, as reconnections can cause it to change.
"""
return self.client.get_sid(self.namespace) if self.client else None
@property
def transport(self):
"""The name of the transport currently in use.
The transport is returned as a string and can be one of ``polling``
and ``websocket``.
"""
return self.client.transport if self.client else ''
async def emit(self, event, data=None):
"""Emit an event to the server.
:param event: The event name. It can be any string. The event names
``'connect'``, ``'message'`` and ``'disconnect'`` are
reserved and should not be used.
:param data: The data to send to the server. Data can be of
type ``str``, ``bytes``, ``list`` or ``dict``. To send
multiple arguments, use a tuple where each element is of
one of the types indicated above.
Note: this method is a coroutine.
This method schedules the event to be sent out and returns, without
actually waiting for its delivery. In cases where the client needs to
ensure that the event was received, :func:`socketio.SimpleClient.call`
should be used instead.
"""
while True:
await self.connected_event.wait()
if not self.connected:
raise DisconnectedError()
try:
return await self.client.emit(event, data,
namespace=self.namespace)
except SocketIOError:
pass
async def call(self, event, data=None, timeout=60):
"""Emit an event to the server and wait for a response.
This method issues an emit and waits for the server to provide a
response or acknowledgement. If the response does not arrive before the
timeout, then a ``TimeoutError`` exception is raised.
:param event: The event name. It can be any string. The event names
``'connect'``, ``'message'`` and ``'disconnect'`` are
reserved and should not be used.
:param data: The data to send to the server. Data can be of
type ``str``, ``bytes``, ``list`` or ``dict``. To send
multiple arguments, use a tuple where each element is of
one of the types indicated above.
:param timeout: The waiting timeout. If the timeout is reached before
the server acknowledges the event, then a
``TimeoutError`` exception is raised.
Note: this method is a coroutine.
"""
while True:
await self.connected_event.wait()
if not self.connected:
raise DisconnectedError()
try:
return await self.client.call(event, data,
namespace=self.namespace,
timeout=timeout)
except SocketIOError:
pass
async def receive(self, timeout=None):
"""Wait for an event from the server.
:param timeout: The waiting timeout. If the timeout is reached before
the server acknowledges the event, then a
``TimeoutError`` exception is raised.
Note: this method is a coroutine.
The return value is a list with the event name as the first element. If
the server included arguments with the event, they are returned as
additional list elements.
"""
while not self.input_buffer:
try:
await asyncio.wait_for(self.connected_event.wait(),
timeout=timeout)
except asyncio.TimeoutError: # pragma: no cover
raise TimeoutError()
if not self.connected:
raise DisconnectedError()
try:
await asyncio.wait_for(self.input_event.wait(),
timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError()
self.input_event.clear()
return self.input_buffer.pop(0)
async def disconnect(self):
"""Disconnect from the server.
Note: this method is a coroutine.
"""
if self.connected:
await self.client.disconnect()
self.client = None
self.connected = False
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.disconnect()
|