Skip to content

Commit 58e3734

Browse files
committed
Improve .material.report.run_reporting.Config
- Rename ReporterConfig → Config; move the class to its own submodule.
- Use stdlib dataclasses instead of pydantic.
- load_config() wraps Config.from_files().
- Move .reporter_utils.create_var_map_from_yaml_dict() → Config.use_vars_dict().
- Move .run_reporting.create_agg_var_map_from_yaml() → Config.use_aggregates_dict().
- Add missing material/report/__init__.py.
- Remove pydantic from optional dependencies.
1 parent 20ba0e5 commit 58e3734

File tree

5 files changed

+235
-164
lines changed

5 files changed

+235
-164
lines changed

message_ix_models/model/material/report/__init__.py

Whitespace-only changes.
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
from dataclasses import dataclass, field
2+
from itertools import count, product
3+
from typing import Literal
4+
5+
import pandas as pd
6+
from ixmp.report.common import RENAME_DIMS
7+
8+
from message_ix_models.util import package_data_path
9+
10+
11+
@dataclass
class Config:
    """Configuration for reporting of a subset of material data."""

    #: Prefix or initial fragment of IAMC ‘variable’ name.
    iamc_prefix: str

    #: Units of measure for the reported data.
    unit: Literal["Mt/yr", "GWa", "Mt CH4/yr", "GW"]

    #: :mod:`message_ix.report` key from which to retrieve the data.
    var: Literal["out", "in", "ACT", "emi", "CAP"]

    #: Data frame with:
    #:
    #: - MultiIndex levels including 1 or more of :math:`(c, l, m, t)`.
    #: - 3 columns:
    #:
    #:   - "iamc_name": a (fragment of) an IAMC ‘variable’ name. This is appended
    #:     to :attr:`iamc_prefix` to construct a complete name.
    #:   - "short_name": a short identifier for the entry. These identifiers are
    #:     what the "components" of an aggregate refer to; see
    #:     :meth:`use_aggregates_dict`.
    #:   - "unit": units of measure.
    #:
    #: This expresses a mapping between the index entries (=indices of reported data)
    #: and the information in the 3 columns.
    mapping: pd.DataFrame = field(
        default_factory=lambda: pd.DataFrame(
            columns=["iamc_name", "short_name", "unit"],
        )
    )

    @classmethod
    def from_files(cls, category: str) -> "Config":
        """Create a Config instance from 1 or 2 YAML files.

        A file like :file:`message_ix_models/data/material/reporting/{category}.yaml` is
        read and used to populate a new instance. The file must have:

        - Top-level keys corresponding to :attr:`iamc_prefix`, :attr:`unit`, and
          :attr:`var`.
        - A top-level key ``vars:`` containing a mapping compatible with
          :meth:`use_vars_dict`.

        If a file exists in the same directory named like
        :file:`{category}_aggregates.yaml`, it is also read, and its contents passed to
        :meth:`use_aggregates_dict`.

        Raises
        ------
        FileNotFoundError
            If the basic configuration file does not exist.
        KeyError
            If the basic configuration file has no top-level ``vars:`` key.
        """
        import yaml

        # Handle basic configuration file. YAML is read explicitly as UTF-8 so that
        # the result does not depend on the platform's locale encoding.
        path = package_data_path("material", "reporting", f"{category}.yaml")
        with open(path, encoding="utf-8") as f:  # Raises FileNotFoundError
            kw = yaml.safe_load(f)  # Raises on invalid YAML

        # Remove the "vars" top-level key from the file contents; everything that
        # remains is passed to the constructor
        vars_data = kw.pop("vars")

        # Create a Config instance
        result = cls(**kw)

        # Update mapping data frame using the "vars" contents
        result.use_vars_dict(vars_data)

        # Handle aggregates configuration file
        path_agg = path.with_name(f"{category}_aggregates.yaml")
        try:
            with open(path_agg, encoding="utf-8") as f:
                # An empty file loads as None; treat it like "no aggregates"
                data_agg = yaml.safe_load(f) or dict()
        except FileNotFoundError:
            data_agg = dict()  # No aggregates file

        result.use_aggregates_dict(data_agg)

        return result

    def check_mapping(self) -> None:
        """Assert that :attr:`mapping` has the correct structure and is complete.

        Raises
        ------
        AssertionError
            If the index level names are not a subset of :math:`(c, l, m, t)` (only
            checked for a non-empty frame), if the columns are not exactly
            "iamc_name", "short_name", and "unit", or if any column value is missing.
        """
        # NB these are plain `assert` statements, so they are skipped under
        #    `python -O`
        assert self.mapping.empty or set(self.mapping.index.names) <= set("clmt"), (
            f"Unexpected index levels: {list(self.mapping.index.names)}"
        )
        assert {"iamc_name", "short_name", "unit"} == set(self.mapping.columns), (
            f"Unexpected columns: {list(self.mapping.columns)}"
        )
        assert not self.mapping.isna().any(axis=None), "Missing values in mapping"

    def use_aggregates_dict(self, data: dict) -> None:
        """Update :attr:`mapping` from `data`.

        This method handles `data` with structure equivalent to the following YAML
        content:

        .. code-block:: yaml

           level_1:
             Chemicals|Liquids|Other:
               short: fe_pe_chem_oth
               components: [ fe_pe_hvc_oth ]
             Chemicals|Liquids|Biomass:
               short: fe_pe_chem_bio
               components: [ fe_pe_hvc_bio_eth ]
             # Any number of similar entries
           level_2:
             Heat:
               short: fe_pe_heat
               components:
               - fe_pe_cement_heat
               - fe_pe_aluminum_heat
               - fe_pe_steel_heat
               - fe_pe_other_heat
             # Any number of similar entries

        In general:

        - Top-level keys may be "level_1", "level_2", etc. These are processed in
          numerical order, stopping at the first number that is absent. Additional
          top-level keys like "iamc_prefix", "unit", and "var" are checked against
          the corresponding attributes.
        - Second-level keys are fragments of IAMC ‘variable’ names
        - Third level keys must be:

          - "short": A single string. See the description of the "short_name" column in
            :attr:`mapping`. This is the aggregate to be produced.
          - "components": A list of strings (or a single string). These are the
            components of the aggregation. Components referenced under "level_1"
            must already be present in :attr:`mapping`. Components referenced under
            "level_2" may include the aggregates described by "level_1", etc.

        `data` itself is not modified.

        Raises
        ------
        AssertionError
            If "iamc_prefix", "unit", or "var" in `data` differ from the attributes
            of the same name, or if the updated :attr:`mapping` fails
            :meth:`check_mapping`.
        KeyError
            If an aggregate entry lacks "short" or "components".
        """
        data = data or {}

        # Check that other entries in `data` (e.g. loaded from YAML) match. Use
        # .get() rather than .pop() so the caller's dict is left untouched.
        for k in ("iamc_prefix", "unit", "var"):
            assert data.get(k, getattr(self, k)) == getattr(self, k), (
                f"{k!r} in aggregates data does not match Config.{k}"
            )

        dims = list(self.mapping.index.names)
        sn = "short_name"

        # Iterate over top-level keys: "level_1", "level_2", etc.
        for k_level in map("level_{}".format, count(start=1)):
            if k_level not in data:
                break  # No data for this or any subsequent levels; finish

            # One data frame per aggregate defined in this "level", with one row
            # per component. A missing "short" or "components" key raises KeyError
            # instead of silently ending the processing of all levels.
            dfs = [
                pd.DataFrame(
                    dict(
                        iamc_name=k,
                        agg=v["short"],
                        short_name=(
                            [v["components"]]
                            if isinstance(v["components"], str)
                            else v["components"]
                        ),
                    )
                )
                for k, v in (data[k_level] or {}).items()
            ]
            if not dfs:
                continue  # Level is present but defines no aggregates

            # The merge and concat steps must be repeated on every iteration so that
            # aggregates defined under "level_2" may refer to aggregates defined under
            # "level_1" etc.

            # - Concatenate together all `dfs`.
            # - Merge with (c, l, m, t, short_name, unit) from self.mapping (omit
            #   existing iamc_name), on the short_name values.
            # - Replace the existing short_name with aggregate short_name.
            # - Restore multiindex.
            agg_mapping = (
                pd.concat(dfs)
                .merge(self.mapping.reset_index().drop(["iamc_name"], axis=1), on=[sn])
                .drop([sn], axis=1)
                .rename(columns={"agg": sn})
                .set_index(dims)
            )
            # Concatenate to existing mappings
            self.mapping = pd.concat([self.mapping, agg_mapping])

        self.check_mapping()

    def use_vars_dict(self, data: dict) -> None:
        """Update :attr:`mapping` using `data`.

        Any existing contents of :attr:`mapping` are replaced.

        This handles `data` with structure equivalent to the following YAML content:

        .. code-block:: yaml

           Chemicals|High-Value Chemicals|Electricity|Steam Cracking:
             filter:
               commodity: electr
               level: final
               mode: [vacuum_gasoil, atm_gasoil, naphtha, ethane, propane]
               technology: steam_cracker_petro
             short: fe_pe_hvc_el_sc
             unit: kg  # Optional

           # Any number of similar entries

        Within this:

        - ``Chemicals|High-Value Chemicals|Electricity|Steam Cracking`` is a (fragment
          of) an IAMC ‘variable’ name.
        - ``filter`` entries may have values that are strings or lists of strings.
          The subkeys may include the MESSAGEix sets [technology, mode, commodity,
          level].
        - ``short`` gives the value for the "short_name" column.
        - ``unit``, if omitted, defaults to :attr:`unit`.

        If `data` is empty, :attr:`mapping` is reset to an empty data frame with the
        3 expected columns.
        """

        dims: set[str] = set()
        dfs = []
        for iamc_name, values in data.items():
            # Convert:
            # - scalar/single str entries to length-1 list of str
            # - long/full message_ix set names ("technology") to short dim IDs ("t")
            filters = {
                RENAME_DIMS[k]: [v] if isinstance(v, str) else v
                for k, v in values["filter"].items()
            }
            dims |= filters.keys()

            # - Create data frame: all valid combinations of indices
            # - Set other columns
            dfs.append(
                pd.DataFrame(
                    list(product(*filters.values())), columns=list(filters.keys())
                ).assign(
                    iamc_name=iamc_name,
                    short_name=values["short"],
                    unit=values.get("unit", self.unit),
                )
            )

        if dfs:
            # Concatenate all mappings; set multi-index based on `dims`
            self.mapping = pd.concat(dfs).set_index(sorted(dims))
        else:
            # pd.concat() raises ValueError on an empty list; use an empty frame
            self.mapping = pd.DataFrame(columns=["iamc_name", "short_name", "unit"])

        self.check_mapping()

