summaryrefslogtreecommitdiffhomepage
path: root/bazarr/app/signalr_client.py
blob: 2f048eb17af67e9023c4ecdaba5dac6d1fa13284 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# coding=utf-8

import logging
import json
import time
import threading

from requests import Session
from signalr import Connection
from requests.exceptions import ConnectionError
from signalrcore.hub_connection_builder import HubConnectionBuilder
from collections import deque
from time import sleep

from constants import headers
from app.event_handler import event_stream
from sonarr.sync.episodes import sync_episodes, sync_one_episode
from sonarr.sync.series import update_series, update_one_series
from radarr.sync.movies import update_movies, update_one_movie
from sonarr.info import get_sonarr_info, url_sonarr
from radarr.info import url_radarr
from .database import TableShows, TableMovies, database, select

from .config import settings
from .scheduler import scheduler
from .get_args import args


sonarr_queue = deque()
radarr_queue = deque()

last_event_data = None


class SonarrSignalrClientLegacy:
    def __init__(self):
        super(SonarrSignalrClientLegacy, self).__init__()
        self.apikey_sonarr = None
        self.session = Session()
        self.session.timeout = 60
        self.session.verify = False
        self.session.headers = headers
        self.connection = None
        self.connected = False

    def start(self):
        if get_sonarr_info.is_legacy():
            logging.warning(
                f'BAZARR can only sync from Sonarr v3 SignalR feed to get real-time update. You should consider '
                f'upgrading your version({get_sonarr_info.version()}).')
        else:
            self.connected = False
            event_stream(type='badges')
            logging.info('BAZARR trying to connect to Sonarr SignalR feed...')
            self.configure()
            while not self.connection.started:
                try:
                    self.connection.start()
                except ConnectionError:
                    time.sleep(5)
                except json.decoder.JSONDecodeError:
                    logging.error("BAZARR cannot parse JSON returned by SignalR feed. This is caused by a permissions "
                                  "issue when Sonarr try to access its /config/.config directory."
                                  "Typically permissions are too permissive - only the user and group Sonarr runs as "
                                  "should have Read/Write permissions (e.g. files 664 / folders 775). You should fix "
                                  "permissions on that directory and restart Sonarr. Also, if you're a Docker image "
                                  "user, you should make sure you properly defined PUID/PGID environment variables. "
                                  "Otherwise, please contact Sonarr support.")
                    self.stop()
                    break
                else:
                    self.connected = True
                    event_stream(type='badges')
                    logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.')
                    if not args.dev:
                        scheduler.add_job(update_series, kwargs={'send_event': True}, max_instances=1)

    def stop(self, log=True):
        try:
            self.connection.close()
        except Exception:
            self.connection.started = False
        if log:
            logging.info('BAZARR SignalR client for Sonarr is now disconnected.')

    def restart(self):
        if self.connection:
            if self.connection.started:
                self.stop(log=False)
        if settings.general.use_sonarr:
            self.start()

    def exception_handler(self):
        sonarr_queue.clear()
        self.connected = False
        event_stream(type='badges')
        logging.error('BAZARR connection to Sonarr SignalR feed has been lost.')
        self.restart()

    def configure(self):
        self.apikey_sonarr = settings.sonarr.apikey
        self.connection = Connection(f"{url_sonarr()}/signalr", self.session)
        self.connection.qs = {'apikey': self.apikey_sonarr}
        sonarr_hub = self.connection.register_hub('')  # Sonarr doesn't use named hub

        sonarr_method = ['series', 'episode']
        for item in sonarr_method:
            sonarr_hub.client.on(item, feed_queue)

        self.connection.exception += self.exception_handler


