Skip to content

Commit 718e3a0

Browse files
authored
Customize ingress URL with Spark application ID (#2554)
* Customize ingress URL with Spark application ID Signed-off-by: Yi Chen <[email protected]> * Update policy rules for ingress Signed-off-by: Yi Chen <[email protected]> --------- Signed-off-by: Yi Chen <[email protected]>
1 parent 84a7749 commit 718e3a0

File tree

4 files changed

+98
-70
lines changed

4 files changed

+98
-70
lines changed

charts/spark-operator-chart/templates/controller/_helpers.tpl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,11 @@ Create the role policy rules for the controller in every Spark job namespace
172172
- ingresses
173173
verbs:
174174
- get
175-
- create
176-
- delete
177175
- list
178176
- watch
177+
- create
178+
- update
179+
- delete
179180
- apiGroups:
180181
- sparkoperator.k8s.io
181182
resources:

config/rbac/role.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ rules:
6262
- delete
6363
- get
6464
- list
65-
- patch
6665
- update
6766
- watch
6867
- apiGroups:

internal/controller/sparkapplication/controller.go

Lines changed: 57 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ func NewReconciler(
114114
// +kubebuilder:rbac:groups=,resources=nodes,verbs=get
115115
// +kubebuilder:rbac:groups=,resources=events,verbs=create;update;patch
116116
// +kubebuilder:rbac:groups=,resources=resourcequotas,verbs=get;list;watch
117-
// +kubebuilder:rbac:groups=extensions,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
118-
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
117+
// +kubebuilder:rbac:groups=extensions,resources=ingresses,verbs=get;list;watch;create;update;delete
118+
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;delete
119119
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get
120120
// +kubebuilder:rbac:groups=sparkoperator.k8s.io,resources=sparkapplications,verbs=get;list;watch;create;update;patch;delete
121121
// +kubebuilder:rbac:groups=sparkoperator.k8s.io,resources=sparkapplications/status,verbs=get;update;patch
@@ -297,9 +297,64 @@ func (r *Reconciler) reconcileSubmittedSparkApplication(ctx context.Context, req
297297
if err := r.updateSparkApplicationState(ctx, app); err != nil {
298298
return err
299299
}
300+
301+
// Create web UI service for spark applications if enabled.
302+
if r.options.EnableUIService {
303+
service, err := r.createWebUIService(app)
304+
if err != nil {
305+
return fmt.Errorf("failed to create web UI service: %v", err)
306+
}
307+
app.Status.DriverInfo.WebUIServiceName = service.serviceName
308+
app.Status.DriverInfo.WebUIPort = service.servicePort
309+
app.Status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", service.serviceIP, app.Status.DriverInfo.WebUIPort)
310+
311+
// Create UI Ingress if ingress-format is set.
312+
if r.options.IngressURLFormat != "" {
313+
// We are going to want to use an ingress url.
314+
ingressURL, err := getDriverIngressURL(r.options.IngressURLFormat, app)
315+
if err != nil {
316+
return fmt.Errorf("failed to get ingress url: %v", err)
317+
}
318+
// need to ensure the spark.ui variables are configured correctly if a subPath is used.
319+
if ingressURL.Path != "" {
320+
if app.Spec.SparkConf == nil {
321+
app.Spec.SparkConf = make(map[string]string)
322+
}
323+
app.Spec.SparkConf[common.SparkUIProxyBase] = ingressURL.Path
324+
app.Spec.SparkConf[common.SparkUIProxyRedirectURI] = "/"
325+
}
326+
ingress, err := r.createWebUIIngress(app, *service, ingressURL, r.options.IngressClassName)
327+
if err != nil {
328+
return fmt.Errorf("failed to create web UI ingress: %v", err)
329+
}
330+
app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL.String()
331+
app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
332+
}
333+
}
334+
335+
for _, driverIngressConfiguration := range app.Spec.DriverIngressOptions {
336+
service, err := r.createDriverIngressServiceFromConfiguration(app, &driverIngressConfiguration)
337+
if err != nil {
338+
return fmt.Errorf("failed to create driver ingress service for SparkApplication: %v", err)
339+
}
340+
// Create ingress if ingress-format is set.
341+
if driverIngressConfiguration.IngressURLFormat != "" {
342+
// We are going to want to use an ingress url.
343+
ingressURL, err := getDriverIngressURL(driverIngressConfiguration.IngressURLFormat, app)
344+
if err != nil {
345+
return fmt.Errorf("failed to get driver ingress url: %v", err)
346+
}
347+
_, err = r.createDriverIngress(app, &driverIngressConfiguration, *service, ingressURL, r.options.IngressClassName)
348+
if err != nil {
349+
return fmt.Errorf("failed to create driver ingress: %v", err)
350+
}
351+
}
352+
}
353+
300354
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
301355
return err
302356
}
357+
303358
return nil
304359
},
305360
)
@@ -687,63 +742,6 @@ func (r *Reconciler) submitSparkApplication(ctx context.Context, app *v1beta2.Sp
687742
}
688743
}
689744

