Skip to content

Commit

Permalink
Create a new branch if makePullRequest is true in git config (#4395)
Browse files Browse the repository at this point in the history
* add new branch flag when commit changes in eventwatcher

Signed-off-by: nnnkkk7 <[email protected]>

* add createPullRequest flag to event watcher config instead of git config

Signed-off-by: nnnkkk7 <[email protected]>

* fix test

Signed-off-by: nnnkkk7 <[email protected]>

* push commit if the branch is new in event watcher

Signed-off-by: nnnkkk7 <[email protected]>

* fix docs-v0.43.x

Signed-off-by: nnnkkk7 <[email protected]>

* fix docs

Signed-off-by: nnnkkk7 <[email protected]>

* replace createPullRequest with makePullRequest

Signed-off-by: nnnkkk7 <[email protected]>

* renamed getBranchName to makeBranchName

Signed-off-by: nnnkkk7 <[email protected]>

* remove makePullRequest from EventWatcherEvent

Signed-off-by: nnnkkk7 <[email protected]>

* retry push commits in execute func

Signed-off-by: nnnkkk7 <[email protected]>

* fix how to push new branch

Signed-off-by: nnnkkk7 <[email protected]>

* use map for branchname

Signed-off-by: nnnkkk7 <[email protected]>

* use branchHandledEvents map

Signed-off-by: nnnkkk7 <[email protected]>

* use errors.Join

Signed-off-by: nnnkkk7 <[email protected]>

* remove docs

Signed-off-by: nnnkkk7 <[email protected]>

* remove handledEvents

Signed-off-by: nnnkkk7 <[email protected]>

* avoid using '!'

Signed-off-by: nnnkkk7 <[email protected]>

* rearrange package

Signed-off-by: nnnkkk7 <[email protected]>

---------

Signed-off-by: nnnkkk7 <[email protected]>
  • Loading branch information
nnnkkk7 authored Feb 2, 2024
1 parent 67562cb commit 82540c7
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ Note: By default, the sum of traffic is rounded to 100. If both `primary` and `c
| Field | Type | Description | Required |
|-|-|-|-|
| commitMessage | string | The commit message used to push after replacing values. Default message is used if not given. | No |
| makePullRequest | bool | Whether to create a new branch or not when commit changes in event watcher. Default is `false`. | No |
| replacements | [][EventWatcherReplacement](#eventwatcherreplacement) | List of places where will be replaced when the new event matches. | Yes |

## DriftDetection
Expand Down
113 changes: 66 additions & 47 deletions pkg/app/piped/eventwatcher/eventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"text/template"
"time"

"github.com/google/uuid"
"go.uber.org/zap"
"google.golang.org/grpc"

Expand Down Expand Up @@ -329,11 +330,11 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve
firstRead = false
}
var (
handledEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0, len(eventCfgs))
outDatedEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0)
maxTimestamp int64
outDatedDuration = time.Hour
gitUpdateEvent = false
outDatedEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0)
maxTimestamp int64
outDatedDuration = time.Hour
gitUpdateEvent = false
branchHandledEvents = make(map[string][]*pipedservice.ReportEventStatusesRequest_Event, len(eventCfgs))
)
for _, e := range eventCfgs {
for _, cfg := range e.Configs {
Expand Down Expand Up @@ -383,23 +384,25 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve
})
continue
}

switch handler.Type {
case config.EventWatcherHandlerTypeGitUpdate:
if err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, e.GitPath, handler.Config.Replacements, tmpRepo); err != nil {
branchName, err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, e.GitPath, handler.Config.Replacements, tmpRepo, handler.Config.MakePullRequest)
if err != nil {
w.logger.Error("failed to commit outdated files", zap.Error(err))
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
handledEvent := &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_FAILURE,
StatusDescription: fmt.Sprintf("Failed to change files: %v", err),
})
}
branchHandledEvents[branchName] = append(branchHandledEvents[branchName], handledEvent)
continue
}
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
handledEvent := &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_SUCCESS,
StatusDescription: fmt.Sprintf("Successfully updated %d files in the %q repository", len(handler.Config.Replacements), repoID),
})
}
branchHandledEvents[branchName] = append(branchHandledEvents[branchName], handledEvent)
if latestEvent.CreatedAt > maxTimestamp {
maxTimestamp = latestEvent.CreatedAt
}
Expand All @@ -419,46 +422,51 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve
}
w.logger.Info(fmt.Sprintf("successfully made %d events OUTDATED", len(outDatedEvents)))
}
if len(handledEvents) == 0 {
return nil
}

if !gitUpdateEvent {
return nil
}

var responseError error
retry := backoff.NewRetry(retryPushNum, backoff.NewConstant(retryPushInterval))
_, err = retry.Do(ctx, func() (interface{}, error) {
err := tmpRepo.Push(ctx, tmpRepo.GetClonedBranch())
return nil, err
})
if err == nil {
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: handledEvents}); err != nil {
return fmt.Errorf("failed to report event statuses: %w", err)
}
w.executionMilestoneMap.Store(repoID, maxTimestamp)
return nil
}
for branch, events := range branchHandledEvents {
_, err = retry.Do(ctx, func() (interface{}, error) {
err := tmpRepo.Push(ctx, branch)
return nil, err
})

// If push fails because the local branch was not fresh, exit to retry again in the next interval.
if err == git.ErrBranchNotFresh {
w.logger.Warn("failed to push commits", zap.Error(err))
return nil
}
if err == nil {
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: events}); err != nil {
w.logger.Error("failed to report event statuses", zap.Error(err))
}
w.executionMilestoneMap.Store(repoID, maxTimestamp)
continue
}

// If push fails because of the other reason, re-set all statuses to FAILURE.
for i := range handledEvents {
if handledEvents[i].Status == model.EventStatus_EVENT_FAILURE {
// If push fails because the local branch was not fresh, exit to retry again in the next interval.
if err == git.ErrBranchNotFresh {
w.logger.Warn("failed to push commits", zap.Error(err))
continue
}
handledEvents[i].Status = model.EventStatus_EVENT_FAILURE
handledEvents[i].StatusDescription = fmt.Sprintf("Failed to push changed files: %v", err)

// If push fails because of the other reason, re-set all statuses to FAILURE.
for i := range events {
if events[i].Status == model.EventStatus_EVENT_FAILURE {
continue
}
events[i].Status = model.EventStatus_EVENT_FAILURE
events[i].StatusDescription = fmt.Sprintf("Failed to push changed files: %v", err)
}
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: events}); err != nil {
w.logger.Error("failed to report event statuses", zap.Error(err))
}
w.executionMilestoneMap.Store(repoID, maxTimestamp)
responseError = errors.Join(responseError, err)
}
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: handledEvents}); err != nil {
return fmt.Errorf("failed to report event statuses: %w", err)
if responseError != nil {
return responseError
}
w.executionMilestoneMap.Store(repoID, maxTimestamp)
return fmt.Errorf("failed to push commits: %w", err)
return nil
}

// updateValues inspects all Event-definition and pushes the changes to git repo if there is.
Expand Down Expand Up @@ -530,7 +538,8 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string
})
continue
}
if err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, "", e.Replacements, tmpRepo); err != nil {
_, err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, "", e.Replacements, tmpRepo, false)
if err != nil {
w.logger.Error("failed to commit outdated files", zap.Error(err))
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Expand Down Expand Up @@ -593,7 +602,7 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string
}

// commitFiles commits changes if the data in Git is different from the latest event.
func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg, gitPath string, replacements []config.EventWatcherReplacement, repo git.Repo) error {
func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg, gitPath string, replacements []config.EventWatcherReplacement, repo git.Repo, newBranch bool) (string, error) {
// Determine files to be changed by comparing with the latest event.
changes := make(map[string][]byte, len(replacements))
for _, r := range replacements {
Expand All @@ -619,31 +628,32 @@ func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commit
newContent, upToDate, err = modifyText(path, r.Regex, latestData)
}
if err != nil {
return err
return "", err
}
if upToDate {
continue
}

if err := os.WriteFile(path, newContent, os.ModePerm); err != nil {
return fmt.Errorf("failed to write file: %w", err)
return "", fmt.Errorf("failed to write file: %w", err)
}
changes[filePath] = newContent
}
if len(changes) == 0 {
return nil
return "", nil
}

args := argsTemplate{
Value: latestData,
EventName: eventName,
}
commitMsg = parseCommitMsg(commitMsg, args)
if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), commitMsg, false, changes); err != nil {
return fmt.Errorf("failed to perform git commit: %w", err)
branch := makeBranchName(newBranch, eventName, repo.GetClonedBranch())
if err := repo.CommitChanges(ctx, branch, commitMsg, newBranch, changes); err != nil {
return "", fmt.Errorf("failed to perform git commit: %w", err)
}
w.logger.Info(fmt.Sprintf("event watcher will update values of Event %q", eventName))
return nil
return branch, nil
}

// modifyYAML returns a new YAML content as a first returned value if the value of given
Expand Down Expand Up @@ -777,3 +787,12 @@ func parseCommitMsg(msg string, args argsTemplate) string {
}
return buf.String()
}

// makeBranchName generates a new branch name in the format {eventName}-{uuid} if newBranch is true.
// If newBranch is false, the function returns the existing branch name.
func makeBranchName(newBranch bool, eventName, branch string) string {
if newBranch {
return fmt.Sprintf("%s-%s", eventName, uuid.New().String())
}
return branch
}
37 changes: 37 additions & 0 deletions pkg/app/piped/eventwatcher/eventwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,40 @@ spec:
})
}
}

func TestGetBranchName(t *testing.T) {
t.Parallel()
testcases := []struct {
name string
newBranch bool
eventName string
branch string
want string
}{
{
name: "create new branch",
newBranch: true,
eventName: "event",
branch: "main",
},
{
name: "return existing branch",
newBranch: false,
eventName: "event",
branch: "main",
want: "main",
},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
got := makeBranchName(tc.newBranch, tc.eventName, tc.branch)
if tc.newBranch {
assert.NotEqual(t, tc.branch, got)
} else {
assert.Equal(t, tc.want, got)
}
})
}
}
2 changes: 2 additions & 0 deletions pkg/config/event_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type EventWatcherHandlerConfig struct {
// The commit message used to push after replacing values.
// Default message is used if not given.
CommitMessage string `json:"commitMessage,omitempty"`
// Whether to create a new branch or not when event watcher commits changes.
MakePullRequest bool `json:"makePullRequest,omitempty"`
// List of places where will be replaced when the new event matches.
Replacements []EventWatcherReplacement `json:"replacements"`
}
Expand Down

0 comments on commit 82540c7

Please sign in to comment.