1
+ import os
1
2
from pathlib import Path
2
3
import shutil
4
+ import typer
5
+ from typing import Literal
3
6
4
7
from dcpy .utils import postgres
8
+ from dcpy .utils .collections import indented_report
5
9
from dcpy .models .data import comparison
6
10
from dcpy .data import compare
7
11
from dcpy .connectors .edm import recipes
8
12
from dcpy .lifecycle .ingest import run as ingest
13
+ from dcpy .lifecycle .builds import metadata as build_metadata
14
+
15
# Database used for all comparison tables.
DATABASE = "sandbox"
# Local staging root: one subfolder per dataset, each holding the
# "library" and "ingest" copies that get compared.
LIBRARY_PATH = recipes.LIBRARY_DEFAULT_PATH / "datasets"
# NOTE(review): debug print left in — consider removing or switching to logging.
print(os.environ.get("BUILD_NAME"))
# Postgres schema derived from the build environment.
# Assumes build_name tolerates a missing BUILD_NAME (None) — TODO confirm.
SCHEMA = build_metadata.build_name(os.environ.get("BUILD_NAME"))
# NOTE(review): debug print left in — consider removing or switching to logging.
print(SCHEMA)
20
+
21
+
22
def call_library(dataset: str, version: str | None = None, file_type="pgdump"):
    """Archive *dataset* with data library, staging output under .../<dataset>/library.

    The archive writes into a folder named after its resolved version; that
    folder is then moved to a fixed "library" folder so the comparison steps
    can always find it.
    """
    # BEWARE: once you import library, parquet file writing fails
    # Something to do with gdal's interaction with parquet file driver
    from dcpy.library.archive import Archive

    archive = Archive()
    config = archive(name=dataset, output_format=file_type, version=version)

    # We're running ingest too, so change version after the fact.
    # Can't just feed this version to the archive call because some datasets
    # template the version into their definitions.
    dataset_dir = LIBRARY_PATH / dataset
    destination = dataset_dir / "library"
    if destination.is_dir():
        shutil.rmtree(destination)
    os.rename(dataset_dir / config.version, destination)
35
+
36
+
37
def call_ingest(
    dataset: str, version: str | None = None, ingest_parent_dir: Path = ingest.TMP_DIR
) -> None:
    """Run ingest for *dataset* and stage its parquet under .../<dataset>/ingest."""
    staging = ingest_parent_dir / dataset / "staging"
    # Start from a clean staging folder so stale outputs can't leak through.
    if staging.is_dir():
        shutil.rmtree(staging)
    ingest.run(dataset, version=version, staging_dir=staging, skip_archival=True)

    produced = staging / f"{dataset}.parquet"
    staged = LIBRARY_PATH / dataset / "ingest" / f"{dataset}.parquet"

    staged.parent.mkdir(exist_ok=True, parents=True)
    shutil.copy(produced, staged)
50
+
51
+
52
def load_recipe(
    dataset: str,
    version: Literal["library", "ingest"],
    file_type: recipes.DatasetType | None = None,
) -> None:
    """Import the staged "library" or "ingest" copy of *dataset* into postgres.

    The table is named "<dataset>_<version>"; any previous copy (and a bare
    "<dataset>" table) is dropped first. When *file_type* is omitted it
    defaults to pg_dump for library output and parquet for ingest output.
    """
    if not file_type:
        file_type = (
            recipes.DatasetType.pg_dump
            if version == "library"
            else recipes.DatasetType.parquet
        )

    table = f"{dataset}_{version}"

    pg = postgres.PostgresClient(schema=SCHEMA, database=DATABASE)
    pg.drop_table(dataset)
    pg.drop_table(table)

    staged = recipes.Dataset(id=dataset, version=version, file_type=file_type)
    recipes.import_dataset(
        staged,
        pg,
        import_as=table,
    )