690-
// Create web UI service for spark applications if enabled.
691-
if r.options.EnableUIService {
692-
service, err := r.createWebUIService(app)
693-
if err != nil {
694-
return fmt.Errorf("failed to create web UI service: %v", err)
695-
}
696-
app.Status.DriverInfo.WebUIServiceName = service.serviceName
697-
app.Status.DriverInfo.WebUIPort = service.servicePort
698-
app.Status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", service.serviceIP, app.Status.DriverInfo.WebUIPort)
699-
logger.Info("Created web UI service for SparkApplication", "name", app.Name, "namespace", app.Namespace)
700-
701-
// Create UI Ingress if ingress-format is set.
702-
if r.options.IngressURLFormat != "" {
703-
// We are going to want to use an ingress url.
704-
ingressURL, err := getDriverIngressURL(r.options.IngressURLFormat, app.Name, app.Namespace)
705-
if err != nil {
706-
return fmt.Errorf("failed to get ingress url: %v", err)
707-
}
708-
// need to ensure the spark.ui variables are configured correctly if a subPath is used.
709-
if ingressURL.Path != "" {
710-
if app.Spec.SparkConf == nil {
711-
app.Spec.SparkConf = make(map[string]string)
712-
}
713-
app.Spec.SparkConf[common.SparkUIProxyBase] = ingressURL.Path
714-
app.Spec.SparkConf[common.SparkUIProxyRedirectURI] = "/"
715-
}
716-
ingress, err := r.createWebUIIngress(app, *service, ingressURL, r.options.IngressClassName)
717-
if err != nil {
718-
return fmt.Errorf("failed to create web UI ingress: %v", err)
719-
}
720-
app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL.String()
721-
app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
722-
logger.Info("Created web UI ingress for SparkApplication", "name", app.Name, "namespace", app.Namespace)
723-
}
724-
}
725-
726-
for _, driverIngressConfiguration := range app.Spec.DriverIngressOptions {
727-
logger.Info("Creating driver ingress service for SparkApplication", "name", app.Name, "namespace", app.Namespace)
728-
service, err := r.createDriverIngressServiceFromConfiguration(app, &driverIngressConfiguration)
729-
if err != nil {
730-
return fmt.Errorf("failed to create driver ingress service for SparkApplication: %v", err)
731-
}
732-
// Create ingress if ingress-format is set.
733-
if driverIngressConfiguration.IngressURLFormat != "" {
734-
// We are going to want to use an ingress url.
735-
ingressURL, err := getDriverIngressURL(driverIngressConfiguration.IngressURLFormat, app.Name, app.Namespace)
736-
if err != nil {
737-
return fmt.Errorf("failed to get driver ingress url: %v", err)
738-
}
739-
ingress, err := r.createDriverIngress(app, &driverIngressConfiguration, *service, ingressURL, r.options.IngressClassName)
740-
if err != nil {
741-
return fmt.Errorf("failed to create driver ingress: %v", err)
742-
}
743-
logger.V(1).Info("Created driver ingress for SparkApplication", "name", app.Name, "namespace", app.Namespace, "ingressName", ingress.ingressName, "ingressURL", ingress.ingressURL)
744-
}
745-
}
746-
747745
defer func() {
748746
if err := r.cleanUpPodTemplateFiles(app); err != nil {
749747
logger.Error(fmt.Errorf("failed to clean up pod template files: %v", err), "name", app.Name, "namespace", app.Namespace)

internal/controller/sparkapplication/driveringress.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,20 @@ type SparkIngress struct {
5555
ingressTLS []networkingv1.IngressTLS
5656
}
5757

58-
var ingressAppNameURLRegex = regexp.MustCompile(`{{\s*[$]appName\s*}}`)
59-
var ingressAppNamespaceURLRegex = regexp.MustCompile(`{{\s*[$]appNamespace\s*}}`)
58+
var (
59+
ingressAppNameURLRegex = regexp.MustCompile(`{{\s*[$]appName\s*}}`)
60+
ingressAppNamespaceURLRegex = regexp.MustCompile(`{{\s*[$]appNamespace\s*}}`)
61+
ingressAppIdURLRegex = regexp.MustCompile(`{{\s*[$]appId\s*}}`)
62+
)
63+
64+
func getDriverIngressURL(ingressURLFormat string, app *v1beta2.SparkApplication) (*url.URL, error) {
65+
if app == nil {
66+
return nil, fmt.Errorf("app cannot be nil")
67+
}
6068

61-
func getDriverIngressURL(ingressURLFormat string, appName string, appNamespace string) (*url.URL, error) {
62-
ingressURL := ingressAppNamespaceURLRegex.ReplaceAllString(ingressAppNameURLRegex.ReplaceAllString(ingressURLFormat, appName), appNamespace)
69+
ingressURL := ingressAppNameURLRegex.ReplaceAllString(ingressURLFormat, app.Name)
70+
ingressURL = ingressAppNamespaceURLRegex.ReplaceAllString(ingressURL, app.Namespace)
71+
ingressURL = ingressAppIdURLRegex.ReplaceAllString(ingressURL, app.Status.SparkApplicationID)
6372
parsedURL, err := url.Parse(ingressURL)
6473
if err != nil {
6574
return nil, err
@@ -145,10 +154,19 @@ func (r *Reconciler) createDriverIngressV1(app *v1beta2.SparkApplication, servic
145154
ingress.Spec.IngressClassName = &ingressClassName
146155
}
147156

148-
logger.Info("Creating networking.v1/Ingress for SparkApplication web UI", "name", app.Name, "namespace", app.Namespace, "ingressName", ingress.Name)
149157
if err := r.client.Create(context.TODO(), ingress); err != nil {
150-
return nil, fmt.Errorf("failed to create ingress %s/%s: %v", ingress.Namespace, ingress.Name, err)
158+
if !errors.IsAlreadyExists(err) {
159+
return nil, fmt.Errorf("failed to create ingress %s/%s: %v", ingress.Namespace, ingress.Name, err)
160+
}
161+
162+
if err := r.client.Update(context.TODO(), ingress); err != nil {
163+
return nil, fmt.Errorf("failed to update ingress %s/%s: %v", ingress.Namespace, ingress.Name, err)
164+
}
165+
logger.Info("Updated networking.v1/Ingress for SparkApplication", "name", app.Name, "namespace", app.Namespace, "ingressName", ingress.Name)
166+
} else {
167+
logger.Info("Created networking.v1/Ingress for SparkApplication", "name", app.Name, "namespace", app.Namespace, "ingressName", ingress.Name)
151168
}
169+
152170
return &SparkIngress{
153171
ingressName: ingress.Name,
154172
ingressURL: ingressURL,
@@ -212,10 +230,19 @@ func (r *Reconciler) createDriverIngressLegacy(app *v1beta2.SparkApplication, se
212230
if len(ingressTLSHosts) != 0 {
213231
ingress.Spec.TLS = convertIngressTLSHostsToLegacy(ingressTLSHosts)
214232
}
215-
logger.Info("Creating extensions.v1beta1/Ingress for SparkApplication web UI", "name", app.Name, "namespace", app.Namespace, "ingressName", ingress.Name)
216233
if err := r.client.Create(context.TODO(), ingress); err != nil {
217-
return nil, fmt.Errorf("failed to create ingress %s/%s: %v", ingress.Namespace, ingress.Name, err)
234+
if !errors.IsAlreadyExists(err) {
235+
return nil, fmt.Errorf("failed to create ingress %s/%s: %v", ingress.Namespace, ingress.Name, err)
236+
}
237+
238+
if err := r.client.Update(context.TODO(), ingress); err != nil {
239+
return nil, fmt.Errorf("failed to update ingress %s/%s: %v", ingress.Namespace, ingress.Name, err)
240+
}
241+
logger.Info("Updated extensions.v1beta1/Ingress for SparkApplication", "name", app.Name, "namespace", app.Namespace, "ingressName", ingress.Name)
242+
} else {
243+
logger.Info("Created extensions.v1beta1/Ingress for SparkApplication", "name", app.Name, "namespace", app.Namespace, "ingressName", ingress.Name)
218244
}
245+
219246
return &SparkIngress{
220247
ingressName: ingress.Name,
221248
ingressURL: ingressURL,
@@ -288,6 +315,9 @@ func (r *Reconciler) createDriverIngressService(
288315
if err := r.client.Update(context.TODO(), service); err != nil {
289316
return nil, err
290317
}
318+
logger.Info("Updated service for SparkApplication", "name", app.Name, "namespace", app.Namespace, "serviceName", service.Name)
319+
} else {
320+
logger.Info("Created service for SparkApplication", "name", app.Name, "namespace", app.Namespace, "serviceName", service.Name)
291321
}
292322

293323
return &SparkService{

0 commit comments

Comments
 (0)