Skip to content

Initial commit for conditional put on metadata.start for latest/ table #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 11, 2024
88 changes: 82 additions & 6 deletions ingester/datalake_ingester/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,26 @@
# License for the specific language governing permissions and limitations under
# the License.


from memoized_property import memoized_property
import boto.dynamodb2
from boto.dynamodb2.table import Table
from boto.dynamodb2.exceptions import ConditionalCheckFailedException
import os
from datalake.common.errors import InsufficientConfiguration
import logging


class DynamoDBStorage(object):
'''store datalake records in a dynamoDB table'''

def __init__(self, table_name, connection=None):
def __init__(self, table_name=None, latest_table=None, connection=None):
self.table_name = table_name
self.latest_table_name = os.environ.get("DATALAKE_LATEST_TABLE",
f"{latest_table}")
self.use_latest = os.environ.get("DATALAKE_USE_LATEST_TABLE", False)
self._prepare_connection(connection)
self.logger = logging.getLogger('storage')

@classmethod
def from_config(cls):
Expand All @@ -47,13 +53,83 @@ def _prepare_connection(self, connection):
@memoized_property
def _table(self):
return Table(self.table_name, connection=self._connection)

@memoized_property
def _latest_table(self):
return Table(self.latest_table_name, connection=self._connection)

def store(self, record):
try:
self._table.put_item(data=record)
except ConditionalCheckFailedException:
# Tolerate duplicate stores
pass
if self.use_latest:
self._latest_table.store_latest(record)
else:
try:
self._table.put_item(data=record)
except ConditionalCheckFailedException:
# Tolerate duplicate stores
pass

def update(self, record):
self._table.put_item(data=record, overwrite=True)

def store_latest(self, record):
"""
note: Record must utilize AttributeValue syntax
for the conditional put.
"""

condition_expression = " attribute_not_exists(what_where_key) OR metadata.start < :new_start"
expression_attribute_values = {
':new_start': {'N': str(record['metadata']['start'])}
}
record = {
'what_where_key': {"S": record['metadata']['what']+':'+record['metadata']['where']},
'time_index_key': {"S": record['time_index_key']},
'range_key': {"S": record['range_key']},
'metadata': {
'M': {
'start': {
'N': str(record['metadata']['start'])
},
'end': {
'N': str(record['metadata']['end'])
},
'id': {
'S': str(record['metadata']['id'])
},
'path': {
'S': str(record['metadata']['path'])
},
'hash': {
'S': str(record['metadata']['hash'])
},
'version': {
'N': str(record['metadata']['version'])
},
'what': {
'S': str(record['metadata']['what'])
},
'where': {
'S': str(record['metadata']['where'])
},
'work_id': {
'S': str(record['metadata']['work_id'])
}
}
},
'url': {"S": record['url']},
'create_time': {'N': str(record['create_time'])}
}

try:
self._connection.put_item(
table_name=self.latest_table_name,
item=record,
condition_expression=condition_expression,
expression_attribute_values=expression_attribute_values
)
self.logger.info("Record stored successfully.")
except ConditionalCheckFailedException:
self.logger.error("Condition not met, no operation was performed.")
except Exception as e:
self.logger.error(f"Error occurred: {str(e)}")

7 changes: 7 additions & 0 deletions ingester/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def tear_down():
return table_maker



@pytest.fixture
def dynamodb_users_table(dynamodb_table_maker):
schema = [HashKey('name'), RangeKey('last_name')]
Expand All @@ -67,6 +68,12 @@ def dynamodb_records_table(dynamodb_table_maker):
return dynamodb_table_maker('records', schema)


@pytest.fixture
def dynamodb_latest_table(dynamodb_table_maker):
schema = [HashKey('what_where_key')]
return dynamodb_table_maker('latest', schema)


@pytest.fixture
def sns_connection(aws_connector):
return aws_connector(mock_sns, boto.connect_sns)
Expand Down
127 changes: 126 additions & 1 deletion ingester/tests/test_storage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datalake_ingester import DynamoDBStorage
from decimal import Decimal


