Skip to content


Unable to see logs in the web UI when the job is running #45516


Closed
1 of 2 tasks
vba opened this issue Jan 9, 2025 · 8 comments
Labels
area:core, area:logging, area:webserver, kind:bug, pending-response, provider:amazon

Comments

vba commented Jan 9, 2025

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.10.3

What happened?

Since our migration from Airflow 2.4.3 to 2.9.3 and then to 2.10.3, we have noticed that it has become impossible to access logs via the web UI or the REST API for a running task instance.

We run our Airflow instance within our in-house k8s infrastructure, using S3 as our remote logging backend.

When the task instance completes its run, the remote log is visible through the web UI. With v2.4.3 and the same parameters we never encountered similar issues. Here is our logging config section:

[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = s3_airflow_logs
delete_local_logs = False
google_key_path = 
remote_base_log_folder = s3://the-bucket
remote_task_handler_kwargs = 
encrypt_s3_logs = False
logging_level = INFO
celery_logging_level = 
fab_logging_level = WARNING
logging_config_class = 
colored_console_log = False
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
dag_processor_log_target = file
dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
secret_mask_adapter = 
task_log_prefix_template = 
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
dag_processor_manager_log_stdout = False
task_log_reader = task
extra_logger_names = 
worker_log_server_port = 8793
trigger_log_server_port = 8794
# interleave_timestamp_parser = 
file_task_handler_new_folder_permissions = 0o775
file_task_handler_new_file_permissions = 0o664
celery_stdout_stderr_separation = False
enable_task_context_logger = True
color_log_error_keywords = error,exception
color_log_warning_keywords = warn

When we try to access the logs for the running task, we see the following text with no content:
[screenshot]

Same result for already finalized task attempts:
[screenshot]

When we try to get the logs via the REST API (/api/v1/dags/MY-DAG1/dagRuns/manual__DATE/taskInstances/MY-TASK/logs/8?full_content=false), after a long wait we get a timeout exception and the following page:
[screenshot]
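For reference, this is roughly how the same REST call can be reproduced from Python with the requests library (host and credentials below are placeholders; only the URL path comes from our setup):

import requests

# Placeholder host/credentials; DAG, run and task IDs are taken from the URL above.
base = "https://airflow.example.com/api/v1"
url = f"{base}/dags/MY-DAG1/dagRuns/manual__DATE/taskInstances/MY-TASK/logs/8"

resp = requests.get(
    url,
    params={"full_content": "false"},
    auth=("username", "password"),  # basic auth shown only as an example
    timeout=120,
)
print(resp.status_code)   # in our case this eventually comes back as 500 while the task is running
print(resp.text[:500])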

What you think should happen instead?

If we check the webserver logs we notice the following exceptions:

Traceback (most recent call last):
  File "/my-path/python3.10/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/my-path/python3.10/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/my-path/python3.10/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/my-path/python3.10/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/my-path/python3.10/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
    response = function(request)
  File "/my-path/python3.10/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
    response = function(request)
  File "/my-path/python3.10/site-packages/connexion/decorators/validation.py", line 399, in wrapper
    return function(request)
  File "/my-path/python3.10/site-packages/connexion/decorators/response.py", line 113, in wrapper
    return _wrapper(request, response)
  File "/my-path/python3.10/site-packages/connexion/decorators/response.py", line 90, in _wrapper
    self.operation.api.get_connexion_response(response, self.mimetype)
  File "/my-path/python3.10/site-packages/connexion/apis/abstract.py", line 366, in get_connexion_response
    return cls._framework_to_connexion_response(response=response, mimetype=mimetype)
  File "/my-path/python3.10/site-packages/connexion/apis/flask_api.py", line 165, in _framework_to_connexion_response
    body=response.get_data() if not response.direct_passthrough else None,
  File "/my-path/python3.10/site-packages/werkzeug/wrappers/response.py", line 314, in get_data
    self._ensure_sequence()
  File "/my-path/python3.10/site-packages/werkzeug/wrappers/response.py", line 376, in _ensure_sequence
    self.make_sequence()
  File "/my-path/python3.10/site-packages/werkzeug/wrappers/response.py", line 391, in make_sequence
    self.response = list(self.iter_encoded())
  File "/my-path/python3.10/site-packages/werkzeug/wrappers/response.py", line 50, in _iter_encoded
    for item in iterable:
  File "/my-path/python3.10/site-packages/airflow/utils/log/log_reader.py", line 94, in read_log_stream
    logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
  File "/my-path/python3.10/site-packages/airflow/utils/log/log_reader.py", line 66, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File "/my-path/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 491, in read
    log, out_metadata = self._read(task_instance, try_number_element, metadata)
  File "/my-path/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 389, in _read
    response = self._executor_get_task_log(ti, try_number)
  File "/my-path/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/my-path/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 346, in _executor_get_task_log
    executor = ExecutorLoader.get_default_executor()
  File "/my-path/python3.10/site-packages/airflow/executors/executor_loader.py", line 165, in get_default_executor
    default_executor = cls.load_executor(cls.get_default_executor_name())
  File "/my-path/python3.10/site-packages/airflow/executors/executor_loader.py", line 246, in load_executor
    executor = executor_cls()
  File "/my-path/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 135, in __init__
    self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
  File "/my-path/python3.10/multiprocessing/managers.py", line 723, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/my-path/python3.10/multiprocessing/managers.py", line 606, in _create
    conn = self._Client(self._address, authkey=self._authkey)
  File "/my-path/python3.10/multiprocessing/connection.py", line 508, in Client
    answer_challenge(c, authkey)
  File "/my-path/python3.10/multiprocessing/connection.py", line 752, in answer_challenge
    message = connection.recv_bytes(256)         # reject large message
  File "/my-path/python3.10/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/my-path/python3.10/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/my-path/python3.10/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
BlockingIOError: [Errno 11] Resource temporarily unavailable
*.*.*.* - - [???? +0000] "GET /api/v1/dags/MY-DAG1/dagRuns/manual__DATE/taskInstances/MY-TASK/logs/8?full_content=false HTTP/1.1" 500 1589 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:133.0) Gecko/20100101 Firefox/133.0"

What we notice is that the s3_task_handler does its part of the job correctly: for a running task it fetches the S3 content, and if there is no content it clearly says No logs found on s3 for ti=<TaskInstance: .... The problem starts when we try to get the stdout of the running k8s pod; as shown above, it ends with BlockingIOError - Resource temporarily unavailable. Everything fails in file_task_handler, within the _read method:

log, out_metadata = self._read(task_instance, try_number_element, metadata)

It looks like this problem has been around for several minor releases.
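For context, here is a very simplified sketch of that code path, reconstructed from the traceback above (this is not the actual Airflow source): while the task instance is RUNNING, the file task handler asks the executor for the live task log, and loading the default executor instantiates the KubernetesExecutor (and its multiprocessing manager queue) inside the webserver process.

from functools import cached_property

class FileTaskHandlerSketch:
    # Illustrative only - reconstructed from the traceback, not the real implementation.

    @cached_property
    def _executor_get_task_log(self):
        from airflow.executors.executor_loader import ExecutorLoader
        # This is the call visible in the traceback: the webserver loads the
        # default executor (here the KubernetesExecutor) just to read live pod logs.
        executor = ExecutorLoader.get_default_executor()
        return executor.get_task_log

    def _read(self, ti, try_number, metadata=None):
        if ti.state == "running":
            # KubernetesExecutor.__init__ creates a multiprocessing manager Queue,
            # and that is where the BlockingIOError surfaces in the webserver.
            messages, logs = self._executor_get_task_log(ti, try_number)
        # remote (S3) logs and served worker logs are merged elsewhere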

How to reproduce

You need to deploy an instance of Airflow within a k8s cluster with remote logging activated; that should be enough. To work around another issue related to remote logging, we set the following env vars (not sure if it's relevant):

_AIRFLOW_PATCH_GEVENT=1
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES___AIRFLOW_PATCH_GEVENT=1

Operating System

Debian GNU/Linux trixie/sid

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==9.0.0
apache-airflow-providers-apache-cassandra==3.6.0
apache-airflow-providers-apache-druid==3.12.0
apache-airflow-providers-apache-hdfs==4.6.0
apache-airflow-providers-apache-hive==8.2.0
apache-airflow-providers-apache-kylin==3.7.0
apache-airflow-providers-apache-livy==3.9.2
apache-airflow-providers-apache-spark==5.0.0
apache-airflow-providers-celery==3.8.3
apache-airflow-providers-cncf-kubernetes==9.0.1
apache-airflow-providers-common-compat==1.2.1
apache-airflow-providers-common-io==1.4.2
apache-airflow-providers-common-sql==1.19.0
apache-airflow-providers-fab==1.5.0
apache-airflow-providers-ftp==3.11.1
apache-airflow-providers-google==10.25.0
apache-airflow-providers-grpc==3.6.0
apache-airflow-providers-hashicorp==3.8.0
apache-airflow-providers-http==4.13.2
apache-airflow-providers-imap==3.7.0
apache-airflow-providers-microsoft-azure==11.0.0
apache-airflow-providers-microsoft-mssql==3.9.1
apache-airflow-providers-microsoft-winrm==3.6.0
apache-airflow-providers-mongo==4.2.2
apache-airflow-providers-papermill==3.8.1
apache-airflow-providers-postgres==5.13.1
apache-airflow-providers-sftp==4.11.1
apache-airflow-providers-slack==8.9.1
apache-airflow-providers-smtp==1.8.0
apache-airflow-providers-sqlite==3.9.0
apache-airflow-providers-ssh==3.14.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

Kube version: v1.30.4
Helm: version.BuildInfo{Version:"v3.15.2", GitCommit:"1a500d5625419a524fdae4b33de351cc4f58ec35", GitTreeState:"clean", GoVersion:"go1.22.4"}

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

vba added the area:core, kind:bug and needs-triage labels on Jan 9, 2025
boring-cyborg bot commented Jan 9, 2025

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

dosubot bot added the area:logging, area:webserver and provider:amazon labels on Jan 9, 2025
potiuk (Member) commented Jan 10, 2025

Can you please explain what executor and what log volume configuration you have?

I believe it might have something to do with the volume you are using to store the logs. This looks very much like the volume does not allow files to be written and read concurrently. I think it would be great if you could check that and see what type of volume you have there.

Or @dstandish -> do you think that might be related to the logging change you implemented? Maybe that rings a bell too?

potiuk added the pending-response label and removed the needs-triage label on Jan 10, 2025
jason810496 (Member) commented Jan 10, 2025

Could this issue be related to #45529? The error log traceback also references executor_loader.
If the TaskInstance is still in the RUNNING state, the file task handler will read from _executor_get_task_log instead of reading from _read_from_logs_server.

vba (Author) commented Jan 13, 2025

Hi @potiuk,

Can you please explain what executor and what log volume configuration you have?

As you can see in the stack trace in the middle of my issue, the executor is kubernetes_executor. As for the volume used for logging, here's the configuration (as shown in the issue):

base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = s3_airflow_logs
delete_local_logs = False
google_key_path = 
remote_base_log_folder = s3://the-bucket

Here is how it is mounted by the k8s cluster:

# ...
spec:
  containers:
  - volumeMounts:
    - mountPath: /opt/airflow/logs
      name: logs
# ...
  volumes:
  - emptyDir:
      sizeLimit: 10Gi
    name: logs

I believe it might have something to do with the volume you are using to store the logs. This looks very much like the volume does not allow files to be written and read concurrently. I think it would be great if you could check that and see what type of volume you have there.

I don't think my problem is purely a configuration issue. If I downgrade my Airflow instance to version 2.4.3, everything works fine on the same k8s infrastructure. A simple test reveals no difficulty:

echo "my test" > /opt/airflow/logs/log.txt && cat /opt/airflow/logs/log.txt
...
my test

vba (Author) commented Jan 13, 2025

Hi @jason810496

Could this issue be related to #45529? The error log traceback also references executor_loader. If the TaskInstance is still in the RUNNING state, the file task handler will read from _executor_get_task_log instead of reading from _read_from_logs_server.

Sorry, I was not clear in my previous message: the problem only occurs while a job and its tasks of interest are running, i.e. while the state is RUNNING; once the job is finished, all logs for the last active task instance become visible again. I think this issue was introduced in version 2.6 or 2.7.

potiuk (Member) commented Jan 13, 2025

As you can see in the stack trace in the middle of my issue, the executor is kubernetes_executor

Thanks, that saves a bit of searching through a stack trace. In the future it might be better to specify it explicitly rather than leave a chance that someone will find it. It allows people who look at the issue and try to help to quickly assess whether they can help or whether the case "rings a bell" without spending time digging into such details. It simply optimizes the time of those who try to help you solve your problem.

As for the volume used for logging, here's the configuration (as shown in the issue):

I was thinking more about the properties of the volume you have - something you can check in the way your K8S handles volumes of the specific kind you use. The error indicates that somewhere while receiving logs you get a "resource unavailable" error. After looking at this, it seems that k8s reads logs from the remote pod somewhere and something does not let it read them.

And I think in this case it's something in your K8S configuration. There is a similar issue, kubernetes/kubernetes#105928, which indicates that somewhere logs are filling up space - for example the containerd version used has a problem.

And yes - I think the way logs are read has changed between versions of the k8s provider (you can take a look at the changelog), so maybe you have uncovered a configuration issue or another issue in your K8S. Maybe you can try to correlate your k8s logs with the events and see if you have other errors in other components of K8S that indicate the root cause.

Unfortunately k8s has hundreds of moving parts and sometimes you need to dig deeper to find the root cause. For example, very strange problems often occur when your DNS does not have enough resources to respond on time, and the only way to see what's going on is to look at what happens in your K8S in general and spot potential issues that are correlated with the event.

But I am mostly guessing here - I wanted to help and direct the discussion but I have no deep knowledge on this particular part.

vba (Author) commented Jan 13, 2025

Hi @potiuk,

Thanks for your reply.

It simply optimizes the time of those who try to help you solve your problem.

No problem; as the subject is very complex, I didn't know how to present the key elements.

it seems that k8s reads logs from the remote pod somewhere and something does not let it read them.

I've actually researched this problem quite a bit. First, if I roll back to Airflow 2.4.3, the problem disappears. Second, I've patched the Airflow code with icecream, trying to understand the problem step by step. In the log below, I call the API for a task instance that is running; you will notice that the remote log is fetched and that the problem starts afterwards:

10.*.*.* - - [09/Jan/2025:10:23:02 +0000] "GET /api/v1/dags/code-python-airflow-sample/dagRuns/manual__2024-12-27T20:29:34.637180+00:00/taskInstances/python_task/logs/8?full_content=false HTTP/1.1" 500 1589 "https://airflow1-sandbox-dv.numberly.dev/dags/code-python-airflow-sample/grid?dag_run_id=manual__2024-12-27T20%3A29%3A34.637180%2B00%3A00&tab=logs&task_id=python_task" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:133.0) Gecko/20100101 Firefox/133.0"
[2025-01-09 10:23:02 +0000] [70] [INFO] Parent changed, shutting down: <Worker 61>
[2025-01-09 10:23:02 +0000] [70] [INFO] Worker exiting (pid: 61)
10.*.*.* - - [09/Jan/2025:10:23:03 +0000] "GET /health HTTP/1.1" 200 283 "-" "kube-probe/1.30"
10.*.*.* - - [09/Jan/2025:10:23:03 +0000] "GET /health HTTP/1.1" 200 283 "-" "kube-probe/1.30"
LOG ISSUE DEBUG -> log_endpoint.py:60 in get_log()
                   "Starting #get_log": 'Starting #get_log'
LOG ISSUE DEBUG -> log_endpoint.py:62 in get_log()
                   key: '****************'
LOG ISSUE DEBUG -> log_endpoint.py:63 in get_log()- token: None
LOG ISSUE DEBUG -> log_endpoint.py:71 in get_log()- metadata: {}
LOG ISSUE DEBUG -> log_endpoint.py:80 in get_log()
                   metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_endpoint.py:82 in get_log()
                   task_log_reader: <airflow.utils.log.log_reader.TaskLogReader object at 0x7f539c1ed510>
LOG ISSUE DEBUG -> log_reader.py:119 in log_handler()
                   task_log_reader: 'task'
LOG ISSUE DEBUG -> log_reader.py:120 in log_handler()
                   logging.getLogger("airflow.task").handlers: [<S3TaskHandler (NOTSET)>]
LOG ISSUE DEBUG -> log_reader.py:121 in log_handler()
                   logging.getLogger().handlers: [<RedirectStdHandler <stdout> (NOTSET)>]
LOG ISSUE DEBUG -> log_endpoint.py:96 in get_log()
                   ti: <TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> log_endpoint.py:103 in get_log()
                   dag: <DAG: code-python-airflow-sample>
LOG ISSUE DEBUG -> log_endpoint.py:107 in get_log()
                   ti.task: <Task(PythonOperator): python_task>
LOG ISSUE DEBUG -> log_endpoint.py:112 in get_log()
                   return_type: 'text/plain'
LOG ISSUE DEBUG -> log_endpoint.py:128 in get_log()
                   logs: <generator object TaskLogReader.read_log_stream at 0x7f5396d165e0>
LOG ISSUE DEBUG -> log_endpoint.py:130 in get_log()- 'Ending'
LOG ISSUE DEBUG -> log_reader.py:78 in read_log_stream()
                   "Starting #read_log_stream": 'Starting #read_log_stream'
LOG ISSUE DEBUG -> log_reader.py:84 in read_log_stream()
                   try_numbers: [8]
LOG ISSUE DEBUG -> log_reader.py:85 in read_log_stream()
                   ti: <TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> log_reader.py:86 in read_log_stream()
                   metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_reader.py:92 in read_log_stream()
                   metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_reader.py:65 in read_log_chunks()
                   self.log_handler: <S3TaskHandler (NOTSET)>
LOG ISSUE DEBUG -> s3_task_handler.py:122 in _read_remote_logs()
                   ti: <TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> s3_task_handler.py:123 in _read_remote_logs()
                   metadata: {'download_logs': False}
LOG ISSUE DEBUG -> s3_task_handler.py:125 in _read_remote_logs()
                   worker_log_rel_path: 'dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log'
LOG ISSUE DEBUG -> s3_task_handler.py:130 in _read_remote_logs()
                   bucket: 'my-bucket-logs'
LOG ISSUE DEBUG -> s3_task_handler.py:131 in _read_remote_logs()
                   prefix: 'airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log'
LOG ISSUE DEBUG -> s3_task_handler.py:133 in _read_remote_logs()
                   keys: ['airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log']
LOG ISSUE DEBUG -> s3_task_handler.py:142 in _read_remote_logs()
                   messages: ['Found logs in s3:',
                              '  * '
                              's3://my-bucket-logs/airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log']
LOG ISSUE DEBUG -> s3_task_handler.py:143 in _read_remote_logs()
                   logs: ['[2025-01-08T19:29:02.537+0000] {local_task_job_runner.py:123} INFO - '
                          '::group::Pre task execution logs
                         '
                          '[2025-01-08T19:29:02.589+0000] {taskinstance.py:2613} INFO - Dependencies '
                          'all met for dep_context=non-requeueable deps ti=<TaskInstance: '
                          'code-python-airflow-sample.python_task '
                          'manual__2024-12-27T20:29:34.637180+00:00 [queued]>
                         '
                          '[2025-01-08T19:29:02.602+0000] {taskinstance.py:2613} INFO - Dependencies '
                          'all met for dep_context=requeueable deps ti=<TaskInstance: '
                          'code-python-airflow-sample.python_task '
                          'manual__2024-12-27T20:29:34.637180+00:00 [queued]>
                         '
                          '[2025-01-08T19:29:02.602+0000] {taskinstance.py:2866} INFO - Starting '
                          'attempt 8 of 9
                         '
                          '[2025-01-08T19:29:02.627+0000] {taskinstance.py:2889} INFO - Executing '
                          '<Task(PythonOperator): python_task> on 2024-12-27 20:29:34.637180+00:00
                         '
                          '[2025-01-08T19:29:02.633+0000] {standard_task_runner.py:72} INFO - Started '
                          'process 9 to run task
                         '
                          '[2025-01-08T19:29:02.641+0000] {standard_task_runner.py:104} INFO - Running: '
                          "['airflow', 'tasks', 'run', 'code-python-airflow-sample', 'python_task', "
                          "'manual__2024-12-27T20:29:34.637180+00:00', '--job-id', '133', '--raw', "
                          "'--subdir', 'DAGS_FOLDER/code_python_airflow_sample/airflow_dag.py', "
                          "'--cfg-path', '/tmp/tmp38klyd4c']
                         "
                          '[2025-01-08T19:29:02.646+0000] {standard_task_runner.py:105} INFO - Job 133: '
                          'Subtask python_task
                          '[2025-01-08T19:59:03.705+0000] {taskinstance.py:352} INFO - Marking task as '
                          'SUCCESS. dag_id=code-python-airflow-sample, task_id=python_task, '
                          'run_id=manual__2024-12-27T20:29:34.637180+00:00, '
                          'execution_date=20241227T202934, start_date=20250108T192902, '
                          'end_date=20250108T195903
                         '
                          '[2025-01-08T19:59:03.763+0000] {local_task_job_runner.py:266} INFO - Task '
                          'exited with return code 0
                         '
                          '[2025-01-08T19:59:03.857+0000] {taskinstance.py:3895} INFO - 0 downstream '
                          'tasks scheduled from follow-on schedule check
                         '
                          '[2025-01-08T19:59:03.860+0000] {local_task_job_runner.py:245} INFO - '
                          '::endgroup::
                         ']
[2025-01-09T10:23:03.662+0000] {app.py:1744} ERROR - Exception on /api/v1/dags/code-python-airflow-sample/dagRuns/manual__2024-12-27T20:29:34.637180+00:00/taskInstances/python_task/logs/8 [GET]
Traceback (most recent call last):
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
    response = function(request)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
    response = function(request)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/validation.py", line 399, in wrapper
    return function(request)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/response.py", line 113, in wrapper
    return _wrapper(request, response)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/response.py", line 90, in _wrapper
    self.operation.api.get_connexion_response(response, self.mimetype)
  File "/my-loc/lib/python3.10/site-packages/connexion/apis/abstract.py", line 366, in get_connexion_response
    return cls._framework_to_connexion_response(response=response, mimetype=mimetype)
  File "/my-loc/lib/python3.10/site-packages/connexion/apis/flask_api.py", line 165, in _framework_to_connexion_response
    body=response.get_data() if not response.direct_passthrough else None,
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 314, in get_data
    self._ensure_sequence()
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 376, in _ensure_sequence
    self.make_sequence()
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 391, in make_sequence
    self.response = list(self.iter_encoded())
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 50, in _iter_encoded
    for item in iterable:
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/log_reader.py", line 94, in read_log_stream
    logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/log_reader.py", line 66, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 491, in read
    log, out_metadata = self._read(task_instance, try_number_element, metadata)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 389, in _read
    response = self._executor_get_task_log(ti, try_number)
  File "/my-loc/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 346, in _executor_get_task_log
    executor = ExecutorLoader.get_default_executor()
  File "/my-loc/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 165, in get_default_executor
    default_executor = cls.load_executor(cls.get_default_executor_name())
  File "/my-loc/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 246, in load_executor
    executor = executor_cls()
  File "/my-loc/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 135, in __init__
    self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
  File "/my-loc/lib/python3.10/multiprocessing/managers.py", line 723, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/my-loc/lib/python3.10/multiprocessing/managers.py", line 606, in _create
    conn = self._Client(self._address, authkey=self._authkey)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 508, in Client
    answer_challenge(c, authkey)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 752, in answer_challenge
    message = connection.recv_bytes(256)         # reject large message
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
BlockingIOError: [Errno 11] Resource temporarily unavailable

As you can see, all the code line numbers are slightly offset (because of the icecream patches).
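For completeness, the "LOG ISSUE DEBUG" lines above come from icecream; the snippet below is only a rough sketch of the instrumentation, not the exact patch we applied:

from icecream import ic

# Hypothetical setup mirroring the output format seen above.
ic.configureOutput(prefix="LOG ISSUE DEBUG -> ", includeContext=True)

metadata = {"download_logs": False}
ic(metadata)
# prints something like:
# LOG ISSUE DEBUG -> <file>.py:<line> in <function>()- metadata: {'download_logs': False}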

potiuk (Member) commented Jan 13, 2025

Yeah, I think I know what it is. It looks like your processes have too low a ulimit set. The error usually happens when there are not enough sockets. This is an "OS"-level error and is often caused by ulimit constraints at the kernel level. Simply put, whatever underlying kernel configuration you have, it has opened too many sockets already. This might also happen when you have many, many processes/pods sharing the same kernel on the machine you run them on - each of them opens sockets for all kinds of communication, and at some point you simply run out of them.

So you have to look at the configuration of your Kubernetes and kernel to see how you can increase those limits.
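For example, a quick way to check those limits from inside the webserver pod (just a sketch; kubectl exec into the pod and run it with python3):

import resource

# Maximum number of open file descriptors (sockets count against this limit).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"RLIMIT_NOFILE: soft={soft} hard={hard}")

# Maximum number of processes/threads for the user (Linux only).
soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
print(f"RLIMIT_NPROC: soft={soft} hard={hard}")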

Converting it into a discussion in case more is needed.

apache locked and limited conversation to collaborators on Jan 13, 2025
potiuk converted this issue into discussion #45624 on Jan 13, 2025

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →
