[Prototype] Simplified PipelineRL #428
base: main
Conversation
fast_llm/data/dataset/streaming.py (outdated)
```python
assert stream_key == REDIS_DATA_STREAM.encode()
for msg_id, msg_data in msgs:
    processed += 1
    # TODO: or do it after processing all received messages, then count > 1?
```
We do not need this for now if we stick with consumer groups only. The producer can rely on the group lag to control the production rate and on last-delivered-id to safely trim the stream.
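For reference, a minimal sketch of that producer-side control, assuming redis-py and Redis >= 7.0 (which exposes a per-group `lag`); the stream/group names and the `MAX_LAG` threshold are hypothetical:

```python
import time

import redis

r = redis.Redis(decode_responses=True)
STREAM, GROUP, MAX_LAG = "fast_llm:data", "trainers", 1024  # hypothetical names/threshold


def produce(sample: dict) -> None:
    while True:
        info = next(g for g in r.xinfo_groups(STREAM) if g["name"] == GROUP)
        if info["lag"] is not None and info["lag"] < MAX_LAG:
            break
        time.sleep(0.1)  # the group is too far behind; throttle production
    r.xadd(STREAM, sample)
    # Entries before last-delivered-id have already been handed to the group,
    # so trimming up to that ID cannot drop undelivered data.
    r.xtrim(STREAM, minid=info["last-delivered-id"], approximate=True)
```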
```python
backend="nccl",
init_method=init_method,
world_size=config.broadcast.external_world_size + 1,
rank=0,
```
Maybe this should be configurable, since the external system may not treat us as rank 0.
We also have control over the external system, so we can make it so. It's easier to keep the two programs in sync with hard-coded values than by syncing config files.
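For illustration, a hedged sketch of what the external side would run under that convention (the trainer hard-coded as rank 0); the rendezvous address and world size below are made-up placeholders:

```python
import torch.distributed as dist

# Hypothetical values; they must match the trainer's broadcast config.
init_method = "tcp://trainer-host:29500"
external_world_size = 4
local_index = 0  # this external worker's index, 0..external_world_size-1

dist.init_process_group(
    backend="nccl",
    init_method=init_method,
    world_size=external_world_size + 1,  # trainer takes rank 0
    rank=local_index + 1,  # external ranks start at 1
)
```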
Maybe name it tests/models/test_streaming_training_callbacks.py or something similar?
Or perhaps it would be better under tests/engine/trainer/ or tests/trainer/, since this is testing trainer callbacks with a streaming dataset rather than model logic (with the exception of the tensor iterator).
This needs to be in tests/models because it uses the model_configs machinery. I tend to prefer concise names, so test_streaming seems appropriate.
```python
    worker_resources: WorkerResources,
    report_subtest,
):
    report_subtest(path := run_test_script_base_path / config.name, config.total_gpus)
```
Here we need to check whether we have enough GPUs for a subtest; otherwise, it will incorrectly report the test as failed with “did not run.”
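Something like this hedged sketch at the top of the test would do it (`config.total_gpus` as in the snippet above; the same guard applies to the distributed variant below, using its `num_gpus` parameter):

```python
import pytest
import torch

# Skip cleanly instead of letting the subtest be reported as "did not run".
if torch.cuda.device_count() < config.total_gpus:
    pytest.skip(f"Test needs {config.total_gpus} GPUs, found {torch.cuda.device_count()}.")
```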
```python
@pytest.mark.depends_on(on=["test_data_streaming"])
@pytest.mark.parametrize(("name", "num_gpus", "distributed_config_dict"), _DISTRIBUTED_TESTING_CONFIGS)
def test_data_streaming_distributed(result_path, name, num_gpus, distributed_config_dict, report_subtest):
    report_subtest(path := result_path / f"data_streaming/{name}", num_gpus)
```
Here we need to check whether we have enough GPUs for a subtest; otherwise, it will incorrectly report the test as failed with “did not run.”
fast_llm/data/dataset/config.py (outdated)
```python
_abstract = False

acknowledge_interval: int = Field(
```
This is also not needed if we only use consumer groups; see the comment above for the implementation.
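For completeness, a hedged sketch of the consumer-group loop that makes the interval unnecessary, acking each message as soon as it is processed; the stream, group, consumer names, and the `process` handler are hypothetical:

```python
import redis

r = redis.Redis(decode_responses=True)
STREAM, GROUP, CONSUMER = "fast_llm:data", "trainers", "rank0"  # hypothetical names

try:
    r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
except redis.ResponseError:
    pass  # group already exists


def process(sample: dict) -> None:
    ...  # hypothetical sample handler


while True:
    replies = r.xreadgroup(GROUP, CONSUMER, {STREAM: ">"}, count=64, block=1000)
    for _stream, msgs in replies:
        for msg_id, msg_data in msgs:
            process(msg_data)
            # With consumer groups, each message can be acked immediately;
            # no separate acknowledge_interval bookkeeping is required.
            r.xack(STREAM, GROUP, msg_id)
```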
✨ Description
See #427