import os
import django
import sys
import re

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'file_invalidation_server.settings')
django.setup()
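
# Post-processes completed file-invalidation Kubernetes Jobs: reads each Job's
# pod logs, extracts the DIDs invalidated on Rucio and DBS, and records the
# outcome in the FileInvalidationRequests table.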

from kubernetes import client, config
from fi_manager.models import FileInvalidationRequests
import logging

logging.basicConfig(level=logging.INFO, format='(%(asctime)s) [%(name)s] %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

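# A minimal sketch for running this script outside the cluster (a local
# development assumption, not part of the deployed flow): swap the in-cluster
# config for a local kubeconfig before calling fetch_and_process().
#
#     from kubernetes import config
#     config.load_kube_config()        # reads ~/.kube/config
#     # config.load_incluster_config() # service-account token inside a pod
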
def fetch_and_process():
    config.load_incluster_config()
    batch_v1 = client.BatchV1Api()
    core_v1 = client.CoreV1Api()

    namespace = 'file-invalidation-tool'
    jobs = batch_v1.list_namespaced_job(namespace=namespace)

    for job in jobs.items:
        job_name = job.metadata.name
        if not job.status.conditions:
            continue

        # list_namespaced_job only returns Job objects, so job.kind is never
        # "CronJob"; check the owner references instead, and skip the
        # log-processor's own jobs by name
        owner_kinds = {ref.kind for ref in (job.metadata.owner_references or [])}
        if 'CronJob' in owner_kinds or 'jobs-log-processor' in job_name:
            continue

        condition_types = {cond.type: cond.status for cond in job.status.conditions}
        if condition_types.get("Failed") == "True":
            logger.warning(f"Job {job_name} failed")
        elif condition_types.get("Complete") == "True":
            # Get the pods created by this job
            pods = core_v1.list_namespaced_pod(
                namespace=namespace,
                label_selector=f"job-name={job_name}"
            )

            # Use the most recent pod
            try:
                latest_pod = sorted(
                    pods.items,
                    key=lambda pod: pod.status.start_time or pod.metadata.creation_timestamp,
                    reverse=True
                )[0]
            except IndexError:
                logger.error(f"There are no pods under the {job_name} job name.")
                continue

            pod_name = latest_pod.metadata.name
            logger.info(f"Pod name: {pod_name}")
            logs = core_v1.read_namespaced_pod_log(pod_name, namespace=namespace)
            try:
                rucio_invalidated_dids, dbs_invalidated_dids, dry_run = parse_job_logs(logs)
                logger.info(logs)
                logger.info(f"Job {job_name} has invalidated {len(rucio_invalidated_dids)} DIDs on Rucio and {len(dbs_invalidated_dids)} DIDs on DBS.")
                logger.info(f"Job {job_name} has invalidated the following DIDs on Rucio: {rucio_invalidated_dids}")
                logger.info(f"Job {job_name} has invalidated the following DIDs on DBS: {dbs_invalidated_dids}")
                if rucio_invalidated_dids or dbs_invalidated_dids:
                    update_database(job_name, rucio_invalidated_dids, dbs_invalidated_dids, dry_run)
                    logger.info(f"Job {job_name} has completed and the DIDs have been updated.")
                else:
                    raise Exception(f"Job {job_name} did not invalidate any DIDs on Rucio or DBS.")
            except Exception as e:
                logger.error(f"Job {pod_name} has failed with error: {e}")
                update_database_for_failed_job(pod_name, f'Job {pod_name} has failed with error: {str(e)}\n{logs}')

            delete_opts = client.V1DeleteOptions(propagation_policy='Foreground')

            # Deletion is disabled while developing; re-enable to clean up
            # processed jobs:
            # batch_v1.delete_namespaced_job(
            #     name=job_name,
            #     namespace=namespace,
            #     body=delete_opts)
            logger.info(f"Job {job_name} would be deleted but is being kept for dev purposes.")

def parse_job_logs(logs: str):
    if "Error running shell script" in logs:
        raise Exception("Job has failed with error: Error running shell script")
    # A dry run only reports what *would* be declared, so detect it from the wording
    dry_run = 'Would declare file' in logs
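
    # Illustrative examples of the log lines the patterns below target (the
    # paths are made up; the fixed text comes from the patterns themselves):
    #   Would declare file /store/data/Run2023C/Muon/v1/0000/file-name.root as bad at ...
    #   Declared file /store/data/Run2023C/Muon/v1/0000/file-name.root as bad at ...
    #   Invalidation OK for file: /store/data/Run2023C/Muon/v1/0000/file-name.root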

    if dry_run:
        rucio_invalidated_files = re.findall(pattern=r'(?:Would declare file) (/[\w/\-]+\.root) as bad at',
                                             string=logs)

        dbs_invalidated_files = re.findall(pattern=r'(?:Would invalidate file on DBS:) (/[\w/\-]+\.root)\s',
                                           string=logs)
    else:
        rucio_invalidated_files = re.findall(pattern=r'(?:Declared file) (/[\w/\-]+\.root) as bad at',
                                             string=logs)

        dbs_invalidated_files = re.findall(pattern=r'(?:Invalidation OK for file:) (/[\w/\-]+\.root)\s',
                                           string=logs)

        dbs_invalidated_dataset = re.findall(pattern=r'(?:Invalidation OK for dataset:) (/[\w/\-]+\.root)\s',
                                             string=logs)

        if dbs_invalidated_dataset:
            # Assumes that for datasets, DBS dataset invalidation implies Rucio file invalidation.
            # list.append() would nest the list (and return None); extend() merges the entries.
            dbs_invalidated_files.extend(dbs_invalidated_dataset)
            rucio_invalidated_files.extend(dbs_invalidated_dataset)

    return rucio_invalidated_files, dbs_invalidated_files, dry_run

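# A quick sanity check of the parser (hypothetical log excerpt, shown as a
# comment to keep the module side-effect free):
#   rucio, dbs, dry = parse_job_logs("Declared file /store/test/f-1.root as bad at RSE_X")
#   -> rucio == ['/store/test/f-1.root'], dbs == [], dry is False
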
def update_database(job_name, rucio_list, dbs_list, dry_run):
    globally_invalidated_dids = set(rucio_list) & set(dbs_list)
    job_id = re.findall(pattern=r'file-invalidation-job-(\w{8})', string=job_name)[0]

    only_rucio_invalidated = FileInvalidationRequests.objects.filter(job_id=job_id, file_name__in=rucio_list)
    only_rucio_invalidated.update(status='success', mode='rucio_only', dry_run=dry_run)

    only_dbs_invalidated = FileInvalidationRequests.objects.filter(job_id=job_id, file_name__in=dbs_list)
    only_dbs_invalidated.update(status='success', mode='dbs_only', dry_run=dry_run)

    # Updated last so rows invalidated on both systems end up with mode='global',
    # overriding the 'rucio_only'/'dbs_only' values set above
    global_invalidated = FileInvalidationRequests.objects.filter(job_id=job_id, file_name__in=globally_invalidated_dids)
    global_invalidated.update(status='success', mode='global', dry_run=dry_run)

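# Called with the *pod* name (the job name plus the pod's random suffix), which
# is why the pattern below expects characters after the 8-character job id.
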
def update_database_for_failed_job(job_name, logs):
    job_id = re.findall(pattern=r'file-invalidation-job-(\w{8})-\w', string=job_name)[0]
    failed_invalidation = FileInvalidationRequests.objects.filter(job_id=job_id)
    failed_invalidation.update(status='failed', logs=logs)

if __name__ == "__main__":
    fetch_and_process()