path: root/libs/socketio/kafka_manager.py
import logging
import pickle

try:
    import kafka
except ImportError:
    kafka = None

from .pubsub_manager import PubSubManager

logger = logging.getLogger('socketio')


class KafkaManager(PubSubManager):  # pragma: no cover
    """Kafka based client manager.

    This class implements a Kafka backend for event sharing across multiple
    processes.

    To use a Kafka backend, initialize the :class:`Server` instance as
    follows::

        url = 'kafka://hostname:port'
        server = socketio.Server(client_manager=socketio.KafkaManager(url))

    :param url: The connection URL for the Kafka server. For a default Kafka
                broker running on the same host, use ``kafka://``. For a highly
                available deployment of Kafka, pass a list with all the
                connection URLs available in your cluster.
    :param channel: The channel name (topic) on which the server sends and
                    receives notifications. Must be the same in all the
                    servers.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    """
    name = 'kafka'

    def __init__(self, url='kafka://localhost:9092', channel='socketio',
                 write_only=False):
        if kafka is None:
            raise RuntimeError('kafka-python package is not installed '
                               '(Run "pip install kafka-python" in your '
                               'virtualenv).')

        super().__init__(channel=channel, write_only=write_only)

        urls = [url] if isinstance(url, str) else url
        # Strip the "kafka://" scheme when present; a bare "kafka://" falls
        # back to the default broker address, and addresses given without the
        # scheme are passed through unchanged.
        self.kafka_urls = [(u[8:] or 'localhost:9092')
                           if u.startswith('kafka://') else u for u in urls]
        self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_urls)
        self.consumer = kafka.KafkaConsumer(self.channel,
                                            bootstrap_servers=self.kafka_urls)

    def _publish(self, data):
        # Serialize with pickle (peers are trusted servers sharing the same
        # channel) and flush so the message leaves the send buffer promptly.
        self.producer.send(self.channel, value=pickle.dumps(data))
        self.producer.flush()

    def _kafka_listen(self):
        # Block on the consumer and yield raw Kafka messages as they arrive.
        for message in self.consumer:
            yield message

    def _listen(self):
        for message in self._kafka_listen():
            # The consumer is subscribed only to self.channel; the check is a
            # safeguard should the subscription ever widen.
            if message.topic == self.channel:
                yield pickle.loads(message.value)
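

# A minimal usage sketch, not part of the class above: emitting an event to
# connected clients from a separate process (e.g. a task queue worker) via
# the shared Kafka topic. It assumes a broker reachable at localhost:9092 and
# that this package is importable as ``socketio``; the event name, payload,
# and room are hypothetical.
if __name__ == '__main__':  # pragma: no cover
    import socketio

    # write_only=True skips the listening thread; this process only emits.
    external = socketio.KafkaManager('kafka://localhost:9092',
                                     write_only=True)
    external.emit('notification', data={'text': 'hello'}, room='lobby')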