Skip to content

Conversation

@ptigas
Copy link

@ptigas ptigas commented Aug 1, 2025

Summary

Include a summary of major changes in bullet points:

  • Added Prefect workflow manager integration for jobflow
  • Implemented PrefectManager class with flow conversion and deployment capabilities
  • Added support for converting jobflow Jobs and Flows to Prefect workflows
  • Created example demonstrating Prefect integration patterns
  • Added reference resolution system for handling job dependencies in Prefect
  • Implemented error handling and parallel execution support

Additional dependencies introduced (if any)

  • prefect: Core Prefect workflow orchestration library (≥3.0.0) - Required for the new Prefect manager functionality to convert and execute
    jobflow workflows as Prefect flows. This enables users to leverage Prefect's distributed execution, monitoring, and deployment capabilities.
  • pytest: Testing framework - Added as a main dependency (was previously only in test dependencies) to support the new Prefect manager tests
    and examples.

Note: Updated minimum Python version requirement from 3.10 to 3.11 to align with Prefect 3.x requirements.

TODO (if any)

  • Documentation updates needed for the new Prefect integration
  • Consider adding more advanced Prefect features like retries and timeouts configuration
  • May need performance optimization for large workflows with many dependencies

Checklist

Work-in-progress pull requests are encouraged, but please put [WIP] in the pull request title.

Before a pull request can be merged, the following items must be checked:

Note that the CI system will run all the above checks. But it will be much more efficient if you already fix most errors prior to submitting
the PR. It is highly recommended that you use the pre-commit hook provided in the repository. Simply pip install pre-commit and then
pre-commit install and a check will be run prior to allowing commits.

@ptigas ptigas changed the title [WIP] Added prefect manager support [WIP] Add prefect manager support Aug 1, 2025
@utf
Copy link
Member

utf commented Aug 1, 2025

Wow!

Thanks for this contribution @ptigas, this is a major addition and something I know many users will be interested in. I'm familiar with Prefect but haven't used it before so I'd appreciate feedback from others. Potentially @Andrew-S-Rosen might have comments.

At first glance the code looks very clean and thank you for adding rigorous tests. This has reminded me I'll need to update the CI to drop Python 3.10 support and add 3.13.

One major comment up-front - one feature of jobflow is support for dynamic flows through the Response object. Do you think it will be possible to add support for this with Prefect? To be clear, this isn't a roadblock to getting this merged but I'd like to understand more of the scope of what you want to achieve. For example, a number of the atomate2 workflows use dynamic flows (e.g. the elastic constant workflow).

If it's helpful, I'm happy to arrange a call to discuss.

Copy link
Member

@Andrew-S-Rosen Andrew-S-Rosen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, this is looking to be very nicely done! I was really impressed to see how you handled the .submit() vs. serial logic. That is slick.

I would absolutely be interested in reviewing this further, but I probably won't have a chance to take a look until the middle of next week. One comment in the meantime while you continue working on the PR.

I think this PR saves me a lot of work I was planning on doing, so I may owe you... 😅

def run_on_prefect(
jobflow_obj: Union["jobflow.Flow", "jobflow.Job", List["jobflow.Job"]],
store: Optional["jobflow.JobStore"] = None,
task_runner: str = "concurrent",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpful if this could take a str | TaskRunner as the input. For instance, if I want to use the DaskTaskRunner, which is the main way to interact with Slurm, then it would not be possible as-is.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Andrew-S-Rosen, I've updated to take TaskRunner as an argument. I've never used DaskTaskRunner before, so I might need some help testing that part.

@ptigas
Copy link
Author

ptigas commented Aug 5, 2025

@utf @Andrew-S-Rosen thank you for the supportive comments :)

One major comment up-front - one feature of jobflow is support for dynamic flows through the Response object. Do you think it will be possible to add support for this with Prefect? To be clear, this isn't a roadblock to getting this merged, but I'd like to understand more of the scope of what you want to achieve. For example, a number of the atomate2 workflows use dynamic flows (e.g. the elastic constant workflow).

I've updated the code to support dynamic flows. I've also added an example and tested it in a prefect cluster, and it seems to be working alright.

Also, I'd be happy to arrange a call to discuss the workflow I'm after in a bit more detail (interested in running atomate2 workflows in prefect, currently testing this and I'll report back or in a call about results).

@ptigas ptigas requested a review from Andrew-S-Rosen August 7, 2025 14:53
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this file from the PR. Thanks!

Comment on lines +37 to 40
"prefect",
"pytest",
"prefect-docker>=0.6.6",
]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid adding these to the core dependency stack. Having them in a dedicated prefect optional dependency section like you have done is okay.

If you want them to be tested in CI, you can include them in strict.

tests = ["moto==5.1.10", "pytest-cov==6.2.1", "pytest==8.4.1"]
vis = ["matplotlib", "pydot"]
fireworks = ["FireWorks"]
prefect = ["prefect>=3.0.0", "prefect-docker"]
Copy link
Member

@Andrew-S-Rosen Andrew-S-Rosen Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the needed to have prefect-docker here? That should not be needed by all users, such as those using Prefect Cloud. I think we can just have prefect here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is better suited for documentation rather than as a module within the codebase.

@Andrew-S-Rosen
Copy link
Member

Andrew-S-Rosen commented Aug 18, 2025

Hi @ptigas: Thank you for your patience. Unfortunately, I remain extremely busy to review this in the depth it deserves. I will have a software engineer working with my group in September who is going to focus on Prefect + Jobflow, so we can almost certainly test things out in detail then. Until then, I have left a few minor comments in the meantime.

I am happy to chat if you think it would be helpful. Just let me know.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants