The Catkin Execution Engine
One of the core modules in catkin_tools is the job executor. The executor performs the jobs required to complete a task in a way that maximizes (or achieves a specific) resource utilization subject to job dependency constraints. The executor is closely integrated with logging and job output capture. This page details the design and implementation of the executor.
Execution Model
The execution model is fairly simple. The executor executes a single task for a given command (i.e. build, clean, etc.). A task is a set of jobs which are related by an acyclic dependency graph. Each job is given a unique identifier and is composed of a set of dependencies and a sequence of executable stages, which are arbitrary functions or subprocess calls that require one or more workers to execute. The allocation of workers is managed by the job server. Throughout execution, synchronization with the user-facing interface and output formatting are mediated by a simple event queue.
The executor is single-threaded and uses an asynchronous loop to execute jobs as futures. If a job contains blocking stages it can utilize a normal thread pool for execution, but is still only guaranteed one worker by the main loop of the executor. See the following section for more information on workers and the job server.
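As an illustrative sketch (the names here are hypothetical, not catkin_tools' actual API), a blocking stage can be handed to a thread pool while the executor's event loop itself stays single-threaded:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_stage():
    # Stands in for a stage that blocks, e.g. waiting on a file lock
    time.sleep(0.1)
    return "done"

async def run_stage(pool):
    loop = asyncio.get_running_loop()
    # The coroutine yields to the event loop while the blocking work
    # runs in a worker thread; the stage still occupies one worker.
    return await loop.run_in_executor(pool, blocking_stage)

pool = ThreadPoolExecutor(max_workers=4)
result = asyncio.run(run_stage(pool))
print(result)  # done
```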
The input to the executor is a list of topologically sorted jobs with no circular dependencies, along with some parameters which control the job server behavior. These parameters are explained in detail in the following section.
Each job is in one of the following lifecycle states at any time:
PENDING
    Not ready to be executed (dependencies not yet completed)
QUEUED
    Ready to be executed once workers are available
ACTIVE
    Being executed by one or more workers
FINISHED
    Has been executed and either succeeded or failed (terminal)
ABANDONED
    Was not built because a prerequisite was not met (terminal)
All jobs begin in the PENDING state. Any jobs with unsatisfiable dependencies are immediately set to ABANDONED, and any jobs without dependencies are immediately set to QUEUED. After this state initialization, the executor processes jobs in a main loop until all of them are in one of the two terminal states (FINISHED or ABANDONED).
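The state initialization described above can be sketched as follows (the job records and state strings are illustrative, not catkin_tools' internal representation):

```python
# Hypothetical job records: name -> set of dependency names
jobs = {
    "a": set(),        # no dependencies  -> QUEUED
    "b": {"a"},        # waits on "a"     -> PENDING
    "c": {"missing"},  # unsatisfiable    -> ABANDONED
}

states = {}
for name, deps in jobs.items():
    if deps - set(jobs):   # depends on a job that does not exist
        states[name] = "ABANDONED"
    elif not deps:         # ready to run immediately
        states[name] = "QUEUED"
    else:
        states[name] = "PENDING"

print(states)  # {'a': 'QUEUED', 'b': 'PENDING', 'c': 'ABANDONED'}
```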
Each main loop iteration does the following:
- While job server tokens are available, create futures for QUEUED jobs and mark them ACTIVE
- Report the status of all jobs to the event queue
- Retrieve ACTIVE job futures which have completed and mark them FINISHED
- Check for any PENDING jobs which need to be ABANDONED due to failed jobs
- Change all PENDING jobs whose dependencies are satisfied to QUEUED
Once every job is in one of the terminal states, the executor pushes a final status event and returns.
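The steps above can be modeled with a small synchronous sketch. This is not the real executor (which is asynchronous and handles failures); here every job succeeds and each "iteration" runs its active jobs to completion so the loop terminates:

```python
def run_jobs(jobs, tokens):
    """Toy model of the executor's main loop.

    jobs: dict mapping job name -> set of dependency names
    tokens: number of job server workers available per iteration
    """
    states = {n: ("QUEUED" if not d else "PENDING") for n, d in jobs.items()}
    events = []
    terminal = {"FINISHED", "ABANDONED"}
    while not all(s in terminal for s in states.values()):
        # 1. While tokens are available, activate queued jobs
        active = []
        for name, state in states.items():
            if state == "QUEUED" and len(active) < tokens:
                states[name] = "ACTIVE"
                active.append(name)
        # 2. Report the status of all jobs to the event queue
        events.append(("JOB_STATUS", dict(states)))
        # 3. Completed futures become FINISHED (all succeed in this sketch)
        for name in active:
            states[name] = "FINISHED"
        # 4. (No failures here, so nothing needs to be ABANDONED.)
        # 5. Promote PENDING jobs whose dependencies are satisfied
        for name, deps in jobs.items():
            if states[name] == "PENDING" and all(
                states[d] == "FINISHED" for d in deps
            ):
                states[name] = "QUEUED"
    # Final status event once everything is terminal
    events.append(("JOB_STATUS", dict(states)))
    return states, events

states, events = run_jobs({"a": set(), "b": {"a"}}, tokens=1)
print(states)  # {'a': 'FINISHED', 'b': 'FINISHED'}
```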
Job Server Resource Model
As mentioned in the previous section, each task includes a set of jobs which are activated by the job server. In order to start a queued job, at least one worker needs to be available. Once a job is started, it is assigned a single worker from the job server. These are considered top-level jobs since they are managed directly by the catkin executor. The number of top-level jobs can be configured for a given task.
In addition to top-level parallelism, some job stages are themselves capable of running in parallel. In such cases, the job server can interface directly with the underlying stage’s low-level job allocation. This enables multi-level parallelism without allocating more than a fixed number of jobs.
One such parallel-capable stage is the GNU Make build stage. In this case, the job server implements a GNU Make job server interface, which involves reading and writing tokens from file handles passed as build flags to the Make command.
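The token half of that protocol can be sketched as below. (Details hedged: the flag is `--jobserver-auth` in GNU Make 4.2+ and `--jobserver-fds` in older versions; the exact MAKEFLAGS string here is illustrative.) The parent pre-loads a pipe with tokens, and each client reads a byte before starting an extra parallel job and writes it back when the job completes:

```python
import os

max_jobs = 4
read_fd, write_fd = os.pipe()

# Pre-load max_jobs - 1 tokens; the "missing" token is the one job a
# client may always run without asking the job server.
os.write(write_fd, b"+" * (max_jobs - 1))

# These descriptors would be advertised to sub-makes via MAKEFLAGS,
# e.g. "-j --jobserver-auth={},{}".format(read_fd, write_fd)

# A client acquires a token before starting an additional job...
token = os.read(read_fd, 1)
# ...and must return it when that job finishes.
os.write(write_fd, token)

print(len(token))  # 1
```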
For top-level jobs, resources beyond the number of workers are also monitored. Both system load and memory utilization checks can be enabled to prevent overloading a system.
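A minimal sketch of such an admission check, assuming a POSIX system (`os.getloadavg` is not available on Windows; `can_start_job` and its parameters are hypothetical names, and a real implementation would also check available memory, e.g. via psutil):

```python
import os

def can_start_job(max_load, running_jobs, max_jobs):
    """Toy admission check: require a free worker and acceptable load."""
    if running_jobs >= max_jobs:
        return False  # no worker available
    one_minute_load = os.getloadavg()[0]  # POSIX only
    return one_minute_load < max_load

print(can_start_job(max_load=1e9, running_jobs=0, max_jobs=4))  # True
print(can_start_job(max_load=1e9, running_jobs=4, max_jobs=4))  # False
```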
Executor Job Failure Behavior
The executor’s behavior when a job fails can be modified with the following two parameters:
continue_on_failure
    Continue executing jobs even if one job fails. If this is set to false (the default), the executor abandons all pending and queued jobs and stops after the first failure. Note that active jobs will still be allowed to complete before the executor returns.
continue_without_deps
    Continue executing jobs even if one or more of their dependencies have failed. If this is set to false (the default), the executor abandons only the jobs which depend on the failed job. If it is set to true, dependent jobs are built regardless.
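The effect of the two parameters can be sketched as a function that computes which jobs to abandon after a failure (the parameter names come from the docs above; the function itself is an illustrative model, not catkin_tools code):

```python
def jobs_to_abandon(jobs, failed, continue_on_failure, continue_without_deps):
    """Toy model: which jobs get ABANDONED when `failed` fails.

    jobs: dict mapping job name -> set of dependency names
    """
    if not continue_on_failure:
        # Stop the whole task: everything except the failed job is abandoned.
        return set(jobs) - {failed}
    if continue_without_deps:
        return set()  # build dependent jobs regardless
    # Abandon only the (transitive) dependents of the failed job.
    abandoned = set()
    changed = True
    while changed:
        changed = False
        for name, deps in jobs.items():
            if name not in abandoned and deps & (abandoned | {failed}):
                abandoned.add(name)
                changed = True
    return abandoned

jobs = {"a": set(), "b": {"a"}, "c": {"b"}, "d": set()}
print(sorted(jobs_to_abandon(jobs, "a", True, False)))   # ['b', 'c']
print(sorted(jobs_to_abandon(jobs, "a", False, False)))  # ['b', 'c', 'd']
```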
Jobs and Job Stages
As mentioned above, a job is a set of dependencies and a sequence of job stages. Jobs and stages are constructed before a given task starts executing, and hold only specifications of what needs to be done to complete them. All stages are given a label for user introspection and a logger interface, and can either require or not require allocation of a worker from the job server.
Stage execution is performed asynchronously by Python’s asyncio module. This means that exceptions thrown in job stages are handled directly by the executor. It also means job stages can be interrupted easily through Python’s normal signal handling mechanism.
Stages can either be command stages (subprocess commands) or function stages (Python functions). In either case, loggers used by stages support segmentation of stdout and stderr from job stages for both real-time introspection and logging.
Command Stages
In addition to the basic arguments mentioned above, command stages are parameterized by the standard subprocess command arguments, including the following:
- The command itself and its arguments,
- The working directory for the command,
- Any additional environment variables,
- Whether to use a shell interpreter,
- Whether to emulate a TTY,
- Whether to partition stdout and stderr.
When executed, command stages use asyncio’s asynchronous process executor with a custom I/O protocol.
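A minimal sketch of that pattern using the standard asyncio subprocess API (`run_command_stage` is a hypothetical name, and the real executor uses a custom protocol rather than `communicate()`):

```python
import asyncio
import sys

async def run_command_stage(*cmd):
    """Run a command and capture stdout/stderr as separate streams."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    return proc.returncode, out, err

code, out, err = asyncio.run(
    run_command_stage(sys.executable, "-c", "print('hello')")
)
print(code, out.decode().strip())  # 0 hello
```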
Function Stages
In addition to the basic arguments mentioned above, function stages are parameterized by a function handle and a set of function-specific Python arguments and keyword arguments. When executed, they use the thread pool mentioned above.
Since function stages aren’t subprocesses, their I/O isn’t piped or redirected. Instead, a custom I/O logger is passed to the function for output. Functions used as function stages should use this logger to write to stdout and stderr instead of using normal system calls.
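A sketch of that convention, with a stand-in logger (both `StageLogger` and `copy_files_stage` are illustrative names, not catkin_tools' actual classes):

```python
import io

class StageLogger:
    """Illustrative stand-in for the executor's I/O logger."""
    def __init__(self):
        self.stdout = io.StringIO()
        self.stderr = io.StringIO()

    def out(self, msg):
        self.stdout.write(msg + "\n")

    def err(self, msg):
        self.stderr.write(msg + "\n")

def copy_files_stage(logger, sources):
    # A function stage writes through the logger, never via print()
    # or sys.stderr, so its output can be captured and segmented.
    for src in sources:
        logger.out("copied {}".format(src))
    return 0  # stage return code

logger = StageLogger()
ret = copy_files_stage(logger, ["a.txt", "b.txt"])
print(ret, logger.stdout.getvalue().splitlines())
# 0 ['copied a.txt', 'copied b.txt']
```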
Introspection via Executor Events
Introspection into the different asynchronously-executed components of a task is performed by a simple event queue. Events are created by the executor, loggers, and stages, and they are consumed by an output controller. Events are defined by an event identifier and a data payload, which is an arbitrary dictionary.
There are numerous events which correspond to changes in job states, but events are also used for transporting captured I/O from job stages.
The modeled events include the following:
JOB_STATUS
    A report of running job states
QUEUED_JOB
    A job has been queued to be executed
STARTED_JOB
    A job has started to be executed
FINISHED_JOB
    A job has finished executing (succeeded or failed)
ABANDONED_JOB
    A job has been abandoned for some reason
STARTED_STAGE
    A job stage has started to be executed
FINISHED_STAGE
    A job stage has finished executing (succeeded or failed)
STAGE_PROGRESS
    A job stage has executed partially
STDOUT
    A status message from a job
STDERR
    A warning or error message from a job
SUBPROCESS
    A subprocess has been created
MESSAGE
    An arbitrary string message
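The producer/consumer shape of this event queue can be sketched as follows (the `ExecutionEvent` class and the payload keys are illustrative; only the event identifiers come from the list above):

```python
from queue import Queue

class ExecutionEvent:
    """An event is an identifier plus an arbitrary dict payload."""
    def __init__(self, event_id, **data):
        self.event_id = event_id
        self.data = data

event_queue = Queue()

# Producers (executor, loggers, stages) push events...
event_queue.put(ExecutionEvent("STARTED_JOB", job_id="my_pkg"))
event_queue.put(ExecutionEvent("STDOUT", job_id="my_pkg", msg="building..."))
event_queue.put(ExecutionEvent("FINISHED_JOB", job_id="my_pkg", succeeded=True))

# ...and an output controller consumes them for display/logging.
seen = []
while not event_queue.empty():
    event = event_queue.get()
    seen.append(event.event_id)

print(seen)  # ['STARTED_JOB', 'STDOUT', 'FINISHED_JOB']
```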