class SonarrSignalrClient:
    def __init__(self):
        super(SonarrSignalrClient, self).__init__()
        self.apikey_sonarr = None
        self.connection = None
        self.connected = False

    def start(self):
        self.configure()
        logging.info('BAZARR trying to connect to Sonarr SignalR feed...')
        while self.connection.transport.state.value not in [0, 1, 2]:
            try:
                self.connection.start()
            except ConnectionError:
                time.sleep(5)

    def stop(self):
        logging.info('BAZARR SignalR client for Sonarr is now disconnected.')
        self.connection.stop()

    def restart(self):
        if self.connection:
            if self.connection.transport.state.value in [0, 1, 2]:
                self.stop()
        if settings.general.use_sonarr:
            self.start()

    def exception_handler(self):
        sonarr_queue.clear()
        self.connected = False
        event_stream(type='badges')
        logging.error("BAZARR connection to Sonarr SignalR feed has failed. We'll try to reconnect.")
        self.restart()

    def on_connect_handler(self):
        self.connected = True
        event_stream(type='badges')
        logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.')
        if not args.dev:
            scheduler.add_job(update_series, kwargs={'send_event': True}, max_instances=1)

    def on_reconnect_handler(self):
        self.connected = False
        event_stream(type='badges')
        logging.error('BAZARR SignalR client for Sonarr connection as been lost. Trying to reconnect...')

    def configure(self):
        self.apikey_sonarr = settings.sonarr.apikey
        self.connection = HubConnectionBuilder() \
            .with_url(f"{url_sonarr()}/signalr/messages?access_token={self.apikey_sonarr}",
                      options={
                          "verify_ssl": False,
                          "headers": headers
                      }) \
            .with_automatic_reconnect({
                "type": "raw",
                "keep_alive_interval": 5,
                "reconnect_interval": 180,
                "max_attempts": None
            }).build()
        self.connection.on_open(self.on_connect_handler)
        self.connection.on_reconnect(self.on_reconnect_handler)
        self.connection.on_close(lambda: logging.debug('BAZARR SignalR client for Sonarr is disconnected.'))
        self.connection.on_error(self.exception_handler)
        self.connection.on("receiveMessage", feed_queue)


class RadarrSignalrClient:
    def __init__(self):
        super(RadarrSignalrClient, self).__init__()
        self.apikey_radarr = None
        self.connection = None
        self.connected = False

    def start(self):
        self.configure()
        logging.info('BAZARR trying to connect to Radarr SignalR feed...')
        while self.connection.transport.state.value not in [0, 1, 2]:
            try:
                self.connection.start()
            except ConnectionError:
                time.sleep(5)

    def stop(self):
        logging.info('BAZARR SignalR client for Radarr is now disconnected.')
        self.connection.stop()

    def restart(self):
        if self.connection:
            if self.connection.transport.state.value in [0, 1, 2]:
                self.stop()
        if settings.general.use_radarr:
            self.start()

    def exception_handler(self):
        radarr_queue.clear()
        self.connected = False
        event_stream(type='badges')
        logging.error("BAZARR connection to Radarr SignalR feed has failed. We'll try to reconnect.")
        self.restart()

    def on_connect_handler(self):
        self.connected = True
        event_stream(type='badges')
        logging.info('BAZARR SignalR client for Radarr is connected and waiting for events.')
        if not args.dev:
            scheduler.add_job(update_movies, kwargs={'send_event': True}, max_instances=1)

    def on_reconnect_handler(self):
        self.connected = False
        event_stream(type='badges')
        logging.error('BAZARR SignalR client for Radarr connection as been lost. Trying to reconnect...')

    def configure(self):
        self.apikey_radarr = settings.radarr.apikey
        self.connection = HubConnectionBuilder() \
            .with_url(f"{url_radarr()}/signalr/messages?access_token={self.apikey_radarr}",
                      options={
                          "verify_ssl": False,
                          "headers": headers
                      }) \
            .with_automatic_reconnect({
                "type": "raw",
                "keep_alive_interval": 5,
                "reconnect_interval": 180,
                "max_attempts": None
            }).build()
        self.connection.on_open(self.on_connect_handler)
        self.connection.on_reconnect(self.on_reconnect_handler)
        self.connection.on_close(lambda: logging.debug('BAZARR SignalR client for Radarr is disconnected.'))
        self.connection.on_error(self.exception_handler)
        self.connection.on("receiveMessage", feed_queue)


