@@ -9,7 +9,7 @@ use stackable_operator::{
99 cache:: UserInformationCache ,
1010 cluster_operation:: ClusterOperation ,
1111 opa:: OpaConfig ,
12- product_image_selection:: ProductImage ,
12+ product_image_selection:: { ProductImage , ResolvedProductImage } ,
1313 resources:: {
1414 CpuLimitsFragment , MemoryLimitsFragment , NoRuntimeLimits , NoRuntimeLimitsFragment ,
1515 Resources , ResourcesFragment ,
@@ -323,11 +323,6 @@ impl v1alpha1::AirflowCluster {
323323 self . spec . cluster_config . volume_mounts . clone ( )
324324 }
325325
326- /// The name of the role-level load-balanced Kubernetes `Service`
327- pub fn node_role_service_name ( & self ) -> Option < String > {
328- self . metadata . name . clone ( )
329- }
330-
331326 /// Retrieve and merge resource configs for role and role groups
332327 pub fn merged_config (
333328 & self ,
@@ -550,6 +545,7 @@ impl AirflowRole {
550545 pub fn get_commands (
551546 & self ,
552547 auth_config : & AirflowClientAuthenticationDetailsResolved ,
548+ resolved_product_image : & ResolvedProductImage ,
553549 ) -> Vec < String > {
554550 let mut command = vec ! [
555551 format!(
@@ -560,43 +556,79 @@ impl AirflowRole {
560556 remove_vector_shutdown_file_command( STACKABLE_LOG_DIR ) ,
561557 ] ;
562558
563- match & self {
564- AirflowRole :: Webserver => {
565- // Getting auth commands for AuthClass
566- command. extend ( Self :: authentication_start_commands ( auth_config) ) ;
567- command. extend ( vec ! [
559+ if resolved_product_image. product_version . starts_with ( "3." ) {
560+ // Start-up commands have changed in 3.x.
561+ // See https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/upgrading_to_airflow3.html#step-6-changes-to-your-startup-scripts and
562+ // https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/setting-up-the-database.html#setting-up-the-database.
563+ // `airflow db migrate` is not run for each role so there may be
564+ // re-starts of webserver and/or workers (which require the DB).
565+ // DB-migrations should be eventually be optional:
566+ // See https://github.com/stackabletech/airflow-operator/issues/589.
567+ match & self {
568+ AirflowRole :: Webserver => {
569+ command. extend ( Self :: authentication_start_commands ( auth_config) ) ;
570+ command. extend ( vec ! [
571+ "prepare_signal_handlers" . to_string( ) ,
572+ container_debug_command( ) ,
573+ "airflow api-server &" . to_string( ) ,
574+ ] ) ;
575+ }
576+ AirflowRole :: Scheduler => command. extend ( vec ! [
577+ "airflow db migrate" . to_string( ) ,
578+ "airflow users create \
579+ --username \" $ADMIN_USERNAME\" \
580+ --firstname \" $ADMIN_FIRSTNAME\" \
581+ --lastname \" $ADMIN_LASTNAME\" \
582+ --email \" $ADMIN_EMAIL\" \
583+ --password \" $ADMIN_PASSWORD\" \
584+ --role \" Admin\" "
585+ . to_string( ) ,
586+ "prepare_signal_handlers" . to_string( ) ,
587+ container_debug_command( ) ,
588+ "airflow dag-processor &" . to_string( ) ,
589+ "airflow scheduler &" . to_string( ) ,
590+ ] ) ,
591+ AirflowRole :: Worker => command. extend ( vec ! [
568592 "prepare_signal_handlers" . to_string( ) ,
569- format!( "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" ) ,
570- "airflow webserver &" . to_string( ) ,
571- ] ) ;
593+ container_debug_command( ) ,
594+ "airflow celery worker &" . to_string( ) ,
595+ ] ) ,
596+ }
597+ } else {
598+ match & self {
599+ AirflowRole :: Webserver => {
600+ // Getting auth commands for AuthClass
601+ command. extend ( Self :: authentication_start_commands ( auth_config) ) ;
602+ command. extend ( vec ! [
603+ "prepare_signal_handlers" . to_string( ) ,
604+ container_debug_command( ) ,
605+ "airflow webserver &" . to_string( ) ,
606+ ] ) ;
607+ }
608+ AirflowRole :: Scheduler => command. extend ( vec ! [
609+ // Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259
610+ "airflow db init" . to_string( ) ,
611+ "airflow db upgrade" . to_string( ) ,
612+ "airflow users create \
613+ --username \" $ADMIN_USERNAME\" \
614+ --firstname \" $ADMIN_FIRSTNAME\" \
615+ --lastname \" $ADMIN_LASTNAME\" \
616+ --email \" $ADMIN_EMAIL\" \
617+ --password \" $ADMIN_PASSWORD\" \
618+ --role \" Admin\" "
619+ . to_string( ) ,
620+ "prepare_signal_handlers" . to_string( ) ,
621+ container_debug_command( ) ,
622+ "airflow scheduler &" . to_string( ) ,
623+ ] ) ,
624+ AirflowRole :: Worker => command. extend ( vec ! [
625+ "prepare_signal_handlers" . to_string( ) ,
626+ container_debug_command( ) ,
627+ "airflow celery worker &" . to_string( ) ,
628+ ] ) ,
572629 }
573-
574- AirflowRole :: Scheduler => command. extend ( vec ! [
575- // Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259
576- "airflow db init" . to_string( ) ,
577- "airflow db upgrade" . to_string( ) ,
578- "airflow users create \
579- --username \" $ADMIN_USERNAME\" \
580- --firstname \" $ADMIN_FIRSTNAME\" \
581- --lastname \" $ADMIN_LASTNAME\" \
582- --email \" $ADMIN_EMAIL\" \
583- --password \" $ADMIN_PASSWORD\" \
584- --role \" Admin\" "
585- . to_string( ) ,
586- "prepare_signal_handlers" . to_string( ) ,
587- format!(
588- "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
589- ) ,
590- "airflow scheduler &" . to_string( ) ,
591- ] ) ,
592- AirflowRole :: Worker => command. extend ( vec ! [
593- "prepare_signal_handlers" . to_string( ) ,
594- format!(
595- "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
596- ) ,
597- "airflow celery worker &" . to_string( ) ,
598- ] ) ,
599630 }
631+
600632 // graceful shutdown part
601633 command. extend ( vec ! [
602634 "wait_for_termination $!" . to_string( ) ,
@@ -657,6 +689,10 @@ impl AirflowRole {
657689 }
658690}
659691
692+ fn container_debug_command ( ) -> String {
693+ format ! ( "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" )
694+ }
695+
660696#[ derive( Clone , Debug , Deserialize , Display , JsonSchema , PartialEq , Serialize ) ]
661697pub enum AirflowExecutor {
662698 /// The celery executor.
@@ -854,17 +890,17 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment<AirflowStorageConf
854890 let ( cpu, memory) = match role {
855891 AirflowRole :: Worker => (
856892 CpuLimitsFragment {
857- min : Some ( Quantity ( "500m " . into ( ) ) ) ,
893+ min : Some ( Quantity ( "1 " . into ( ) ) ) ,
858894 max : Some ( Quantity ( "2" . into ( ) ) ) ,
859895 } ,
860896 MemoryLimitsFragment {
861- limit : Some ( Quantity ( "2Gi " . into ( ) ) ) ,
897+ limit : Some ( Quantity ( "3Gi " . into ( ) ) ) ,
862898 runtime_limits : NoRuntimeLimitsFragment { } ,
863899 } ,
864900 ) ,
865901 AirflowRole :: Webserver => (
866902 CpuLimitsFragment {
867- min : Some ( Quantity ( "500m " . into ( ) ) ) ,
903+ min : Some ( Quantity ( "1 " . into ( ) ) ) ,
868904 max : Some ( Quantity ( "2" . into ( ) ) ) ,
869905 } ,
870906 MemoryLimitsFragment {
@@ -874,11 +910,11 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment<AirflowStorageConf
874910 ) ,
875911 AirflowRole :: Scheduler => (
876912 CpuLimitsFragment {
877- min : Some ( Quantity ( "500m " . to_owned ( ) ) ) ,
913+ min : Some ( Quantity ( "1 " . to_owned ( ) ) ) ,
878914 max : Some ( Quantity ( "2" . to_owned ( ) ) ) ,
879915 } ,
880916 MemoryLimitsFragment {
881- limit : Some ( Quantity ( "512Mi " . to_owned ( ) ) ) ,
917+ limit : Some ( Quantity ( "1Gi " . to_owned ( ) ) ) ,
882918 runtime_limits : NoRuntimeLimitsFragment { } ,
883919 } ,
884920 ) ,
0 commit comments