@@ -192,63 +192,103 @@ func (ad *Service) GetAsyncDownloadTaskStatus(kt *kit.Kit, bizID uint32, taskID
192192 return task .Status , nil
193193}
194194
195+ // nolint
195196func (ad * Service ) upsertAsyncDownloadJob (kt * kit.Kit , bizID , appID uint32 , filePath , fileName ,
196197 targetAgentID , targetContainerID , targetUser , targetDir , signature string ) (string , error ) {
197- fileKey := fmt .Sprintf ("AsyncDownloadJob:%d:%d:%s:*" , bizID , appID , path .Join (filePath , fileName ))
198- // lock by file to prevent concurrency writing in other requests
198+ startTime := time .Now ()
199+ rid := kt .Rid
200+ fullPath := path .Join (filePath , fileName )
201+ fileKey := fmt .Sprintf ("AsyncDownloadJob:%d:%d:%s:*" , bizID , appID , fullPath )
202+
203+ // 获取外层锁(按文件路径)- 关键:记录等待锁的时间
204+ outerLockStart := time .Now ()
199205 ad .redLock .Acquire (fileKey )
206+ outerLockWaitDuration := time .Since (outerLockStart )
207+ if outerLockWaitDuration > 100 * time .Millisecond {
208+ logs .Warnf ("[upsertAsyncDownloadJob] outer lock wait time long, rid: %s, biz_id: %d, file: %s, wait_duration: %v" ,
209+ rid , bizID , fullPath , outerLockWaitDuration )
210+ }
200211 defer ad .redLock .Release (fileKey )
212+
213+ // 查找现有 job
214+ keysStart := time .Now ()
201215 keys , err := ad .cs .Redis ().Keys (kt .Ctx , fileKey )
216+ keysDuration := time .Since (keysStart )
202217 if err != nil {
218+ logs .Errorf ("[upsertAsyncDownloadJob] redis keys failed, rid: %s, biz_id: %d, file: %s, duration: %v, err: %v" ,
219+ rid , bizID , fullPath , keysDuration , err )
203220 return "" , err
204221 }
222+ if keysDuration > 100 * time .Millisecond {
223+ logs .Warnf ("[upsertAsyncDownloadJob] redis keys slow, rid: %s, biz_id: %d, file: %s, found_keys: %d, duration: %v" ,
224+ rid , bizID , fullPath , len (keys ), keysDuration )
225+ }
226+
205227 for _ , key := range keys {
206228 if jobID , ok := func () (string , bool ) {
207- // lock by job to prevent concurrency writing in job manager
229+ // 获取内层锁(按 job ID)- 关键:记录等待锁的时间
230+ innerLockStart := time .Now ()
208231 ad .redLock .Acquire (key )
232+ innerLockWaitDuration := time .Since (innerLockStart )
233+ if innerLockWaitDuration > 100 * time .Millisecond {
234+ logs .Warnf ("[upsertAsyncDownloadJob] inner lock wait time long, rid: %s, biz_id: %d, job_key: %s, wait_duration: %v" ,
235+ rid , bizID , key , innerLockWaitDuration )
236+ }
209237 defer ad .redLock .Release (key )
238+
210239 data , e := ad .cs .Redis ().Get (kt .Ctx , key )
211240 if e != nil {
212- logs .Errorf ("get key %s from redis failed, err %s" , key , e .Error ())
241+ logs .Errorf ("[upsertAsyncDownloadJob] redis get failed, rid: %s, biz_id: %d, job_key: %s, err: %v" ,
242+ rid , bizID , key , e )
213243 return "" , false
214244 }
215245 if data == "" {
216246 return "" , false
217247 }
218248 job := & types.AsyncDownloadJob {}
219249 if e := jsoni .UnmarshalFromString (data , & job ); e != nil {
220- logs .Errorf ("unmarshal job %s failed, err %s" , key , e .Error ())
250+ logs .Errorf ("[upsertAsyncDownloadJob] unmarshal job failed, rid: %s, biz_id: %d, job_key: %s, err: %v" ,
251+ rid , bizID , key , e )
221252 return "" , false
222253 }
223254 if job .Status == types .AsyncDownloadJobStatusPending {
224255 // pending job exists, update it
256+ updateStart := time .Now ()
225257 job .Targets = append (job .Targets , & types.AsyncDownloadTarget {
226258 AgentID : targetAgentID ,
227259 ContainerID : targetContainerID ,
228260 })
229261 js , e := jsoni .Marshal (job )
230262 if e != nil {
231- logs .Errorf ("marshal job %s failed, err %s" , key , e .Error ())
263+ logs .Errorf ("[upsertAsyncDownloadJob] marshal job failed, rid: %s, biz_id: %d, job_key: %s, err: %v" ,
264+ rid , bizID , key , e )
232265 return "" , false
233266 }
234267 if e := ad .cs .Redis ().Set (kt .Ctx , key , string (js ), 30 * 60 ); e != nil {
235- logs .Errorf ("set job %s to redis failed, err %s" , key , e .Error ())
268+ logs .Errorf ("[upsertAsyncDownloadJob] redis set failed, rid: %s, biz_id: %d, job_key: %s, err: %v" ,
269+ rid , bizID , key , e )
236270 return "" , false
237271 }
272+ updateDuration := time .Since (updateStart )
273+ totalDuration := time .Since (startTime )
274+ logs .Infof ("[upsertAsyncDownloadJob] update existing job, rid: %s, biz_id: %d, job_id: %s, targets_count: %d, update_duration: %v, total_duration: %v" ,
275+ rid , bizID , key , len (job .Targets ), updateDuration , totalDuration )
238276 return key , true
239277 }
240278 return "" , false
241279
242280 }(); ok {
243- logs .Infof ("async download, job %s exists, update it" , jobID )
281+ logs .Infof ("[upsertAsyncDownloadJob] update existing job, rid: %s, biz_id: %d, job_id: %s" ,
282+ rid , bizID , jobID )
244283 return jobID , nil
245284 }
246285 }
247286
248288 // no pending job exists, create a new one
249288 // two identical jobs cannot be created at the same time, so using a timestamp as the unique id is friendly.
289+ createStart := time .Now ()
250290 jobID := fmt .Sprintf ("AsyncDownloadJob:%d:%d:%s:%s" , bizID , appID ,
251- path . Join ( filePath , fileName ) , time .Now ().Format ("20060102150405" ))
291+ fullPath , time .Now ().Format ("20060102150405" ))
252292 job := & types.AsyncDownloadJob {
253293 JobID : jobID ,
254294 BizID : bizID ,
@@ -273,15 +313,28 @@ func (ad *Service) upsertAsyncDownloadJob(kt *kit.Kit, bizID, appID uint32, file
273313 }
274314 js , err := jsoni .Marshal (job )
275315 if err != nil {
316+ logs .Errorf ("[upsertAsyncDownloadJob] marshal new job failed, rid: %s, biz_id: %d, job_id: %s, err: %v" ,
317+ rid , bizID , jobID , err )
276318 return "" , err
277319 }
278320
279321 ad .metric .jobCounter .With (prm.Labels {"biz" : strconv .Itoa (int (job .BizID )),
280322 "app" : strconv .Itoa (int (job .AppID )), "file" : path .Join (job .FilePath , job .FileName ),
281323 "targets" : strconv .Itoa (len (job .Targets )), "status" : job .Status }).Inc ()
282324
283- logs .Infof ("async download, create new job %s" , jobID )
284- return jobID , ad .cs .Redis ().Set (kt .Ctx , jobID , string (js ), 30 * 60 )
325+ err = ad .cs .Redis ().Set (kt .Ctx , jobID , string (js ), 30 * 60 )
326+ if err != nil {
327+ logs .Errorf ("[upsertAsyncDownloadJob] redis set new job failed, rid: %s, biz_id: %d, job_id: %s, err: %v" ,
328+ rid , bizID , jobID , err )
329+ return "" , err
330+ }
331+
332+ createDuration := time .Since (createStart )
333+ totalDuration := time .Since (startTime )
334+ logs .Infof ("[upsertAsyncDownloadJob] create new job, rid: %s, biz_id: %d, job_id: %s, create_duration: %v, total_duration: %v (outer_lock_wait: %v, keys_duration: %v)" ,
335+ rid , bizID , jobID , createDuration , totalDuration , outerLockWaitDuration , keysDuration )
336+
337+ return jobID , nil
285338}
286339
287340func (ad * Service ) upsertAsyncDownloadTask (ctx context.Context , taskID string ,
0 commit comments