Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hed/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from hed.models.hed_string import HedString
from hed.models.hed_tag import HedTag
from hed.errors.error_reporter import get_printable_issue_string
from hed.errors.exceptions import HedFileError, HedExceptions
from hed.errors.exceptions import HedFileError, HedExceptions, HedQueryError

from hed.models.base_input import BaseInput
from hed.models.spreadsheet_input import SpreadsheetInput
Expand Down
9 changes: 9 additions & 0 deletions hed/errors/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,12 @@ def __init__(self, code, message, filename, issues=None):
self.issues = issues
if self.issues is None:
self.issues = []


class HedQueryError(ValueError):
    """Exception raised when a HED query string cannot be parsed.

    Inherits from :class:`ValueError` so that existing ``except ValueError`` handlers
    continue to work, while allowing callers that need finer-grained control to
    catch only query parse errors with ``except HedQueryError``.
    """
49 changes: 48 additions & 1 deletion hed/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,51 @@
"""Data structures for HED tag handling."""
"""HED data models: strings, tags, groups, inputs, queries, and definitions.

This module provides the core data structures used to represent, validate, and
transform HED-annotated data. A loaded :class:`~hed.schema.HedSchema` (from
``hed.schema``) is typically passed in when constructing these objects.

Typical usage
-------------
Parse and validate a raw HED string::

from hed.schema import load_schema_version
from hed.models import HedString

schema = load_schema_version("8.3.0")
hs = HedString("Sensory-event, (Action, Move/Flexion)", schema)
issues = hs.validate(schema)

Load a BIDS events file with a sidecar::

from hed.models import TabularInput, Sidecar

sidecar = Sidecar("task-rest_events.json", name="MySidecar")
events = TabularInput("sub-01_task-rest_events.tsv", sidecar=sidecar)
issues = events.validate(schema)

Search HED annotations with a query::

from hed.models import QueryHandler

query = QueryHandler("Sensory-event && Action")
matches = query.search(hs)

Key exports
-----------
- :class:`HedString` — a parsed HED annotation string (root of the parse tree).
- :class:`HedTag` — a single HED tag with schema linkage and canonical form.
- :class:`HedGroup` — a parenthesised group of tags and nested groups.
- :class:`TabularInput` — a BIDS-style TSV events file with optional sidecar.
- :class:`Sidecar` — a BIDS JSON sidecar mapping column values to HED strings.
- :class:`SpreadsheetInput` — an Excel / TSV spreadsheet with HED columns.
- :class:`TimeseriesInput` — a continuous time-series file with HED annotations.
- :class:`DefinitionDict` — a collection of resolved HED Def/Def-expand definitions.
- :class:`QueryHandler` — compile and execute queries against HED strings.
- :func:`get_query_handlers` / :func:`search_hed_objs` — convenience helpers for
batch querying.
- :func:`convert_to_form`, :func:`shrink_defs`, :func:`expand_defs`,
:func:`process_def_expands` — DataFrame-level HED transformation utilities.
"""

from .base_input import BaseInput
from .column_mapper import ColumnMapper
Expand Down
7 changes: 1 addition & 6 deletions hed/models/column_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import copy
from collections import Counter
from functools import partial

PANDAS_COLUMN_PREFIX_TO_IGNORE = "Unnamed: "
NO_WARN_COLUMNS = ["onset", "duration"]
Expand Down Expand Up @@ -110,20 +111,14 @@ def get_transformers(self):
if isinstance(assign_to_column, int):
if self._column_map:
assign_to_column = self._column_map[assign_to_column]
else:
assign_to_column = assign_to_column
if column.column_type == ColumnType.Ignore:
continue
elif column.column_type == ColumnType.Value:
value_str = column.hed_dict
from functools import partial

final_transformers[assign_to_column] = partial(self._value_handler, value_str)
elif column.column_type == ColumnType.Categorical:
need_categorical.append(column.column_name)
category_values = column.hed_dict
from functools import partial

final_transformers[assign_to_column] = partial(self._category_handler, category_values)
else:
final_transformers[assign_to_column] = lambda x: x
Expand Down
20 changes: 16 additions & 4 deletions hed/models/def_expand_gather.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ def add_def(self, def_tag, def_expand_group):
def_extension = def_tag.extension.split("/")
existing_contents = self.actual_contents.get(def_extension[1], None)
if existing_contents and existing_contents != orig_group:
raise ValueError("Invalid Definition")
raise ValueError(
f"Definition '{def_extension[0]}' has conflicting contents for value '{def_extension[1]}': "
f"existing={existing_contents} vs new={orig_group}"
)
elif existing_contents:
return
self.actual_contents[def_extension[1]] = orig_group.copy()
Expand All @@ -42,14 +45,20 @@ def add_def(self, def_tag, def_expand_group):
tag for tag in orig_group.get_all_tags() if tag.extension == def_extension[1] and tag.is_takes_value_tag()
]
if len(matching_tags) == 0:
raise ValueError("Invalid Definition")
raise ValueError(
f"Definition '{def_extension[0]}': no takes-value tag with extension '{def_extension[1]}' "
f"found in group {orig_group}"
)
matching_names = {tag.short_base_tag for tag in matching_tags}
if self.matching_names is not None:
self.matching_names = self.matching_names & matching_names
else:
self.matching_names = matching_names
if len(self.matching_names) == 0:
raise ValueError("Invalid Definition")
raise ValueError(
f"Definition '{def_extension[0]}': no tag name is consistently the takes-value tag across "
f"all observed values — candidate names were {matching_names}"
)