message_ix_models/model/material/report/reporter_utils.py

Lines changed: 0 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
import itertools
2-
31
import message_ix
4-
import pandas as pd
52
from genno import Key
63

74
comm_tec_map = {
@@ -12,67 +9,6 @@
129
}
1310

1411

15-
def create_var_map_from_yaml_dict(dictionary: dict) -> pd.DataFrame:
    """Create a 1-to-n mapping of IAMC template variables to the data indices.

    The used query keys are the MESSAGEix sets: [technology, mode, commodity, level].

    Parameters
    ----------
    dictionary
        A dictionary with the required information about the mapping. It needs the
        following tree structure of key-value pairs:

        - "unit": default units for every entry.
        - "vars": mapping from IAMC variable name to a dictionary with keys:

          - "filter": mapping from set name to a string or list of strings.
          - "short": short name for the entry.
          - "unit" (optional): units for this entry, overriding the default.

    Returns
    -------
    pandas.DataFrame
        One row per combination of the "filter" values of each entry, with columns
        "iamc_name", "short_name", and "unit". The filter columns present are renamed
        to their short IDs ("m", "t", "l", "c") and used as the index. If "vars" is
        empty, an empty frame with only the 3 columns is returned.
    """
    var_entries = dictionary["vars"]
    default_unit = dictionary["unit"]

    # Collect one frame per entry and concatenate once at the end; growing a frame
    # with repeated pd.concat() copies all earlier rows on every iteration
    frames = []
    for iamc_key, values in var_entries.items():
        # Convert scalar/single str filter values to length-1 lists
        filters = {
            k: [v] if isinstance(v, str) else v for k, v in values["filter"].items()
        }

        # All combinations of the filter values, plus the descriptive columns
        frames.append(
            pd.DataFrame(
                list(itertools.product(*filters.values())), columns=list(filters)
            ).assign(
                iamc_name=iamc_key,
                short_name=values["short"],
                unit=values.get("unit", default_unit),
            )
        )

    if not frames:
        # Nothing to concatenate or index on
        return pd.DataFrame(columns=["iamc_name", "short_name", "unit"])

    result = pd.concat(frames)

    # Rename only the filter columns that are present; the order of this dict
    # determines the order of the index levels
    rename_dict = {"mode": "m", "technology": "t", "level": "l", "commodity": "c"}
    rename_dict = {k: v for k, v in rename_dict.items() if k in result.columns}

    return result.rename(columns=rename_dict).set_index(list(rename_dict.values()))
74-
75-
7612
def add_methanol_share_calculations(rep: message_ix.Reporter, mode: str = "feedstock"):
7713
"""Prepare reporter to compute regional bio-methanol shares of regional production.
7814

0 commit comments

Comments
 (0)