Failure Handling

A Tez DAG can fail for many reasons: a corrupted input split, a flaky NodeManager (NM), an out-of-memory error in the user processor, a Kerberos token expiry, a ResourceManager (RM) connectivity blip, or an ApplicationMaster (AM) crash. Tez handles these with a layered escalation model: small failures are absorbed, big ones propagate, and the AM persists enough state to recover from its own death.

This chapter walks the escalation, the failure taxonomy, and the recovery machinery.


Escalation: attempt → task → vertex → DAG

flowchart TD
  TA[TaskAttempt fails] -->|retry budget| TA2[New TaskAttempt]
  TA -->|exhausted| T[Task fails]
  T -->|failure policy| V[Vertex fails]
  V -->|fail-on-vertex-failure| D[DAG fails]

Default behavior:

| Layer | Configuration | Default | Effect when exceeded |
|---|---|---|---|
| TaskAttempt | tez.am.task.max.failed.attempts | 4 | Mark Task as failed |
| Task | tez.am.vertex.max.task.failed.attempts (no direct knob; per-vertex policy) | varies | Vertex fails on first failed task by default |
| Vertex | per-DAG failure policy | fail-fast | DAG fails |

grep -n "MAX_FAILED_ATTEMPTS\|MAX_TASK_ATTEMPTS\|TEZ_AM_TASK_MAX" \
  tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
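
The escalation chain above can be sketched as a toy model. This is not Tez code: the function names and the scripted list of attempt outcomes are hypothetical, and the only values taken from the text are the attempt budget of 4 and the fail-fast behavior at the vertex and DAG layers.

```python
MAX_FAILED_ATTEMPTS = 4  # mirrors the tez.am.task.max.failed.attempts default in the table


def run_task(attempt_outcomes):
    """Return 'SUCCEEDED' or 'FAILED' for one task.

    attempt_outcomes is a scripted sequence of booleans, one per attempt
    (True means the attempt succeeded). Hypothetical input for illustration.
    """
    failed = 0
    for ok in attempt_outcomes:
        if ok:
            return "SUCCEEDED"
        failed += 1
        if failed >= MAX_FAILED_ATTEMPTS:
            return "FAILED"  # retry budget exhausted
    return "FAILED"  # script ended without a successful attempt


def run_vertex(tasks):
    """Fail-fast: the first failed task fails the whole vertex."""
    for outcomes in tasks:
        if run_task(outcomes) == "FAILED":
            return "FAILED"
    return "SUCCEEDED"


def run_dag(vertices):
    """Fail-fast: the first failed vertex fails the DAG."""
    for name, tasks in vertices.items():
        if run_vertex(tasks) == "FAILED":
            return f"FAILED (vertex {name})"
    return "SUCCEEDED"
```

For example, run_dag({"Map": [[True], [False, True]], "Reduce": [[False] * 4]}) absorbs the single failed attempt in Map and then fails the DAG when one Reduce task exhausts its four attempts.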

TaskAttemptTerminationCause

find tez-dag/src/main/java -name "TaskAttemptTerminationCause.java"
cat $(find tez-dag/src/main/java -name "TaskAttemptTerminationCause.java")

The enum names every reason a TaskAttempt can end up non-SUCCEEDED. A selection:

| Cause | Source | Retryable? |
|---|---|---|
| TERMINATED_BY_CLIENT | user-initiated DAG kill | no |
| INTERNAL_PREEMPTION | scheduler preempted to make room | yes |
| EXTERNAL_PREEMPTION | YARN preempted the container | yes |
| CONTAINER_LAUNCH_FAILED | NM rejected the launch | retried on a new container |
| CONTAINER_EXITED | TezChild exited without a status update | yes |
| CONTAINER_STOPPED | AM stopped the container intentionally | depends |
| NODE_FAILED | NM died | yes, on a different node |
| NODE_BLACKLISTED | node accumulated too many failures | retried elsewhere |
| OUTPUT_LOST | downstream reported missing output | yes, re-run source TA |
| INPUT_READ_ERROR | TA failed reading shuffle from a source | yes |
| APPLICATION_ERROR | uncaught exception in user code | usually no, but retried up to attempt budget |
| FRAMEWORK_ERROR | uncaught exception in Tez code | sometimes no |
| OTHER_TASK_ATTEMPT_KILLED_DUPLICATE | speculative duplicate lost | no (not a failure) |

This enum is the most important debugging signal: every failed attempt recorded in ATS or the AM log carries a cause from this list.


TaskAttempt failure retries

grep -n "max.failed.attempts\|maxFailedAttempts\|attemptFailed" \
  $(find tez-dag/src/main/java -name "TaskImpl.java")

On a TA failure:

  1. TaskAttemptImpl transitions to FAILED (or KILLED if the cause is in the "killed" subset).
  2. TaskImpl increments its failed-attempt counter.
  3. If counter < tez.am.task.max.failed.attempts, TaskImpl schedules a new TaskAttemptImpl (incremented attempt index).
  4. Otherwise TaskImpl transitions to FAILED and reports up to VertexImpl.

Some causes are not counted against the budget (e.g. OUTPUT_LOST, NODE_FAILED) — these are infrastructure failures, not user-code failures.

grep -rn "isFatalFailure\|isExternalError\|countAsFailure" \
  tez-dag/src/main/java | head
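
The counting rule in the steps above can be sketched as follows. This is a simplified model, not the TaskImpl implementation: the class name TaskModel is hypothetical, and the set of uncounted causes contains only the two examples named in the text, so the real predicate may cover more causes.

```python
# Assumption: only the two causes named in the text are treated as uncounted.
UNCOUNTED_CAUSES = {"OUTPUT_LOST", "NODE_FAILED"}


class TaskModel:
    """Toy model of a task's failed-attempt budget (hypothetical class)."""

    def __init__(self, max_failed_attempts=4):
        self.max_failed_attempts = max_failed_attempts
        self.failed_attempts = 0
        self.next_attempt_index = 0
        self.state = "RUNNING"

    def on_attempt_failed(self, cause):
        # Infrastructure failures do not consume the retry budget.
        if cause not in UNCOUNTED_CAUSES:
            self.failed_attempts += 1
        if self.failed_attempts < self.max_failed_attempts:
            self.next_attempt_index += 1
            return f"schedule attempt {self.next_attempt_index}"
        # Budget exhausted: the task fails and the vertex is notified.
        self.state = "FAILED"
        return "task FAILED"
```

In this model a task that sees three APPLICATION_ERROR failures and any number of NODE_FAILED failures keeps getting new attempts, and only the fourth counted failure moves it to FAILED.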

Node blacklisting

grep -rn "NodeTracker\|blacklist\|BLACKLISTED" \
  tez-dag/src/main/java/org/apache/tez/dag/app | head

Per-node failure accounting:

| Trigger | Effect |
|---|---|
| N task attempts fail on the same node within a window | Add node to blacklist for this app |
| NodeReport from RM says UNHEALTHY | Add node to blacklist immediately |

Related configuration:

| Key | Effect |
|---|---|
| tez.am.maxtaskfailures.per.node | Per-node failure threshold (default 10) |
| tez.am.node-blacklisting.enabled | Master toggle |
| tez.am.node-blacklisting.ignore-threshold-node-percent | Don't blacklist if it would remove more than N% of the cluster |

Once a node is blacklisted:

  • No new container requests go to it.
  • Held containers on it are released.
  • Existing attempts already running on it are allowed to finish (not preemptively killed).
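
The per-node accounting can be sketched as a small model. It is a simplification under stated assumptions: the class NodeTrackerModel is hypothetical, the time window from the trigger table is ignored, a node is blacklisted when its failure count reaches the threshold, and the whole blacklist is disregarded once it covers more than the configured percentage of the cluster.

```python
from collections import Counter


class NodeTrackerModel:
    """Toy per-node failure accounting (hypothetical, not the Tez class)."""

    def __init__(self, cluster_size, max_failures_per_node, ignore_threshold_percent):
        self.cluster_size = cluster_size
        self.max_failures_per_node = max_failures_per_node
        self.ignore_threshold_percent = ignore_threshold_percent
        self.failures = Counter()
        self.blacklist = set()

    def on_attempt_failed(self, node):
        """Record one failed attempt on a node and blacklist it at the threshold."""
        self.failures[node] += 1
        if self.failures[node] >= self.max_failures_per_node:
            self.blacklist.add(node)

    def effective_blacklist(self):
        """Return the nodes actually avoided when placing new containers."""
        percent = 100 * len(self.blacklist) / self.cluster_size
        if percent > self.ignore_threshold_percent:
            # Too much of the cluster would be excluded, so ignore the blacklist.
            return set()
        return set(self.blacklist)
```

The guard in effective_blacklist is what keeps a burst of failures from making the DAG unschedulable: with a 4-node cluster and a 50 percent threshold, a third blacklisted node causes the model to stop honoring the blacklist.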

Output loss

A common late-stage failure: a downstream task reads from a shuffle source and finds the source's output is gone (the NM died, the disk was wiped, etc).

grep -rn "OUTPUT_LOST\|reportSourceTaskAttemptFailed\|inputFailedEvent" \
  tez-runtime-library/src/main/java tez-dag/src/main/java | head

Flow:

  1. Destination TA's Fetcher fails permanently on source S.a.0.
  2. Destination TA sends InputReadErrorEvent via umbilical heartbeat.
  3. The AM routes the event through VertexImpl to the source attempt S.a.0, which is then failed with cause OUTPUT_LOST.
  4. TaskImpl for S.a schedules a new attempt S.a.1.
  5. New attempt re-runs, produces fresh outputs, downstream resumes.

This is the cascading-rerun engine. It is also a source of pathological behavior when a single bad disk poisons many downstream tasks, because each affected source task has to be re-run before its consumers can make progress.
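
The re-run flow can be sketched from the source task's point of view. This is an illustrative model only: SourceTaskModel and its method names are hypothetical, and it assumes a single read-error report is enough to mark the output lost, which may be stricter than what the AM actually does.

```python
class SourceTaskModel:
    """Toy model of a source task whose finished output can be reported lost."""

    def __init__(self, name):
        self.name = name
        # Attempt 0 completed earlier and its output was being served to consumers.
        self.attempts = ["SUCCEEDED"]

    def on_input_read_error(self, attempt_index):
        """Handle a downstream report that an attempt's output is unreadable.

        Returns the index of the newly scheduled attempt, or None when the
        report refers to an attempt that has already been handled.
        """
        if self.attempts[attempt_index] != "SUCCEEDED":
            return None  # duplicate report from another consumer
        self.attempts[attempt_index] = "FAILED(OUTPUT_LOST)"
        self.attempts.append("RUNNING")  # schedule the next attempt
        return len(self.attempts) - 1

    def on_attempt_succeeded(self, attempt_index):
        """Mark a re-run attempt as finished so consumers can fetch again."""
        self.attempts[attempt_index] = "SUCCEEDED"
```

The duplicate check matters because many consumers usually read the same source output: the first report schedules S.a.1, and later reports about S.a.0 should not schedule further attempts.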


AM failover

find tez-dag/src/main/java -name "RecoveryService.java"
find tez-dag/src/main/java -name "RecoveryEventHandler.java"
wc -l $(find tez-dag/src/main/java -name "RecoveryService.java")

YARN keeps a small budget of AM restarts (yarn.resourcemanager.am.max-attempts, default 2). When the AM crashes:

  1. RM allocates a fresh AM container, attempt index incremented.
  2. New AM boots, sees attempt > 1, enters recovery mode.
  3. RecoveryService reads the recovery log from HDFS (written by attempt 1).
  4. Replays events to reconstruct DAG, Vertex, Task, TaskAttempt state.
  5. Inherits any pre-existing containers via RegisterApplicationMasterResponse.getContainersFromPreviousAttempts(), returned when the new AM registers with the RM.
  6. Resumes scheduling from the last consistent state.

RecoveryService

grep -n "writeEvent\|flush\|recover\|RecoveryEventType" \
  $(find tez-dag/src/main/java -name "RecoveryService.java" \
                                -o -name "RecoveryEventHandler.java")

Append-only event log on HDFS, one directory per app attempt, holding a summary file and a per-DAG event file:

hdfs:///tmp/staging/<user>/.staging/application_<id>/appattempt_<id>_NNNNNN/
  recovery/
    summary.dag_1.recovery
    dag_1.recovery

Event kinds:

| Event | When written |
|---|---|
| DAGSubmittedEvent | DAG arrives at AM |
| DAGInitializedEvent | DAG state machine reaches INITED |
| DAGStartedEvent | DAG reaches RUNNING |
| DAGFinishedEvent | DAG terminal state |
| VertexInitializedEvent, VertexStartedEvent, VertexFinishedEvent | mirror state transitions |
| TaskStartedEvent, TaskFinishedEvent | per task |
| TaskAttemptStartedEvent, TaskAttemptFinishedEvent | per attempt |
| VertexConfigurationDoneEvent | parallelism finalized |

find tez-dag/src/main/java -name "*Event*.java" -path "*recovery*"
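
Replaying such a log amounts to folding the events, in order, into a state map. The sketch below is a toy version under stated assumptions: events are plain dictionaries with hypothetical "type", "task", and "state" fields, only a subset of the event kinds from the table is handled, and the real RecoveryService format and replay logic are more involved.

```python
def replay(events):
    """Rebuild a coarse DAG and task state map from an ordered event list."""
    state = {"dag": None, "tasks": {}}
    for event in events:
        kind = event["type"]
        if kind == "DAGSubmittedEvent":
            state["dag"] = "NEW"
        elif kind == "DAGInitializedEvent":
            state["dag"] = "INITED"
        elif kind == "DAGStartedEvent":
            state["dag"] = "RUNNING"
        elif kind == "DAGFinishedEvent":
            state["dag"] = event["state"]
        elif kind == "TaskStartedEvent":
            state["tasks"][event["task"]] = "RUNNING"
        elif kind == "TaskFinishedEvent":
            state["tasks"][event["task"]] = event["state"]
        # Other event kinds are ignored in this sketch.
    return state


def tasks_to_reschedule(state):
    """Tasks without a recorded success at the end of the log must run again."""
    return sorted(t for t, s in state["tasks"].items() if s != "SUCCEEDED")
```

Given a log in which task t1 has both a start and a successful finish event while t2 has only a start event, the new AM attempt in this model would keep t1's result and reschedule t2.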

Configuration

| Key | Default | Effect |
|---|---|---|
| tez.dag.recovery.enabled | true | Master toggle. |
| tez.dag.recovery.flush.interval.secs | 30 | Periodic fsync of the recovery log. |
| tez.dag.recovery.io.buffer.size | 8192 | Buffer for the writer. |
| yarn.resourcemanager.am.max-attempts (YARN) | 2 | Caps recovery attempts. |
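
The effect of the flush interval can be illustrated with a toy buffered log. This is a model under assumptions, not the RecoveryService writer: the class RecoveryLogModel is hypothetical, time is passed in explicitly as seconds, and every event type is treated the same way, whereas a real writer may flush some events eagerly.

```python
class RecoveryLogModel:
    """Toy buffered event log with a time-based flush (hypothetical class)."""

    def __init__(self, flush_interval_secs=30):
        self.flush_interval_secs = flush_interval_secs
        self.durable = []    # events that have reached storage
        self.buffered = []   # events still held in the writer's buffer
        self.last_flush = 0

    def write(self, now, event):
        """Buffer an event and flush if the interval has elapsed."""
        self.buffered.append(event)
        if now - self.last_flush >= self.flush_interval_secs:
            self.flush(now)

    def flush(self, now):
        """Move all buffered events to durable storage."""
        self.durable.extend(self.buffered)
        self.buffered.clear()
        self.last_flush = now

    def crash(self):
        """Simulate an AM crash: buffered events are lost, durable ones survive."""
        lost = list(self.buffered)
        self.buffered.clear()
        return lost
```

With the default interval of 30, events written at seconds 0, 10, and 30 become durable at the third write, while an event written at second 45 followed by a crash is lost. A shorter interval narrows that window at the cost of more frequent flushes.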

What recovery can and cannot recover

| Can | Cannot |
|---|---|
| DAG / Vertex / Task / TA state at last flush | In-flight events lost since last flush |
| Counter snapshots written to recovery log | Real-time counter updates between flushes |
| Container assignments | NM-side container state (rediscovered via RegisterApplicationMasterResponse.getContainersFromPreviousAttempts()) |
| User payload of DAGPlan | User in-memory state inside a custom VertexManagerPlugin |

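A VertexManagerPlugin that holds in-memory state across events starts fresh on AM attempt 2, because that state is not written to the recovery log. Design the plugin so it can rebuild its state from the events it receives after recovery, and check the VertexManagerPlugin API in your Tez version before relying on getState() / setState() style hooks.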


DAG-level termination causes

find tez-dag/src/main/java -name "DAGTerminationCause.java"
cat $(find tez-dag/src/main/java -name "DAGTerminationCause.java")

| Cause | Trigger |
|---|---|
| DAG_KILL | client called dagClient.tryKillDAG() |
| VERTEX_FAILURE | a vertex transitioned to FAILED |
| INIT_FAILURE | DAG init failed (bad plan, bad input) |
| INTERNAL_ERROR | unhandled exception inside AM |
| AM_USERCODE_FAILURE | user-supplied plugin threw |
| OUT_OF_TEZ_TASK_RESOURCES | scheduler could not satisfy resource requests |
| RECOVERY_FAILURE | replay couldn't reconstruct prior state |

Reading exercise

  1. Count the terminal transitions: grep -n "transition\|FAILED\|KILLED" $(find tez-dag/src/main/java -name "TaskAttemptImpl.java") | head -40
  2. Find what triggers OUTPUT_LOST: grep -rn "OUTPUT_LOST" tez-dag/src/main/java tez-runtime-library/src/main/java
  3. Read the writer loop: head -200 $(find tez-dag/src/main/java -name "RecoveryService.java")
  4. List all recovery event classes: grep -rn "RecoveryEvent" tez-dag/src/main/java | head -50
  5. Compare the sizes of the termination-cause enums: wc -l $(find tez-dag/src/main/java -name "TaskAttemptTerminationCause.java" -o -name "DAGTerminationCause.java" -o -name "VertexTerminationCause.java")
  6. Locate where blacklist enforcement is implemented: grep -rn "node-blacklisting\|blacklistNode" tez-dag/src/main/java | head

Common bugs and symptoms

| Symptom | Likely cause |
|---|---|
| OUTPUT_LOST cascade kills the DAG | One bad NM is poisoning downstream tasks; blacklist the node or steer work away from it. |
| Recovery infinite-loops on attempt 2 | Corrupt recovery log; check fsync gating and tez.dag.recovery.flush.interval.secs. |
| INTERNAL_PREEMPTION repeatedly | Tez scheduler is preempting its own attempts, usually because a higher-priority vertex is starving a lower one; tune priorities. |
| All attempts of one task fail in < 1s | User code throws deterministically; cause is APPLICATION_ERROR. |
| DAG hangs forever after one task fails | Vertex failure policy is permissive (rare); look at the vertex transition. |
| NODE_BLACKLISTED removes 100% of cluster | tez.am.node-blacklisting.ignore-threshold-node-percent is set too high to protect the cluster; the DAG is now unschedulable. |
| AM crashes, attempt 2 boots, but tasks restart from scratch | Recovery disabled or HDFS staging dir not accessible to attempt 2. |

Validation: prove you understand this

  1. List five TaskAttemptTerminationCause values that do not count against the attempt budget. Cite where the predicate lives.
  2. Explain in two sentences how an OUTPUT_LOST on source S.a.0 triggers a re-run of S.a, not just S.a.0's downstream consumers.
  3. Identify the HDFS path pattern under which recovery events are written. Give the exact path components.
  4. Describe what happens to in-flight DataMovementEvents when the AM crashes mid-DAG and AM attempt 2 takes over.
  5. Given tez.am.maxtaskfailures.per.node=3 and an 8-node cluster, what is the smallest sequence of task failures that triggers blacklisting? Show the math.