Skip to content

Commit 6785f50

Browse files
authored
Initial error handling for deployer (#213)
This pr adds initial error handling for the deployer. Failures at the controller while sending events to the deployer and at the deployer while adding or revoking resources can be handled. Propagating the error back from deployer to controller will be a part of another pr.
1 parent b1b2bee commit 6785f50

File tree

4 files changed

+52
-28
lines changed

4 files changed

+52
-28
lines changed

cmd/controller/app/job/default_handler.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -184,23 +184,26 @@ func (h *DefaultHandler) doHandle(event *JobEvent) {
184184
}
185185

186186
func (h *DefaultHandler) cleanup() {
187-
h.notifyDeploy(pbNotify.DeployEventType_REVOKE_RESOURCE)
188-
zap.S().Infof("Invoking notifyDeploy - revoke resource - from cleanup()")
189-
// TODO use DeployResponse returned and verify that the deployment notification was acted upon correctly. Handle failures
190-
191-
h.tskWatchCancelFn()
187+
err := h.notifyDeploy(pbNotify.DeployEventType_REVOKE_RESOURCE)
188+
zap.S().Infof("Invoked notifyDeploy - revoke resource for jobId %s", h.jobId)
189+
if err != nil {
190+
zap.S().Errorf("notifyDeploy could not notify all deployers to revoke resources, err: %v", err)
191+
} else {
192+
h.tskWatchCancelFn()
193+
}
192194
}
193195

194196
func (h *DefaultHandler) ChangeState(state JobHandlerState) {
195197
h.state = state
196198
}
197199

198200
func (h *DefaultHandler) allocateComputes() error {
199-
// Placeholder invocation for new deployer
200-
h.notifyDeploy(pbNotify.DeployEventType_ADD_RESOURCE)
201-
zap.S().Infof("Invoking notifyDeploy - add resource - from allocateComputes()")
202-
// TODO use DeployResponse returned and verify that the deployment notification was acted upon correctly. Handle failures
203-
201+
err := h.notifyDeploy(pbNotify.DeployEventType_ADD_RESOURCE)
202+
zap.S().Infof("Invoked notifyDeploy - add resource for jobId %s", h.jobId)
203+
if err != nil {
204+
zap.S().Errorf("notifyDeploy could not notify all deployers to allocate resources, err: %v", err)
205+
return err
206+
}
204207
return nil
205208
}
206209

@@ -251,7 +254,7 @@ func (h *DefaultHandler) notifyDeploy(evtType pbNotify.DeployEventType) error {
251254

252255
resp, err := newNotifyClient(h.notifier, h.bInsecure, h.bPlain).sendDeployNotification(req)
253256
if err != nil {
254-
return fmt.Errorf("failed to notify for deployment: %v", err)
257+
return fmt.Errorf("failed to notify for deployment: %v, failed deployers: %v, status: %v", err, resp.FailedDeployers, resp.Status)
255258
}
256259

257260
zap.S().Infof("response status from notifyDeploy = %s", resp.Status.String())

cmd/controller/app/job/notify.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ func (nc *notifyClient) sendDeployNotification(req *pbNotify.DeployEventRequest)
9393
response, err := trClient.NotifyDeploy(context.Background(), req)
9494

9595
if err != nil {
96-
errMsg := fmt.Sprintf("sendDeployNotification failed: %v", err)
96+
errMsg := fmt.Sprintf("sendDeployNotification failed with response: %v, err: %v", response, err)
9797
zap.S().Warn(errMsg)
98-
return nil, fmt.Errorf(errMsg)
98+
return response, fmt.Errorf(errMsg)
9999
}
100100

101101
return response, nil

cmd/deployer/app/resource_handler.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -159,44 +159,64 @@ func (r *resourceHandler) do() {
159159
break
160160
}
161161

162-
r.dealWith(resp)
162+
err = r.dealWith(resp)
163+
if err != nil {
164+
zap.S().Errorf("Failed to dealWith task: %v", err)
165+
}
163166
}
164167

165168
zap.S().Info("Disconnected from notifier")
166169
}
167170

