From 50ace5253fddd4e19894e2672918177692ccc9af Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Thu, 18 Apr 2024 21:59:57 -0400 Subject: [PATCH 1/8] Initial commit for conditional put on metadata.start for latest/ table --- ingester/datalake_ingester/storage.py | 21 ++++- ingester/tests/conftest.py | 6 ++ ingester/tests/test_storage.py | 119 +++++++++++++++++++++++--- 3 files changed, 130 insertions(+), 16 deletions(-) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index a0729cc..982321e 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -15,7 +15,8 @@ from memoized_property import memoized_property import boto.dynamodb2 from boto.dynamodb2.table import Table -from boto.dynamodb2.exceptions import ConditionalCheckFailedException +from boto.dynamodb2.exceptions import (ConditionalCheckFailedException, + ItemNotFound) import os from datalake.common.errors import InsufficientConfiguration @@ -50,10 +51,24 @@ def _table(self): def store(self, record): try: + primary_key = { + 'time_index_key': record['time_index_key'], + 'range_key': record['range_key'] + } + existing_record = self._table.get_item(**primary_key) + + if existing_record and \ + existing_record['metadata']['start'] < record['metadata']['start']: + self._table.put_item(data=record, overwrite=True) + else: + print(f'Existing record in latest table has later or same start time. No update performed') + + except ItemNotFound: + # Item doesn't exist, lets insert self._table.put_item(data=record) except ConditionalCheckFailedException: - # Tolerate duplicate stores - pass + # Tolerate duplicate stores + pass def update(self, record): self._table.put_item(data=record, overwrite=True) diff --git a/ingester/tests/conftest.py b/ingester/tests/conftest.py index fa432a7..5797639 100644 --- a/ingester/tests/conftest.py +++ b/ingester/tests/conftest.py @@ -67,6 +67,12 @@ def dynamodb_records_table(dynamodb_table_maker): return dynamodb_table_maker('records', schema) +@pytest.fixture +def dynamodb_latest_table(dynamodb_table_maker): + schema = [HashKey('time_index_key'), RangeKey('range_key')] + return dynamodb_table_maker('latest', schema) + + @pytest.fixture def sns_connection(aws_connector): return aws_connector(mock_sns, boto.connect_sns) diff --git a/ingester/tests/test_storage.py b/ingester/tests/test_storage.py index 2e2fd28..ea45196 100644 --- a/ingester/tests/test_storage.py +++ b/ingester/tests/test_storage.py @@ -1,18 +1,111 @@ from datalake_ingester import DynamoDBStorage +from decimal import Decimal +def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): + storage = DynamoDBStorage('latest', connection=dynamodb_connection) + new_record = { + 'time_index_key': '15225:newlog', + 'range_key': 'new_server:12345abcde', + 'metadata': { + 'start': 1500000000000 + }, + 'url': 's3://newfile/url', + 'create_time': 1500000000000 + } -def test_dynamodb_store(dynamodb_users_table, dynamodb_connection): - storage = DynamoDBStorage('users', connection=dynamodb_connection) - expected_user = {'name': 'John', 'last_name': 'Muir'} - storage.store(expected_user) - user = dict(dynamodb_users_table.get_item(name='John', last_name='Muir')) - assert dict(user) == expected_user + storage.store(new_record) + stored_record = dynamodb_latest_table.get_item( + time_index_key='15225:newlog', + range_key='new_server:12345abcde' + ) + assert stored_record['metadata']['start'] == new_record['metadata']['start'] + + +def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection): + storage = DynamoDBStorage('latest', connection=dynamodb_connection) + + file1 = { + 'time_index_key': '15219:zlcdzvawsp', + 'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9', + 'metadata': { + 'start': 1314877177402 + }, + 'url': 's3://existingfile/url', + 'create_time': 1314877177402 + } + + file2 = { + 'time_index_key': '15219:zlcdzvawsp', + 'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9', + 'metadata': { + 'start': 1314877177403 # One millisecond later + }, + 'url': 's3://existingfile/url', + 'create_time': 1314877177403 + } + + file3 = { + 'time_index_key': '15220:syslog', + 'range_key': 'ground_server2:34fb2d1ec54245c7a57e29ed5a6ea9b2', + 'metadata': { + 'version': 1, + 'start': 1414877177402, + 'end': 1415128740728, + 'path': '/var/log/syslog.2', + 'work_id': None, + 'where': 'ground_server2', + 'what': 'syslog', + 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', + 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' + }, + 'url': 's3://datalake/path_to_file1', + 'create_time': 1414877177402, + 'size': 1048576 + } + + storage.store(file3) + storage.store(file1) + storage.store(file2) # same what:where, but should replace file1 b/c newer + + records = [dict(i) for i in dynamodb_latest_table.scan()] + + res = dict(dynamodb_latest_table.get_item(time_index_key='15219:zlcdzvawsp', + range_key='lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9')) + assert res['metadata']['start'] == Decimal('1314877177403') + assert len(records) == 2 + assert file2 == res + + +def test_concurrent_updates(dynamodb_latest_table, dynamodb_connection): + storage = DynamoDBStorage('latest', connection=dynamodb_connection) + + base_record = { + 'time_index_key': '15219:zlcdzvawsp', + 'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9', + 'metadata': { + 'start': 1314877177402 + }, + 'url': 's3://existingfile/url', + 'create_time': 1314877177402 + } + storage.store(base_record) + + + updated_record1 = base_record.copy() + updated_record1['metadata']['start'] += 10 + + updated_record2 = base_record.copy() + updated_record2['metadata']['start'] += 5 + + + storage.store(updated_record1) + storage.store(updated_record2) + + stored_record = dynamodb_latest_table.get_item( + time_index_key='15219:zlcdzvawsp', + range_key='lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9' + ) + + assert stored_record['metadata']['start'] == updated_record1['metadata']['start'] -def test_store_duplicate(dynamodb_users_table, dynamodb_connection): - storage = DynamoDBStorage('users', connection=dynamodb_connection) - expected_user = {'name': 'Vanilla', 'last_name': 'Ice'} - storage.store(expected_user) - storage.store(expected_user) - user = dict(dynamodb_users_table.get_item(name='Vanilla', last_name='Ice')) - assert dict(user) == expected_user From 8406bd11faf3eb9fc4d5192ef4b670eb06f4f146 Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Fri, 19 Apr 2024 12:03:36 -0400 Subject: [PATCH 2/8] WIP: cleanup indent. --- ingester/datalake_ingester/storage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index 982321e..a7b4c8a 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -67,8 +67,8 @@ def store(self, record): # Item doesn't exist, lets insert self._table.put_item(data=record) except ConditionalCheckFailedException: - # Tolerate duplicate stores - pass + # Tolerate duplicate stores + pass def update(self, record): self._table.put_item(data=record, overwrite=True) From f806d0745e0ff685bd83a2c315a4747788211691 Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Thu, 25 Apr 2024 11:35:52 -0400 Subject: [PATCH 3/8] WIP: In process of boto3 upgrades for latest/ put_item. --- ingester/datalake_ingester/storage.py | 39 ++++++++++------- ingester/tests/conftest.py | 63 +++++++++++++++++++++++++-- ingester/tests/test_storage.py | 14 +++--- 3 files changed, 91 insertions(+), 25 deletions(-) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index a7b4c8a..6167cd4 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -12,11 +12,13 @@ # License for the specific language governing permissions and limitations under # the License. +import boto3 +from boto3.dynamodb.conditions import Attr + from memoized_property import memoized_property import boto.dynamodb2 from boto.dynamodb2.table import Table -from boto.dynamodb2.exceptions import (ConditionalCheckFailedException, - ItemNotFound) +from boto.dynamodb2.exceptions import ConditionalCheckFailedException import os from datalake.common.errors import InsufficientConfiguration @@ -26,6 +28,9 @@ class DynamoDBStorage(object): def __init__(self, table_name, connection=None): self.table_name = table_name + self.latest_table_name = os.environ.get("DATALAKE_DNAMODB_LATEST_TABLE", + f"{self.table_name}-latest") + self.use_latest = os.environ.get("DATALAKE_LATEST_FLAG", False) self._prepare_connection(connection) @classmethod @@ -48,23 +53,14 @@ def _prepare_connection(self, connection): @memoized_property def _table(self): return Table(self.table_name, connection=self._connection) + + @memoized_property + def _latest_table(self): + dynamodb = boto3.resource('dynamodb') + return dynamodb.Table(self.latest_table_name, connection=self._connection) def store(self, record): try: - primary_key = { - 'time_index_key': record['time_index_key'], - 'range_key': record['range_key'] - } - existing_record = self._table.get_item(**primary_key) - - if existing_record and \ - existing_record['metadata']['start'] < record['metadata']['start']: - self._table.put_item(data=record, overwrite=True) - else: - print(f'Existing record in latest table has later or same start time. No update performed') - - except ItemNotFound: - # Item doesn't exist, lets insert self._table.put_item(data=record) except ConditionalCheckFailedException: # Tolerate duplicate stores @@ -72,3 +68,14 @@ def store(self, record): def update(self, record): self._table.put_item(data=record, overwrite=True) + + def store_latest(self, record): + item_attrs = {'time_index_key': record['time_index_key'], + 'range_key': record['range_key']} + condition = (Attr('metadata.start').lt(record['metadata']['start'])) + try: + self._latest_table.put_item(item_attrs, + condition) + except ConditionalCheckFailedException: + pass + diff --git a/ingester/tests/conftest.py b/ingester/tests/conftest.py index 5797639..34f10c0 100644 --- a/ingester/tests/conftest.py +++ b/ingester/tests/conftest.py @@ -14,10 +14,17 @@ import boto.sns import boto.sqs + +import boto3 +from boto3 import client +from botocore.exceptions import ClientError + from datalake.tests import * # noqa from datalake_ingester import SQSQueue +import logging +logging.basicConfig(level=logging.DEBUG) @pytest.fixture def dynamodb_connection(aws_connector): @@ -25,6 +32,13 @@ def dynamodb_connection(aws_connector): lambda: boto.dynamodb2.connect_to_region('us-west-1')) +@pytest.fixture +def dynamodb_latest_connection(aws_connector): + return aws_connector(mock_dynamodb2, + lambda: boto3.resource('dynamodb', + region_name='us-west-1')) + + def _delete_table_if_exists(conn, name): try: table = Table(name, connection=conn) @@ -33,6 +47,16 @@ def _delete_table_if_exists(conn, name): if e.status == 400 and e.error_code == 'ResourceNotFoundException': return raise e + +def _delete_latest_if_exists(dynamodb, name): + try: + table = dynamodb.Table(name) + table.delete() + table.wait_until_not_exists() + except ClientError as e: + if e.response['Error']['Code'] == 'ResourceNotFoundException': + return + raise e @pytest.fixture @@ -55,6 +79,32 @@ def tear_down(): return table_maker +@pytest.fixture +def dynamodb_latest_table_maker(request, dynamodb_latest_connection): + + def table_maker(name, key_schema, attributes): + _delete_latest_if_exists(dynamodb_latest_connection, name) + table = dynamodb_latest_connection.create_table( + TableName=name, + KeySchema=key_schema, + AttributeDefinitions=attributes, + ProvisionedThroughput={ + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } + ) + table.wait_until_exists() + + def tear_down(): + table.delete() + table.wait_until_not_exists() + + request.addfinalizer(tear_down) + return table + + return table_maker + + @pytest.fixture def dynamodb_users_table(dynamodb_table_maker): schema = [HashKey('name'), RangeKey('last_name')] @@ -68,9 +118,16 @@ def dynamodb_records_table(dynamodb_table_maker): @pytest.fixture -def dynamodb_latest_table(dynamodb_table_maker): - schema = [HashKey('time_index_key'), RangeKey('range_key')] - return dynamodb_table_maker('latest', schema) +def dynamodb_latest_table(dynamodb_latest_table_maker): + schema = [ + {'AttributeName': 'time_index_key', 'KeyType': 'HASH'}, + {'AttributeName': 'range_key', 'KeyType': 'RANGE'} + ] + attributes = [ + {'AttributeName': 'time_index_key', 'AttributeType': 'S'}, + {'AttributeName': 'range_key', 'AttributeType': 'S'} + ] + return dynamodb_latest_table_maker('latest', schema, attributes) @pytest.fixture diff --git a/ingester/tests/test_storage.py b/ingester/tests/test_storage.py index ea45196..b04f068 100644 --- a/ingester/tests/test_storage.py +++ b/ingester/tests/test_storage.py @@ -1,7 +1,9 @@ from datalake_ingester import DynamoDBStorage from decimal import Decimal +import boto3 def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): + boto3.setup_default_session(fake_credentials=True) storage = DynamoDBStorage('latest', connection=dynamodb_connection) new_record = { 'time_index_key': '15225:newlog', @@ -13,7 +15,7 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): 'create_time': 1500000000000 } - storage.store(new_record) + storage.store_latest(new_record) stored_record = dynamodb_latest_table.get_item( time_index_key='15225:newlog', @@ -64,9 +66,9 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna 'size': 1048576 } - storage.store(file3) - storage.store(file1) - storage.store(file2) # same what:where, but should replace file1 b/c newer + storage.store_latest(file3) + storage.store_latest(file1) + storage.store_latest(file2) # same what:where, but should replace file1 b/c newer records = [dict(i) for i in dynamodb_latest_table.scan()] @@ -99,8 +101,8 @@ def test_concurrent_updates(dynamodb_latest_table, dynamodb_connection): updated_record2['metadata']['start'] += 5 - storage.store(updated_record1) - storage.store(updated_record2) + storage.store_latest(updated_record1) + storage.store_latest(updated_record2) stored_record = dynamodb_latest_table.get_item( time_index_key='15219:zlcdzvawsp', From db1ce4e5dfe966dc6b88d978f01dd8fdf17994a1 Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Tue, 30 Apr 2024 11:48:10 -0400 Subject: [PATCH 4/8] Ingester storage conditional put updates with tests. --- ingester/datalake_ingester/storage.py | 68 ++++++++++++++++++----- ingester/tests/conftest.py | 62 ++------------------- ingester/tests/test_storage.py | 77 ++++++++++++++++++++++----- 3 files changed, 123 insertions(+), 84 deletions(-) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index 6167cd4..a49e296 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -12,8 +12,6 @@ # License for the specific language governing permissions and limitations under # the License. -import boto3 -from boto3.dynamodb.conditions import Attr from memoized_property import memoized_property import boto.dynamodb2 @@ -21,17 +19,19 @@ from boto.dynamodb2.exceptions import ConditionalCheckFailedException import os from datalake.common.errors import InsufficientConfiguration +import logging class DynamoDBStorage(object): '''store datalake records in a dynamoDB table''' - def __init__(self, table_name, connection=None): + def __init__(self, table_name=None, latest_table=None, connection=None): self.table_name = table_name self.latest_table_name = os.environ.get("DATALAKE_DNAMODB_LATEST_TABLE", - f"{self.table_name}-latest") + f"{latest_table}") self.use_latest = os.environ.get("DATALAKE_LATEST_FLAG", False) self._prepare_connection(connection) + self.logger = logging.getLogger('storage') @classmethod def from_config(cls): @@ -56,8 +56,7 @@ def _table(self): @memoized_property def _latest_table(self): - dynamodb = boto3.resource('dynamodb') - return dynamodb.Table(self.latest_table_name, connection=self._connection) + return Table(self.latest_table_name, connection=self._connection) def store(self, record): try: @@ -70,12 +69,57 @@ def update(self, record): self._table.put_item(data=record, overwrite=True) def store_latest(self, record): - item_attrs = {'time_index_key': record['time_index_key'], - 'range_key': record['range_key']} - condition = (Attr('metadata.start').lt(record['metadata']['start'])) + """ + note: Record must utilize AttributeValue syntax + for the conditional put. + """ + record = { + 'time_index_key': {"S": record['time_index_key']}, + 'range_key': {"S": record['range_key']}, + 'metadata': { + 'M': { + 'start': { + 'N': str(record['metadata']['start']) + }, + 'end': { + 'N': str(record['metadata']['end']) + }, + 'id': { + 'S': str(record['metadata']['id']) + }, + 'path': { + 'S': str(record['metadata']['path']) + }, + 'hash': { + 'S': str(record['metadata']['hash']) + }, + 'version': { + 'N': str(record['metadata']['version']) + }, + 'what': { + 'S': str(record['metadata']['what']) + }, + 'where': { + 'S': str(record['metadata']['where']) + }, + 'work_id': { + 'S': str(record['metadata']['work_id']) + } + } + }, + 'url': {"S": record['url']}, + 'create_time': {'N': str(record['create_time'])} + } try: - self._latest_table.put_item(item_attrs, - condition) + self._connection.put_item( + table_name=self.latest_table_name, + item=record, + condition_expression=\ + f"attribute_not_exists(metadata.M.start.N) OR metadata.M.start.N < {record['metadata']['M']['start']['N']}", + ) + self.logger.info("Record stored successfully.") except ConditionalCheckFailedException: - pass + self.logger.error("Condition not met, no operation was performed.") + except Exception as e: + self.logger.error(f"Error occurred: {str(e)}") diff --git a/ingester/tests/conftest.py b/ingester/tests/conftest.py index 34f10c0..3e6b74d 100644 --- a/ingester/tests/conftest.py +++ b/ingester/tests/conftest.py @@ -14,17 +14,10 @@ import boto.sns import boto.sqs - -import boto3 -from boto3 import client -from botocore.exceptions import ClientError - from datalake.tests import * # noqa from datalake_ingester import SQSQueue -import logging -logging.basicConfig(level=logging.DEBUG) @pytest.fixture def dynamodb_connection(aws_connector): @@ -32,13 +25,6 @@ def dynamodb_connection(aws_connector): lambda: boto.dynamodb2.connect_to_region('us-west-1')) -@pytest.fixture -def dynamodb_latest_connection(aws_connector): - return aws_connector(mock_dynamodb2, - lambda: boto3.resource('dynamodb', - region_name='us-west-1')) - - def _delete_table_if_exists(conn, name): try: table = Table(name, connection=conn) @@ -47,16 +33,6 @@ def _delete_table_if_exists(conn, name): if e.status == 400 and e.error_code == 'ResourceNotFoundException': return raise e - -def _delete_latest_if_exists(dynamodb, name): - try: - table = dynamodb.Table(name) - table.delete() - table.wait_until_not_exists() - except ClientError as e: - if e.response['Error']['Code'] == 'ResourceNotFoundException': - return - raise e @pytest.fixture @@ -79,31 +55,6 @@ def tear_down(): return table_maker -@pytest.fixture -def dynamodb_latest_table_maker(request, dynamodb_latest_connection): - - def table_maker(name, key_schema, attributes): - _delete_latest_if_exists(dynamodb_latest_connection, name) - table = dynamodb_latest_connection.create_table( - TableName=name, - KeySchema=key_schema, - AttributeDefinitions=attributes, - ProvisionedThroughput={ - 'ReadCapacityUnits': 5, - 'WriteCapacityUnits': 5 - } - ) - table.wait_until_exists() - - def tear_down(): - table.delete() - table.wait_until_not_exists() - - request.addfinalizer(tear_down) - return table - - return table_maker - @pytest.fixture def dynamodb_users_table(dynamodb_table_maker): @@ -118,16 +69,9 @@ def dynamodb_records_table(dynamodb_table_maker): @pytest.fixture -def dynamodb_latest_table(dynamodb_latest_table_maker): - schema = [ - {'AttributeName': 'time_index_key', 'KeyType': 'HASH'}, - {'AttributeName': 'range_key', 'KeyType': 'RANGE'} - ] - attributes = [ - {'AttributeName': 'time_index_key', 'AttributeType': 'S'}, - {'AttributeName': 'range_key', 'AttributeType': 'S'} - ] - return dynamodb_latest_table_maker('latest', schema, attributes) +def dynamodb_latest_table(dynamodb_table_maker): + schema = [HashKey('time_index_key'), RangeKey('range_key')] + return dynamodb_table_maker('latest', schema) @pytest.fixture diff --git a/ingester/tests/test_storage.py b/ingester/tests/test_storage.py index b04f068..743a886 100644 --- a/ingester/tests/test_storage.py +++ b/ingester/tests/test_storage.py @@ -1,21 +1,47 @@ from datalake_ingester import DynamoDBStorage from decimal import Decimal -import boto3 + + +def test_dynamodb_store(dynamodb_users_table, dynamodb_connection): + storage = DynamoDBStorage('users', connection=dynamodb_connection) + expected_user = {'name': 'John', 'last_name': 'Muir'} + storage.store(expected_user) + user = dict(dynamodb_users_table.get_item(name='John', last_name='Muir')) + assert dict(user) == expected_user + +def test_store_duplicate(dynamodb_users_table, dynamodb_connection): + storage = DynamoDBStorage('users', connection=dynamodb_connection) + expected_user = {'name': 'Vanilla', 'last_name': 'Ice'} + storage.store(expected_user) + storage.store(expected_user) + user = dict(dynamodb_users_table.get_item(name='Vanilla', last_name='Ice')) + assert dict(user) == expected_user def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): - boto3.setup_default_session(fake_credentials=True) - storage = DynamoDBStorage('latest', connection=dynamodb_connection) + storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection) + new_record = { 'time_index_key': '15225:newlog', 'range_key': 'new_server:12345abcde', 'metadata': { - 'start': 1500000000000 + 'version': 1, + 'start': 1500000000000, + 'end': 1500000000010, + 'path': '/var/log/syslog.2', + 'work_id': None, + 'where': 'ground_server2', + 'what': 'syslog', + 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', + 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' }, 'url': 's3://newfile/url', 'create_time': 1500000000000 } - storage.store_latest(new_record) + try: + storage.store_latest(new_record) + except Exception as e: + print(f"Failed to store record: {str(e)}") stored_record = dynamodb_latest_table.get_item( time_index_key='15225:newlog', @@ -25,13 +51,21 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection): - storage = DynamoDBStorage('latest', connection=dynamodb_connection) + storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection) file1 = { 'time_index_key': '15219:zlcdzvawsp', 'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9', 'metadata': { - 'start': 1314877177402 + 'version': 1, + 'start': 1314877177402, + 'end': 1314877177412, # ends ten seconds later + 'path': '/var/log/syslog.2', + 'work_id': 'abc-123', + 'where': 'ground_server2', + 'what': 'syslog', + 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', + 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' }, 'url': 's3://existingfile/url', 'create_time': 1314877177402 @@ -41,7 +75,16 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna 'time_index_key': '15219:zlcdzvawsp', 'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9', 'metadata': { - 'start': 1314877177403 # One millisecond later + 'version': 1, + 'start': 1314877177413, # One millisecond later + 'end': 1314877177423, # ends ten seconds later + 'path': '/var/log/syslog.2', + 'work_id': 'abc-123', + 'where': 'ground_server2', + 'what': 'syslog', + 'id': '45gb2d1ec54245c7a57e29ed5a6ea9b2', + 'hash': 'c5g3d8de24af342643d5b78a8f2b9b88' + }, 'url': 's3://existingfile/url', 'create_time': 1314877177403 @@ -55,7 +98,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna 'start': 1414877177402, 'end': 1415128740728, 'path': '/var/log/syslog.2', - 'work_id': None, + 'work_id': 'foo-bizz', 'where': 'ground_server2', 'what': 'syslog', 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', @@ -74,24 +117,32 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna res = dict(dynamodb_latest_table.get_item(time_index_key='15219:zlcdzvawsp', range_key='lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9')) - assert res['metadata']['start'] == Decimal('1314877177403') + assert res['metadata']['start'] == Decimal('1314877177413') assert len(records) == 2 assert file2 == res def test_concurrent_updates(dynamodb_latest_table, dynamodb_connection): - storage = DynamoDBStorage('latest', connection=dynamodb_connection) + storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection) base_record = { 'time_index_key': '15219:zlcdzvawsp', 'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9', 'metadata': { - 'start': 1314877177402 + 'version': 1, + 'start': 1314877177402, + 'end': 1314877177412, # ends ten seconds later + 'path': '/var/log/syslog.2', + 'work_id': 'abc-123', + 'where': 'ground_server2', + 'what': 'syslog', + 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', + 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' }, 'url': 's3://existingfile/url', 'create_time': 1314877177402 } - storage.store(base_record) + storage.store_latest(base_record) updated_record1 = base_record.copy() From e9ee7c3d3fd46a3ec8ccff940a20a170649cc387 Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Wed, 1 May 2024 15:37:36 -0400 Subject: [PATCH 5/8] Ingester: Updating storage store_latest, tests and conftest. --- ingester/datalake_ingester/storage.py | 14 ++++-- ingester/tests/conftest.py | 2 +- ingester/tests/test_storage.py | 65 +++++++++------------------ 3 files changed, 34 insertions(+), 47 deletions(-) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index a49e296..d553218 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -29,7 +29,7 @@ def __init__(self, table_name=None, latest_table=None, connection=None): self.table_name = table_name self.latest_table_name = os.environ.get("DATALAKE_DNAMODB_LATEST_TABLE", f"{latest_table}") - self.use_latest = os.environ.get("DATALAKE_LATEST_FLAG", False) + self.use_latest = os.environ.get("DATALAKE_USE_LATEST_TABLE", False) self._prepare_connection(connection) self.logger = logging.getLogger('storage') @@ -73,7 +73,14 @@ def store_latest(self, record): note: Record must utilize AttributeValue syntax for the conditional put. """ + + condition_expression = " attribute_not_exists(what_where_key) OR metadata.start < :new_start" + expression_attribute_values = { + ':what_where_key': {'S': record['what_where_key']}, + ':new_start': {'N': str(record['metadata']['start'])} + } record = { + 'what_where_key': {"S": record['metadata']['what']+':'+record['metadata']['where']}, 'time_index_key': {"S": record['time_index_key']}, 'range_key': {"S": record['range_key']}, 'metadata': { @@ -110,12 +117,13 @@ def store_latest(self, record): 'url': {"S": record['url']}, 'create_time': {'N': str(record['create_time'])} } + try: self._connection.put_item( table_name=self.latest_table_name, item=record, - condition_expression=\ - f"attribute_not_exists(metadata.M.start.N) OR metadata.M.start.N < {record['metadata']['M']['start']['N']}", + condition_expression=condition_expression, + expression_attribute_values=expression_attribute_values ) self.logger.info("Record stored successfully.") except ConditionalCheckFailedException: diff --git a/ingester/tests/conftest.py b/ingester/tests/conftest.py index 3e6b74d..7c9fa9b 100644 --- a/ingester/tests/conftest.py +++ b/ingester/tests/conftest.py @@ -70,7 +70,7 @@ def dynamodb_records_table(dynamodb_table_maker): @pytest.fixture def dynamodb_latest_table(dynamodb_table_maker): - schema = [HashKey('time_index_key'), RangeKey('range_key')] + schema = [HashKey('what_where_key')] return dynamodb_table_maker('latest', schema) diff --git a/ingester/tests/test_storage.py b/ingester/tests/test_storage.py index 743a886..0cf4470 100644 --- a/ingester/tests/test_storage.py +++ b/ingester/tests/test_storage.py @@ -21,6 +21,7 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection) new_record = { + 'what_where_key': 'syslog:ground_server2', 'time_index_key': '15225:newlog', 'range_key': 'new_server:12345abcde', 'metadata': { @@ -44,8 +45,7 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): print(f"Failed to store record: {str(e)}") stored_record = dynamodb_latest_table.get_item( - time_index_key='15225:newlog', - range_key='new_server:12345abcde' + what_where_key=new_record['what_where_key'] ) assert stored_record['metadata']['start'] == new_record['metadata']['start'] @@ -54,6 +54,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection) file1 = { + 'what_where_key': 'syslog:ground_server2', 'time_index_key': '15219:zlcdzvawsp', 'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9', 'metadata': { @@ -72,6 +73,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna } file2 = { + 'what_where_key': 'syslog:ground_server2', 'time_index_key': '15219:zlcdzvawsp', 'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9', 'metadata': { @@ -91,6 +93,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna } file3 = { + 'what_where_key': 'syslog:s114', 'time_index_key': '15220:syslog', 'range_key': 'ground_server2:34fb2d1ec54245c7a57e29ed5a6ea9b2', 'metadata': { @@ -99,7 +102,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna 'end': 1415128740728, 'path': '/var/log/syslog.2', 'work_id': 'foo-bizz', - 'where': 'ground_server2', + 'where': 's114', 'what': 'syslog', 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' @@ -113,52 +116,28 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna storage.store_latest(file1) storage.store_latest(file2) # same what:where, but should replace file1 b/c newer + query_what_where = 'syslog:ground_server2' + records = [dict(i) for i in dynamodb_latest_table.scan()] - res = dict(dynamodb_latest_table.get_item(time_index_key='15219:zlcdzvawsp', - range_key='lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9')) + res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where)) assert res['metadata']['start'] == Decimal('1314877177413') assert len(records) == 2 assert file2 == res + storage.store_latest(file3) + storage.store_latest(file2) + storage.store_latest(file1) -def test_concurrent_updates(dynamodb_latest_table, dynamodb_connection): - storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection) - - base_record = { - 'time_index_key': '15219:zlcdzvawsp', - 'range_key': 'lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9', - 'metadata': { - 'version': 1, - 'start': 1314877177402, - 'end': 1314877177412, # ends ten seconds later - 'path': '/var/log/syslog.2', - 'work_id': 'abc-123', - 'where': 'ground_server2', - 'what': 'syslog', - 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', - 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' - }, - 'url': 's3://existingfile/url', - 'create_time': 1314877177402 - } - storage.store_latest(base_record) - + records = [dict(i) for i in dynamodb_latest_table.scan()] + res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where)) + assert res['metadata']['id'] != file1['metadata']['id'] + assert res['metadata']['id'] == file2['metadata']['id'] - updated_record1 = base_record.copy() - updated_record1['metadata']['start'] += 10 - - updated_record2 = base_record.copy() - updated_record2['metadata']['start'] += 5 - - - storage.store_latest(updated_record1) - storage.store_latest(updated_record2) - - stored_record = dynamodb_latest_table.get_item( - time_index_key='15219:zlcdzvawsp', - range_key='lawvuunyws:447a4a801cabc6089f04922abdfa8aad099824e9' - ) - - assert stored_record['metadata']['start'] == updated_record1['metadata']['start'] + storage.store_latest(file1) + storage.store_latest(file1) + storage.store_latest(file2) + storage.store_latest(file3) + res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where)) + assert res['metadata']['start'] == file2['metadata']['start'] From 8eb2e08f18213a90b2f7af599eff7c6508033d51 Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Wed, 1 May 2024 15:40:44 -0400 Subject: [PATCH 6/8] Ingester: Updating storage store_latest, tests and conftest. --- ingester/datalake_ingester/storage.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index d553218..f6aab0b 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -76,7 +76,6 @@ def store_latest(self, record): condition_expression = " attribute_not_exists(what_where_key) OR metadata.start < :new_start" expression_attribute_values = { - ':what_where_key': {'S': record['what_where_key']}, ':new_start': {'N': str(record['metadata']['start'])} } record = { From 7385b9d9605cd48b3e2b9108608c960789f6d59f Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Thu, 9 May 2024 14:09:00 -0400 Subject: [PATCH 7/8] DynamoDBStorage.store() method will call store_latest() if use_latest flag is True. --- ingester/datalake_ingester/storage.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index f6aab0b..e3812a7 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -59,11 +59,14 @@ def _latest_table(self): return Table(self.latest_table_name, connection=self._connection) def store(self, record): - try: - self._table.put_item(data=record) - except ConditionalCheckFailedException: - # Tolerate duplicate stores - pass + if self.use_latest: + self._latest_table.store_latest(record) + else: + try: + self._table.put_item(data=record) + except ConditionalCheckFailedException: + # Tolerate duplicate stores + pass def update(self, record): self._table.put_item(data=record, overwrite=True) From 1ba7f8238b08d9e09dd8a7e52f09af7a65593d3e Mon Sep 17 00:00:00 2001 From: Alex Bednarek Date: Tue, 11 Jun 2024 17:27:11 -0400 Subject: [PATCH 8/8] Ingester: Latest table name typo. --- ingester/datalake_ingester/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index e3812a7..7654c59 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -27,7 +27,7 @@ class DynamoDBStorage(object): def __init__(self, table_name=None, latest_table=None, connection=None): self.table_name = table_name - self.latest_table_name = os.environ.get("DATALAKE_DNAMODB_LATEST_TABLE", + self.latest_table_name = os.environ.get("DATALAKE_LATEST_TABLE", f"{latest_table}") self.use_latest = os.environ.get("DATALAKE_USE_LATEST_TABLE", False) self._prepare_connection(connection)