23
23
import itertools
24
24
import logging
25
25
import math
26
- import operator
27
26
import os
28
27
import signal
29
28
import traceback
106
105
from airflow .models .xcom import LazyXComSelectSequence , XCom
107
106
from airflow .plugins_manager import integrate_macros_plugins
108
107
from airflow .sdk .api .datamodels ._generated import AssetProfile
109
- from airflow .sdk .definitions ._internal .templater import SandboxedEnvironment
110
108
from airflow .sdk .definitions .asset import Asset , AssetAlias , AssetNameRef , AssetUniqueKey , AssetUriRef
111
109
from airflow .sdk .definitions .taskgroup import MappedTaskGroup
112
110
from airflow .sentry import Sentry
123
121
OutletEventAccessors ,
124
122
VariableAccessor ,
125
123
context_get_outlet_events ,
126
- context_merge ,
127
124
)
128
- from airflow .utils .email import send_email
129
- from airflow .utils .helpers import prune_dict , render_template_to_string
125
+ from airflow .utils .helpers import prune_dict
130
126
from airflow .utils .log .logging_mixin import LoggingMixin
131
127
from airflow .utils .net import get_hostname
132
128
from airflow .utils .operator_helpers import ExecutionCallableRunner , context_to_airflow_vars
@@ -1117,15 +1113,6 @@ def _handle_failure(
1117
1113
)
1118
1114
1119
1115
_log_state (task_instance = task_instance , lead_msg = "Immediate failure requested. " if force_fail else "" )
1120
- if (
1121
- failure_context ["task" ]
1122
- and failure_context ["email_for_state" ](failure_context ["task" ])
1123
- and failure_context ["task" ].email
1124
- ):
1125
- try :
1126
- task_instance .email_alert (error , failure_context ["task" ])
1127
- except Exception :
1128
- log .exception ("Failed to send email to: %s" , failure_context ["task" ].email )
1129
1116
1130
1117
if failure_context ["callbacks" ] and failure_context ["context" ]:
1131
1118
_run_finished_callback (
@@ -1317,116 +1304,6 @@ def _get_previous_start_date(
1317
1304
return pendulum .instance (prev_ti .start_date ) if prev_ti and prev_ti .start_date else None
1318
1305
1319
1306
1320
- def _email_alert (* , task_instance : TaskInstance , exception , task : BaseOperator ) -> None :
1321
- """
1322
- Send alert email with exception information.
1323
-
1324
- :param task_instance: the task instance
1325
- :param exception: the exception
1326
- :param task: task related to the exception
1327
-
1328
- :meta private:
1329
- """
1330
- subject , html_content , html_content_err = task_instance .get_email_subject_content (exception , task = task )
1331
- if TYPE_CHECKING :
1332
- assert task .email
1333
- try :
1334
- send_email (task .email , subject , html_content )
1335
- except Exception :
1336
- send_email (task .email , subject , html_content_err )
1337
-
1338
-
1339
- def _get_email_subject_content (
1340
- * ,
1341
- task_instance : TaskInstance ,
1342
- exception : BaseException ,
1343
- task : BaseOperator | None = None ,
1344
- ) -> tuple [str , str , str ]:
1345
- """
1346
- Get the email subject content for exceptions.
1347
-
1348
- :param task_instance: the task instance
1349
- :param exception: the exception sent in the email
1350
- :param task:
1351
-
1352
- :meta private:
1353
- """
1354
- # For a ti from DB (without ti.task), return the default value
1355
- if task is None :
1356
- task = getattr (task_instance , "task" )
1357
- use_default = task is None
1358
- exception_html = str (exception ).replace ("\n " , "<br>" )
1359
-
1360
- default_subject = "Airflow alert: {{ti}}"
1361
- # For reporting purposes, we report based on 1-indexed,
1362
- # not 0-indexed lists (i.e. Try 1 instead of
1363
- # Try 0 for the first attempt).
1364
- default_html_content = (
1365
- "Try {{try_number}} out of {{max_tries + 1}}<br>"
1366
- "Exception:<br>{{exception_html}}<br>"
1367
- 'Log: <a href="{{ti.log_url}}">Link</a><br>'
1368
- "Host: {{ti.hostname}}<br>"
1369
- 'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
1370
- )
1371
-
1372
- default_html_content_err = (
1373
- "Try {{try_number}} out of {{max_tries + 1}}<br>"
1374
- "Exception:<br>Failed attempt to attach error logs<br>"
1375
- 'Log: <a href="{{ti.log_url}}">Link</a><br>'
1376
- "Host: {{ti.hostname}}<br>"
1377
- 'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
1378
- )
1379
-
1380
- additional_context : dict [str , Any ] = {
1381
- "exception" : exception ,
1382
- "exception_html" : exception_html ,
1383
- "try_number" : task_instance .try_number ,
1384
- "max_tries" : task_instance .max_tries ,
1385
- }
1386
-
1387
- if use_default :
1388
- default_context = {"ti" : task_instance , ** additional_context }
1389
- jinja_env = jinja2 .Environment (
1390
- loader = jinja2 .FileSystemLoader (os .path .dirname (__file__ )), autoescape = True
1391
- )
1392
- subject = jinja_env .from_string (default_subject ).render (** default_context )
1393
- html_content = jinja_env .from_string (default_html_content ).render (** default_context )
1394
- html_content_err = jinja_env .from_string (default_html_content_err ).render (** default_context )
1395
-
1396
- else :
1397
- if TYPE_CHECKING :
1398
- assert task_instance .task
1399
-
1400
- # Use the DAG's get_template_env() to set force_sandboxed. Don't add
1401
- # the flag to the function on task object -- that function can be
1402
- # overridden, and adding a flag breaks backward compatibility.
1403
- dag = task_instance .task .get_dag ()
1404
- if dag :
1405
- jinja_env = dag .get_template_env (force_sandboxed = True )
1406
- else :
1407
- jinja_env = SandboxedEnvironment (cache_size = 0 )
1408
- jinja_context = task_instance .get_template_context ()
1409
- context_merge (jinja_context , additional_context )
1410
-
1411
- def render (key : str , content : str ) -> str :
1412
- if conf .has_option ("email" , key ):
1413
- path = conf .get_mandatory_value ("email" , key )
1414
- try :
1415
- with open (path ) as f :
1416
- content = f .read ()
1417
- except FileNotFoundError :
1418
- log .warning ("Could not find email template file '%s'. Using defaults..." , path )
1419
- except OSError :
1420
- log .exception ("Error while using email template %s. Using defaults..." , path )
1421
- return render_template_to_string (jinja_env .from_string (content ), jinja_context )
1422
-
1423
- subject = render ("subject_template" , default_subject )
1424
- html_content = render ("html_content_template" , default_html_content )
1425
- html_content_err = render ("html_content_template" , default_html_content_err )
1426
-
1427
- return subject , html_content , html_content_err
1428
-
1429
-
1430
1307
def _run_finished_callback (
1431
1308
* ,
1432
1309
callbacks : None | TaskStateChangeCallback | list [TaskStateChangeCallback ],
@@ -3131,8 +3008,7 @@ def fetch_handle_failure_context(
3131
3008
if context is not None :
3132
3009
context ["exception" ] = error
3133
3010
3134
- # Set state correctly and figure out how to log it and decide whether
3135
- # to email
3011
+ # Set state correctly and figure out how to log it
3136
3012
3137
3013
# Note, callback invocation needs to be handled by caller of
3138
3014
# _run_raw_task to avoid race conditions which could lead to duplicate
@@ -3150,11 +3026,10 @@ def fetch_handle_failure_context(
3150
3026
assert isinstance (ti .task , BaseOperator )
3151
3027
task = ti .task .unmap ((context , session ))
3152
3028
except Exception :
3153
- cls .logger ().error ("Unable to unmap task to determine if we need to send an alert email " )
3029
+ cls .logger ().error ("Unable to unmap task to determine what callback to use " )
3154
3030
3155
3031
if force_fail or not ti .is_eligible_to_retry ():
3156
3032
ti .state = TaskInstanceState .FAILED
3157
- email_for_state = operator .attrgetter ("email_on_failure" )
3158
3033
callbacks = task .on_failure_callback if task else None
3159
3034
3160
3035
if task and fail_fast :
@@ -3169,7 +3044,6 @@ def fetch_handle_failure_context(
3169
3044
TaskInstanceHistory .record_ti (ti , session = session )
3170
3045
3171
3046
ti .state = State .UP_FOR_RETRY
3172
- email_for_state = operator .attrgetter ("email_on_retry" )
3173
3047
callbacks = task .on_retry_callback if task else None
3174
3048
3175
3049
get_listener_manager ().hook .on_task_instance_failed (
@@ -3178,7 +3052,6 @@ def fetch_handle_failure_context(
3178
3052
3179
3053
return {
3180
3054
"ti" : ti ,
3181
- "email_for_state" : email_for_state ,
3182
3055
"task" : task ,
3183
3056
"callbacks" : callbacks ,
3184
3057
"context" : context ,
@@ -3326,26 +3199,6 @@ def render_templates(
3326
3199
3327
3200
return original_task
3328
3201
3329
- def get_email_subject_content (
3330
- self , exception : BaseException , task : BaseOperator | None = None
3331
- ) -> tuple [str , str , str ]:
3332
- """
3333
- Get the email subject content for exceptions.
3334
-
3335
- :param exception: the exception sent in the email
3336
- :param task:
3337
- """
3338
- return _get_email_subject_content (task_instance = self , exception = exception , task = task )
3339
-
3340
- def email_alert (self , exception , task : BaseOperator ) -> None :
3341
- """
3342
- Send alert email with exception information.
3343
-
3344
- :param exception: the exception
3345
- :param task: task related to the exception
3346
- """
3347
- _email_alert (task_instance = self , exception = exception , task = task )
3348
-
3349
3202
def set_duration (self ) -> None :
3350
3203
"""Set task instance duration."""
3351
3204
_set_duration (task_instance = self )
0 commit comments