@@ -2,8 +2,14 @@ import { DynamoDBStreamHandler } from "aws-lambda";
2
2
import { unmarshall } from "@aws-sdk/util-dynamodb" ;
3
3
import { AttributeValue , DynamoDBClient } from "@aws-sdk/client-dynamodb" ;
4
4
import { DynamoDBDocumentClient , UpdateCommand } from "@aws-sdk/lib-dynamodb" ;
5
+ import {
6
+ SchedulerClient ,
7
+ CreateScheduleCommand ,
8
+ } from "@aws-sdk/client-scheduler" ;
5
9
6
10
const ENV_VARIABLE_REVANT_COST_TABLE_NAME = "REVANT_COST_TABLE_NAME" ;
11
+ const ENV_VARIABLE_REVANT_COST_LIMIT_PREFIX = "REVANT_COST_LIMIT" ;
12
+ const ENV_VARIABLE_REVANT_SCHEDULE_ROLE_ARN = "REVANT_SCHEDULE_ROLE_ARN" ;
7
13
8
14
const DYNAMODB_ACCRUED_EXPENSES_ATTRIBUTE_NAME = "accruedExpenses" ;
9
15
const DYNAMODB_INCURRED_EXPENSES_RATE_ATTRIBUTE_NAME = "incurredExpensesRate" ;
@@ -23,6 +29,7 @@ type BudgetUpdateOperation = {
23
29
24
30
const dynamoDBClient = new DynamoDBClient ( { } ) ;
25
31
const dynamoDBDocumentClient = DynamoDBDocumentClient . from ( dynamoDBClient ) ;
32
+ const schedulerClient = new SchedulerClient ( { } ) ;
26
33
27
34
const isBudgetUpdateOperation = ( {
28
35
oldBudget,
@@ -41,6 +48,25 @@ const calculateNewAccruedExpenses = ({
41
48
1000
42
49
) * oldBudget [ DYNAMODB_INCURRED_EXPENSES_RATE_ATTRIBUTE_NAME ] ;
43
50
51
+ const calculateBudgetReachedEstimatedDate = ( {
52
+ accruedExpenses,
53
+ incurredExpensesRate,
54
+ updatedAt,
55
+ budget,
56
+ } : {
57
+ accruedExpenses : number ;
58
+ incurredExpensesRate : number ;
59
+ updatedAt : Date ;
60
+ budget : number ;
61
+ } ) : Date => {
62
+ const budgetReachedDate = new Date ( updatedAt ) ;
63
+ budgetReachedDate . setSeconds (
64
+ budgetReachedDate . getSeconds ( ) +
65
+ ( budget - accruedExpenses ) / incurredExpensesRate
66
+ ) ;
67
+ return budgetReachedDate ;
68
+ } ;
69
+
44
70
export const handler : DynamoDBStreamHandler = async ( { Records } ) => {
45
71
console . log ( `${ Records . length } records received` ) ;
46
72
const budgetUpdatesOperations = Records . map ( ( record ) => ( {
@@ -56,12 +82,21 @@ export const handler: DynamoDBStreamHandler = async ({ Records }) => {
56
82
`${ budgetUpdatesOperations . length } budget update operations received`
57
83
) ;
58
84
85
+ const budgets = Object . fromEntries (
86
+ Object . entries ( process . env )
87
+ . filter ( ( [ key ] ) => key . startsWith ( ENV_VARIABLE_REVANT_COST_LIMIT_PREFIX ) )
88
+ . map ( ( [ key , value ] ) => [
89
+ key . slice ( ENV_VARIABLE_REVANT_COST_LIMIT_PREFIX . length + 1 ) ,
90
+ Number ( value ) ,
91
+ ] )
92
+ ) ;
93
+
59
94
const failedUpdateIds : { itemIdentifier : string } [ ] = [ ] ;
60
95
await Promise . all (
61
96
budgetUpdatesOperations . map (
62
97
async ( { itemIdentifier, oldBudget, newBudget } ) => {
63
98
try {
64
- await dynamoDBDocumentClient . send (
99
+ const { Attributes } = await dynamoDBDocumentClient . send (
65
100
new UpdateCommand ( {
66
101
TableName : process . env [ ENV_VARIABLE_REVANT_COST_TABLE_NAME ] ,
67
102
Key : { PK : oldBudget . PK } ,
@@ -75,8 +110,38 @@ export const handler: DynamoDBStreamHandler = async ({ Records }) => {
75
110
newBudget,
76
111
} ) ,
77
112
} ,
113
+ ReturnValues : "ALL_NEW" ,
78
114
} )
79
115
) ;
116
+ if ( Attributes === undefined ) {
117
+ console . error ( "Did not get any updated budget from DynamDB" ) ;
118
+ return ;
119
+ }
120
+
121
+ const address = oldBudget . PK . split ( "#" ) [ 1 ] ;
122
+ const budget = Number ( budgets [ address ) ;
123
+ const budgetReachedDate = calculateBudgetReachedEstimatedDate ( {
124
+ accruedExpenses : Attributes [
125
+ DYNAMODB_ACCRUED_EXPENSES_ATTRIBUTE_NAME
126
+ ] as number ,
127
+ incurredExpensesRate : Attributes [
128
+ DYNAMODB_INCURRED_EXPENSES_RATE_ATTRIBUTE_NAME
129
+ ] as number ,
130
+ updatedAt : new Date (
131
+ Attributes [ DYNAMODB_LAST_UPDATE_ATTRIBUTE_NAME ]
132
+ ) ,
133
+ budget,
134
+ } ) ;
135
+ await schedulerClient . send ( new CreateScheduleCommand ( {
136
+ Name : address ,
137
+ ScheduleExpression : `at(${ budgetReachedDate . toISOString ( ) . split ( '.' ) [ 0 ] } )` ,
138
+ Target : {
139
+ RoleArn : process . env [ ENV_VARIABLE_REVANT_SCHEDULE_ROLE_ARN ]
140
+ } ,
141
+ FlexibleTimeWindow : {
142
+ Mode : "OFF"
143
+ }
144
+ } ) ) ;
80
145
} catch ( error ) {
81
146
failedUpdateIds . push ( { itemIdentifier } ) ;
82
147
}
0 commit comments