
Conversation

@jlamypoirier
Collaborator

✨ Description

See #427

@jlamypoirier changed the base branch from main to jlp_subtest on January 7, 2026, 04:16
assert stream_key == REDIS_DATA_STREAM.encode()
for msg_id, msg_data in msgs:
    processed += 1
    # TODO: or do it after processing all received messages, then count > 1?
Contributor

We do not need this for now if we stick with consumer groups only. The producer can rely on the group lag to control the production rate and on last-delivered-id to safely trim the stream.
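
A minimal sketch of that producer-side logic, assuming redis-py and Redis >= 7.0 (which reports the group lag in XINFO GROUPS); the stream name, group name, and lag threshold are placeholders, not values from this PR:

import redis

r = redis.Redis(decode_responses=True)
STREAM = "fast_llm_data"  # placeholder; the PR uses REDIS_DATA_STREAM
GROUP = "trainers"        # placeholder consumer group name
MAX_LAG = 1024            # placeholder back-pressure threshold

def try_produce(sample: dict[str, str]) -> bool:
    # Read the consumer group's lag and last-delivered-id (Redis >= 7.0).
    (group,) = (g for g in r.xinfo_groups(STREAM) if g["name"] == GROUP)
    if (group.get("lag") or 0) >= MAX_LAG:
        return False  # consumers are behind; back off instead of growing the stream
    r.xadd(STREAM, sample)
    # Entries up to last-delivered-id have already been handed to the group,
    # so trimming at that id never drops undelivered data.
    r.xtrim(STREAM, minid=group["last-delivered-id"], approximate=True)
    return True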

backend="nccl",
init_method=init_method,
world_size=config.broadcast.external_world_size + 1,
rank=0,
Contributor

Maybe this should be configurable, since the external system may not treat us as rank 0.
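
A hypothetical version of the hunk above with the rank exposed through the config; external_rank is an assumed field name, not something this PR defines, and init_method and config are reused from the surrounding code:

torch.distributed.init_process_group(
    backend="nccl",
    init_method=init_method,
    world_size=config.broadcast.external_world_size + 1,
    # Assumed config field, defaulting to 0 to keep the current behavior.
    rank=config.broadcast.external_rank,
)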

Collaborator Author

We also have control over the external system, so we can make it so. It's easier to keep programs in sync with hard-coded values than with config files.

Contributor
@bigximik Jan 12, 2026

Maybe name it tests/models/test_streaming_training_callbacks.py or something similar?
Or perhaps it would be better under tests/engine/trainer/ or tests/trainer/, since this is testing trainer callbacks with a streaming dataset rather than model logic (with the exception of the tensor iterator).

Collaborator Author

This needs to be in tests/models because it uses the model_configs machinery. I tend to prefer concise names, so test_streaming seems appropriate.

    worker_resources: WorkerResources,
    report_subtest,
):
    report_subtest(path := run_test_script_base_path / config.name, config.total_gpus)
Contributor

Here we need to check whether we have enough GPUs for a subtest; otherwise, it will incorrectly report the test as failed with “did not run.”

@pytest.mark.depends_on(on=["test_data_streaming"])
@pytest.mark.parametrize(("name", "num_gpus", "distributed_config_dict"), _DISTRIBUTED_TESTING_CONFIGS)
def test_data_streaming_distributed(result_path, name, num_gpus, distributed_config_dict, report_subtest):
    report_subtest(path := result_path / f"data_streaming/{name}", num_gpus)
Contributor

Here we need to check whether we have enough GPUs for a subtest; otherwise, it will incorrectly report the test as failed with “did not run.”
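
A minimal sketch of such a guard for the second test (the first one would need the same treatment); whether it belongs in the test body or inside report_subtest is an open question, and torch.cuda.device_count() is just one way to count the available GPUs:

import pytest
import torch

@pytest.mark.depends_on(on=["test_data_streaming"])
@pytest.mark.parametrize(("name", "num_gpus", "distributed_config_dict"), _DISTRIBUTED_TESTING_CONFIGS)
def test_data_streaming_distributed(result_path, name, num_gpus, distributed_config_dict, report_subtest):
    if torch.cuda.device_count() < num_gpus:
        # Skip explicitly instead of letting the subtest be reported as "did not run".
        pytest.skip(f"Needs {num_gpus} GPUs, only {torch.cuda.device_count()} available.")
    report_subtest(path := result_path / f"data_streaming/{name}", num_gpus)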


_abstract = False

acknowledge_interval: int = Field(
Contributor

This is also not needed if we only use consumer groups; see the comment above for the implementation.

Base automatically changed from jlp_subtest to main January 13, 2026 01:08