aboutsummaryrefslogtreecommitdiffhomepage
path: root/libs/socketio/async_aiopika_manager.py
blob: b6f09b8b5cb2dfb3bcd341b7e273b4be06331362 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import asyncio
import pickle

from .async_pubsub_manager import AsyncPubSubManager

try:
    import aio_pika
except ImportError:
    aio_pika = None


class AsyncAioPikaManager(AsyncPubSubManager):  # pragma: no cover
    """Client manager that uses aio_pika for inter-process messaging under
    asyncio.

    This class implements a client manager backend for event sharing across
    multiple processes, using RabbitMQ

    To use a aio_pika backend, initialize the :class:`Server` instance as
    follows::

        url = 'amqp://user:password@hostname:port//'
        server = socketio.Server(client_manager=socketio.AsyncAioPikaManager(
            url))

    :param url: The connection URL for the backend messaging queue. Example
                connection URLs are ``'amqp://guest:guest@localhost:5672//'``
                for RabbitMQ.
    :param channel: The channel name on which the server sends and receives
                    notifications. Must be the same in all the servers.
                    With this manager, the channel name is the exchange name
                    in rabbitmq
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    """

    name = 'asyncaiopika'

    def __init__(self, url='amqp://guest:guest@localhost:5672//',
                 channel='socketio', write_only=False, logger=None):
        if aio_pika is None:
            raise RuntimeError('aio_pika package is not installed '
                               '(Run "pip install aio_pika" in your '
                               'virtualenv).')
        self.url = url
        self._lock = asyncio.Lock()
        self.publisher_connection = None
        self.publisher_channel = None
        self.publisher_exchange = None
        super().__init__(channel=channel, write_only=write_only, logger=logger)

    async def _connection(self):
        return await aio_pika.connect_robust(self.url)

    async def _channel(self, connection):
        return await connection.channel()

    async def _exchange(self, channel):
        return await channel.declare_exchange(self.channel,
                                              aio_pika.ExchangeType.FANOUT)

    async def _queue(self, channel, exchange):
        queue = await channel.declare_queue(durable=False,
                                            arguments={'x-expires': 300000})
        await queue.bind(exchange)
        return queue

    async def _publish(self, data):
        if self.publisher_connection is None:
            async with self._lock:
                if self.publisher_connection is None:
                    self.publisher_connection = await self._connection()
                    self.publisher_channel = await self._channel(
                        self.publisher_connection
                    )
                    self.publisher_exchange = await self._exchange(
                        self.publisher_channel
                    )
        retry = True
        while True:
            try:
                await self.publisher_exchange.publish(
                    aio_pika.Message(
                        body=pickle.dumps(data),
                        delivery_mode=aio_pika.DeliveryMode.PERSISTENT
                    ), routing_key='*',
                )
                break
            except aio_pika.AMQPException:
                if retry:
                    self._get_logger().error('Cannot publish to rabbitmq... '
                                             'retrying')
                    retry = False
                else:
                    self._get_logger().error(
                        'Cannot publish to rabbitmq... giving up')
                    break
            except aio_pika.exceptions.ChannelInvalidStateError:
                # aio_pika raises this exception when the task is cancelled
                raise asyncio.CancelledError()

    async def _listen(self):
        async with (await self._connection()) as connection:
            channel = await self._channel(connection)
            await channel.set_qos(prefetch_count=1)
            exchange = await self._exchange(channel)
            queue = await self._queue(channel, exchange)

            retry_sleep = 1
            while True:
                try:
                    async with queue.iterator() as queue_iter:
                        async for message in queue_iter:
                            async with message.process():
                                yield pickle.loads(message.body)
                                retry_sleep = 1
                except aio_pika.AMQPException:
                    self._get_logger().error(
                        'Cannot receive from rabbitmq... '
                        'retrying in {} secs'.format(retry_sleep))
                    await asyncio.sleep(retry_sleep)
                    retry_sleep = min(retry_sleep * 2, 60)
                except aio_pika.exceptions.ChannelInvalidStateError:
                    # aio_pika raises this exception when the task is cancelled
                    raise asyncio.CancelledError()