Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a new branch if makePullRequest is true in git config #4395

Merged
merged 18 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ Note: By default, the sum of traffic is rounded to 100. If both `primary` and `c
| Field | Type | Description | Required |
|-|-|-|-|
| commitMessage | string | The commit message used to push after replacing values. Default message is used if not given. | No |
| makePullRequest | bool | Whether to create a new branch or not when commit changes in event watcher. Default is `false`. | No |
| replacements | [][EventWatcherReplacement](#eventwatcherreplacement) | List of places where will be replaced when the new event matches. | Yes |

## DriftDetection
Expand Down
113 changes: 66 additions & 47 deletions pkg/app/piped/eventwatcher/eventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"text/template"
"time"

"github.com/google/uuid"
"go.uber.org/zap"
"google.golang.org/grpc"

Expand Down Expand Up @@ -329,11 +330,11 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve
firstRead = false
}
var (
handledEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0, len(eventCfgs))
outDatedEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0)
maxTimestamp int64
outDatedDuration = time.Hour
gitUpdateEvent = false
outDatedEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0)
maxTimestamp int64
outDatedDuration = time.Hour
gitUpdateEvent = false
branchHandledEvents = make(map[string][]*pipedservice.ReportEventStatusesRequest_Event, len(eventCfgs))
)
for _, e := range eventCfgs {
for _, cfg := range e.Configs {
Expand Down Expand Up @@ -383,23 +384,25 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve
})
continue
}

switch handler.Type {
case config.EventWatcherHandlerTypeGitUpdate:
if err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, e.GitPath, handler.Config.Replacements, tmpRepo); err != nil {
branchName, err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, e.GitPath, handler.Config.Replacements, tmpRepo, handler.Config.MakePullRequest)
if err != nil {
w.logger.Error("failed to commit outdated files", zap.Error(err))
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
handledEvent := &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_FAILURE,
StatusDescription: fmt.Sprintf("Failed to change files: %v", err),
})
}
branchHandledEvents[branchName] = append(branchHandledEvents[branchName], handledEvent)
continue
}
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
handledEvent := &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_SUCCESS,
StatusDescription: fmt.Sprintf("Successfully updated %d files in the %q repository", len(handler.Config.Replacements), repoID),
})
}
branchHandledEvents[branchName] = append(branchHandledEvents[branchName], handledEvent)
if latestEvent.CreatedAt > maxTimestamp {
maxTimestamp = latestEvent.CreatedAt
}
Expand All @@ -419,46 +422,51 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve
}
w.logger.Info(fmt.Sprintf("successfully made %d events OUTDATED", len(outDatedEvents)))
}
if len(handledEvents) == 0 {
return nil
}

if !gitUpdateEvent {
return nil
}

var responseError error
retry := backoff.NewRetry(retryPushNum, backoff.NewConstant(retryPushInterval))
_, err = retry.Do(ctx, func() (interface{}, error) {
err := tmpRepo.Push(ctx, tmpRepo.GetClonedBranch())
return nil, err
})
if err == nil {
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: handledEvents}); err != nil {
return fmt.Errorf("failed to report event statuses: %w", err)
}
w.executionMilestoneMap.Store(repoID, maxTimestamp)
return nil
}
for branch, events := range branchHandledEvents {
_, err = retry.Do(ctx, func() (interface{}, error) {
err := tmpRepo.Push(ctx, branch)
return nil, err
})

// If push fails because the local branch was not fresh, exit to retry again in the next interval.
if err == git.ErrBranchNotFresh {
w.logger.Warn("failed to push commits", zap.Error(err))
return nil
}
if err == nil {
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: events}); err != nil {
w.logger.Error("failed to report event statuses", zap.Error(err))
}
w.executionMilestoneMap.Store(repoID, maxTimestamp)
continue
}

// If push fails because of the other reason, re-set all statuses to FAILURE.
for i := range handledEvents {
if handledEvents[i].Status == model.EventStatus_EVENT_FAILURE {
// If push fails because the local branch was not fresh, exit to retry again in the next interval.
if err == git.ErrBranchNotFresh {
w.logger.Warn("failed to push commits", zap.Error(err))
continue
}
handledEvents[i].Status = model.EventStatus_EVENT_FAILURE
handledEvents[i].StatusDescription = fmt.Sprintf("Failed to push changed files: %v", err)

// If push fails because of the other reason, re-set all statuses to FAILURE.
for i := range events {
if events[i].Status == model.EventStatus_EVENT_FAILURE {
continue
}
events[i].Status = model.EventStatus_EVENT_FAILURE
events[i].StatusDescription = fmt.Sprintf("Failed to push changed files: %v", err)
}
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: events}); err != nil {
w.logger.Error("failed to report event statuses", zap.Error(err))
}
w.executionMilestoneMap.Store(repoID, maxTimestamp)
responseError = errors.Join(responseError, err)
}
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: handledEvents}); err != nil {
return fmt.Errorf("failed to report event statuses: %w", err)
if responseError != nil {
return responseError
}
w.executionMilestoneMap.Store(repoID, maxTimestamp)
return fmt.Errorf("failed to push commits: %w", err)
return nil
}

// updateValues inspects all Event-definition and pushes the changes to git repo if there is.
Expand Down Expand Up @@ -530,7 +538,8 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string
})
continue
}
if err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, "", e.Replacements, tmpRepo); err != nil {
_, err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, "", e.Replacements, tmpRepo, false)
if err != nil {
w.logger.Error("failed to commit outdated files", zap.Error(err))
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Expand Down Expand Up @@ -593,7 +602,7 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string
}

// commitFiles commits changes if the data in Git is different from the latest event.
func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg, gitPath string, replacements []config.EventWatcherReplacement, repo git.Repo) error {
func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg, gitPath string, replacements []config.EventWatcherReplacement, repo git.Repo, newBranch bool) (string, error) {
// Determine files to be changed by comparing with the latest event.
changes := make(map[string][]byte, len(replacements))
for _, r := range replacements {
Expand All @@ -619,31 +628,32 @@ func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commit
newContent, upToDate, err = modifyText(path, r.Regex, latestData)
}
if err != nil {
return err
return "", err
}
if upToDate {
continue
}

if err := os.WriteFile(path, newContent, os.ModePerm); err != nil {
return fmt.Errorf("failed to write file: %w", err)
return "", fmt.Errorf("failed to write file: %w", err)
}
changes[filePath] = newContent
}
if len(changes) == 0 {
return nil
return "", nil
}

args := argsTemplate{
Value: latestData,
EventName: eventName,
}
commitMsg = parseCommitMsg(commitMsg, args)
if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), commitMsg, false, changes); err != nil {
return fmt.Errorf("failed to perform git commit: %w", err)
branch := makeBranchName(newBranch, eventName, repo.GetClonedBranch())
if err := repo.CommitChanges(ctx, branch, commitMsg, newBranch, changes); err != nil {
return "", fmt.Errorf("failed to perform git commit: %w", err)
}
w.logger.Info(fmt.Sprintf("event watcher will update values of Event %q", eventName))
return nil
return branch, nil
}

// modifyYAML returns a new YAML content as a first returned value if the value of given
Expand Down Expand Up @@ -777,3 +787,12 @@ func parseCommitMsg(msg string, args argsTemplate) string {
}
return buf.String()
}

// makeBranchName generates a new branch name in the format {eventName}-{uuid} if newBranch is true.
// If newBranch is false, the function returns the existing branch name.
func makeBranchName(newBranch bool, eventName, branch string) string {
if newBranch {
return fmt.Sprintf("%s-%s", eventName, uuid.New().String())
}
return branch
}
37 changes: 37 additions & 0 deletions pkg/app/piped/eventwatcher/eventwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,40 @@ spec:
})
}
}

func TestGetBranchName(t *testing.T) {
t.Parallel()
testcases := []struct {
name string
newBranch bool
eventName string
branch string
want string
}{
{
name: "create new branch",
newBranch: true,
eventName: "event",
branch: "main",
},
{
name: "return existing branch",
newBranch: false,
eventName: "event",
branch: "main",
want: "main",
},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
got := makeBranchName(tc.newBranch, tc.eventName, tc.branch)
if tc.newBranch {
assert.NotEqual(t, tc.branch, got)
} else {
assert.Equal(t, tc.want, got)
}
})
}
}
2 changes: 2 additions & 0 deletions pkg/config/event_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type EventWatcherHandlerConfig struct {
// The commit message used to push after replacing values.
// Default message is used if not given.
CommitMessage string `json:"commitMessage,omitempty"`
// Whether to create a new branch or not when event watcher commits changes.
MakePullRequest bool `json:"makePullRequest,omitempty"`
// List of places where will be replaced when the new event matches.
Replacements []EventWatcherReplacement `json:"replacements"`
}
Expand Down
Loading