Skip to content

Commit f8857bd

Browse files
committed
feat(opencurve#27): support for deployment waiting
Signed-off-by: Anur Ijuokarukas <[email protected]>
1 parent ebde98e commit f8857bd

File tree

3 files changed

+134
-31
lines changed

3 files changed

+134
-31
lines changed

pkg/chunkserver/chunkserver.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/coreos/pkg/capnslog"
88
"github.com/pkg/errors"
9+
v1 "k8s.io/api/apps/v1"
910
"k8s.io/apimachinery/pkg/types"
1011

1112
curvev1 "github.com/opencurve/curve-operator/api/v1"
@@ -106,14 +107,17 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
106107
logger.Info("create physical pool successed")
107108

108109
// 3. startChunkServers start all chunkservers for each device of every node
109-
err = c.startChunkServers()
110+
var chunkServers []*v1.Deployment
111+
chunkServers, err = c.startChunkServers()
110112
if err != nil {
111113
return errors.Wrap(err, "failed to start chunkserver")
112114
}
113115

114116
// 4. wait all chunkservers online before create logical pool
115117
logger.Info("starting all chunkserver")
116-
time.Sleep(30 * time.Second)
118+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
119+
defer cancel()
120+
k8sutil.WaitForDeploymentsToStart(ctx, c.context.Clientset, 3*time.Second, chunkServers...)
117121

118122
// 5. create logical pool
119123
_, err = c.runCreatePoolJob(nodeNameIP, "logical_pool")

pkg/chunkserver/spec.go

+34-29
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import (
66

77
"github.com/pkg/errors"
88
apps "k8s.io/api/apps/v1"
9-
v1 "k8s.io/api/core/v1"
9+
v1 "k8s.io/api/apps/v1"
10+
corev1 "k8s.io/api/core/v1"
1011
kerrors "k8s.io/apimachinery/pkg/api/errors"
1112
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1213

@@ -15,20 +16,21 @@ import (
1516
)
1617

1718
// startChunkServers start all chunkservers for each device of every node
18-
func (c *Cluster) startChunkServers() error {
19+
func (c *Cluster) startChunkServers() ([]*v1.Deployment, error) {
20+
results := make([]*v1.Deployment, 0)
1921
if len(job2DeviceInfos) == 0 {
2022
logger.Errorf("no job to format device and provision chunk file")
21-
return nil
23+
return results, nil
2224
}
2325

2426
if len(chunkserverConfigs) == 0 {
2527
logger.Errorf("no device need to start chunkserver")
26-
return nil
28+
return results, nil
2729
}
2830

2931
if len(job2DeviceInfos) != len(chunkserverConfigs) {
3032
logger.Errorf("no device need to start chunkserver")
31-
return errors.New("failed to start chunkserver because of job numbers is not equal with chunkserver config")
33+
return results, errors.New("failed to start chunkserver because of job numbers is not equal with chunkserver config")
3234
}
3335

3436
_ = c.createStartCSConfigMap()
@@ -41,18 +43,19 @@ func (c *Cluster) startChunkServers() error {
4143

4244
err := c.createConfigMap(csConfig)
4345
if err != nil {
44-
return errors.Wrapf(err, "failed to create chunkserver configmap for %v", config.ChunkserverConfigMapName)
46+
return results, errors.Wrapf(err, "failed to create chunkserver configmap for %v",
47+
config.ChunkserverConfigMapName)
4548
}
4649

4750
d, err := c.makeDeployment(&csConfig)
4851
if err != nil {
49-
return errors.Wrap(err, "failed to create chunkserver Deployment")
52+
return results, errors.Wrap(err, "failed to create chunkserver Deployment")
5053
}
5154

5255
newDeployment, err := c.context.Clientset.AppsV1().Deployments(c.namespacedName.Namespace).Create(d)
5356
if err != nil {
5457
if !kerrors.IsAlreadyExists(err) {
55-
return errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
58+
return results, errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
5659
}
5760
logger.Infof("deployment for chunkserver %s already exists. updating if needed", csConfig.ResourceName)
5861

@@ -63,12 +66,11 @@ func (c *Cluster) startChunkServers() error {
6366
} else {
6467
logger.Infof("Deployment %s has been created , waiting for startup", newDeployment.GetName())
6568
// TODO:wait for the new deployment
66-
// deploymentsToWaitFor = append(deploymentsToWaitFor, newDeployment)
69+
results = append(results, newDeployment)
6770
}
6871
// update condition type and phase etc.
6972
}
70-
71-
return nil
73+
return results, nil
7274
}
7375

7476
// createCSClientConfigMap create cs_client configmap
@@ -88,14 +90,15 @@ func (c *Cluster) createCSClientConfigMap() error {
8890
// 3. replace ${} to specific parameters
8991
replacedCsClientData, err := config.ReplaceConfigVars(csClientCMData, &chunkserverConfigs[0])
9092
if err != nil {
91-
return errors.Wrap(err, "failed to Replace cs_client config template to generate a new cs_client configmap to start server.")
93+
return errors.Wrap(err,
94+
"failed to Replace cs_client config template to generate a new cs_client configmap to start server.")
9295
}
9396

9497
csClientConfigMap := map[string]string{
9598
config.CSClientConfigMapDataKey: replacedCsClientData,
9699
}
97100

98-
cm := &v1.ConfigMap{
101+
cm := &corev1.ConfigMap{
99102
ObjectMeta: metav1.ObjectMeta{
100103
Name: config.CSClientConfigMapName,
101104
Namespace: c.namespacedName.Namespace,
@@ -146,7 +149,7 @@ func (c *Cluster) CreateS3ConfigMap() error {
146149
config.S3ConfigMapDataKey: configMapData,
147150
}
148151

149-
cm := &v1.ConfigMap{
152+
cm := &corev1.ConfigMap{
150153
ObjectMeta: metav1.ObjectMeta{
151154
Name: config.S3ConfigMapName,
152155
Namespace: c.namespacedName.Namespace,
@@ -175,7 +178,7 @@ func (c *Cluster) createStartCSConfigMap() error {
175178
startChunkserverScriptFileDataKey: script.START,
176179
}
177180

178-
cm := &v1.ConfigMap{
181+
cm := &corev1.ConfigMap{
179182
ObjectMeta: metav1.ObjectMeta{
180183
Name: startChunkserverConfigMapName,
181184
Namespace: c.namespacedName.Namespace,
@@ -217,7 +220,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
217220
// 3. replace ${} to specific parameters
218221
replacedChunkServerData, err := config.ReplaceConfigVars(chunkserverData, &csConfig)
219222
if err != nil {
220-
return errors.Wrap(err, "failed to Replace chunkserver config template to generate a new chunkserver configmap to start server.")
223+
return errors.Wrap(err,
224+
"failed to Replace chunkserver config template to generate a new chunkserver configmap to start server.")
221225
}
222226

223227
// for debug
@@ -227,7 +231,7 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
227231
config.ChunkserverConfigMapDataKey: replacedChunkServerData,
228232
}
229233

230-
cm := &v1.ConfigMap{
234+
cm := &corev1.ConfigMap{
231235
ObjectMeta: metav1.ObjectMeta{
232236
Name: csConfig.CurrentConfigMapName,
233237
Namespace: c.namespacedName.Namespace,
@@ -237,7 +241,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
237241

238242
err = c.ownerInfo.SetControllerReference(cm)
239243
if err != nil {
240-
return errors.Wrapf(err, "failed to set owner reference to chunkserverconfig configmap %q", config.ChunkserverConfigMapName)
244+
return errors.Wrapf(err, "failed to set owner reference to chunkserverconfig configmap %q",
245+
config.ChunkserverConfigMapName)
241246
}
242247

243248
// Create chunkserver config in cluster
@@ -254,19 +259,19 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,
254259
vols, _ := c.createTopoAndToolVolumeAndMount()
255260
volumes = append(volumes, vols...)
256261

257-
podSpec := v1.PodTemplateSpec{
262+
podSpec := corev1.PodTemplateSpec{
258263
ObjectMeta: metav1.ObjectMeta{
259264
Name: csConfig.ResourceName,
260265
Labels: c.getChunkServerPodLabels(csConfig),
261266
},
262-
Spec: v1.PodSpec{
263-
Containers: []v1.Container{
267+
Spec: corev1.PodSpec{
268+
Containers: []corev1.Container{
264269
c.makeCSDaemonContainer(csConfig),
265270
},
266271
NodeName: csConfig.NodeName,
267-
RestartPolicy: v1.RestartPolicyAlways,
272+
RestartPolicy: corev1.RestartPolicyAlways,
268273
HostNetwork: true,
269-
DNSPolicy: v1.DNSClusterFirstWithHostNet,
274+
DNSPolicy: corev1.DNSClusterFirstWithHostNet,
270275
Volumes: volumes,
271276
},
272277
}
@@ -301,7 +306,7 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,
301306
}
302307

303308
// makeCSDaemonContainer create chunkserver container
304-
func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Container {
309+
func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) corev1.Container {
305310

306311
privileged := true
307312
runAsUser := int64(0)
@@ -321,7 +326,7 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Containe
321326
argsChunkserverPort := strconv.Itoa(csConfig.Port)
322327
argsConfigFileMountPath := path.Join(config.ChunkserverConfigMapMountPathDir, config.ChunkserverConfigMapDataKey)
323328

324-
container := v1.Container{
329+
container := corev1.Container{
325330
Name: "chunkserver",
326331
Command: []string{
327332
"/bin/bash",
@@ -339,16 +344,16 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Containe
339344
Image: c.spec.CurveVersion.Image,
340345
ImagePullPolicy: c.spec.CurveVersion.ImagePullPolicy,
341346
VolumeMounts: volMounts,
342-
Ports: []v1.ContainerPort{
347+
Ports: []corev1.ContainerPort{
343348
{
344349
Name: "listen-port",
345350
ContainerPort: int32(csConfig.Port),
346351
HostPort: int32(csConfig.Port),
347-
Protocol: v1.ProtocolTCP,
352+
Protocol: corev1.ProtocolTCP,
348353
},
349354
},
350-
Env: []v1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
351-
SecurityContext: &v1.SecurityContext{
355+
Env: []corev1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
356+
SecurityContext: &corev1.SecurityContext{
352357
Privileged: &privileged,
353358
RunAsUser: &runAsUser,
354359
RunAsNonRoot: &runAsNonRoot,

pkg/k8sutil/deployment.go

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package k8sutil
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
v1 "k8s.io/api/apps/v1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/client-go/kubernetes"
10+
)
11+
12+
// WaitForDeploymentsToStart waits for the deployments to start, and returns a channel to indicate whether
13+
// all deployments are started or not
14+
//
15+
// tickDuration is the interval to check the deployment status
16+
// objectMeta is the metadata of the deployment
17+
//
18+
// we use the hub chan to collect the result of each deployment, and when all deployments are started,
19+
// we return true, otherwise, we return false, this design let WaitForDeploymentToStart and
20+
// WaitForDeploymentsToStart can be used in the same way
21+
func WaitForDeploymentsToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
22+
objectMetas ...*v1.Deployment) chan bool {
23+
length := len(objectMetas)
24+
hub := make(chan bool, length)
25+
defer close(hub)
26+
for i := range objectMetas {
27+
objectMata := objectMetas[i]
28+
go func() {
29+
if succeed := <-WaitForDeploymentToStart(ctx, clientSet, tickDuration, objectMata); !succeed {
30+
hub <- false
31+
return
32+
}
33+
}()
34+
}
35+
36+
chn := make(chan bool)
37+
go func() {
38+
defer close(chn)
39+
for i := 0; i < length; i++ {
40+
if succeed := <-hub; !succeed {
41+
chn <- false
42+
return
43+
}
44+
}
45+
chn <- true
46+
return
47+
}()
48+
return chn
49+
}
50+
51+
// WaitForDeploymentToStart waits for the deployment to start, and returns a channel to indicate whether
52+
// the deployment is started or not
53+
//
54+
// tickDuration is the interval to check the deployment status
55+
// objectMeta is the metadata of the deployment
56+
func WaitForDeploymentToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
57+
objectMeta *v1.Deployment) chan bool {
58+
ticker := time.NewTicker(tickDuration)
59+
defer ticker.Stop()
60+
61+
chn := make(chan bool)
62+
go func() {
63+
defer close(chn)
64+
for {
65+
select {
66+
case <-ticker.C:
67+
deployment, err := clientSet.AppsV1().Deployments(objectMeta.GetNamespace()).Get(objectMeta.GetName(),
68+
metav1.GetOptions{})
69+
logger.Infof("waiting for deployment %s starting", deployment.Name)
70+
if err != nil {
71+
72+
// TODO: return the failed reason is required??
73+
logger.Errorf("failed to get deployment %s in cluster", objectMeta.GetName())
74+
chn <- false
75+
return
76+
}
77+
if deployment.Status.ObservedGeneration != deployment.Status.ObservedGeneration &&
78+
deployment.Status.UpdatedReplicas > 0 &&
79+
deployment.Status.ReadyReplicas > 0 {
80+
logger.Infof("deployment %s has been started", deployment.Name)
81+
chn <- true
82+
return
83+
}
84+
85+
// TODO: should log the unready reason, e.g. conditions, etc. to help debugging??
86+
case <-ctx.Done():
87+
chn <- false
88+
logger.Infof("stop waiting for deployment %s to start due to context is done", objectMeta.GetName())
89+
return
90+
}
91+
}
92+
}()
93+
return chn
94+
}

0 commit comments

Comments
 (0)