Skip to content

Commit e9ee7c3

Browse files
committed
Ingester: Updating storage store_latest, tests and conftest.
1 parent db1ce4e commit e9ee7c3

File tree

3 files changed

+34
-47
lines changed

3 files changed

+34
-47
lines changed

ingester/datalake_ingester/storage.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def __init__(self, table_name=None, latest_table=None, connection=None):
2929
self.table_name = table_name
3030
self.latest_table_name = os.environ.get("DATALAKE_DNAMODB_LATEST_TABLE",
3131
f"{latest_table}")
32-
self.use_latest = os.environ.get("DATALAKE_LATEST_FLAG", False)
32+
self.use_latest = os.environ.get("DATALAKE_USE_LATEST_TABLE", False)
3333
self._prepare_connection(connection)
3434
self.logger = logging.getLogger('storage')
3535

@@ -73,7 +73,14 @@ def store_latest(self, record):
7373
note: Record must utilize AttributeValue syntax
7474
for the conditional put.
7575
"""
76+
77+
condition_expression = " attribute_not_exists(what_where_key) OR metadata.start < :new_start"
78+
expression_attribute_values = {
79+
':what_where_key': {'S': record['what_where_key']},
80+
':new_start': {'N': str(record['metadata']['start'])}
81+
}
7682
record = {
83+
'what_where_key': {"S": record['metadata']['what']+':'+record['metadata']['where']},
7784
'time_index_key': {"S": record['time_index_key']},
7885
'range_key': {"S": record['range_key']},
7986
'metadata': {
@@ -110,12 +117,13 @@ def store_latest(self, record):
110117
'url': {"S": record['url']},
111118
'create_time': {'N': str(record['create_time'])}
112119
}
120+
113121
try:
114122
self._connection.put_item(
115123
table_name=self.latest_table_name,
116124
item=record,
117-
condition_expression=\
118-
f"attribute_not_exists(metadata.M.start.N) OR metadata.M.start.N < {record['metadata']['M']['start']['N']}",
125+
condition_expression=condition_expression,
126+
expression_attribute_values=expression_attribute_values
119127
)
120128
self.logger.info("Record stored successfully.")
121129
except ConditionalCheckFailedException:

ingester/tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def dynamodb_records_table(dynamodb_table_maker):
7070

7171
@pytest.fixture
7272
def dynamodb_latest_table(dynamodb_table_maker):
73-
schema = [HashKey('time_index_key'), RangeKey('range_key')]
73+
schema = [HashKey('what_where_key')]
7474
return dynamodb_table_maker('latest', schema)
7575

7676

ingester/tests/test_storage.py

Lines changed: 22 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):
2121
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)
2222

2323
new_record = {
24+
'what_where_key': 'syslog:ground_server2',
2425
'time_index_key': '15225:newlog',
2526
'range_key': 'new_server:12345abcde',
2627
'metadata': {
@@ -44,8 +45,7 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):
4445
print(f"Failed to store record: {str(e)}")
4546

4647
stored_record = dynamodb_latest_table.get_item(
47-
time_index_key='15225:newlog',
48-
range_key='new_server:12345abcde'
48+
what_where_key=new_record['what_where_key']
4949
)
5050
assert stored_record['metadata']['start'] == new_record['metadata']['start']
5151

@@ -54,6 +54,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
5454
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)
5555

5656
file1 = {
57+
'what_where_key': 'syslog:ground_server2',
5758
'time_index_key': '15219:zlcdzvawsp',
5859
'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9',
5960
'metadata': {
@@ -72,6 +73,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
7273
}
7374

7475
file2 = {
76+
'what_where_key': 'syslog:ground_server2',
7577
'time_index_key': '15219:zlcdzvawsp',
7678
'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9',
7779
'metadata': {
@@ -91,6 +93,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
9193
}
9294

9395
file3 = {
96+
'what_where_key': 'syslog:s114',
9497
'time_index_key': '15220:syslog',
9598
'range_key': 'ground_server2:34fb2d1ec54245c7a57e29ed5a6ea9b2',
9699
'metadata': {
@@ -99,7 +102,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
99102
'end': 1415128740728,
100103
'path': '/var/log/syslog.2',
101104
'work_id': 'foo-bizz',
102-
'where': 'ground_server2',
105+
'where': 's114',
103106
'what': 'syslog',
104107
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
105108
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
@@ -113,52 +116,28 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
113116
storage.store_latest(file1)
114117
storage.store_latest(file2) # same what:where, but should replace file1 b/c newer
115118

119+
query_what_where = 'syslog:ground_server2'
120+
116121
records = [dict(i) for i in dynamodb_latest_table.scan()]
117122

118-
res = dict(dynamodb_latest_table.get_item(time_index_key='15219:zlcdzvawsp',
119-
range_key='lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9'))
123+
res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where))
120124
assert res['metadata']['start'] == Decimal('1314877177413')
121125
assert len(records) == 2
122126
assert file2 == res
123127

128+
storage.store_latest(file3)
129+
storage.store_latest(file2)
130+
storage.store_latest(file1)
124131

125-
def test_concurrent_updates(dynamodb_latest_table, dynamodb_connection):
126-
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)
127-
128-
base_record = {
129-
'time_index_key': '15219:zlcdzvawsp',
130-
'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9',
131-
'metadata': {
132-
'version': 1,
133-
'start': 1314877177402,
134-
'end': 1314877177412, # ends ten seconds later
135-
'path': '/var/log/syslog.2',
136-
'work_id': 'abc-123',
137-
'where': 'ground_server2',
138-
'what': 'syslog',
139-
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
140-
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
141-
},
142-
'url': 's3://existingfile/url',
143-
'create_time': 1314877177402
144-
}
145-
storage.store_latest(base_record)
146-
132+
records = [dict(i) for i in dynamodb_latest_table.scan()]
133+
res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where))
134+
assert res['metadata']['id'] != file1['metadata']['id']
135+
assert res['metadata']['id'] == file2['metadata']['id']
147136

148-
updated_record1 = base_record.copy()
149-
updated_record1['metadata']['start'] += 10
150-
151-
updated_record2 = base_record.copy()
152-
updated_record2['metadata']['start'] += 5
153-
154-
155-
storage.store_latest(updated_record1)
156-
storage.store_latest(updated_record2)
157-
158-
stored_record = dynamodb_latest_table.get_item(
159-
time_index_key='15219:zlcdzvawsp',
160-
range_key='lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9'
161-
)
162-
163-
assert stored_record['metadata']['start'] == updated_record1['metadata']['start']
137+
storage.store_latest(file1)
138+
storage.store_latest(file1)
139+
storage.store_latest(file2)
140+
storage.store_latest(file3)
141+
res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where))
142+
assert res['metadata']['start'] == file2['metadata']['start']
164143

0 commit comments

Comments
 (0)