summaryrefslogtreecommitdiffhomepage
path: root/libs/signalrcore/subject.py
blob: 6561cc4ded07209f7e219dcab7e7a588a9543dbd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import uuid
import threading
from typing import Any
from .messages.invocation_message import InvocationClientStreamMessage
from .messages.stream_item_message import StreamItemMessage
from .messages.completion_message import CompletionClientStreamMessage


class Subject(object):
    """Client to server streaming
    https://docs.microsoft.com/en-gb/aspnet/core/signalr/streaming?view=aspnetcore-5.0#client-to-server-streaming
    items = list(range(0,10))
    subject = Subject()
    connection.send("UploadStream", subject)
    while(len(self.items) > 0):
        subject.next(str(self.items.pop()))
    subject.complete()
    """

    def __init__(self):
        self.connection = None
        self.target = None
        self.invocation_id = str(uuid.uuid4())
        self.lock = threading.RLock()

    def check(self):
        """Ensures that invocation streaming object is correct

        Raises:
            ValueError: if object is not valid, exception will be raised
        """
        if self.connection is None\
                or self.target is None\
                or self.invocation_id is None:
            raise ValueError(
                "subject must be passed as an agument to a send function. "
                + "hub_connection.send([method],[subject]")

    def next(self, item: Any):
        """Send next item to the server

        Args:
            item (any): Item that will be streamed
        """
        self.check()
        with self.lock:
            self.connection.transport.send(StreamItemMessage(
                self.invocation_id,
                item))

    def start(self):
        """Starts streaming
        """
        self.check()
        with self.lock:
            self.connection.transport.send(
                InvocationClientStreamMessage(
                    [self.invocation_id],
                    self.target,
                    []))

    def complete(self):
        """Finish streaming
        """
        self.check()
        with self.lock:
            self.connection.transport.send(CompletionClientStreamMessage(
                self.invocation_id))