make dev # Create .venv and install dependencies
make fmt # Format code — run before every commit
make lint # ruff + mypy
make test # Unit tests
make integration # Integration tests (needs Databricks workspace)
make e2e # End-to-end tests
make coverage # Coverage report
make docs-install # Install docs dependencies (yarn)
make docs-build # Build docs (pydoc-markdown + Docusaurus)
make help # All available targets
.venv/bin/pytest tests/unit/test_build_rules.py -v # single file
.venv/bin/pytest tests/unit/test_check_funcs.py::test_foo # single test
git config --global commit.gpgsign true # GPG-sign all commits (required)
git verify-commit <hash> # verify after first commit

DQX is a Databricks Labs Python library for PySpark data quality checks — batch and streaming. It provides rule-based checks, LLM-driven rule generation, ML anomaly detection, data profiling, PII detection, and a Databricks CLI installer.
Data flow: DQRule → check_func → PySpark Column expression → appended to DataFrame as _errors / _warnings columns.
src/databricks/labs/dqx/
├── engine.py # DQEngine / DQEngineCore — primary public API
├── base.py # DQEngineBase, DQEngineCoreBase (abstract)
├── rule.py # DQRule (abstract), DQRowRule, DQDatasetRule, DQForEachColRule, Criticality
├── rule_fingerprint.py # Rule fingerprinting (compute_rule_fingerprint, compute_rule_set_fingerprint_by_metadata)
├── check_funcs.py # Built-in checks: null, range, regex, referential, aggregate…
├── manager.py # DQRuleManager — build/manage rule collections
├── config.py # WorkspaceConfig, RunConfig, AnomalyParams, LLMModelConfig, ExtraParams
├── checks_storage.py # WorkspaceFileChecksStorageHandler, VolumeFileChecksStorageHandler
├── checks_serializer.py / checks_resolver.py / checks_validator.py / checks_formats.py
├── config_serializer.py # ConfigSerializer — use instead of dataclasses.asdict()
├── cli.py # Databricks Labs CLI commands (@dqx.command)
├── errors.py # For example: MissingParameterError, InvalidParameterError, UnsafeSqlQueryError — use instead of built-in exceptions
├── telemetry.py # telemetry_logger, log_telemetry, log_dataframe_telemetry
├── utils.py # shared helpers (column name resolution, SQL safety checks, etc.)
├── executor.py / io.py / table_manager.py / workflows_runner.py
├── metrics_listener.py / metrics_observer.py / reporting_columns.py
├── profiling_utils.py # Profiling utilities (stats, column metadata helpers)
├── profiler/ # Data profiling, rule generation, DLT pipeline generation
├── anomaly/ # ML row-level anomaly detection (MLflow, ensemble, drift, explainability)
├── llm/ # LLM-based rule generation via DSPy (DQLLMEngine)
├── pii/ # PII detection — optional dep: presidio + spaCy
├── geo/ # Geospatial check functions
├── datacontract/ # Data contract rule generation
├── schema/ # DQ result and info schemas
├── installer/ # Workspace install/uninstall, workflow, dashboard, warehouse installer
├── quality_checker/ # E2E workflow runner
├── contexts/ # workspace_context, workflow_context, cli_context
└── dashboards/ # Lakeview dashboard support
tests/
├── unit/ # No Spark / workspace — runs in seconds
├── integration/ # Needs live workspace + databricks-connect
├── e2e/ # Full workflow tests
├── integration_anomaly/ # MLflow + Unity Catalog
└── perf/ # Benchmarks
- Apply the DRY principle: avoid duplicating logic, validation, or configuration. Extract shared utilities only when the same logic appears in three or more places.
- Apply dependency injection: pass `WorkspaceClient`, `SparkSession`, and other external dependencies as constructor or function arguments rather than constructing them inside classes.
- Maintain separation of concerns: keep business logic decoupled from I/O, persistence, or framework details.
- Prefer composition over inheritance for new code. Where the existing mixin hierarchy (e.g. `DQRuleTypeMixin`) already provides the right extension point, use it — do not introduce additional inheritance layers on top.
- Write pure functions wherever possible: no hidden state, no side effects, deterministic outputs. All check functions in `check_funcs.py` must be pure — they receive column references and return a `Column` expression only.
- Encapsulate side effects (workspace API calls, file I/O, network) at well-defined boundaries, not inside check logic or rule construction.
- Cover all changes with tests. New check functions and rule logic → unit tests. Workspace interactions → integration tests. Bug fixes → regression tests.
- Unit tests (`tests/unit/`) run without Spark or a live workspace and must stay fast.
- Integration tests (`tests/integration/`) require a real workspace and Spark session; do not add workspace API calls to unit tests.
- Test behaviour, not implementation details: assert on outputs and observable state, not on private methods or internal data structures.
- Use dependency injection to enable testing: construct dependencies with `create_autospec` rather than patching internal module state.
- Use pytest fixtures (`conftest.py`) to share setup and teardown logic across tests. Unit-level fixtures live in `tests/unit/conftest.py`; integration-level fixtures in `tests/integration/conftest.py`. Do not duplicate fixture logic inline in individual tests.
- For workspace resource creation and cleanup in integration tests, use the pytester `factory` helper — see the Testing section for the established patterns.
- If a test requires a real `SparkSession`, it is an integration test — place it in `tests/integration/`, not `tests/unit/`. Unit tests must never start or depend on a Spark session; use `create_autospec(SparkSession)` for any unit-level Spark dependency.
- Avoid `unittest.mock.patch` and `pytest.monkeypatch` unless the target is a module-level constant or a third-party boundary with no injectable seam. Patching internal symbols couples tests to implementation details.
- Tests must be deterministic and isolated: no timing dependencies, randomness, shared mutable state, or real network calls in unit tests.
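A stdlib-only sketch of the dependency-injection and `create_autospec` guidelines above. `TableReader` and `RowCounter` are illustrative stand-ins, not DQX or Spark classes:

```python
from unittest.mock import create_autospec


class TableReader:
    """Illustrative I/O boundary, standing in for SparkSession/WorkspaceClient."""

    def read(self, name: str) -> list:
        raise NotImplementedError("real implementation talks to the workspace")


class RowCounter:
    """Business logic receives its dependency via the constructor (DI)."""

    def __init__(self, reader: TableReader) -> None:
        self._reader = reader

    def count(self, table: str) -> int:
        return len(self._reader.read(table))


# Unit-test style: no patching of internal module state, just an autospec'd dependency.
reader = create_autospec(TableReader, instance=True)
reader.read.return_value = [{"id": 1}, {"id": 2}]
assert RowCounter(reader).count("my_table") == 2

# Unlike a bare MagicMock, autospec enforces the real method signature:
try:
    reader.read()  # missing the required 'name' argument
except TypeError:
    print("signature enforced")
```

Because the dependency arrives through the constructor, the test never touches module-level state, which is exactly the seam `unittest.mock.patch` would otherwise have to create.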
- Keep responses concise and practical.
- Provide production-grade solutions, not prototype or demo code.
- Avoid speculative or unrequested changes.
- Align with the project's coding conventions and testing standards above.
Without `@register_rule`, the function is absent from `CHECK_FUNC_REGISTRY` and invisible to `apply_checks_by_metadata`. Always decorate.
| Module | Requires |
|---|---|
| `pii/` | `pip install -e ".[pii]"` + spaCy model download |
| `llm/` | `dspy` |
| `anomaly/` | `mlflow` |
Never import these unconditionally in engine.py, check_funcs.py, or any non-optional module.
DQRule is abstract. Use DQRowRule for row-level rules and DQDatasetRule for dataset-level rules. To apply the same rule to multiple columns at once, use DQForEachColRule — note that DQForEachColRule does not inherit from DQRule; it is a factory that produces DQRule instances via get_rules().
ExtraParams is @dataclass(frozen=True). Use dataclasses.replace(extra_params, field=value).
Use ConfigSerializer — it preserves nested types. dataclasses.asdict() loses them.
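A stdlib-only demonstration of both notes above. `Config` and `Nested` are illustrative dataclasses, not the real `ExtraParams` or `RunConfig`:

```python
from dataclasses import asdict, dataclass, field, replace


@dataclass(frozen=True)
class Nested:
    threshold: float = 0.5


@dataclass(frozen=True)
class Config:  # illustrative — not the real ExtraParams
    name: str = "default"
    nested: Nested = field(default_factory=Nested)


cfg = Config()

# Frozen dataclasses reject attribute assignment; use replace() to derive a copy:
updated = replace(cfg, name="custom")
assert updated.name == "custom" and cfg.name == "default"

# asdict() recursively converts nested dataclasses into plain dicts,
# discarding the type information a round-trip serializer needs:
assert asdict(cfg)["nested"] == {"threshold": 0.5}
assert not isinstance(asdict(cfg)["nested"], Nested)
```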
Fix the code instead of adding # pylint: disable, # type: ignore, # noqa, or per-file ignores. Use project-wide exceptions in pyproject.toml only when there is no viable fix (e.g., third-party API compatibility).
Mandatory: Agents must comply with the following security requirements. These are non-negotiable; any added or modified code must not introduce security gaps. Violations must be rejected or remediated before merge.
- SQL injection: User-provided or templated SQL must be validated with `is_sql_query_safe()` from `utils.py` before execution. Raise `UnsafeSqlQueryError` when the query is unsafe. Do not embed user input directly into SQL strings; use parameterized queries or validated template substitution (e.g. `re.escape` for keys).
- Sensitive content: Do not include in the repository, history, issues, PRs, workflows, or CI logs: PII, customer identifiers, internal URLs, internal system names, architecture details, secrets, keys, or tokens. Use structured logging with redaction. Before release, commit history must be reviewed and remediated or rewritten if needed.
- Untrusted input: When parsing JSON, YAML, or other formats from external sources, handle parse failures gracefully and avoid exposing raw error details that could leak internal structure.
- Path traversal: Validate file and workspace paths; avoid constructing paths from unsanitized user input.
- Regex (ReDoS): Be aware of Regular Expression Denial of Service — complex patterns with nested quantifiers, alternation, or backtracking (e.g. `(a+)+`, `(a|a)*`) can cause catastrophic backtracking on adversarial input and hang the process. Validate or limit regex inputs from users; prefer simple patterns or use libraries with ReDoS-safe engines when handling untrusted strings.
- Dependencies: Do not add dependencies with known vulnerabilities. Prefer well-maintained, widely used packages. Ensure dependencies and their licenses are compliant for intended distribution (e.g., external or OSS).
- Documentation and metadata: Ensure metadata and documentation are safe for public distribution. Documentation must clearly describe intended use, supported environments, and any restrictions.
- Open source hygiene: Do not remove LICENSE, NOTICE, or attribution files. Maintainer contact should be identifiable.
- Security controls: Do not disable or bypass secret scanning, software composition analysis, or vulnerability scanning. The release process should use CI-built tagged artifacts.
- GitHub workflows: Avoid `pull_request_target` unless strictly necessary. The danger arises when this trigger (which bypasses GitHub's approval gate and runs in the base-branch context with secrets) is combined with checkout of untrusted fork code (e.g. `refs/pull/.../merge`) — that PR code can then exfiltrate secrets. Prefer `pull_request` for CI. If `pull_request_target` is required (e.g., for label-based workflows), never checkout or run PR-provided code when secrets are available; use `workflow_run` or `workflow_dispatch` for sensitive operations instead.
- Pickle / model deserialization: `cloudpickle` and `mlflow.sklearn.load_model()` execute arbitrary code on deserialization. Only load models from trusted, authenticated MLflow registries within the controlled Databricks workspace. Never pass user-supplied model URIs to load functions without validating they resolve to a known, controlled registry path.
- LLM prompt injection: User-supplied inputs to the `llm/` module (business descriptions, column names, sample data) are embedded in LLM prompts and can be crafted to manipulate model behaviour. Treat all such inputs as untrusted. LLM-generated output — function names, SQL fragments, rule arguments — must be validated before execution: function names must resolve through `CHECK_FUNC_REGISTRY`; any generated SQL must pass `is_sql_query_safe()`.
- Log injection: Never embed user-supplied values (column names, rule names, file paths, contract identifiers) directly in log messages via f-strings without sanitization. Newlines and control characters in these values can forge log entries or corrupt log pipelines. Strip or escape newlines before logging untrusted strings; prefer structured logging fields over interpolated messages (CWE-117).
- SSRF (Server-Side Request Forgery): User-controlled URL fields such as api_base in LLMModelConfig are passed to outbound HTTP clients. Validate that any user-supplied URL targets an expected, allowlisted host before use. Do not allow arbitrary internal network endpoints to be reached via user config (CWE-918).
- Secrets management: Design credential-accepting fields (API keys, tokens, passwords) to support secret scope references (e.g., `secret_scope/secret_key`) rather than raw strings. Document in docstrings that plaintext values are for local development only. Never log credential fields, even partially — redact them at the point of construction, not at the call site.
- LLM denial of service and information disclosure: Malicious or pathological prompts can trigger unbounded or very expensive inference calls (OWASP LLM04) — enforce token/budget limits on all LLM calls. LLM responses may inadvertently echo back sensitive inputs such as schema metadata or data samples (OWASP LLM06) — treat LLM output as untrusted data; do not relay raw LLM responses to end users or logs without review.
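The log-injection and SSRF mitigations above can be sketched with the standard library. `sanitize_for_log`, `validate_api_base`, and the allowlist contents are illustrative, not DQX APIs:

```python
from urllib.parse import urlparse

# Example allowlist — real hosts would come from validated configuration.
ALLOWED_API_HOSTS = {"api.openai.com"}


def sanitize_for_log(value: str) -> str:
    """Escape newlines so untrusted values cannot forge log entries (CWE-117)."""
    return value.replace("\r", "\\r").replace("\n", "\\n")


def validate_api_base(url: str) -> str:
    """Reject user-supplied URLs whose host is not allowlisted (CWE-918)."""
    host = urlparse(url).hostname
    if host not in ALLOWED_API_HOSTS:
        raise ValueError(f"api_base host not allowed: {sanitize_for_log(str(host))}")
    return url


assert sanitize_for_log("id\nFAKE LOG ENTRY") == "id\\nFAKE LOG ENTRY"
assert validate_api_base("https://api.openai.com/v1") == "https://api.openai.com/v1"
```

Note that the error message itself sanitizes the offending host before it can reach a log line.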
# ✅ correct
import pyspark.sql.functions as F
from pyspark.sql import Column
from databricks.labs.dqx.check_funcs import make_condition, get_normalized_column_and_expr
from databricks.labs.dqx.rule import register_rule
@register_rule("row")
def is_not_null(column: str | Column) -> Column:
    col_str_norm, col_expr_str, col_expr = get_normalized_column_and_expr(column)
    return make_condition(
        col_expr.isNull(),
        f"Column '{col_expr_str}' value is null",
        f"{col_str_norm}_is_null",
    )
# ❌ wrong — missing decorator, missing return type, returns DataFrame, bypasses make_condition
def is_not_null(column):
    return df.filter(...)

Rules:
- Decorate with `@register_rule("row")` (row-level) or `@register_rule("dataset")` (group of rows)
- Return a PySpark `Column` — never a `DataFrame`
- Use `make_condition(condition, message, name)` from `check_funcs.py` to build the return value.
Rules can be defined programmatically (DQX classes) or declaratively (dict metadata/YAML/JSON). Both are equivalent — choose based on context.
Programmatic API — use when building rules in code using DQX classes:
from databricks.labs.dqx.rule import DQRowRule, DQForEachColRule, Criticality
from databricks.labs.dqx import check_funcs as checks
# ✅ use concrete subclasses
DQRowRule(check_func=checks.is_not_null, column="id", criticality=Criticality.ERROR, name="id_not_null")
DQForEachColRule(check_func=checks.is_not_null, columns=["id", "name", "date"])
# ❌ DQRule is abstract — instantiation raises TypeError
DQRule(check_func=checks.is_not_null, ...)

Metadata API — build rules declaratively using list[dict] or YAML/JSON:
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: id
- check:
    function: is_not_null
    for_each_column:
      - id
      - name
      - date

Every parameter and return value must be annotated. Enforced by mypy (make lint).
# ✅
def resolve(column: str | Column, spark: SparkSession) -> Column: ...

# ❌
def resolve(column, spark): ...

| Rule | ✅ Do | ❌ Don't |
|---|---|---|
| Generics | `list[str]`, `dict[str, int]` | `List[str]`, `Dict[str, int]` |
| Optional | `str \| None` | `Optional[str]` |
| Union | `str \| int` | `Union[str, int]` |
| Callables | `collections.abc.Callable` | `typing.Callable` |
| Unknown types | `object` or `Protocol` | `Any` |
Avoid Any. When it is truly unavoidable (e.g., untyped external ML library), add:

model: Any  # type: ignore[assignment] — mlflow has no stubs

Any in anomaly/ is a known legacy exception. New code outside that module must not introduce it.
Use Google Style Python Docstrings for public functions, classes, and modules. See Writing Docstrings for full guidance.
- No backticks around object names — use italics instead (e.g., *arg1*, *column*). Backticks cause rendering issues in API docs.
- Double curly braces — mask with backslashes, e.g. `\{\{`.
- For code examples, use triple backticks.
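A Google-style docstring following the conventions above (italics for object names, no backticks). The function is a simplified illustration, not a real DQX check — real check functions return a PySpark Column:

```python
def is_in_range(column: str, min_value: float, max_value: float) -> str:
    """Build a condition name for a closed-range check on *column*.

    Args:
        column: Name of the column to validate.
        min_value: Inclusive lower bound.
        max_value: Inclusive upper bound.

    Returns:
        A condition name of the form *<column>_is_in_range*. (Simplified for
        illustration; real check functions return a Column expression.)
    """
    return f"{column}_is_in_range"
```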
databricks-labs-blueprint provides CLI, installation, parallelism, and rate-limiting. Use it — don't reinvent.
| Need | Import | Usage |
|---|---|---|
| Module logger | `import logging` | `logger = logging.getLogger(__name__)`; use f-strings for log messages (e.g. `logger.warning(f"value={x}")`), not %-style args |
| CLI entrypoint logger | `blueprint.entrypoint.get_logger` | `logger = get_logger(__file__)` |
| Package logger setup | `blueprint.logger.install_logger` | call once in `__init__.py` |
| Parallel tasks | `blueprint.parallel.Threads` | `Threads.strict("label", tasks)` |
| Aggregate errors | `blueprint.parallel.ManyError` | catch at top level |
| Rate limiting | `blueprint.limiter.rate_limited` | `@rate_limited(max_requests=100)` |
| Config persistence | `blueprint.installation.Installation` | `installation.load(WorkspaceConfig)` |
| Test config | `blueprint.installation.MockInstallation` | in-memory, no workspace needed |
| Test prompts | `blueprint.tui.MockPrompts` | regex → response map |
| Product versioning | `blueprint.wheels.ProductInfo` | `ProductInfo.for_testing(WorkspaceConfig)` |
Parallel execution:
from databricks.labs.blueprint.parallel import ManyError, Threads
Threads.strict("label", tasks) # raises ManyError if any fail
results, errors = Threads.gather("label", tasks) # returns (successes, failures)
# ❌ don't use concurrent.futures directly

Installation (config persistence):
from databricks.labs.blueprint.installation import Installation, MockInstallation
# production
installation = Installation(ws, "dqx")
config = installation.load(WorkspaceConfig)
installation.save(config)
# unit tests — never use real Installation in unit tests
mock = MockInstallation({"version": "1.0.0"})

MockPrompts (test interactive CLI):
from databricks.labs.blueprint.tui import MockPrompts
prompts = MockPrompts({r"Do you want to uninstall.*": "yes", r".*": ""})

# WorkspaceClient auto-injected; keyword-only args become CLI flags
@dqx.command
def my_command(w: WorkspaceClient, *, install_folder: str = "", ctx: WorkspaceContext | None = None):
    ctx = ctx or WorkspaceContext(w, install_folder=install_folder or None)
    ...

All configs use @dataclass. Serialize via ConfigSerializer. Frozen configs use dataclasses.replace().
Wrap new public engine methods with @telemetry_logger. Call log_telemetry() / log_dataframe_telemetry() from telemetry.py.
| Layer | Location | Requirements | When to write |
|---|---|---|---|
| Unit | `tests/unit/` | None — no Spark, no workspace | Always: every new function/class |
| Integration | `tests/integration/` | Live workspace + `databricks-connect` | CLI commands, storage handlers |
| E2E | `tests/e2e/` | Full workspace | Complete workflows |
| Anomaly | `tests/integration_anomaly/` | Unity Catalog + MLflow | `anomaly/` module changes |
| Perf | `tests/perf/` | Workspace | Performance-sensitive paths |
Minimum per new check function: one positive test + one negative test.
Minimum per new CLI command: one integration test with MockInstallation + MockPrompts.
Use factory from databricks.labs.pytester.fixtures.baseline for every integration fixture that creates a Databricks resource. It guarantees cleanup on test failure.
from databricks.labs.pytester.fixtures.baseline import factory
@pytest.fixture
def make_check_file(ws, make_directory, checks_yaml_content):
    def create(**kwargs) -> str:
        path = str(make_directory().absolute()) + "/checks.yml"
        ws.workspace.upload(path=path, content=checks_yaml_content.encode(), overwrite=True)
        return path

    def delete(path: str) -> None:
        ws.workspace.delete(path)

    yield from factory("file", create, delete)

# ❌ don't delete manually in test body — factory handles it even on failure

Auto-provided fixtures (from pytester, no need to define): ws (WorkspaceClient), spark (SparkSession), make_random, make_directory, env_or_skip.
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRowRule, Criticality
from databricks.labs.dqx import check_funcs as checks
engine = DQEngine(WorkspaceClient())
rules = [
    DQRowRule(check_func=checks.is_not_null, column="id", criticality=Criticality.ERROR),
    DQRowRule(check_func=checks.is_not_null_and_not_empty, column="name", criticality=Criticality.WARN),
]

# Returns DataFrame with _errors and _warnings columns appended
checked_df = engine.apply_checks(df, rules)

# Or split into valid / invalid
valid_df, invalid_df = engine.apply_checks_and_split(df, rules)

from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
engine = DQEngine(WorkspaceClient())
import yaml

checks = yaml.safe_load("""
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: id
- criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: name
""")
checked_df = engine.apply_checks_by_metadata(df, checks)
# Or split into valid / invalid (quarantine)
valid_df, invalid_df = engine.apply_checks_by_metadata_and_split(df, checks)

# Load checks from a local YAML/JSON file
checks_list = engine.load_checks_from_local_file("checks.yml")
checked_df = engine.apply_checks_by_metadata(df, checks_list)

- Add to `src/databricks/labs/dqx/check_funcs.py` with `@register_rule("row")` or `@register_rule("dataset")`
- Return a PySpark `Column`
- Add to `__all__` if public
- Add unit tests in `tests/unit/test_check_funcs_<category>.py`
- Add a `@dqx.command` function to `cli.py`
- Inject `WorkspaceClient`; use `WorkspaceContext` for workspace operations
- Add an integration test in `tests/integration/test_cli.py` using `MockInstallation` + `MockPrompts`
- README.md — Project description
- docs/ — Full site including contribution workflow (Docusaurus): https://databrickslabs.github.io/dqx/
- CHANGELOG.md — Release history
Stack: Python 3.10+ · PySpark · Databricks SDK · databricks-labs-blueprint · databricks-labs-pytester · DSPy · Presidio · MLflow