Skip to content

Commit 5b02ffe

Browse files
committed
Add subscription concurrency tests
Signed-off-by: Casper Beyer <[email protected]>
1 parent dc13b84 commit 5b02ffe

File tree

1 file changed

+343
-0
lines changed

1 file changed

+343
-0
lines changed

nats-client/tests/test_subscription.py

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,349 @@ async def test_subscription_drain_processes_pending_messages(client):
823823
await subscription.next(timeout=0.5)
824824

825825

826+
@pytest.mark.asyncio
827+
async def test_many_subscriptions_on_same_subject():
828+
"""Test that client can handle many concurrent subscriptions on the same subject.
829+
830+
This stress test verifies that the client can manage a large number of
831+
subscriptions all listening to the same subject, with each receiving all messages.
832+
"""
833+
server = await run(port=0)
834+
835+
try:
836+
client = await connect(server.client_url, timeout=1.0)
837+
838+
try:
839+
num_subscriptions = 100
840+
test_subject = f"test.many.same.{uuid.uuid4()}"
841+
subscriptions = []
842+
843+
# Create many subscriptions on the same subject
844+
for i in range(num_subscriptions):
845+
sub = await client.subscribe(test_subject)
846+
subscriptions.append(sub)
847+
848+
await client.flush()
849+
850+
# Publish a single message
851+
test_message = b"shared_message"
852+
await client.publish(test_subject, test_message)
853+
await client.flush()
854+
855+
# Verify all subscriptions receive the message
856+
for i, sub in enumerate(subscriptions):
857+
msg = await sub.next(timeout=2.0)
858+
assert msg.data == test_message, f"Subscription {i} received wrong message"
859+
assert msg.subject == test_subject, f"Subscription {i} received wrong subject"
860+
861+
finally:
862+
await client.close()
863+
864+
finally:
865+
await server.shutdown()
866+
867+
868+
@pytest.mark.asyncio
869+
async def test_many_subscriptions_on_unique_subjects():
870+
"""Test that client can handle many concurrent subscriptions on unique subjects.
871+
872+
This stress test verifies that the client can manage a large number of
873+
subscriptions simultaneously, each on a unique subject and receiving its own messages.
874+
"""
875+
server = await run(port=0)
876+
877+
try:
878+
client = await connect(server.client_url, timeout=1.0)
879+
880+
try:
881+
num_subscriptions = 100
882+
subscriptions = []
883+
subjects = []
884+
885+
# Create many subscriptions on unique subjects
886+
for i in range(num_subscriptions):
887+
subject = f"test.many.unique.{uuid.uuid4()}.{i}"
888+
subjects.append(subject)
889+
sub = await client.subscribe(subject)
890+
subscriptions.append(sub)
891+
892+
await client.flush()
893+
894+
# Publish a message to each unique subject
895+
for i, subject in enumerate(subjects):
896+
await client.publish(subject, f"msg_{i}".encode())
897+
898+
await client.flush()
899+
900+
# Verify each subscription receives its specific message
901+
for i, sub in enumerate(subscriptions):
902+
msg = await sub.next(timeout=2.0)
903+
assert msg.data == f"msg_{i}".encode(), f"Subscription {i} received wrong message"
904+
assert msg.subject == subjects[i], f"Subscription {i} received wrong subject"
905+
906+
finally:
907+
await client.close()
908+
909+
finally:
910+
await server.shutdown()
911+
912+
913+
@pytest.mark.asyncio
914+
async def test_multiple_concurrent_consumers_using_next(client):
915+
"""Test multiple tasks consuming from the same subscription using .next().
916+
917+
This verifies that multiple concurrent consumers can safely read from the
918+
same subscription, with each message being delivered to exactly one consumer.
919+
This simulates real-world scenarios like worker pools processing messages.
920+
"""
921+
test_subject = f"test.concurrent.next.{uuid.uuid4()}"
922+
message_count = 50
923+
924+
# Create subscription
925+
subscription = await client.subscribe(test_subject)
926+
await client.flush()
927+
928+
# Track messages received by each consumer
929+
consumer_messages = {0: [], 1: [], 2: []}
930+
931+
async def consumer_task(consumer_id: int):
932+
"""Consumer task that processes messages using .next().
933+
934+
Simulates a worker that continuously processes messages from a queue.
935+
"""
936+
while True:
937+
try:
938+
msg = await subscription.next(timeout=0.5)
939+
# Simulate some processing work
940+
await asyncio.sleep(0.01)
941+
consumer_messages[consumer_id].append(msg.data.decode())
942+
except asyncio.TimeoutError:
943+
# No more messages available - worker is done
944+
break
945+
except RuntimeError:
946+
# Subscription closed
947+
break
948+
949+
# Start multiple concurrent consumer tasks (simulating a worker pool)
950+
num_consumers = 3
951+
consumer_tasks = [asyncio.create_task(consumer_task(i)) for i in range(num_consumers)]
952+
953+
try:
954+
# Give consumers time to start waiting for work
955+
await asyncio.sleep(0.1)
956+
957+
# Publish messages slowly to allow fair distribution across workers
958+
for i in range(message_count):
959+
await client.publish(test_subject, f"message_{i}".encode())
960+
if i % 10 == 0:
961+
await asyncio.sleep(0.01) # Small delay to allow distribution
962+
await client.flush()
963+
964+
# Wait for all consumer tasks to finish processing
965+
await asyncio.gather(*consumer_tasks, return_exceptions=True)
966+
967+
# Verify all messages were received exactly once
968+
all_messages = []
969+
for messages in consumer_messages.values():
970+
all_messages.extend(messages)
971+
972+
assert len(all_messages) == message_count, f"Expected {message_count} messages, got {len(all_messages)}"
973+
974+
# Verify no duplicate messages
975+
assert len(set(all_messages)) == message_count, "Some messages were received multiple times"
976+
977+
finally:
978+
# Ensure tasks are complete
979+
for task in consumer_tasks:
980+
if not task.done():
981+
task.cancel()
982+
try:
983+
await task
984+
except asyncio.CancelledError:
985+
pass
986+
987+
988+
@pytest.mark.asyncio
989+
async def test_multiple_concurrent_consumers_using_async_for(client):
990+
"""Test multiple tasks consuming from the same subscription using async for.
991+
992+
This verifies that multiple concurrent consumers using async iteration
993+
can safely read from the same subscription.
994+
This simulates real-world scenarios like event processors using async iteration.
995+
"""
996+
test_subject = f"test.concurrent.iter.{uuid.uuid4()}"
997+
message_count = 50
998+
999+
# Create subscription
1000+
subscription = await client.subscribe(test_subject)
1001+
await client.flush()
1002+
1003+
# Track messages received by each consumer
1004+
consumer_messages = {0: [], 1: [], 2: []}
1005+
stop_event = asyncio.Event()
1006+
1007+
async def consumer_task(consumer_id: int):
1008+
"""Consumer task that processes messages using async for iteration.
1009+
1010+
Simulates an event processor that uses async iteration to handle messages.
1011+
"""
1012+
async for msg in subscription:
1013+
# Simulate some processing work
1014+
await asyncio.sleep(0.01)
1015+
consumer_messages[consumer_id].append(msg.data.decode())
1016+
1017+
# Stop when we've received all expected messages across all consumers
1018+
total = sum(len(msgs) for msgs in consumer_messages.values())
1019+
if total >= message_count:
1020+
break
1021+
if stop_event.is_set():
1022+
break
1023+
1024+
# Start multiple concurrent consumer tasks (simulating event processors)
1025+
num_consumers = 3
1026+
consumer_tasks = [asyncio.create_task(consumer_task(i)) for i in range(num_consumers)]
1027+
1028+
try:
1029+
# Give consumers time to start their event loops
1030+
await asyncio.sleep(0.1)
1031+
1032+
# Publish messages slowly to allow fair distribution across processors
1033+
for i in range(message_count):
1034+
await client.publish(test_subject, f"message_{i}".encode())
1035+
if i % 10 == 0:
1036+
await asyncio.sleep(0.01) # Small delay to allow distribution
1037+
await client.flush()
1038+
1039+
# Wait for all messages to be consumed (with timeout)
1040+
max_wait = 5.0
1041+
start = asyncio.get_event_loop().time()
1042+
while sum(len(msgs) for msgs in consumer_messages.values()) < message_count:
1043+
if asyncio.get_event_loop().time() - start > max_wait:
1044+
break
1045+
await asyncio.sleep(0.1)
1046+
1047+
# Signal consumers to stop
1048+
stop_event.set()
1049+
await subscription.unsubscribe()
1050+
1051+
# Wait for consumer tasks to finish
1052+
await asyncio.wait_for(asyncio.gather(*consumer_tasks, return_exceptions=True), timeout=2.0)
1053+
1054+
# Verify all messages were received
1055+
all_messages = []
1056+
for messages in consumer_messages.values():
1057+
all_messages.extend(messages)
1058+
1059+
assert len(all_messages) == message_count, f"Expected {message_count} messages, got {len(all_messages)}"
1060+
1061+
# Verify no duplicate messages
1062+
assert len(set(all_messages)) == message_count, "Some messages were received multiple times"
1063+
1064+
finally:
1065+
stop_event.set()
1066+
for task in consumer_tasks:
1067+
if not task.done():
1068+
task.cancel()
1069+
try:
1070+
await task
1071+
except asyncio.CancelledError:
1072+
pass
1073+
1074+
1075+
@pytest.mark.asyncio
1076+
async def test_async_iteration_with_concurrent_publishers(client):
1077+
"""Test async iteration while multiple tasks are publishing concurrently.
1078+
1079+
This verifies that async for iteration works correctly when messages are
1080+
being published continuously by multiple publishers.
1081+
This simulates real-world scenarios with multiple producers and a single consumer.
1082+
"""
1083+
test_subject = f"test.iter.concurrent.pub.{uuid.uuid4()}"
1084+
messages_per_publisher = 20
1085+
num_publishers = 3
1086+
1087+
# Create subscription
1088+
subscription = await client.subscribe(test_subject)
1089+
await client.flush()
1090+
1091+
received_messages = []
1092+
stop_iteration = asyncio.Event()
1093+
1094+
async def consumer_task():
1095+
"""Consumer task using async for iteration.
1096+
1097+
Simulates a single consumer processing events from multiple producers.
1098+
"""
1099+
async for msg in subscription:
1100+
# Simulate some processing work
1101+
await asyncio.sleep(0.005)
1102+
received_messages.append(msg.data.decode())
1103+
if stop_iteration.is_set():
1104+
break
1105+
1106+
async def publisher_task(publisher_id: int):
1107+
"""Publisher task that continuously produces messages.
1108+
1109+
Simulates a producer generating events.
1110+
"""
1111+
for i in range(messages_per_publisher):
1112+
await client.publish(test_subject, f"pub{publisher_id}_msg{i}".encode())
1113+
await asyncio.sleep(0.01) # Small delay to simulate realistic publishing
1114+
1115+
# Start consumer task
1116+
consumer = asyncio.create_task(consumer_task())
1117+
1118+
# Start multiple publisher tasks
1119+
publisher_tasks = [asyncio.create_task(publisher_task(i)) for i in range(num_publishers)]
1120+
1121+
try:
1122+
# Wait for all publishers to finish
1123+
await asyncio.gather(*publisher_tasks)
1124+
await client.flush()
1125+
1126+
# Wait for consumer to receive all messages
1127+
expected_count = messages_per_publisher * num_publishers
1128+
max_wait = 5.0
1129+
start = asyncio.get_event_loop().time()
1130+
while len(received_messages) < expected_count:
1131+
if asyncio.get_event_loop().time() - start > max_wait:
1132+
break
1133+
await asyncio.sleep(0.1)
1134+
1135+
# Stop consumer task
1136+
stop_iteration.set()
1137+
await subscription.unsubscribe()
1138+
await asyncio.wait_for(consumer, timeout=2.0)
1139+
1140+
# Verify all messages received
1141+
assert len(received_messages) == expected_count, (
1142+
f"Expected {expected_count} messages, got {len(received_messages)}"
1143+
)
1144+
1145+
# Verify messages from all publishers
1146+
for pub_id in range(num_publishers):
1147+
pub_messages = [msg for msg in received_messages if msg.startswith(f"pub{pub_id}_")]
1148+
assert len(pub_messages) == messages_per_publisher, (
1149+
f"Publisher {pub_id} messages: expected {messages_per_publisher}, got {len(pub_messages)}"
1150+
)
1151+
1152+
finally:
1153+
stop_iteration.set()
1154+
for task in publisher_tasks:
1155+
if not task.done():
1156+
task.cancel()
1157+
try:
1158+
await task
1159+
except asyncio.CancelledError:
1160+
pass
1161+
if not consumer.done():
1162+
consumer.cancel()
1163+
try:
1164+
await consumer
1165+
except asyncio.CancelledError:
1166+
pass
1167+
1168+
8261169
@pytest.mark.asyncio
8271170
async def test_reconnect_preserves_subscription_during_publishing():
8281171
"""Test that subscriptions remain active after reconnection during active publishing.

0 commit comments

Comments
 (0)