1
1
import argparse
2
2
import os
3
3
import json
4
+ import webbrowser
5
+ from pathlib import Path
4
6
import dbt_column_lineage_extractor .utils as utils
5
7
from dbt_column_lineage_extractor import DbtColumnLineageExtractor
6
-
7
- def find_model_in_lineage (lineage_data , model_name ):
8
- """Find full node paths in lineage data that match the model name."""
9
- matching_nodes = []
10
- for node in lineage_data .keys ():
11
- # Check if the node ends with the model name
12
- if node .split ('.' )[- 1 ] == model_name :
13
- matching_nodes .append (node )
14
- return matching_nodes
8
+ from dbt_column_lineage_extractor .visualization import create_html_viewer , convert_to_mermaid
15
9
16
10
def main ():
17
11
parser = argparse .ArgumentParser (description = "Recursive DBT Column Lineage Extractor CLI" )
@@ -21,12 +15,11 @@ def main():
21
15
help = 'Path to the lineage_to_direct_parents.json file, default to ./outputs/lineage_to_direct_parents.json' )
22
16
parser .add_argument ('--lineage-children-file' , default = './outputs/lineage_to_direct_children.json' ,
23
17
help = 'Path to the lineage_to_direct_children.json file, default to ./outputs/lineage_to_direct_children.json' )
24
- parser .add_argument ('--manifest' , default = './target/manifest.json' ,
25
- help = 'Path to the dbt manifest.json file, default to ./target/manifest.json' )
26
- parser .add_argument ('--catalog' , default = './target/catalog.json' ,
27
- help = 'Path to the dbt catalog.json file, default to ./target/catalog.json' )
28
18
parser .add_argument ('--output-dir' , default = './outputs' , help = 'Output directory for lineage files, default to ./outputs' )
29
- parser .add_argument ('--show-ui' , action = 'store_true' , help = 'Show web UI for lineage visualization' )
19
+ parser .add_argument ('--no-ui' , action = 'store_true' , help = 'Do not automatically open the visualization in browser' )
20
+ parser .add_argument ('--output-format' , choices = ['json' , 'mermaid' , 'both' ], default = 'both' ,
21
+ help = 'Output format for lineage data. Choose between json, mermaid, or both. Default is both.' )
22
+ parser .add_argument ('--show-details' , action = 'store_true' , help = 'Show detailed squashed/structured ancestors/descendants in terminal' )
30
23
31
24
args = parser .parse_args ()
32
25
@@ -40,63 +33,41 @@ def main():
40
33
lineage_to_direct_children = utils .read_dict_from_file (args .lineage_children_file )
41
34
except FileNotFoundError as e :
42
35
print (f"Error: Could not find required lineage file: { e } " )
36
+ print ("\n To generate the required lineage files, please run one of the following commands first:" )
37
+ print ("\n 1. To scan the whole project (takes longer, but you don't need to run it again for different models if there is no model change):" )
38
+ print (" dbt_column_lineage_direct --manifest path/to/manifest.json --catalog path/to/catalog.json" )
39
+ print ("\n 2. If only interested in this model (faster):" )
40
+ print (f" dbt_column_lineage_direct --manifest path/to/manifest.json --catalog path/to/catalog.json --model +{ args .model } +" )
41
+ print ("\n After running one of these commands, try this command again." )
43
42
return 1
44
43
except json .JSONDecodeError as e :
45
44
print (f"Error: Invalid JSON in lineage file: { e } " )
46
45
return 1
47
46
48
- # Resolve model name to full node path if needed
47
+ # Use model name as provided - must be full node path
49
48
model_node = args .model
50
- # Check if this is not already a full node path
51
- if not model_node .startswith (('model.' , 'source.' )):
52
- # First try to find the model in lineage files
53
- matching_nodes_from_lineage = find_model_in_lineage (lineage_to_direct_parents , model_node )
54
-
55
- if matching_nodes_from_lineage :
56
- if len (matching_nodes_from_lineage ) > 1 :
57
- print (f"Warning: Multiple models match '{ model_node } ' in lineage files. Using the first match: { matching_nodes_from_lineage [0 ]} " )
58
- model_node = matching_nodes_from_lineage [0 ]
59
- print (f"Resolved model name '{ args .model } ' to full node path from lineage file: { model_node } " )
60
- else :
61
- # Try alternate lineage file if first one didn't have matches
62
- matching_nodes_from_lineage = find_model_in_lineage (lineage_to_direct_children , model_node )
63
- if matching_nodes_from_lineage :
64
- if len (matching_nodes_from_lineage ) > 1 :
65
- print (f"Warning: Multiple models match '{ model_node } ' in lineage files. Using the first match: { matching_nodes_from_lineage [0 ]} " )
66
- model_node = matching_nodes_from_lineage [0 ]
67
- print (f"Resolved model name '{ args .model } ' to full node path from lineage file: { model_node } " )
68
- else :
69
- # If not found in lineage files, try to use manifest if available
70
- try :
71
- # First check if manifest and catalog files exist
72
- if not os .path .exists (args .manifest ):
73
- print (f"Warning: Manifest file not found at '{ args .manifest } '. Cannot resolve model name." )
74
- print ("Proceeding with the original model name. If this is incorrect, please provide a full node path (e.g., model.package.model_name)" )
75
- elif not os .path .exists (args .catalog ):
76
- print (f"Warning: Catalog file not found at '{ args .catalog } '. Cannot resolve model name." )
77
- print ("Proceeding with the original model name. If this is incorrect, please provide a full node path (e.g., model.package.model_name)" )
78
- else :
79
- extractor = DbtColumnLineageExtractor (
80
- manifest_path = args .manifest ,
81
- catalog_path = args .catalog
82
- )
83
- # Try to resolve the model name
84
- matching_nodes = extractor ._resolve_node_by_name (model_node )
85
- if matching_nodes :
86
- if len (matching_nodes ) > 1 :
87
- print (f"Warning: Multiple models match '{ model_node } '. Using the first match: { matching_nodes [0 ]} " )
88
- model_node = matching_nodes [0 ]
89
- print (f"Resolved model name '{ args .model } ' to full node path: { model_node } " )
90
- else :
91
- print (f"Warning: Could not find any model with name '{ model_node } ' in the manifest file." )
92
- print ("Proceeding with the original model name. If this is incorrect, please provide a full node path (e.g., model.package.model_name)" )
93
- except Exception as e :
94
- print (f"Warning: Error when trying to resolve model name: { e } " )
95
- print ("Proceeding with the original model name. If this is incorrect, please provide a full node path (e.g., model.package.model_name)" )
96
49
97
50
# Check if model exists in lineage files
98
51
if model_node not in lineage_to_direct_parents and model_node not in lineage_to_direct_children :
99
- print (f"Warning: Model '{ model_node } ' not found in lineage files. Results may be empty or incomplete." )
52
+ # Search for potential matches in both lineage files
53
+ parent_matches = utils .find_potential_matches (lineage_to_direct_parents , model_node )
54
+ child_matches = utils .find_potential_matches (lineage_to_direct_children , model_node )
55
+
56
+ # Combine unique matches
57
+ all_matches = list (set (parent_matches + child_matches ))
58
+
59
+ if all_matches :
60
+ if len (all_matches ) > 1 :
61
+ print ("\n Error: Multiple potential matches found. Please use one of the following full node paths:" )
62
+ for match in sorted (all_matches ):
63
+ print (f" - { match } " )
64
+ return 1
65
+ else :
66
+ # Single match found - use it automatically
67
+ model_node = all_matches [0 ]
68
+ print (f"Using model: { model_node } " )
69
+ else :
70
+ print (f"No matches found for '{ model_node } '. Results may be empty or incomplete." )
100
71
101
72
print ("========================================" )
102
73
# Find all ancestors for a specific model and column
@@ -106,17 +77,14 @@ def main():
106
77
lineage_to_direct_parents , model_node , args .column
107
78
)
108
79
109
- print ("---squashed ancestors---" )
110
- utils .pretty_print_dict (ancestors_squashed )
111
- print ("---structured ancestors---" )
112
- utils .pretty_print_dict (ancestors_structured )
113
-
114
- # Save ancestors to files
115
- ancestors_file = os .path .join (args .output_dir , f"{ model_node } _{ args .column } _ancestors.json" )
116
- utils .write_dict_to_file (ancestors_structured , ancestors_file )
80
+ if args .show_details :
81
+ print ("---squashed ancestors---" )
82
+ utils .pretty_print_dict (ancestors_squashed )
83
+ print ("---structured ancestors---" )
84
+ utils .pretty_print_dict (ancestors_structured )
117
85
118
- print ("========================================" )
119
86
# Find all descendants for a specific model and column
87
+ print ("========================================" )
120
88
print (f"Finding all descendants of { model_node } .{ args .column } :" )
121
89
descendants_squashed = DbtColumnLineageExtractor .find_all_related (
122
90
lineage_to_direct_children , model_node , args .column
@@ -125,25 +93,48 @@ def main():
125
93
lineage_to_direct_children , model_node , args .column
126
94
)
127
95
128
- print ("---squashed descendants---" )
129
- utils .pretty_print_dict (descendants_squashed )
130
- print ("---structured descendants---" )
131
- utils .pretty_print_dict (descendants_structured )
96
+ if args .show_details :
97
+ print ("---squashed descendants---" )
98
+ utils .pretty_print_dict (descendants_squashed )
99
+ print ("---structured descendants---" )
100
+ utils .pretty_print_dict (descendants_structured )
132
101
133
- # Save descendants to files
134
- descendants_file = os .path .join (args .output_dir , f"{ model_node } _{ args .column } _descendants.json" )
135
- utils .write_dict_to_file (descendants_structured , descendants_file )
102
+ # Save outputs based on format
103
+ if args .output_format in ['json' , 'both' ]:
104
+ # Create safe filenames by replacing dots with underscores
105
+ safe_model_name = model_node .replace ('.' , '_' )
106
+
107
+ # Save ancestors to files
108
+ ancestors_file = os .path .join (args .output_dir , f"{ safe_model_name } _{ args .column } _ancestors.json" )
109
+ utils .write_dict_to_file (ancestors_structured , ancestors_file )
110
+
111
+ # Save descendants to files
112
+ descendants_file = os .path .join (args .output_dir , f"{ safe_model_name } _{ args .column } _descendants.json" )
113
+ utils .write_dict_to_file (descendants_structured , descendants_file )
114
+
115
+ print ("========================================" )
116
+ print (f"Lineage outputs saved to { ancestors_file } and { descendants_file } " )
136
117
137
- print ("========================================" )
138
- print (
139
- "You can use the structured ancestors and descendants to programmatically use the lineage, "
140
- "such as for impact analysis, data tagging, etc."
141
- )
142
- print (
143
- "Or, you can copy the json outputs to tools like https://github.com/AykutSarac/jsoncrack.com, "
144
- "https://jsoncrack.com/editor to visualize the lineage"
145
- )
146
- print (f"Lineage outputs saved to { ancestors_file } and { descendants_file } " )
118
+ if args .output_format in ['mermaid' , 'both' ]:
119
+ # Convert to Mermaid format
120
+ mermaid_output = convert_to_mermaid (model_node , args .column , ancestors_structured , descendants_structured )
121
+
122
+ # Save Mermaid output
123
+ mermaid_file = os .path .join (args .output_dir , f"{ model_node } _{ args .column } _lineage.mmd" )
124
+ with open (mermaid_file , 'w' ) as f :
125
+ f .write (mermaid_output )
126
+
127
+ # Always create HTML viewer for Mermaid output
128
+ viewer_file = create_html_viewer (mermaid_output , args .output_dir , model_node , args .column )
129
+
130
+ print (f"Mermaid output saved to { mermaid_file } " )
131
+ print (f"HTML viewer created at: { viewer_file } " )
132
+
133
+ # Open the viewer by default unless --no-ui is specified
134
+ if not args .no_ui :
135
+ print ("Opening Mermaid diagram in local viewer..." )
136
+ webbrowser .open (f"file://{ os .path .abspath (viewer_file )} " )
137
+
147
138
return 0
148
139
except Exception as e :
149
140
print (f"Error: { str (e )} " )
0 commit comments