Skip to content

Commit 0869334

Browse files
committed
Added error handling and logging
1 parent b420be0 commit 0869334

File tree

14 files changed

+268
-112
lines changed

14 files changed

+268
-112
lines changed

cmd/kar-controllers/app/options/options.go

+8
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"os"
2222
"strconv"
2323
"strings"
24+
25+
"k8s.io/klog/v2"
2426
)
2527

2628
// ServerOption is the main context object for the controller manager.
@@ -96,6 +98,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() {
9698
backoffInt, err := strconv.Atoi(backoffString)
9799
if err == nil {
98100
s.BackoffTime = backoffInt
101+
} else {
102+
klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err)
99103
}
100104
}
101105

@@ -105,6 +109,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() {
105109
holInt, err := strconv.Atoi(holString)
106110
if err == nil {
107111
s.HeadOfLineHoldingTime = holInt
112+
} else {
113+
klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err)
108114
}
109115
}
110116

@@ -126,6 +132,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() {
126132
to, err := strconv.ParseInt(dispatchResourceReservationTimeoutString, 10, 64)
127133
if err == nil {
128134
s.DispatchResourceReservationTimeout = to
135+
} else {
136+
klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err)
129137
}
130138
}
131139
}

cmd/kar-controllers/app/server.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package app
1818

