
Commit dc13b84

Add reconnection under load tests
Signed-off-by: Casper Beyer <[email protected]>
1 parent 2292614 commit dc13b84


2 files changed: +414 -0 lines changed


nats-client/tests/test_client.py

Lines changed: 313 additions & 0 deletions
@@ -1221,3 +1221,316 @@ async def test_server_initiated_ping_pong():
        await client.close()
    finally:
        await server.shutdown()


@pytest.mark.asyncio
async def test_reconnect_while_publishing():
    """Test that client can reconnect while actively publishing messages.

    This test verifies that:
    1. Client continues to publish messages during normal operation
    2. When the server disconnects, the client detects the disconnection
    3. Publishing blocks during reconnection (waiting for connection)
    4. Client successfully reconnects to a new server
    5. Publishing resumes after reconnection
    6. Messages published after reconnection are successfully delivered
    """
    # Start initial server
    server = await run(port=0)
    server_port = server.port

    # Events to track lifecycle
    disconnect_event = asyncio.Event()
    reconnect_event = asyncio.Event()

    # Connect client with reconnection enabled
    client = await connect(
        server.client_url,
        timeout=1.0,
        allow_reconnect=True,
        reconnect_time_wait=0.1,
        reconnect_max_attempts=100,
    )

    client.add_disconnected_callback(disconnect_event.set)
    client.add_reconnected_callback(reconnect_event.set)

    # Set up subscription to verify messages
    test_subject = f"test.reconnect.load.{uuid.uuid4()}"
    subscription = await client.subscribe(test_subject)
    await client.flush()

    # Counters for tracking
    messages_sent_before_disconnect = 0
    messages_sent_after_reconnect = 0
    publish_task_running = True

    async def publish_continuously():
        """Continuously publish messages until told to stop.

        publish() will block during reconnection, not raise exceptions.
        """
        nonlocal messages_sent_before_disconnect, messages_sent_after_reconnect
        counter = 0

        while publish_task_running:
            message = f"message_{counter}".encode()
            # This may block during reconnection but won't raise
            await client.publish(test_subject, message)
            counter += 1

            # Track message counts based on connection state
            if reconnect_event.is_set():
                messages_sent_after_reconnect += 1
            elif not disconnect_event.is_set():
                messages_sent_before_disconnect += 1

            # Small delay to simulate realistic publish rate
            await asyncio.sleep(0.01)

    # Start publishing task
    publish_task = asyncio.create_task(publish_continuously())

    try:
        # Let some messages publish successfully
        await asyncio.sleep(0.2)
        assert messages_sent_before_disconnect > 0, "Should have published messages before disconnect"

        # Verify we're receiving messages
        msg = await subscription.next(timeout=1.0)
        assert msg.data.startswith(b"message_")

        # Shutdown server while publishing is active
        await server.shutdown()

        # Wait for disconnect to be detected
        await asyncio.wait_for(disconnect_event.wait(), timeout=2.0)
        assert disconnect_event.is_set()

        # Start new server on same port
        new_server = await run(port=server_port)

        try:
            # Wait for reconnection
            await asyncio.wait_for(reconnect_event.wait(), timeout=5.0)
            assert reconnect_event.is_set()

            # Give time for publishing to resume
            await asyncio.sleep(0.3)

            # Verify publishing resumed after reconnection
            assert messages_sent_after_reconnect > 0, "Should have published messages after reconnect"

            # Verify we can receive messages after reconnection
            await client.flush()
            msg = await subscription.next(timeout=2.0)
            assert msg.data.startswith(b"message_")

        finally:
            await new_server.shutdown()

    finally:
        # Stop publishing task
        publish_task_running = False
        await publish_task
        await client.close()


@pytest.mark.asyncio
async def test_reconnect_with_high_volume_publishing():
    """Test reconnection behavior under high message volume.

    This test verifies that the client can handle reconnection even when
    publishing a large number of messages rapidly, ensuring buffering and
    flow control work correctly across reconnection boundaries.
    """
    # Start initial server
    server = await run(port=0)
    server_port = server.port

    # Events to track lifecycle
    disconnect_event = asyncio.Event()
    reconnect_event = asyncio.Event()

    # Connect client
    client = await connect(
        server.client_url,
        timeout=1.0,
        allow_reconnect=True,
        reconnect_time_wait=0.1,
        reconnect_max_attempts=100,
    )

    client.add_disconnected_callback(disconnect_event.set)
    client.add_reconnected_callback(reconnect_event.set)

    test_subject = f"test.reconnect.highvolume.{uuid.uuid4()}"
    subscription = await client.subscribe(test_subject)
    await client.flush()

    # Track successful publishes
    successful_publishes = 0
    publish_task_running = True

    async def publish_high_volume():
        """Publish messages rapidly - will block during reconnection."""
        nonlocal successful_publishes
        counter = 0

        while publish_task_running:
            # Publish rapidly - may block during reconnection
            await client.publish(test_subject, f"msg_{counter}".encode())
            successful_publishes += 1
            counter += 1
            # Small sleep every N messages to prevent overwhelming
            if counter % 50 == 0:
                await asyncio.sleep(0.01)

    # Start high-volume publishing
    publish_task = asyncio.create_task(publish_high_volume())

    try:
        # Let messages accumulate
        await asyncio.sleep(0.2)
        publishes_before = successful_publishes
        assert publishes_before > 50, f"Should have published many messages, got {publishes_before}"

        # Trigger disconnect during heavy load
        await server.shutdown()
        await asyncio.wait_for(disconnect_event.wait(), timeout=2.0)

        # Restart server
        new_server = await run(port=server_port)

        try:
            # Wait for reconnection
            await asyncio.wait_for(reconnect_event.wait(), timeout=5.0)

            # Let publishing resume
            await asyncio.sleep(0.2)
            publishes_after = successful_publishes

            # Verify publishing continued after reconnection
            assert publishes_after > publishes_before, (
                f"Publishing should resume after reconnect: before={publishes_before}, after={publishes_after}"
            )

            # Verify we can still receive messages
            await client.flush()
            msg = await subscription.next(timeout=2.0)
            assert msg.data.startswith(b"msg_")

        finally:
            await new_server.shutdown()

    finally:
        publish_task_running = False
        await publish_task
        await client.close()


@pytest.mark.asyncio
async def test_reconnect_with_multiple_concurrent_publishers():
    """Test reconnection with multiple publishing tasks running concurrently.

    This simulates a realistic scenario where multiple application components
    are publishing to different subjects simultaneously when a reconnection occurs.
    """
    # Start initial server
    server = await run(port=0)
    server_port = server.port

    # Events
    disconnect_event = asyncio.Event()
    reconnect_event = asyncio.Event()

    # Connect client
    client = await connect(
        server.client_url,
        timeout=1.0,
        allow_reconnect=True,
        reconnect_time_wait=0.1,
        reconnect_max_attempts=100,
    )

    client.add_disconnected_callback(disconnect_event.set)
    client.add_reconnected_callback(reconnect_event.set)

    # Create multiple subjects and subscriptions
    num_subjects = 5
    subjects = [f"test.subject.{i}.{uuid.uuid4()}" for i in range(num_subjects)]
    subscriptions = []

    for subject in subjects:
        sub = await client.subscribe(subject)
        subscriptions.append(sub)
    await client.flush()

    # Track publishes per subject
    publish_counts = {subject: 0 for subject in subjects}
    publish_lock = asyncio.Lock()
    tasks_running = True

    async def publish_to_subject(subject: str):
        """Publish continuously to a specific subject."""
        counter = 0
        while tasks_running:
            await client.publish(subject, f"{subject}_msg_{counter}".encode())
            async with publish_lock:
                publish_counts[subject] += 1
            counter += 1
            await asyncio.sleep(0.02)

    # Start multiple publishing tasks
    publish_tasks = [asyncio.create_task(publish_to_subject(subject)) for subject in subjects]

    try:
        # Let all publishers run
        await asyncio.sleep(0.3)

        # Verify all subjects are being published to
        async with publish_lock:
            for subject, count in publish_counts.items():
                assert count > 0, f"Subject {subject} should have messages"

        # Trigger disconnect
        await server.shutdown()
        await asyncio.wait_for(disconnect_event.wait(), timeout=2.0)

        # Restart server
        new_server = await run(port=server_port)

        try:
            # Wait for reconnection
            await asyncio.wait_for(reconnect_event.wait(), timeout=5.0)

            # Let publishing resume
            await asyncio.sleep(0.3)

            # Verify all subjects resume publishing
            async with publish_lock:
                counts_before = dict(publish_counts)

            await asyncio.sleep(0.2)

            async with publish_lock:
                counts_after = dict(publish_counts)

            for subject in subjects:
                assert counts_after[subject] > counts_before[subject], (
                    f"Subject {subject} should continue publishing after reconnect"
                )

            # Verify we can receive on all subscriptions
            await client.flush()
            for i, subscription in enumerate(subscriptions):
                msg = await subscription.next(timeout=2.0)
                assert subjects[i].encode() in msg.data

        finally:
            await new_server.shutdown()

    finally:
        tasks_running = False
        await asyncio.gather(*publish_tasks)
        await client.close()
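
Note (not part of the diff): all three new tests share the same disconnect/reconnect scaffolding. The snippet below is a minimal sketch of that pattern for quick reference, reusing the `run`, `connect`, `add_disconnected_callback`, and `add_reconnected_callback` helpers exactly as they appear in the hunk above; the import lines are placeholders, since the module's actual imports are not visible in this diff.

import asyncio
import pytest

# Placeholder imports -- the real import paths are not shown in this hunk.
# from nats_client import connect
# from nats_client.testing import run


@pytest.mark.asyncio
async def test_reconnect_lifecycle_sketch():
    server = await run(port=0)        # throwaway server on a random port
    server_port = server.port
    disconnected, reconnected = asyncio.Event(), asyncio.Event()

    client = await connect(
        server.client_url,
        allow_reconnect=True,
        reconnect_time_wait=0.1,
        reconnect_max_attempts=100,
    )
    client.add_disconnected_callback(disconnected.set)
    client.add_reconnected_callback(reconnected.set)

    # Kill the server and wait for the disconnect callback to fire.
    await server.shutdown()
    await asyncio.wait_for(disconnected.wait(), timeout=2.0)

    # Bring a new server up on the same port and wait for reconnection.
    new_server = await run(port=server_port)
    await asyncio.wait_for(reconnected.wait(), timeout=5.0)

    await client.close()
    await new_server.shutdown()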
