Skip to content

Commit 614060c

Browse files
committed
Add examples
Signed-off-by: Casper Beyer <[email protected]>
1 parent 5b02ffe commit 614060c

File tree

6 files changed

+831
-0
lines changed

6 files changed

+831
-0
lines changed

nats-client/examples/nats-echo.py

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
#!/usr/bin/env python3
2+
"""NATS Echo Service Example.
3+
4+
Implements an echo service that replies to requests with the same message content.
5+
Also provides a status endpoint that returns service information.
6+
7+
Usage:
8+
python nats-echo.py [-s server] [-creds file] [-nkey file] [-t] [-id service_id] <subject>
9+
10+
Examples:
11+
python nats-echo.py echo
12+
python nats-echo.py -s nats://demo.nats.io:4222 echo
13+
python nats-echo.py -id my-echo-1 -t echo
14+
"""
15+
16+
import argparse
17+
import asyncio
18+
import json
19+
import platform
20+
import signal
21+
import sys
22+
from datetime import datetime
23+
24+
from nats.client import connect
25+
26+
# Global flag for graceful shutdown
27+
shutdown_event = asyncio.Event()
28+
29+
30+
def signal_handler(sig, frame):
31+
"""Handle interrupt signal for graceful shutdown."""
32+
print("\nShutting down...")
33+
shutdown_event.set()
34+
35+
36+
async def main():
37+
"""Run the echo service."""
38+
parser = argparse.ArgumentParser(
39+
description="NATS Echo Service",
40+
formatter_class=argparse.RawDescriptionHelpFormatter,
41+
)
42+
parser.add_argument(
43+
"-s",
44+
"--server",
45+
default="nats://localhost:4222",
46+
help="NATS server URL (default: nats://localhost:4222)",
47+
)
48+
parser.add_argument(
49+
"-creds",
50+
"--credentials",
51+
help="User credentials file",
52+
)
53+
parser.add_argument(
54+
"-nkey",
55+
"--nkey",
56+
help="NKey seed file",
57+
)
58+
parser.add_argument(
59+
"-t",
60+
"--timestamp",
61+
action="store_true",
62+
help="Display timestamps",
63+
)
64+
parser.add_argument(
65+
"-id",
66+
"--service-id",
67+
default="nats-echo",
68+
help="Service identifier (default: nats-echo)",
69+
)
70+
parser.add_argument(
71+
"subject",
72+
help="Subject to listen on for echo requests",
73+
)
74+
75+
args = parser.parse_args()
76+
77+
# Load credentials if provided
78+
token = None
79+
user = None
80+
password = None
81+
nkey_seed = None
82+
83+
if args.credentials:
84+
with open(args.credentials) as f:
85+
token = f.read().strip()
86+
87+
if args.nkey:
88+
with open(args.nkey) as f:
89+
nkey_seed = f.read().strip()
90+
91+
# Setup signal handler
92+
signal.signal(signal.SIGINT, signal_handler)
93+
signal.signal(signal.SIGTERM, signal_handler)
94+
95+
# Service info
96+
service_info = {
97+
"id": args.service_id,
98+
"subject": args.subject,
99+
"platform": platform.platform(),
100+
"python_version": platform.python_version(),
101+
}
102+
103+
try:
104+
# Connect to NATS
105+
client = await connect(
106+
args.server,
107+
token=token,
108+
user=user,
109+
password=password,
110+
nkey_seed=nkey_seed,
111+
)
112+
113+
print(f"Echo service '{args.service_id}' listening on [{args.subject}]")
114+
print(f"Status available on [{args.subject}.status]")
115+
116+
# Subscribe to the echo subject (with queue group for load balancing)
117+
echo_subscription = await client.subscribe(args.subject, queue_group="echo-service")
118+
119+
# Subscribe to the status subject (without queue group, all instances respond)
120+
status_subject = f"{args.subject}.status"
121+
status_subscription = await client.subscribe(status_subject)
122+
123+
# Message counters
124+
echo_count = 0
125+
status_count = 0
126+
127+
async def handle_echo():
128+
"""Handle echo requests."""
129+
nonlocal echo_count
130+
async with echo_subscription:
131+
while not shutdown_event.is_set():
132+
try:
133+
msg = await asyncio.wait_for(echo_subscription.next(), timeout=0.5)
134+
echo_count += 1
135+
136+
if args.timestamp:
137+
timestamp = datetime.now().strftime("%H:%M:%S")
138+
print(f"[#{echo_count} {timestamp}] Echo request: {msg.data.decode()}")
139+
else:
140+
print(f"[#{echo_count}] Echo request: {msg.data.decode()}")
141+
142+
# Echo back the message
143+
if msg.reply_to:
144+
await client.publish(msg.reply_to, msg.data)
145+
146+
except asyncio.TimeoutError:
147+
continue
148+
except Exception as e:
149+
print(f"Error handling echo request: {e}", file=sys.stderr)
150+
break
151+
152+
async def handle_status():
153+
"""Handle status requests."""
154+
nonlocal status_count
155+
async with status_subscription:
156+
while not shutdown_event.is_set():
157+
try:
158+
msg = await asyncio.wait_for(status_subscription.next(), timeout=0.5)
159+
status_count += 1
160+
161+
if args.timestamp:
162+
timestamp = datetime.now().strftime("%H:%M:%S")
163+
print(f"[#{status_count} {timestamp}] Status request")
164+
else:
165+
print(f"[#{status_count}] Status request")
166+
167+
# Send status information
168+
if msg.reply_to:
169+
status_response = {
170+
**service_info,
171+
"echo_count": echo_count,
172+
"status_count": status_count,
173+
}
174+
await client.publish(msg.reply_to, json.dumps(status_response).encode())
175+
176+
except asyncio.TimeoutError:
177+
continue
178+
except Exception as e:
179+
print(f"Error handling status request: {e}", file=sys.stderr)
180+
break
181+
182+
# Run both handlers concurrently
183+
await asyncio.gather(
184+
handle_echo(),
185+
handle_status(),
186+
)
187+
188+
# Close the connection
189+
await client.close()
190+
print("Echo service stopped")
191+
192+
except Exception as e:
193+
print(f"Error: {e}", file=sys.stderr)
194+
sys.exit(1)
195+
196+
197+
if __name__ == "__main__":
198+
asyncio.run(main())

