-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathsimple_consumer.py
More file actions
34 lines (25 loc) · 1005 Bytes
/
simple_consumer.py
File metadata and controls
34 lines (25 loc) · 1005 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from typing import Dict
from flowcept.commons.flowcept_dataclasses.task_object import TaskObject
from flowcept.flowceptor.consumers.base_consumer import BaseConsumer
class MyConsumer(BaseConsumer):
    """Example consumer that prints every received message to stdout."""

    def __init__(self):
        super().__init__()

    def message_handler(self, msg_obj: Dict) -> bool:
        """Print a human-readable dump of one incoming message.

        Messages whose ``"type"`` entry equals ``"task"`` are converted with
        ``TaskObject.from_dict`` and printed, followed by their ``used``,
        ``generated`` and ``custom_metadata`` attributes when those are
        truthy. Any other message only has its type reported.

        Args:
            msg_obj: The message payload as a dictionary.

        Returns:
            Always ``True``.
            NOTE(review): presumably this tells the base consumer to keep
            consuming -- confirm against ``BaseConsumer``.
        """
        # Read the type once. Interpolating a local (instead of calling
        # msg_obj.get("type") inside the f-string) avoids reusing the
        # enclosing quote character, which is a SyntaxError before
        # Python 3.12.
        msg_type = msg_obj.get("type")

        if msg_type == "task":
            msg = TaskObject.from_dict(msg_obj)
            print(msg)
            if msg.used:
                print(f"\t\tUsed: {msg.used}")
            if msg.generated:
                print(f"\t\tGenerated: {msg.generated}")
            if msg.custom_metadata:
                print(f"\t\tCustom Metadata: {msg.custom_metadata}")
            # Two blank lines visually separate consecutive task dumps.
            print()
            print()
        else:
            print(f"We got a msg with different type: {msg_type}")
        return True
if __name__ == "__main__":
    # Script entry point: announce start-up, then launch the example consumer.
    print("Starting consumer indefinitely. Press ctrl+c to stop")
    task_printer = MyConsumer()
    # daemon=False is forwarded to the consumer's start() unchanged.
    # NOTE(review): presumably this keeps the process alive until it is
    # interrupted -- confirm against BaseConsumer.start.
    task_printer.start(daemon=False)