summaryrefslogtreecommitdiffhomepage
path: root/libs/socketio/zmq_manager.py
blob: 54538cf1b5c9b2310355c3a6a6ececf95f6a907b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import pickle
import re

try:
    import eventlet.green.zmq as zmq
except ImportError:
    zmq = None

from .pubsub_manager import PubSubManager


class ZmqManager(PubSubManager):  # pragma: no cover
    """zmq based client manager.

    NOTE: this zmq implementation should be considered experimental at this
    time. At this time, eventlet is required to use zmq.

    This class implements a zmq backend for event sharing across multiple
    processes. To use a zmq backend, initialize the :class:`Server` instance as
    follows::

        url = 'zmq+tcp://hostname:port1+port2'
        server = socketio.Server(client_manager=socketio.ZmqManager(url))

    ``port1`` is the port this manager pushes outgoing messages to (PUSH
    socket) and ``port2`` is the port it subscribes to for incoming messages
    (SUB socket). Both ports are on the same host.

    :param url: The connection URL for the zmq message broker,
                which will need to be provided and running.
    :param channel: The channel name on which the server sends and receives
                    notifications. Must be the same in all the servers.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    :param logger: Passed through unchanged to the parent
                   :class:`PubSubManager` constructor.

    :raises RuntimeError: if the zmq package could not be imported, or if
                          ``url`` does not have the expected
                          ``zmq+tcp://host:port1+port2`` form.

    A zmq message broker must be running for the zmq_manager to work.
    You can write your own or adapt the simple broker below::

        import zmq

        receiver = zmq.Context().socket(zmq.PULL)
        receiver.bind("tcp://*:5555")

        publisher = zmq.Context().socket(zmq.PUB)
        publisher.bind("tcp://*:5556")

        while True:
            publisher.send(receiver.recv())
    """
    name = 'zmq'

    # The URL must end in ":<sink port>+<sub port>".
    _PORTS_SUFFIX = re.compile(r':\d+\+\d+$')
    _SCHEME_PREFIX = 'zmq+'

    def __init__(self, url='zmq+tcp://localhost:5555+5556',
                 channel='socketio',
                 write_only=False,
                 logger=None):
        if zmq is None:
            raise RuntimeError('zmq package is not installed '
                               '(Run "pip install pyzmq" in your '
                               'virtualenv).')

        sink_url, sub_url = self._parse_url(url)

        # Outgoing messages are pushed to the broker's receiving port.
        sink = zmq.Context().socket(zmq.PUSH)
        sink.connect(sink_url)

        # Incoming messages arrive on the broker's publishing port. The
        # empty subscription prefix means "receive everything"; filtering
        # by channel is done in _listen().
        sub = zmq.Context().socket(zmq.SUB)
        sub.setsockopt_string(zmq.SUBSCRIBE, u'')
        sub.connect(sub_url)

        self.sink = sink
        self.sub = sub
        self.channel = channel
        super(ZmqManager, self).__init__(channel=channel,
                                         write_only=write_only,
                                         logger=logger)

    @classmethod
    def _parse_url(cls, url):
        """Split a ``zmq+tcp://host:port1+port2`` URL into two zmq URLs.

        :param url: The connection string given to the constructor.
        :returns: A ``(sink_url, sub_url)`` tuple, i.e.
                  ``('tcp://host:port1', 'tcp://host:port2')``.
        :raises RuntimeError: if ``url`` does not start with ``zmq+tcp://``
                              or does not end in ``:port1+port2``.
        """
        if not (url.startswith('zmq+tcp://') and
                cls._PORTS_SUFFIX.search(url)):
            raise RuntimeError('unexpected connection string: ' + url)

        # Drop only the leading scheme marker, never a later "zmq+".
        url = url[len(cls._SCHEME_PREFIX):]
        # The sub port is whatever follows the last "+".
        sink_url, sub_port = url.rsplit('+', 1)
        # Swap only the trailing port. A plain str.replace() would also
        # rewrite matching digits inside the host (e.g. "127.0.0.1:1").
        base = sink_url.rpartition(':')[0]
        sub_url = base + ':' + sub_port
        return sink_url, sub_url

    def _publish(self, data):
        """Pickle ``data`` in a message envelope and push it to the broker.

        :param data: The payload to share with the other servers.
        :returns: Whatever the sink socket's ``send()`` returns.
        """
        pickled_data = pickle.dumps(
            {
                'type': 'message',
                'channel': self.channel,
                'data': data
            }
        )
        return self.sink.send(pickled_data)

    def zmq_listen(self):
        """Yield every raw message received on the SUB socket, forever."""
        while True:
            response = self.sub.recv()
            if response is not None:
                yield response

    def _listen(self):
        """Yield the ``data`` payload of each message for this channel.

        Messages that cannot be unpickled, are not dicts, are not of type
        ``'message'``, belong to another channel, or carry no ``'data'`` key
        are skipped.
        """
        for message in self.zmq_listen():
            if isinstance(message, bytes):
                # NOTE(security): pickle.loads() can execute arbitrary code
                # when fed malicious input. The broker must only be
                # reachable by trusted peers.
                try:
                    message = pickle.loads(message)
                except Exception:
                    # Unpickling can raise many exception types; a payload
                    # that cannot be decoded is not one of ours, so drop it.
                    continue
            if not isinstance(message, dict):
                continue
            # Use .get() so a dict without the expected keys is skipped
            # instead of raising KeyError and ending the listener.
            if message.get('type') == 'message' and \
                    message.get('channel') == self.channel and \
                    'data' in message:
                yield message['data']