def resolve_definition(self):
"""Try to resolve the definition based on the information available.
Expand Down Expand Up @@ -84,7 +93,10 @@ def resolve_definition(self):
self.resolved_definition = candidate_contents
return True
if len(candidate_tags) == 0 or (1 < len(candidate_tags) < len(tuple_list)):
raise ValueError("Invalid Definition")
raise ValueError(
f"Definition '{self.def_tag_name}': could not resolve a unique takes-value tag — "
f"found {len(candidate_tags)} candidate(s) across {len(tuple_list)} value(s)"
)
return False

def get_definition_string(self):
Expand Down
5 changes: 4 additions & 1 deletion hed/models/hed_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ def remove(self, items_to_remove: Iterable[Union[HedTag, "HedGroup"]]):
item._parent = None

def __copy__(self):
raise ValueError("Cannot make shallow copies of HedGroups")
raise copy.Error(
"Shallow copy of HedGroup is not supported: _parent pointers would alias the original. "
"Use .copy() for a deep copy."
)

def copy(self) -> "HedGroup":
"""Return a deep copy of this group.
Expand Down
11 changes: 11 additions & 0 deletions hed/models/hed_string.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ def __init__(self, hed_string, hed_schema, def_dict=None, _contents=None):
try:
contents = self.split_into_groups(hed_string, hed_schema, def_dict)
except ValueError:
# ValueError is raised by split_into_groups for structurally malformed
# strings (mismatched or misordered parentheses). Rather than raising
# here, we fall back to an empty parse tree so that the object can be
# passed to the validator, which will independently detect and report
# the structural error through check_count_tag_group_parentheses /
# check_delimiter_issues_in_hed_string on the raw string.
#
# Callers that construct HedString without running it through
# HedValidator will receive an empty children list with no error
# indication. Always validate after construction if correctness is
# required.
contents = []
super().__init__(hed_string, contents=contents, startpos=0, endpos=len(hed_string))
self._schema = hed_schema
Expand Down
15 changes: 8 additions & 7 deletions hed/models/query_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ExpressionExactMatch,
)
from hed.models.query_util import Token
from hed.errors.exceptions import HedQueryError


class QueryHandler:
Expand Down Expand Up @@ -76,7 +77,7 @@ def _get_next_token(self):
"""Returns the current token and advances the counter"""
self.at_token += 1
if self.at_token >= len(self.tokens):
raise ValueError("Parse error in get next token")
raise HedQueryError("Parse error in get next token")
return self.tokens[self.at_token]

def _next_token_is(self, kinds):
Expand All @@ -94,7 +95,7 @@ def _parse(self, expression_string):
expr = self._handle_or_op()

if self.at_token + 1 != len(self.tokens):
raise ValueError("Parse error in search string")
raise HedQueryError("Parse error in search string")

return expr

