Skip to content

Commit ef9d6b2

Browse files
author
RuslanBergenov
committed
Merge branch 'development-state-handling-command-line-option' into development
# Conflicts: # .github/workflows/python-package.yml
2 parents 3fd9fec + 542c87a commit ef9d6b2

File tree

6 files changed

+231
-9
lines changed

6 files changed

+231
-9
lines changed

README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,16 @@ Add the following files to *sandbox* directory under project root directory:
456456
"project_id": "{your-project-id}",
457457
"dataset_id": "{your_dataset_id}",
458458
"validate_records": false
459-
}
459+
}
460+
```
461+
462+
- **target_config_merge_state_false_flag.json**:
463+
```
464+
{
465+
"project_id": "{your-project-id}",
466+
"dataset_id": "{your_dataset_id}",
467+
"merge_state_messages": 0
468+
}
460469
```
461470
## Config files in this project
462471

target_bigquery/__init__.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from target_bigquery.encoders import DecimalEncoder
1212
from target_bigquery.process import process
1313
from target_bigquery.utils import emit_state, ensure_dataset
14+
from target_bigquery.state import State, LiteralState
1415

1516
logger = singer.get_logger()
1617

@@ -21,12 +22,24 @@ def main():
2122
parser.add_argument("-c", "--config", help="Config file", required=True)
2223
parser.add_argument("-t", "--tables", help="Table configs file", required=False)
2324
parser.add_argument("-s", "--state", help="Initial state file", required=False)
25+
26+
# https://stackoverflow.com/questions/15008758/parsing-boolean-values-with-argparse
27+
parser.add_argument('--merge_state_messages', help="Merge many state messages to construct a state file",
28+
dest='merge_state_messages', action='store_true')
29+
parser.add_argument('--no-merge_state_messages',
30+
help="Don't merge many state messages into one message. The latest state message becomes the state file.",
31+
dest='merge_state_messages', action='store_false')
32+
parser.set_defaults(merge_state_messages=None)
33+
# default needs to be None. If it's None, it means it's not supplied and we need to check the config file
34+
# if default is True here, then setting it in config file will not work
35+
# in the config file, default will be True
36+
2437
parser.add_argument("-ph", "--processhandler",
2538
help="Defines the loading process. Partial loads by default.",
2639
required=False,
2740
choices=["load-job", "partial-load-job", "bookmarks-partial-load-job"],
2841
default="partial-load-job"
29-
)
42+
)
3043

3144
flags = parser.parse_args()
3245

@@ -58,10 +71,22 @@ def main():
5871
location = config.get("location", "US")
5972
validate_records = config.get("validate_records", True)
6073
add_metadata_columns = config.get("add_metadata_columns", True)
74+
75+
# we can pass merge state option via CLI param
76+
merge_state_messages_cli = flags.merge_state_messages
77+
78+
# we can pass merge state option via config file per Meltano request
79+
merge_state_messages_config = config.get("merge_state_messages", True)
80+
81+
# merge state option via CLI trumps one passed via config file
82+
# we need to check if CLI option was passed at all. if not, we check the config file
83+
merge_state_messages = merge_state_messages_cli if type(
84+
merge_state_messages_cli) == bool else merge_state_messages_config
85+
6186
project_id, dataset_id = config["project_id"], config["dataset_id"]
6287

6388
table_configs = tables.get("streams", {})
64-
max_cache = 1024 * 1024 * config.get("max_cache", 50) # this is needed for partial loads
89+
max_cache = 1024 * 1024 * config.get("max_cache", 50) # this is needed for partial loads
6590

6691
tap_stream = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
6792

@@ -87,6 +112,7 @@ def main():
87112
ph,
88113
tap_stream,
89114
initial_state=state,
115+
state_handler=State if merge_state_messages else LiteralState,
90116
project_id=project_id,
91117
dataset=dataset,
92118
location=location,

target_bigquery/processhandler.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from target_bigquery.encoders import DecimalEncoder
1515
from target_bigquery.schema import build_schema, cleanup_record, format_record_to_schema
16-
from target_bigquery.state import State
16+
1717
from target_bigquery.simplify_json_schema import simplify
1818
from target_bigquery.validate_json_schema import validate_json_schema_completeness, \
1919
check_schema_for_dupes_in_field_names
@@ -131,7 +131,9 @@ def __init__(self, logger, **kwargs):
131131
# self.table_configs = kwargs.get("table_configs", {}) or {}
132132
#
133133
# self.INIT_STATE = kwargs.get("initial_state") or {}
134-
self.STATE = State(**self.INIT_STATE)
134+
# self.STATE = State(**self.INIT_STATE)
135+
self.STATE_HANDLER = kwargs.get("state_handler")
136+
self.STATE = self.STATE_HANDLER(**self.INIT_STATE)
135137

136138
self.bq_schema_dicts = {}
137139
self.rows = {}
@@ -298,7 +300,7 @@ def _load_to_bq(self,
298300
field=partition_field
299301
)
300302

301-
# clusteing
303+
# clustering
302304
if cluster_fields:
303305
load_config.clustering_fields = cluster_fields
304306

@@ -372,7 +374,8 @@ class BookmarksStatePartialLoadJobProcessHandler(PartialLoadJobProcessHandler):
372374
def __init__(self, logger, **kwargs):
373375
super(BookmarksStatePartialLoadJobProcessHandler, self).__init__(logger, **kwargs)
374376

375-
self.EMITTED_STATE = State(**self.INIT_STATE)
377+
self.STATE_HANDLER = kwargs.get("state_handler")
378+
self.EMITTED_STATE = self.STATE_HANDLER(**self.INIT_STATE)
376379

377380
def handle_state_message(self, msg):
378381
assert isinstance(msg, singer.StateMessage)

target_bigquery/state.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,12 @@ def merge(self, state):
3131

3232
elif isinstance(m, Remove):
3333
pass
34+
35+
36+
class LiteralState(State):
    """State handler that keeps only the most recent state message.

    Rather than combining successive state messages, every non-empty message
    handed to ``merge`` replaces the stored contents wholesale.
    """

    def merge(self, state):
        """Overwrite the stored state with ``state``.

        A falsy ``state`` (``None``, empty mapping) is ignored, so the
        previously stored state is left untouched in that case.
        """
        if not state:
            return

        # Drop everything held so far, then adopt the new message verbatim.
        self.clear()
        self.update(state)

tests/test_state.py

Lines changed: 163 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"""
1616

1717
from tests import unittestcore
18-
from target_bigquery.state import State
18+
from target_bigquery.state import State, LiteralState
1919
import os
2020

2121

@@ -45,6 +45,9 @@ def test_flat_schema(self):
4545
)
4646
print(state)
4747

48+
assert state == {'bookmarks': {'stream_one': {'timestamp': '2020-01-11T00:00:00.000000Z'},
49+
'stream_two': {'timestamp': '2020-01-11T00:00:00.000000Z'}}}
50+
4851
def test_state_but_no_data(self):
4952
from target_bigquery import main
5053

@@ -82,3 +85,162 @@ def test_state_but_no_data_bookmarks_load(self):
8285

8386
self.assertEqual(ret, 0, msg="Exit code is not 0!")
8487
self.assertDictEqual(state[-1], {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}})
88+
89+
def test_state_facebook_stream(self):
    # Feed the Facebook fixture through the "load-job" process handler with
    # the default target config and check the last state from get_state().
    from target_bigquery import main

    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    stream_path = os.path.join(project_root, 'tests', 'rsc', 'data', 'facebook_stream.json')
    config_path = os.path.join(project_root, 'sandbox', 'target-config.json')

    self.set_cli_args(
        stdin=stream_path,
        config=config_path,
        processhandler="load-job"
    )

    ret = main()
    state = self.get_state()[-1]

    self.assertEqual(ret, 0, msg="Exit code is not 0!")

    # Bookmarks of every stream are expected in the final state.
    expected_state = {
        "bookmarks": {
            "ads": {"updated_time": "2020-07-24T13:03:56-05:00"},
            "adsets": {"updated_time": "2020-07-23T16:16:54-05:00"},
            "campaigns": {"updated_time": "2020-07-23T16:16:52-05:00"},
            "ads_insights": {"date_start": "2020-07-24T00:00:00+00:00"},
        }
    }
    self.assertEqual(state, expected_state)
111+
112+
def test_state_facebook_stream_merge_state_cli_trumps_config(self):
    # The config file used here is the "merge state false" one, but the
    # --merge_state_messages CLI flag is passed as well; the expected final
    # state contains the bookmarks of all streams.
    from target_bigquery import main

    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    stream_path = os.path.join(project_root, 'tests', 'rsc', 'data', 'facebook_stream.json')
    config_path = os.path.join(project_root, 'sandbox', 'target_config_merge_state_false_flag.json')

    self.set_cli_args(
        stdin=stream_path,
        config=config_path,
        processhandler="load-job",
        flag='--merge_state_messages'
    )

    ret = main()

    state = self.get_state()[-1]

    self.assertEqual(ret, 0, msg="Exit code is not 0!")

    expected_state = {
        "bookmarks": {
            "ads": {"updated_time": "2020-07-24T13:03:56-05:00"},
            "adsets": {"updated_time": "2020-07-23T16:16:54-05:00"},
            "campaigns": {"updated_time": "2020-07-23T16:16:52-05:00"},
            "ads_insights": {"date_start": "2020-07-24T00:00:00+00:00"},
        }
    }
    self.assertEqual(state, expected_state)
137+
138+
139+
class TestSimpleStreamLiteralStateNoMerging(TestSimpleStream):
    # Exercises LiteralState, both directly and end-to-end through main()
    # with state merging switched off (via CLI flag or config file).

    def test_init(self):
        # Construction from keyword arguments should simply work.
        literal_state = LiteralState(**{"a": 1})
        print(literal_state)

    def test_flat_schema(self):
        # Each merge replaces the previous contents, so only the last
        # message is left at the end.
        state = LiteralState()
        messages = [
            {"bookmarks": {"stream_one": {"timestamp": "2020-01-10T00:00:00.000000Z"}}},
            {"bookmarks": {"stream_one": {"timestamp": "2020-01-11T00:00:00.000000Z"}}},
            {"bookmarks": {"stream_two": {"timestamp": "2020-01-11T00:00:00.000000Z"}}},
        ]
        for message in messages:
            state.merge(message)
            print(state)

        assert state == {"bookmarks": {"stream_two": {"timestamp": "2020-01-11T00:00:00.000000Z"}}}

    def test_state_but_no_data(self):
        # "partial-load-job" handler, no-data fixture, merging disabled via CLI.
        from target_bigquery import main

        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        stream_path = os.path.join(project_root, 'tests', 'rsc', 'partial_load_streams', 'no_data_stream.json')
        config_path = os.path.join(project_root, 'sandbox', 'target_config_cache.json')

        self.set_cli_args(
            stdin=stream_path,
            config=config_path,
            processhandler="partial-load-job",
            flag='--no-merge_state_messages'
        )

        ret = main()
        state = self.get_state()
        self.assertEqual(1, len(state))  # only 1 state is expected: end state emit

        self.assertEqual(ret, 0, msg="Exit code is not 0!")
        expected_state = {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}}
        self.assertDictEqual(state[-1], expected_state)

    def test_state_but_no_data_bookmarks_load(self):
        # Same scenario as above, using the "bookmarks-partial-load-job" handler.
        from target_bigquery import main

        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        stream_path = os.path.join(project_root, 'tests', 'rsc', 'partial_load_streams', 'no_data_stream.json')
        config_path = os.path.join(project_root, 'sandbox', 'target_config_cache.json')

        self.set_cli_args(
            stdin=stream_path,
            config=config_path,
            processhandler="bookmarks-partial-load-job",
            flag='--no-merge_state_messages'
        )

        ret = main()
        state = self.get_state()
        self.assertEqual(1, len(state))  # only 1 state is expected: end state emit

        self.assertEqual(ret, 0, msg="Exit code is not 0!")
        expected_state = {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}}
        self.assertDictEqual(state[-1], expected_state)

    def test_state_facebook_stream(self):
        # Facebook fixture with merging disabled via CLI: only the bookmark
        # carried by the last state message is expected.
        from target_bigquery import main

        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        stream_path = os.path.join(project_root, 'tests', 'rsc', 'data', 'facebook_stream.json')
        config_path = os.path.join(project_root, 'sandbox', 'target-config.json')

        self.set_cli_args(
            stdin=stream_path,
            config=config_path,
            processhandler="load-job",
            flag='--no-merge_state_messages'
        )

        ret = main()

        state = self.get_state()[-1]

        self.assertEqual(ret, 0, msg="Exit code is not 0!")

        expected_state = {"bookmarks": {"ads_insights": {"date_start": "2020-07-24T00:00:00+00:00"}}}
        self.assertEqual(state, expected_state)

    def test_state_facebook_stream_merge_state_config(self):
        # No CLI flag here: merging is disabled through the config file only.
        from target_bigquery import main

        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        stream_path = os.path.join(project_root, 'tests', 'rsc', 'data', 'facebook_stream.json')
        config_path = os.path.join(project_root, 'sandbox', 'target_config_merge_state_false_flag.json')

        self.set_cli_args(
            stdin=stream_path,
            config=config_path,
            processhandler="load-job"
        )

        ret = main()

        state = self.get_state()[-1]

        self.assertEqual(ret, 0, msg="Exit code is not 0!")

        expected_state = {"bookmarks": {"ads_insights": {"date_start": "2020-07-24T00:00:00+00:00"}}}
        self.assertEqual(state, expected_state)

tests/unittestcore.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33
import sys
44
import unittest
5+
import re
56

67
from google.cloud import bigquery
78

@@ -34,7 +35,12 @@ def setUp(self):
3435
os.environ["MALFORMED_TARGET_CONFIG"] = os.path.join(
3536
os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'),
3637
'malformed_target_config.json')
37-
#TODO: make naming convention of target config files consistent "_" vs "-". Use "_" as it's easier to copy with a click
38+
39+
os.environ["TARGET_CONFIG_MERGE_STATE_FALSE_FLAG"] = os.path.join(
40+
os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'),
41+
'target_config_merge_state_false_flag.json')
42+
43+
# TODO: make naming convention of target config files consistent "_" vs "-". Use "_" as it's easier to copy with a click
3844
# I think we would just need to rename target-config.json to target_config.json (also update it in README)
3945
self.client = None
4046
self.project_id = None
@@ -46,11 +52,18 @@ def tearDown(self):
4652

4753
def set_cli_args(self, ds_delete=True, *args, **kwargs):
4854
arg = [arg for arg in args]
55+
4956
for k, v in kwargs.items():
5057
if k == "stdin":
5158
sys.stdin = open(v, "r")
5259
continue
5360

61+
# if some flag is being passed, such as --merge_state_messages or --no-merge_state_messages:
62+
# we want to add this flag to CLI arguments
63+
if k == "flag":
64+
arg.append(v)
65+
continue
66+
5467
arg.append("--{}".format(k))
5568
arg.append("{}".format(v))
5669

0 commit comments

Comments
 (0)