Skip to content

Commit c085cb5

Browse files
authored
refactor: Adjust CoPilot init and Pod status logic (#6523)
Signed-off-by: Kevin Su <[email protected]>
1 parent e3cabef commit c085cb5

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,8 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
271271
if err != nil {
272272
return primaryInitContainerName, err
273273
}
274-
coPilotPod.InitContainers = append(coPilotPod.InitContainers, sidecar)
274+
// Let the sidecar container start before the downloader; it will ensure the signal watcher is started before the main container finishes.
275+
coPilotPod.InitContainers = append([]v1.Container{sidecar}, coPilotPod.InitContainers...)
275276

276277
coPilotPod.TerminationGracePeriodSeconds = (*int64)(&cfg.Timeout.Duration)
277278
}

flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,8 @@ func assertContainerHasVolumeMounts(t *testing.T, cfg config.FlyteCoPilotConfig,
228228
}
229229

230230
func assertPodHasCoPilot(t *testing.T, cfg config.FlyteCoPilotConfig, pilot *core.DataLoadingConfig, iFace *core.TypedInterface, pod *v1.PodSpec) {
231-
for _, c := range pod.Containers {
231+
containers := append(pod.Containers, pod.InitContainers...)
232+
for _, c := range containers {
232233
if c.Name == "test" {
233234
cntr := c
234235
assertContainerHasVolumeMounts(t, cfg, pilot, iFace, &cntr)
@@ -510,6 +511,8 @@ func TestAddCoPilotToPod(t *testing.T) {
510511
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)
511512
assert.NoError(t, err)
512513
assert.Equal(t, "test-downloader", primaryInitContainerName)
514+
assert.Equal(t, pod.InitContainers[0].Name, cfg.NamePrefix+flyteSidecarContainerName)
515+
assert.Equal(t, pod.InitContainers[1].Name, cfg.NamePrefix+flyteDownloaderContainerName)
513516
assertPodHasCoPilot(t, cfg, pilot, iface, &pod)
514517
})
515518

flyteplugins/go/tasks/plugins/k8s/pod/plugin.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func DemystifyPodStatus(ctx context.Context, pod *v1.Pod, info pluginsCore.TaskI
208208
// DO NOTHING
209209
default:
210210
if !primaryContainerExists {
211-
// if all of the containers in the Pod are complete, as an optimization, we can declare the task as
211+
// if all of the containers and sidecars in the Pod are complete, as an optimization, we can declare the task as
212212
// succeeded rather than waiting for the Pod to be marked completed.
213213
allSuccessfullyTerminated := len(pod.Status.ContainerStatuses) > 0
214214
for _, s := range pod.Status.ContainerStatuses {
@@ -217,6 +217,14 @@ func DemystifyPodStatus(ctx context.Context, pod *v1.Pod, info pluginsCore.TaskI
217217
}
218218
}
219219

220+
// Init container will become sidecar if the restart policy is set to Always
221+
// https://kubernetes.io/docs/concepts/workloads/pods/sidecar-containers/#sidecar-containers-and-pod-lifecycle
222+
for _, s := range pod.Status.InitContainerStatuses {
223+
if s.State.Waiting != nil || s.State.Running != nil || (s.State.Terminated != nil && s.State.Terminated.ExitCode != 0) {
224+
allSuccessfullyTerminated = false
225+
}
226+
}
227+
220228
if allSuccessfullyTerminated {
221229
return flytek8s.DemystifySuccess(pod.Status, info)
222230
}

0 commit comments

Comments
 (0)