-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.py
More file actions
37 lines (30 loc) · 917 Bytes
/
consumer.py
File metadata and controls
37 lines (30 loc) · 917 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# consumer.py
from kafka import KafkaConsumer
import json
import requests
consumer = KafkaConsumer(
'coordinates',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
group_id='coordinate-group',
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
print("Waiting for messages...")
counter = 0
payload = {"x" : [],"y":[]}
for message in consumer:
print(counter)
counter +=1
coords = message.value
payload["x"].append(coords['x'])
payload['y'].append(coords['y'])
if(counter == 100):
if "id" not in payload.keys():
payload["id"]=message.key.decode('utf-8')
response=requests.post("http://localhost:8000/ingestdata",json=payload)
if response.status_code == 200:
counter = 0
payload = {"x" : [],"y":[]}
#print(payload)
#
#print(f"Received: x={coords['x']}, y={coords['y']}")