Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add auto capabilities: #946

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions cmd/tink-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
KubeconfigPath string
KubeAPI string
KubeNamespace string

AutoEnrollmentTemplate string
AutoCapMode string
}

const backendKubernetes = "kubernetes"
Expand All @@ -48,6 +51,8 @@
fs.StringVar(&c.KubeconfigPath, "kubeconfig", "", "The path to the Kubeconfig. Only takes effect if `--backend=kubernetes`")
fs.StringVar(&c.KubeAPI, "kubernetes", "", "The Kubernetes API URL, used for in-cluster client construction. Only takes effect if `--backend=kubernetes`")
fs.StringVar(&c.KubeNamespace, "kube-namespace", "", "The Kubernetes namespace to target")
fs.StringVar(&c.AutoEnrollmentTemplate, "auto-enrollment-template", "", "The Template to use for auto enrollment Workflows (only used when `--auto-mode=enrollment`). The Template must exist and is a user defined Template, there is no default.")
fs.Var(newAutoCapModeValue(AutoCapMode(string(server.AutoCapModeDisabled)), (*AutoCapMode)(&c.AutoCapMode)), "auto-cap-mode", "The mode to use for automatic capabilities. Must be one of 'discovery', 'enrollment' or 'disabled'. discovery: creates a Hardware object for each unknown worker, enrollment: creates Hardware and Workflow objects for each unknown worker, disabled: auto capabilities are disabled")

Check warning on line 55 in cmd/tink-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/tink-server/main.go#L54-L55

Added lines #L54 - L55 were not covered by tests
}

