Skip to content

Commit c90166c

Browse files
authored
Fix deliver_subject in implicit subscription creation (#615)
--------- Signed-off-by: Mikołaj Nowak <[email protected]>
1 parent 1ecc5e1 commit c90166c

File tree

2 files changed

+48
-3
lines changed

2 files changed

+48
-3
lines changed

nats/js/client.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -414,9 +414,10 @@ async def cb(msg):
414414
if inactive_threshold:
415415
config.inactive_threshold = inactive_threshold
416416

417-
# Create inbox for push consumer.
418-
deliver = self._nc.new_inbox()
419-
config.deliver_subject = deliver
417+
# Create inbox for push consumer, if deliver_subject is not assigned already.
418+
if config.deliver_subject is None:
419+
deliver = self._nc.new_inbox()
420+
config.deliver_subject = deliver
420421

421422
# Auto created consumers use the filter subject.
422423
config.filter_subject = subject

tests/test_js.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1807,6 +1807,50 @@ async def cb(msg):
18071807
self.assertEqual(coroutines_before, coroutines_after_unsubscribe)
18081808
self.assertNotEqual(coroutines_before, coroutines_after_subscribe)
18091809

1810+
@async_test
1811+
async def test_subscribe_push_config(self):
1812+
nc = await nats.connect()
1813+
js = nc.jetstream()
1814+
1815+
await js.add_stream(name="pconfig", subjects=["pconfig"])
1816+
1817+
s, d = ([], [])
1818+
1819+
async def cb_s(msg):
1820+
s.append(msg.data)
1821+
1822+
async def cb_d(msg):
1823+
d.append(msg.data)
1824+
1825+
#Create config for our subscriber
1826+
cc = nats.js.api.ConsumerConfig(
1827+
name="pconfig-ps", deliver_subject="pconfig-deliver"
1828+
)
1829+
1830+
#Make stream consumer with set deliver_subjct
1831+
sub_s = await js.subscribe(
1832+
"pconfig", stream="pconfig", cb=cb_s, config=cc
1833+
)
1834+
#Make direct sub on deliver_subject
1835+
sub_d = await nc.subscribe("pconfig-deliver", "check-queue", cb=cb_d)
1836+
1837+
#Stream consumer sub should have configured subject
1838+
assert sub_s.subject == "pconfig-deliver"
1839+
1840+
#Publish some messages
1841+
for i in range(10):
1842+
await js.publish("pconfig", f'Hello World {i}'.encode())
1843+
1844+
await asyncio.sleep(0.5)
1845+
#Both subs should recieve same messages, but we are not sure about order
1846+
assert len(s) == len(d)
1847+
assert set(s) == set(d)
1848+
1849+
#Cleanup
1850+
await js.delete_consumer("pconfig", "pconfig-ps")
1851+
await js.delete_stream("pconfig")
1852+
await nc.close()
1853+
18101854

18111855
class AckPolicyTest(SingleJetStreamServerTestCase):
18121856

0 commit comments

Comments
 (0)