Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions spinedb_api/db_mapping_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,13 @@ def merge(self, other: dict) -> tuple[Optional[dict], set[str]]:
if not other:
return None, set()
merged = {**self.extended(), **other}
for id_field, fields in self._internal_fields.items():
source_fields = fields[0]
if len(source_fields) != 1:
continue
source_field = source_fields[0]
if source_field in other and self.fields[source_field].get("optional", False):
del merged[id_field]
return merged, set(other)

def _strip_equal_fields(self, other: dict) -> dict:
Expand Down
24 changes: 12 additions & 12 deletions spinedb_api/mapped_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ def merge(self, other):
merged, updated_fields = super().merge(other)
if not merged:
return merged, updated_fields
if self.value_key in merged:
if self.value_key in updated_fields:
self._parsed_value = None
self._arrow_value = None
return merged, updated_fields
Expand Down Expand Up @@ -866,17 +866,10 @@ def _asdict(self):
return d

def merge(self, other):
other_parameter_value_list_id = other.get("parameter_value_list_id")
if (
other_parameter_value_list_id is not None
and other_parameter_value_list_id != self["parameter_value_list_id"]
and any(
x["parameter_definition_id"] == self["id"]
for x in self.db_map.mapped_table("parameter_value").valid_values()
)
):
del other["parameter_value_list_id"]
raise SpineDBAPIError("can't modify the parameter value list of a parameter that already has values")
if "parameter_value_list_id" in other:
self._raise_if_update_invalidates_values("parameter_value_list_id", other["parameter_value_list_id"])
elif "parameter_value_list_name" in other:
self._raise_if_update_invalidates_values("parameter_value_list_name", other["parameter_value_list_name"])
other_type_list = other.get("parameter_type_list")
if other_type_list is not None and other_type_list != self.__getitem__("parameter_type_list"):
try:
Expand All @@ -886,6 +879,13 @@ def merge(self, other):
raise type_error
return super().merge(other)

def _raise_if_update_invalidates_values(self, key: str, other_value: str | TempId) -> None:
if other_value != self[key] and any(
x["parameter_definition_id"] == self["id"]
for x in self.db_map.mapped_table("parameter_value").valid_values()
):
raise SpineDBAPIError("can't modify the parameter value list of a parameter that already has values")

def _make_new_type_items(self, new_type_list):
new_types = set(new_type_list)
current_types = set(self["parameter_type_list"])
Expand Down
72 changes: 71 additions & 1 deletion tests/test_DatabaseMapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
""" Unit tests for DatabaseMapping class. """
"""Unit tests for DatabaseMapping class."""
from collections import namedtuple
from datetime import datetime
import gc
Expand Down Expand Up @@ -2183,13 +2183,18 @@ def test_update_parameter_definition_with_parsed_value(self):
db_map.add_parameter_definition_item(name="y", entity_class_name="Object")
)
self.assertIsNone(definition["default_value"])
self.assertIsNone(definition["parsed_value"])
definition.update(parsed_value=2.3)
self.assertEqual(definition["parsed_value"], 2.3)
value_blob, value_type = to_database(2.3)
self.assertEqual(definition["default_value"], value_blob)
self.assertEqual(definition["default_type"], value_type)

def test_update_parameter_value_with_parsed_value(self):
with DatabaseMapping("sqlite://", create=True) as db_map:
self._assert_success(db_map.add_entity_class_item(name="Object"))
self._assert_success(db_map.add_parameter_definition_item(name="y", entity_class_name="Object"))
self._assert_success(db_map.add_parameter_definition_item(name="z", entity_class_name="Object"))
self._assert_success(db_map.add_entity_item(name="spoon", entity_class_name="Object"))
value_item = self._assert_success(
db_map.add_parameter_value_item(
Expand All @@ -2203,6 +2208,35 @@ def test_update_parameter_value_with_parsed_value(self):
self.assertIsNone(value_item["parsed_value"])
value_item.update(parsed_value=2.3)
self.assertEqual(value_item["parsed_value"], 2.3)
value, value_type = to_database("original value")
with_uncached_parsed_value = db_map.add_parameter_value(
entity_class_name="Object",
parameter_definition_name="z",
entity_byname=("spoon",),
alternative_name="Base",
value=value,
type=value_type,
)
with_uncached_parsed_value.update(parsed_value=2.3)
self.assertEqual(with_uncached_parsed_value["parsed_value"], 2.3)

def test_update_parameter_value_with_parsed_value2(self):
with DatabaseMapping("sqlite://", create=True) as db_map:
self._assert_success(db_map.add_entity_class_item(name="Object"))
self._assert_success(db_map.add_parameter_definition_item(name="y", entity_class_name="Object"))
self._assert_success(db_map.add_entity_item(name="spoon", entity_class_name="Object"))
value_item = self._assert_success(
db_map.add_parameter_value_item(
entity_class_name="Object",
parameter_definition_name="y",
entity_byname=("spoon",),
alternative_name="Base",
value=b"5.5",
type="float",
)
)
value_item.update(parsed_value=2.3)
self.assertEqual(value_item["parsed_value"], 2.3)

def test_update_list_value_with_parsed_value(self):
with DatabaseMapping("sqlite://", create=True) as db_map:
Expand Down Expand Up @@ -3451,6 +3485,42 @@ def test_updating_none_entity_location_does_not_add_new_location(self):
puurtila.update(name="Taipale", lat=None, lon=None, alt=None, shape_name=None, shape_blob=None)
self.assertEqual(db_map.find_entity_locations(), [])

def test_unset_parameter_value_list_in_definition(self):
with DatabaseMapping("sqlite://", create=True) as db_map:
db_map.add_entity_class(name="amphibian")
lungs = db_map.add_parameter_definition(entity_class_name="amphibian", name="lungs")
db_map.add_parameter_value_list(name="states")
lungs.update(parameter_value_list_name="states")
self.assertEqual(lungs["parameter_value_list_name"], "states")
lungs.update(parameter_value_list_name=None)
self.assertIsNone(lungs["parameter_value_list_name"])

def test_changing_value_list_while_values_exist_is_disallowed(self):
with DatabaseMapping("sqlite://", create=True) as db_map:
db_map.add_entity_class(name="amphibian")
lungs = db_map.add_parameter_definition(entity_class_name="amphibian", name="lungs")
db_map.add_parameter_value_list(name="values1")
db_map.add_list_value(parameter_value_list_name="values1", parsed_value="yes", index=0)
values2 = db_map.add_parameter_value_list(name="values2")
db_map.add_list_value(parameter_value_list_name="values2", parsed_value="yes", index=0)
lungs.update(parameter_value_list_name="values1")
db_map.add_entity(entity_class_name="amphibian", name="frog")
db_map.add_parameter_value(
entity_class_name="amphibian",
entity_byname=("frog",),
parameter_definition_name="lungs",
alternative_name="Base",
parsed_value="yes",
)
with self.assertRaisesRegex(
SpineDBAPIError, "^can't modify the parameter value list of a parameter that already has values$"
):
lungs.update(parameter_value_list_name=values2["name"])
with self.assertRaisesRegex(
SpineDBAPIError, "^can't modify the parameter value list of a parameter that already has values$"
):
lungs.update(parameter_value_list_id=values2["id"])


class TestDatabaseMappingLegacy(unittest.TestCase):
"""'Backward compatibility' tests, i.e. pre-entity tests converted to work with the entity structure."""
Expand Down