Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions js/packages/ui/src/api/cll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface CllInput {
no_cll?: boolean;
no_upstream?: boolean;
no_downstream?: boolean;
full_map?: boolean;
}

export interface ImpactRadiusParams {
Expand Down
88 changes: 46 additions & 42 deletions js/packages/ui/src/components/lineage/LineageViewOss.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import {
trackCopyToClipboard,
trackMultiNodesAction,
} from "../../lib/api/track";
import { sliceCllMap } from "./sliceCllMap";
import "../../styles";
import Box from "@mui/material/Box";
import Divider from "@mui/material/Divider";
Expand Down Expand Up @@ -175,6 +176,10 @@ export function PrivateLineageView(
const actionGetCll = useMutation({
mutationFn: (input: CllInput) => getCll(input, apiClient),
});
// Cached full CLL map fetched once on Impact click with full_map=true.
// All subsequent column/node navigation slices this client-side via sliceCllMap().
// Cleared when lineageGraph changes (re-fetch needed) or CLL is exited.
const fullCllMapRef = useRef<ColumnLineageData | undefined>(undefined);
// Guard against useLayoutEffect re-entry after cache patching.
// When setQueryData patches lineage.diff, queryServerInfo.data changes,
// lineageGraph recomputes via useMemo, and the effect re-fires. This ref
Expand All @@ -186,6 +191,36 @@ export function PrivateLineageView(
}>({ pending: false });
const [nodeColumnSetMap, setNodeColumSetMap] = useState<NodeColumnSetMap>({});

/**
 * Requests the complete CLL map (`full_map: true`), stores it in
 * `fullCllMapRef`, and returns the slice of it that matches `cllApiInput`.
 *
 * When change analysis is requested, the lineage query cache is also patched
 * with the change data from the full map. `cllCachePatchRef` is marked pending
 * (carrying the sliced result) before the cache write; the visible consumer of
 * that ref reads `cllData` and resets `pending` to false.
 *
 * @param cllApiInput - CLL parameters used both for the request's
 *   `change_analysis` flag and for slicing the fetched map.
 * @returns The sliced CLL data for `cllApiInput`.
 */
async function fetchAndCacheFullMap(
  cllApiInput: CllInput,
): Promise<ColumnLineageData> {
  const completeMap = await actionGetCll.mutateAsync({
    change_analysis: cllApiInput.change_analysis,
    full_map: true,
  });
  fullCllMapRef.current = completeMap;

  const sliced = sliceCllMap(completeMap, cllApiInput);

  // Nothing to patch unless change analysis was requested and data came back.
  if (!cllApiInput.change_analysis || !completeMap) {
    return sliced;
  }

  // Set the guard before writing to the cache so the re-entrant effect can
  // pick up the sliced result.
  cllCachePatchRef.current = { pending: true, cllData: sliced };

  const withPatchedDiff = (previous: ServerInfoResult | undefined) => {
    if (!previous) return previous;
    const patchedDiff = patchLineageDiffFromCll(
      previous.lineage.diff,
      completeMap,
    );
    return {
      ...previous,
      lineage: { ...previous.lineage, diff: patchedDiff },
    };
  };
  queryClient.setQueryData(cacheKeys.lineage(), withPatchedDiff);

  return sliced;
}

const findNodeByName = useCallback(
(name: string) => {
return nodes.filter(isLineageGraphNode).find((n) => n.data.name === name);
Expand Down Expand Up @@ -415,35 +450,16 @@ export function PrivateLineageView(
cll = cllCachePatchRef.current.cllData;
cllCachePatchRef.current = { pending: false };
} else {
// lineageGraph changed externally — invalidate cached full CLL map
fullCllMapRef.current = undefined;
const cllApiInput: CllInput = {
...viewOptions.column_level_lineage,
change_analysis:
viewOptions.column_level_lineage.change_analysis ??
changeAnalysisModeRef.current,
};
try {
cll = await actionGetCll.mutateAsync(cllApiInput);
// Patch the lineage diff cache with change data from CLL
const cllResult = cll;
if (cllApiInput.change_analysis && cllResult) {
cllCachePatchRef.current = { pending: true, cllData: cllResult };
queryClient.setQueryData(
cacheKeys.lineage(),
(old: ServerInfoResult | undefined) => {
if (!old) return old;
return {
...old,
lineage: {
...old.lineage,
diff: patchLineageDiffFromCll(
old.lineage.diff,
cllResult,
),
},
};
},
);
}
cll = await fetchAndCacheFullMap(cllApiInput);
} catch (e) {
if (e instanceof AxiosError) {
const e2 = e as AxiosError<{ detail?: string }>;
Expand Down Expand Up @@ -694,26 +710,12 @@ export function PrivateLineageView(
changeAnalysisMode,
};
try {
cll = await actionGetCll.mutateAsync(cllApiInput);
// Patch the lineage diff cache with change data from CLL.
// Also set the guard ref so the useLayoutEffect that re-fires
// (due to lineageGraph recomputing) skips the redundant CLL call.
const cllResult = cll;
if (cllApiInput.change_analysis && cllResult) {
cllCachePatchRef.current = { pending: true, cllData: cllResult };
queryClient.setQueryData(
cacheKeys.lineage(),
(old: ServerInfoResult | undefined) => {
if (!old) return old;
return {
...old,
lineage: {
...old.lineage,
diff: patchLineageDiffFromCll(old.lineage.diff, cllResult),
},
};
},
);
// If we have the full CLL map cached, slice client-side (instant).
// Otherwise fetch with full_map=true and cache it (one-time cost).
if (fullCllMapRef.current) {
cll = sliceCllMap(fullCllMapRef.current, cllApiInput);
} else {
cll = await fetchAndCacheFullMap(cllApiInput);
}
} catch (e) {
if (e instanceof AxiosError) {
Expand All @@ -731,6 +733,8 @@ export function PrivateLineageView(
// Clear change analysis mode when CLL is cleared by any path
// (reselect, selectParentNodes, selectChildNodes, etc.)
setChangeAnalysisMode(false);
// Clear cached full map when CLL is exited
fullCllMapRef.current = undefined;
}

// Capture positions if preservePositions is true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
Capture CLL fixtures from a running recce server that has actual dbt changes.

Usage:
python capture_diff_fixtures.py [--port 8000] [--output-dir ./diff]

Prerequisites:
- recce server running with jaffle_shop_duckdb (with base vs current diff)
- Changes should include: added column, modified column def, WHERE clause added
"""

import argparse
import json
import os
import sys
from urllib.request import urlopen, Request


def fetch_cll(base_url: str, params: dict, timeout: "float | None" = None) -> dict:
    """Fetch CLL data from the server via POST.

    Args:
        base_url: Server root URL. A trailing ``/`` is tolerated and removed
            before ``/api/cll`` is appended.
        params: JSON-serialisable request body (e.g. ``node_id``, ``column``,
            ``change_analysis``).
        timeout: Optional timeout in seconds passed to ``urlopen``. When left
            as ``None`` no timeout argument is passed, so urllib's default
            behaviour is unchanged.

    Returns:
        The response body decoded from JSON.

    Raises:
        urllib.error.URLError: If the request cannot be completed (includes
            ``HTTPError`` for non-success responses).
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    # Avoid "//api/cll" when the caller passes a base URL ending in "/".
    url = f"{base_url.rstrip('/')}/api/cll"
    body = json.dumps(params).encode("utf-8")
    req = Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",  # explicit; previously implied by the presence of `data`
    )
    # Only forward the timeout when one was given so the default path is untouched.
    open_kwargs = {} if timeout is None else {"timeout": timeout}
    with urlopen(req, **open_kwargs) as resp:
        return json.loads(resp.read().decode("utf-8"))


def save_fixture(output_dir: str, name: str, data: dict) -> None:
    """Write ``data`` as indented JSON to ``<output_dir>/<name>.json``.

    Args:
        output_dir: Directory that receives the fixture. It is created
            (including parents) when it does not exist yet.
        name: Fixture file name without the ``.json`` extension.
        data: JSON-serialisable payload to store.
    """
    # Make the helper usable on its own, not only after main() created the dir.
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, f"{name}.json")
    # Pin the encoding so the fixture bytes do not depend on the locale.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    print(f" saved: {name}.json")


def _split_column_id(column_id, node_ids):
    """Split a ``<node_id>_<column>`` key into ``(node_id, column)``.

    When several node ids are prefixes of ``column_id`` (e.g. ``a.orders`` and
    ``a.orders_items``), the longest one is chosen, so the result does not
    depend on iteration order. Returns ``None`` when no node id matches with a
    non-empty column part.

    NOTE(review): the key format is inferred from how this script parses it;
    an id such as ``a.orders_items_id`` remains inherently ambiguous.
    """
    best = None
    for nid in node_ids:
        prefix = f"{nid}_"
        # Require at least one character after the separator.
        if column_id.startswith(prefix) and len(column_id) > len(prefix):
            if best is None or len(nid) > len(best):
                best = nid
    if best is None:
        return None
    return best, column_id[len(best) + 1:]


def main():
    """Capture CLL fixtures from a running server into ``--output-dir``.

    Steps: fetch the full change-analysis map, then fetch per-node fixtures
    (plain, no-upstream, no-downstream) for every node with a
    ``change_status``, per-column fixtures for every column with a
    ``change_status``, and finally fixtures for up to three nodes flagged
    ``impacted`` that have no ``change_status``. Exits with status 1 when the
    full map contains no changed nodes.
    """
    parser = argparse.ArgumentParser(description="Capture CLL diff fixtures")
    parser.add_argument("--port", type=int, default=8000)
    parser.add_argument("--output-dir", default=os.path.join(os.path.dirname(__file__), "diff"))
    args = parser.parse_args()

    base_url = f"http://localhost:{args.port}"
    output_dir = args.output_dir
    os.makedirs(output_dir, exist_ok=True)

    # 1. Impact overview (no node_id = all changed nodes + their lineage)
    print("Fetching impact overview (full map with change_analysis)...")
    full_map = fetch_cll(base_url, {"change_analysis": True})
    save_fixture(output_dir, "cll-diff-full-map", full_map)

    current_nodes = full_map["current"]["nodes"]
    current_columns = full_map["current"]["columns"]

    # Find nodes with changes
    nodes_with_changes = {}
    for nid, node in current_nodes.items():
        if node.get("change_status"):
            nodes_with_changes[nid] = node
            print(f" changed node: {nid} (status={node['change_status']}, category={node.get('change_category', 'n/a')})")

    if not nodes_with_changes:
        print("ERROR: No nodes with change_status found. Is the server running with a diff?")
        sys.exit(1)

    # Find columns with changes
    cols_with_changes = {}
    for cid, col in current_columns.items():
        if col.get("change_status"):
            cols_with_changes[cid] = col
            print(f" changed column: {cid} ({col['change_status']})")

    # 2. Node-level queries for changed nodes
    print("\nFetching node-level fixtures for changed nodes...")
    for nid in nodes_with_changes:
        safe_name = nid.replace(".", "_")

        data = fetch_cll(base_url, {"node_id": nid, "change_analysis": True})
        save_fixture(output_dir, f"cll-diff-node-{safe_name}", data)

        data = fetch_cll(base_url, {"node_id": nid, "change_analysis": True, "no_upstream": True})
        save_fixture(output_dir, f"cll-diff-node-{safe_name}-no-upstream", data)

        data = fetch_cll(base_url, {"node_id": nid, "change_analysis": True, "no_downstream": True})
        save_fixture(output_dir, f"cll-diff-node-{safe_name}-no-downstream", data)

    # 3. Column-level queries for changed columns
    print("\nFetching column-level fixtures for changed columns...")
    for cid in cols_with_changes:
        # Longest-prefix match so "x.orders_items_id" is not attributed to
        # "x.orders" merely because that node happened to be iterated first.
        parsed = _split_column_id(cid, current_nodes)
        if parsed is None:
            print(f" WARNING: could not parse node_id/column from {cid}, skipping")
            continue
        node_id, col_name = parsed

        safe_name = cid.replace(".", "_")
        data = fetch_cll(base_url, {"node_id": node_id, "column": col_name, "change_analysis": True})
        save_fixture(output_dir, f"cll-diff-col-{safe_name}", data)

    # 4. A few impacted (downstream, not directly changed) nodes
    print("\nFetching impacted (not directly changed) node fixtures...")
    impacted_nodes = [
        nid for nid, node in current_nodes.items()
        if node.get("impacted") and not node.get("change_status")
    ]
    for nid in impacted_nodes[:3]:
        safe_name = nid.replace(".", "_")
        data = fetch_cll(base_url, {"node_id": nid, "change_analysis": True})
        save_fixture(output_dir, f"cll-diff-node-{safe_name}", data)

    # The count covers every entry currently in output_dir, not only files
    # written by this run.
    print(f"\nDone! {len(os.listdir(output_dir))} fixtures in {output_dir}")


if __name__ == "__main__":
    main()
Loading
Loading