func (c *Config) PopulateFromLegacyEnvVar() {
Expand All @@ -62,7 +67,7 @@

func main() {
if err := NewRootCommand().Execute(); err != nil {
fmt.Fprint(os.Stderr, err.Error())
fmt.Fprint(os.Stderr, err.Error(), "\n")

Check warning on line 70 in cmd/tink-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/tink-server/main.go#L70

Added line #L70 was not covered by tests
os.Exit(1)
}
}
Expand All @@ -88,7 +93,7 @@
RunE: func(cmd *cobra.Command, args []string) error {
// I am not sure if it is right for this to be here,
// but as last step I want to keep compatibility with
// what we have for a little bit and I thinik that's
// what we have for a little bit and I think that's
// the most aggressive way we have to guarantee that
// the old way works as before.
config.PopulateFromLegacyEnvVar()
Expand All @@ -108,6 +113,10 @@
errCh := make(chan error, 2)
var registrar grpcserver.Registrar

if server.AutoCapMode(config.AutoCapMode) == server.AutoCapModeEnrollment && config.AutoEnrollmentTemplate == "" {
return fmt.Errorf("auto-enrollment-template is required when auto-cap-mode is set to enrollment")

Check warning on line 117 in cmd/tink-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/tink-server/main.go#L116-L117

Added lines #L116 - L117 were not covered by tests
}

switch config.Backend {
case backendKubernetes:
var err error
Expand All @@ -116,6 +125,8 @@
config.KubeconfigPath,
config.KubeAPI,
config.KubeNamespace,
server.WithAutoCapMode(server.AutoCapMode(config.AutoCapMode)),
server.WithAutoEnrollmentTemplate(config.AutoEnrollmentTemplate),

Check warning on line 129 in cmd/tink-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/tink-server/main.go#L128-L129

Added lines #L128 - L129 were not covered by tests
)
if err != nil {
return err
Expand Down Expand Up @@ -206,3 +217,32 @@

return nil
}

type AutoCapMode server.AutoCapMode

func (a *AutoCapMode) String() string {
return string(*a)

Check warning on line 224 in cmd/tink-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/tink-server/main.go#L223-L224

Added lines #L223 - L224 were not covered by tests
}

func (a *AutoCapMode) Set(value string) error {
v := server.AutoCapMode(value)
if v == "" {
v = server.AutoCapModeDisabled

Check warning on line 230 in cmd/tink-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/tink-server/main.go#L227-L230

Added lines #L227 - L230 were not covered by tests
}
switch v {
case server.AutoCapModeDiscovery, server.AutoCapModeEnrollment, server.AutoCapModeDisabled:
*a = AutoCapMode(v)
return nil

Check warning on line 235 in cmd/tink-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/tink-server/main.go#L232-L235

Added lines #L232 - L235 were not covered by tests
}

return fmt.Errorf("invalid value %q, must be one of %q, %q, or %q", value, server.AutoCapModeDiscovery, server.AutoCapModeEnrollment, server.AutoCapModeDisabled)

Check warning on line 238 in cmd/tink-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/tink-server/main.go#L238

Added line #L238 was not covered by tests
}

func (a *AutoCapMode) Type() string {
return "string"

Check warning on line 242 in cmd/tink-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/tink-server/main.go#L241-L242

Added lines #L241 - L242 were not covered by tests
}

func newAutoCapModeValue(val AutoCapMode, p *AutoCapMode) *AutoCapMode {
*p = val
return p

Check warning on line 247 in cmd/tink-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/tink-server/main.go#L245-L247

Added lines #L245 - L247 were not covered by tests
}
65 changes: 65 additions & 0 deletions internal/server/auto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package server

import (
"context"
"strings"

"github.com/tinkerbell/tink/api/v1alpha1"
"github.com/tinkerbell/tink/internal/ptr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

type AutoCapMode string

var (
AutoCapModeDiscovery AutoCapMode = "discovery"
AutoCapModeEnrollment AutoCapMode = "enrollment"
AutoCapModeDisabled AutoCapMode = "disabled"
)

func (k *KubernetesBackedServer) hardwareObjectExists(ctx context.Context, workerID string) bool {
if err := k.ClientFunc().Get(ctx, types.NamespacedName{Name: strings.ReplaceAll(workerID, ":", "."), Namespace: k.namespace}, &v1alpha1.Hardware{}); err != nil {
return false

Check warning on line 23 in internal/server/auto.go

View check run for this annotation

Codecov / codecov/patch

internal/server/auto.go#L21-L23

Added lines #L21 - L23 were not covered by tests
}
return true

Check warning on line 25 in internal/server/auto.go

View check run for this annotation

Codecov / codecov/patch

internal/server/auto.go#L25

Added line #L25 was not covered by tests
}

func (k *KubernetesBackedServer) createHardwareObject(ctx context.Context, workerID string) error {
hw := &v1alpha1.Hardware{
ObjectMeta: metav1.ObjectMeta{
Name: strings.ReplaceAll(workerID, ":", "."),
Namespace: k.namespace,
},
Spec: v1alpha1.HardwareSpec{
Interfaces: []v1alpha1.Interface{

Check warning on line 35 in internal/server/auto.go

View check run for this annotation

Codecov / codecov/patch

internal/server/auto.go#L28-L35

Added lines #L28 - L35 were not covered by tests
{
DHCP: &v1alpha1.DHCP{
MAC: workerID,
},
Netboot: &v1alpha1.Netboot{
AllowPXE: ptr.Bool(true),
},
},
},
},

Check warning on line 45 in internal/server/auto.go

View check run for this annotation

Codecov / codecov/patch

internal/server/auto.go#L37-L45

Added lines #L37 - L45 were not covered by tests
}
return k.ClientFunc().Create(ctx, hw)

Check warning on line 47 in internal/server/auto.go

View check run for this annotation

Codecov / codecov/patch

internal/server/auto.go#L47

Added line #L47 was not covered by tests
}

func (k *KubernetesBackedServer) createWorkflowObject(ctx context.Context, workerID string) error {
wf := &v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: strings.ReplaceAll(workerID, ":", "."),
Namespace: k.namespace,
},
Spec: v1alpha1.WorkflowSpec{
HardwareRef: strings.ReplaceAll(workerID, ":", "."),
TemplateRef: k.AutoEnrollmentTemplate,
HardwareMap: map[string]string{
"device_1": workerID,
},
},

Check warning on line 62 in internal/server/auto.go

View check run for this annotation

Codecov / codecov/patch

internal/server/auto.go#L50-L62

Added lines #L50 - L62 were not covered by tests
}
return k.ClientFunc().Create(ctx, wf)

Check warning on line 64 in internal/server/auto.go

View check run for this annotation

Codecov / codecov/patch

internal/server/auto.go#L64

Added line #L64 was not covered by tests
}
52 changes: 38 additions & 14 deletions internal/server/kubernetes_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,38 @@
"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// Option for setting optional KubernetesBackedServer fields.
type Option func(*KubernetesBackedServer)

// KubernetesBackedServer is a server that implements a workflow API.
type KubernetesBackedServer struct {
logger logr.Logger
ClientFunc func() client.Client
namespace string
AutoCapMode AutoCapMode
AutoEnrollmentTemplate string

nowFunc func() time.Time
}

func WithAutoCapMode(mode AutoCapMode) Option {
return func(k *KubernetesBackedServer) {
k.AutoCapMode = mode

Check warning on line 39 in internal/server/kubernetes_api.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api.go#L37-L39

Added lines #L37 - L39 were not covered by tests
}
}

func WithAutoEnrollmentTemplate(name string) Option {
return func(k *KubernetesBackedServer) {
k.AutoEnrollmentTemplate = name

Check warning on line 45 in internal/server/kubernetes_api.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api.go#L43-L45

Added lines #L43 - L45 were not covered by tests
}
}

// +kubebuilder:rbac:groups=tinkerbell.org,resources=hardware;hardware/status,verbs=get;list;watch
// +kubebuilder:rbac:groups=tinkerbell.org,resources=templates;templates/status,verbs=get;list;watch
// +kubebuilder:rbac:groups=tinkerbell.org,resources=workflows;workflows/status,verbs=get;list;watch;update;patch

// NewKubeBackedServer returns a server that implements the Workflow server interface for a given kubeconfig.
func NewKubeBackedServer(logger logr.Logger, kubeconfig, apiserver, namespace string) (*KubernetesBackedServer, error) {
func NewKubeBackedServer(logger logr.Logger, kubeconfig, apiserver, namespace string, opts ...Option) (*KubernetesBackedServer, error) {

Check warning on line 54 in internal/server/kubernetes_api.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api.go#L54

Added line #L54 was not covered by tests
ccfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig},
&clientcmd.ConfigOverrides{
Expand All @@ -43,12 +69,12 @@
return nil, err
}

return NewKubeBackedServerFromREST(logger, cfg, namespace)
return NewKubeBackedServerFromREST(logger, cfg, namespace, opts...)

Check warning on line 72 in internal/server/kubernetes_api.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api.go#L72

Added line #L72 was not covered by tests
}

// NewKubeBackedServerFromREST returns a server that implements the Workflow
// server interface with the given Kubernetes rest client and namespace.
func NewKubeBackedServerFromREST(logger logr.Logger, config *rest.Config, namespace string) (*KubernetesBackedServer, error) {
func NewKubeBackedServerFromREST(logger logr.Logger, config *rest.Config, namespace string, opts ...Option) (*KubernetesBackedServer, error) {

Check warning on line 77 in internal/server/kubernetes_api.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api.go#L77

Added line #L77 was not covered by tests
clstr, err := cluster.New(config, func(opts *cluster.Options) {
opts.Scheme = controller.DefaultScheme()
opts.Logger = zapr.NewLogger(zap.NewNop())
Expand Down Expand Up @@ -79,22 +105,20 @@
}
}()

return &KubernetesBackedServer{
k := &KubernetesBackedServer{

Check warning on line 108 in internal/server/kubernetes_api.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api.go#L108

Added line #L108 was not covered by tests
logger: logger,
ClientFunc: clstr.GetClient,
nowFunc: time.Now,
}, nil
}

// KubernetesBackedServer is a server that implements a workflow API.
type KubernetesBackedServer struct {
logger logr.Logger
ClientFunc func() client.Client
namespace: namespace,

Check warning on line 112 in internal/server/kubernetes_api.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api.go#L112

Added line #L112 was not covered by tests
}
for _, opt := range opts {
opt(k)

Check warning on line 115 in internal/server/kubernetes_api.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api.go#L114-L115

Added lines #L114 - L115 were not covered by tests
}

nowFunc func() time.Time
return k, nil

Check warning on line 118 in internal/server/kubernetes_api.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api.go#L118

Added line #L118 was not covered by tests
}

// Register registers the service on the gRPC server.
func (s *KubernetesBackedServer) Register(server *grpc.Server) {
proto.RegisterWorkflowServiceServer(server, s)
func (k *KubernetesBackedServer) Register(server *grpc.Server) {
proto.RegisterWorkflowServiceServer(server, k)

Check warning on line 123 in internal/server/kubernetes_api.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api.go#L122-L123

Added lines #L122 - L123 were not covered by tests
}
59 changes: 41 additions & 18 deletions internal/server/kubernetes_api_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
}
}

func (s *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) {
func (k *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) {

Check warning on line 38 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L38

Added line #L38 was not covered by tests
stored := &v1alpha1.WorkflowList{}
err := s.ClientFunc().List(ctx, stored, &client.MatchingFields{
err := k.ClientFunc().List(ctx, stored, &client.MatchingFields{

Check warning on line 40 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L40

Added line #L40 was not covered by tests
workflowByNonTerminalState: workerID,
})
if err != nil {
Expand All @@ -53,27 +53,50 @@
return wfs, nil
}

func (s *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflowID string) (*v1alpha1.Workflow, error) {
func (k *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflowID string) (*v1alpha1.Workflow, error) {

Check warning on line 56 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L56

Added line #L56 was not covered by tests
workflowNamespace, workflowName, _ := strings.Cut(workflowID, "/")
wflw := &v1alpha1.Workflow{}
err := s.ClientFunc().Get(ctx, types.NamespacedName{Name: workflowName, Namespace: workflowNamespace}, wflw)
err := k.ClientFunc().Get(ctx, types.NamespacedName{Name: workflowName, Namespace: workflowNamespace}, wflw)

Check warning on line 59 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L59

Added line #L59 was not covered by tests
if err != nil {
s.logger.Error(err, "get client", "workflow", workflowID)
k.logger.Error(err, "get client", "workflow", workflowID)

Check warning on line 61 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L61

Added line #L61 was not covered by tests
return nil, err
}
return wflw, nil
}

// The following APIs are used by the worker.

func (s *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextRequest, stream proto.WorkflowService_GetWorkflowContextsServer) error {
func (k *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextRequest, stream proto.WorkflowService_GetWorkflowContextsServer) error {

Check warning on line 69 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L69

Added line #L69 was not covered by tests
if req.GetWorkerId() == "" {
return status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
}
wflows, err := s.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
wflows, err := k.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)

Check warning on line 73 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L73

Added line #L73 was not covered by tests
if err != nil {
return err
}

ctx := context.TODO()
id := req.WorkerId
if k.AutoCapMode != AutoCapModeDisabled && len(wflows) == 0 && (k.AutoCapMode == AutoCapModeDiscovery || k.AutoCapMode == AutoCapModeEnrollment) && !k.hardwareObjectExists(ctx, id) {

Check warning on line 80 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L78-L80

Added lines #L78 - L80 were not covered by tests
// In the future, the worker could be signaled to send hardware device information to be used in creation of the Hardware object.
// or the proto.WorkflowContextRequest could be extended to include Hardware information.
if err := k.createHardwareObject(ctx, id); err != nil {
k.logger.Error(err, "failed to create hardware object")
return err

Check warning on line 85 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L83-L85

Added lines #L83 - L85 were not covered by tests
}

if k.AutoCapMode == AutoCapModeEnrollment {
if err := k.createWorkflowObject(ctx, id); err != nil {
k.logger.Error(err, "failed to create workflow object")
return err

Check warning on line 91 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L88-L91

Added lines #L88 - L91 were not covered by tests
}
wflows, err = k.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
if err != nil {
return err

Check warning on line 95 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L93-L95

Added lines #L93 - L95 were not covered by tests
}
}
}

for _, wf := range wflows {
if err := stream.Send(getWorkflowContext(wf)); err != nil {
return err
Expand All @@ -82,20 +105,20 @@
return nil
}

func (s *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *proto.WorkflowActionsRequest) (*proto.WorkflowActionList, error) {
func (k *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *proto.WorkflowActionsRequest) (*proto.WorkflowActionList, error) {

Check warning on line 108 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L108

Added line #L108 was not covered by tests
wfID := req.GetWorkflowId()
if wfID == "" {
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
}
wf, err := s.getWorkflowByName(ctx, wfID)
wf, err := k.getWorkflowByName(ctx, wfID)

Check warning on line 113 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L113

Added line #L113 was not covered by tests
if err != nil {
return nil, err
}
return workflow.ActionListCRDToProto(wf), nil
}

// Modifies a workflow for a given workflowContext.
func (s *KubernetesBackedServer) modifyWorkflowState(wf *v1alpha1.Workflow, wfContext *proto.WorkflowContext) error {
func (k *KubernetesBackedServer) modifyWorkflowState(wf *v1alpha1.Workflow, wfContext *proto.WorkflowContext) error {
if wf == nil {
return errors.New("no workflow provided")
}
Expand Down Expand Up @@ -136,19 +159,19 @@
// Workflow is running, so set the start time to now
wf.Status.State = v1alpha1.WorkflowState(proto.State_name[int32(wfContext.CurrentActionState)])
wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt = func() *metav1.Time {
t := metav1.NewTime(s.nowFunc())
t := metav1.NewTime(k.nowFunc())
return &t
}()
case proto.State_STATE_FAILED, proto.State_STATE_TIMEOUT:
// Handle terminal statuses by updating the workflow state and time
wf.Status.State = v1alpha1.WorkflowState(proto.State_name[int32(wfContext.CurrentActionState)])
if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil {
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(s.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(k.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
}
case proto.State_STATE_SUCCESS:
// Handle a success by marking the task as complete
if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil {
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(s.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(k.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
}
// Mark success on last action success
if wfContext.CurrentActionIndex+1 == wfContext.TotalNumberOfActions {
Expand Down Expand Up @@ -183,15 +206,15 @@
return wfContext
}

func (s *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *proto.WorkflowActionStatus) (*proto.Empty, error) {
func (k *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *proto.WorkflowActionStatus) (*proto.Empty, error) {

Check warning on line 209 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L209

Added line #L209 was not covered by tests
err := validateActionStatusRequest(req)
if err != nil {
return nil, err
}
wfID := req.GetWorkflowId()
l := s.logger.WithValues("actionName", req.GetActionName(), "status", req.GetActionStatus(), "workflowID", req.GetWorkflowId(), "taskName", req.GetTaskName(), "worker", req.WorkerId)
l := k.logger.WithValues("actionName", req.GetActionName(), "status", req.GetActionStatus(), "workflowID", req.GetWorkflowId(), "taskName", req.GetTaskName(), "worker", req.WorkerId)

Check warning on line 215 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L215

Added line #L215 was not covered by tests

wf, err := s.getWorkflowByName(ctx, wfID)
wf, err := k.getWorkflowByName(ctx, wfID)

Check warning on line 217 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L217

Added line #L217 was not covered by tests
if err != nil {
l.Error(err, "get workflow")
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
Expand All @@ -204,13 +227,13 @@
}

wfContext := getWorkflowContextForRequest(req, wf)
err = s.modifyWorkflowState(wf, wfContext)
err = k.modifyWorkflowState(wf, wfContext)

Check warning on line 230 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L230

Added line #L230 was not covered by tests
if err != nil {
l.Error(err, "modify workflow state")
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
}
l.Info("updating workflow in Kubernetes")
err = s.ClientFunc().Status().Update(ctx, wf)
err = k.ClientFunc().Status().Update(ctx, wf)

Check warning on line 236 in internal/server/kubernetes_api_workflow.go

View check run for this annotation

Codecov / codecov/patch

internal/server/kubernetes_api_workflow.go#L236

Added line #L236 was not covered by tests
if err != nil {
l.Error(err, "applying update to workflow")
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
Expand Down
Loading