Skip to content

Commit cae7e24

Browse files
committed
API: Feature flag enables query from latest_table.
1 parent 9d854f6 commit cae7e24

File tree

5 files changed

+57
-32
lines changed

5 files changed

+57
-32
lines changed

api/datalake_api/querier.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -180,12 +180,10 @@ class ArchiveQuerier(object):
180180
def __init__(self, table_name,
181181
latest_table_name=None,
182182
use_latest_table=None,
183-
latest_max_lookback=30,
184183
dynamodb=None):
185184
self.table_name = table_name
186185
self.latest_table_name = latest_table_name
187186
self.use_latest_table = use_latest_table
188-
self.latest_max_lookback = latest_max_lookback
189187
self.dynamodb = dynamodb
190188

191189

@@ -349,14 +347,15 @@ def _latest_table(self):
349347
return self.dynamodb.Table(self.latest_table_name)
350348

351349
def query_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
352-
log.info('Inside query_latest method')
353350
if self.use_latest_table:
354351
log.info('inside use_latest_table=TRUE')
355352
response = self._latest_table.query(
356353
KeyConditionExpression=Key('what_where_key').eq(f'{what}:{where}')
357354
)
358355
items = response.get('Items', [])
359-
if not items and self.latest_max_lookback > 0:
356+
357+
if not items:
358+
log.info('Falling back to default latest query')
360359
return self._default_latest(what, where, lookback_days)
361360

362361
latest_item = items[0]
@@ -389,6 +388,7 @@ def _get_all_records_in_bucket(self, bucket, **kwargs):
389388
return records
390389

391390
def _default_latest(self, what, where, lookback_days=DEFAULT_LOOKBACK_DAYS):
391+
log.info("Using default latest behavior")
392392
current = int(time.time() * 1000)
393393
end = current - lookback_days * _ONE_DAY_MS
394394
while current >= end:

api/datalake_api/settings.py

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -17,9 +17,7 @@
1717

1818
DYNAMODB_TABLE = 'test'
1919
DYNAMODB_LATEST_TABLE = 'test_latest'
20-
DATALAKE_USE_LATEST_TABLE = \
21-
os.environ.get("DATALAKE_USE_LATEST_TABLE", "false").lower() == "true"
22-
LATEST_MAX_LOOKBACK = int(os.environ.get("LATEST_MAX_LOOKBACK", "30"))
20+
DATALAKE_USE_LATEST_TABLE = False
2321

2422
AWS_REGION = 'us-west-2'
2523
AWS_ACCESS_KEY_ID = None

api/datalake_api/v0.py

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727

2828
v0 = flask.Blueprint('v0', __name__, url_prefix='/v0')
2929

30+
_archive_querier = None
3031

