Aggregate results of FISPACT runs to efficient datasets.
Note
This document is in progress.
The module loads FISPACT JSON output files and converts them to Polars dataframes with minor data normalization. The dataframes can be stored either in parquet files or in a DuckDB database. This allows efficient data aggregation and analysis. Multiple JSON files from different FISPACT runs can be combined using simple additional identification. So far, we use just a two-dimensional identification by material and case. The case usually identifies a certain neutron flux energy distribution.
More details in documentation.
- export to DuckDB
- export to parquet files
- neutron flux presentation conversion
From PyPI
pip install xpypact
# or
uv pip install xpypact
As dependency
uv add xpypact
From source
pip install git+https://github.com/MC-kit/xpypact.git
# or
uv pip install git+https://github.com/MC-kit/xpypact.git
from xpypact import FullDataCollector, Inventory
def get_material_id(p: Path) -> int:
...
def get_case_id(p: Path) -> int:
...
jsons = [path1, path2, ...]
material_ids = {p: get_material_id(p) for p in jsons }
case_ids = {p: get_case_id(p) for p in jsons }
collector = FullDataCollector()
if sequential_load:
for json in jsons:
inventory = Inventory.from_json(json)
collector.append(inventory, material_id=material_ids[json], case_id=case_ids[json])
else: # multithreading is allowed for collector as well
task_list = ... # list of tuples[directory, case_id, tasks_sequence]
threads = 16 # whatever
def _find_path(arg) -> tuple[int, int, Path]:
_case, path, inventory = arg
json_path: Path = (Path(path) / inventory).with_suffix(".json")
if not json_path.exists():
msg = f"Cannot find file {json_path}"
raise FindPathError(msg)
try:
material_id = int(inventory[_LEN_INVENTORY:])
case_str = json_path.parent.parts[-1]
case_id = int(case_str[_LEN_CASE:])
except (ValueError, IndexError) as x:
msg = f"Cannot define material_id and case_id from {json_path}"
raise FindPathError(msg) from x
if case_id != _case:
msg = f"Contradicting values of case_id in case path and database: {case_id} != {_case}"
raise FindPathError(msg)
return material_id, case_id, json_path
with futures.ThreadPoolExecutor(max_workers=threads) as executor:
mcp_futures = [
executor.submit(_find_path, arg)
for arg in (
(task_case[0], task_case[1], task)
for task_case in task_list
for task in task_case[2].split(",")
if task.startswith("inventory-")
)
]
mips = [x.result() for x in futures.as_completed(mcp_futures)]
mips.sort(key=lambda x: x[0:2]) # sort by material_id, case_id
def _load_json(arg) -> None:
collector, material_id, case_id, json_path = arg
collector.append(from_json(json_path.read_text(encoding="utf8")), material_id, case_id)
with futures.ThreadPoolExecutor(max_workers=threads) as executor:
executor.map(_load_json, ((collector, *mip) for mip in mips))
collected = collector.get_result()
# save to parquet files
collected.save_to_parquets(Path.cwd() / "parquets")
# or use DuckDB database
from xpypact.dao import save
import duckdb as db
con = db.connect()
save(con, collected)
gamma_from_db = con.sql(
"""
select
g, rate
from timestep_gamma
where material_id = 1 and case_id = 54 and time_step_number = 7
order by g
""",
).fetchall()
Just follow ordinary practice:
Some specifics: in the development environment we use uv, just, and ruff.
To set up the development environment, run:
just install | reinstall
To build documentation, run:
just docs # - for local online docs rendering, while editing
just docs-build # - to build documentation
To release, run:
just bump [major|minor|patch] # - in `devel` branch
Then merge `devel` to `master` (via Pull Request) and, if all the checks have passed, create a Release manually.