
Broken pipe error if the task takes too much time. #295

@ixjosemi

Hi guys!

I have been working with this library recently and I ran into an error when a task takes too long to process.

Here is the example code that I have been using for my tests:

import asyncio
import time

from nats.aio.client import Client as NATS
from nats.errors import ConnectionClosedError, TimeoutError


async def main():
    nc = NATS()

    try:
        # It is very likely that the demo server will see traffic from clients other than yours.
        # To avoid this, start your own locally and modify the example to use it.
        await nc.connect(servers=["nats://127.0.0.1:4222"])
        # await nc.connect(servers=["nats://demo.nats.io:4222"])
    except Exception as e:
        print(f"Cannot connect to NATS: {e}")
        pass

    async def message_handler(msg):
        print(f"[Received on '{msg.subject}']: {msg.data.decode()}")
        print("Sleeping...")
        time.sleep(2400)

    try:
        # Interested in receiving 2 messages from the 'discover' subject.
        sub = await nc.subscribe("discover", "", message_handler)
        await sub.unsubscribe(2)

        await nc.publish("discover", b"hello")
        await nc.publish("discover", b"world")

        # Following 2 messages won't be received.
        await nc.publish("discover", b"again")
        await nc.publish("discover", b"!!!!!")
    except ConnectionClosedError:
        print("Connection closed prematurely")

    async def request_handler(msg):
        print("[Request on '{} {}']: {}".format(msg.subject, msg.reply, msg.data.decode()))
        await nc.publish(msg.reply, b"OK")

    if nc.is_connected:

        # Subscription using a 'workers' queue so that only a single subscriber
        # gets a request at a time.
        await nc.subscribe("help", "workers", cb=request_handler)

        try:
            # Make a request expecting a single response within the timeout
            # (nats-py timeouts are given in seconds, so this is 500 s, not 500 ms),
            # otherwise raising a timeout error.
            msg = await nc.request("help", b"help please", timeout=500)
            print(f"[Response]: {msg.data}")

            # Make a roundtrip to the server to ensure that
            # sent messages have already been processed.
            await nc.flush(timeout=1000)
        except TimeoutError:
            print("[Error] Timeout!")

        # Wait a bit for message to be dispatched...
        await asyncio.sleep(1)

        # Detach from the server.
        await nc.close()

    if nc.last_error is not None:
        print(f"Last Error: {nc.last_error}")

    if nc.is_closed:
        print("Disconnected.")


if __name__ == "__main__":
    asyncio.run(main())

As you can see, message_handler calls time.sleep(2400), which blocks the code for 40 minutes, in order to test what happens when processing a message takes that long.

After running the code I get the following error:

$ python nats_example.py 
[Received on 'discover']: hello
Sleeping...
[Received on 'discover']: world
Sleeping...
[Request on 'help _INBOX.8u6cZWf5n6ErPpphJtYnzR.8u6cZWf5n6ErPpphJtYo1E']: help please
[Error] Timeout!
nats: encountered error
Traceback (most recent call last):
  File "/home/josemiguel.hernandez/.local/lib/python3.8/site-packages/nats/aio/client.py", line 1283, in _attempt_reconnect
    await self._io_writer.wait_closed()
  File "/usr/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
BrokenPipeError: [Errno 32] Broken pipe
Disconnected.

Is there any way to avoid getting this Broken pipe error even if the process takes 5 minutes or 3 hours?

Thanks in advance!
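
Note: a likely cause, stated here as an assumption rather than a confirmed diagnosis, is that time.sleep blocks the asyncio event loop. While the loop is blocked, the client cannot do any background work such as answering the server's keepalive PINGs, so the server may close the connection. The sketch below does not use NATS at all. It uses a hypothetical heartbeat task as a stand-in for the client's background work and compares a handler that blocks the loop with one that hands the blocking job to a thread via loop.run_in_executor. The names heartbeat, slow_job, blocking_handler, offloaded_handler, count_ticks and demo are made up for illustration.

```python
import asyncio
import time


async def heartbeat(ticks, interval=0.05):
    # Hypothetical stand-in for the client's background work
    # (for example, replying to server PINGs).
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


def slow_job(seconds):
    # Hypothetical blocking task; time.sleep stands in for real work.
    time.sleep(seconds)
    return "done"


async def blocking_handler(seconds):
    # Blocks the whole event loop, like time.sleep(2400) in message_handler.
    return slow_job(seconds)


async def offloaded_handler(seconds):
    # Runs the blocking job in the default thread pool so the loop stays free.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, slow_job, seconds)


async def count_ticks(handler, seconds=0.5):
    # Count how many times the heartbeat runs while the handler is busy.
    ticks = []
    hb = asyncio.ensure_future(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    await handler(seconds)
    hb.cancel()
    return len(ticks)


def demo():
    blocked = asyncio.run(count_ticks(blocking_handler))
    offloaded = asyncio.run(count_ticks(offloaded_handler))
    return blocked, offloaded
```

Calling demo() returns two tick counts. With the blocking handler the heartbeat only gets its first tick in before the loop stalls, while with the offloaded handler it keeps ticking for the whole duration. If the work in message_handler is itself asynchronous, awaiting asyncio.sleep or the relevant coroutine instead of calling time.sleep should have the same effect of keeping the loop responsive.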
