Skip to content

Commit afed95b

Browse files
author
RuslanBergenov
committed
Merge branch 'development'
2 parents 74ea806 + 52247de commit afed95b

File tree

8 files changed

+159
-66
lines changed

8 files changed

+159
-66
lines changed

README.md

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ and [Hubspot] to Google BigQuery.
2222
- [Partitioning background](#partitioning-background)
2323
- [Clustering background](#clustering-background)
2424
- [Setting up partitioning and clustering](#setting-up-partitioning-and-clustering)
25-
- [Step 6: target-tables-config file: force data types and modes](#step-6-target-tables-config-file-force-data-types-and-modes)
25+
- [Step 6 (Optional): target-tables-config file: force data types and modes](#step-6-optional-target-tables-config-file-force-data-types-and-modes)
26+
- [Step 7 (Optional): target-tables-config file: rename a field](#step-7-optional-target-tables-config-file-rename-a-field)
2627
- [Unit tests set up](#unit-tests-set-up)
2728
- [Config files in this project](#config-files-in-this-project)
2829

@@ -363,14 +364,11 @@ You can only set up partitioning.
363364
}}
364365
```
365366

366-
3. Clear you **state.json**, so it's an empty JSON `{}`, because we want to load all data again. Skip this step, if you
367-
didn't previously load this data in **Step 4** above.
367+
3. Clear you **state.json**, so it's an empty JSON `{}`, because we want to load all data again. Skip this step, if you didn't previously load this data in **Step 4** above.
368368

369-
4. Delete your BigQuery destination table **exchangeratesapi**, because we want to re-load it again from scratch. Skip
370-
this step, if you didn't previously load this data in **Step 4** above.
369+
4. Delete your BigQuery destination table **exchangeratesapi**, because we want to re-load it again from scratch. Skip this step, if you didn't previously load this data in **Step 4** above.
371370

372-
3. Load data data into BigQuery, while configuring target tables. Pass **target-tables-config.json** as a command line
373-
argument.
371+
3. Load data data into BigQuery, while configuring target tables. Pass **target-tables-config.json** as a command line argument.
374372

375373
```bash
376374
{project_root_dir}\tap\Scripts\tap-exchangeratesapi --config sample_config/tap-config-exchange-rates-api.json | ^
@@ -379,35 +377,30 @@ You can only set up partitioning.
379377
```
380378

381379
- "^" indicates a new line in Windows Command Prompt. In Mac terminal, use "\\".
382-
- If you don't want to pass **target-tables-config.json** file as a CLI argument, you can
383-
add ```"table_config": "target-tables-config.json"``` to your **target-config.json** file. See **Step 3: Configure**
384-
above.
380+
- If you don't want to pass **target-tables-config.json** file as a CLI argument, you can add ```"table_config": "target-tables-config.json"``` to your **target-config.json** file. See **Step 3: Configure** above.
385381

386382

387-
6. Verify in BigQuery web UI that partitioning and clustering worked (in our example below, we only set up
388-
partitioning):
383+
6. Verify in BigQuery web UI that partitioning and clustering worked (in our example below, we only set up partitioning):
384+
389385

390386
<img src="readme_screenshots/14_Partitioned_Table.png" width="650" alt="Download the service account credential JSON file">
391387

392-
### Step 6: target-tables-config file: force data types and modes
388+
389+
### Step 6 (Optional): target-tables-config file: force data types and modes
393390

394391
#### Problem:
395392

396393
- Normally, tap catalog file governs schema of data which will be loaded into target-bigquery.
397394
- However, sometimes you can get a column of an undesired data type, which is not following your tap-catalog file.
398395

399396
#### Solution:
400-
401-
- You can force that column to the desired data type by using `force_fields` flag inside your *
402-
target-tables-config.json* file.
397+
- You can force that column to the desired data type by using the `force_fields` flag inside your *target-tables-config.json* file.
403398

404399
#### Example:
405-
406-
- We used this solution to fix `"date_start"` field from `"ads_insights_age_and_gender"` stream from tap-facebook.
407-
- In tap catalog file, we said we wanted this column to be a **date**.
408-
- However, the tap generates schema where this column is a **string**, despite our tap catalog file.
409-
- Therefore, we used `force_fields` flag in target-tables-config.json to override what the tap generates and force the
410-
column to be a date.
400+
- We used this solution to fix `"date_start"` field from `"ads_insights_age_and_gender"` stream from tap-facebook.
401+
- In the tap catalog file, we said we wanted this column to be a **date**.
402+
- However, the tap generates schema where this column is a **string**, despite our tap catalog file.
403+
- Therefore, we used `force_fields` flag in target-tables-config.json to override what the tap generates and force the column to be a date.
411404
- Example of *target-tables-config.json* file:
412405

413406
```
@@ -425,6 +418,23 @@ You can only set up partitioning.
425418
}
426419
```
427420

421+
### Step 7 (Optional): target-tables-config file: rename a field
422+
423+
#### Problem and solution:
424+
- You can rename a field, using the `force_fields` flag inside your *target-tables-config.json* file.
425+
426+
#### Example:
427+
- Example of *target-tables-config.json* file where we renamed a field from `old_name` to `new_name`:
428+
```
429+
"ads_insights_age_and_gender": {
430+
"partition_field": "date_start",
431+
"cluster_fields": ["age", "gender","account_id", "campaign_id"],
432+
"force_fields": {
433+
"old_name":{"bq_field_name": "new_name"}
434+
}
435+
}
436+
```
437+
428438
## Unit tests set up
429439

430440
Add the following files to *sandbox* directory under project root directory:

target_bigquery/processhandler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def handle_record_message(self, msg):
179179

180180
schema = self.schemas[stream]
181181
bq_schema = self.bq_schema_dicts[stream]
182-
nr = cleanup_record(schema, msg.record)
182+
nr = cleanup_record(schema, msg.record, force_fields=self.table_configs.get(msg.stream, {}).get("force_fields", {}))
183183

184184
try:
185185
nr = format_record_to_schema(nr, self.bq_schema_dicts[stream])
@@ -188,6 +188,8 @@ def handle_record_message(self, msg):
188188
self.logger.critical(f"Cannot format a record for stream {msg.stream} to its corresponding BigQuery schema. Details: {extra}")
189189
raise e
190190

191+
192+
191193
# schema validation may fail if data doesn't match schema in terms of data types
192194
# in this case, we validate schema again on data which has been forced to match schema
193195
# nr is based on msg.record, but all data from msg.record has been forced to match schema

target_bigquery/schema.py

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@
1414
}
1515

1616

17-
def cleanup_record(schema, record):
17+
def cleanup_record(schema, record, force_fields={}):
1818
"""
1919
Clean up / prettify field names, make sure they match BigQuery naming conventions.
2020
21-
:param JSON record generated by the tap and piped into target-bigquery
22-
:param bq_schema: JSON schema generated by the tap and piped into target-bigquery
21+
:param schema: JSON schema generated by the tap and piped into target-bigquery
22+
:param record: JSON record generated by the tap and piped into target-bigquery
23+
:param force_fields: You can force a field to a desired data type via force_fields flag.
24+
Use case example:
25+
tap facebook field "date_start" from stream ads_insights_age_and_gender is being passed as string to BQ,
26+
which contradicts tap catalog file, where we said it's a date. force_fields fixes this issue.
27+
You can also rename a field using the force_fields parameter.
28+
Please see README for more information and examples.
2329
:return: JSON record/data, where field names are cleaned up / prettified.
2430
"""
2531
if not isinstance(record, dict) and not isinstance(record, list):
@@ -28,21 +34,21 @@ def cleanup_record(schema, record):
2834
elif isinstance(record, list):
2935
nr = []
3036
for item in record:
31-
nr.append(cleanup_record(schema, item))
37+
nr.append(cleanup_record(schema, item, force_fields))
3238
return nr
3339

3440
elif isinstance(record, dict):
3541
nr = {}
3642
for key, value in record.items():
37-
nkey = create_valid_bigquery_field_name(key)
38-
nr[nkey] = cleanup_record(schema, value)
43+
nkey = create_valid_bigquery_field_name(key, force_fields)
44+
nr[nkey] = cleanup_record(schema, value, force_fields)
3945
return nr
4046

4147
else:
4248
raise Exception(f"unhandled instance of record: {record}")
4349

4450

45-
def create_valid_bigquery_field_name(field_name):
51+
def create_valid_bigquery_field_name(field_name, force_fields={}):
4652
"""
4753
Clean up / prettify field names, make sure they match BigQuery naming conventions.
4854
@@ -56,9 +62,17 @@ def create_valid_bigquery_field_name(field_name):
5662
-underscore, and
5763
• be at most 300 characters long
5864
59-
:param key: JSON field name
65+
:param field_name: JSON field name
66+
:param force_fields: You can force a field to a desired data type via force_fields flag.
67+
Use case example:
68+
tap facebook field "date_start" from stream ads_insights_age_and_gender is being passed as string to BQ,
69+
which contradicts tap catalog file, where we said it's a date. force_fields fixes this issue.
70+
You can also rename a field using the force_fields parameter.
71+
Please see README for more information and examples.
6072
:return: cleaned up JSON field name
6173
"""
74+
if field_name in force_fields and force_fields[field_name].get("bq_field_name"):
75+
return force_fields[field_name]["bq_field_name"]
6276

6377
cleaned_up_field_name = ""
6478

@@ -150,12 +164,18 @@ def prioritize_one_data_type_from_multiple_ones_in_any_of(field_property):
150164
return min(any_of_data_types, key=any_of_data_types.get)
151165

152166

153-
def convert_field_type(field_property):
167+
def convert_field_type(field_name, field_property, force_fields={}):
154168
"""
169+
:param field_name: field/column name
155170
:param field_property: JSON field property
171+
:param force_fields: You can force a field to a desired data type via force_fields flag.
172+
Use case example:
173+
tap facebook field "date_start" from stream ads_insights_age_and_gender is being passed as string to BQ,
174+
which contradicts tap catalog file, where we said it's a date. force_fields fixes this issue.
175+
You can also rename a field using the force_fields parameter.
176+
Please see README for more information and examples.
156177
:return: BigQuery SchemaField field_type
157178
"""
158-
159179
conversion_dict = {"string": "STRING",
160180
"number": "FLOAT",
161181
"integer": "INTEGER",
@@ -170,7 +190,10 @@ def convert_field_type(field_property):
170190
"bq-bigdecimal": "BIGDECIMAL"
171191
}
172192

173-
if "anyOf" in field_property:
193+
if field_name in force_fields and force_fields[field_name].get("type"):
194+
return force_fields[field_name]["type"]
195+
196+
elif "anyOf" in field_property:
174197

175198
prioritized_data_type = prioritize_one_data_type_from_multiple_ones_in_any_of(field_property)
176199

@@ -208,13 +231,22 @@ def convert_field_type(field_property):
208231
return field_type_bigquery
209232

210233

211-
def determine_field_mode(field_property):
234+
def determine_field_mode(field_name, field_property, force_fields={}):
212235
"""
213236
:param field_name: one nested JSON field name
214237
:param field_property: one nested JSON field property
238+
:param force_fields: You can force a field to a desired data type via force_fields flag.
239+
Use case example:
240+
tap facebook field "date_start" from stream ads_insights_age_and_gender is being passed as string to BQ,
241+
which contradicts tap catalog file, where we said it's a date. force_fields fixes this issue.
242+
You can also rename a field using the force_fields parameter.
243+
Please see README for more information and examples.
215244
:return: BigQuery SchemaField mode
216245
"""
217-
if "items" in field_property:
246+
if field_name in force_fields and force_fields[field_name].get("mode"):
247+
return force_fields[field_name]["mode"]
248+
249+
elif "items" in field_property:
218250

219251
field_mode = 'REPEATED'
220252

@@ -226,6 +258,7 @@ def determine_field_mode(field_property):
226258

227259

228260
def replace_nullable_mode_with_required(schema_field_input):
261+
229262
schema_field_updated = SchemaField(name=schema_field_input.name,
230263
field_type=schema_field_input.field_type,
231264
mode='REQUIRED',
@@ -283,24 +316,30 @@ def determine_precision_and_scale_for_decimal_or_bigdecimal(field_property):
283316
return precision, scale
284317

285318

286-
def build_field(field_name, field_property):
319+
def build_field(field_name, field_property, force_fields):
287320
"""
288321
:param field_name: one nested JSON field name
289322
:param field_property: one nested JSON field property
323+
:param force_fields: You can force a field to a desired data type via force_fields flag.
324+
Use case example:
325+
tap facebook field "date_start" from stream ads_insights_age_and_gender is being passed as string to BQ,
326+
which contradicts tap catalog file, where we said it's a date. force_fields fixes this issue.
327+
You can also rename a field using the force_fields parameter.
328+
Please see README for more information and examples.
290329
:return: one BigQuery nested SchemaField
291330
"""
292331

293332
if not ("items" in field_property and "properties" in field_property["items"]) and not (
294333
"properties" in field_property):
295334

296-
field_type = convert_field_type(field_property)
335+
field_type = convert_field_type(field_name, field_property, force_fields)
297336

298337
precision, scale = determine_precision_and_scale_for_decimal_or_bigdecimal(field_property) if field_type in [
299338
"DECIMAL", "BIGDECIMAL"] else (None, None)
300339

301-
return (SchemaField(name=create_valid_bigquery_field_name(field_name),
340+
return (SchemaField(name=create_valid_bigquery_field_name(field_name,force_fields) ,
302341
field_type=field_type,
303-
mode=determine_field_mode(field_property),
342+
mode=determine_field_mode(field_name, field_property, force_fields),
304343
description=None,
305344
fields=(),
306345
policy_tags=None,
@@ -313,7 +352,7 @@ def build_field(field_name, field_property):
313352

314353
processed_subfields = []
315354

316-
field_type = convert_field_type(field_property)
355+
field_type = convert_field_type(field_name, field_property, force_fields)
317356

318357
precision, scale = determine_precision_and_scale_for_decimal_or_bigdecimal(field_property) if field_type in [
319358
"DECIMAL", "BIGDECIMAL"] else (None, None)
@@ -322,11 +361,11 @@ def build_field(field_name, field_property):
322361
for subfield_name, subfield_property in field_property.get("properties",
323362
field_property.get("items", {}).get("properties")
324363
).items():
325-
processed_subfields.append(build_field(subfield_name, subfield_property))
364+
processed_subfields.append(build_field(subfield_name, subfield_property, force_fields))
326365

327-
return (SchemaField(name=create_valid_bigquery_field_name(field_name),
366+
return (SchemaField(name=create_valid_bigquery_field_name(field_name, force_fields),
328367
field_type=field_type,
329-
mode=determine_field_mode(field_property),
368+
mode=determine_field_mode(field_name, field_property, force_fields),
330369
description=None,
331370
fields=processed_subfields,
332371
policy_tags=None,
@@ -343,8 +382,10 @@ def build_schema(schema, key_properties=None, add_metadata=True, force_fields={}
343382
:param add_metadata: do we want BigQuery metadata columns (e.g., when data was uploaded?)
344383
:param force_fields: You can force a field to a desired data type via force_fields flag.
345384
Use case example:
346-
tap facebook field "date_start" from stream ads_insights_age_and_gender is being passed as string to BQ,
347-
which contradicts tap catalog file, where we said it's a date. force_fields fixes this issue.
385+
tap facebook field "date_start" from stream ads_insights_age_and_gender is being passed as string to BQ,
386+
which contradicts tap catalog file, where we said it's a date. force_fields fixes this issue.
387+
You can also rename a field using the force_fields parameter.
388+
Please see README for more information and examples.
348389
:return: a list of BigQuery SchemaFields, which represents one BigQuery table
349390
"""
350391

@@ -356,20 +397,10 @@ def build_schema(schema, key_properties=None, add_metadata=True, force_fields={}
356397

357398
for field_name, field_property in schema.get("properties", schema.get("items", {}).get("properties", {})).items():
358399

359-
if field_name in force_fields:
360-
361-
next_field = (
362-
SchemaField(field_name, force_fields[field_name]["type"],
363-
force_fields[field_name].get("mode", "nullable"),
364-
force_fields[field_name].get("description", None), ())
365-
)
366-
367-
else:
368-
369-
next_field = build_field(field_name, field_property)
400+
next_field = build_field(field_name, field_property, force_fields)
370401

371-
if field_name in required_fields:
372-
next_field = replace_nullable_mode_with_required(next_field)
402+
if field_name in required_fields and field_name not in force_fields:
403+
next_field = replace_nullable_mode_with_required(next_field)
373404

374405
schema_bigquery.append(next_field)
375406

@@ -396,7 +427,7 @@ def format_record_to_schema(record, bq_schema):
396427
397428
RECORD is not included into conversion_dict - it is done on purpose. RECORD is handled recursively.
398429
399-
:param JSON record generated by the tap and piped into target-bigquery
430+
:param record: JSON record generated by the tap and piped into target-bigquery
400431
:param bq_schema: JSON schema generated by the tap and piped into target-bigquery
401432
:return: JSON record/data, where the data types match JSON schema
402433
"""

tests/rsc/config/facebook_stream_tables_config.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
"cluster_fields": ["age", "gender","account_id", "campaign_id"],
3434
"force_fields": {
3535
"date_start": {"type": "DATE", "mode": "NULLABLE"},
36-
"date_stop": {"type": "DATE", "mode": "NULLABLE"}
36+
"date_stop": {"type": "DATE", "mode": "NULLABLE"},
37+
"1d_click":{"bq_field_name": "lower_level_field_renamed"},
38+
"unique_actions":{"bq_field_name": "parent_field_renamed"}
3739
}
3840
},
3941
"ads_insights_platform_and_device": {

0 commit comments

Comments
 (0)