nats-client/examples/nats-pub.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
#!/usr/bin/env python3
2+
"""NATS Publisher Example.
3+
4+
Publishes a message to a specified subject on a NATS server.
5+
6+
Usage:
7+
python nats-pub.py [-s server] [-creds file] [-nkey file] <subject> <msg>
8+
9+
Examples:
10+
python nats-pub.py hello "world"
11+
python nats-pub.py -s nats://demo.nats.io:4222 hello "world"
12+
python nats-pub.py -creds ~/.nats/creds hello "world"
13+
"""
14+
15+
import argparse
16+
import asyncio
17+
import sys
18+
19+
from nats.client import connect
20+
21+
22+
async def main():
23+
"""Publish a message to NATS."""
24+
parser = argparse.ArgumentParser(
25+
description="NATS Publisher",
26+
formatter_class=argparse.RawDescriptionHelpFormatter,
27+
)
28+
parser.add_argument(
29+
"-s",
30+
"--server",
31+
default="nats://localhost:4222",
32+
help="NATS server URL (default: nats://localhost:4222)",
33+
)
34+
parser.add_argument(
35+
"-creds",
36+
"--credentials",
37+
help="User credentials file",
38+
)
39+
parser.add_argument(
40+
"-nkey",
41+
"--nkey",
42+
help="NKey seed file",
43+
)
44+
parser.add_argument(
45+
"subject",
46+
help="Subject to publish to",
47+
)
48+
parser.add_argument(
49+
"message",
50+
help="Message to publish",
51+
)
52+
53+
args = parser.parse_args()
54+
55+
# Load credentials if provided
56+
token = None
57+
user = None
58+
password = None
59+
nkey_seed = None
60+
61+
if args.credentials:
62+
# For simplicity, we'll just support token in credentials file
63+
# A full implementation would parse JWT credentials
64+
with open(args.credentials) as f:
65+
token = f.read().strip()
66+
67+
if args.nkey:
68+
with open(args.nkey) as f:
69+
nkey_seed = f.read().strip()
70+
71+
try:
72+
# Connect to NATS
73+
client = await connect(
74+
args.server,
75+
token=token,
76+
user=user,
77+
password=password,
78+
nkey_seed=nkey_seed,
79+
)
80+
81+
# Publish the message
82+
await client.publish(args.subject, args.message.encode())
83+
await client.flush()
84+
85+
print(f"Published [{args.subject}] : '{args.message}'")
86+
87+
# Close the connection
88+
await client.close()
89+
90+
except Exception as e:
91+
print(f"Error: {e}", file=sys.stderr)
92+
sys.exit(1)
93+
94+
95+
if __name__ == "__main__":
96+
asyncio.run(main())

0 commit comments

Comments
 (0)