Skip to content

Commit f806d07

Browse files
committed
WIP: In process of boto3 upgrades for latest/ put_item.
1 parent 8406bd1 commit f806d07

File tree

3 files changed

+91
-25
lines changed

3 files changed

+91
-25
lines changed

ingester/datalake_ingester/storage.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
# License for the specific language governing permissions and limitations under
1313
# the License.
1414

15+
import boto3
16+
from boto3.dynamodb.conditions import Attr
17+
1518
from memoized_property import memoized_property
1619
import boto.dynamodb2
1720
from boto.dynamodb2.table import Table
18-
from boto.dynamodb2.exceptions import (ConditionalCheckFailedException,
19-
ItemNotFound)
21+
from boto.dynamodb2.exceptions import ConditionalCheckFailedException
2022
import os
2123
from datalake.common.errors import InsufficientConfiguration
2224

@@ -26,6 +28,9 @@ class DynamoDBStorage(object):
2628

2729
def __init__(self, table_name, connection=None):
    """Configure storage for ``table_name`` and its "latest" companion table.

    Args:
        table_name: Name of the primary DynamoDB table.
        connection: Optional connection object; handed unchanged to
            ``_prepare_connection``.

    Environment:
        DATALAKE_DYNAMODB_LATEST_TABLE: Overrides the latest-table name,
            which otherwise defaults to ``"<table_name>-latest"``. The
            original misspelling DATALAKE_DNAMODB_LATEST_TABLE is still
            read as a fallback.
        DATALAKE_LATEST_FLAG: Stored as the boolean ``self.use_latest``.
            Unset, empty, "0", "false", "no" and "off" (case-insensitive)
            mean False; any other value means True.
    """
    self.table_name = table_name

    # Prefer the correctly spelled variable, but keep honouring the
    # misspelled one so existing deployments are not silently reconfigured.
    default_latest = f"{self.table_name}-latest"
    self.latest_table_name = os.environ.get(
        "DATALAKE_DYNAMODB_LATEST_TABLE",
        os.environ.get("DATALAKE_DNAMODB_LATEST_TABLE", default_latest),
    )

    # Environment values are always strings, so a raw "0" or "false" would
    # be truthy; normalise to a real bool instead.
    raw_flag = os.environ.get("DATALAKE_LATEST_FLAG", "")
    self.use_latest = raw_flag.strip().lower() not in (
        "", "0", "false", "no", "off")

    self._prepare_connection(connection)
3035

3136
@classmethod
@@ -48,27 +53,29 @@ def _prepare_connection(self, connection):
4853
@memoized_property
def _table(self):
    """Legacy boto ``Table`` for ``self.table_name`` on ``self._connection``."""
    name, conn = self.table_name, self._connection
    return Table(name, connection=conn)
56+
57+
@memoized_property
def _latest_table(self):
    """boto3 ``Table`` resource for ``self.latest_table_name``.

    The legacy boto connection held in ``self._connection`` is deliberately
    not passed along: boto3 resources only accept their own identifiers
    (plus ``client``) as keyword arguments and raise on anything else, and
    a boto (v2) connection is not usable by boto3 in any case. Region and
    credentials therefore come from the standard boto3 configuration chain.
    """
    dynamodb = boto3.resource('dynamodb')
    return dynamodb.Table(self.latest_table_name)
5161

5262
def store(self, record):
5363
try:
54-
primary_key = {
55-
'time_index_key': record['time_index_key'],
56-
'range_key': record['range_key']
57-
}
58-
existing_record = self._table.get_item(**primary_key)
59-
60-
if existing_record and \
61-
existing_record['metadata']['start'] < record['metadata']['start']:
62-
self._table.put_item(data=record, overwrite=True)
63-
else:
64-
print(f'Existing record in latest table has later or same start time. No update performed')
65-
66-
except ItemNotFound:
67-
# Item doesn't exist, lets insert
6864
self._table.put_item(data=record)
6965
except ConditionalCheckFailedException:
7066
# Tolerate duplicate stores
7167
pass
7268

7369
def update(self, record):
    """Put ``record`` into the primary table with ``overwrite=True``."""
    table = self._table
    table.put_item(data=record, overwrite=True)
71+
72+
def store_latest(self, record):
    """Write ``record`` to the latest table unless a newer one is stored.

    The whole record is written (not just its key attributes), because the
    condition below compares against the stored ``metadata.start``.

    Args:
        record: Mapping with at least ``time_index_key``, ``range_key`` and
            ``metadata['start']``.

    The put succeeds when no item exists yet for the record's key, or when
    the stored item's ``metadata.start`` is strictly less than the incoming
    one. A failed condition is tolerated silently; any other error
    propagates.
    """
    # Without the not_exists() arm the very first insert for a key would
    # fail, since a missing attribute never satisfies a "<" comparison.
    is_absent_or_older = (
        Attr('time_index_key').not_exists()
        | Attr('metadata.start').lt(record['metadata']['start'])
    )
    table = self._latest_table
    try:
        # boto3 resource actions accept keyword arguments only.
        table.put_item(Item=record, ConditionExpression=is_absent_or_older)
    except table.meta.client.exceptions.ConditionalCheckFailedException:
        # boto3 raises its own modeled exception (a ClientError subclass),
        # not the legacy boto class. An equal-or-newer record is already
        # stored, so keep it.
        pass
81+

ingester/tests/conftest.py

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,31 @@
1414
import boto.sns
1515
import boto.sqs
1616

