Skip to content

Implement org/user agents #3539

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 64 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
3e2368f
Start: implement server side filters for agents
6543 Sep 12, 2024
d5881ad
fix
6543 Sep 13, 2024
0934b5d
Add List func for Argents of {Org,Repo} to store
6543 Sep 13, 2024
9d5949c
revert OwnerID change and define seperate cols to save the info
6543 Sep 13, 2024
ed779bb
generate
6543 Sep 13, 2024
a632982
add api for org and repo agents
6543 Sep 13, 2024
4707747
Update server/grpc/auth_server.go
6543 Sep 13, 2024
0499d2e
rename to make purpose clear
6543 Sep 13, 2024
a27947d
refactor
6543 Sep 13, 2024
9bd9eb7
cleanup
6543 Sep 13, 2024
28637f8
database migration
6543 Sep 13, 2024
de615ed
next
6543 Sep 13, 2024
1bc1b63
document the intentions of -1 and test it
6543 Sep 13, 2024
baf84b7
tests and fix
6543 Sep 13, 2024
60f4653
gen docs
6543 Sep 13, 2024
04a17af
fix-wrong-route
6543 Sep 13, 2024
47f4076
dont set owner lable and hide the enforced labels in webui
6543 Sep 13, 2024
ea97f46
fix bug and also now cover it by test
6543 Sep 13, 2024
a1d263c
fix filter of system agents
6543 Sep 13, 2024
1481c20
web add now api endpoints
6543 Sep 13, 2024
2e6baaf
make it consistent
6543 Sep 13, 2024
ef2c36e
fmt
6543 Sep 13, 2024
aacc368
add badge to see repo and org agents labled as such
6543 Sep 13, 2024
e875628
fix messedup from refactor
6543 Sep 13, 2024
299514b
fix test
6543 Sep 13, 2024
bcb0348
Merge branch 'main' into server-side-agent-filters
6543 Sep 14, 2024
449f581
Merge branch 'main' into server-side-agent-filters
6543 Sep 15, 2024
1a4d05c
add filters to global agent webui
6543 Sep 15, 2024
4976749
make the saved message the right order
6543 Sep 15, 2024
ad94d57
create org and repo agent tab based on agent tab
6543 Sep 15, 2024
5b1fb0b
fix linting
6543 Sep 15, 2024
e8a2743
clarify
6543 Sep 15, 2024
ebc4dc1
fix lint
6543 Sep 15, 2024
1e1abf1
text
6543 Sep 16, 2024
c49f1c3
Merge branch 'main' into server-side-agent-filters
6543 Sep 16, 2024
2733816
move doublestar.Match to #4122
6543 Sep 16, 2024
df5d0ad
Apply suggestions from code review
6543 Sep 16, 2024
5b436ef
Merge branch 'main' into server-side-agent-filters
6543 Sep 17, 2024
5f3845f
fin
6543 Sep 17, 2024
da66bdc
Merge branch 'main' into server-side-agent-filters
6543 Sep 19, 2024
f40356a
clean
6543 Sep 19, 2024
18f7015
Apply suggestions from code review
6543 Sep 19, 2024
7d181b0
unrelated
6543 Sep 19, 2024
fd6a92e
mv
6543 Sep 19, 2024
f41478f
Merge branch 'main' into server-side-agent-filters
6543 Sep 20, 2024
b091341
remove server side agent filters for now
6543 Sep 20, 2024
a2487f8
Revert "remove server side agent filters for now"
6543 Sep 20, 2024
6c2deaf
Reapply "remove server side agent filters for now"
6543 Sep 20, 2024
9b8b6fc
test adopt rename
6543 Sep 20, 2024
595703b
Merge branch 'main' into server-side-agent-filters
6543 Sep 20, 2024
7f49dfc
Merge branch 'main' into server-side-agent-filters
6543 Sep 22, 2024
3e6b938
dedup code and add user agents
6543 Sep 22, 2024
4f50119
Merge branch 'main' into server-side-agent-filters
6543 Sep 22, 2024
c3871a4
fix lint
6543 Sep 22, 2024
44ab14a
Update server/grpc/rpc.go
6543 Sep 25, 2024
28c2040
Merge branch 'main' into server-side-agent-filters
6543 Sep 25, 2024
1a47abb
rename all
6543 Sep 25, 2024
1043ee8
Merge branch 'main' into server-side-agent-filters
6543 Sep 30, 2024
2445bb6
remove repoAgent routes from rest api
6543 Sep 30, 2024
b8e4282
remove repoAgent routes from WebUI
6543 Sep 30, 2024
dc61a24
fix rename
6543 Sep 30, 2024
494c6fd
fix lint
6543 Sep 30, 2024
dc67296
criple
6543 Sep 30, 2024
f31795a
nits
anbraten Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
386 changes: 384 additions & 2 deletions cmd/server/docs/docs.go

