diff --git a/bigquery_etl/cli/dryrun.py b/bigquery_etl/cli/dryrun.py
index c4f03eec4e4..cc1c34082b5 100644
--- a/bigquery_etl/cli/dryrun.py
+++ b/bigquery_etl/cli/dryrun.py
@@ -154,6 +154,7 @@ def _sql_file_valid(
         respect_skip=respect_skip,
         id_token=id_token,
         billing_project=billing_project,
+        strip_dml=validate_schemas,
     )
     if validate_schemas:
         try:
diff --git a/bigquery_etl/dryrun.py b/bigquery_etl/dryrun.py
index 4448f4ec365..f3abeb0bfaa 100644
--- a/bigquery_etl/dryrun.py
+++ b/bigquery_etl/dryrun.py
@@ -25,13 +25,14 @@

 import click
 import google.auth
+import sqlglot
 from google.auth.transport.requests import Request as GoogleAuthRequest
 from google.cloud import bigquery
 from google.oauth2.id_token import fetch_id_token

 from .config import ConfigLoader
 from .metadata.parse_metadata import Metadata
-from .util.common import render
+from .util.common import random_str, render

 try:
     from functools import cached_property  # type: ignore
@@ -79,6 +80,84 @@ def get_id_token(dry_run_url=ConfigLoader.get("dry_run", "function"), credential
     return id_token


+def wrap_in_view_for_dryrun(sql: str) -> str:
+    """
+    Wrap SELECT queries in a CREATE VIEW statement for faster dry runs.
+
+    CREATE VIEW statements don't scan partition metadata, which makes dry runs faster.
+    """
+    try:
+        statements = [
+            stmt for stmt in sqlglot.parse(sql, dialect="bigquery") if stmt is not None
+        ]
+
+        # Only wrap if the last statement is a SELECT statement.
+        if not statements or not isinstance(statements[-1], sqlglot.exp.Select):
+            return sql
+
+        # Replace CREATE TEMP FUNCTION with CREATE FUNCTION using fully
+        # qualified names; CREATE VIEW doesn't support temp functions.
+        test_project = ConfigLoader.get(
+            "default", "test_project", fallback="bigquery-etl-integration-test"
+        )
+
+        def replace_temp_function(match):
+            func_name = match.group(1)
+            # If the function name is already qualified, keep it; otherwise
+            # add a project.dataset prefix.
+            if "." not in func_name and "`" not in func_name:
+                return f"CREATE FUNCTION `{test_project}.tmp.{func_name}`"
+            else:
+                return f"CREATE FUNCTION {func_name}"
+
+        sql = re.sub(
+            r"\bCREATE\s+TEMP(?:ORARY)?\s+FUNCTION\s+([^\s(]+)",
+            replace_temp_function,
+            sql,
+            flags=re.IGNORECASE,
+        )
+
+        view_name = f"_dryrun_view_{random_str(8)}"
+
+        # Single statement - just wrap it.
+        if len(statements) == 1:
+            query_sql = sql.strip().rstrip(";")
+            return f"CREATE VIEW `{test_project}.tmp.{view_name}` AS\n{query_sql}"
+
+        # Multiple statements: use the sqlglot tokenizer to find statement
+        # boundaries. This handles semicolons in strings and comments.
+        tokens = list(sqlglot.tokens.Tokenizer(dialect="bigquery").tokenize(sql))
+
+        # Find semicolon tokens that separate statements (not in strings/comments).
+        semicolon_positions = [
+            token.end
+            for token in tokens
+            if token.token_type == sqlglot.tokens.TokenType.SEMICOLON
+        ]
+
+        # We need (len(statements) - 1) semicolons to separate the statements.
+        if len(semicolon_positions) >= len(statements) - 1:
+            # The (n-1)th semicolon separates the prefix from the last statement.
+            split_pos = semicolon_positions[len(statements) - 2]
+            prefix_sql = sql[:split_pos].strip()
+            query_sql = sql[split_pos:].strip().lstrip(";").strip()
+        else:
+            # Fallback: regenerate the prefix statements and the final query.
+            prefix_sql = ";\n".join(
+                stmt.sql(dialect="bigquery") for stmt in statements[:-1]
+            )
+            query_sql = statements[-1].sql(dialect="bigquery")
+
+        # Wrap the final statement in a view.
+        wrapped_query = f"CREATE VIEW `{test_project}.tmp.{view_name}` AS\n{query_sql}"
+
+        return f"{prefix_sql};\n\n{wrapped_query}"
+
+    except Exception as e:
+        print(f"Warning: Failed to wrap SQL in view: {e}")
+        return sql
+
+
 class Errors(Enum):
     """DryRun errors that require special handling."""

@@ -254,6 +333,28 @@ def dry_run_result(self):
                 )
             )

+        # Wrap the query in a CREATE VIEW for faster dry runs. Skip wrapping
+        # when strip_dml=True, since that mode is used for special analysis.
+        if not self.strip_dml:
+            # If the query has parameters, replace them with literal values,
+            # since CREATE VIEW cannot use parameterized queries.
+            sql_for_wrapping = sql
+            if query_parameters:
+                for param in query_parameters:
+                    param_name = f"@{param.name}"
+                    # Convert the parameter value to a SQL literal.
+                    if param.type_ == "DATE":
+                        param_value = f"DATE '{param.value}'"
+                    elif param.type_ in ("STRING", "DATETIME", "TIMESTAMP"):
+                        param_value = f"'{param.value}'"
+                    elif param.type_ == "BOOL":
+                        param_value = str(param.value).upper()
+                    else:
+                        param_value = str(param.value)
+                    sql_for_wrapping = sql_for_wrapping.replace(param_name, param_value)
+
+            sql = wrap_in_view_for_dryrun(sql_for_wrapping)
+
         project = basename(dirname(dirname(dirname(self.sqlfile))))
         dataset = basename(dirname(dirname(self.sqlfile)))
         try:
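Note for reviewers: the effect of `wrap_in_view_for_dryrun` is easiest to see on a concrete input. A minimal sketch of the single-statement path, assuming the configured `test_project` falls back to `bigquery-etl-integration-test` (the view-name suffix is random at runtime):

    from bigquery_etl.dryrun import wrap_in_view_for_dryrun

    print(wrap_in_view_for_dryrun("SELECT 1 AS field"))
    # CREATE VIEW `bigquery-etl-integration-test.tmp._dryrun_view_<random>` AS
    # SELECT 1 AS field

    # Scripts whose final statement is not a SELECT pass through unchanged:
    print(wrap_in_view_for_dryrun("DELETE FROM tbl WHERE TRUE"))
    # DELETE FROM tbl WHERE TRUE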
@@ -387,6 +488,7 @@ def get_referenced_tables(self):
                     filtered_content,
                     client=self.client,
                     id_token=self.id_token,
+                    strip_dml=self.strip_dml,
                 ).get_error()
                 == Errors.DATE_FILTER_NEEDED_AND_SYNTAX
             ):
@@ -408,6 +510,7 @@ def get_referenced_tables(self):
                     content=filtered_content,
                     client=self.client,
                     id_token=self.id_token,
+                    strip_dml=self.strip_dml,
                 ).get_error()
                 == Errors.DATE_FILTER_NEEDED_AND_SYNTAX
             ):
@@ -420,6 +523,7 @@ def get_referenced_tables(self):
                 content=filtered_content,
                 client=self.client,
                 id_token=self.id_token,
+                strip_dml=self.strip_dml,
             )
             if (
                 stripped_dml_result.get_error() is None
@@ -582,8 +686,9 @@ def validate_schema(self):
             client=self.client,
             id_token=self.id_token,
             partitioned_by=partitioned_by,
+            strip_dml=self.strip_dml,
         )

         # This check relies on the new schema being deployed to prod
         if not query_schema.compatible(table_schema):
             click.echo(
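Note: `strip_dml` is now threaded into every nested `DryRun` that `get_referenced_tables` and `validate_schema` construct, so those code paths keep operating on the unwrapped query. A hypothetical usage sketch (path and table name are illustrative only):

    from bigquery_etl.dryrun import DryRun

    # With strip_dml=True the DML is reduced to a plain SELECT for the dry
    # run and, with this patch, the query is also never wrapped in a view.
    dryrun = DryRun(
        "sql/moz-fx-data-shared-prod/telemetry_derived/foo/query.sql",
        content="DELETE FROM foo WHERE submission_date = '2020-01-01'",
        strip_dml=True,
    )
    tables = dryrun.get_referenced_tables()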
diff --git a/bigquery_etl/schema/stable_table_schema.py b/bigquery_etl/schema/stable_table_schema.py
index f2abbc6442b..0db08a2fb64 100644
--- a/bigquery_etl/schema/stable_table_schema.py
+++ b/bigquery_etl/schema/stable_table_schema.py
@@ -59,7 +59,8 @@ def prod_schemas_uri():
     with the most recent production schemas deploy.
     """
     dryrun = DryRun(
-        "moz-fx-data-shared-prod/telemetry_derived/foo/query.sql", content="SELECT 1"
+        "moz-fx-data-shared-prod/telemetry_derived/foo/query.sql",
+        content="SELECT 1 AS field",
     )
     build_id = dryrun.get_dataset_labels()["schemas_build_id"]
     commit_hash = build_id.split("_")[-1]
diff --git a/bigquery_etl/view/__init__.py b/bigquery_etl/view/__init__.py
index f0e8e356a11..f1efebc3212 100644
--- a/bigquery_etl/view/__init__.py
+++ b/bigquery_etl/view/__init__.py
@@ -221,7 +221,10 @@ def dryrun_schema(self):
                 """
             )
             return Schema.from_query_file(
-                Path(self.path), content=schema_query, id_token=self.id_token
+                Path(self.path),
+                content=schema_query,
+                id_token=self.id_token,
+                strip_dml=True,
             )
         except Exception as e:
             print(f"Error dry-running view {self.view_identifier} to get schema: {e}")
diff --git a/sql/moz-fx-data-shared-prod/analysis/bqetl_default_task_v1/query.sql b/sql/moz-fx-data-shared-prod/analysis/bqetl_default_task_v1/query.sql
index da7610433d4..e69e649106c 100644
--- a/sql/moz-fx-data-shared-prod/analysis/bqetl_default_task_v1/query.sql
+++ b/sql/moz-fx-data-shared-prod/analysis/bqetl_default_task_v1/query.sql
@@ -2,6 +2,6 @@
 SELECT
   *
 FROM
-  (SELECT 1)
+  (SELECT 1 AS field)
 WHERE
   FALSE
diff --git a/sql_generators/README.md b/sql_generators/README.md
index 3b2ceef8a28..751c9bf7a80 100644
--- a/sql_generators/README.md
+++ b/sql_generators/README.md
@@ -9,3 +9,4 @@ The directories in `sql_generators/` represent the generated queries and will co

 Each `__init__.py` file needs to implement a `generate()` method that is configured as a [click command](https://click.palletsprojects.com/en/8.0.x/). The `bqetl` CLI will automatically add these commands to the `./bqetl query generate` command group.
 After changes to a schema or adding new tables, the schema is automatically derived from the query and deployed the next day in DAG [bqetl_artifact_deployment](https://workflow.telemetry.mozilla.org/dags/bqetl_artifact_deployment/grid). Alternatively, it can be manually generated and deployed using `./bqetl generate all` and `./bqetl query schema deploy`.
+
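Note: the `SELECT 1` fixtures gain an `AS field` alias because the dry run now executes them as `CREATE VIEW ... AS SELECT ...`, and BigQuery appears to reject view definitions whose output columns have no usable name. A hypothetical before/after check (the path is the fixture already used above):

    from bigquery_etl.dryrun import DryRun

    path = "moz-fx-data-shared-prod/telemetry_derived/foo/query.sql"
    DryRun(path, content="SELECT 1").is_valid()           # fails once wrapped
    DryRun(path, content="SELECT 1 AS field").is_valid()  # passes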
diff --git a/sql_generators/stable_views/__init__.py b/sql_generators/stable_views/__init__.py
index 05cd34a0117..f38c26a658b 100644
--- a/sql_generators/stable_views/__init__.py
+++ b/sql_generators/stable_views/__init__.py
@@ -389,7 +389,7 @@ def write_view_if_not_exists(
     content = VIEW_CREATE_REGEX.sub("", target_file.read_text())
     content += " WHERE DATE(submission_timestamp) = '2020-01-01'"
     view_schema = Schema.from_query_file(
-        target_file, content=content, sql_dir=sql_dir, id_token=id_token
+        target_file, content=content, sql_dir=sql_dir, id_token=id_token, strip_dml=True
     )

     stable_table_schema = Schema.from_json({"fields": schema.schema})
diff --git a/tests/test_dryrun.py b/tests/test_dryrun.py
index 21a9f7a0848..1cc978d01d0 100644
--- a/tests/test_dryrun.py
+++ b/tests/test_dryrun.py
@@ -15,9 +15,9 @@ def tmp_query_path(tmp_path):
 class TestDryRun:
     def test_dry_run_sql_file(self, tmp_query_path):
         query_file = tmp_query_path / "query.sql"
-        query_file.write_text("SELECT 123")
+        query_file.write_text("SELECT 123 AS field")

         dryrun = DryRun(str(query_file))
         response = dryrun.dry_run_result
         assert response["valid"]
@@ -31,7 +31,7 @@ def test_dry_run_invalid_sql_file(self, tmp_query_path):

     def test_sql_file_valid(self, tmp_query_path):
         query_file = tmp_query_path / "query.sql"
-        query_file.write_text("SELECT 123")
+        query_file.write_text("SELECT 123 AS field")

         dryrun = DryRun(str(query_file))
         assert dryrun.is_valid()
@@ -61,7 +61,7 @@ def test_sql_file_invalid(self, tmp_query_path):

     def test_get_referenced_tables_empty(self, tmp_query_path):
         query_file = tmp_query_path / "query.sql"
-        query_file.write_text("SELECT 123")
+        query_file.write_text("SELECT 123 AS field")

         dryrun = DryRun(str(query_file))
         assert dryrun.get_referenced_tables() == []
@@ -70,7 +70,7 @@ def test_get_sql(self, tmp_path):
         os.makedirs(tmp_path / "telmetry_derived")

         query_file = tmp_path / "telmetry_derived" / "query.sql"
-        sql_content = "SELECT 123 "
+        sql_content = "SELECT 123 AS field"
         query_file.write_text(sql_content)

         assert DryRun(sqlfile=str(query_file)).get_sql() == sql_content
@@ -83,7 +83,7 @@ def test_get_referenced_tables(self, tmp_query_path):
             "SELECT * FROM `moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6` "
             "WHERE submission_date = '2020-01-01'"
         )
-        query_dryrun = DryRun(str(query_file)).get_referenced_tables()
+        query_dryrun = DryRun(str(query_file), strip_dml=True).get_referenced_tables()

         assert len(query_dryrun) == 1
         assert query_dryrun[0]["datasetId"] == "telemetry_derived"
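Note: the parameter inlining in `dry_run_result` can be sanity-checked in isolation. A minimal sketch of the DATE branch, using the same `google.cloud.bigquery` parameter type the dry run receives (query text is illustrative):

    from google.cloud import bigquery

    param = bigquery.ScalarQueryParameter("submission_date", "DATE", "2020-01-01")

    sql = "SELECT * FROM foo WHERE submission_date = @submission_date"
    # CREATE VIEW cannot be parameterized, so the parameter is replaced
    # with a typed literal before the query is wrapped.
    sql = sql.replace(f"@{param.name}", f"DATE '{param.value}'")
    assert sql == "SELECT * FROM foo WHERE submission_date = DATE '2020-01-01'"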