12
12
bucket = "BUCKET_NAME"
13
13
14
14
15
- def killdeadAlarms (fleetId , monitorapp , project ):
15
+ def killdeadAlarms (fleetId , project ):
16
16
checkdates = [
17
17
datetime .datetime .now ().strftime ("%Y-%m-%d" ),
18
18
(datetime .datetime .now () - datetime .timedelta (days = 1 )).strftime ("%Y-%m-%d" ),
@@ -27,7 +27,12 @@ def killdeadAlarms(fleetId, monitorapp, project):
27
27
if eachevent ["EventInformation" ]["EventSubType" ] == "terminated" :
28
28
todel .append (eachevent ["EventInformation" ]["InstanceId" ])
29
29
todel = [f"{ project } _{ x } " for x in todel ]
30
- cloudwatch .delete_alarms (AlarmNames = todel )
30
+ while len (todel ) > 100 :
31
+ dellist = todel [:100 ]
32
+ cloudwatch .delete_alarms (AlarmNames = dellist )
33
+ todel = todel [100 :]
34
+ if len (todel ) <= 100 :
35
+ cloudwatch .delete_alarms (AlarmNames = todel )
31
36
print ("Old alarms deleted" )
32
37
33
38
@@ -41,8 +46,18 @@ def seeIfLogExportIsDone(logExportId):
41
46
time .sleep (30 )
42
47
43
48
44
- def downscaleSpotFleet (queue , spotFleetID ):
45
- response = sqs .get_queue_url (QueueName = queue )
49
+ def downscaleSpotFleet (nonvisible , spotFleetID ):
50
+ status = ec2 .describe_spot_fleet_instances (SpotFleetRequestId = spotFleetID )
51
+ if nonvisible < len (status ["ActiveInstances" ]):
52
+ ec2 .modify_spot_fleet_request (
53
+ ExcessCapacityTerminationPolicy = "noTermination" ,
54
+ TargetCapacity = str (nonvisible ),
55
+ SpotFleetRequestId = spotFleetID ,
56
+ )
57
+
58
+
59
+ def check_sqs_queue (queueName ):
60
+ response = sqs .get_queue_url (QueueName = queueName )
46
61
queueUrl = response ["QueueUrl" ]
47
62
response = sqs .get_queue_attributes (
48
63
QueueUrl = queueUrl ,
@@ -53,25 +68,22 @@ def downscaleSpotFleet(queue, spotFleetID):
53
68
)
54
69
visible = int (response ["Attributes" ]["ApproximateNumberOfMessages" ])
55
70
nonvisible = int (response ["Attributes" ]["ApproximateNumberOfMessagesNotVisible" ])
56
- status = ec2 .describe_spot_fleet_instances (SpotFleetRequestId = spotFleetID )
57
- if nonvisible < len (status ["ActiveInstances" ]):
58
- result = ec2 .modify_spot_fleet_request (
59
- ExcessCapacityTerminationPolicy = "noTermination" ,
60
- TargetCapacity = str (nonvisible ),
61
- SpotFleetRequestId = spotFleetID ,
62
- )
71
+ print (
72
+ f"Found { visible } visible messages and { nonvisible } nonvisible messages in queue."
73
+ )
74
+ return visible , nonvisible
63
75
64
76
65
77
def lambda_handler (event , lambda_context ):
66
78
# Triggered any time SQS queue ApproximateNumberOfMessagesVisible = 0
67
79
# OR ApproximateNumberOfMessagesNotVisible = 0
68
80
messagestring = event ["Records" ][0 ]["Sns" ]["Message" ]
69
81
messagedict = json .loads (messagestring )
70
- queueId = messagedict ["Trigger" ]["Dimensions" ][0 ]["value" ]
71
- project = queueId .rsplit ("_" , 1 )[0 ]
82
+ queueName = messagedict ["Trigger" ]["Dimensions" ][0 ]["value" ]
83
+ project = queueName .rsplit ("_" , 1 )[0 ]
72
84
73
85
# Download monitor file
74
- monitor_file_name = f"{ queueId .split ('Queue' )[0 ]} SpotFleetRequestId.json"
86
+ monitor_file_name = f"{ queueName .split ('Queue' )[0 ]} SpotFleetRequestId.json"
75
87
monitor_local_name = f"/tmp/{ monitor_file_name } "
76
88
monitor_on_bucket_name = f"monitors/{ monitor_file_name } "
77
89
@@ -88,18 +100,19 @@ def lambda_handler(event, lambda_context):
88
100
monitorapp = monitorInfo ["MONITOR_APP_NAME" ]
89
101
fleetId = monitorInfo ["MONITOR_FLEET_ID" ]
90
102
loggroupId = monitorInfo ["MONITOR_LOG_GROUP_NAME" ]
91
- starttime = monitorInfo ["MONITOR_START_TIME" ]
92
103
CLEAN_DASHBOARD = monitorInfo ["CLEAN_DASHBOARD" ]
93
104
print (f"Monitor triggered for { monitorcluster } { monitorapp } { fleetId } { loggroupId } " )
94
105
106
+ visible , nonvisible = check_sqs_queue (queueName )
107
+
95
108
# If no visible messages, downscale machines
96
- if "ApproximateNumberOfMessagesVisible" in event [ "Records" ][ 0 ][ "Sns" ][ "Message" ] :
109
+ if visible == 0 and nonvisible > 0 :
97
110
print ("No visible messages. Tidying as we go." )
98
- killdeadAlarms (fleetId , monitorapp , project )
99
- downscaleSpotFleet (queueId , fleetId )
111
+ killdeadAlarms (fleetId , project )
112
+ downscaleSpotFleet (nonvisible , fleetId )
100
113
101
114
# If no messages in progress, cleanup
102
- if "ApproximateNumberOfMessagesNotVisible" in event [ "Records" ][ 0 ][ "Sns" ][ "Message" ] :
115
+ if visible == 0 and nonvisible == 0 :
103
116
print ("No messages in progress. Cleaning up." )
104
117
ecs .update_service (
105
118
cluster = monitorcluster ,
@@ -115,7 +128,12 @@ def lambda_handler(event, lambda_context):
115
128
active_instances = []
116
129
for instance in active_dictionary ["ActiveInstances" ]:
117
130
active_instances .append (instance ["InstanceId" ])
118
- cloudwatch .delete_alarms (AlarmNames = active_instances )
131
+ while len (active_instances ) > 100 :
132
+ dellist = active_instances [:100 ]
133
+ cloudwatch .delete_alarms (AlarmNames = dellist )
134
+ active_instances = active_instances [100 :]
135
+ if len (active_instances ) <= 100 :
136
+ cloudwatch .delete_alarms (AlarmNames = active_instances )
119
137
killdeadAlarms (fleetId , monitorapp , project )
120
138
121
139
# Read spot fleet id and terminate all EC2 instances
@@ -129,7 +147,7 @@ def lambda_handler(event, lambda_context):
129
147
ECS_SERVICE_NAME = monitorapp + "Service"
130
148
131
149
print ("Deleting existing queue." )
132
- queueoutput = sqs .list_queues (QueueNamePrefix = queueId )
150
+ queueoutput = sqs .list_queues (QueueNamePrefix = queueName )
133
151
try :
134
152
if len (queueoutput ["QueueUrls" ]) == 1 :
135
153
queueUrl = queueoutput ["QueueUrls" ][0 ]
@@ -148,7 +166,7 @@ def lambda_handler(event, lambda_context):
148
166
print ("Couldn't delete service." )
149
167
150
168
print ("De-registering task" )
151
- taskArns = ecs .list_task_definitions ()
169
+ taskArns = ecs .list_task_definitions (familyPrefix = ECS_TASK_NAME )
152
170
for eachtask in taskArns ["taskDefinitionArns" ]:
153
171
fulltaskname = eachtask .split ("/" )[- 1 ]
154
172
ecs .deregister_task_definition (taskDefinition = fulltaskname )
@@ -185,3 +203,6 @@ def lambda_handler(event, lambda_context):
185
203
cloudwatch .delete_dashboards (
186
204
DashboardNames = [entry ["DashboardName" ]]
187
205
)
206
+
207
+ # Delete monitor file
208
+ s3 .delete_object (Bucket = bucket , Key = monitor_on_bucket_name )
0 commit comments