def test_dynamodb_store(dynamodb_users_table, dynamodb_connection):
Expand All @@ -8,11 +9,135 @@ def test_dynamodb_store(dynamodb_users_table, dynamodb_connection):
user = dict(dynamodb_users_table.get_item(name='John', last_name='Muir'))
assert dict(user) == expected_user


def test_store_duplicate(dynamodb_users_table, dynamodb_connection):
storage = DynamoDBStorage('users', connection=dynamodb_connection)
expected_user = {'name': 'Vanilla', 'last_name': 'Ice'}
storage.store(expected_user)
storage.store(expected_user)
user = dict(dynamodb_users_table.get_item(name='Vanilla', last_name='Ice'))
assert dict(user) == expected_user

def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)

new_record = {
'what_where_key': 'syslog:ground_server2',
'time_index_key': '15225:newlog',
'range_key': 'new_server:12345abcde',
'metadata': {
'version': 1,
'start': 1500000000000,
'end': 1500000000010,
'path': '/var/log/syslog.2',
'work_id': None,
'where': 'ground_server2',
'what': 'syslog',
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
},
'url': 's3://newfile/url',
'create_time': 1500000000000
}

try:
storage.store_latest(new_record)
except Exception as e:
print(f"Failed to store record: {str(e)}")

stored_record = dynamodb_latest_table.get_item(
what_where_key=new_record['what_where_key']
)
assert stored_record['metadata']['start'] == new_record['metadata']['start']


def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection):
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)

file1 = {
'what_where_key': 'syslog:ground_server2',
'time_index_key': '15219:zlcdzvawsp',
'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9',
'metadata': {
'version': 1,
'start': 1314877177402,
'end': 1314877177412, # ends ten seconds later
'path': '/var/log/syslog.2',
'work_id': 'abc-123',
'where': 'ground_server2',
'what': 'syslog',
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
},
'url': 's3://existingfile/url',
'create_time': 1314877177402
}

file2 = {
'what_where_key': 'syslog:ground_server2',
'time_index_key': '15219:zlcdzvawsp',
'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9',
'metadata': {
'version': 1,
'start': 1314877177413, # One millisecond later
'end': 1314877177423, # ends ten seconds later
'path': '/var/log/syslog.2',
'work_id': 'abc-123',
'where': 'ground_server2',
'what': 'syslog',
'id': '45gb2d1ec54245c7a57e29ed5a6ea9b2',
'hash': 'c5g3d8de24af342643d5b78a8f2b9b88'

},
'url': 's3://existingfile/url',
'create_time': 1314877177403
}

file3 = {
'what_where_key': 'syslog:s114',
'time_index_key': '15220:syslog',
'range_key': 'ground_server2:34fb2d1ec54245c7a57e29ed5a6ea9b2',
'metadata': {
'version': 1,
'start': 1414877177402,
'end': 1415128740728,
'path': '/var/log/syslog.2',
'work_id': 'foo-bizz',
'where': 's114',
'what': 'syslog',
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
},
'url': 's3://datalake/path_to_file1',
'create_time': 1414877177402,
'size': 1048576
}

storage.store_latest(file3)
storage.store_latest(file1)
storage.store_latest(file2) # same what:where, but should replace file1 b/c newer

query_what_where = 'syslog:ground_server2'

records = [dict(i) for i in dynamodb_latest_table.scan()]

res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where))
assert res['metadata']['start'] == Decimal('1314877177413')
assert len(records) == 2
assert file2 == res

storage.store_latest(file3)
storage.store_latest(file2)
storage.store_latest(file1)

records = [dict(i) for i in dynamodb_latest_table.scan()]
res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where))
assert res['metadata']['id'] != file1['metadata']['id']
assert res['metadata']['id'] == file2['metadata']['id']

storage.store_latest(file1)
storage.store_latest(file1)
storage.store_latest(file2)
storage.store_latest(file3)
res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where))
assert res['metadata']['start'] == file2['metadata']['start']

Loading