Expand Down Expand Up @@ -137,7 +138,7 @@ def _handle_negation(self):
if next_token == Token.LogicalNegation:
interior = self._handle_grouping_op()
if "?" in str(interior):
raise ValueError(
raise HedQueryError(
"Cannot negate wildcards, or expressions that contain wildcards."
"Use {required_expression : optional_expression}."
)
Expand All @@ -152,13 +153,13 @@ def _handle_grouping_op(self):
expr = self._handle_or_op()
next_token = self._next_token_is([Token.LogicalGroupEnd])
if next_token != Token.LogicalGroupEnd:
raise ValueError("Parse error: Missing closing paren")
raise HedQueryError("Parse error: Missing closing paren")
elif next_token == Token.DescendantGroup:
interior = self._handle_or_op()
expr = ExpressionDescendantGroup(next_token, right=interior)
next_token = self._next_token_is([Token.DescendantGroupEnd])
if next_token != Token.DescendantGroupEnd:
raise ValueError("Parse error: Missing closing square bracket")
raise HedQueryError("Parse error: Missing closing square bracket")
elif next_token == Token.ExactMatch:
interior = self._handle_or_op()
expr = ExpressionExactMatch(next_token, right=interior)
Expand All @@ -172,14 +173,14 @@ def _handle_grouping_op(self):
expr.left = optional_portion
next_token = self._next_token_is([Token.ExactMatchEnd])
if "~" in str(expr):
raise ValueError(
raise HedQueryError(
"Cannot use negation in exact matching groups,"
" as it's not clear what is being matched.\n"
"{thing and ~(expression)} is allowed."
)

if next_token is None:
raise ValueError("Parse error: Missing closing curly bracket")
raise HedQueryError("Parse error: Missing closing curly bracket")
else:
next_token = self._get_next_token()
if next_token and next_token.kind == Token.Wildcard:
Expand Down
42 changes: 41 additions & 1 deletion hed/schema/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,44 @@
"""Data structures for handling the HED schema."""
"""HED schema loading, caching, and introspection.

This module exposes everything needed to load and inspect HED schemas — the
vocabularies that define valid HED tags.

Typical usage
-------------
Load a released schema by version number (auto-downloaded and cached)::

from hed.schema import load_schema_version
schema = load_schema_version("8.3.0")

Load a schema from a local file or URL::

from hed.schema import load_schema
schema = load_schema("/path/to/HED8.3.0.xml")

Load a library schema alongside a standard schema::

schema = load_schema_version(["8.3.0", "sc:score_1.0.0"])

Key exports
-----------
- :class:`HedSchema` — a single loaded schema; use it to validate tags.
- :class:`HedSchemaGroup` — two or more schemas used together (base + libraries).
- :func:`load_schema` — load from a file path or URL.
- :func:`load_schema_version` — load by version string(s), with caching.
- :func:`from_string` — parse a schema from an in-memory string.
- :func:`from_dataframes` — reconstruct a schema from TSV DataFrames.
- :data:`HedKey` / :data:`HedSectionKey` — enumerations of schema attribute and
section names used when querying schema entries.
- :func:`get_hed_versions` — list versions available in the local cache.
- :func:`get_hed_xml_version` — read the HED version string from an XML schema file on disk.
- :func:`cache_xml_versions` — pre-populate the local cache from the HED GitHub
releases.

See also
--------
``hed.models`` for data structures that *use* a loaded schema (HedString, HedTag,
TabularInput, etc.).
"""

from .hed_schema import HedSchema
from .hed_schema_entry import HedSchemaEntry, UnitClassEntry, UnitEntry, HedTagEntry
Expand Down
23 changes: 0 additions & 23 deletions hed/schema/hed_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,39 +446,16 @@ def __eq__(self, other):
if other is None:
return False
if self.get_save_header_attributes() != other.get_save_header_attributes():
# print(f"Header attributes not equal: '{self.get_save_header_attributes()}' vs '{other.get_save_header_attributes()}'")
return False
if self.has_duplicates() != other.has_duplicates():
# print(f"Duplicates: '{self.has_duplicates()}' vs '{other.has_duplicates()}'")
return False
if self.prologue.strip() != other.prologue.strip():
# print(f"PROLOGUE NOT EQUAL: '{self.prologue.strip()}' vs '{other.prologue.strip()}'")
return False
if self.epilogue.strip() != other.epilogue.strip():
# print(f"EPILOGUE NOT EQUAL: '{self.epilogue.strip()}' vs '{other.epilogue.strip()}'")
return False
if self._sections != other._sections:
# This block is useful for debugging when modifying the schema class itself.
# for section1, section2 in zip(self._sections.values(), other._sections.values()):
# if section1 != section2:
# dict1 = section1.all_names
# dict2 = section2.all_names
# if dict1 != dict2:
# print(f"DICT {section1._section_key} NOT EQUAL")
# key_union = set(list(dict1.keys()) + list(dict2.keys()))
# for key in key_union:
# if key not in dict1:
# print(f"{key} not in dict1")
# continue
# if key not in dict2:
# print(f"{key} not in dict2")
# continue
# if dict1[key] != dict2[key]:
# s = f"{key} unmatched: '{str(dict1[key].name)}' vs '{str(dict2[key].name)}'"
# print(s)
return False
if self._namespace != other._namespace:
# print(f"NAMESPACE NOT EQUAL: '{self._namespace}' vs '{other._namespace}'")
return False
return True

Expand Down
7 changes: 4 additions & 3 deletions hed/schema/hed_schema_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def _set_attribute_value(self, attribute, attribute_value):
# todo: remove this patch and redo the code
# This check doesn't need to be done if the schema is valid.
if attribute not in self._section.valid_attributes:
# print(f"Unknown attribute {attribute}")
if self._unknown_attributes is None:
self._unknown_attributes = {}
self._unknown_attributes[attribute] = attribute_value
Expand Down Expand Up @@ -406,8 +405,10 @@ def _finalize_inherited_attributes(self):
# Replace the list with a copy we can modify.
self.inherited_attributes = self.attributes.copy()
for attribute in self._section.inheritable_attributes:
if self._check_inherited_attribute(attribute):
self.inherited_attributes[attribute] = self._check_inherited_attribute(attribute, True)
value = self._check_inherited_attribute(attribute, return_value=True)
# None means "not found in the hierarchy"; attribute values themselves are never None.
if value is not None:
self.inherited_attributes[attribute] = value

def finalize_entry(self, schema):
"""Called once after schema loading to set state.
Expand Down
Loading