 import singer
 from google.api_core import exceptions as google_exceptions
 from google.cloud import bigquery
-from google.cloud.bigquery import LoadJobConfig, CopyJobConfig
+from google.cloud.bigquery import LoadJobConfig, CopyJobConfig, QueryJobConfig
 from google.cloud.bigquery import WriteDisposition
 from google.cloud.bigquery.job import SourceFormat
+from google.cloud.exceptions import NotFound
 from jsonschema import validate
 
 from target_bigquery.encoders import DecimalEncoder
@@ -32,6 +33,7 @@ def __init__(self, logger, **kwargs):
 
         # LoadJobProcessHandler kwargs
         self.truncate = kwargs.get("truncate", False)
+        self.incremental = kwargs.get("incremental", False)
         self.add_metadata_columns = kwargs.get("add_metadata_columns", True)
         self.validate_records = kwargs.get("validate_records", True)
         self.table_configs = kwargs.get("table_configs", {}) or {}
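
The new `incremental` option is read from kwargs exactly like the existing `truncate` flag, so it is presumably switched on through the target configuration. A minimal, hypothetical sketch of such a config; every key except `truncate` and `incremental` is an assumption about a typical target-bigquery setup:

    # Hypothetical config dict; only "incremental" is introduced by this change.
    config = {
        "project_id": "my-project",   # assumed existing keys
        "dataset_id": "my_dataset",
        "truncate": False,
        "incremental": True,          # enables the MERGE-based load added below
    }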
@@ -94,6 +96,8 @@ def handle_schema_message(self, msg):
         self.bq_schema_dicts[msg.stream] = self._build_bq_schema_dict(schema)
         self.bq_schemas[msg.stream] = schema
 
+        self.logger.info(f"{msg.stream} BigQuery schema {schema}")
+
         yield from ()
 
     def on_stream_end(self):
@@ -174,9 +178,15 @@ def handle_record_message(self, msg):
             raise Exception(f"A record for stream {msg.stream} was encountered before a corresponding schema")
 
         schema = self.schemas[stream]
-
+        bq_schema = self.bq_schema_dicts[stream]
         nr = cleanup_record(schema, msg.record)
-        nr = format_record_to_schema(nr, self.bq_schema_dicts[stream])
+
+        try:
+            nr = format_record_to_schema(nr, self.bq_schema_dicts[stream])
+        except Exception as e:
+            extra = {"record": msg.record, "schema": schema, "bq_schema": bq_schema}
+            self.logger.critical(f"Cannot format a record for stream {msg.stream} to its corresponding BigQuery schema. Details: {extra}")
+            raise e
 
         # schema validation may fail if data doesn't match schema in terms of data types
         # in this case, we validate schema again on data which has been forced to match schema
@@ -208,6 +218,15 @@ def on_stream_end(self):
         self._do_temp_table_based_load(self.rows)
         yield self.STATE
 
+    def primary_key_condition(self, stream):
+        self.logger.info(f"Primary keys: {', '.join(self.key_properties[stream])}")
+        keys = [f"t.{k}=s.{k}" for k in self.key_properties[stream]]
+        if len(keys) < 1:
+            raise Exception(f"No primary keys specified from the tap and Incremental option selected")
+        return " and ".join(keys)
+        # TODO: test it with multiple ids (an array of ids, if there are multiple key_properties in JSON schema)
+        # TODO: test it with dupe ids in the data
+
     def _do_temp_table_based_load(self, rows):
         assert isinstance(rows, dict)
 
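
The join condition produced by `primary_key_condition` is what drives the MERGE in the next hunk. A standalone sketch of the same logic with a hypothetical stream and composite key (the stream name and key columns are made up):

    # Standalone sketch of the condition-building logic above; "orders" and its keys are hypothetical.
    key_properties = {"orders": ["customer_id", "order_id"]}

    def primary_key_condition(stream):
        keys = [f"t.{k}=s.{k}" for k in key_properties[stream]]
        if len(keys) < 1:
            raise Exception("No primary keys specified from the tap and Incremental option selected")
        return " and ".join(keys)

    print(primary_key_condition("orders"))
    # -> t.customer_id=s.customer_id and t.order_id=s.order_id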
@@ -231,22 +250,66 @@ def _do_temp_table_based_load(self, rows):
             loaded_tmp_tables.append((stream, tmp_table_name))
 
         # copy tables to production tables
+        # destination table can have dupe ids used in MERGE statement
+        # new data which is being appended should have no dupes
+
+        # if new data has dupes, then MERGE will fail with a similar error:
+        # INFO Primary keys: id
+        # CRITICAL 400 UPDATE/MERGE must match at most one source row for each target row
+
+        # https://stackoverflow.com/questions/50504504/bigquery-error-update-merge-must-match-at-most-one-source-row-for-each-target-r
+        # https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax
+
+        # If a row in the table to be updated joins with more than one row from the FROM clause,
+        # then the query generates the following runtime error: UPDATE/MERGE must match at most one source row for each target row.
         for stream, tmp_table_name in loaded_tmp_tables:
-            truncate = self.truncate if stream not in self.partially_loaded_streams else False
-
-            copy_config = CopyJobConfig()
-            if truncate:
-                copy_config.write_disposition = WriteDisposition.WRITE_TRUNCATE
-                self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by FULL_TABLE")
-            else:
-                copy_config.write_disposition = WriteDisposition.WRITE_APPEND
-                self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by APPEND")
-
-            self.client.copy_table(
-                sources=self.dataset.table(tmp_table_name),
-                destination=self.dataset.table(self.tables[stream]),
-                job_config=copy_config
-            ).result()
+            incremental_success = False
+            if self.incremental:
+                self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by INCREMENTAL")
+                self.logger.warning(f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
+                table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}"
+                try:
+                    self.client.get_table(table_id)
+                    column_names = [x.name for x in self.bq_schemas[stream]]
+
+                    query = """MERGE `{table}` t
+                        USING `{temp_table}` s
+                        ON {primary_key_condition}
+                        WHEN MATCHED THEN
+                            UPDATE SET {set_values}
+                        WHEN NOT MATCHED THEN
+                            INSERT ({new_cols}) VALUES ({cols})
+                        """.format(table=table_id,
+                                   temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
+                                   primary_key_condition=self.primary_key_condition(stream),
+                                   set_values=', '.join(f'{c}=s.{c}' for c in column_names),
+                                   new_cols=', '.join(column_names),
+                                   cols=', '.join(f's.{c}' for c in column_names))
+
+                    job_config = QueryJobConfig()
+                    query_job = self.client.query(query, job_config=job_config)
+                    query_job.result()
+                    self.logger.info(f'LOADED {query_job.num_dml_affected_rows} rows')
+                    incremental_success = True
+
+                except NotFound:
+                    self.logger.info(f"Table {table_id} is not found, proceeding to upload with TRUNCATE")
+                    self.truncate = True
+            if not incremental_success:
+                truncate = self.truncate if stream not in self.partially_loaded_streams else False
+                copy_config = CopyJobConfig()
+                if truncate:
+                    copy_config.write_disposition = WriteDisposition.WRITE_TRUNCATE
+                    self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by FULL_TABLE")
+                else:
+                    copy_config.write_disposition = WriteDisposition.WRITE_APPEND
+                    self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by APPEND")
+
+                self.client.copy_table(
+                    sources=self.dataset.table(tmp_table_name),
+                    destination=self.dataset.table(self.tables[stream]),
+                    job_config=copy_config
+                ).result()
 
             self.partially_loaded_streams.add(stream)
             self.rows[stream].close()  # erase the file
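
For orientation, the MERGE template assembled in the loop above renders to SQL along these lines. The sketch below re-applies the same .format() call to a hypothetical `orders` table with primary key `id` and columns `id`, `amount`; the project, dataset, and temp-table names are made up:

    # Rendering sketch only; every identifier here is hypothetical.
    table_id = "my-project.my_dataset.orders"
    temp_table = "my-project.my_dataset.t_orders_tmp"
    column_names = ["id", "amount"]

    query = """MERGE `{table}` t
        USING `{temp_table}` s
        ON {primary_key_condition}
        WHEN MATCHED THEN
            UPDATE SET {set_values}
        WHEN NOT MATCHED THEN
            INSERT ({new_cols}) VALUES ({cols})
        """.format(table=table_id,
                   temp_table=temp_table,
                   primary_key_condition="t.id=s.id",
                   set_values=', '.join(f'{c}=s.{c}' for c in column_names),
                   new_cols=', '.join(column_names),
                   cols=', '.join(f's.{c}' for c in column_names))

    print(query)
    # MERGE `my-project.my_dataset.orders` t
    #     USING `my-project.my_dataset.t_orders_tmp` s
    #     ON t.id=s.id
    #     WHEN MATCHED THEN
    #         UPDATE SET id=s.id, amount=s.amount
    #     WHEN NOT MATCHED THEN
    #         INSERT (id, amount) VALUES (s.id, s.amount)

Because the ON clause matches on the primary key only, duplicate key values in the temp table would trigger the "UPDATE/MERGE must match at most one source row for each target row" error called out in the comments above.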