diff options
author | morpheus65535 <[email protected]> | 2024-03-03 12:15:23 -0500 |
---|---|---|
committer | GitHub <[email protected]> | 2024-03-03 12:15:23 -0500 |
commit | 03afeb347075381bcb7fd6036295c9fa4a90d2dc (patch) | |
tree | 7c5d72c973d2c8e4ade57391a1c9ad5e94903a46 /libs/socketio/async_aiopika_manager.py | |
parent | 9ae684240b5bdd40a870d8122f0e380f8d03a187 (diff) | |
download | bazarr-03afeb347075381bcb7fd6036295c9fa4a90d2dc.tar.gz bazarr-03afeb347075381bcb7fd6036295c9fa4a90d2dc.zip |
Updated multiple Python modules (now in libs and custom_libs directories) and React libraries
Diffstat (limited to 'libs/socketio/async_aiopika_manager.py')
-rw-r--r-- | libs/socketio/async_aiopika_manager.py | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/libs/socketio/async_aiopika_manager.py b/libs/socketio/async_aiopika_manager.py new file mode 100644 index 000000000..b6f09b8b5 --- /dev/null +++ b/libs/socketio/async_aiopika_manager.py @@ -0,0 +1,126 @@ +import asyncio +import pickle + +from .async_pubsub_manager import AsyncPubSubManager + +try: + import aio_pika +except ImportError: + aio_pika = None + + +class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover + """Client manager that uses aio_pika for inter-process messaging under + asyncio. + + This class implements a client manager backend for event sharing across + multiple processes, using RabbitMQ + + To use a aio_pika backend, initialize the :class:`Server` instance as + follows:: + + url = 'amqp://user:password@hostname:port//' + server = socketio.Server(client_manager=socketio.AsyncAioPikaManager( + url)) + + :param url: The connection URL for the backend messaging queue. Example + connection URLs are ``'amqp://guest:guest@localhost:5672//'`` + for RabbitMQ. + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + With this manager, the channel name is the exchange name + in rabbitmq + :param write_only: If set to ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. + """ + + name = 'asyncaiopika' + + def __init__(self, url='amqp://guest:guest@localhost:5672//', + channel='socketio', write_only=False, logger=None): + if aio_pika is None: + raise RuntimeError('aio_pika package is not installed ' + '(Run "pip install aio_pika" in your ' + 'virtualenv).') + self.url = url + self._lock = asyncio.Lock() + self.publisher_connection = None + self.publisher_channel = None + self.publisher_exchange = None + super().__init__(channel=channel, write_only=write_only, logger=logger) + + async def _connection(self): + return await aio_pika.connect_robust(self.url) + + async def _channel(self, connection): + return await connection.channel() + + async def _exchange(self, channel): + return await channel.declare_exchange(self.channel, + aio_pika.ExchangeType.FANOUT) + + async def _queue(self, channel, exchange): + queue = await channel.declare_queue(durable=False, + arguments={'x-expires': 300000}) + await queue.bind(exchange) + return queue + + async def _publish(self, data): + if self.publisher_connection is None: + async with self._lock: + if self.publisher_connection is None: + self.publisher_connection = await self._connection() + self.publisher_channel = await self._channel( + self.publisher_connection + ) + self.publisher_exchange = await self._exchange( + self.publisher_channel + ) + retry = True + while True: + try: + await self.publisher_exchange.publish( + aio_pika.Message( + body=pickle.dumps(data), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ), routing_key='*', + ) + break + except aio_pika.AMQPException: + if retry: + self._get_logger().error('Cannot publish to rabbitmq... ' + 'retrying') + retry = False + else: + self._get_logger().error( + 'Cannot publish to rabbitmq... giving up') + break + except aio_pika.exceptions.ChannelInvalidStateError: + # aio_pika raises this exception when the task is cancelled + raise asyncio.CancelledError() + + async def _listen(self): + async with (await self._connection()) as connection: + channel = await self._channel(connection) + await channel.set_qos(prefetch_count=1) + exchange = await self._exchange(channel) + queue = await self._queue(channel, exchange) + + retry_sleep = 1 + while True: + try: + async with queue.iterator() as queue_iter: + async for message in queue_iter: + async with message.process(): + yield pickle.loads(message.body) + retry_sleep = 1 + except aio_pika.AMQPException: + self._get_logger().error( + 'Cannot receive from rabbitmq... ' + 'retrying in {} secs'.format(retry_sleep)) + await asyncio.sleep(retry_sleep) + retry_sleep = min(retry_sleep * 2, 60) + except aio_pika.exceptions.ChannelInvalidStateError: + # aio_pika raises this exception when the task is cancelled + raise asyncio.CancelledError() |