9
75
10
76
11
77
def compare_recipes_in_postgres (
12
78
dataset : str ,
13
- left_version : str ,
14
- right_version : str ,
79
+ left_version : str = "library" ,
80
+ right_version : str = "ingest" ,
15
81
* ,
16
- build_name : str ,
17
82
key_columns : list [str ] | None = None ,
18
83
ignore_columns : list [str ] | None = None ,
19
- local_library_dir : Path = recipes .LIBRARY_DEFAULT_PATH ,
20
- left_type : recipes .DatasetType = recipes .DatasetType .pg_dump ,
21
- right_type : recipes .DatasetType = recipes .DatasetType .pg_dump ,
22
- ) -> comparison .Report :
84
+ ) -> comparison .SqlReport :
23
85
ignore_columns = ignore_columns or []
86
+ ignore_columns .append ("ogc_fid" )
24
87
ignore_columns .append ("data_library_version" )
25
- left_table = dataset + "_left"
26
- right_table = dataset + "_right"
27
88
28
- client = postgres .PostgresClient (schema = build_name , database = "sandbox" )
29
- client .drop_table (dataset )
30
- client .drop_table (left_table )
31
- client .drop_table (right_table )
32
-
33
- left_ds = recipes .Dataset (id = dataset , version = left_version , file_type = left_type )
34
- right_ds = recipes .Dataset (id = dataset , version = right_version , file_type = right_type )
89
+ client = postgres .PostgresClient (schema = SCHEMA , database = "sandbox" )
90
+ left_table = dataset + "_" + left_version
91
+ right_table = dataset + "_" + right_version
35
92
36
- recipes .import_dataset (
37
- left_ds ,
38
- client ,
39
- import_as = left_table ,
40
- local_library_dir = local_library_dir ,
41
- )
42
- recipes .import_dataset (
43
- right_ds ,
44
- client ,
45
- import_as = right_table ,
46
- local_library_dir = local_library_dir ,
47
- )
48
93
if key_columns :
49
94
return compare .get_sql_keyed_report (
50
95
left_table ,
@@ -62,50 +107,48 @@ def compare_recipes_in_postgres(
62
107
)
63
108
64
109
65
- def run_ingest_and_library (
66
- dataset : str ,
67
- ingest_parent_dir : Path = Path ("." ),
68
- library_file_type : str = "pg_dump" ,
69
- ) -> None :
70
- ingest_dir = ingest_parent_dir / dataset / "special_folder"
71
- ingest .run (dataset , staging_dir = ingest_dir , skip_archival = True )
110
# CLI entry point; subcommands are registered with @app.command below.
app = typer.Typer()
72
111
73
- # BEWARE: once you import library, parquet file writing fails
74
- # Something to do with gdal's interaction with parquet file driver
75
- from dcpy .library .archive import Archive
76
112
77
- a = Archive ()
78
- a (name = dataset , output_format = library_file_type , version = "library" )
113
@app.command("load_single")
def load_single(
    tool: str = typer.Argument(),
    dataset: str = typer.Argument(),
    version: str | None = typer.Option(None, "--version", "-v"),
):
    """Run one archival tool ("library" or "ingest") for *dataset*, then load its output."""
    runners = {"library": call_library, "ingest": call_ingest}
    if tool not in runners:
        raise NotImplementedError("'tool' must be either 'library' or 'ingest'")
    runners[tool](dataset, version)

    load_recipe(dataset, tool)  # type: ignore
86
127
87
128
88
- def compare_ingest_and_library (
89
- dataset : str ,
90
- key_columns : list [str ] | None ,
91
- build_name : str ,
92
- * ,
93
- ignore_columns : list [str ] | None = None ,
94
- library_file_type : str = "pgdump" ,
95
- ingest_parent_dir : Path = Path ("." ),
96
- ) -> comparison .Report :
97
- run_ingest_and_library (
98
- dataset ,
99
- ingest_parent_dir = ingest_parent_dir ,
100
- library_file_type = library_file_type ,
129
@app.command("load")
def _load_both(
    dataset: str = typer.Argument(),
    version: str | None = typer.Option(None, "--version", "-v"),
):
    """Run both ingest and library for *dataset*, then load both outputs into postgres."""
    call_ingest(dataset, version)
    call_library(dataset, version)

    # Library first, then ingest — same order as the original loads.
    for source in ("library", "ingest"):
        load_recipe(dataset, source)
139
+
140
+
141
@app.command("compare")
def _compare(
    dataset: str = typer.Argument(),
    key_columns: list[str] = typer.Option(None, "-k", "--key"),
    ignore_columns: list[str] = typer.Option(None, "-i", "--ignore"),
):
    """Compare the loaded library and ingest tables and print a readable report."""
    report = compare_recipes_in_postgres(
        dataset, key_columns=key_columns, ignore_columns=ignore_columns
    )
    formatted = indented_report(
        report.model_dump(), pretty_print_fields=True, include_line_breaks=True
    )
    print(formatted)
0 commit comments