-
-
Notifications
You must be signed in to change notification settings - Fork 213
[ENH] Reduce complexity of run_flow_on_task func
#1596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
35b0977
5a4a089
1a006fb
93aa877
331b4be
6771fb4
22f52a8
04a6e0f
d3460d0
1ec0302
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,7 @@ | |
| from collections import OrderedDict | ||
| from functools import partial | ||
| from pathlib import Path | ||
| from typing import TYPE_CHECKING, Any | ||
| from typing import TYPE_CHECKING, Any, cast | ||
|
|
||
| import numpy as np | ||
| import pandas as pd | ||
|
|
@@ -54,6 +54,225 @@ | |
| ERROR_CODE = 512 | ||
|
|
||
|
|
||
| def _validate_flow_and_task_inputs( | ||
| flow: OpenMLFlow | OpenMLTask, | ||
| task: OpenMLTask | OpenMLFlow, | ||
| flow_tags: list[str] | None, | ||
| ) -> tuple[OpenMLFlow, OpenMLTask]: | ||
| """Validate and normalize inputs for flow and task execution. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| flow : OpenMLFlow or OpenMLTask | ||
| The flow object (may be swapped with task for backward compatibility). | ||
| task : OpenMLTask or OpenMLFlow | ||
| The task object (may be swapped with flow for backward compatibility). | ||
| flow_tags : List[str] or None | ||
| A list of tags that the flow should have at creation. | ||
|
|
||
| Returns | ||
| ------- | ||
| Tuple[OpenMLFlow, OpenMLTask] | ||
| The validated flow and task. | ||
|
|
||
| Raises | ||
| ------ | ||
| ValueError | ||
| If flow_tags is not a list or task is not published. | ||
| """ | ||
| if flow_tags is not None and not isinstance(flow_tags, list): | ||
| raise ValueError("flow_tags should be a list") | ||
|
|
||
| # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018). | ||
| # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019). | ||
| if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow): | ||
| # We want to allow either order of argument (to avoid confusion). | ||
| warnings.warn( | ||
| "The old argument order (Flow, model) is deprecated and " | ||
| "will not be supported in the future. Please use the " | ||
| "order (model, Flow).", | ||
| DeprecationWarning, | ||
| stacklevel=3, | ||
| ) | ||
| task, flow = flow, task | ||
|
|
||
| if not isinstance(flow, OpenMLFlow): | ||
| raise TypeError("Flow must be OpenMLFlow after validation") | ||
|
|
||
| if not isinstance(task, OpenMLTask): | ||
| raise TypeError("Task must be OpenMLTask after validation") | ||
|
|
||
| if task.task_id is None: | ||
| raise ValueError("The task should be published at OpenML") | ||
|
|
||
| return flow, task | ||
|
|
||
|
|
||
def _sync_flow_with_server(
    flow: OpenMLFlow,
    task: OpenMLTask,
    *,
    upload_flow: bool,
    avoid_duplicate_runs: bool,
) -> int | None:
    """Synchronize flow with server and check if setup/task combination is already present.

    Parameters
    ----------
    flow : OpenMLFlow
        The flow to synchronize.
    task : OpenMLTask
        The task to check for duplicate runs.
    upload_flow : bool
        Whether to upload the flow if it doesn't exist.
    avoid_duplicate_runs : bool
        Whether to check for duplicate runs.

    Returns
    -------
    int or None
        The flow_id if synced with server, None otherwise.

    Raises
    ------
    PyOpenMLError
        If flow_id mismatch or flow doesn't exist when expected.
    OpenMLRunsExistError
        If duplicate runs exist and avoid_duplicate_runs is True.
    """
    # We only need to sync with the server right now if we want to upload the flow,
    # or ensure no duplicate runs exist. Otherwise it can be synced at upload time.
    flow_id = None
    if upload_flow or avoid_duplicate_runs:
        # NOTE(review): flow_exists apparently returns False when the flow is
        # absent and the server flow id otherwise (the `flow_id is False`
        # checks below rely on that) — confirm against its definition.
        flow_id = flow_exists(flow.name, flow.external_version)
        if isinstance(flow.flow_id, int) and flow_id != flow.flow_id:
            # The local flow claims a server id; any disagreement with the
            # server is treated as a hard error rather than silently re-synced.
            if flow_id is not False:
                raise PyOpenMLError(
                    f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'",
                )
            raise PyOpenMLError(
                "Flow does not exist on the server, but 'flow.flow_id' is not None."
            )
        if upload_flow and flow_id is False:
            # Flow is new to the server: publish it and adopt the id it got.
            flow.publish()
            flow_id = flow.flow_id
        elif flow_id:
            # Flow already exists: pull the server copy and mirror its
            # server-assigned fields onto the local flow.
            flow_from_server = get_flow(flow_id)
            _copy_server_fields(flow_from_server, flow)
            if avoid_duplicate_runs:
                flow_from_server.model = flow.model
                setup_id = setup_exists(flow_from_server)
                task_id = task.task_id
                # task_id was validated as non-None upstream; cast narrows the
                # Optional[int] for the type checker only.
                ids = run_exists(cast("int", task_id), setup_id)
                if ids:
                    error_message = (
                        "One or more runs of this setup were already performed on the task."
                    )
                    raise OpenMLRunsExistError(ids, error_message)
        else:
            # Flow does not exist on server and we do not want to upload it.
            # No sync with the server happens.
            flow_id = None

    return flow_id
