Skip to content

Commit 62a06c2

Browse files
committed
fix: race condition bug for threading lock example
1 parent ddb10e1 commit 62a06c2

2 files changed

Lines changed: 41 additions & 18 deletions

File tree

plugins/acp/examples/reactive/buyer.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,25 @@ def job_worker():
102102
while True:
103103
job_event.wait() # Wait for job
104104

105-
job = safe_pop_job()
106-
while job:
107-
process_job(job)
105+
# Process all available jobs
106+
while True:
108107
job = safe_pop_job()
109-
110-
job_event.clear() # Go back to wait
108+
if not job:
109+
break
110+
try:
111+
process_job(job)
112+
except Exception as e:
113+
print(f"❌ Error processing job: {e}")
114+
# Continue processing other jobs even if one fails
115+
116+
# Clear event only after ensuring no jobs remain
117+
if use_thread_lock:
118+
with job_queue_lock:
119+
if not job_queue:
120+
job_event.clear()
121+
else:
122+
if not job_queue:
123+
job_event.clear()
111124

112125
# Event-triggered job task receiver
113126
def on_new_task(job: ACPJob):
@@ -200,7 +213,7 @@ def post_tweet(content: str, reasoning: str) -> Tuple[FunctionResultStatus, str,
200213
agent_goal="Finding the best meme to do tweet posting",
201214
agent_description=f"""
202215
Agent that gain market traction by posting meme. Your interest are in cats and AI.
203-
You can head to acp to look for agents to help you generating meme.
216+
You can head to acp to look for devrel_seller to help you generating meme.
204217
Do not look for a relevant validator to validate the deliverable.
205218
206219
{acp_plugin.agent_description}

plugins/acp/examples/reactive/seller.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from virtuals_acp import ACPJob, ACPJobPhase
99
from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus
1010
from game_sdk.game.agent import Agent
11+
from collections import deque
1112
from rich import print, box
1213
from rich.panel import Panel
1314
from dotenv import load_dotenv
@@ -55,7 +56,7 @@ def seller(use_thread_lock: bool = True):
5556
return
5657

5758
# Thread-safe job queue setup
58-
job_queue = []
59+
job_queue = deque()
5960
job_queue_lock = threading.Lock()
6061
job_event = threading.Event()
6162

@@ -78,14 +79,14 @@ def safe_pop_job():
7879
with job_queue_lock:
7980
print("[pop] Lock acquired.")
8081
if job_queue:
81-
job = job_queue.pop(0)
82+
job = job_queue.popleft()
8283
print(f"[pop] Job popped: {job.id}")
8384
return job
8485
else:
8586
print("[pop] Queue is empty.")
8687
else:
8788
if job_queue:
88-
job = job_queue.pop(0)
89+
job = job_queue.popleft()
8990
print(f"[pop] Job popped (no lock): {job.id}")
9091
return job
9192
else:
@@ -94,19 +95,28 @@ def safe_pop_job():
9495

9596
# Background thread worker: process jobs one by one
9697
def job_worker():
97-
print("[worker] Job worker started, waiting for jobs.")
9898
while True:
9999
job_event.wait()
100-
print("[worker] job_event triggered.")
101100

102-
job = safe_pop_job()
103-
while job:
104-
print(f"[worker] Processing job {job.id}")
105-
process_job(job)
101+
# Process all available jobs
102+
while True:
106103
job = safe_pop_job()
107-
108-
job_event.clear()
109-
print("[worker] All jobs processed. Waiting again.")
104+
if not job:
105+
break
106+
try:
107+
process_job(job)
108+
except Exception as e:
109+
print(f"❌ Error processing job: {e}")
110+
# Continue processing other jobs even if one fails
111+
112+
# Clear event only after ensuring no jobs remain
113+
if use_thread_lock:
114+
with job_queue_lock:
115+
if not job_queue:
116+
job_event.clear()
117+
else:
118+
if not job_queue:
119+
job_event.clear()
110120

111121
# Event-triggered job task receiver
112122
def on_new_task(job: ACPJob):

0 commit comments

Comments
 (0)