3132
def _get_aws_kwargs():
3233
kwargs = dict(
@@ -48,17 +49,23 @@ def get_dynamodb():
4849

4950

5051
def get_archive_querier():
51-
if not hasattr(app, 'archive_querier'):
52+
global _archive_querier
53+
54+
if not _archive_querier:
5255
table_name = app.config.get('DYNAMODB_TABLE')
5356
latest_table_name = app.config.get('DYNAMODB_LATEST_TABLE')
5457
use_latest_table = app.config.get('DATALAKE_USE_LATEST_TABLE')
55-
latest_max_lookback = app.config.get("LATEST_MAX_LOOKBACK")
56-
app.archive_querier = ArchiveQuerier(table_name,
58+
_archive_querier = ArchiveQuerier(table_name,
5759
latest_table_name,
5860
use_latest_table,
59-
latest_max_lookback,
6061
dynamodb=get_dynamodb())
61-
return app.archive_querier
62+
return _archive_querier
63+
64+
65+
def reset_archive_querier():
66+
"""FOR TESTING PURPOSES ONLY"""
67+
global _archive_querier
68+
_archive_querier = None
6269

6370

6471
@v0.route('/archive/')

api/tests/conftest.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,9 @@
4242

4343

4444
def get_client():
45+
from datalake_api import settings
46+
datalake_api.app.config.from_object(settings)
47+
4548
datalake_api.app.config['TESTING'] = True
4649
datalake_api.app.config['AWS_ACCESS_KEY_ID'] = 'abc'
4750
datalake_api.app.config['AWS_SECRET_ACCESS_KEY'] = '123'
@@ -181,8 +184,7 @@ def _populate_table(table, records):
181184
for r in records:
182185
batch.put_item(Item=r)
183186

184-
# Adding latest table logic so latest table will be created and records will populate it
185-
# Once that's possible, we will simply query the latest_table for what:where, no bucket logic
187+
186188
@pytest.fixture
187189
def table_maker(request, dynamodb):
188190

api/tests/test_archive_querier.py

Lines changed: 36 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -11,8 +11,11 @@
1111
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1212
# License for the specific language governing permissions and limitations under
1313
# the License.
14-
14+
import os
1515
import pytest
16+
from datalake_api.v0 import reset_archive_querier
17+
from datalake_api import settings
18+
# from flask import current_app as app
1619
from datalake.common import DatalakeRecord
1720
from datalake.tests import generate_random_metadata
1821
import simplejson as json
@@ -124,27 +127,35 @@ def query_latest(self, what, where):
124127

125128

126129

127-
"""
128-
Incorporate LATEST_MAX_LOOKBACK HERE
129-
"""
130130
@pytest.fixture(params=[
131131
('archive', 'use_latest'),
132132
('archive', 'use_default'),
133133
('http', 'use_latest'),
134134
('http', 'use_default')
135-
], ids=['archive_latest', 'archive-default', 'http-latest', 'http-default'])
135+
], ids=['archive-latest',
136+
'archive-default',
137+
'http-latest',
138+
'http-default'
139+
])
136140
def querier(monkeypatch, request, dynamodb):
141+
142+
reset_archive_querier()
137143
querier_type, table_usage = request.param
138144

139145
if table_usage == 'use_latest':
140-
monkeypatch.setenv('DATALAKE_USE_LATEST_TABLE', 'true')
146+
settings.DATALAKE_USE_LATEST_TABLE = True
141147
else:
142-
monkeypatch.setenv('DATALAKE_USE_LATEST_TABLE', 'false')
148+
settings.DATALAKE_USE_LATEST_TABLE= False
143149

144150
if querier_type == 'http':
145-
return HttpQuerier('test', 'test_latest', dynamodb=dynamodb)
151+
return HttpQuerier('test',
152+
'test_latest',
153+
dynamodb=dynamodb)
146154
else:
147-
return ArchiveQuerier('test', 'test_latest', dynamodb=dynamodb)
155+
return ArchiveQuerier('test',
156+
'test_latest',
157+
use_latest_table=True if table_usage == 'use_latest' else False,
158+
dynamodb=dynamodb)
148159

149160
def in_url(result, part):
150161
url = result['url']
@@ -567,17 +578,24 @@ def test_latest_table_query(table_maker, querier, record_maker):
567578
what='boo',
568579
where='hoo{}'.format(i))
569580
table_maker(records)
570-
querier.use_latest_table = True
571581
result = querier.query_latest('boo', 'hoo0')
572582
_validate_latest_result(result, what='boo', where='hoo0')
573583

574-
"""
575-
Write tests:
576-
With setup of latest table records,
577-
with DYNAMODB_LATEST_TABLE set, with DATALAKE_USE_LATEST_TABLE=true, with LATEST_MAX_LOOKBACK=0, record is found
578584

579-
With setup of latest table records,
580-
with DYNAMODB_LATEST_TABLE set, with DATALAKE_USE_LATEST_TABLE=false, with LATEST_MAX_LOOKBACK=0, record is not found
585+
def test_query_latest_just_latest_table(table_maker, querier, record_maker):
586+
use_latest_from_env = settings.DATALAKE_USE_LATEST_TABLE
587+
table = table_maker([])[1]
588+
for i in range(3):
589+
record = record_maker(what='meow',
590+
where=f'tree',
591+
path='/{}'.format(i))
592+
593+
# only inserting into latest table
594+
table.put_item(Item=record[0])
595+
time.sleep(1.01)
581596

582-
2-4
583-
"""
597+
result = querier.query_latest('meow', 'tree')
598+
if use_latest_from_env:
599+
_validate_latest_result(result, what='meow', where='tree')
600+
else:
601+
assert result is None

0 commit comments

Comments
 (0)