@@ -30,6 +30,7 @@ import (
30
30
"text/template"
31
31
"time"
32
32
33
+ "github.com/google/uuid"
33
34
"go.uber.org/zap"
34
35
"google.golang.org/grpc"
35
36
@@ -329,11 +330,11 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve
329
330
firstRead = false
330
331
}
331
332
var (
332
- handledEvents = make ([]* pipedservice.ReportEventStatusesRequest_Event , 0 , len ( eventCfgs ) )
333
- outDatedEvents = make ([] * pipedservice. ReportEventStatusesRequest_Event , 0 )
334
- maxTimestamp int64
335
- outDatedDuration = time . Hour
336
- gitUpdateEvent = false
333
+ outDatedEvents = make ([]* pipedservice.ReportEventStatusesRequest_Event , 0 )
334
+ maxTimestamp int64
335
+ outDatedDuration = time . Hour
336
+ gitUpdateEvent = false
337
+ branchHandledEvents = make ( map [ string ][] * pipedservice. ReportEventStatusesRequest_Event , len ( eventCfgs ))
337
338
)
338
339
for _ , e := range eventCfgs {
339
340
for _ , cfg := range e .Configs {
@@ -383,23 +384,25 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve
383
384
})
384
385
continue
385
386
}
386
-
387
387
switch handler .Type {
388
388
case config .EventWatcherHandlerTypeGitUpdate :
389
- if err := w .commitFiles (ctx , latestEvent .Data , matcher .Name , handler .Config .CommitMessage , e .GitPath , handler .Config .Replacements , tmpRepo ); err != nil {
389
+ branchName , err := w .commitFiles (ctx , latestEvent .Data , matcher .Name , handler .Config .CommitMessage , e .GitPath , handler .Config .Replacements , tmpRepo , handler .Config .MakePullRequest )
390
+ if err != nil {
390
391
w .logger .Error ("failed to commit outdated files" , zap .Error (err ))
391
- handledEvents = append ( handledEvents , & pipedservice.ReportEventStatusesRequest_Event {
392
+ handledEvent := & pipedservice.ReportEventStatusesRequest_Event {
392
393
Id : latestEvent .Id ,
393
394
Status : model .EventStatus_EVENT_FAILURE ,
394
395
StatusDescription : fmt .Sprintf ("Failed to change files: %v" , err ),
395
- })
396
+ }
397
+ branchHandledEvents [branchName ] = append (branchHandledEvents [branchName ], handledEvent )
396
398
continue
397
399
}
398
- handledEvents = append ( handledEvents , & pipedservice.ReportEventStatusesRequest_Event {
400
+ handledEvent := & pipedservice.ReportEventStatusesRequest_Event {
399
401
Id : latestEvent .Id ,
400
402
Status : model .EventStatus_EVENT_SUCCESS ,
401
403
StatusDescription : fmt .Sprintf ("Successfully updated %d files in the %q repository" , len (handler .Config .Replacements ), repoID ),
402
- })
404
+ }
405
+ branchHandledEvents [branchName ] = append (branchHandledEvents [branchName ], handledEvent )
403
406
if latestEvent .CreatedAt > maxTimestamp {
404
407
maxTimestamp = latestEvent .CreatedAt
405
408
}
@@ -419,46 +422,51 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve
419
422
}
420
423
w .logger .Info (fmt .Sprintf ("successfully made %d events OUTDATED" , len (outDatedEvents )))
421
424
}
422
- if len (handledEvents ) == 0 {
423
- return nil
424
- }
425
425
426
426
if ! gitUpdateEvent {
427
427
return nil
428
428
}
429
429
430
+ var responseError error
430
431
retry := backoff .NewRetry (retryPushNum , backoff .NewConstant (retryPushInterval ))
431
- _ , err = retry .Do (ctx , func () (interface {}, error ) {
432
- err := tmpRepo .Push (ctx , tmpRepo .GetClonedBranch ())
433
- return nil , err
434
- })
435
- if err == nil {
436
- if _ , err := w .apiClient .ReportEventStatuses (ctx , & pipedservice.ReportEventStatusesRequest {Events : handledEvents }); err != nil {
437
- return fmt .Errorf ("failed to report event statuses: %w" , err )
438
- }
439
- w .executionMilestoneMap .Store (repoID , maxTimestamp )
440
- return nil
441
- }
432
+ for branch , events := range branchHandledEvents {
433
+ _ , err = retry .Do (ctx , func () (interface {}, error ) {
434
+ err := tmpRepo .Push (ctx , branch )
435
+ return nil , err
436
+ })
442
437
443
- // If push fails because the local branch was not fresh, exit to retry again in the next interval.
444
- if err == git .ErrBranchNotFresh {
445
- w .logger .Warn ("failed to push commits" , zap .Error (err ))
446
- return nil
447
- }
438
+ if err == nil {
439
+ if _ , err := w .apiClient .ReportEventStatuses (ctx , & pipedservice.ReportEventStatusesRequest {Events : events }); err != nil {
440
+ w .logger .Error ("failed to report event statuses" , zap .Error (err ))
441
+ }
442
+ w .executionMilestoneMap .Store (repoID , maxTimestamp )
443
+ continue
444
+ }
448
445
449
- // If push fails because of the other reason, re-set all statuses to FAILURE .
450
- for i := range handledEvents {
451
- if handledEvents [ i ]. Status == model . EventStatus_EVENT_FAILURE {
446
+ // If push fails because the local branch was not fresh, exit to retry again in the next interval .
447
+ if err == git . ErrBranchNotFresh {
448
+ w . logger . Warn ( "failed to push commits" , zap . Error ( err ))
452
449
continue
453
450
}
454
- handledEvents [i ].Status = model .EventStatus_EVENT_FAILURE
455
- handledEvents [i ].StatusDescription = fmt .Sprintf ("Failed to push changed files: %v" , err )
451
+
452
+ // If push fails because of the other reason, re-set all statuses to FAILURE.
453
+ for i := range events {
454
+ if events [i ].Status == model .EventStatus_EVENT_FAILURE {
455
+ continue
456
+ }
457
+ events [i ].Status = model .EventStatus_EVENT_FAILURE
458
+ events [i ].StatusDescription = fmt .Sprintf ("Failed to push changed files: %v" , err )
459
+ }
460
+ if _ , err := w .apiClient .ReportEventStatuses (ctx , & pipedservice.ReportEventStatusesRequest {Events : events }); err != nil {
461
+ w .logger .Error ("failed to report event statuses" , zap .Error (err ))
462
+ }
463
+ w .executionMilestoneMap .Store (repoID , maxTimestamp )
464
+ responseError = errors .Join (responseError , err )
456
465
}
457
- if _ , err := w . apiClient . ReportEventStatuses ( ctx , & pipedservice. ReportEventStatusesRequest { Events : handledEvents }); err != nil {
458
- return fmt . Errorf ( "failed to report event statuses: %w" , err )
466
+ if responseError != nil {
467
+ return responseError
459
468
}
460
- w .executionMilestoneMap .Store (repoID , maxTimestamp )
461
- return fmt .Errorf ("failed to push commits: %w" , err )
469
+ return nil
462
470
}
463
471
464
472
// updateValues inspects all Event-definition and pushes the changes to git repo if there is.
@@ -530,7 +538,8 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string
530
538
})
531
539
continue
532
540
}
533
- if err := w .commitFiles (ctx , latestEvent .Data , e .Name , commitMsg , "" , e .Replacements , tmpRepo ); err != nil {
541
+ _ , err := w .commitFiles (ctx , latestEvent .Data , e .Name , commitMsg , "" , e .Replacements , tmpRepo , false )
542
+ if err != nil {
534
543
w .logger .Error ("failed to commit outdated files" , zap .Error (err ))
535
544
handledEvents = append (handledEvents , & pipedservice.ReportEventStatusesRequest_Event {
536
545
Id : latestEvent .Id ,
@@ -593,7 +602,7 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string
593
602
}
594
603
595
604
// commitFiles commits changes if the data in Git is different from the latest event.
596
- func (w * watcher ) commitFiles (ctx context.Context , latestData , eventName , commitMsg , gitPath string , replacements []config.EventWatcherReplacement , repo git.Repo ) error {
605
+ func (w * watcher ) commitFiles (ctx context.Context , latestData , eventName , commitMsg , gitPath string , replacements []config.EventWatcherReplacement , repo git.Repo , newBranch bool ) ( string , error ) {
597
606
// Determine files to be changed by comparing with the latest event.
598
607
changes := make (map [string ][]byte , len (replacements ))
599
608
for _ , r := range replacements {
@@ -619,31 +628,32 @@ func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commit
619
628
newContent , upToDate , err = modifyText (path , r .Regex , latestData )
620
629
}
621
630
if err != nil {
622
- return err
631
+ return "" , err
623
632
}
624
633
if upToDate {
625
634
continue
626
635
}
627
636
628
637
if err := os .WriteFile (path , newContent , os .ModePerm ); err != nil {
629
- return fmt .Errorf ("failed to write file: %w" , err )
638
+ return "" , fmt .Errorf ("failed to write file: %w" , err )
630
639
}
631
640
changes [filePath ] = newContent
632
641
}
633
642
if len (changes ) == 0 {
634
- return nil
643
+ return "" , nil
635
644
}
636
645
637
646
args := argsTemplate {
638
647
Value : latestData ,
639
648
EventName : eventName ,
640
649
}
641
650
commitMsg = parseCommitMsg (commitMsg , args )
642
- if err := repo .CommitChanges (ctx , repo .GetClonedBranch (), commitMsg , false , changes ); err != nil {
643
- return fmt .Errorf ("failed to perform git commit: %w" , err )
651
+ branch := makeBranchName (newBranch , eventName , repo .GetClonedBranch ())
652
+ if err := repo .CommitChanges (ctx , branch , commitMsg , newBranch , changes ); err != nil {
653
+ return "" , fmt .Errorf ("failed to perform git commit: %w" , err )
644
654
}
645
655
w .logger .Info (fmt .Sprintf ("event watcher will update values of Event %q" , eventName ))
646
- return nil
656
+ return branch , nil
647
657
}
648
658
649
659
// modifyYAML returns a new YAML content as a first returned value if the value of given
@@ -777,3 +787,12 @@ func parseCommitMsg(msg string, args argsTemplate) string {
777
787
}
778
788
return buf .String ()
779
789
}
790
+
791
+ // makeBranchName generates a new branch name in the format {eventName}-{uuid} if newBranch is true.
792
+ // If newBranch is false, the function returns the existing branch name.
793
+ func makeBranchName (newBranch bool , eventName , branch string ) string {
794
+ if newBranch {
795
+ return fmt .Sprintf ("%s-%s" , eventName , uuid .New ().String ())
796
+ }
797
+ return branch
798
+ }
0 commit comments