Commit 06bc319
feat(opencurve#27): support waiting for deployments to start
Signed-off-by: Anur Ijuokarukas <[email protected]>
1 parent ebde98e commit 06bc319

3 files changed, 145 insertions(+), 36 deletions(-)


pkg/chunkserver/chunkserver.go (+8 −3)

@@ -6,6 +6,7 @@ import (
 
 	"github.com/coreos/pkg/capnslog"
 	"github.com/pkg/errors"
+	v1 "k8s.io/api/apps/v1"
 	"k8s.io/apimachinery/pkg/types"
 
 	curvev1 "github.com/opencurve/curve-operator/api/v1"
@@ -106,14 +107,17 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
 	logger.Info("create physical pool successed")
 
 	// 3. startChunkServers start all chunkservers for each device of every node
-	err = c.startChunkServers()
+	var chunkServers []*v1.Deployment
+	chunkServers, err = c.startChunkServers()
 	if err != nil {
 		return errors.Wrap(err, "failed to start chunkserver")
 	}
 
 	// 4. wait all chunkservers online before create logical pool
 	logger.Info("starting all chunkserver")
-	time.Sleep(30 * time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	<-k8sutil.WaitForDeploymentsToStart(ctx, c.context.Clientset, 3*time.Second, chunkServers...)
 
 	// 5. create logical pool
 	_, err = c.runCreatePoolJob(nodeNameIP, "logical_pool")
@@ -122,7 +126,8 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
 	}
 	logger.Info("create logical pool successed")
 
-	k8sutil.UpdateCondition(context.TODO(), &c.context, c.namespacedName, curvev1.ConditionTypeChunkServerReady, curvev1.ConditionTrue, curvev1.ConditionChunkServerClusterCreatedReason, "Chunkserver cluster has been created")
+	k8sutil.UpdateCondition(context.TODO(), &c.context, c.namespacedName, curvev1.ConditionTypeChunkServerReady,
+		curvev1.ConditionTrue, curvev1.ConditionChunkServerClusterCreatedReason, "Chunkserver cluster has been created")
 
 	return nil
 }
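
Note that WaitForDeploymentsToStart does all of its waiting through the channel it returns, so the caller must receive from that channel (the leading <- above) for Start to actually block; the fixed 30s context deadline then bounds the wait. A caller that wants to fail hard rather than continue could also distinguish the deadline expiring from a broken rollout. A minimal sketch under that assumption — the helper name waitChunkServers and its error messages are illustrative, not part of this commit:

package chunkserver

import (
	"context"
	"time"

	"github.com/pkg/errors"
	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/client-go/kubernetes"

	"github.com/opencurve/curve-operator/pkg/k8sutil"
)

// waitChunkServers is a hypothetical helper: it blocks until every chunkserver
// deployment is ready, and turns a timeout or a failed rollout into an error.
func waitChunkServers(clientset kubernetes.Interface, deps []*appsv1.Deployment) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if ok := <-k8sutil.WaitForDeploymentsToStart(ctx, clientset, 3*time.Second, deps...); !ok {
		if ctx.Err() != nil {
			// the 30s budget ran out before every deployment became ready
			return errors.Wrap(ctx.Err(), "timed out waiting for chunkserver deployments")
		}
		return errors.New("at least one chunkserver deployment failed to start")
	}
	return nil
}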

pkg/chunkserver/spec.go (+42 −33)

@@ -6,7 +6,8 @@ import (
 
 	"github.com/pkg/errors"
 	apps "k8s.io/api/apps/v1"
-	v1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
@@ -15,20 +16,21 @@ import (
 )
 
 // startChunkServers start all chunkservers for each device of every node
-func (c *Cluster) startChunkServers() error {
+func (c *Cluster) startChunkServers() ([]*v1.Deployment, error) {
+	results := make([]*v1.Deployment, 0)
 	if len(job2DeviceInfos) == 0 {
 		logger.Errorf("no job to format device and provision chunk file")
-		return nil
+		return results, nil
 	}
 
 	if len(chunkserverConfigs) == 0 {
 		logger.Errorf("no device need to start chunkserver")
-		return nil
+		return results, nil
 	}
 
 	if len(job2DeviceInfos) != len(chunkserverConfigs) {
 		logger.Errorf("no device need to start chunkserver")
-		return errors.New("failed to start chunkserver because of job numbers is not equal with chunkserver config")
+		return results, errors.New("failed to start chunkserver because the number of jobs does not equal the number of chunkserver configs")
 	}
 
 	_ = c.createStartCSConfigMap()
@@ -41,18 +43,19 @@ func (c *Cluster) startChunkServers() error {
 
 		err := c.createConfigMap(csConfig)
 		if err != nil {
-			return errors.Wrapf(err, "failed to create chunkserver configmap for %v", config.ChunkserverConfigMapName)
+			return results, errors.Wrapf(err, "failed to create chunkserver configmap for %v",
+				config.ChunkserverConfigMapName)
 		}
 
 		d, err := c.makeDeployment(&csConfig)
 		if err != nil {
-			return errors.Wrap(err, "failed to create chunkserver Deployment")
+			return results, errors.Wrap(err, "failed to create chunkserver Deployment")
 		}
 
 		newDeployment, err := c.context.Clientset.AppsV1().Deployments(c.namespacedName.Namespace).Create(d)
 		if err != nil {
 			if !kerrors.IsAlreadyExists(err) {
-				return errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
+				return results, errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
 			}
 			logger.Infof("deployment for chunkserver %s already exists. updating if needed", csConfig.ResourceName)
 
@@ -63,18 +66,18 @@ func (c *Cluster) startChunkServers() error {
 		} else {
 			logger.Infof("Deployment %s has been created , waiting for startup", newDeployment.GetName())
 			// TODO:wait for the new deployment
-			// deploymentsToWaitFor = append(deploymentsToWaitFor, newDeployment)
+			results = append(results, newDeployment)
 		}
 		// update condition type and phase etc.
 	}
-
-	return nil
+	return results, nil
 }
 
 // createCSClientConfigMap create cs_client configmap
 func (c *Cluster) createCSClientConfigMap() error {
 	// 1. get mds-conf-template from cluster
-	csClientCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.CsClientConfigMapTemp, metav1.GetOptions{})
+	csClientCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.CsClientConfigMapTemp,
+		metav1.GetOptions{})
 	if err != nil {
 		logger.Errorf("failed to get configmap %s from cluster", config.CsClientConfigMapTemp)
 		if kerrors.IsNotFound(err) {
@@ -88,14 +91,15 @@ func (c *Cluster) createCSClientConfigMap() error {
 	// 3. replace ${} to specific parameters
 	replacedCsClientData, err := config.ReplaceConfigVars(csClientCMData, &chunkserverConfigs[0])
 	if err != nil {
-		return errors.Wrap(err, "failed to Replace cs_client config template to generate a new cs_client configmap to start server.")
+		return errors.Wrap(err,
+			"failed to replace cs_client config template to generate a new cs_client configmap to start server")
 	}
 
 	csClientConfigMap := map[string]string{
 		config.CSClientConfigMapDataKey: replacedCsClientData,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      config.CSClientConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -105,7 +109,8 @@ func (c *Cluster) createCSClientConfigMap() error {
 
 	err = c.ownerInfo.SetControllerReference(cm)
 	if err != nil {
-		return errors.Wrapf(err, "failed to set owner reference to cs_client.conf configmap %q", config.CSClientConfigMapName)
+		return errors.Wrapf(err, "failed to set owner reference to cs_client.conf configmap %q",
+			config.CSClientConfigMapName)
 	}
 
 	// Create cs_client configmap in cluster
@@ -119,7 +124,8 @@ func (c *Cluster) createCSClientConfigMap() error {
 
 // CreateS3ConfigMap creates s3 configmap
 func (c *Cluster) CreateS3ConfigMap() error {
-	s3CMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.S3ConfigMapTemp, metav1.GetOptions{})
+	s3CMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.S3ConfigMapTemp,
+		metav1.GetOptions{})
 	if err != nil {
 		logger.Errorf("failed to get configmap %s from cluster", config.S3ConfigMapTemp)
 		if kerrors.IsNotFound(err) {
@@ -146,7 +152,7 @@ func (c *Cluster) CreateS3ConfigMap() error {
 		config.S3ConfigMapDataKey: configMapData,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      config.S3ConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -175,7 +181,7 @@ func (c *Cluster) createStartCSConfigMap() error {
 		startChunkserverScriptFileDataKey: script.START,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      startChunkserverConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -199,7 +205,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
 // createConfigMap create chunkserver configmap for chunkserver server
 func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
 	// 1. get mds-conf-template from cluster
-	chunkserverCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.ChunkServerConfigMapTemp, metav1.GetOptions{})
+	chunkserverCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.ChunkServerConfigMapTemp,
+		metav1.GetOptions{})
 	if err != nil {
 		logger.Errorf("failed to get configmap %s from cluster", config.ChunkServerConfigMapTemp)
 		if kerrors.IsNotFound(err) {
@@ -217,7 +224,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
 	// 3. replace ${} to specific parameters
 	replacedChunkServerData, err := config.ReplaceConfigVars(chunkserverData, &csConfig)
 	if err != nil {
-		return errors.Wrap(err, "failed to Replace chunkserver config template to generate a new chunkserver configmap to start server.")
+		return errors.Wrap(err,
+			"failed to replace chunkserver config template to generate a new chunkserver configmap to start server")
 	}
 
 	// for debug
@@ -227,7 +235,7 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
 		config.ChunkserverConfigMapDataKey: replacedChunkServerData,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      csConfig.CurrentConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -237,7 +245,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
 
 	err = c.ownerInfo.SetControllerReference(cm)
 	if err != nil {
-		return errors.Wrapf(err, "failed to set owner reference to chunkserverconfig configmap %q", config.ChunkserverConfigMapName)
+		return errors.Wrapf(err, "failed to set owner reference to chunkserverconfig configmap %q",
+			config.ChunkserverConfigMapName)
 	}
 
 	// Create chunkserver config in cluster
@@ -254,19 +263,19 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,
 	vols, _ := c.createTopoAndToolVolumeAndMount()
 	volumes = append(volumes, vols...)
 
-	podSpec := v1.PodTemplateSpec{
+	podSpec := corev1.PodTemplateSpec{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:   csConfig.ResourceName,
 			Labels: c.getChunkServerPodLabels(csConfig),
 		},
-		Spec: v1.PodSpec{
-			Containers: []v1.Container{
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
 				c.makeCSDaemonContainer(csConfig),
 			},
 			NodeName:      csConfig.NodeName,
-			RestartPolicy: v1.RestartPolicyAlways,
+			RestartPolicy: corev1.RestartPolicyAlways,
 			HostNetwork:   true,
-			DNSPolicy:     v1.DNSClusterFirstWithHostNet,
+			DNSPolicy:     corev1.DNSClusterFirstWithHostNet,
 			Volumes:       volumes,
 		},
 	}
@@ -301,7 +310,7 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,
 }
 
 // makeCSDaemonContainer create chunkserver container
-func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Container {
+func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) corev1.Container {
 
 	privileged := true
 	runAsUser := int64(0)
@@ -321,7 +330,7 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Container {
 	argsChunkserverPort := strconv.Itoa(csConfig.Port)
 	argsConfigFileMountPath := path.Join(config.ChunkserverConfigMapMountPathDir, config.ChunkserverConfigMapDataKey)
 
-	container := v1.Container{
+	container := corev1.Container{
 		Name: "chunkserver",
 		Command: []string{
 			"/bin/bash",
@@ -339,16 +348,16 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Container {
 		Image:           c.spec.CurveVersion.Image,
 		ImagePullPolicy: c.spec.CurveVersion.ImagePullPolicy,
 		VolumeMounts:    volMounts,
-		Ports: []v1.ContainerPort{
+		Ports: []corev1.ContainerPort{
 			{
 				Name:          "listen-port",
 				ContainerPort: int32(csConfig.Port),
 				HostPort:      int32(csConfig.Port),
-				Protocol:      v1.ProtocolTCP,
+				Protocol:      corev1.ProtocolTCP,
 			},
 		},
-		Env: []v1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
-		SecurityContext: &v1.SecurityContext{
+		Env: []corev1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
+		SecurityContext: &corev1.SecurityContext{
 			Privileged:   &privileged,
 			RunAsUser:    &runAsUser,
 			RunAsNonRoot: &runAsNonRoot,
pkg/k8sutil/deployment.go (new file, +95)

@@ -0,0 +1,95 @@
+package k8sutil
+
+import (
+	"context"
+	"time"
+
+	v1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
+// WaitForDeploymentsToStart waits for all of the given deployments to start, and returns a channel
+// that reports whether they all started
+//
+// tickDuration is the interval to check the deployment status
+// objectMetas are the deployments to wait for
+//
+// each deployment is watched by its own goroutine, and the buffered hub channel collects the
+// per-deployment results; the returned channel receives true only after every deployment has
+// started, so WaitForDeploymentToStart and WaitForDeploymentsToStart can be used in the same way
+func WaitForDeploymentsToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
+	objectMetas ...*v1.Deployment) chan bool {
+	length := len(objectMetas)
+	// hub is buffered and intentionally never closed here: closing it would race with the
+	// watcher goroutines that may still be sending their results
+	hub := make(chan bool, length)
+	for i := range objectMetas {
+		objectMeta := objectMetas[i]
+		go func() {
+			// forward the result in the success case as well as the failure case,
+			// otherwise the collector below would block forever
+			hub <- <-WaitForDeploymentToStart(ctx, clientSet, tickDuration, objectMeta)
+		}()
+	}
+
+	chn := make(chan bool)
+	go func() {
+		defer close(chn)
+		for i := 0; i < length; i++ {
+			if succeed := <-hub; !succeed {
+				chn <- false
+				return
+			}
+		}
+		chn <- true
+	}()
+	return chn
+}
+
+// WaitForDeploymentToStart waits for the given deployment to start, and returns a channel that
+// reports whether it started
+//
+// tickDuration is the interval to check the deployment status
+// objectMeta is the deployment to wait for
+func WaitForDeploymentToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
+	objectMeta *v1.Deployment) chan bool {
+	chn := make(chan bool)
+	go func() {
+		defer close(chn)
+		// the ticker has to be created and stopped inside this goroutine; stopping it in
+		// the outer function would stop it before the first tick ever fires
+		ticker := time.NewTicker(tickDuration)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				deployment, err := clientSet.AppsV1().Deployments(objectMeta.GetNamespace()).Get(objectMeta.GetName(),
+					metav1.GetOptions{})
+				if err != nil {
+					// TODO: return the failed reason is required??
+					logger.Errorf("failed to get deployment %s in cluster", objectMeta.GetName())
+					chn <- false
+					return
+				}
+				logger.Infof("waiting for deployment %s starting", deployment.Name)
+				// started means the controller has observed a generation newer than the
+				// snapshot we were handed, and at least one replica is updated and ready
+				if deployment.Status.ObservedGeneration != objectMeta.Status.ObservedGeneration &&
+					deployment.Status.UpdatedReplicas > 0 &&
+					deployment.Status.ReadyReplicas > 0 {
+					logger.Infof("deployment %s has been started", deployment.Name)
+					chn <- true
+					return
+				}
+
+				// TODO: should log the unready reason, e.g. conditions, etc. to help debugging??
+			case <-ctx.Done():
+				logger.Infof("stop waiting for deployment %s to start since context is done", objectMeta.GetName())
+				chn <- false
+				return
+			}
+		}
+	}()
+	return chn
+}
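
For a quick check of the helper, a minimal test sketch against client-go's fake clientset could look like this (assuming the vendored client-go exposes kubernetes/fake and the same context-free Get signature used above; the names and status values are illustrative):

package k8sutil

import (
	"context"
	"testing"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestWaitForDeploymentToStart(t *testing.T) {
	// snapshot is what the caller holds: its status is not yet observed (ObservedGeneration 0)
	snapshot := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Name: "chunkserver-a", Namespace: "curve"},
	}
	// the copy stored in the fake cluster already reports an observed, ready rollout
	started := snapshot.DeepCopy()
	started.Status = appsv1.DeploymentStatus{
		ObservedGeneration: 1,
		UpdatedReplicas:    1,
		ReadyReplicas:      1,
	}
	clientset := fake.NewSimpleClientset(started)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if ok := <-WaitForDeploymentToStart(ctx, clientset, 10*time.Millisecond, snapshot); !ok {
		t.Fatal("expected the deployment to be reported as started")
	}
}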