17+
18+
import boto3
19+
from boto3 import client
20+
from botocore.exceptions import ClientError
21+
1722
from datalake.tests import * # noqa
1823

1924
from datalake_ingester import SQSQueue
2025

26+
import logging
27+
logging.basicConfig(level=logging.DEBUG)
2128

2229
@pytest.fixture
def dynamodb_connection(aws_connector):
    """Legacy boto DynamoDB connection to us-west-1, built via ``aws_connector``."""
    def connect():
        return boto.dynamodb2.connect_to_region('us-west-1')

    return aws_connector(mock_dynamodb2, connect)
2633

2734

35+
@pytest.fixture
def dynamodb_latest_connection(aws_connector):
    """boto3 DynamoDB resource for us-west-1, built via ``aws_connector``."""
    def connect():
        return boto3.resource('dynamodb', region_name='us-west-1')

    return aws_connector(mock_dynamodb2, connect)
40+
41+
2842
def _delete_table_if_exists(conn, name):
2943
try:
3044
table = Table(name, connection=conn)
@@ -33,6 +47,16 @@ def _delete_table_if_exists(conn, name):
3347
if e.status == 400 and e.error_code == 'ResourceNotFoundException':
3448
return
3549
raise e
50+
51+
def _delete_latest_if_exists(dynamodb, name):
    """Delete table ``name`` through the boto3 resource ``dynamodb``.

    Waits until the table is gone. A ``ClientError`` whose error code is
    ``ResourceNotFoundException`` is swallowed; any other ``ClientError``
    is re-raised.
    """
    try:
        latest = dynamodb.Table(name)
        latest.delete()
        latest.wait_until_not_exists()
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code != 'ResourceNotFoundException':
            raise err
3660

3761

3862
@pytest.fixture
@@ -55,6 +79,32 @@ def tear_down():
5579
return table_maker
5680

5781

82+
@pytest.fixture
def dynamodb_latest_table_maker(request, dynamodb_latest_connection):
    """Fixture returning a factory that creates boto3 DynamoDB tables.

    The factory drops any existing table of the same name, creates a new one
    with 5/5 provisioned throughput, waits for it to exist, and registers a
    finalizer on ``request`` that deletes it again.
    """
    resource = dynamodb_latest_connection

    def table_maker(name, key_schema, attributes):
        _delete_latest_if_exists(resource, name)

        spec = {
            'TableName': name,
            'KeySchema': key_schema,
            'AttributeDefinitions': attributes,
            'ProvisionedThroughput': {
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5,
            },
        }
        table = resource.create_table(**spec)
        table.wait_until_exists()

        def drop_table():
            table.delete()
            table.wait_until_not_exists()

        request.addfinalizer(drop_table)
        return table

    return table_maker
106+
107+
58108
@pytest.fixture
59109
def dynamodb_users_table(dynamodb_table_maker):
60110
schema = [HashKey('name'), RangeKey('last_name')]
@@ -68,9 +118,16 @@ def dynamodb_records_table(dynamodb_table_maker):
68118

69119

70120
@pytest.fixture
71-
def dynamodb_latest_table(dynamodb_table_maker):
72-
schema = [HashKey('time_index_key'), RangeKey('range_key')]
73-
return dynamodb_table_maker('latest', schema)
121+
def dynamodb_latest_table(dynamodb_latest_table_maker):
122+
schema = [
123+
{'AttributeName': 'time_index_key', 'KeyType': 'HASH'},
124+
{'AttributeName': 'range_key', 'KeyType': 'RANGE'}
125+
]
126+
attributes = [
127+
{'AttributeName': 'time_index_key', 'AttributeType': 'S'},
128+
{'AttributeName': 'range_key', 'AttributeType': 'S'}
129+
]
130+
return dynamodb_latest_table_maker('latest', schema, attributes)
74131

75132

76133
@pytest.fixture

ingester/tests/test_storage.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from datalake_ingester import DynamoDBStorage
22
from decimal import Decimal
3+
import boto3
34

45
def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):
6+
boto3.setup_default_session(fake_credentials=True)
57
storage = DynamoDBStorage('latest', connection=dynamodb_connection)
68
new_record = {
79
'time_index_key': '15225:newlog',
@@ -13,7 +15,7 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):
1315
'create_time': 1500000000000
1416
}
1517

16-
storage.store(new_record)
18+
storage.store_latest(new_record)
1719

1820
stored_record = dynamodb_latest_table.get_item(
1921
time_index_key='15225:newlog',
@@ -64,9 +66,9 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
6466
'size': 1048576
6567
}
6668

67-
storage.store(file3)
68-
storage.store(file1)
69-
storage.store(file2) # same what:where, but should replace file1 b/c newer
69+
storage.store_latest(file3)
70+
storage.store_latest(file1)
71+
storage.store_latest(file2) # same what:where, but should replace file1 b/c newer
7072

7173
records = [dict(i) for i in dynamodb_latest_table.scan()]
7274

@@ -99,8 +101,8 @@ def test_concurrent_updates(dynamodb_latest_table, dynamodb_connection):
99101
updated_record2['metadata']['start'] += 5
100102

101103

102-
storage.store(updated_record1)
103-
storage.store(updated_record2)
104+
storage.store_latest(updated_record1)
105+
storage.store_latest(updated_record2)
104106

105107
stored_record = dynamodb_latest_table.get_item(
106108
time_index_key='15219:zlcdzvawsp',

0 commit comments

Comments
 (0)