Skip to content

Commit 01af862

Browse files
committed
Ingester: call store_latest on proper storage object.
1 parent d4712f2 commit 01af862

File tree

4 files changed

+12
-21
lines changed

4 files changed

+12
-21
lines changed

ingester/datalake_ingester/ingester.py

Lines changed: 2 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -69,28 +69,23 @@ def _make_record(self, r):
6969

7070
class Ingester(object):
7171

72-
def __init__(self, storage, latest_storage=None, queue=None, reporter=None):
72+
def __init__(self, storage, queue=None, reporter=None):
7373
self.storage = storage
74-
self.latest_storage = latest_storage
7574
self.queue = queue
7675
self.reporter = reporter
7776

7877
@classmethod
7978
def from_config(cls):
8079
storage = DynamoDBStorage.from_config()
81-
if os.environ.get("DATALAKE_USE_LATEST_TABLE", False):
82-
latest_storage = DynamoDBStorage.from_config(use_latest=True)
8380
queue = SQSQueue.from_config()
8481
reporter = SNSReporter.from_config()
85-
return cls(storage, latest_storage=latest_storage, queue=queue, reporter=reporter)
82+
return cls(storage, queue=queue, reporter=reporter)
8683

8784
def ingest(self, url):
8885
'''ingest the metadata associated with the given url'''
8986
records = DatalakeRecord.list_from_url(url)
9087
for r in records:
9188
self.storage.store(r)
92-
if self.latest_storage:
93-
self.latest_storage.store_latest(r)
9489

9590
def handler(self, msg):
9691
ir = IngesterReport().start()

ingester/datalake_ingester/storage.py

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -26,11 +26,10 @@
2626
class DynamoDBStorage(object):
2727
'''store datalake records in a dynamoDB table'''
2828

29-
def __init__(self, table_name=None, latest_table=None, connection=None):
29+
def __init__(self, table_name=None, latest_table_name=None, connection=None):
3030
self.table_name = table_name
3131
self.latest_table_name = os.environ.get("DATALAKE_LATEST_TABLE",
32-
f"{latest_table}")
33-
self.use_latest = os.environ.get("DATALAKE_USE_LATEST_TABLE", False)
32+
False)
3433
self._prepare_connection(connection)
3534
self.logger = logging.getLogger('storage')
3635

@@ -60,7 +59,7 @@ def _latest_table(self):
6059
return Table(self.latest_table_name, connection=self._connection)
6160

6261
def store(self, record):
63-
if self.use_latest:
62+
if self.latest_table_name:
6463
self.store_latest(record)
6564
try:
6665
self._table.put_item(data=record)

ingester/tests/test_ingester.py

Lines changed: 3 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -13,10 +13,6 @@ def storage(dynamodb_records_table, dynamodb_connection):
1313
return DynamoDBStorage(table_name='records',
1414
connection=dynamodb_connection)
1515

16-
@pytest.fixture
17-
def latest_storage(dynamodb_latest_table, dynamodb_connection):
18-
return DynamoDBStorage(connection=dynamodb_connection)
19-
2016

2117
@pytest.fixture
2218
def random_s3_file_maker(s3_file_from_metadata, random_metadata):
@@ -36,11 +32,10 @@ def test_ingest_random(storage, dynamodb_records_table, random_s3_file_maker):
3632
for r in records:
3733
assert r['metadata'] == metadata
3834

39-
def test_ingest_random_latest(storage, latest_storage, dynamodb_latest_table, random_s3_file_maker):
40-
latest_storage.latest_table_name = 'latest'
41-
latest_storage.use_latest = True
35+
def test_ingest_random_latest(storage, dynamodb_latest_table, random_s3_file_maker):
36+
storage.latest_table_name = 'latest'
4237
url, metadata = random_s3_file_maker()
43-
ingester = Ingester(storage, latest_storage=latest_storage)
38+
ingester = Ingester(storage)
4439
ingester.ingest(url)
4540
records = [dict(r) for r in dynamodb_latest_table.scan()]
4641
def convert_metadata(metadata):

ingester/tests/test_storage.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,8 @@ def test_store_duplicate(dynamodb_users_table, dynamodb_connection):
1818
assert dict(user) == expected_user
1919

2020
def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):
21-
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)
21+
storage = DynamoDBStorage(connection=dynamodb_connection)
22+
storage.latest_table_name = 'latest'
2223

2324
new_record = {
2425
'what_where_key': 'syslog:ground_server2',
@@ -51,7 +52,8 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):
5152

5253

5354
def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection):
54-
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)
55+
storage = DynamoDBStorage(connection=dynamodb_connection)
56+
storage.latest_table_name = 'latest'
5557

5658
file1 = {
5759
'what_where_key': 'syslog:ground_server2',

0 commit comments

Comments
 (0)