168-
func (r *resourceHandler) dealWith(in *pbNotify.DeployEvent) {
171+
func (r *resourceHandler) dealWith(in *pbNotify.DeployEvent) error {
169172
switch in.GetType() {
170173
case pbNotify.DeployEventType_ADD_RESOURCE:
171-
r.addResource(in.JobId)
174+
err := r.addResource(in.JobId)
175+
if err != nil {
176+
// TODO propagate the error back to the control plane
177+
errMsg := fmt.Sprintf("deploy event addResource failed with err: %v", err)
178+
zap.S().Errorf(errMsg)
179+
return fmt.Errorf(errMsg)
180+
}
172181

173182
case pbNotify.DeployEventType_REVOKE_RESOURCE:
174-
r.revokeResource(in.JobId)
183+
err := r.revokeResource(in.JobId)
184+
if err != nil {
185+
// TODO propagate the error back to the control plane
186+
errMsg := fmt.Sprintf("deploy event revokeResource failed with err: %v", err)
187+
zap.S().Errorf(errMsg)
188+
return fmt.Errorf(errMsg)
189+
}
175190

176191
case pbNotify.DeployEventType_UNKNOWN_DEPLOYMENT_TYPE:
177192
fallthrough
178193
default:
179-
zap.S().Errorf("Invalid message type: %s", in.GetType())
194+
errMsg := fmt.Sprintf("invalid message type: %s", in.GetType())
195+
zap.S().Errorf(errMsg)
196+
return fmt.Errorf(errMsg)
180197
}
198+
return nil
181199
}
182200

183-
func (r *resourceHandler) addResource(jobId string) {
201+
func (r *resourceHandler) addResource(jobId string) error {
184202
zap.S().Infof("Received add resource request for job %s", jobId)
185203

186204
// Sending request to apiserver to get deployment config for specific jobId and computeId
187205
deploymentConfig, err := r.getDeploymentConfig(jobId)
188206
// TODO update deployment status to computeCollection in DB via restapi
189207
if err != nil {
190-
fmt.Printf("Failed to get deploymentConfig for job %s: %v\n", jobId, err)
208+
zap.S().Errorf("Failed to get deploymentConfig for job %s: %v\n", jobId, err)
191209
}
192210
zap.S().Infof("Got deployment config from apiserver: %v", deploymentConfig)
193211

194212
// Deploy resources (agents) for the job based on the configuration
195213
err = r.deployResources(deploymentConfig)
196214
if err != nil {
197-
fmt.Printf("Failed to deploy resources for job %s: %v\n", jobId, err)
215+
zap.S().Errorf("Failed to deploy resources for job %s: %v\n", jobId, err)
216+
return err
198217
}
199218
zap.S().Infof("Successfully added resources for compute %s and jobId %s", r.spec.ComputeId, jobId)
219+
return nil
200220
}
201221

202222
func (r *resourceHandler) revokeResource(jobId string) error {
@@ -206,6 +226,7 @@ func (r *resourceHandler) revokeResource(jobId string) error {
206226
if r.dplyr != nil {
207227
if err := r.dplyr.Uninstall("job-" + jobId + "-" + r.spec.ComputeId); err != nil {
208228
zap.S().Warnf("failed to release resources for job %s: %v", jobId, err)
229+
return err
209230
}
210231
}
211232

@@ -245,7 +266,6 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
245266
targetTemplateDirPath := filepath.Join(deploymentChartPath, deploymentTemplateDir)
246267
if err := os.MkdirAll(targetTemplateDirPath, util.FilePerm0644); err != nil {
247268
errMsg := fmt.Sprintf("failed to create a deployment template folder: %v", err)
248-
zap.S().Debugf(errMsg)
249269
return fmt.Errorf(errMsg)
250270
}
251271

@@ -255,7 +275,6 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
255275
dstFilePath := filepath.Join(deploymentChartPath, chartFile)
256276
if err := util.CopyFile(srcFilePath, dstFilePath); err != nil {
257277
errMsg := fmt.Sprintf("failed to copy a deployment chart file %s: %v", chartFile, err)
258-
zap.S().Debugf(errMsg)
259278
return fmt.Errorf(errMsg)
260279
}
261280
}
@@ -271,7 +290,6 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
271290
rendered, err := mustache.RenderFile(jobTemplatePath, &context)
272291
if err != nil {
273292
errMsg := fmt.Sprintf("failed to render a template for task %s: %v", taskId, err)
274-
zap.S().Debugf(errMsg)
275293
return fmt.Errorf(errMsg)
276294
}
277295

@@ -280,28 +298,24 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
280298
err = os.WriteFile(deploymentFilePath, []byte(rendered), util.FilePerm0644)
281299
if err != nil {
282300
errMsg := fmt.Sprintf("failed to write a job rosource spec %s: %v", taskId, err)
283-
zap.S().Debugf(errMsg)
284301
return fmt.Errorf(errMsg)
285302
}
286303
}
287304

288305
dplyr, err := deployer.NewDeployer(r.platform)
289306
if err != nil {
290307
errMsg := fmt.Sprintf("failed to obtain a job deployer: %v", err)
291-
zap.S().Debugf(errMsg)
292308
return fmt.Errorf(errMsg)
293309
}
294310

295311
if err = dplyr.Initialize("", r.namespace); err != nil {
296312
errMsg := fmt.Sprintf("failed to initialize a job deployer: %v", err)
297-
zap.S().Debugf(errMsg)
298313
return fmt.Errorf(errMsg)
299314
}
300315

301316
err = dplyr.Install("job-"+deploymentConfig.JobId+"-"+r.spec.ComputeId, deploymentChartPath)
302317
if err != nil {
303318
errMsg := fmt.Sprintf("failed to deploy tasks: %v", err)
304-
zap.S().Debugf(errMsg)
305319
return fmt.Errorf(errMsg)
306320
}
307321

cmd/notifier/app/triggerroute.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,22 @@ func (s *notificationServer) NotifyDeploy(ctx context.Context, in *pbNotify.Depl
110110
FailedDeployers: failedDeployers,
111111
}
112112

113+
raiseError := false
114+
113115
if len(in.ComputeIds) > 0 && len(failedDeployers) == len(in.ComputeIds) {
114116
resp.Message = "Failed to issue deployment instructions for all deployers"
115117
resp.Status = pbNotify.DeployResponse_ERROR
118+
raiseError = true
116119
} else if len(failedDeployers) > 0 && len(failedDeployers) < len(in.ComputeIds) {
117120
resp.Message = "Issued deployment instructions for some deployers successfully"
118121
resp.Status = pbNotify.DeployResponse_PARTIAL_SUCCESS
122+
raiseError = true
119123
}
120124

121125
zap.S().Info(resp.Message)
126+
if raiseError {
127+
return resp, fmt.Errorf(resp.Message)
128+
}
122129

123130
return resp, nil
124131
}

0 commit comments

Comments
 (0)