
Improve replay buffer incremental benchmark #447

Merged

k82cn merged 1 commit into main from flm_445_replay_perf_readme on May 12, 2026

Conversation

Contributor

@k82cn k82cn commented May 12, 2026

Summary

  • make replay-buffer materialization apply only newly seen patch batches in the service-local deserializer
  • make merge defaults mode-aware: disabled for incremental runs, enabled every 5 iterations for forced-full reads
  • update the RFE445 design and replay-buffer README with benchmark commands, local/incremental/full comparison tables, and observed 500k-transition results
  • add focused replay-buffer unit tests for incremental materialization and merge policy resolution

Validation

  • cd examples/rl/replay_buffer && PYTHONPATH=../../../sdk/python/src ../../../sdk/python/.venv/bin/python -m pytest test_replay_buffer.py -q
  • cd examples/rl/replay_buffer && uv run -n ruff check main.py replay_buffer.py test_replay_buffer.py
  • cd examples/rl/replay_buffer && uv run -n ruff format --check main.py replay_buffer.py test_replay_buffer.py
  • cd examples/rl/replay_buffer && uv run -n main.py --help
  • git diff --check

Notes

  • tasks/ remains local task-tracking and is not included in this PR.

@k82cn k82cn marked this pull request as ready for review May 12, 2026 03:43
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request implements incremental object retrieval for the replay-buffer example, optimizing performance by applying only newly seen patch batches to a service-local materialized buffer. Changes include updates to the design documentation, CLI flags for benchmark evaluation, and the introduction of a stateful deserializer in the ReplayBuffer class. Review feedback correctly identifies that the new stateful implementation lacks synchronization, which could lead to race conditions in concurrent environments; adding a lock to protect the internal state during materialization is recommended.

Comment on lines +18 to +20

        self._materialized_base = None
        self._materialized_data: dict[str, Any] | None = None
        self._materialized_patch_count = 0
Contributor


Severity: high

The ReplayBuffer is now stateful to support incremental materialization, but it lacks synchronization. If this buffer is used within a service that handles concurrent requests (which is common in Flame services), multiple threads could execute _deserializer simultaneously, leading to race conditions and data corruption in the materialized buffer. I recommend adding a lock to protect the internal state.

Suggested change

-        self._materialized_base = None
-        self._materialized_data: dict[str, Any] | None = None
-        self._materialized_patch_count = 0
+        import threading
+
+        self._lock = threading.Lock()
+        self._materialized_base = None
+        self._materialized_data: dict[str, Any] | None = None
+        self._materialized_patch_count = 0
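The suggestion above inlines `import threading` inside `__init__`, which is legal Python but unconventional; a module-scope import is the more idiomatic form. A minimal standalone sketch (the class name here is hypothetical, not the PR's `ReplayBuffer`):

```python
# Sketch of the lock-protected state the review suggests, with the import
# moved to module scope where it conventionally belongs.
import threading
from typing import Any


class BufferState:
    """Hypothetical holder for the materialization state plus its lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._materialized_base: dict | None = None
        self._materialized_data: dict[str, Any] | None = None
        self._materialized_patch_count = 0
```

Any method that reads or mutates the three `_materialized_*` fields would then do so under `with self._lock:`.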

Comment on lines 22 to +40

     def _deserializer(self, base: dict, deltas: List) -> dict:
-        transitions = list(base.get("transitions", []))
-        for delta in deltas:
-            transitions.extend(delta)
-        return {
-            "transitions": transitions,
-            "total_added": base.get("total_added", 0) + sum(len(d) for d in deltas),
-        }
+        if (
+            self._materialized_data is None
+            or self._materialized_base is not base
+            or self._materialized_patch_count > len(deltas)
+        ):
+            self._materialized_base = base
+            self._materialized_data = {
+                "transitions": list(base.get("transitions", [])),
+                "total_added": base.get("total_added", 0),
+            }
+            self._materialized_patch_count = 0
+
+        for delta in deltas[self._materialized_patch_count :]:
+            self._materialized_data["transitions"].extend(delta)
+            self._materialized_data["total_added"] += len(delta)
+
+        self._materialized_patch_count = len(deltas)
+        return self._materialized_data
Contributor


Severity: high

To ensure thread safety when updating the materialized buffer state, wrap the logic in a lock. This prevents multiple threads from concurrently extending the transitions list or updating the patch count, which could otherwise lead to duplicate data or inconsistent state.

Suggested change

-    def _deserializer(self, base: dict, deltas: List) -> dict:
-        if (
-            self._materialized_data is None
-            or self._materialized_base is not base
-            or self._materialized_patch_count > len(deltas)
-        ):
-            self._materialized_base = base
-            self._materialized_data = {
-                "transitions": list(base.get("transitions", [])),
-                "total_added": base.get("total_added", 0),
-            }
-            self._materialized_patch_count = 0
-        for delta in deltas[self._materialized_patch_count :]:
-            self._materialized_data["transitions"].extend(delta)
-            self._materialized_data["total_added"] += len(delta)
-        self._materialized_patch_count = len(deltas)
-        return self._materialized_data
+    def _deserializer(self, base: dict, deltas: List) -> dict:
+        with self._lock:
+            if (
+                self._materialized_data is None
+                or self._materialized_base is not base
+                or self._materialized_patch_count > len(deltas)
+            ):
+                self._materialized_base = base
+                self._materialized_data = {
+                    "transitions": list(base.get("transitions", [])),
+                    "total_added": base.get("total_added", 0),
+                }
+                self._materialized_patch_count = 0
+            for delta in deltas[self._materialized_patch_count :]:
+                self._materialized_data["transitions"].extend(delta)
+                self._materialized_data["total_added"] += len(delta)
+            self._materialized_patch_count = len(deltas)
+            return self._materialized_data
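The incremental behavior under discussion can be demonstrated in isolation. The sketch below lifts the deserializer logic from the diff into a minimal standalone class (the class name is invented for the example) and shows that a second call with a longer delta list applies only the newly seen patch batch:

```python
# Minimal standalone sketch of the incremental deserializer from the diff.
# Repeated calls against the same base object apply only deltas that were
# not materialized on a previous call.
from typing import Any, List


class _IncrementalBuffer:
    def __init__(self) -> None:
        self._materialized_base = None
        self._materialized_data: dict[str, Any] | None = None
        self._materialized_patch_count = 0

    def _deserializer(self, base: dict, deltas: List) -> dict:
        # Rebuild from scratch when there is no cached state, the base
        # object changed identity, or the delta list shrank (e.g. a merge).
        if (
            self._materialized_data is None
            or self._materialized_base is not base
            or self._materialized_patch_count > len(deltas)
        ):
            self._materialized_base = base
            self._materialized_data = {
                "transitions": list(base.get("transitions", [])),
                "total_added": base.get("total_added", 0),
            }
            self._materialized_patch_count = 0

        # Apply only the deltas not yet folded into the cached buffer.
        for delta in deltas[self._materialized_patch_count :]:
            self._materialized_data["transitions"].extend(delta)
            self._materialized_data["total_added"] += len(delta)

        self._materialized_patch_count = len(deltas)
        return self._materialized_data


buf = _IncrementalBuffer()
base = {"transitions": [1], "total_added": 1}
first = buf._deserializer(base, [[2, 3]])
second = buf._deserializer(base, [[2, 3], [4]])  # only [4] is applied here
```

Note that both calls return the same cached dict object, which is exactly why the review flags the lack of a lock: concurrent callers would mutate shared state.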

@k82cn k82cn force-pushed the flm_445_replay_perf_readme branch 3 times, most recently from 980ee6e to 563991f Compare May 12, 2026 03:55
@k82cn k82cn force-pushed the flm_445_replay_perf_readme branch from 563991f to 6e2ce65 Compare May 12, 2026 03:58
@k82cn k82cn merged commit 28adcfb into main May 12, 2026
11 checks passed
@k82cn k82cn deleted the flm_445_replay_perf_readme branch May 12, 2026 04:43