blob: 6561cc4ded07209f7e219dcab7e7a588a9543dbd (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
import uuid
import threading
from typing import Any
from .messages.invocation_message import InvocationClientStreamMessage
from .messages.stream_item_message import StreamItemMessage
from .messages.completion_message import CompletionClientStreamMessage
class Subject(object):
    """Client to server streaming.

    https://docs.microsoft.com/en-gb/aspnet/core/signalr/streaming?view=aspnetcore-5.0#client-to-server-streaming

    Example::

        items = list(range(0, 10))
        subject = Subject()
        connection.send("UploadStream", subject)
        while len(items) > 0:
            subject.next(str(items.pop()))
        subject.complete()
    """

    def __init__(self):
        # Both start unset; check() rejects the subject until they are
        # assigned (presumably by hub_connection.send -- confirm in caller).
        self.connection = None
        self.target = None
        # Random identifier included in every message this subject sends.
        self.invocation_id = str(uuid.uuid4())
        # Re-entrant lock held around each transport send.
        self.lock = threading.RLock()

    def check(self):
        """Ensure that the invocation streaming object is usable.

        Raises:
            ValueError: if ``connection``, ``target`` or ``invocation_id``
                is ``None``.
        """
        required = (self.connection, self.target, self.invocation_id)
        if any(value is None for value in required):
            raise ValueError(
                "subject must be passed as an argument to a send function. "
                "hub_connection.send([method],[subject])")

    def next(self, item: Any):
        """Send the next item to the server.

        Args:
            item (any): Item that will be streamed.

        Raises:
            ValueError: if the subject fails :meth:`check`.
        """
        self.check()
        with self.lock:
            self.connection.transport.send(
                StreamItemMessage(self.invocation_id, item))

    def start(self):
        """Start streaming by sending the invocation message.

        Raises:
            ValueError: if the subject fails :meth:`check`.
        """
        self.check()
        with self.lock:
            # The invocation id goes in a one-element list and the last
            # argument is an empty list -- presumably stream ids and call
            # arguments; confirm against InvocationClientStreamMessage.
            self.connection.transport.send(
                InvocationClientStreamMessage(
                    [self.invocation_id],
                    self.target,
                    []))

    def complete(self):
        """Finish streaming by sending the completion message.

        Raises:
            ValueError: if the subject fails :meth:`check`.
        """
        self.check()
        with self.lock:
            self.connection.transport.send(
                CompletionClientStreamMessage(self.invocation_id))
|