Skip to content

Commit d4712f2

Browse files
committed
Ingester: call store_latest on proper storage object.
1 parent ca47721 commit d4712f2

File tree

4 files changed

+35
-11
lines changed

4 files changed

+35
-11
lines changed

client/datalake/tests/conftest.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ def random_interval():
6767

6868

6969
def random_work_id():
    """Return a random work id of the form ``<word>-<number>``.

    The word part comes from ``random_word(5)`` and the number is drawn
    from ``random.randint(0, 2**15)``.
    """
    word_part = random_word(5)
    number_part = random.randint(0, 2**15)
    return '{}-{}'.format(word_part, number_part)
7371

7472

ingester/datalake_ingester/ingester.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
UnsupportedTimeRange, NoSuchDatalakeFile, UnsupportedS3Event
44
from .s3_notification import S3Notification
55
import time
6+
import os
67
import logging
78
from .storage import DynamoDBStorage
89
from .queue import SQSQueue
@@ -68,23 +69,28 @@ def _make_record(self, r):
6869

6970
class Ingester(object):
7071

71-
def __init__(self, storage, latest_storage=None, queue=None, reporter=None):
    """Keep references to the collaborators used by the ingester.

    storage: object that receives every ingested record via ``store``.
    latest_storage: optional object that additionally receives every
        record via ``store_latest``; ``None`` leaves that step out.
    queue: optional queue object, stored as given.
    reporter: optional reporter object, stored as given.
    """
    self.reporter = reporter
    self.queue = queue
    self.latest_storage = latest_storage
    self.storage = storage
7577

7678
@classmethod
def from_config(cls):
    """Build an ingester from the configured storage, queue and reporter.

    The records storage, the queue and the reporter are each created
    through their own ``from_config`` constructors. When the
    ``DATALAKE_USE_LATEST_TABLE`` environment variable is set to any
    non-empty value, a second ``DynamoDBStorage`` is created with
    ``use_latest=True`` and passed as ``latest_storage``; otherwise
    ``latest_storage`` is ``None``.
    """
    storage = DynamoDBStorage.from_config()
    # Bind the name unconditionally: without this default the return
    # statement raises UnboundLocalError whenever the variable is unset.
    latest_storage = None
    # NOTE(review): any non-empty string enables this, including "0" or
    # "false" -- confirm that is the intended switch semantics.
    if os.environ.get("DATALAKE_USE_LATEST_TABLE", False):
        latest_storage = DynamoDBStorage.from_config(use_latest=True)
    queue = SQSQueue.from_config()
    reporter = SNSReporter.from_config()
    return cls(storage,
               latest_storage=latest_storage,
               queue=queue,
               reporter=reporter)
8286

8387
def ingest(self, url):
    """Ingest the metadata associated with the given url.

    Every record listed for ``url`` is passed to ``self.storage.store``;
    when a truthy ``self.latest_storage`` is present the record is also
    passed to its ``store_latest``.
    """
    for record in DatalakeRecord.list_from_url(url):
        self.storage.store(record)
        if not self.latest_storage:
            continue
        self.latest_storage.store_latest(record)
8894

8995
def handler(self, msg):
9096
ir = IngesterReport().start()

ingester/datalake_ingester/storage.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import os
2121
from datalake.common.errors import InsufficientConfiguration
2222
import logging
23+
import decimal
2324

2425

2526
class DynamoDBStorage(object):
@@ -61,12 +62,11 @@ def _latest_table(self):
6162
def store(self, record):
    """Put ``record`` into the table, tolerating duplicate stores.

    When ``self.use_latest`` is truthy the record is first handed to
    ``store_latest``; the put on ``self._table`` happens in either case.
    """
    wants_latest = self.use_latest
    if wants_latest:
        self.store_latest(record)
    try:
        self._table.put_item(data=record)
    except ConditionalCheckFailedException:
        # A failed conditional check is treated as a duplicate store
        # and deliberately ignored.
        return
7070

7171
def update(self, record):
7272
self._table.put_item(data=record, overwrite=True)
@@ -76,7 +76,6 @@ def store_latest(self, record):
7676
note: Record must utilize AttributeValue syntax
7777
for the conditional put.
7878
"""
79-
8079
condition_expression = " attribute_not_exists(what_where_key) OR metadata.start < :new_start"
8180
expression_attribute_values = {
8281
':new_start': {'N': str(record['metadata']['start'])}

ingester/tests/test_ingester.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ def storage(dynamodb_records_table, dynamodb_connection):
1313
return DynamoDBStorage(table_name='records',
1414
connection=dynamodb_connection)
1515

16+
@pytest.fixture
def latest_storage(dynamodb_latest_table, dynamodb_connection):
    """Provide a DynamoDBStorage built on the test DynamoDB connection.

    ``dynamodb_latest_table`` is requested but not used in the body.
    """
    storage_for_latest = DynamoDBStorage(connection=dynamodb_connection)
    return storage_for_latest
19+
1620

1721
@pytest.fixture
1822
def random_s3_file_maker(s3_file_from_metadata, random_metadata):
@@ -32,6 +36,23 @@ def test_ingest_random(storage, dynamodb_records_table, random_s3_file_maker):
3236
for r in records:
3337
assert r['metadata'] == metadata
3438

39+
def test_ingest_random_latest(storage, latest_storage, dynamodb_latest_table,
                              random_s3_file_maker):
    """Ingest a random file and check the metadata found in the latest table.

    The ``latest_storage`` fixture is switched to the 'latest' table with
    ``use_latest`` enabled before the ingester runs.
    """
    import decimal

    def convert_metadata(metadata):
        # The scanned records are compared against Decimal values, so
        # int and float entries are converted through their string form.
        converted = {}
        for key, value in metadata.items():
            if isinstance(value, (int, float)):
                value = decimal.Decimal(str(value))
            converted[key] = value
        return converted

    latest_storage.latest_table_name = 'latest'
    latest_storage.use_latest = True
    url, metadata = random_s3_file_maker()
    Ingester(storage, latest_storage=latest_storage).ingest(url)

    records = [dict(row) for row in dynamodb_latest_table.scan()]
    expected_metadata = convert_metadata(metadata)

    assert len(records) >= 1
    for record in records:
        assert record['metadata'] == expected_metadata
55+
3556

3657
def test_ingest_no_end(storage, dynamodb_records_table, s3_file_from_metadata,
3758
random_metadata):

0 commit comments

Comments
 (0)