Skip to content

Commit 3420e1c

Browse files
authored
Merge pull request #276 from YiuRULE/custom_prefix
Add inbox prefix as a configuration variable
2 parents f0af7dd + d348b1b commit 3420e1c

File tree

2 files changed

+27
-5
lines changed

2 files changed

+27
-5
lines changed

nats/aio/client.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@
7373

7474
PING_PROTO = PING_OP + _CRLF_
7575
PONG_PROTO = PONG_OP + _CRLF_
76-
INBOX_PREFIX = bytearray(b'_INBOX.')
77-
INBOX_PREFIX_LEN = len(INBOX_PREFIX) + 22 + 1
76+
DEFAULT_INBOX_PREFIX = b'_INBOX'
7877

7978
DEFAULT_PENDING_SIZE = 1024 * 1024
8079
DEFAULT_BUFFER_SIZE = 32768
@@ -188,6 +187,7 @@ def __init__(self) -> None:
188187
self._resp_map: Dict[str, asyncio.Future] = {}
189188
self._resp_sub_prefix: Optional[bytearray] = None
190189
self._nuid = NUID()
190+
self._inbox_prefix = bytearray(DEFAULT_INBOX_PREFIX)
191191

192192
# NKEYS support
193193
#
@@ -247,6 +247,7 @@ async def connect(
247247
user_jwt_cb: Optional[JWTCallback] = None,
248248
user_credentials: Optional[Credentials] = None,
249249
nkeys_seed: Optional[str] = None,
250+
inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX,
250251
) -> None:
251252
"""
252253
Establishes a connection to NATS.
@@ -347,6 +348,12 @@ async def subscribe_handler(msg):
347348
self._reconnected_cb = reconnected_cb
348349
self._disconnected_cb = disconnected_cb
349350

351+
# Custom inbox prefix
352+
if isinstance(inbox_prefix, str):
353+
inbox_prefix = inbox_prefix.encode()
354+
assert isinstance(inbox_prefix, bytes)
355+
self._inbox_prefix = bytearray(inbox_prefix)
356+
350357
# NKEYS support
351358
self._signature_cb = signature_cb
352359
self._user_jwt_cb = user_jwt_cb
@@ -822,7 +829,8 @@ async def _send_subscribe(self, sub: Subscription) -> None:
822829
async def _init_request_sub(self) -> None:
823830
self._resp_map = {}
824831

825-
self._resp_sub_prefix = INBOX_PREFIX[:]
832+
self._resp_sub_prefix = self._inbox_prefix[:]
833+
self._resp_sub_prefix.extend(b'.')
826834
self._resp_sub_prefix.extend(self._nuid.next())
827835
self._resp_sub_prefix.extend(b'.')
828836
resp_mux_subject = self._resp_sub_prefix[:]
@@ -832,7 +840,7 @@ async def _init_request_sub(self) -> None:
832840
)
833841

834842
async def _request_sub_callback(self, msg: Msg) -> None:
835-
token = msg.subject[INBOX_PREFIX_LEN:]
843+
token = msg.subject[len(self._inbox_prefix) + 22 + 2:]
836844
try:
837845
fut = self._resp_map.get(token)
838846
if not fut:
@@ -916,7 +924,8 @@ def new_inbox(self) -> str:
916924
nc.publish('broadcast', b'', reply=inbox)
917925
msg = sub.next_msg()
918926
"""
919-
next_inbox = INBOX_PREFIX[:]
927+
next_inbox = self._inbox_prefix[:]
928+
next_inbox.extend(b'.')
920929
next_inbox.extend(self._nuid.next())
921930
return next_inbox.decode()
922931

tests/test_client.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,19 @@ async def worker_handler(msg):
741741

742742
await nc.close()
743743

744+
@async_test
745+
async def test_custom_inbox_prefix(self):
746+
nc = NATS()
747+
748+
async def worker_handler(msg):
749+
self.assertTrue(msg.reply.startswith('bar.'))
750+
await msg.respond(b"OK")
751+
752+
await nc.connect(inbox_prefix="bar")
753+
await nc.subscribe("foo", cb=worker_handler)
754+
await nc.request("foo", b'')
755+
await nc.close()
756+
744757
@async_test
745758
async def test_msg_respond(self):
746759
nc = NATS()

0 commit comments

Comments
 (0)