summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authormorpheus65535 <[email protected]>2021-06-01 14:42:52 -0400
committermorpheus65535 <[email protected]>2021-06-01 14:42:52 -0400
commit736b67fd80efcd676c1395af92d931a8e3dd8de0 (patch)
tree451cb3cfa520ca8a2cf0b682f71b1a41c79ced27
parent22a75415438c64225cfcb89d4c538324ac2917b6 (diff)
downloadbazarr-736b67fd80efcd676c1395af92d931a8e3dd8de0.tar.gz
bazarr-736b67fd80efcd676c1395af92d931a8e3dd8de0.zip
Improved stability and reliability of SignalR feed clients.
-rw-r--r--bazarr/main.py4
-rw-r--r--bazarr/signalr_client.py29
-rw-r--r--libs/signalr/__init__.py7
-rw-r--r--libs/signalr/_connection.py29
-rw-r--r--libs/signalr/transports/_sse_transport.py9
-rw-r--r--libs/signalr/transports/_transport.py5
-rw-r--r--libs/signalr/transports/_ws_transport.py7
-rw-r--r--libs/version.txt2
8 files changed, 47 insertions, 45 deletions
diff --git a/bazarr/main.py b/bazarr/main.py
index a364039ab..7139903d3 100644
--- a/bazarr/main.py
+++ b/bazarr/main.py
@@ -197,9 +197,9 @@ def proxy(protocol, url):
if settings.general.getboolean('use_sonarr'):
- sonarr_signalr_client.start()
+ gevent.Greenlet.spawn(sonarr_signalr_client.start)
if settings.general.getboolean('use_radarr'):
- radarr_signalr_client.start()
+ gevent.Greenlet.spawn(radarr_signalr_client.start)
if __name__ == "__main__":
diff --git a/bazarr/signalr_client.py b/bazarr/signalr_client.py
index 4c7665e0a..9242a01fb 100644
--- a/bazarr/signalr_client.py
+++ b/bazarr/signalr_client.py
@@ -38,16 +38,16 @@ class SonarrSignalrClient:
'consider upgrading.')
return
- logging.debug('BAZARR connecting to Sonarr SignalR feed...')
+ logging.info('BAZARR trying to connect to Sonarr SignalR feed...')
self.configure()
- while not self.connection.is_open:
+ while not self.connection.started:
try:
self.connection.start()
except ConnectionError:
gevent.sleep(5)
except json.decoder.JSONDecodeError:
- logging.error('BAZARR cannot parse JSON returned by SignalR feed. Take a look at: '
- 'https://forums.sonarr.tv/t/signalr-problem/5785/3')
+ logging.error("BAZARR cannot parse JSON returned by SignalR feed. This is a known issue when Sonarr "
+ "doesn't have write permission to it's /config/xdg directory.")
self.stop()
logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.')
if not args.dev:
@@ -64,13 +64,16 @@ class SonarrSignalrClient:
def restart(self):
if self.connection:
- if self.connection.is_open:
- self.stop(log=False)
+ if self.connection.started:
+ try:
+ self.stop(log=False)
+ except:
+ self.connection.started = False
if settings.general.getboolean('use_sonarr'):
self.start()
def exception_handler(self, type, exception, traceback):
- logging.error('BAZARR connection to Sonarr SignalR feed has been lost. Reconnecting...')
+ logging.error('BAZARR connection to Sonarr SignalR feed has been lost.')
self.restart()
def configure(self):
@@ -94,8 +97,12 @@ class RadarrSignalrClient:
def start(self):
self.configure()
- logging.debug('BAZARR connecting to Radarr SignalR feed...')
- self.connection.start()
+ logging.info('BAZARR trying to connect to Radarr SignalR feed...')
+ while self.connection.transport.state.value not in [0, 1, 2]:
+ try:
+ self.connection.start()
+ except ConnectionError:
+ gevent.sleep(5)
def stop(self):
logging.info('BAZARR SignalR client for Radarr is now disconnected.')
@@ -133,8 +140,8 @@ class RadarrSignalrClient:
"max_attempts": None
}).build()
self.connection.on_open(self.on_connect_handler)
- self.connection.on_reconnect(lambda: logging.info('BAZARR SignalR client for Radarr connection as been lost. '
- 'Trying to reconnect...'))
+ self.connection.on_reconnect(lambda: logging.error('BAZARR SignalR client for Radarr connection as been lost. '
+ 'Trying to reconnect...'))
self.connection.on_close(lambda: logging.debug('BAZARR SignalR client for Radarr is disconnected.'))
self.connection.on_error(self.exception_handler)
self.connection.on("receiveMessage", dispatcher)
diff --git a/libs/signalr/__init__.py b/libs/signalr/__init__.py
index 7742eeb58..3d155c5c6 100644
--- a/libs/signalr/__init__.py
+++ b/libs/signalr/__init__.py
@@ -1,3 +1,8 @@
+from gevent import monkey
+
+monkey.patch_socket()
+monkey.patch_ssl()
+
from ._connection import Connection
-__version__ = '0.0.12'
+__version__ = '0.0.7'
diff --git a/libs/signalr/_connection.py b/libs/signalr/_connection.py
index 6471ba670..377606f99 100644
--- a/libs/signalr/_connection.py
+++ b/libs/signalr/_connection.py
@@ -1,6 +1,6 @@
import json
+import gevent
import sys
-from threading import Thread
from signalr.events import EventHook
from signalr.hubs import Hub
from signalr.transports import AutoTransport
@@ -15,16 +15,14 @@ class Connection:
self.qs = {}
self.__send_counter = -1
self.token = None
- self.id = None
self.data = None
self.received = EventHook()
self.error = EventHook()
self.starting = EventHook()
self.stopping = EventHook()
self.exception = EventHook()
- self.is_open = False
self.__transport = AutoTransport(session, self)
- self.__listener_thread = None
+ self.__greenlet = None
self.started = False
def handle_error(**kwargs):
@@ -50,32 +48,27 @@ class Connection:
negotiate_data = self.__transport.negotiate()
self.token = negotiate_data['ConnectionToken']
- self.id = negotiate_data['ConnectionId']
listener = self.__transport.start()
def wrapped_listener():
- while self.is_open:
- try:
- listener()
- except:
- self.exception.fire(*sys.exc_info())
- self.is_open = False
-
- self.is_open = True
- self.__listener_thread = Thread(target=wrapped_listener)
- self.__listener_thread.start()
+ try:
+ listener()
+ gevent.sleep()
+ except:
+ self.exception.fire(*sys.exc_info())
+
+ self.__greenlet = gevent.spawn(wrapped_listener)
self.started = True
def wait(self, timeout=30):
- Thread.join(self.__listener_thread, timeout)
+ gevent.joinall([self.__greenlet], timeout)
def send(self, data):
self.__transport.send(data)
def close(self):
- self.is_open = False
- self.__listener_thread.join()
+ gevent.kill(self.__greenlet)
self.__transport.close()
def register_hub(self, name):
diff --git a/libs/signalr/transports/_sse_transport.py b/libs/signalr/transports/_sse_transport.py
index 7faaf936a..63d978643 100644
--- a/libs/signalr/transports/_sse_transport.py
+++ b/libs/signalr/transports/_sse_transport.py
@@ -12,16 +12,11 @@ class ServerSentEventsTransport(Transport):
return 'serverSentEvents'
def start(self):
- connect_url = self._get_url('connect')
- self.__response = iter(sseclient.SSEClient(connect_url, session=self._session))
+ self.__response = sseclient.SSEClient(self._get_url('connect'), session=self._session)
self._session.get(self._get_url('start'))
def _receive():
- try:
- notification = next(self.__response)
- except StopIteration:
- return
- else:
+ for notification in self.__response:
if notification.data != 'initialized':
self._handle_notification(notification.data)
diff --git a/libs/signalr/transports/_transport.py b/libs/signalr/transports/_transport.py
index af62672fd..c0d0d4278 100644
--- a/libs/signalr/transports/_transport.py
+++ b/libs/signalr/transports/_transport.py
@@ -1,12 +1,13 @@
from abc import abstractmethod
import json
import sys
-import threading
+
if sys.version_info[0] < 3:
from urllib import quote_plus
else:
from urllib.parse import quote_plus
+import gevent
class Transport:
@@ -47,7 +48,7 @@ class Transport:
if len(message) > 0:
data = json.loads(message)
self._connection.received.fire(**data)
- #thread.sleep() #TODO: investigate if we should sleep here
+ gevent.sleep()
def _get_url(self, action, **kwargs):
args = kwargs.copy()
diff --git a/libs/signalr/transports/_ws_transport.py b/libs/signalr/transports/_ws_transport.py
index 4d9a80ad1..14fefa6cc 100644
--- a/libs/signalr/transports/_ws_transport.py
+++ b/libs/signalr/transports/_ws_transport.py
@@ -1,6 +1,7 @@
import json
import sys
+import gevent
if sys.version_info[0] < 3:
from urlparse import urlparse, urlunparse
@@ -38,14 +39,14 @@ class WebSocketsTransport(Transport):
self._session.get(self._get_url('start'))
def _receive():
- notification = self.ws.recv()
- self._handle_notification(notification)
+ for notification in self.ws:
+ self._handle_notification(notification)
return _receive
def send(self, data):
self.ws.send(json.dumps(data))
- #thread.sleep() #TODO: inveistage if we should sleep here or not
+ gevent.sleep()
def close(self):
self.ws.close()
diff --git a/libs/version.txt b/libs/version.txt
index 5c04c9373..ff635d2c7 100644
--- a/libs/version.txt
+++ b/libs/version.txt
@@ -30,7 +30,7 @@ rarfile=3.0
rebulk=3.0.1
requests=2.18.4
semver=2.13.0
-signalr-client-threads=0.0.12 <-- Modified to work with Sonarr
+signalr-client=0.0.7 <-- Modified to work with Sonarr and added exception handler
signalrcore=0.9.2 <-- https://github.com/mandrewcito/signalrcore/pull/60 and 61
SimpleConfigParser=0.1.0 <-- modified version: do not update!!!
six=1.11.0