Failure Handling
A Tez DAG fails for many reasons: a corrupted input split, a flaky NM, an OOM in the user processor, a Kerberos token expiry, an RM connectivity blip, an AM crash. Tez has a layered escalation model: small failures are absorbed, big ones propagate, and the AM persists enough state to recover from its own death.
This chapter walks the escalation, the failure taxonomy, and the recovery machinery.
Escalation: attempt → task → vertex → DAG
flowchart TD
TA[TaskAttempt fails] -->|retry budget| TA2[New TaskAttempt]
TA -->|exhausted| T[Task fails]
T -->|failure policy| V[Vertex fails]
V -->|fail-on-vertex-failure| D[DAG fails]
Default behavior:
| Layer | Configuration | Default | Effect when exceeded |
|---|---|---|---|
| TaskAttempt | tez.am.task.max.failed.attempts | 4 | Mark Task as failed |
| Task | no direct knob; per-vertex policy | varies | Vertex fails on first failed task by default |
| Vertex | per-DAG failure policy | fail-fast | DAG fails |
grep -n "MAX_FAILED_ATTEMPTS\|MAX_TASK_ATTEMPTS\|TEZ_AM_TASK_MAX" \
tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
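As a rough illustration of the escalation path under the defaults in the table, the sketch below models one task's attempts in Python. The function name `escalate` and its list-of-layers return value are hypothetical; only the budget of 4 comes from the table, and the real state machines in the AM are far richer than this.

```python
MAX_FAILED_ATTEMPTS = 4  # default of tez.am.task.max.failed.attempts per the table


def escalate(attempt_outcomes, max_failed_attempts=MAX_FAILED_ATTEMPTS):
    """Return the layers that fail, innermost first, for one task.

    attempt_outcomes: booleans in attempt order, True meaning the attempt succeeded.
    Illustrative model only; it assumes the default fail-fast policies.
    """
    failed = 0
    for succeeded in attempt_outcomes:
        if succeeded:
            return []  # a retry succeeded, so the failure was absorbed
        failed += 1
        if failed >= max_failed_attempts:
            # Budget exhausted: the Task fails, the Vertex fails on its first
            # failed task, and the fail-fast DAG policy fails the DAG.
            return ["task", "vertex", "dag"]
    return []  # budget not exhausted yet; nothing has escalated
```

Calling `escalate([False, False, True])` models two absorbed failures, while `escalate([False] * 4)` models a failure that propagates all the way to the DAG.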
TaskAttemptTerminationCause
find tez-dag/src/main/java -name "TaskAttemptTerminationCause.java"
cat $(find tez-dag/src/main/java -name "TaskAttemptTerminationCause.java")
The enum names every reason a TaskAttempt can end up non-SUCCEEDED. A
selection:
| Cause | Source | Retryable? |
|---|---|---|
| TERMINATED_BY_CLIENT | user-initiated DAG kill | no |
| INTERNAL_PREEMPTION | scheduler preempted to make room | yes |
| EXTERNAL_PREEMPTION | YARN preempted the container | yes |
| CONTAINER_LAUNCH_FAILED | NM rejected the launch | retried on a new container |
| CONTAINER_EXITED | TezChild exited without a status update | yes |
| CONTAINER_STOPPED | AM stopped the container intentionally | depends |
| NODE_FAILED | NM died | yes, on a different node |
| NODE_BLACKLISTED | node accumulated too many failures | retried elsewhere |
| OUTPUT_LOST | downstream reported missing output | yes, re-run source TA |
| INPUT_READ_ERROR | TA failed reading shuffle from a source | yes |
| APPLICATION_ERROR | uncaught exception in user code | usually no, but retried up to attempt budget |
| FRAMEWORK_ERROR | uncaught exception in Tez code | sometimes no |
| OTHER_TASK_ATTEMPT_KILLED_DUPLICATE | speculative duplicate lost | no (not a failure) |
This enum is the most important debugging signal — every failed attempt in ATS / AM log surfaces a cause from this list.
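When triaging a failed DAG, it can help to bucket the causes you collected by how the table treats them. The sketch below is one way to do that in Python; the `DISPOSITION` mapping is transcribed from the table, the three bucket labels and the `summarize` helper are this sketch's own, and extracting cause names from ATS or the AM log is assumed to have happened already.

```python
from collections import Counter

# Transcribed from the table above; bucket labels are this sketch's own.
DISPOSITION = {
    "TERMINATED_BY_CLIENT": "not retried",
    "INTERNAL_PREEMPTION": "retried",
    "EXTERNAL_PREEMPTION": "retried",
    "CONTAINER_LAUNCH_FAILED": "retried",
    "CONTAINER_EXITED": "retried",
    "CONTAINER_STOPPED": "depends",
    "NODE_FAILED": "retried",
    "NODE_BLACKLISTED": "retried",
    "OUTPUT_LOST": "retried",
    "INPUT_READ_ERROR": "retried",
    "APPLICATION_ERROR": "depends",
    "FRAMEWORK_ERROR": "depends",
    "OTHER_TASK_ATTEMPT_KILLED_DUPLICATE": "not retried",
}


def summarize(causes):
    """Count attempts per disposition; unrecognized causes land in 'unknown'."""
    return Counter(DISPOSITION.get(cause, "unknown") for cause in causes)
```

A summary dominated by "retried" points at infrastructure, while a run of "depends" entries usually means the attempt logs themselves need reading.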
TaskAttempt failure retries
grep -n "max.failed.attempts\|maxFailedAttempts\|attemptFailed" \
$(find tez-dag/src/main/java -name "TaskImpl.java")
On a TA failure:
- TaskAttemptImpl transitions to FAILED (or KILLED if the cause is in the "killed" subset).
- TaskImpl increments its failed-attempt counter.
- If counter < tez.am.task.max.failed.attempts, TaskImpl schedules a new TaskAttemptImpl (incremented attempt index).
- Otherwise TaskImpl transitions to FAILED and reports up to VertexImpl.
Some causes are not counted against the budget (e.g.
OUTPUT_LOST, NODE_FAILED) — these are infrastructure failures, not
user-code failures.
grep -rn "isFatalFailure\|isExternalError\|countAsFailure" \
tez-dag/src/main/java | head -50
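The retry steps and the uncounted-cause rule can be sketched as a small Python model. `TaskModel`, `Cause` and `UNCOUNTED` are hypothetical names, and the uncounted set contains only the two examples named in the text; the real predicate is whatever the grep above turns up.

```python
from enum import Enum, auto


class Cause(Enum):
    APPLICATION_ERROR = auto()
    FRAMEWORK_ERROR = auto()
    OUTPUT_LOST = auto()
    NODE_FAILED = auto()


# Assumption: only the two examples from the text; the real set may be larger.
UNCOUNTED = frozenset({Cause.OUTPUT_LOST, Cause.NODE_FAILED})


class TaskModel:
    """Toy stand-in for the budget logic described above, not TaskImpl itself."""

    def __init__(self, max_failed_attempts=4):
        self.max_failed_attempts = max_failed_attempts
        self.failed_attempts = 0
        self.next_attempt_index = 1  # attempt 0 is assumed to be running already
        self.state = "RUNNING"

    def on_attempt_failed(self, cause):
        """Return the index of the replacement attempt, or None if the task failed."""
        if cause not in UNCOUNTED:
            self.failed_attempts += 1
        if self.failed_attempts >= self.max_failed_attempts:
            self.state = "FAILED"  # would be reported up to the vertex
            return None
        index = self.next_attempt_index
        self.next_attempt_index += 1
        return index
```

In this model, any number of NODE_FAILED events leaves the budget untouched, while the fourth APPLICATION_ERROR fails the task.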
Node blacklisting
grep -rn "NodeTracker\|blacklist\|BLACKLISTED" \
tez-dag/src/main/java/org/apache/tez/dag/app | head
Per-node failure accounting:
| Trigger | Effect |
|---|---|
| N task attempts fail on the same node within a window | Add node to blacklist for this app |
| NodeReport from RM says UNHEALTHY | Add node to blacklist immediately |
| tez.am.maxtaskfailures.per.node | Per-node failure threshold (default 10) |
| tez.am.node-blacklisting.enabled | Master toggle |
| tez.am.node-blacklisting.ignore-threshold-node-percent | Don't blacklist if it would remove more than N% of the cluster |
A blacklisted node:
- No new container requests go to it.
- Held containers on it are released.
- Existing attempts already running on it are allowed to finish (not preemptively killed).
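To make the per-node accounting concrete, here is a minimal Python sketch of the threshold and the ignore-percent guard. `NodeBlacklistModel` is a hypothetical name, the failure window and the UNHEALTHY path are left out, and the guard follows the wording of the table rather than the actual AM code.

```python
from collections import Counter


class NodeBlacklistModel:
    """Toy per-node failure accounting; not the Tez implementation."""

    def __init__(self, cluster_size, max_failures_per_node, ignore_threshold_percent):
        self.cluster_size = cluster_size
        self.max_failures_per_node = max_failures_per_node
        self.ignore_threshold_percent = ignore_threshold_percent
        self.failures = Counter()
        self.blacklist = set()

    def on_attempt_failed(self, node):
        """Record one failed attempt; return True if the node is blacklisted afterwards."""
        self.failures[node] += 1
        if node in self.blacklist:
            return True
        if self.failures[node] < self.max_failures_per_node:
            return False
        # Refuse to blacklist if that would remove more than N% of the cluster.
        would_remove = len(self.blacklist) + 1
        if would_remove * 100 > self.ignore_threshold_percent * self.cluster_size:
            return False
        self.blacklist.add(node)
        return True
```

With an 8-node cluster, a threshold of 3 and a 33% guard (illustrative settings), the first two misbehaving nodes are blacklisted and the third is kept, because three of eight nodes would exceed the guard.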
Output loss
A common late-stage failure: a downstream task reads from a shuffle source and finds the source's output is gone (the NM died, the disk was wiped, etc).
grep -rn "OUTPUT_LOST\|reportSourceTaskAttemptFailed\|inputFailedEvent" \
tez-runtime-library/src/main/java tez-dag/src/main/java | head
Flow:
- Destination TA's Fetcher fails permanently on source S.a.0.
- Destination TA sends InputReadErrorEvent via umbilical heartbeat.
- AM's VertexImpl receives the event, marks S.a.0 as OUTPUT_LOST.
- TaskImpl for S.a schedules a new attempt S.a.1.
- New attempt re-runs, produces fresh outputs, downstream resumes.
This is the cascading-rerun engine — and a source of pathological behavior when a single bad disk poisons many downstream tasks.
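The AM-side half of this flow can be sketched in a few lines of Python. `SourceTask` and `on_input_read_error` are hypothetical names, and ignoring duplicate reports is an assumption of this sketch, added to show why many consumers reporting the same lost output need not trigger many re-runs.

```python
class SourceTask:
    """Toy model of a source task whose attempt 0 already succeeded."""

    def __init__(self, name):
        self.name = name
        self.attempt_states = ["SUCCEEDED"]


def on_input_read_error(source, attempt_index):
    """Handle a read-error report against one source attempt.

    Returns the id of the newly scheduled attempt, or None if the loss was
    already handled (an assumption of this sketch: duplicates are ignored).
    """
    if source.attempt_states[attempt_index] != "SUCCEEDED":
        return None
    source.attempt_states[attempt_index] = "OUTPUT_LOST"
    source.attempt_states.append("RUNNING")  # the re-run attempt
    return f"{source.name}.{len(source.attempt_states) - 1}"
```

The first report against attempt 0 of `SourceTask("S.a")` yields `"S.a.1"`; a second report against the same attempt yields `None`.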
AM failover
find tez-dag/src/main/java -name "RecoveryService.java"
find tez-dag/src/main/java -name "RecoveryEventHandler.java"
wc -l $(find tez-dag/src/main/java -name "RecoveryService.java")
YARN keeps a small budget of AM restarts (yarn.resourcemanager.am.max-attempts,
default 2). When the AM crashes:
- RM allocates a fresh AM container, attempt index incremented.
- New AM boots, sees attempt > 1, enters recovery mode.
- RecoveryService reads the recovery log from HDFS (written by attempt 1).
- Replays events to reconstruct DAG, Vertex, Task, TaskAttempt state.
- Inherits any pre-existing containers via getContainersFromPreviousAttempts() on the RegisterApplicationMasterResponse that AMRMClient.registerApplicationMaster() returns.
- Resumes scheduling from the last consistent state.
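The boot-time decision in the steps above reduces to a small function. The following Python sketch uses a hypothetical name, `next_am_action`, and made-up return labels; it assumes that an attempt which cannot recover simply starts the DAG over.

```python
def next_am_action(attempt_number, max_attempts=2,
                   recovery_enabled=True, recovery_log_readable=True):
    """Decide what a newly launched AM attempt does (illustrative model only).

    max_attempts mirrors the yarn.resourcemanager.am.max-attempts default of 2.
    """
    if attempt_number > max_attempts:
        return "APP_FAILED"   # YARN will not launch another attempt
    if attempt_number == 1:
        return "FRESH_START"  # nothing to recover on the first attempt
    if recovery_enabled and recovery_log_readable:
        return "RECOVER"      # replay the log written by the previous attempt
    return "FRESH_START"      # assumption: tasks restart from scratch
```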
RecoveryService
grep -n "writeEvent\|flush\|recover\|RecoveryEventType" \
$(find tez-dag/src/main/java -name "RecoveryService.java" \
-o -name "RecoveryEventHandler.java")
Append-only event log on HDFS, one file per app attempt:
hdfs:///tmp/staging/<user>/.staging/application_<id>/appattempt_<id>_NNNNNN/
recovery/
summary.dag_1.recovery
dag_1.recovery
Event kinds:
| Event | When written |
|---|---|
| DAGSubmittedEvent | DAG arrives at AM |
| DAGInitializedEvent | DAG state machine reaches INITED |
| DAGStartedEvent | DAG reaches RUNNING |
| DAGFinishedEvent | DAG terminal state |
| VertexInitializedEvent, VertexStartedEvent, VertexFinishedEvent | mirror state transitions |
| TaskStartedEvent, TaskFinishedEvent | per task |
| TaskAttemptStartedEvent, TaskAttemptFinishedEvent | per attempt |
| VertexConfigurationDoneEvent | parallelism finalized |
find tez-dag/src/main/java -name "*Event*.java" -path "*recovery*"
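Replaying an append-only log amounts to folding events, in order, into a per-entity state map. The Python sketch below shows that idea with the event names from the table; the `(event_type, entity_id)` tuple format, the `replay` function, and the collapsed `FINISHED` state are assumptions of this sketch, not the on-disk format.

```python
# Maps an event-name suffix to the state it implies. FINISHED collapses all
# terminal states (succeeded, failed, killed) into one for brevity.
SUFFIX_TO_STATE = {
    "SubmittedEvent": "SUBMITTED",
    "InitializedEvent": "INITED",
    "StartedEvent": "RUNNING",
    "FinishedEvent": "FINISHED",
}


def replay(events):
    """Fold (event_type, entity_id) pairs, in log order, into a state map."""
    state = {}
    for event_type, entity_id in events:
        for suffix, new_state in SUFFIX_TO_STATE.items():
            if event_type.endswith(suffix):
                state[entity_id] = new_state
                break
        # Events with no matching suffix (e.g. VertexConfigurationDoneEvent)
        # carry data rather than a state change and are skipped here.
    return state
```

Because later events overwrite earlier ones, the map after replay reflects the last state each entity reached before the log ended.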
Configuration
| Key | Default | Effect |
|---|---|---|
| tez.dag.recovery.enabled | true | Master toggle. |
| tez.dag.recovery.flush.interval.secs | 30 | Periodic fsync of the recovery log. |
| tez.dag.recovery.io.buffer.size | 8192 | Buffer for the writer. |
| yarn.resourcemanager.am.max-attempts (YARN) | 2 | Caps recovery attempts. |
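The flush interval matters because anything written after the last flush is invisible to the next AM attempt. A minimal Python model of that behavior follows; `BufferedRecoveryLog` is a hypothetical class, and it deliberately ignores any events the real writer may flush eagerly.

```python
class BufferedRecoveryLog:
    """Toy buffered writer: only flushed events survive a crash."""

    def __init__(self):
        self.durable = []  # what a restarted AM would be able to read
        self.pending = []  # written but not yet flushed

    def write(self, event):
        self.pending.append(event)

    def flush(self):
        """Model of the periodic flush: pending events become durable."""
        self.durable.extend(self.pending)
        self.pending.clear()

    def crash(self):
        """Drop unflushed events and return them, to show what was lost."""
        lost = list(self.pending)
        self.pending.clear()
        return lost
```

Writing two events, flushing, writing a third, and then calling `crash()` returns only the third event, which is the window a longer flush interval widens.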
What recovery can and cannot recover
| Can | Cannot |
|---|---|
| DAG / Vertex / Task / TA state at last flush | In-flight events lost since last flush |
| Counter snapshots written to recovery log | Real-time counter updates between flushes |
| Container assignments | NM-side container state — those are rediscovered via getContainersFromPreviousAttempts |
| User payload of DAGPlan | User in-memory state inside a custom VertexManagerPlugin |
A VertexManagerPlugin that holds in-memory state across events must
override getState() / setState() to participate in recovery — otherwise
it starts fresh on AM attempt 2.
DAG-level termination causes
find tez-dag/src/main/java -name "DAGTerminationCause.java"
cat $(find tez-dag/src/main/java -name "DAGTerminationCause.java")
| Cause | Trigger |
|---|---|
| DAG_KILL | client called dagClient.tryKillDAG() |
| VERTEX_FAILURE | a vertex transitioned to FAILED |
| INIT_FAILURE | DAG init failed (bad plan, bad input) |
| INTERNAL_ERROR | unhandled exception inside AM |
| AM_USERCODE_FAILURE | user-supplied plugin threw |
| OUT_OF_TEZ_TASK_RESOURCES | scheduler could not satisfy resource requests |
| RECOVERY_FAILURE | replay couldn't reconstruct prior state |
Reading exercise
- Count terminal transitions: grep -n "transition\|FAILED\|KILLED" $(find tez-dag/src/main/java -name "TaskAttemptImpl.java") | head -40
- What triggers OUTPUT_LOST? grep -rn "OUTPUT_LOST" tez-dag/src/main/java tez-runtime-library/src/main/java
- Read the writer loop: cat $(find tez-dag/src/main/java -name "RecoveryService.java") | head -200
- List all recovery event classes: grep -rn "RecoveryEvent" tez-dag/src/main/java
- wc -l $(find tez-dag/src/main/java -name "TaskAttemptTerminationCause.java" -o -name "DAGTerminationCause.java" -o -name "VertexTerminationCause.java")
- Where is blacklist enforcement implemented? grep -rn "node-blacklisting\|blacklistNode" tez-dag/src/main/java | head
Common bugs and symptoms
| Symptom | Likely cause |
|---|---|
| OUTPUT_LOST cascade kills the DAG | One bad NM is poisoning downstream; blacklist or pin off it. |
| Recovery infinite-loops on attempt 2 | Corrupt recovery log; check fsync gating and tez.dag.recovery.flush.interval.secs. |
| INTERNAL_PREEMPTION repeatedly | Tez scheduler is preempting its own attempts; usually a higher-priority vertex starving lower; tune priorities. |
| All attempts of one task fail in < 1s | User code throws deterministically; cause is APPLICATION_ERROR. |
| DAG hangs forever after one task fails | Vertex failure policy is permissive (rare); look at the vertex transition. |
| NODE_BLACKLISTED removes 100% of cluster | ignore-threshold-node-percent not set; the DAG is now unschedulable. |
| AM crashes, attempt 2 boots, but tasks restart from scratch | Recovery disabled or HDFS staging dir not accessible to attempt 2. |
Validation: prove you understand this
- List five TaskAttemptTerminationCause values that do not count against the attempt budget. Cite where the predicate lives.
- Explain in two sentences how an OUTPUT_LOST on source S.a.0 triggers a re-run of S.a, not just S.a.0's downstream consumers.
- Identify the HDFS path pattern under which recovery events are written. Give the exact path components.
- Describe what happens to in-flight DataMovementEvents when the AM crashes mid-DAG and AM attempt 2 takes over.
- Given tez.am.maxtaskfailures.per.node=3 and an 8-node cluster, what is the smallest sequence of task failures that triggers blacklisting? Show the math.