Large diffs are not rendered by default.

422 changes: 408 additions & 14 deletions server/api/agent.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions server/grpc/auth_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ func (s *WoodpeckerAuthServer) getAgent(agentID int64, agentToken string) (*mode
// global agent secret auth
if s.agentMasterToken != "" {
if agentToken == s.agentMasterToken && agentID == -1 {
agent := new(model.Agent)
agent.Name = ""
agent.OwnerID = -1 // system agent
agent.Token = s.agentMasterToken
agent.Backend = ""
agent.Platform = ""
agent.Capacity = -1
agent := &model.Agent{
OwnerID: model.IDNotSet,
OrgID: model.IDNotSet,
RepoID: model.IDNotSet,
Token: s.agentMasterToken,
Capacity: -1,
}
err := s.store.AgentCreate(agent)
if err != nil {
log.Error().Err(err).Msg("error creating system agent")
Expand Down
112 changes: 104 additions & 8 deletions server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er
log.Debug().Msgf("agent connected: %s: polling", hostname)
}

filterFn := createFilterFunc(agentFilter)

agent, err := s.getAgentFromContext(c)
if err != nil {
return nil, err
Expand All @@ -69,6 +67,20 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er
return nil, nil
}

// enforce server set agent filters ...
agentServerFilters, err := agent.GetServerFilters()
if err != nil {
return nil, err
}
// ... by overwrite and extend the agent labels
for k, v := range agentServerFilters {
agentFilter.Labels[k] = v
}

log.Trace().Msgf("Agent %s[%d] try pull task with filter labels: %v", agent.Name, agent.ID, agentFilter.Labels)

filterFn := createFilterFunc(agentFilter)

for {
// poll blocks until a task is available or the context is canceled / worker is kicked
task, err := s.queue.Poll(c, agent.ID, filterFn)
Expand All @@ -91,6 +103,15 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er

// Wait blocks until the workflow with the given ID is done.
func (s *RPC) Wait(c context.Context, workflowID string) error {
agent, err := s.getAgentFromContext(c)
if err != nil {
return err
}

if err := s.checkAgentPermissionByWorkflow(c, agent, workflowID, nil, nil); err != nil {
return err
}

return s.queue.Wait(c, workflowID)
}

Expand All @@ -106,11 +127,15 @@ func (s *RPC) Extend(c context.Context, workflowID string) error {
return err
}

return s.queue.Extend(c, workflowID)
if err := s.checkAgentPermissionByWorkflow(c, agent, workflowID, nil, nil); err != nil {
return err
}

return s.queue.Extend(c, agent.ID, workflowID)
}

// Update updates the state of a step.
func (s *RPC) Update(_ context.Context, strWorkflowID string, state rpc.StepState) error {
func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepState) error {
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
if err != nil {
return err
Expand All @@ -128,6 +153,11 @@ func (s *RPC) Update(_ context.Context, strWorkflowID string, state rpc.StepStat
return err
}

agent, err := s.getAgentFromContext(c)
if err != nil {
return err
}

step, err := s.store.StepByUUID(state.StepUUID)
if err != nil {
log.Error().Err(err).Msgf("cannot find step with uuid %s", state.StepUUID)
Expand All @@ -149,6 +179,11 @@ func (s *RPC) Update(_ context.Context, strWorkflowID string, state rpc.StepStat
return err
}

// check before agent can alter some state
if err := s.checkAgentPermissionByWorkflow(c, agent, strWorkflowID, currentPipeline, repo); err != nil {
return err
}

if err := pipeline.UpdateStepStatus(s.store, step, state); err != nil {
log.Error().Err(err).Msg("rpc.update: cannot update step")
}
Expand Down Expand Up @@ -192,6 +227,7 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
if err != nil {
return err
}

workflow.AgentID = agent.ID

currentPipeline, err := s.store.GetPipeline(workflow.PipelineID)
Expand All @@ -206,6 +242,11 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return err
}

// check before agent can alter some state
if err := s.checkAgentPermissionByWorkflow(c, agent, strWorkflowID, currentPipeline, repo); err != nil {
return err
}

if currentPipeline.Status == model.StatusPending {
if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil {
log.Error().Err(err).Msgf("init: cannot update pipeline %d state", currentPipeline.ID)
Expand Down Expand Up @@ -272,6 +313,16 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return err
}

agent, err := s.getAgentFromContext(c)
if err != nil {
return err
}

// check before agent can alter some state
if err := s.checkAgentPermissionByWorkflow(c, agent, strWorkflowID, currentPipeline, repo); err != nil {
return err
}

logger := log.With().
Str("repo_id", fmt.Sprint(repo.ID)).
Str("pipeline_id", fmt.Sprint(currentPipeline.ID)).
Expand Down Expand Up @@ -328,10 +379,6 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt
s.pipelineTime.WithLabelValues(repo.FullName, currentPipeline.Branch, string(workflow.State), workflow.Name).Set(float64(workflow.Finished - workflow.Started))
}

agent, err := s.getAgentFromContext(c)
if err != nil {
return err
}
return s.updateAgentLastWork(agent)
}

Expand All @@ -348,6 +395,17 @@ func (s *RPC) Log(c context.Context, stepUUID string, rpcLogEntries []*rpc.LogEn
return err
}

currentPipeline, err := s.store.GetPipeline(step.PipelineID)
if err != nil {
log.Error().Err(err).Msgf("cannot find pipeline with id %d", step.PipelineID)
return err
}

// check before agent can alter some state
if err := s.checkAgentPermissionByWorkflow(c, agent, "", currentPipeline, nil); err != nil {
return err
}

err = s.updateAgentLastWork(agent)
if err != nil {
return err
Expand Down Expand Up @@ -441,6 +499,44 @@ func (s *RPC) ReportHealth(ctx context.Context, status string) error {
return s.store.AgentUpdate(agent)
}

func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Agent, strWorkflowID string, pipeline *model.Pipeline, repo *model.Repo) error {
var err error
if repo == nil {
if pipeline == nil {
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
if err != nil {
return err
}

workflow, err := s.store.WorkflowLoad(workflowID)
if err != nil {
log.Error().Err(err).Msgf("rpc.update: cannot find workflow with id %d", workflowID)
return err
}

pipeline, err = s.store.GetPipeline(workflow.PipelineID)
if err != nil {
log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID)
return err
}
}

repo, err = s.store.GetRepo(pipeline.RepoID)
if err != nil {
log.Error().Err(err).Msgf("cannot find repo with id %d", pipeline.RepoID)
return err
}
}

if agent.CanAccessRepo(repo) {
return nil
}

msg := fmt.Sprintf("agent '%d' is not allowed to interact with repo[%d] '%s'", agent.ID, repo.ID, repo.FullName)
log.Error().Int64("repoId", repo.ID).Msg(msg)
return errors.New(msg)
}

func (s *RPC) completeChildrenIfParentCompleted(completedWorkflow *model.Workflow) {
for _, c := range completedWorkflow.Children {
if c.Running() {
Expand Down
50 changes: 49 additions & 1 deletion server/model/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

package model

import (
"encoding/base32"
"fmt"

"github.com/gorilla/securecookie"
)

type Agent struct {
ID int64 `json:"id" xorm:"pk autoincr 'id'"`
Created int64 `json:"created" xorm:"created"`
Expand All @@ -28,13 +35,54 @@ type Agent struct {
Capacity int32 `json:"capacity" xorm:"capacity"`
Version string `json:"version" xorm:"'version'"`
NoSchedule bool `json:"no_schedule" xorm:"no_schedule"`
// OrgID is counted as unset if set to -1, this is done to ensure a new(Agent) still enforce the OrgID check by default
OrgID int64 `json:"org_id" xorm:"INDEX 'org_id'"`
// RepoID is counted as unset if set to -1, this is done to ensure a new(Agent) still enforce the OrgID check by default
RepoID int64 `json:"repo_id" xorm:"INDEX 'repo_id'"`
} // @name Agent

const (
IDNotSet = -1
agentFilterOrgID = "org-id"
agentFilterRepoID = "repo-id"
)

// TableName return database table name for xorm.
func (Agent) TableName() string {
return "agents"
}

func (a *Agent) IsSystemAgent() bool {
return a.OwnerID == -1
return a.OwnerID == IDNotSet
}

func GenerateNewAgentToken() string {
return base32.StdEncoding.EncodeToString(securecookie.GenerateRandomKey(32))
}

func (a *Agent) GetServerFilters() (map[string]string, error) {
filters := make(map[string]string)

// enforce filters for user and organization agents
if a.OrgID != IDNotSet {
filters[agentFilterOrgID] = fmt.Sprintf("%d", a.OrgID)
} else {
filters[agentFilterOrgID] = "*"
}
if a.RepoID != IDNotSet {
filters[agentFilterRepoID] = fmt.Sprintf("%d", a.RepoID)
} else {
filters[agentFilterRepoID] = "*"
}

return filters, nil
}

func (a *Agent) CanAccessRepo(repo *Repo) bool {
if a.IsSystemAgent() {
return true
}

return a.RepoID != IDNotSet && a.RepoID == repo.ID && (a.OrgID == IDNotSet || a.OrgID == repo.OrgID) ||
a.OrgID != IDNotSet && a.OrgID == repo.OrgID && (a.RepoID == IDNotSet || a.RepoID == repo.ID)
}
Loading