|
|
||
|
|
||
| def _prepare_run_environment(flow: OpenMLFlow) -> tuple[list[str], list[str]]: | ||
| """Prepare run environment information and tags. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| flow : OpenMLFlow | ||
| The flow to get version information from. | ||
|
|
||
| Returns | ||
| ------- | ||
| Tuple[List[str], List[str]] | ||
| A tuple of (tags, run_environment). | ||
| """ | ||
| run_environment = flow.extension.get_version_information() | ||
| tags = ["openml-python", run_environment[1]] | ||
| return tags, run_environment | ||
|
|
||
|
|
||
def _create_run_from_results(  # noqa: PLR0913
    task: OpenMLTask,
    flow: OpenMLFlow,
    flow_id: int | None,
    data_content: list[list],
    trace: OpenMLRunTrace | None,
    fold_evaluations: OrderedDict[str, OrderedDict],
    sample_evaluations: OrderedDict[str, OrderedDict],
    tags: list[str],
    run_environment: list[str],
    upload_flow: bool,
    avoid_duplicate_runs: bool,
) -> OpenMLRun:
    """Assemble an OpenMLRun object from the results of executing a flow on a task.

    Parameters
    ----------
    task : OpenMLTask
        The task that was executed.
    flow : OpenMLFlow
        The flow that was executed.
    flow_id : int or None
        The flow ID if synced with server.
    data_content : List[List]
        The prediction data content.
    trace : OpenMLRunTrace or None
        The execution trace if available.
    fold_evaluations : OrderedDict
        The fold-based evaluation measures.
    sample_evaluations : OrderedDict
        The sample-based evaluation measures.
    tags : List[str]
        Tags to attach to the run.
    run_environment : List[str]
        Environment information.
    upload_flow : bool
        Whether the flow was uploaded.
    avoid_duplicate_runs : bool
        Whether duplicate runs were checked.

    Returns
    -------
    OpenMLRun
        The created run object.
    """
    dataset = task.get_dataset()

    # Human-readable provenance: environment info plus a creation timestamp.
    description_lines = list(run_environment)
    description_lines.append(time.strftime("%c"))
    description_lines.append("Created by run_flow_on_task")
    description = "\n".join(description_lines)

    # task_id was validated as non-None upstream; cast narrows for the checker.
    run = OpenMLRun(
        task_id=cast("int", task.task_id),
        flow_id=flow_id,
        dataset_id=dataset.dataset_id,
        model=flow.model,
        flow_name=flow.name,
        tags=tags,
        trace=trace,
        data_content=data_content,
        flow=flow,
        setup_string=flow.extension.create_setup_string(flow.model),
        description_text=description,
    )

    synced_with_server = (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None
    if synced_with_server:
        # We only extract the parameter settings if a sync happened with the server.
        # I.e. when the flow was uploaded or we found it in the avoid_duplicate check.
        # Otherwise, we will do this at upload time.
        run.parameter_settings = flow.extension.obtain_parameter_values(flow)

    # Attach the detailed evaluations on the attribute matching the task type.
    if task.task_type_id != TaskType.LEARNING_CURVE:
        run.fold_evaluations = fold_evaluations
    else:
        run.sample_evaluations = sample_evaluations

    return run
|
|
||
|
|
||
| # TODO(eddiebergman): Could potentially overload this but | ||
| # it seems very big to do so | ||
| def run_model_on_task( # noqa: PLR0913 | ||
|
|
@@ -175,7 +394,7 @@ def get_task_and_type_conversion(_task: int | str | OpenMLTask) -> OpenMLTask: | |
| return run | ||
|
|
||
|
|
||
| def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913 | ||
| def run_flow_on_task( # noqa: PLR0913 | ||
| flow: OpenMLFlow, | ||
| task: OpenMLTask, | ||
| avoid_duplicate_runs: bool | None = None, | ||
|
|
@@ -222,116 +441,61 @@ def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913 | |
| run : OpenMLRun | ||
| Result of the run. | ||
| """ | ||
| if flow_tags is not None and not isinstance(flow_tags, list): | ||
| raise ValueError("flow_tags should be a list") | ||
|
|
||
| if avoid_duplicate_runs is None: | ||
| avoid_duplicate_runs = openml.config.avoid_duplicate_runs | ||
|
|
||
| # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018). | ||
| # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019). | ||
| if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow): | ||
| # We want to allow either order of argument (to avoid confusion). | ||
| warnings.warn( | ||
| "The old argument order (Flow, model) is deprecated and " | ||
| "will not be supported in the future. Please use the " | ||
| "order (model, Flow).", | ||
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| task, flow = flow, task | ||
|
|
||
| if task.task_id is None: | ||
| raise ValueError("The task should be published at OpenML") | ||
| # 1. Validate inputs | ||
| flow, task = _validate_flow_and_task_inputs(flow, task, flow_tags) | ||
|
|
||
| # 2. Prepare the model | ||
| if flow.model is None: | ||
| flow.model = flow.extension.flow_to_model(flow) | ||
|
|
||
| flow.model = flow.extension.seed_model(flow.model, seed=seed) | ||
|
|
||
| # We only need to sync with the server right now if we want to upload the flow, | ||
| # or ensure no duplicate runs exist. Otherwise it can be synced at upload time. | ||
| flow_id = None | ||
| if upload_flow or avoid_duplicate_runs: | ||
| flow_id = flow_exists(flow.name, flow.external_version) | ||
| if isinstance(flow.flow_id, int) and flow_id != flow.flow_id: | ||
| if flow_id is not False: | ||
| raise PyOpenMLError( | ||
| f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'", | ||
| ) | ||
| raise PyOpenMLError( | ||
| "Flow does not exist on the server, but 'flow.flow_id' is not None." | ||
| ) | ||
| if upload_flow and flow_id is False: | ||
| flow.publish() | ||
| flow_id = flow.flow_id | ||
| elif flow_id: | ||
| flow_from_server = get_flow(flow_id) | ||
| _copy_server_fields(flow_from_server, flow) | ||
| if avoid_duplicate_runs: | ||
| flow_from_server.model = flow.model | ||
| setup_id = setup_exists(flow_from_server) | ||
| ids = run_exists(task.task_id, setup_id) | ||
| if ids: | ||
| error_message = ( | ||
| "One or more runs of this setup were already performed on the task." | ||
| ) | ||
| raise OpenMLRunsExistError(ids, error_message) | ||
| else: | ||
| # Flow does not exist on server and we do not want to upload it. | ||
| # No sync with the server happens. | ||
| flow_id = None | ||
|
|
||
| dataset = task.get_dataset() | ||
| # 3. Sync with server and check for duplicates | ||
| flow_id = _sync_flow_with_server( | ||
| flow, | ||
| task, | ||
| upload_flow=upload_flow, | ||
| avoid_duplicate_runs=avoid_duplicate_runs, | ||
| ) | ||
|
|
||
| run_environment = flow.extension.get_version_information() | ||
| tags = ["openml-python", run_environment[1]] | ||
| # 4. Prepare run environment | ||
| tags, run_environment = _prepare_run_environment(flow) | ||
|
|
||
| # 5. Check if model is already fitted | ||
| if flow.extension.check_if_model_fitted(flow.model): | ||
| warnings.warn( | ||
| "The model is already fitted! This might cause inconsistency in comparison of results.", | ||
| RuntimeWarning, | ||
| stacklevel=2, | ||
| ) | ||
|
|
||
| # execute the run | ||
| res = _run_task_get_arffcontent( | ||
| # 6. Execute the run (parallel processing happens here) | ||
| data_content, trace, fold_evaluations, sample_evaluations = _run_task_get_arffcontent( | ||
| model=flow.model, | ||
| task=task, | ||
| extension=flow.extension, | ||
| add_local_measures=add_local_measures, | ||
| n_jobs=n_jobs, | ||
| ) | ||
|
|
||
| data_content, trace, fold_evaluations, sample_evaluations = res | ||
| fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"] | ||
| generated_description = "\n".join(fields) | ||
| run = OpenMLRun( | ||
| task_id=task.task_id, | ||
| # 7. Create run from results | ||
| run = _create_run_from_results( | ||
| task=task, | ||
| flow=flow, | ||
| flow_id=flow_id, | ||
| dataset_id=dataset.dataset_id, | ||
| model=flow.model, | ||
| flow_name=flow.name, | ||
| tags=tags, | ||
| trace=trace, | ||
| data_content=data_content, | ||
| flow=flow, | ||
| setup_string=flow.extension.create_setup_string(flow.model), | ||
| description_text=generated_description, | ||
| trace=trace, | ||
| fold_evaluations=fold_evaluations, | ||
| sample_evaluations=sample_evaluations, | ||
| tags=tags, | ||
| run_environment=run_environment, | ||
| upload_flow=upload_flow, | ||
| avoid_duplicate_runs=avoid_duplicate_runs, | ||
| ) | ||
|
|
||
| if (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None: | ||
| # We only extract the parameter settings if a sync happened with the server. | ||
| # I.e. when the flow was uploaded or we found it in the avoid_duplicate check. | ||
| # Otherwise, we will do this at upload time. | ||
| run.parameter_settings = flow.extension.obtain_parameter_values(flow) | ||
|
|
||
| # now we need to attach the detailed evaluations | ||
| if task.task_type_id == TaskType.LEARNING_CURVE: | ||
| run.sample_evaluations = sample_evaluations | ||
| else: | ||
| run.fold_evaluations = fold_evaluations | ||
|
|
||
| # 8. Log completion message | ||
| if flow_id: | ||
| message = f"Executed Task {task.task_id} with Flow id:{run.flow_id}" | ||
| else: | ||
Omswastik-11 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.