1919
import (
20+
"fmt"
2021
"net/http"
2122
"strings"
2223

@@ -42,7 +43,7 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) {
4243
func Run(opt *options.ServerOption) error {
4344
restConfig, err := buildConfig(opt.Master, opt.Kubeconfig)
4445
if err != nil {
45-
return err
46+
return fmt.Errorf("[Run] unable to build server config, - error: %#v", err)
4647
}
4748

4849
neverStop := make(chan struct{})
@@ -71,7 +72,8 @@ func Run(opt *options.ServerOption) error {
7172
// This call is blocking (unless an error occurs) which equates to <-neverStop
7273
err = listenHealthProbe(opt)
7374
if err != nil {
74-
return err
75+
return fmt.Errorf("[Run] unable to start health probe listener, - error: %#v", err)
76+
7577
}
7678

7779
return nil
@@ -83,7 +85,7 @@ func listenHealthProbe(opt *options.ServerOption) error {
8385
handler.Handle("/healthz", &health.Handler{})
8486
err := http.ListenAndServe(opt.HealthProbeListenAddr, handler)
8587
if err != nil {
86-
return err
88+
return fmt.Errorf("[listenHealthProbe] unable to listen and serve, - error: %#v", err)
8789
}
8890

8991
return nil

pkg/apis/controller/utils/utils.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ import (
2121
"k8s.io/apimachinery/pkg/api/meta"
2222
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2323
"k8s.io/apimachinery/pkg/types"
24+
"k8s.io/klog/v2"
2425
)
2526

2627
func GetController(obj interface{}) types.UID {
2728
accessor, err := meta.Accessor(obj)
2829
if err != nil {
30+
klog.Errorf("[GetController] unable to return object as minimum required fields are missing, - error: %#v", err)
2931
return ""
3032
}
3133

@@ -37,10 +39,10 @@ func GetController(obj interface{}) types.UID {
3739
return ""
3840
}
3941

40-
4142
func GetJobID(pod *v1.Pod) types.UID {
4243
accessor, err := meta.Accessor(pod)
4344
if err != nil {
45+
klog.Errorf("[GetJobID] unable to return object as minimum required fields are missing, - error: %#v", err)
4446
return ""
4547
}
4648

pkg/controller/queuejob/queuejob_controller_ex.go

+116-73
Large diffs are not rendered by default.

pkg/controller/queuejob/scheduling_queue.go

+37-10
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,10 @@ func (p *PriorityQueue) Length() int {
129129
func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool {
130130
p.lock.Lock()
131131
defer p.lock.Unlock()
132-
_, exists, _ := p.activeQ.Get(qj)
132+
_, exists, err := p.activeQ.Get(qj)
133+
if err != nil {
134+
klog.Errorf("[IfExist] unable to check if app wrapper exists, - error:%#v", err)
135+
}
133136
if p.unschedulableQ.Get(qj) != nil || exists {
134137
return true
135138
}
@@ -140,7 +143,10 @@ func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool {
140143
func (p *PriorityQueue) IfExistActiveQ(qj *qjobv1.AppWrapper) bool {
141144
p.lock.Lock()
142145
defer p.lock.Unlock()
143-
_, exists, _ := p.activeQ.Get(qj)
146+
_, exists, err := p.activeQ.Get(qj)
147+
if err != nil {
148+
klog.Errorf("[IfExistActiveQ] unable to check if app wrapper exists, - error:%#v", err)
149+
}
144150
return exists
145151
}
146152

@@ -196,12 +202,15 @@ func (p *PriorityQueue) AddIfNotPresent(qj *qjobv1.AppWrapper) error {
196202
if p.unschedulableQ.Get(qj) != nil {
197203
return nil
198204
}
199-
if _, exists, _ := p.activeQ.Get(qj); exists {
205+
if _, exists, err := p.activeQ.Get(qj); exists {
206+
if err != nil {
207+
klog.Errorf("[AddIfNotPresent] unable to check if pod exists, - error:%#v", err)
208+
}
200209
return nil
201210
}
202211
err := p.activeQ.Add(qj)
203212
if err != nil {
204-
klog.Errorf("Error adding pod %s/%s to the scheduling queue: %v", qj.Namespace, qj.Name, err)
213+
klog.Errorf("[AddIfNotPresent] Error adding pod %s/%s to the scheduling queue, - error:%#v", qj.Namespace, qj.Name, err)
205214
} else {
206215
p.cond.Broadcast()
207216
}
@@ -218,7 +227,10 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) erro
218227
if p.unschedulableQ.Get(qj) != nil {
219228
return fmt.Errorf("pod is already present in unschedulableQ")
220229
}
221-
if _, exists, _ := p.activeQ.Get(qj); exists {
230+
if _, exists, err := p.activeQ.Get(qj); exists {
231+
if err != nil {
232+
klog.Errorf("[AddUnschedulableIfNotPresent] unable to check if pod exists, - error:%#v", err)
233+
}
222234
return fmt.Errorf("pod is already present in the activeQ")
223235
}
224236
// if !p.receivedMoveRequest && isPodUnschedulable(qj) {
@@ -227,7 +239,9 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) erro
227239
return nil
228240
}
229241
err := p.activeQ.Add(qj)
230-
if err == nil {
242+
if err != nil {
243+
klog.Errorf("[AddUnschedulableIfNotPresent] Error adding QJ %s/%s to the scheduling queue: %v", qj.Namespace, qj.Name, err)
244+
} else {
231245
p.cond.Broadcast()
232246
}
233247
return err
@@ -271,16 +285,24 @@ func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error {
271285
p.lock.Lock()
272286
defer p.lock.Unlock()
273287
// If the pod is already in the active queue, just update it there.
274-
if _, exists, _ := p.activeQ.Get(newQJ); exists {
288+
if _, exists, errp := p.activeQ.Get(newQJ); exists {
289+
if errp != nil {
290+
klog.Errorf("[Update] unable to check if pod exists, - error:%#v", errp)
291+
}
275292
err := p.activeQ.Update(newQJ)
293+
if err != nil {
294+
klog.Errorf("[Update] unable to update pod, - error: %#v", err)
295+
}
276296
return err
277297
}
278298
// If the pod is in the unschedulable queue, updating it may make it schedulable.
279299
if usQJ := p.unschedulableQ.Get(newQJ); usQJ != nil {
280300
if p.isQJUpdated(oldQJ, newQJ) {
281301
p.unschedulableQ.Delete(usQJ)
282302
err := p.activeQ.Add(newQJ)
283-
if err == nil {
303+
if err != nil {
304+
klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", newQJ.Namespace, newQJ.Name, err)
305+
} else {
284306
p.cond.Broadcast()
285307
}
286308
return err
@@ -290,7 +312,9 @@ func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error {
290312
}
291313
// If pod is not in any of the two queue, we put it in the active queue.
292314
err := p.activeQ.Add(newQJ)
293-
if err == nil {
315+
if err != nil {
316+
klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", newQJ.Namespace, newQJ.Name, err)
317+
} else {
294318
p.cond.Broadcast()
295319
}
296320
return err
@@ -303,7 +327,10 @@ func (p *PriorityQueue) Delete(qj *qjobv1.AppWrapper) error {
303327
p.lock.Lock()
304328
defer p.lock.Unlock()
305329
p.unschedulableQ.Delete(qj)
306-
if _, exists, _ := p.activeQ.Get(qj); exists {
330+
if _, exists, err := p.activeQ.Get(qj); exists {
331+
if err != nil {
332+
klog.Errorf("[Delete] unable to check if pod exists - error: %#v", err)
333+
}
307334
return p.activeQ.Delete(qj)
308335
}
309336
// p.unschedulableQ.Delete(qj)

pkg/controller/queuejobdispatch/queuejobagent.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,10 @@ func NewJobClusterAgent(config string, agentEventQueue *cache.FIFO) *JobClusterA
118118

119119
qa.jobSynced = qa.jobInformer.Informer().HasSynced
120120

121-
qa.UpdateAggrResources(context.Background())
121+
err = qa.UpdateAggrResources(context.Background())
122+
if err != nil {
123+
klog.Errorf("[NewJobClusterAgent] Unable to update aggr resources - error: %#v", err)
124+
}
122125

123126
return qa
124127
}
@@ -161,7 +164,10 @@ func (qa *JobClusterAgent) Run(stopCh <-chan struct{}) {
161164
func (qa *JobClusterAgent) DeleteJob(ctx context.Context, cqj *arbv1.AppWrapper) {
162165
qj_temp := cqj.DeepCopy()
163166
klog.V(2).Infof("[Dispatcher: Agent] Request deletion of XQJ %s/%s to Agent %s\n", qj_temp.Namespace, qj_temp.Name, qa.AgentId)
164-
qa.queuejobclients.WorkloadV1beta1().AppWrappers(qj_temp.Namespace).Delete(ctx, qj_temp.Name, metav1.DeleteOptions{})
167+
err := qa.queuejobclients.WorkloadV1beta1().AppWrappers(qj_temp.Namespace).Delete(ctx, qj_temp.Name, metav1.DeleteOptions{})
168+
if err != nil {
169+
klog.Errorf("[DeleteJob] Unable to delete app wrapper, - error: %#v", err)
170+
}
165171
}
166172

167173
func (qa *JobClusterAgent) CreateJob(ctx context.Context, cqj *arbv1.AppWrapper) {
@@ -183,7 +189,10 @@ func (qa *JobClusterAgent) CreateJob(ctx context.Context, cqj *arbv1.AppWrapper)
183189
agent_qj.Labels["IsDispatched"] = "true"
184190

185191
klog.V(2).Infof("[Dispatcher: Agent] Create XQJ: %s/%s (Status: %+v) in Agent %s\n", agent_qj.Namespace, agent_qj.Name, agent_qj.Status, qa.AgentId)
186-
qa.queuejobclients.WorkloadV1beta1().AppWrappers(agent_qj.Namespace).Create(ctx, agent_qj, metav1.CreateOptions{})
192+
_, err := qa.queuejobclients.WorkloadV1beta1().AppWrappers(agent_qj.Namespace).Create(ctx, agent_qj, metav1.CreateOptions{})
193+
if err != nil {
194+
klog.Errorf("[CreateJob] Unable to create app wrapper, - error: %#v", err)
195+
}
187196
}
188197

189198
type ClusterMetricsList struct {
@@ -228,7 +237,10 @@ func (qa *JobClusterAgent) UpdateAggrResources(ctx context.Context) error {
228237
clusterMetricType := res.Items[i].MetricLabels["cluster"]
229238

230239
if strings.Compare(clusterMetricType, "cpu") == 0 || strings.Compare(clusterMetricType, "memory") == 0 {
231-
val, units, _ := getFloatString(res.Items[i].Value)
240+
val, units, err := getFloatString(res.Items[i].Value)
241+
if err != nil {
242+
klog.Errorf("[Dispatcher: UpdateAggrResources] Possible issue getting float string - error: %#v", err)
243+
}
232244
num, err := strconv.ParseFloat(val, 64)
233245
if err != nil {
234246
klog.Warningf("[Dispatcher: UpdateAggrResources] Possible issue converting %s string value of %s due to error: %v\n",

0 commit comments

Comments
 (0)