def dispatcher(data):
    try:
        series_title = series_year = episode_title = season_number = episode_number = movie_title = movie_year = None

        #
        try:
            episodesChanged = False
            topic = data['name']

            media_id = data['body']['resource']['id']
            action = data['body']['action']
            if topic == 'series':
                if 'episodesChanged' in data['body']['resource']:
                    episodesChanged = data['body']['resource']['episodesChanged']
                series_title = data['body']['resource']['title']
                series_year = data['body']['resource']['year']
            elif topic == 'episode':
                if 'series' in data['body']['resource']:
                    series_title = data['body']['resource']['series']['title']
                    series_year = data['body']['resource']['series']['year']
                else:
                    series_metadata = database.execute(
                        select(TableShows.title, TableShows.year)
                        .where(TableShows.sonarrSeriesId == data['body']['resource']['seriesId']))\
                        .first()
                    if series_metadata:
                        series_title = series_metadata.title
                        series_year = series_metadata.year
                episode_title = data['body']['resource']['title']
                season_number = data['body']['resource']['seasonNumber']
                episode_number = data['body']['resource']['episodeNumber']
            elif topic == 'movie':
                if action == 'deleted':
                    existing_movie_details = database.execute(
                        select(TableMovies.title, TableMovies.year)
                        .where(TableMovies.radarrId == media_id)) \
                        .first()
                    if existing_movie_details:
                        movie_title = existing_movie_details.title
                        movie_year = existing_movie_details.year
                    else:
                        return
                else:
                    movie_title = data['body']['resource']['title']
                    movie_year = data['body']['resource']['year']
        except KeyError:
            return

        if topic == 'series':
            logging.debug(f'Event received from Sonarr for series: {series_title} ({series_year})')
            update_one_series(series_id=media_id, action=action)
            if episodesChanged:
                # this will happen if a season monitored status is changed.
                sync_episodes(series_id=media_id, send_event=True)
        elif topic == 'episode':
            logging.debug(f'Event received from Sonarr for episode: {series_title} ({series_year}) - '
                          f'S{season_number:0>2}E{episode_number:0>2} - {episode_title}')
            sync_one_episode(episode_id=media_id, defer_search=settings.sonarr.defer_search_signalr)
        elif topic == 'movie':
            logging.debug(f'Event received from Radarr for movie: {movie_title} ({movie_year})')
            update_one_movie(movie_id=media_id, action=action,
                             defer_search=settings.radarr.defer_search_signalr)
    except Exception as e:
        logging.debug(f'BAZARR an exception occurred while parsing SignalR feed: {repr(e)}')
    finally:
        event_stream(type='badges')
        return


def feed_queue(data):
    # check if event is duplicate from the previous one
    global last_event_data
    if data == last_event_data:
        return
    else:
        last_event_data = data

    # some sonarr version send event as a list of a single dict, we make it a dict
    if isinstance(data, list) and len(data):
        data = data[0]

    # if data is a dict and contain an event for series, episode or movie, we add it to the event queue
    if isinstance(data, dict) and 'name' in data:
        if data['name'] in ['series', 'episode']:
            sonarr_queue.append(data)
        elif data['name'] == 'movie':
            radarr_queue.append(data)


def consume_queue(queue):
    # get events data from queue one at a time and dispatch it
    while True:
        try:
            data = queue.popleft()
        except IndexError:
            pass
        else:
            dispatcher(data)
        sleep(0.1)


# start both queue consuming threads
threading.Thread(target=consume_queue, args=(sonarr_queue,)).start()
threading.Thread(target=consume_queue, args=(radarr_queue,)).start()

# instantiate proper SignalR client
sonarr_signalr_client = SonarrSignalrClientLegacy() if get_sonarr_info.version().startswith(('0.', '2.', '3.')) else \
    SonarrSignalrClient()
radarr_signalr_client = RadarrSignalrClient()