Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions openapi_core/validation/schemas/_caches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Per-resolver schema-property caches.

Several ``SchemaValidator`` methods need to answer static questions
about a schema -- "does this subtree carry composition?" or "does this
subtree contain a binary/byte string?" -- and reuse the answers across
many validation calls. A naive class-level cache keyed on ``SchemaPath``
is unsafe because ``SchemaPath`` equality / hashing (inherited from
``pathable.BasePath``) is path-only: two distinct OpenAPI specs that
happen to share a JSON-pointer path (``anyOf#0``) collide.

This module provides a small key abstraction that keeps the answers
correct across specs and lets them be reclaimed when the spec is
garbage-collected.

Design:

* Each OpenAPI spec resolves through a single, stable ``Resolver``
instance. All ``SchemaPath`` objects derived from the same root spec
share that resolver, so the resolver's identity is a reliable
per-spec key (verified empirically against ``jsonschema-path``).
* Each spec's content is laid out as a single tree of dict objects.
Two distinct dicts within the same spec have distinct ``id()``
values, and the ``id()`` is stable for the lifetime of the dict
(it is a CPython memory address). Within a spec, ``id(content)``
is therefore safe as an inner cache key.
* When the spec (and its resolver) is collected, ``weakref.finalize``
evicts the entire spec's cache slot in one shot. This both prevents
the cache from pinning the spec in memory and forecloses on the
classic ``id()``-reuse hazard.

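The module exposes a single accessor, ``cache_for(resolver)``, which
returns that resolver's ``_PerResolverCache``; each query has its own
dict on it (``needs_state``, ``needs_binary_normalization``). Callers
are responsible for the actual computation -- the cache only stores
results.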
"""

from __future__ import annotations

import weakref
from typing import Any
from typing import Dict
from typing import Optional


class _PerResolverCache:
    """Memoized schema answers belonging to one resolver (one spec).

    Every attribute is an independent ``int -> bool`` mapping holding
    the stored answer for one kind of query. ``__slots__`` keeps an
    instance down to exactly these two dict references (no per-instance
    ``__dict__``); only a few instances are expected to be alive at any
    time -- one per loaded OpenAPI document.
    """

    __slots__ = ("needs_state", "needs_binary_normalization")

    def __init__(self) -> None:
        # Both mappings start empty and are filled lazily by callers.
        self.needs_binary_normalization: Dict[int, bool] = dict()
        self.needs_state: Dict[int, bool] = dict()


# Module-level registry of per-resolver caches. Keys are ``id(resolver)``,
# values are that resolver's ``_PerResolverCache``. Entries are removed
# via ``weakref.finalize`` when the resolver is collected; ``id()`` reuse
# is therefore safe by construction (the slot is empty before the next
# resolver can claim the address).
_caches: Dict[int, _PerResolverCache] = {}


def cache_for(resolver: Any) -> _PerResolverCache:
    """Return the per-resolver cache for ``resolver``, creating it on
    first access.

    The cache is stored in the module-level ``_caches`` registry under
    ``id(resolver)``. On creation a ``weakref.finalize`` callback is
    registered that pops that key once the resolver is collected, so the
    registry neither outlives the resolver nor serves stale answers to a
    later object that reuses the same address.

    Args:
        resolver: The object whose identity scopes the cache. It must
            support weak references.

    Returns:
        The ``_PerResolverCache`` associated with ``resolver``; repeated
        calls with the same live resolver return the same instance.

    Raises:
        TypeError: If ``resolver`` cannot be weakly referenced. No
            registry entry is left behind in that case.
    """
    rid = id(resolver)
    cache = _caches.get(rid)
    if cache is not None:
        return cache
    # Register the finalizer *before* publishing the slot.
    # ``weakref.finalize`` raises TypeError for objects that do not
    # support weak references; if the slot were stored first, that
    # failure would leave an entry nothing ever evicts, and a later
    # object allocated at the same address would inherit its answers.
    # The callback pops by the id captured now, which stays correct:
    # nothing else can occupy that address until this resolver is gone.
    weakref.finalize(resolver, _caches.pop, rid, None)
    cache = _PerResolverCache()
    _caches[rid] = cache
    return cache


def _reset_for_tests() -> None:
    """Empty the module-level ``_caches`` registry in place.

    Intended for tests only: outside of tests, entries are evicted by
    the finalizers tied to each resolver's lifetime, so no explicit
    reset is required.
    """
    _caches.clear()


__all__ = ["cache_for", "_PerResolverCache", "_reset_for_tests"]
97 changes: 58 additions & 39 deletions openapi_core/validation/schemas/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from openapi_core.validation.schemas.datatypes import (
_EMPTY_STATES as _EMPTY_STATES_MAP,
)
from openapi_core.validation.schemas._caches import cache_for as _cache_for
from openapi_core.validation.schemas.datatypes import FormatValidator
from openapi_core.validation.schemas.datatypes import ValidationState
from openapi_core.validation.schemas.exceptions import InvalidSchemaValue
Expand Down Expand Up @@ -47,60 +48,78 @@ def validate(self, value: Any) -> None:
schema_type = (self.schema / "type").read_str_or_list("any")
raise InvalidSchemaValue(value, schema_type, schema_errors=errors)

# Cache the recursive "does this schema benefit from a ValidationState?"
# check, keyed on the SchemaPath. SchemaPath is hashed by content, so
# two SchemaPaths pointing at the same spec location share a cache
# slot regardless of identity -- safe across GC, bounded by the number
# of distinct schema shapes in the spec rather than by input volume.
_needs_state_cache: dict[SchemaPath, bool] = {}

@classmethod
def _schema_needs_state(cls, schema: SchemaPath) -> bool:
"""True if building a ValidationState for ``schema`` carries
information the unmarshaller can reuse: either composition
(oneOf/anyOf/allOf) on this node, or a descendant that does.

Cycle-safe: a False sentinel is stored before recursing, so a
$ref loop terminates and the real answer overwrites the
sentinel once the recursion completes.
The answer is purely a function of the resolved schema contents,
so we cache it per-resolver (i.e. per OpenAPI spec) keyed on
the content dict's identity. See ``_caches.py`` for why a
SchemaPath-keyed cache would be unsafe across specs.
"""
cache = cls._needs_state_cache
cached = cache.get(schema)
with schema.resolve() as resolved:
return cls._contents_need_state(
resolved.contents, _cache_for(resolved.resolver), set()
)

@classmethod
def _contents_need_state(
cls,
contents: Any,
cache: Any,
seen: set,
) -> bool:
# Boolean schemas (True/False) and other non-dict shapes can't
# introduce composition.
if not isinstance(contents, dict):
return False

marker = id(contents)
cached = cache.needs_state.get(marker)
if cached is not None:
return cached
# Self-composition is the strongest signal; check it first to
# short-circuit the cheap case.
if "oneOf" in schema or "anyOf" in schema or "allOf" in schema:
cache[schema] = True
# Cycle protection: a $ref loop resolves back to the same dict.
# ``seen`` is per-call (not shared across calls), so a True
# result downstream still propagates back up correctly.
if marker in seen:
return False
seen.add(marker)

# Self-composition: strongest signal, short-circuit.
if (
"oneOf" in contents
or "anyOf" in contents
or "allOf" in contents
):
cache.needs_state[marker] = True
return True
# Seed the in-progress sentinel for cycle protection.
cache[schema] = False
# Recurse into children. We only need to find one descendant
# that needs state to flip our own answer.

result = False
if "properties" in schema:
prop_iter = (schema / "properties").items()
for prop_name, prop_schema in prop_iter:
if not isinstance(prop_name, str):
continue
if cls._schema_needs_state(prop_schema):

properties = contents.get("properties")
if isinstance(properties, dict):
for prop_schema in properties.values():
if cls._contents_need_state(prop_schema, cache, seen):
result = True
break
if not result and "additionalProperties" in schema:
try:
ap = schema / "additionalProperties"
except Exception:
ap = None
if ap is not None and cls._schema_needs_state(ap):

if not result:
additional = contents.get("additionalProperties")
if isinstance(additional, dict) and cls._contents_need_state(
additional, cache, seen
):
result = True
if not result and "items" in schema:
try:
items = schema / "items"
except Exception:
items = None
if items is not None and cls._schema_needs_state(items):

if not result:
items = contents.get("items")
if isinstance(items, dict) and cls._contents_need_state(
items, cache, seen
):
result = True
cache[schema] = result

cache.needs_state[marker] = result
return result

def validate_state(self, value: Any) -> ValidationState:
Expand Down
102 changes: 102 additions & 0 deletions tests/unit/validation/test_schema_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,105 @@ def test_enforce_properties_required_applies_to_nested_composed_schemas(
schema,
enforce_properties_required=True,
).validate({"name": "openapi-core", "meta": {}})



class TestSchemaValidatorCacheIsolation:
    """The per-resolver cache must keep ``_schema_needs_state`` answers
    independent across distinct OpenAPI specs that happen to share
    JSON-pointer paths.

    Regression test for the ``SchemaPath``-keyed cache: ``SchemaPath``
    equality is path-only (inherited from ``pathable.BasePath``), so a
    dict keyed on ``SchemaPath`` collides on identical paths regardless
    of what the paths actually resolve to. The collision stays hidden
    while a process validates against a single spec and surfaces as
    soon as it loads more than one.
    """

    def test_disjoint_specs_with_colliding_paths(self):
        # Both specs have a value at ``anyOf/0`` but one is a leaf
        # string and the other carries oneOf -- only the second should
        # report needs_state=True.
        from openapi_core.validation.schemas.validators import SchemaValidator

        spec_simple = SchemaPath.from_dict(
            {"anyOf": [{"type": "string"}, {"type": "integer"}]}
        )
        spec_composed = SchemaPath.from_dict(
            {
                "anyOf": [
                    {
                        "type": "object",
                        "properties": {
                            "x": {
                                "oneOf": [
                                    {"type": "string"},
                                    {"type": "integer"},
                                ]
                            }
                        },
                    },
                    {"type": "integer"},
                ]
            }
        )

        # Each branch's value at anyOf/0 has the SAME SchemaPath
        # (anyOf#0) but disjoint contents.
        simple_branch = spec_simple / "anyOf" / 0
        composed_branch = spec_composed / "anyOf" / 0
        assert simple_branch == composed_branch  # path-only equality
        assert hash(simple_branch) == hash(composed_branch)

        # The cache must distinguish them by spec.
        assert SchemaValidator._schema_needs_state(simple_branch) is False
        assert SchemaValidator._schema_needs_state(composed_branch) is True

        # And the order doesn't matter -- ask in reverse, on fresh
        # specs so nothing cached above can influence the answers.
        spec_simple_2 = SchemaPath.from_dict(
            {"anyOf": [{"type": "string"}, {"type": "integer"}]}
        )
        spec_composed_2 = SchemaPath.from_dict(
            {
                "anyOf": [
                    {"oneOf": [{"type": "string"}]},
                    {"type": "integer"},
                ]
            }
        )
        assert (
            SchemaValidator._schema_needs_state(
                spec_composed_2 / "anyOf" / 0
            )
            is True
        )
        assert (
            SchemaValidator._schema_needs_state(
                spec_simple_2 / "anyOf" / 0
            )
            is False
        )

    def test_cache_evicts_on_resolver_collection(self):
        # When a spec's resolver is garbage-collected, its cache slot
        # is dropped. This both prevents the cache from pinning the
        # spec in memory and forecloses on the classic id()-reuse
        # hazard (a freshly allocated resolver cannot inherit stale
        # answers from a collected one at the same address).
        import gc

        from openapi_core.validation.schemas._caches import _caches
        from openapi_core.validation.schemas.validators import SchemaValidator

        # Flush slots whose resolvers were created by earlier tests and
        # may still be awaiting collection. Otherwise the collect()
        # below could evict them too and make the outcome depend on
        # test order.
        gc.collect()
        keys_before = set(_caches)

        spec = SchemaPath.from_dict(
            {"oneOf": [{"type": "string"}, {"type": "integer"}]}
        )
        SchemaValidator._schema_needs_state(spec)
        # Exactly one new slot -- the one for this spec's resolver.
        new_keys = set(_caches) - keys_before
        assert len(new_keys) == 1

        # Drop the only outside reference; that specific slot must
        # follow. Checking the key rather than the registry length
        # keeps the assertion independent of unrelated entries.
        del spec
        gc.collect()
        assert new_keys.isdisjoint(_caches)
Loading