Commit 24395c2

adwk67, razvan, and maltesander authored
feat: Airflow 3.0.1 (experimental) (#630)
* wip: working tests (except auth: opa, oidc)
* wip: changing python test files
* single health.py for all airflow versions
* single metrics.py for all airflow versions
* update tests with new commons scripts
* tests: use "airflow-latest" instead of "airflow"
* cleanup and code comments
* restore deleted env
* use correct webserver service
* restore operator to release list
* test: fix oidc
* wip: get logging tests to work post-merge
* make env-vars version-specific
* fixed resolution of webserver url for execution api
* relaxed default resources
* update test defs for oidc/opa
* changelog
* Update tests/templates/kuttl/logging/51-assert.yaml.j2
  Co-authored-by: Malte Sander <[email protected]>
* code review changes
* code review changes
* move containerdebug cmd to a function
* replaced random key with hard-coded one with comment
* cleanup and better comment
* for 3.x: only scheduler updates FAB permissions and restrict workers
* make api-server env-vars role dependent

---------

Co-authored-by: Razvan-Daniel Mihai <[email protected]>
Co-authored-by: Malte Sander <[email protected]>
1 parent d24fb5d commit 24395c2

51 files changed: +723 -300 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -10,6 +10,7 @@
 - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
 - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
 - Add support for airflow `2.10.5` ([#625]).
+- Add experimental support for airflow `3.0.1` ([#630]).

 ### Changed

@@ -41,6 +42,7 @@
 [#623]: https://github.com/stackabletech/airflow-operator/pull/623
 [#624]: https://github.com/stackabletech/airflow-operator/pull/624
 [#625]: https://github.com/stackabletech/airflow-operator/pull/625
+[#630]: https://github.com/stackabletech/airflow-operator/pull/630

 ## [25.3.0] - 2025-03-21

docs/modules/airflow/pages/usage-guide/storage-resources.adoc

Lines changed: 8 additions & 8 deletions
@@ -5,9 +5,9 @@ include::home:concepts:stackable_resource_requests.adoc[]

 A minimal HA setup consisting of 2 schedulers, 2 workers and 2 webservers has the following https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/[resource requirements]:

-* `5700m` CPU request
+* `8700m` CPU request
 * `17400m` CPU limit
-* `10752Mi` memory request and limit
+* `15872Mi` memory request and limit

 Corresponding to the values above, the operator uses the following resource defaults:

@@ -18,24 +18,24 @@ spec:
     config:
       resources:
         cpu:
-          min: 500m
+          min: "1"
           max: "2"
         memory:
-          limit: 512Mi
+          limit: 1Gi
   celeryExecutors:
     config:
       resources:
         cpu:
-          min: 500m
+          min: "1"
           max: "2"
         memory:
-          limit: 2Gi
+          limit: 3Gi
   webservers:
     config:
       resources:
         cpu:
-          min: 500m
+          min: "1"
           max: "2"
         memory:
-          limit: 2Gi
+          limit: 3Gi
 ----
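
The change in the documented totals can be cross-checked against the per-role defaults listed in this file. The following is a minimal sketch, not operator code: `RoleDefaults`, `ha_totals`, `old_defaults` and `new_defaults` are hypothetical names introduced for illustration, and the values are copied from the documentation diff (CPU requests in millicores, memory limits in MiB). It sums the per-role defaults over two replicas of each of the three roles, before and after the change.

```rust
/// Hypothetical helper type: documented defaults for a single role.
#[derive(Clone, Copy)]
struct RoleDefaults {
    cpu_request_millis: u64,
    memory_limit_mib: u64,
}

/// Sum CPU requests and memory limits over all roles, with `replicas` pods per role.
fn ha_totals(roles: &[RoleDefaults], replicas: u64) -> (u64, u64) {
    let cpu: u64 = roles.iter().map(|r| r.cpu_request_millis * replicas).sum();
    let memory: u64 = roles.iter().map(|r| r.memory_limit_mib * replicas).sum();
    (cpu, memory)
}

/// Documented defaults before the change: scheduler, celery executor, webserver.
fn old_defaults() -> [RoleDefaults; 3] {
    [
        RoleDefaults { cpu_request_millis: 500, memory_limit_mib: 512 },
        RoleDefaults { cpu_request_millis: 500, memory_limit_mib: 2048 },
        RoleDefaults { cpu_request_millis: 500, memory_limit_mib: 2048 },
    ]
}

/// Documented defaults after the change, in the same role order.
fn new_defaults() -> [RoleDefaults; 3] {
    [
        RoleDefaults { cpu_request_millis: 1000, memory_limit_mib: 1024 },
        RoleDefaults { cpu_request_millis: 1000, memory_limit_mib: 3072 },
        RoleDefaults { cpu_request_millis: 1000, memory_limit_mib: 3072 },
    ]
}
```

With these inputs, the per-role defaults account for 3000m more CPU and 5120Mi more memory after the change, which matches the difference between the old and new documented totals (`8700m - 5700m` and `15872Mi - 10752Mi`). The remainder of each total is not broken down in this diff.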

docs/modules/airflow/partials/supported-versions.adoc

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@
 // This is a separate file, since it is used by both the direct Airflow-Operator documentation, and the overarching
 // Stackable Platform documentation.

+- 3.0.1 (experimental)
 - 2.10.5
 - 2.10.4 (deprecated)
 - 2.9.3 (LTS)

rust/operator-binary/src/airflow_controller.rs

Lines changed: 8 additions & 2 deletions
@@ -957,7 +957,8 @@ fn build_server_rolegroup_statefulset(
         .context(GracefulShutdownSnafu)?;

     let mut airflow_container_args = Vec::new();
-    airflow_container_args.extend(airflow_role.get_commands(authentication_config));
+    airflow_container_args
+        .extend(airflow_role.get_commands(authentication_config, resolved_product_image));

     airflow_container
         .image_from_product_image(resolved_product_image)
@@ -980,6 +981,7 @@ fn build_server_rolegroup_statefulset(
                 authentication_config,
                 authorization_config,
                 git_sync_resources,
+                resolved_product_image,
             )
             .context(BuildStatefulsetEnvVarsSnafu)?,
         );
@@ -1170,7 +1172,10 @@ fn build_server_rolegroup_statefulset(
             match_labels: Some(statefulset_match_labels.into()),
             ..LabelSelector::default()
         },
-        service_name: Some(rolegroup_ref.object_name()),
+        service_name: Some(format!(
+            "{name}-metrics",
+            name = rolegroup_ref.object_name()
+        )),
         template: pod_template,
         volume_claim_templates: pvcs,
         ..StatefulSetSpec::default()
@@ -1263,6 +1268,7 @@ fn build_executor_template_config_map(
             env_overrides,
             merged_executor_config,
             git_sync_resources,
+            resolved_product_image,
         ))
         .add_volume_mounts(airflow.volume_mounts())
         .context(AddVolumeMountSnafu)?
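
The hunk at line 1170 changes the StatefulSet's `service_name` from the rolegroup object name to that name with a `-metrics` suffix. A minimal sketch of the naming rule follows. The function `metrics_service_name` is a hypothetical name, and its parameter stands in for the value of `rolegroup_ref.object_name()`; only the `format!` pattern is taken from the diff.

```rust
/// Hypothetical helper mirroring the `format!` call in the diff:
/// the governing service name is the rolegroup object name plus "-metrics".
fn metrics_service_name(rolegroup_object_name: &str) -> String {
    format!("{name}-metrics", name = rolegroup_object_name)
}
```

For an illustrative rolegroup object name such as `airflow-webserver-default` (an assumed value, not taken from the commit), this yields `airflow-webserver-default-metrics`.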

rust/operator-binary/src/crd/mod.rs

Lines changed: 81 additions & 45 deletions
@@ -9,7 +9,7 @@ use stackable_operator::{
         cache::UserInformationCache,
         cluster_operation::ClusterOperation,
         opa::OpaConfig,
-        product_image_selection::ProductImage,
+        product_image_selection::{ProductImage, ResolvedProductImage},
         resources::{
             CpuLimitsFragment, MemoryLimitsFragment, NoRuntimeLimits, NoRuntimeLimitsFragment,
             Resources, ResourcesFragment,
@@ -324,11 +324,6 @@ impl v1alpha1::AirflowCluster {
         self.spec.cluster_config.volume_mounts.clone()
     }

-    /// The name of the role-level load-balanced Kubernetes `Service`
-    pub fn node_role_service_name(&self) -> Option<String> {
-        self.metadata.name.clone()
-    }
-
     /// Retrieve and merge resource configs for role and role groups
     pub fn merged_config(
         &self,
@@ -551,6 +546,7 @@ impl AirflowRole {
     pub fn get_commands(
         &self,
         auth_config: &AirflowClientAuthenticationDetailsResolved,
+        resolved_product_image: &ResolvedProductImage,
     ) -> Vec<String> {
         let mut command = vec![
             format!(
@@ -561,43 +557,79 @@ impl AirflowRole {
             remove_vector_shutdown_file_command(STACKABLE_LOG_DIR),
         ];

-        match &self {
-            AirflowRole::Webserver => {
-                // Getting auth commands for AuthClass
-                command.extend(Self::authentication_start_commands(auth_config));
-                command.extend(vec![
+        if resolved_product_image.product_version.starts_with("3.") {
+            // Start-up commands have changed in 3.x.
+            // See https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/upgrading_to_airflow3.html#step-6-changes-to-your-startup-scripts and
+            // https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/setting-up-the-database.html#setting-up-the-database.
+            // `airflow db migrate` is not run for each role so there may be
+            // re-starts of webserver and/or workers (which require the DB).
+            // DB-migrations should be eventually be optional:
+            // See https://github.com/stackabletech/airflow-operator/issues/589.
+            match &self {
+                AirflowRole::Webserver => {
+                    command.extend(Self::authentication_start_commands(auth_config));
+                    command.extend(vec![
+                        "prepare_signal_handlers".to_string(),
+                        container_debug_command(),
+                        "airflow api-server &".to_string(),
+                    ]);
+                }
+                AirflowRole::Scheduler => command.extend(vec![
+                    "airflow db migrate".to_string(),
+                    "airflow users create \
+                        --username \"$ADMIN_USERNAME\" \
+                        --firstname \"$ADMIN_FIRSTNAME\" \
+                        --lastname \"$ADMIN_LASTNAME\" \
+                        --email \"$ADMIN_EMAIL\" \
+                        --password \"$ADMIN_PASSWORD\" \
+                        --role \"Admin\""
+                        .to_string(),
+                    "prepare_signal_handlers".to_string(),
+                    container_debug_command(),
+                    "airflow dag-processor &".to_string(),
+                    "airflow scheduler &".to_string(),
+                ]),
+                AirflowRole::Worker => command.extend(vec![
                     "prepare_signal_handlers".to_string(),
-                    format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"),
-                    "airflow webserver &".to_string(),
-                ]);
+                    container_debug_command(),
+                    "airflow celery worker &".to_string(),
+                ]),
+            }
+        } else {
+            match &self {
+                AirflowRole::Webserver => {
+                    // Getting auth commands for AuthClass
+                    command.extend(Self::authentication_start_commands(auth_config));
+                    command.extend(vec![
+                        "prepare_signal_handlers".to_string(),
+                        container_debug_command(),
+                        "airflow webserver &".to_string(),
+                    ]);
+                }
+                AirflowRole::Scheduler => command.extend(vec![
+                    // Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259
+                    "airflow db init".to_string(),
+                    "airflow db upgrade".to_string(),
+                    "airflow users create \
+                        --username \"$ADMIN_USERNAME\" \
+                        --firstname \"$ADMIN_FIRSTNAME\" \
+                        --lastname \"$ADMIN_LASTNAME\" \
+                        --email \"$ADMIN_EMAIL\" \
+                        --password \"$ADMIN_PASSWORD\" \
+                        --role \"Admin\""
+                        .to_string(),
+                    "prepare_signal_handlers".to_string(),
+                    container_debug_command(),
+                    "airflow scheduler &".to_string(),
+                ]),
+                AirflowRole::Worker => command.extend(vec![
+                    "prepare_signal_handlers".to_string(),
+                    container_debug_command(),
+                    "airflow celery worker &".to_string(),
+                ]),
             }
-
-            AirflowRole::Scheduler => command.extend(vec![
-                // Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259
-                "airflow db init".to_string(),
-                "airflow db upgrade".to_string(),
-                "airflow users create \
-                    --username \"$ADMIN_USERNAME\" \
-                    --firstname \"$ADMIN_FIRSTNAME\" \
-                    --lastname \"$ADMIN_LASTNAME\" \
-                    --email \"$ADMIN_EMAIL\" \
-                    --password \"$ADMIN_PASSWORD\" \
-                    --role \"Admin\""
-                    .to_string(),
-                "prepare_signal_handlers".to_string(),
-                format!(
-                    "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
-                ),
-                "airflow scheduler &".to_string(),
-            ]),
-            AirflowRole::Worker => command.extend(vec![
-                "prepare_signal_handlers".to_string(),
-                format!(
-                    "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
-                ),
-                "airflow celery worker &".to_string(),
-            ]),
         }
+
         // graceful shutdown part
         command.extend(vec![
             "wait_for_termination $!".to_string(),
@@ -658,6 +690,10 @@ impl AirflowRole {
     }
 }

+fn container_debug_command() -> String {
+    format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &")
+}
+
 #[derive(Clone, Debug, Deserialize, Display, JsonSchema, PartialEq, Serialize)]
 pub enum AirflowExecutor {
     /// The celery executor.
@@ -855,17 +891,17 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment<AirflowStorageConf
     let (cpu, memory) = match role {
         AirflowRole::Worker => (
             CpuLimitsFragment {
-                min: Some(Quantity("500m".into())),
+                min: Some(Quantity("1".into())),
                 max: Some(Quantity("2".into())),
             },
             MemoryLimitsFragment {
-                limit: Some(Quantity("2Gi".into())),
+                limit: Some(Quantity("3Gi".into())),
                 runtime_limits: NoRuntimeLimitsFragment {},
             },
         ),
         AirflowRole::Webserver => (
             CpuLimitsFragment {
-                min: Some(Quantity("500m".into())),
+                min: Some(Quantity("1".into())),
                 max: Some(Quantity("2".into())),
             },
             MemoryLimitsFragment {
@@ -875,11 +911,11 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment<AirflowStorageConf
         ),
         AirflowRole::Scheduler => (
             CpuLimitsFragment {
-                min: Some(Quantity("500m".to_owned())),
+                min: Some(Quantity("1".to_owned())),
                 max: Some(Quantity("2".to_owned())),
             },
             MemoryLimitsFragment {
-                limit: Some(Quantity("512Mi".to_owned())),
+                limit: Some(Quantity("1Gi".to_owned())),
                 runtime_limits: NoRuntimeLimitsFragment {},
             },
         ),
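
The core of the `get_commands` change is a version gate: a product version starting with `3.` selects the new start-up commands, anything else keeps the 2.x ones. The following is a simplified, standalone sketch of that branching, assuming a stripped-down `Role` enum and a hypothetical function `airflow_commands`. It lists only the `airflow ...` invocations from the diff and leaves out the authentication commands, `airflow users create`, `prepare_signal_handlers` and the `containerdebug` call.

```rust
/// Simplified stand-in for `AirflowRole` (illustration only).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Role {
    Webserver,
    Scheduler,
    Worker,
}

/// Same prefix check as in the diff: versions starting with "3." use the new commands.
fn is_airflow_3(product_version: &str) -> bool {
    product_version.starts_with("3.")
}

/// Hypothetical helper returning only the `airflow ...` commands per role and version.
fn airflow_commands(role: Role, product_version: &str) -> Vec<&'static str> {
    match (is_airflow_3(product_version), role) {
        // 3.x: the webserver role starts the API server.
        (true, Role::Webserver) => vec!["airflow api-server &"],
        // 3.x: the scheduler migrates the DB and also starts the DAG processor.
        (true, Role::Scheduler) => vec![
            "airflow db migrate",
            "airflow dag-processor &",
            "airflow scheduler &",
        ],
        // 2.x: unchanged start-up commands.
        (false, Role::Webserver) => vec!["airflow webserver &"],
        (false, Role::Scheduler) => vec![
            "airflow db init",
            "airflow db upgrade",
            "airflow scheduler &",
        ],
        // The worker command is the same in both branches.
        (_, Role::Worker) => vec!["airflow celery worker &"],
    }
}
```

Calling `airflow_commands(Role::Webserver, "3.0.1")` returns the API server command, while the same call with `"2.10.5"` returns the classic webserver command. Because the gate is a plain string prefix check, a future `4.x` version would fall into the 2.x branch in this sketch, as it would in the diff.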
