-
Notifications
You must be signed in to change notification settings - Fork 235
Closed
Description
Hi guys!
I have been working with this library recently and I found an error when a task takes some much time to process.
Here is the example code that I have been using for my tests:
import asyncio
import time
from nats.aio.client import Client as NATS
from nats.errors import ConnectionClosedError, TimeoutError
async def main():
nc = NATS()
try:
# It is very likely that the demo server will see traffic from clients other than yours.
# To avoid this, start your own locally and modify the example to use it.
await nc.connect(servers=["nats://127.0.0.1:4222"])
# await nc.connect(servers=["nats://demo.nats.io:4222"])
except Exception as e:
print(f"Cannot connect to NATS: {e}")
pass
async def message_handler(msg):
print(f"[Received on '{msg.subject}']: {msg.data.decode()}")
print("Sleeping...")
time.sleep(2400)
try:
# Interested in receiving 2 messages from the 'discover' subject.
sub = await nc.subscribe("discover", "", message_handler)
await sub.unsubscribe(2)
await nc.publish("discover", b"hello")
await nc.publish("discover", b"world")
# Following 2 messages won't be received.
await nc.publish("discover", b"again")
await nc.publish("discover", b"!!!!!")
except ConnectionClosedError:
print("Connection closed prematurely")
async def request_handler(msg):
print("[Request on '{} {}']: {}".format(msg.subject, msg.reply, msg.data.decode()))
await nc.publish(msg.reply, b"OK")
if nc.is_connected:
# Subscription using a 'workers' queue so that only a single subscriber
# gets a request at a time.
await nc.subscribe("help", "workers", cb=request_handler)
try:
# Make a request expecting a single response within 500 ms,
# otherwise raising a timeout error.
msg = await nc.request("help", b"help please", timeout=500)
print(f"[Response]: {msg.data}")
# Make a roundtrip to the server to ensure messages
# that sent messages have been processed already.
await nc.flush(timeout=1000)
except TimeoutError:
print("[Error] Timeout!")
# Wait a bit for message to be dispatched...
await asyncio.sleep(1)
# Detach from the server.
await nc.close()
if nc.last_error is not None:
print(f"Last Error: {nc.last_error}")
if nc.is_closed:
print("Disconnected.")
if __name__ == "__main__":
asyncio.run(main())As you see, there is a time.sleep that blocks the code for 40 minutes in order to test what happens if the process waits that much time.
After running the code I get the following error:
$ python nats_example.py
[Received on 'discover']: hello
Sleeping...
[Received on 'discover']: world
Sleeping...
[Request on 'help _INBOX.8u6cZWf5n6ErPpphJtYnzR.8u6cZWf5n6ErPpphJtYo1E']: help please
[Error] Timeout!
nats: encountered error
Traceback (most recent call last):
File "/home/josemiguel.hernandez/.local/lib/python3.8/site-packages/nats/aio/client.py", line 1283, in _attempt_reconnect
await self._io_writer.wait_closed()
File "/usr/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
await self._protocol._get_close_waiter(self)
File "/usr/lib/python3.8/asyncio/selector_events.py", line 910, in write
n = self._sock.send(data)
BrokenPipeError: [Errno 32] Broken pipe
Disconnected.Is there any way to avoid getting this Broken pipe error even if the process takes 5 minutes or 3 hours?
Thanks in advance!
JYisus, alejandrogr and christianint
Metadata
Metadata
Assignees
Labels
No labels