Solved: Airflow scheduler gets stuck without a trace
✔️Accepted Answer
We just saw this on 2.0.1 when we added a largish number of new DAGs (We're adding around 6000 DAGs total, but this seems to lock up when about 200 try to be scheduled at once).
Here are py-spy stack traces from our scheduler:
Process 6: /usr/local/bin/python /usr/local/bin/airflow scheduler
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
_send (multiprocessing/connection.py:368)
_send_bytes (multiprocessing/connection.py:411)
send (multiprocessing/connection.py:206)
send_callback_to_execute (airflow/utils/dag_processing.py:283)
_send_dag_callbacks_to_processor (airflow/jobs/scheduler_job.py:1795)
_schedule_dag_run (airflow/jobs/scheduler_job.py:1762)
_do_scheduling (airflow/jobs/scheduler_job.py:1521)
_run_scheduler_loop (airflow/jobs/scheduler_job.py:1382)
_execute (airflow/jobs/scheduler_job.py:1280)
run (airflow/jobs/base_job.py:237)
scheduler (airflow/cli/commands/scheduler_command.py:63)
wrapper (airflow/utils/cli.py:89)
command (airflow/cli/cli_parser.py:48)
main (airflow/__main__.py:40)
<module> (airflow:8)
Process 77: airflow scheduler -- DagFileProcessorManager
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
_send (multiprocessing/connection.py:368)
_send_bytes (multiprocessing/connection.py:405)
send (multiprocessing/connection.py:206)
_run_parsing_loop (airflow/utils/dag_processing.py:698)
start (airflow/utils/dag_processing.py:596)
_run_processor_manager (airflow/utils/dag_processing.py:365)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:315)
_launch (multiprocessing/popen_fork.py:75)
__init__ (multiprocessing/popen_fork.py:19)
_Popen (multiprocessing/context.py:277)
start (multiprocessing/process.py:121)
start (airflow/utils/dag_processing.py:248)
_execute (airflow/jobs/scheduler_job.py:1276)
run (airflow/jobs/base_job.py:237)
scheduler (airflow/cli/commands/scheduler_command.py:63)
wrapper (airflow/utils/cli.py:89)
command (airflow/cli/cli_parser.py:48)
main (airflow/__main__.py:40)
<module> (airflow:8)
What I think is happening is that the pipe between the DagFileProcessorAgent and the DagFileProcessorManager is full, and that is causing the Scheduler to deadlock.
From what I can see, the DagFileProcessorAgent only pulls data off the pipe in its heartbeat and wait_until_finished functions (airflow/utils/dag_processing.py, line 374 in beb8af5), and the SchedulerJob is responsible for calling its heartbeat function each scheduler loop (airflow/jobs/scheduler_job.py, line 1388 in beb8af5).
However, the SchedulerJob never gets to call heartbeat, because it is blocked forever trying to send data into the same full pipe via _send_dag_callbacks_to_processor inside _do_scheduling, so the two processes deadlock.
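To make that hypothesis concrete, here is a minimal, self-contained sketch (not Airflow code; the names are made up) of the failure mode: two processes writing to the same duplex multiprocessing Pipe while neither side ever drains it, which is the shape of the two _send frames in the traces above.

import multiprocessing as mp

def keep_sending(conn):
    payload = b"x" * 65536
    while True:
        # Once the pipe's OS buffer is full this blocks forever, because
        # the peer process is also stuck in send() and never calls recv().
        conn.send(payload)

if __name__ == "__main__":
    agent_end, manager_end = mp.Pipe()  # duplex, like the scheduler's signal_conn
    a = mp.Process(target=keep_sending, args=(agent_end,), daemon=True)
    b = mp.Process(target=keep_sending, args=(manager_end,), daemon=True)
    a.start()
    b.start()
    a.join(timeout=5)
    b.join(timeout=5)
    # Both senders are still alive after 5 seconds: each is blocked in
    # send() waiting for the other to read, i.e. they are deadlocked.
    print("agent-side sender alive:", a.is_alive())
    print("manager-side sender alive:", b.is_alive())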
Other Answers:
+1 on this issue.
Airflow 2.0.1, CeleryExecutor, ~7,000 DAGs. It seems to happen under load (when a bunch of DAGs all kick off at midnight).
py-spy dump --pid 132 --locals
Process 132: /usr/local/bin/python /usr/local/bin/airflow scheduler
Python v3.8.3 (/usr/local/bin/python)
Thread 132 (idle): "MainThread"
_send (multiprocessing/connection.py:368)
Arguments::
self: <Connection at 0x7f5db7aac550>
buf: <bytes at 0x5564f22e5260>
write: <builtin_function_or_method at 0x7f5dbed8a540>
Locals::
remaining: 1213
_send_bytes (multiprocessing/connection.py:411)
Arguments::
self: <Connection at 0x7f5db7aac550>
buf: <memoryview at 0x7f5db66f4a00>
Locals::
n: 1209
header: <bytes at 0x7f5dbc01fb10>
send (multiprocessing/connection.py:206)
Arguments::
self: <Connection at 0x7f5db7aac550>
obj: <TaskCallbackRequest at 0x7f5db7398940>
send_callback_to_execute (airflow/utils/dag_processing.py:283)
Arguments::
self: <DagFileProcessorAgent at 0x7f5db7aac880>
request: <TaskCallbackRequest at 0x7f5db7398940>
_process_executor_events (airflow/jobs/scheduler_job.py:1242)
Arguments::
self: <SchedulerJob at 0x7f5dbed3dd00>
session: <Session at 0x7f5db80cf6a0>
Locals::
ti_primary_key_to_try_number_map: {("redacted", "redacted", <datetime.datetime at 0x7f5db768b540>): 1, ...}
event_buffer: {...}
tis_with_right_state: [("redacted", "redacted", <datetime.datetime at 0x7f5db768b540>, 1), ...]
ti_key: ("redacted", "redacted", ...)
value: ("failed", None)
state: "failed"
_: None
filter_for_tis: <BooleanClauseList at 0x7f5db7427df0>
tis: [<TaskInstance at 0x7f5dbbfd77c0>, <TaskInstance at 0x7f5dbbfd7880>, <TaskInstance at 0x7f5dbbfdd820>, ...]
ti: <TaskInstance at 0x7f5dbbffba90>
try_number: 1
buffer_key: ("redacted", ...)
info: None
msg: "Executor reports task instance %s finished (%s) although the task says its %s. (Info: %s) Was the task killed externally?"
request: <TaskCallbackRequest at 0x7f5db7398940>
wrapper (airflow/utils/session.py:62)
Locals::
args: (<SchedulerJob at 0x7f5dbed3dd00>)
kwargs: {"session": <Session at 0x7f5db80cf6a0>}
_run_scheduler_loop (airflow/jobs/scheduler_job.py:1386)
Arguments::
self: <SchedulerJob at 0x7f5dbed3dd00>
Locals::
is_unit_test: False
call_regular_interval: <function at 0x7f5db7ac3040>
loop_count: 1
timer: <Timer at 0x7f5db76808b0>
session: <Session at 0x7f5db80cf6a0>
num_queued_tis: 17
_execute (airflow/jobs/scheduler_job.py:1280)
Arguments::
self: <SchedulerJob at 0x7f5dbed3dd00>
Locals::
pickle_dags: False
async_mode: True
processor_timeout_seconds: 600
processor_timeout: <datetime.timedelta at 0x7f5db7ab9300>
execute_start_time: <datetime.datetime at 0x7f5db7727510>
run (airflow/jobs/base_job.py:237)
Arguments::
self: <SchedulerJob at 0x7f5dbed3dd00>
Locals::
session: <Session at 0x7f5db80cf6a0>
scheduler (airflow/cli/commands/scheduler_command.py:63)
Arguments::
args: <Namespace at 0x7f5db816f6a0>
Locals::
job: <SchedulerJob at 0x7f5dbed3dd00>
wrapper (airflow/utils/cli.py:89)
Locals::
args: (<Namespace at 0x7f5db816f6a0>)
kwargs: {}
metrics: {"sub_command": "scheduler", "start_datetime": <datetime.datetime at 0x7f5db80f5db0>, ...}
command (airflow/cli/cli_parser.py:48)
Locals::
args: (<Namespace at 0x7f5db816f6a0>)
kwargs: {}
func: <function at 0x7f5db8090790>
main (airflow/__main__.py:40)
Locals::
parser: <DefaultHelpParser at 0x7f5dbec13700>
args: <Namespace at 0x7f5db816f6a0>
<module> (airflow:8)
py-spy dump --pid 134 --locals
Process 134: airflow scheduler -- DagFileProcessorManager
Python v3.8.3 (/usr/local/bin/python)
Thread 134 (idle): "MainThread"
_send (multiprocessing/connection.py:368)
Arguments::
self: <Connection at 0x7f5db77274f0>
buf: <bytes at 0x5564f1a76590>
write: <builtin_function_or_method at 0x7f5dbed8a540>
Locals::
remaining: 2276
_send_bytes (multiprocessing/connection.py:411)
Arguments::
self: <Connection at 0x7f5db77274f0>
buf: <memoryview at 0x7f5db77d7c40>
Locals::
n: 2272
header: <bytes at 0x7f5db6eb1f60>
send (multiprocessing/connection.py:206)
Arguments::
self: <Connection at 0x7f5db77274f0>
obj: (...)
_run_parsing_loop (airflow/utils/dag_processing.py:698)
Locals::
poll_time: 0.9996239839999816
loop_start_time: 690.422146969
ready: [<Connection at 0x7f5db77274f0>]
agent_signal: <TaskCallbackRequest at 0x7f5db678c8e0>
sentinel: <Connection at 0x7f5db77274f0>
processor: <DagFileProcessorProcess at 0x7f5db6eb1910>
all_files_processed: False
max_runs_reached: False
dag_parsing_stat: (...)
loop_duration: 0.0003760160000183532
start (airflow/utils/dag_processing.py:596)
Arguments::
self: <DagFileProcessorManager at 0x7f5dbcb9c880>
_run_processor_manager (airflow/utils/dag_processing.py:365)
Arguments::
dag_directory: "/code/src/dags"
max_runs: -1
processor_factory: <function at 0x7f5db7b30ee0>
processor_timeout: <datetime.timedelta at 0x7f5db7ab9300>
signal_conn: <Connection at 0x7f5db77274f0>
dag_ids: []
pickle_dags: False
async_mode: True
Locals::
processor_manager: <DagFileProcessorManager at 0x7f5dbcb9c880>
run (multiprocessing/process.py:108)
Arguments::
self: <ForkProcess at 0x7f5db7727220>
_bootstrap (multiprocessing/process.py:315)
Arguments::
self: <ForkProcess at 0x7f5db7727220>
parent_sentinel: 8
Locals::
util: <module at 0x7f5db8011e00>
context: <module at 0x7f5dbcb8ba90>
_launch (multiprocessing/popen_fork.py:75)
Arguments::
self: <Popen at 0x7f5db7727820>
process_obj: <ForkProcess at 0x7f5db7727220>
Locals::
code: 1
parent_r: 6
child_w: 7
child_r: 8
parent_w: 9
__init__ (multiprocessing/popen_fork.py:19)
Arguments::
self: <Popen at 0x7f5db7727820>
process_obj: <ForkProcess at 0x7f5db7727220>
_Popen (multiprocessing/context.py:276)
Arguments::
process_obj: <ForkProcess at 0x7f5db7727220>
Locals::
Popen: <type at 0x5564f1a439e0>
start (multiprocessing/process.py:121)
Arguments::
self: <ForkProcess at 0x7f5db7727220>
start (airflow/utils/dag_processing.py:248)
Arguments::
self: <DagFileProcessorAgent at 0x7f5db7aac880>
Locals::
mp_start_method: "fork"
context: <ForkContext at 0x7f5dbcb9ce80>
child_signal_conn: <Connection at 0x7f5db77274f0>
process: <ForkProcess at 0x7f5db7727220>
_execute (airflow/jobs/scheduler_job.py:1276)
Arguments::
self: <SchedulerJob at 0x7f5dbed3dd00>
Locals::
pickle_dags: False
async_mode: True
processor_timeout_seconds: 600
processor_timeout: <datetime.timedelta at 0x7f5db7ab9300>
run (airflow/jobs/base_job.py:237)
Arguments::
self: <SchedulerJob at 0x7f5dbed3dd00>
Locals::
session: <Session at 0x7f5db80cf6a0>
scheduler (airflow/cli/commands/scheduler_command.py:63)
Arguments::
args: <Namespace at 0x7f5db816f6a0>
Locals::
job: <SchedulerJob at 0x7f5dbed3dd00>
wrapper (airflow/utils/cli.py:89)
Locals::
args: (<Namespace at 0x7f5db816f6a0>)
kwargs: {}
metrics: {"sub_command": "scheduler", "start_datetime": <datetime.datetime at 0x7f5db80f5db0>, ...}
command (airflow/cli/cli_parser.py:48)
Locals::
args: (<Namespace at 0x7f5db816f6a0>)
kwargs: {}
func: <function at 0x7f5db8090790>
main (airflow/__main__.py:40)
Locals::
parser: <DefaultHelpParser at 0x7f5dbec13700>
args: <Namespace at 0x7f5db816f6a0>
<module> (airflow:8)
We had the same issue with Airflow on Google Cloud until we increased the setting AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW.
The default value was 5; after changing it to 60, our Airflow server started to perform very well, including on complex DAGs with around 1,000 tasks each.
Any scale-up had been running into the database's concurrent-connection limit, so the scheduler could not keep up.
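For reference, on Airflow 2.0.x that setting lives under [core], so it can be raised either in airflow.cfg or through the environment variable above; the value 60 is simply the number that worked here, not a general recommendation.

# airflow.cfg
[core]
sql_alchemy_max_overflow = 60

# or, equivalently, as an environment variable
AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW=60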
I've got a fix for the case reported by @MatthewRBruce (for 2.0.1) coming in 2.0.2
Hi @ashb I would like to report that we've been seeing something similar to this issue in Airflow 2.0.2 recently.
We are using Airflow 2.0.2 with a single airflow-scheduler and a few airflow-workers, using the CeleryExecutor and a Postgres backend, running dozens of DAGs, each with hundreds to a few thousand tasks. The Python version is 3.8.7.
Here's what we saw:
airflow-scheduler sometimes stops heartbeating and stops scheduling any tasks. This seems to happen at random times, about once or twice a week. When this happens, the last line in the scheduler log shows the following, i.e. it stopped writing out any log after receiving signal 15. I did strace the airflow scheduler process; it did not capture any other process sending it signal 15, so most likely the signal 15 was sent by the scheduler to itself.
May 11 21:19:56 airflow[12643]: [2021-05-11 21:19:56,908] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', ...]
May 11 21:19:56 airflow[12643]: [2021-05-11 21:19:56,973] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
When the scheduler was in this state, there was also a child airflow scheduler process shown in ps, spawned by the main airflow scheduler process. I forgot to take a py-spy dump, but I did use py-spy top to look at the child airflow scheduler process. This was what I saw: it seems to be stuck somewhere in celery_executor.py::_send_tasks_to_celery. This sounds similar to what @milton0825 reported previously, although he mentioned he was using Airflow 1.10.8.
When I manually SIGTERMed the child airflow scheduler process, it died, and immediately the main airflow scheduler started to heartbeat and schedule tasks again like nothing ever happened. So I suspect the airflow scheduler got stuck somewhere while spawning a child process, but I still don't understand how it produced an "Exiting gracefully upon receiving signal 15" in the log.
Total Samples 7859
GIL: 0.00%, Active: 0.00%, Threads: 1
%Own %Total OwnTime TotalTime Function (filename:line)
0.00% 0.00% 0.540s 0.540s __enter__ (multiprocessing/synchronize.py:95)
0.00% 0.00% 0.000s 0.540s worker (multiprocessing/pool.py:114)
0.00% 0.00% 0.000s 0.540s _bootstrap (multiprocessing/process.py:315)
0.00% 0.00% 0.000s 0.540s _repopulate_pool (multiprocessing/pool.py:303)
0.00% 0.00% 0.000s 0.540s main (airflow/__main__.py:40)
0.00% 0.00% 0.000s 0.540s start (multiprocessing/process.py:121)
0.00% 0.00% 0.000s 0.540s _send_tasks_to_celery (airflow/executors/celery_executor.py:330)
0.00% 0.00% 0.000s 0.540s Pool (multiprocessing/context.py:119)
0.00% 0.00% 0.000s 0.540s run (airflow/jobs/base_job.py:237)
0.00% 0.00% 0.000s 0.540s _repopulate_pool_static (multiprocessing/pool.py:326)
0.00% 0.00% 0.000s 0.540s heartbeat (airflow/executors/base_executor.py:158)
0.00% 0.00% 0.000s 0.540s _launch (multiprocessing/popen_fork.py:75)
0.00% 0.00% 0.000s 0.540s wrapper (airflow/utils/cli.py:89)
0.00% 0.00% 0.000s 0.540s __init__ (multiprocessing/pool.py:212)
0.00% 0.00% 0.000s 0.540s _Popen (multiprocessing/context.py:277)
One other observation was that when the airflow scheduler was in the stuck state, the DagFileProcessor processes started by the airflow scheduler were still running. I could see them writing out logs to dag_processor_manager.log.
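For what it's worth, a hang with that py-spy profile (a pool worker parked in __enter__ in multiprocessing/synchronize.py) can occur when a process forks while a multiprocessing lock happens to be held: the child inherits the already-locked lock and waits on it forever. Below is a minimal, self-contained sketch of that hazard (not Airflow code, just an illustration of the pattern; whether this is exactly what the scheduler hit is an assumption).

import multiprocessing as mp

def child_work(lock):
    # The lock's state was copied from the parent at fork time; if the
    # parent held it at that moment, this acquire never succeeds.
    with lock:
        print("child acquired the lock")

if __name__ == "__main__":
    ctx = mp.get_context("fork")   # the scheduler's pool workers are forked too
    lock = ctx.Lock()
    lock.acquire()                 # parent holds the lock...
    p = ctx.Process(target=child_work, args=(lock,))
    p.start()                      # ...then forks: the child sees it locked
    p.join(timeout=5)
    print("child still alive after 5s (stuck):", p.is_alive())
    p.terminate()                  # mirrors the manual SIGTERM workaround above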
Apache Airflow version:
Kubernetes version (if you are using kubernetes) (use kubectl version):
Environment (uname -a):
What happened:
The scheduler gets stuck without a trace or error. When this happens, the CPU usage of the scheduler service is at 100%. No jobs get submitted and everything comes to a halt. It looks like it goes into some kind of infinite loop.
The only way I could make it run again is by manually restarting the scheduler service. But again, after running some tasks it gets stuck. I've tried with both the Celery and Local executors, but the same issue occurs. I am using the -n 3 parameter while starting the scheduler.
Scheduler configs:
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
executor = LocalExecutor
parallelism = 32
Please help. I would be happy to provide any other information needed.
What you expected to happen:
How to reproduce it:
Anything else we need to know:
Moved here from https://issues.apache.org/jira/browse/AIRFLOW-401