22 changes: 11 additions & 11 deletions Makefile
@@ -22,29 +22,29 @@ clean-test:
 .PHONY: clean
 clean: clean-pyc clean-test
 
-venv: poetry.lock
-	poetry install --all-extras
+venv: uv.lock
+	uv sync
 
 .PHONY: format
 format: venv
-	poetry run black nodestream tests
-	poetry run isort nodestream tests
-	poetry run ruff check nodestream tests --fix
+	uv run black nodestream tests
+	uv run isort nodestream tests
+	uv run ruff check nodestream tests --fix
 
 .PHONY: lint
 lint: venv
-	poetry run black nodestream tests --check
-	poetry run ruff check nodestream tests
-	poetry run isort nodestream tests --check-only
+	uv run black nodestream tests --check
+	uv run ruff check nodestream tests
+	uv run isort nodestream tests --check-only
 
 .PHONY: test-unit
 test-unit: venv
-	poetry run pytest -m "not e2e"
+	uv run pytest -m "not e2e"
 
 .PHONY: test-e2e
 test-e2e: venv
-	poetry run pytest -m "e2e"
+	uv run pytest -m "e2e"
 
 .PHONY: snapshot
 snapshot: venv
-	poetry run pytest --snapshot-update
+	uv run pytest --snapshot-update
2,297 changes: 0 additions & 2,297 deletions poetry.lock

This file was deleted.

2 changes: 0 additions & 2 deletions poetry.toml

This file was deleted.

151 changes: 79 additions & 72 deletions pyproject.toml
@@ -1,78 +1,103 @@
-[tool.poetry]
+[project]
 name = "nodestream"
 version = "0.14.17"
 description = "A Fast, Declarative ETL for Graph Databases."
+authors = [{ name = "Zach Probst", email = "[email protected]" }]
+requires-python = "~=3.10"
+readme = "README.md"
 license = "GPL-3.0-only"
-authors = [
-    "Zach Probst <[email protected]>"
+keywords = [
+    "etl",
+    "neo4j",
+    "declarative",
+    "data",
+    "kafka",
+    "ingest",
 ]
-readme = "README.md"
 
-homepage = "https://github.com/nodestream-proj/nodestream"
-repository = "https://github.com/nodestream-proj/nodestream"
-documentation = "https://nodestream-proj.github.io/nodestream"
-
-keywords = ["etl", "neo4j", "declarative", "data", "kafka", "ingest"]
 classifiers = [
     "Development Status :: 4 - Beta",
     "Environment :: Console",
     "Intended Audience :: Developers",
     "Intended Audience :: Information Technology",
     "License :: OSI Approved :: GNU Affero General Public License v3",
     "Natural Language :: English",
-    "Topic :: Database"
+    "Topic :: Database",
 ]
+dependencies = [
+    "pyyaml~=6.0",
+    "jmespath~=1.0",
+    "cleo~=2.0",
+    "python-json-logger~=2.0",
+    "boto3>=1.34.127,<2",
+    "confluent-kafka~=2.5",
+    "Jinja2>=3,<4",
+    "pandas>=2,<3",
+    "pyarrow>=17.0.0,<19.0.0",
+    "schema>=0.7,<0.8",
+    "cookiecutter~=2.0",
+    "httpx>=0.27,<0.28",
+    "psutil~=6.0",
+    "uvloop>=0.17.0,<=0.21.0 ; sys_platform == 'darwin'",
+    "uvloop>=0.17.0,<=0.21.0 ; sys_platform == 'linux'",
+    "prometheus-client>=0.21.1",
+]
-packages = [ { include = "nodestream" } ]
 
-[tool.isort]
-profile = "black"
-
-[tool.poetry.dependencies]
-python = "^3.10"
-pyyaml = "^6.0"
-jmespath = "^1.0"
-cleo = "^2.0"
-python-json-logger = "^2.0"
-boto3 = "^1.34.127"
-confluent-kafka = "^2.5"
-Jinja2 = "^3"
-pandas = "^2"
-pyarrow = ">=17.0.0,<19.0.0"
-schema = "^0.7"
-cookiecutter = "^2.0"
-httpx = "^0.27"
-psutil = "^6.0"
+[project.optional-dependencies]
+prometheus = ["prometheus-client>=0.21.1,<0.22"]
+validation = [
+    "genson>=1.3.0,<2",
+    "jsonschema>=4.23.0,<5",
+]
 
-uvloop = [
-    { version = ">=0.17.0,<=0.21.0", platform = "darwin"},
-    { version = ">=0.17.0,<=0.21.0", platform = "linux"}
+[project.urls]
+Homepage = "https://github.com/nodestream-proj/nodestream"
+Repository = "https://github.com/nodestream-proj/nodestream"
+Documentation = "https://nodestream-proj.github.io/nodestream"
 
+[project.scripts]
+nodestream = "nodestream.cli.application:run"
+
+[project.entry-points."nodestream.plugins"]
+argument_resolvers = "nodestream.pipeline.argument_resolvers"
+file_formats = "nodestream.pipeline.extractors.files"
+interpretations = "nodestream.interpreting.interpretations"
+normalizers = "nodestream.pipeline.normalizers"
+value_providers = "nodestream.pipeline.value_providers"
+record_formats = "nodestream.pipeline.extractors.streams"
+stream_connectors = "nodestream.pipeline.extractors.streams"
+commands = "nodestream.cli.commands"
+audits = "nodestream.project.audits"
+schema_printers = "nodestream.schema.printers"
+databases = "nodestream.databases.null"
+
+[dependency-groups]
+dev = [
+    "pytest>=8.3.2,<9",
+    "black>=25.1.0,<26",
+    "pyhamcrest>=2.1.0,<3",
+    "isort>=6.0.1,<7",
+    "pytest-cov>=6.1.1,<7",
+    "pytest-asyncio>=1.0.0,<2",
+    "pytest-mock>=3.14.1,<4",
+    "freezegun>=1.2.2,<2",
+    "moto[s3, dynamodb]>=5,<6",
+    "ruff>=0.11.12,<0.12",
+    "pytest-snapshot>=0.9.0,<0.10",
+    "pytest-httpx>=0.30.0,<0.31",
+]
-
-prometheus-client = {version = "^0.21.1", optional = true}
-genson = {version = "^1.3.0", optional = true}
-jsonschema = {version = "^4.23.0", optional = true}
+[tool.isort]
+profile = "black"
 
-[tool.poetry.extras]
-prometheus = ["prometheus-client"]
-validation = ["genson", "jsonschema"]
+[tool.hatch.build.targets.sdist]
+include = ["nodestream"]
 
-[tool.poetry.group.dev.dependencies]
-pytest = "^8.3.2"
-black = "^25.1.0"
-pyhamcrest = "^2.1.0"
-isort = "^6.0.1"
-pytest-cov = "^6.1.1"
-pytest-asyncio = "^1.0.0"
-pytest-mock = "^3.14.1"
-freezegun = "^1.2.2"
-moto = {version = "^5", extras = ["s3", "dynamodb"]}
-ruff = "^0.11.12"
-pytest-snapshot = "^0.9.0"
-pytest-httpx = "^0.30.0"
+[tool.hatch.build.targets.wheel]
+include = ["nodestream"]
 
 [build-system]
-requires = ["poetry-core"]
-build-backend = "poetry.core.masonry.api"
+requires = ["hatchling"]
+build-backend = "hatchling.build"
 
 [tool.pytest.ini_options]
 markers = [
@@ -81,24 +106,6 @@ markers = [
 ]
 asyncio_default_fixture_loop_scope = "function"
 
-[tool.poetry.scripts]
-nodestream = 'nodestream.cli.application:run'
-
-# To prevent the "builin" stuff being a special case of plugins,
-# nodestream just considers itself a plugin for all of the things that are pluggable.
-[tool.poetry.plugins."nodestream.plugins"]
-"argument_resolvers" = "nodestream.pipeline.argument_resolvers"
-"file_formats" = "nodestream.pipeline.extractors.files"
-"interpretations" = "nodestream.interpreting.interpretations"
-"normalizers" = "nodestream.pipeline.normalizers"
-"value_providers" = "nodestream.pipeline.value_providers"
-"record_formats" = "nodestream.pipeline.extractors.streams"
-"stream_connectors" = "nodestream.pipeline.extractors.streams"
-"commands" = "nodestream.cli.commands"
-"audits" = "nodestream.project.audits"
-"schema_printers" = "nodestream.schema.printers"
-"databases" = "nodestream.databases.null"
-
 [tool.black]
 line-length = 88
 target-version = ['py310']
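Note on the entry-point migration above: the [project.entry-points."nodestream.plugins"] table is the PEP 621 equivalent of the removed [tool.poetry.plugins."nodestream.plugins"] section, and the comment deleted with it still applies — nodestream registers itself as a plugin so the built-ins are not a special case. A minimal sketch, assuming an installed nodestream distribution, of how a consumer would resolve this group with the standard library (the helper name here is illustrative, not part of the PR):

from importlib.metadata import entry_points

def load_nodestream_plugins() -> dict:
    # Select the "nodestream.plugins" group (keyword API available on
    # Python 3.10+, which matches requires-python above). Each entry's
    # name is the table key (e.g. "commands"), and load() imports the
    # dotted module path on the right-hand side of the assignment.
    return {ep.name: ep.load() for ep in entry_points(group="nodestream.plugins")}

Discovery goes through installed package metadata either way, so the registrations behave identically whether the wheel was built by poetry-core or hatchling.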
3 changes: 3 additions & 0 deletions pytest.ini
@@ -0,0 +1,3 @@
+[pytest]
+addopts = --verbose --cov=nodestream --tb=long --cov-report=html:build/reports/html --cov-report=xml:build/reports/coverage.xml --cov-report=term-missing --junitxml=build/reports/junit.xml --disable-warnings
+testpaths = tests
37 changes: 19 additions & 18 deletions tests/unit/pipeline/test_filters.py
@@ -4,22 +4,23 @@
 from hamcrest import assert_that, equal_to, instance_of
 
 from nodestream.pipeline.filters import (
-    EnforceSchema,
     ExcludeWhenValuesMatchPossibilities,
     Filter,
-    Schema,
     SchemaEnforcer,
     ValueMatchesRegexFilter,
     ValuesMatchPossibilitiesFilter,
-    WarnSchema,
 )
+from nodestream.schema.state import Schema
 
 from ..stubs import StubbedValueProvider
 
 PASSING_FILTER_CONFIGURATION = [
     {
         "value": StubbedValueProvider("test"),
-        "possibilities": [StubbedValueProvider("A Miss"), StubbedValueProvider("test")],
+        "possibilities": [
+            StubbedValueProvider("A Miss"),
+            StubbedValueProvider("test"),
+        ],
     }
 ]
 
@@ -84,7 +85,7 @@ async def filter_record(self, record):
     assert_that(results, equal_to([]))
 
 
-REGEX = r".*[\[\]\{\}\(\)\\\/~,]+"
+REGEX = r".*[\[\]\{\}\(\)\\\/~]+"
 REGEX_TEST_CASES = [
     {"value": "[test]", "include": True, "expect": False},
     {"value": "test", "include": True, "expect": True},
@@ -97,7 +98,9 @@ async def test_match_regex():
 async def test_match_regex():
     for test_case in REGEX_TEST_CASES:
         subject = ValueMatchesRegexFilter.from_file_data(
-            value=test_case["value"], regex=REGEX, include=test_case["include"]
+            value=test_case["value"],
+            regex=REGEX,
+            include=test_case["include"],
         )
         result = await subject.filter_record({})
         assert_that(result, equal_to(test_case["expect"]))
@@ -116,9 +119,7 @@ async def test_schema_enforcer_with_fetch_schema(mocker, schema_dict):
     context = mocker.Mock()
     context.object_store = object_store
 
-    subject = SchemaEnforcer.from_file_data(
-        enforcement_policy="enforce", key="test_key"
-    )
+    subject = SchemaEnforcer(enforcement_policy="enforce", key="test_key")
     await subject.start(context)
 
     record = {"name": "test"}
@@ -138,7 +139,7 @@ async def test_schema_enforcer_with_infer_schema(mocker):
     context = mocker.Mock()
     context.object_store = object_store
 
-    subject = SchemaEnforcer.from_file_data(inference_sample_size=2)
+    subject = SchemaEnforcer(inference_sample_size=2)
     await subject.start(context)
 
     record = {"name": "test"}
@@ -157,8 +158,8 @@ async def test_schema_enforcer_with_infer_schema(mocker):
 async def test_schema_enforcement_modes(schema_dict):
     schema = Schema(schema_dict)
 
-    enforce_mode = EnforceSchema(schema)
-    warn_mode = WarnSchema(schema)
+    enforce_mode = SchemaEnforcer(schema=schema, enforcement_policy="enforce")
+    warn_mode = SchemaEnforcer(schema=schema, enforcement_policy="warn")
 
     record = {"name": "test"}
     assert_that(enforce_mode.should_filter(record), equal_to(False))
@@ -171,7 +172,7 @@ async def test_schema_enforcement_modes(schema_dict):
 
 @pytest.mark.asyncio
 async def test_infer_mode_schema_already_exists(mocker, schema_dict):
-    subject = SchemaEnforcer.from_file_data(inference_sample_size=2)
+    subject = SchemaEnforcer(inference_sample_size=2)
 
     context = mocker.Mock()
     context.object_store.get_pickled.return_value = schema_dict
@@ -184,7 +185,7 @@ async def test_infer_mode_schema_already_exists(mocker, schema_dict):
 
 @pytest.mark.asyncio
 async def test_enforce_mode_schema_not_present(mocker):
-    subject = SchemaEnforcer.from_file_data(key="test_key")
+    subject = SchemaEnforcer(key="test_key")
 
     context = mocker.Mock()
     context.object_store.get_pickled.return_value = None
@@ -197,7 +198,7 @@ async def test_enforce_mode_schema_not_present(mocker):
 async def test_invalid_enforcement_policy(mocker, schema_dict):
     context = mocker.Mock()
     context.object_store.get_pickled.return_value = schema_dict
-    subject = SchemaEnforcer.from_file_data(
+    subject = SchemaEnforcer(
         enforcement_policy="invalid", inference_sample_size=2
     )
 
@@ -209,12 +210,12 @@ async def test_invalid_enforcement_policy(mocker, schema_dict):
 async def test_warn_policy(mocker):
     context = mocker.Mock()
     context.object_store.get_pickled.return_value = None
-    subject = SchemaEnforcer.from_file_data(
+    subject = SchemaEnforcer(
         inference_sample_size=0, enforcement_policy="warn"
     )
     await subject.start(context)
     assert_that(await subject.filter_record({}), equal_to(False))
-    assert_that(subject.mode, instance_of(WarnSchema))
+    assert_that(subject.mode, instance_of(SchemaEnforcer.WarnSchema))
 
 
 def test_filter_import_error(mocker):
@@ -230,6 +231,6 @@ def test_filter_import_error(mocker):
         SchemaEnforcer()
 
     assert (
-        "SchemaEnforcer requires genson and jsonschema to be installed. Install the `validation` extra."
+        "SchemaEnforcer requires genson and jsonschema to be installed." " Install the `validation` extra."
        in str(excinfo.value)
    )
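Usage note on the API change exercised above: the tests now construct SchemaEnforcer directly instead of going through the removed from_file_data factory, and WarnSchema is addressed as the nested SchemaEnforcer.WarnSchema. A hedged sketch of the warn-policy flow, mirroring test_warn_policy (it assumes nodestream is installed with the `validation` extra, which provides genson and jsonschema; the Mock-based context stands in for the pipeline's object store exactly as the test does):

import asyncio
from unittest.mock import Mock

from nodestream.pipeline.filters import SchemaEnforcer

async def main():
    # As in test_warn_policy: no previously stored schema is available.
    context = Mock()
    context.object_store.get_pickled.return_value = None

    subject = SchemaEnforcer(inference_sample_size=0, enforcement_policy="warn")
    await subject.start(context)

    # Matching the test's assertion, filter_record returns False under
    # the warn policy, i.e. the record is not dropped.
    result = await subject.filter_record({})
    print(result)  # False

asyncio.run(main())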