TaskAttemptImpl Lifecycle
TaskAttemptImpl is the AM-side representation of a single execution
attempt of a task. It owns the container assignment, the umbilical, the
output commit decision, and — critically — the TaskAttemptTerminationCause
that drives upstream retry decisions.
After this chapter you should be able to look at any TaskAttemptImpl state
in an AM log and explain (a) what container holds it, (b) which umbilical
calls have or have not landed, and (c) what its termination cause will be if
it dies right now.
File:
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
Tests:
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
Termination cause enum:
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptTerminationCause.java
The states (typical 0.10.x naming)
| State | Meaning |
|---|---|
| NEW | Constructed; not yet given to scheduler |
| START_WAIT | Request sent to scheduler; awaiting container |
| SUBMITTED | Container allocated; awaiting launch ack (some versions) |
| RUNNING | Container launched; processor executing |
| SUCCEEDED | Terminal: TA_DONE received |
| KILL_IN_PROGRESS | Kill requested; awaiting confirmation |
| KILLED | Terminal: killed before/during execution |
| FAIL_IN_PROGRESS | Failure recognized; cleaning up |
| FAILED | Terminal: failed (counts against tez.am.task.max.failed.attempts) |
Exact list varies by branch. Verify:
grep -n "TaskAttemptStateInternal\." \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head
Tez separates the external state (TaskAttemptState in tez-api, the
coarse enum visible to ATS) from the richer internal state machine state
(TaskAttemptStateInternal). Mapping:
| Internal | External |
|---|---|
| NEW, START_WAIT, SUBMITTED, RUNNING | STARTING / RUNNING |
| SUCCEEDED | SUCCEEDED |
| KILL_IN_PROGRESS, KILLED | KILLED |
| FAIL_IN_PROGRESS, FAILED | FAILED |
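The mapping table above can be sketched as a single switch. This is a minimal illustration, not the Tez source: the enums and the class name ExternalStateSketch are hypothetical stand-ins, and the split of the first row (pre-launch states map to STARTING, RUNNING maps to RUNNING) is an assumption that should be checked against the branch you are reading.

```java
/** Illustrative only: hypothetical enums that mirror the mapping table, not the real Tez classes. */
final class ExternalStateSketch {
  enum Internal {
    NEW, START_WAIT, SUBMITTED, RUNNING, SUCCEEDED,
    KILL_IN_PROGRESS, KILLED, FAIL_IN_PROGRESS, FAILED
  }

  enum External { STARTING, RUNNING, SUCCEEDED, KILLED, FAILED }

  /** Collapses the rich internal state into the coarse state an outside observer sees. */
  static External toExternal(Internal s) {
    switch (s) {
      case NEW:
      case START_WAIT:
      case SUBMITTED:
        // Assumption: everything before launch is reported as STARTING.
        return External.STARTING;
      case RUNNING:
        return External.RUNNING;
      case SUCCEEDED:
        return External.SUCCEEDED;
      case KILL_IN_PROGRESS:
      case KILLED:
        // The in-progress state is already reported as its terminal outcome.
        return External.KILLED;
      case FAIL_IN_PROGRESS:
      case FAILED:
        return External.FAILED;
      default:
        throw new IllegalArgumentException("Unmapped state: " + s);
    }
  }
}
```

The point to notice is that the *_IN_PROGRESS states are invisible externally: an observer sees KILLED or FAILED before the container cleanup has finished.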
State × event matrix (key transitions)
| State | Event | Next state | Notes |
|---|---|---|---|
| NEW | TA_SCHEDULE | START_WAIT | request container |
| START_WAIT | TA_STARTED | SUBMITTED/RUNNING | container launched |
| START_WAIT | TA_CONTAINER_TERMINATING | KILL_IN_PROGRESS | preemption before launch |
| RUNNING | TA_DONE | SUCCEEDED | done(...) umbilical call |
| RUNNING | TA_FAILED | FAIL_IN_PROGRESS | processor threw |
| RUNNING | TA_TIMED_OUT | FAIL_IN_PROGRESS | no heartbeat for longer than tez.task.timeout-ms |
| RUNNING | TA_KILL_REQUEST | KILL_IN_PROGRESS | external kill |
| RUNNING | TA_CONTAINER_TERMINATED | FAIL_IN_PROGRESS / KILL_IN_PROGRESS | NM said container died |
| KILL_IN_PROGRESS | TA_CONTAINER_TERMINATED | KILLED | cleanup done |
| FAIL_IN_PROGRESS | TA_CONTAINER_TERMINATED | FAILED | cleanup done |
grep -c "addTransition" \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
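To make the matrix above concrete, here is a minimal table-driven sketch of the same transitions. It is not how TaskAttemptImpl is written (the real class registers transitions through addTransition calls); the class AttemptTransitionsSketch and its enums are hypothetical, the event names are the ones used in the table, and the START_WAIT + TA_STARTED row is simplified to go straight to RUNNING.

```java
import java.util.EnumMap;
import java.util.Map;

/** Illustrative only: a lookup-table version of the key transitions listed above. */
final class AttemptTransitionsSketch {
  enum State {
    NEW, START_WAIT, RUNNING, SUCCEEDED,
    KILL_IN_PROGRESS, KILLED, FAIL_IN_PROGRESS, FAILED
  }

  enum Event {
    TA_SCHEDULE, TA_STARTED, TA_CONTAINER_TERMINATING, TA_DONE,
    TA_FAILED, TA_TIMED_OUT, TA_KILL_REQUEST, TA_CONTAINER_TERMINATED
  }

  private static final Map<State, Map<Event, State>> TABLE = new EnumMap<>(State.class);

  private static void add(State from, Event on, State to) {
    TABLE.computeIfAbsent(from, k -> new EnumMap<Event, State>(Event.class)).put(on, to);
  }

  static {
    add(State.NEW, Event.TA_SCHEDULE, State.START_WAIT);
    // Simplification: the table allows SUBMITTED or RUNNING here; this sketch picks RUNNING.
    add(State.START_WAIT, Event.TA_STARTED, State.RUNNING);
    add(State.START_WAIT, Event.TA_CONTAINER_TERMINATING, State.KILL_IN_PROGRESS);
    add(State.RUNNING, Event.TA_DONE, State.SUCCEEDED);
    add(State.RUNNING, Event.TA_FAILED, State.FAIL_IN_PROGRESS);
    add(State.RUNNING, Event.TA_TIMED_OUT, State.FAIL_IN_PROGRESS);
    add(State.RUNNING, Event.TA_KILL_REQUEST, State.KILL_IN_PROGRESS);
    // RUNNING + TA_CONTAINER_TERMINATED is omitted: its target depends on the termination cause.
    add(State.KILL_IN_PROGRESS, Event.TA_CONTAINER_TERMINATED, State.KILLED);
    add(State.FAIL_IN_PROGRESS, Event.TA_CONTAINER_TERMINATED, State.FAILED);
  }

  /** Returns the next state, or throws if the (state, event) pair is not in the table. */
  static State next(State current, Event event) {
    Map<Event, State> row = TABLE.get(current);
    State to = (row == null) ? null : row.get(event);
    if (to == null) {
      throw new IllegalStateException("No transition from " + current + " on " + event);
    }
    return to;
  }
}
```

Calling next(State.RUNNING, Event.TA_TIMED_OUT) yields FAIL_IN_PROGRESS, while any pair missing from the table (for example an event arriving in a terminal state) throws, which is a reasonable stand-in for the invalid-transition errors you would look for in an AM log.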
Container assignment
When a TaskAttempt becomes schedulable, the AM:
- Builds a ContainerRequest(resource, priority, locality).
- Hands it to TaskSchedulerManager.allocateTask(...).
- The scheduler (YarnTaskSchedulerService) eventually matches a granted container.
- The match drives an AMSchedulerEventTAEnded/...TALaunchRequest flow that updates the TaskAttemptImpl state.
- ContainerLauncherManager actually starts the JVM via NMClient.
grep -n "allocateTask\|deallocateTask\|AMSchedulerEvent" \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head
The container is not assigned at construction; that's why the START_WAIT
state exists. Some configurations short-circuit this via container reuse
(the scheduler offers a free, idle container).
See container-reuse.md and scheduler.md.
Output commit rules (per attempt)
For attempts of vertices with an OutputCommitter:
| Condition | Who commits? |
|---|---|
| Output commits are at the task level (tez.am.commit-all-outputs-on-dag-success=false) | Each TaskAttemptImpl runs commit() from inside the task JVM (via processor) |
| Output commits are at the vertex level (default for MROutput) | Only the AM commits, after all tasks succeed (see vertex-lifecycle.md) |
Losing speculative attempts must not commit. The outputCommitted flag on
TaskAttemptImpl, set through setOutputCommitted(true), records which attempt
actually committed. The AM ensures that exactly one attempt of each task has
outputCommitted=true.
grep -n "outputCommitted\|commitOutput\|noCommit" \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head
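The "exactly one attempt commits" rule can be sketched as a first-writer-wins check. This is an assumption-laden illustration, not the Tez implementation: CommitArbiterSketch, canCommit, and committedAttempt are hypothetical names, and attempt IDs are plain strings here.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative only: grants commit permission to the first attempt that asks, and to no other. */
final class CommitArbiterSketch {
  // Holds the ID of the attempt allowed to commit; null until the first request arrives.
  private final AtomicReference<String> committer = new AtomicReference<>();

  /**
   * Returns true for the first attempt to ask and for repeat calls from that same attempt.
   * Any other attempt (for example a speculative loser) gets false.
   */
  boolean canCommit(String attemptId) {
    return committer.compareAndSet(null, attemptId)
        || attemptId.equals(committer.get());
  }

  /** The attempt that holds commit permission, or null if none has asked yet. */
  String committedAttempt() {
    return committer.get();
  }
}
```

The compare-and-set makes the decision atomic even if two attempts ask at the same moment, and the repeat-call case keeps the check idempotent when the winning attempt retries its request.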
TaskAttemptTerminationCause — the policy enum
sed -n '1,200p' \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptTerminationCause.java
Categories (the exact enum is long):
| Cause | Counts as failure? | Typical trigger |
|---|---|---|
| TERMINATED_BY_CLIENT | No | User killed DAG |
| TERMINATED_AT_SHUTDOWN | No | AM shutting down |
| TERMINATED_INEFFECTIVE_SPECULATION | No | Lost the speculation race |
| INTERNAL_PREEMPTION | No | AM preempted it (e.g., for higher-priority work) |
| EXTERNAL_PREEMPTION | No | YARN preempted the container |
| CONTAINER_EXITED | Yes (default) | Container died mid-run |
| NODE_FAILED | Yes | NM died |
| TASK_HEARTBEAT_ERROR | Yes | Heartbeat timeout |
| OUTPUT_LOST | Yes | Downstream reported output gone (rerun) |
| APPLICATION_ERROR | Yes | Processor threw |
TaskImpl uses cause.causesFailure() (or equivalent) to decide whether to
bump the failure counter.
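A minimal sketch of that decision, assuming the yes/no column of the table above: the enum below copies only the ten causes listed there, and FailureCountingSketch, countsAsFailure, and onAttemptEnded are hypothetical names, not the TaskImpl API. Verify the real classification against the enum source on your branch.

```java
/** Illustrative only: counts failures per task using the classification from the table above. */
final class FailureCountingSketch {
  enum Cause {
    TERMINATED_BY_CLIENT(false),
    TERMINATED_AT_SHUTDOWN(false),
    TERMINATED_INEFFECTIVE_SPECULATION(false),
    INTERNAL_PREEMPTION(false),
    EXTERNAL_PREEMPTION(false),
    CONTAINER_EXITED(true),
    NODE_FAILED(true),
    TASK_HEARTBEAT_ERROR(true),
    OUTPUT_LOST(true),
    APPLICATION_ERROR(true);

    private final boolean countsAsFailure;

    Cause(boolean countsAsFailure) {
      this.countsAsFailure = countsAsFailure;
    }

    boolean countsAsFailure() {
      return countsAsFailure;
    }
  }

  private final int maxFailedAttempts; // plays the role of tez.am.task.max.failed.attempts
  private int failedAttempts;

  FailureCountingSketch(int maxFailedAttempts) {
    this.maxFailedAttempts = maxFailedAttempts;
  }

  /** Records one ended attempt; returns true if the task may still be retried. */
  boolean onAttemptEnded(Cause cause) {
    if (cause.countsAsFailure()) {
      failedAttempts++;
    }
    return failedAttempts < maxFailedAttempts;
  }

  int failedAttempts() {
    return failedAttempts;
  }
}
```

With a limit of 2, a preempted attempt leaves the counter at 0, an APPLICATION_ERROR raises it to 1 and still allows a retry, and a following CONTAINER_EXITED raises it to 2 and exhausts the budget.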
Reading exercise
sed -n '1,160p' tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
grep -n "TaskAttemptStateInternal\." \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head -20
grep -n "TerminationCause" \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head -20
# Heartbeat timeout path
grep -n "TA_TIMED_OUT\|heartbeatTimeout" \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head
Answer:
- What event arrives when an attempt's container heartbeat times out? What issues it?
- What is the difference between TA_FAILED and TA_CONTAINER_TERMINATED? When does each fire?
- Which TaskAttemptTerminationCause values are not counted toward tez.am.task.max.failed.attempts?
- In what state does an attempt sit during container provisioning?
- What does outputCommitted track, and how is it used by the AM to choose the canonical attempt?
- Why are there separate FAIL_IN_PROGRESS and FAILED states (likewise for kill)?
Common bugs and symptoms
| Symptom | Root cause | Where to look |
|---|---|---|
| Attempt stuck in START_WAIT for minutes | Scheduler can't satisfy locality/resource | TaskSchedulerManager log; relax locality |
| Attempt marked FAILED when container was preempted | TerminationCause set incorrectly | Check the TA_CONTAINER_TERMINATED handler |
| Two attempts both commit outputs (data corruption) | setOutputCommitted race; speculative commit | Run TestSpeculation; ensure committer is idempotent |
| TaskAttempt heartbeat timeout fires even though task was running | AM GC pause; clock skew | Tune AM heap; check NM/AM clock drift |
| Recovery comes back with all attempts FAILED | Recovery log lacks TaskAttemptStartedEvent for last attempt | Force flush before submitting next event |
| KILL_IN_PROGRESS lingers | TA_CONTAINER_TERMINATED never arrives | NM is dead; AM eventually times out container |
Validation: prove you understand this
- Without running code: given an attempt in RUNNING and event TA_CONTAINER_TERMINATED with cause INTERNAL_PREEMPTION, what is the next state and does the failure counter increment?
- From the enum, list every TaskAttemptTerminationCause and tag each "counts" / "does not count".
- Reproduce a heartbeat timeout on MiniTezCluster by suspending a task JVM. Identify the exact log line that transitions the attempt.
- Walk the path from TaskCommunicatorManager.heartbeat returning a LATEST_RESPONSE_TIMEOUT to TaskAttemptImpl.handle(TA_TIMED_OUT).
- Verify that a speculative-loser attempt does not corrupt counters by reading the kill-handler code.