TaskAttemptImpl Lifecycle

TaskAttemptImpl is the AM-side representation of a single execution attempt of a task. It owns the container assignment, the umbilical, the output commit decision, and, critically, the TaskAttemptTerminationCause that drives upstream retry decisions.

After this chapter you should be able to look at any TaskAttemptImpl state in an AM log and explain (a) what container holds it, (b) which umbilical calls have or have not landed, and (c) what its termination cause will be if it dies right now.


File

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java

Tests:

tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java

Termination cause enum:

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptTerminationCause.java

The states (typical 0.10.x naming)

| State | Meaning |
|---|---|
| NEW | Constructed; not yet given to scheduler |
| START_WAIT | Request sent to scheduler; awaiting container |
| SUBMITTED | Container allocated; awaiting launch ack (some versions) |
| RUNNING | Container launched; processor executing |
| SUCCEEDED | Terminal: TA_DONE received |
| KILL_IN_PROGRESS | Kill requested; awaiting confirmation |
| KILLED | Terminal: killed before/during execution |
| FAIL_IN_PROGRESS | Failure recognized; cleaning up |
| FAILED | Terminal: failed (counts against max.failed.attempts) |

Exact list varies by branch. Verify:

grep -n "TaskAttemptStateInternal\." \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head

Tez separates the external state (TaskAttemptState in tez-api, the coarse enum visible to ATS) from the richer internal state machine state (TaskAttemptStateInternal). Several internal states collapse into one external state. Mapping:

| Internal | External |
|---|---|
| NEW, START_WAIT, SUBMITTED, RUNNING | STARTING / RUNNING |
| SUCCEEDED | SUCCEEDED |
| KILL_IN_PROGRESS, KILLED | KILLED |
| FAIL_IN_PROGRESS, FAILED | FAILED |
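
The mapping above can be sketched as a simple lookup. This is a minimal illustration, not the Tez implementation: the enum and class names are hypothetical stand-ins that mirror the table, and the split of the first row (everything before RUNNING reports as STARTING) is an assumption that should be checked against your branch.

```java
import java.util.EnumMap;
import java.util.Map;

/** Illustrative only: names mirror the mapping table, not the Tez source. */
final class StateMappingSketch {
    enum Internal { NEW, START_WAIT, SUBMITTED, RUNNING, SUCCEEDED,
                    KILL_IN_PROGRESS, KILLED, FAIL_IN_PROGRESS, FAILED }
    enum External { STARTING, RUNNING, SUCCEEDED, KILLED, FAILED }

    private static final Map<Internal, External> MAPPING = new EnumMap<>(Internal.class);
    static {
        // Assumption: every pre-launch state is reported as STARTING.
        MAPPING.put(Internal.NEW, External.STARTING);
        MAPPING.put(Internal.START_WAIT, External.STARTING);
        MAPPING.put(Internal.SUBMITTED, External.STARTING);
        MAPPING.put(Internal.RUNNING, External.RUNNING);
        MAPPING.put(Internal.SUCCEEDED, External.SUCCEEDED);
        // In-progress states already report their eventual terminal state.
        MAPPING.put(Internal.KILL_IN_PROGRESS, External.KILLED);
        MAPPING.put(Internal.KILLED, External.KILLED);
        MAPPING.put(Internal.FAIL_IN_PROGRESS, External.FAILED);
        MAPPING.put(Internal.FAILED, External.FAILED);
    }

    static External toExternal(Internal state) {
        return MAPPING.get(state);
    }

    public static void main(String[] args) {
        for (Internal s : Internal.values()) {
            System.out.println(s + " -> " + toExternal(s));
        }
    }
}
```

One consequence worth noticing: an external observer cannot distinguish KILL_IN_PROGRESS from KILLED, so "KILLED" in ATS does not prove the container is gone yet.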

State × event matrix (key transitions)

| State | Event | Next state | Notes |
|---|---|---|---|
| NEW | TA_SCHEDULE | START_WAIT | request container |
| START_WAIT | TA_STARTED | SUBMITTED/RUNNING | container launched |
| START_WAIT | TA_CONTAINER_TERMINATING | KILL_IN_PROGRESS | preemption before launch |
| RUNNING | TA_DONE | SUCCEEDED | done(...) umbilical call |
| RUNNING | TA_FAILED | FAIL_IN_PROGRESS | processor threw |
| RUNNING | TA_TIMED_OUT | FAIL_IN_PROGRESS | no heartbeat within tez.task.timeout-ms |
| RUNNING | TA_KILL_REQUEST | KILL_IN_PROGRESS | external kill |
| RUNNING | TA_CONTAINER_TERMINATED | FAIL_IN_PROGRESS / KILL_IN_PROGRESS | NM said container died |
| KILL_IN_PROGRESS | TA_CONTAINER_TERMINATED | KILLED | cleanup done |
| FAIL_IN_PROGRESS | TA_CONTAINER_TERMINATED | FAILED | cleanup done |

To count the transitions registered on your branch:

grep -c "addTransition" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
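
To make the matrix concrete, here is a table-driven sketch of the same transitions. It is a simplified model under stated assumptions, not the Tez state machine: the class and method names are hypothetical, SUBMITTED is skipped (TA_STARTED goes straight to RUNNING), and the RUNNING + TA_CONTAINER_TERMINATED row is resolved by a boolean that stands in for "does the termination cause count as a failure".

```java
import java.util.EnumMap;
import java.util.Map;

/** Illustrative only: states and events come from the matrix, not from the Tez source. */
final class TransitionTableSketch {
    enum State { NEW, START_WAIT, RUNNING, SUCCEEDED,
                 KILL_IN_PROGRESS, KILLED, FAIL_IN_PROGRESS, FAILED }
    enum Event { TA_SCHEDULE, TA_STARTED, TA_CONTAINER_TERMINATING, TA_DONE, TA_FAILED,
                 TA_TIMED_OUT, TA_KILL_REQUEST, TA_CONTAINER_TERMINATED }

    private static final Map<State, Map<Event, State>> TABLE = new EnumMap<>(State.class);

    static {
        add(State.NEW, Event.TA_SCHEDULE, State.START_WAIT);
        add(State.START_WAIT, Event.TA_STARTED, State.RUNNING); // assumption: no SUBMITTED hop
        add(State.START_WAIT, Event.TA_CONTAINER_TERMINATING, State.KILL_IN_PROGRESS);
        add(State.RUNNING, Event.TA_DONE, State.SUCCEEDED);
        add(State.RUNNING, Event.TA_FAILED, State.FAIL_IN_PROGRESS);
        add(State.RUNNING, Event.TA_TIMED_OUT, State.FAIL_IN_PROGRESS);
        add(State.RUNNING, Event.TA_KILL_REQUEST, State.KILL_IN_PROGRESS);
        add(State.KILL_IN_PROGRESS, Event.TA_CONTAINER_TERMINATED, State.KILLED);
        add(State.FAIL_IN_PROGRESS, Event.TA_CONTAINER_TERMINATED, State.FAILED);
    }

    private static void add(State from, Event on, State to) {
        TABLE.computeIfAbsent(from, k -> new EnumMap<Event, State>(Event.class)).put(on, to);
    }

    /** Returns the next state, or throws if the matrix has no row for (from, on). */
    static State next(State from, Event on, boolean countsAsFailure) {
        // The one multi-target row: the outcome depends on the termination cause.
        if (from == State.RUNNING && on == Event.TA_CONTAINER_TERMINATED) {
            return countsAsFailure ? State.FAIL_IN_PROGRESS : State.KILL_IN_PROGRESS;
        }
        Map<Event, State> row = TABLE.get(from);
        State to = (row == null) ? null : row.get(on);
        if (to == null) {
            throw new IllegalStateException("No transition for " + on + " in state " + from);
        }
        return to;
    }

    public static void main(String[] args) {
        State s = next(State.NEW, Event.TA_SCHEDULE, false);
        s = next(s, Event.TA_STARTED, false);
        s = next(s, Event.TA_TIMED_OUT, false);
        System.out.println("after timeout: " + s);
    }
}
```

Terminal states have no rows in this sketch, so any event delivered to SUCCEEDED, KILLED, or FAILED throws. The real class may instead register explicit ignore-transitions for late events; check the addTransition calls on your branch.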

Container assignment

When a TaskAttempt becomes schedulable:

  1. The AM builds a ContainerRequest (resource, priority, locality).
  2. The AM hands it to TaskSchedulerManager.allocateTask(...).
  3. The scheduler (YarnTaskSchedulerService) eventually matches a granted container.
  4. The match drives an AMSchedulerEventTAEnded/...TALaunchRequest flow that updates the TaskAttemptImpl state.
  5. ContainerLauncherManager starts the JVM via NMClient.

To find these hand-offs in the source:

grep -n "allocateTask\|deallocateTask\|AMSchedulerEvent" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head

The container is not assigned at construction; that's why the START_WAIT state exists. Some configurations short-circuit this via container reuse (the scheduler offers a free, idle container).

See container-reuse.md and scheduler.md.
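
The following toy model shows why START_WAIT exists and how reuse short-circuits it. Everything here is hypothetical (ToyScheduler, Attempt, offerContainer, and this allocateTask signature are invented for illustration and are not the Tez scheduler API); it only captures the ordering: a request either waits for a grant or is matched immediately against an idle container.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative only: a toy scheduler, not TaskSchedulerManager. */
final class ContainerAssignmentSketch {
    enum State { NEW, START_WAIT, RUNNING }

    static final class Attempt {
        final String id;
        State state = State.NEW;
        String containerId; // null until a container is matched

        Attempt(String id) { this.id = id; }
    }

    static final class ToyScheduler {
        private final Deque<String> idleContainers = new ArrayDeque<>();
        private final Deque<Attempt> pending = new ArrayDeque<>();

        /** Request a container; reuse an idle one if available, otherwise wait. */
        void allocateTask(Attempt attempt) {
            attempt.state = State.START_WAIT;
            String reused = idleContainers.poll();
            if (reused != null) {
                launch(attempt, reused); // reuse path: no wait for a new grant
            } else {
                pending.add(attempt);
            }
        }

        /** A container became available, either newly granted or freed by a finished task. */
        void offerContainer(String containerId) {
            Attempt waiting = pending.poll();
            if (waiting != null) {
                launch(waiting, containerId);
            } else {
                idleContainers.add(containerId);
            }
        }

        private void launch(Attempt attempt, String containerId) {
            attempt.containerId = containerId;
            attempt.state = State.RUNNING;
        }
    }

    public static void main(String[] args) {
        ToyScheduler scheduler = new ToyScheduler();
        Attempt first = new Attempt("attempt_0");
        scheduler.allocateTask(first);
        System.out.println(first.id + " " + first.state + " " + first.containerId);
        scheduler.offerContainer("container_1");
        System.out.println(first.id + " " + first.state + " " + first.containerId);
    }
}
```

The sketch ignores priority and locality entirely; in a real scheduler those are exactly what keeps an attempt in START_WAIT even when idle containers exist.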


Output commit rules (per attempt)

For attempts of vertices with an OutputCommitter:

| Condition | Who commits? |
|---|---|
| Output commits are at the task level (tez.am.commit-all-outputs-on-dag-success=false) | Each TaskAttemptImpl runs commit() from inside the task JVM (via processor) |
| Output commits are at the vertex level (default for MROutput) | Only the AM commits, after all tasks succeed (see vertex-lifecycle.md) |

Losing speculative attempts must not commit. The outputCommitted flag on TaskAttemptImpl, set via setOutputCommitted(true), records which attempt actually committed. The AM ensures that exactly one attempt of each task has outputCommitted=true.

grep -n "outputCommitted\|commitOutput\|noCommit" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head
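
The "exactly one attempt commits" rule is essentially a first-caller-wins arbitration. The sketch below shows one way to express that with a compare-and-set. It is an assumption-laden illustration, not the Tez code path: CommitArbiterSketch and its methods are hypothetical names, and attempt IDs are plain strings.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative only: one arbiter instance per task. */
final class CommitArbiterSketch {
    // Holds the ID of the single attempt allowed to commit, or null if none has asked yet.
    private final AtomicReference<String> committer = new AtomicReference<>();

    /**
     * The first attempt to ask is granted the commit. Repeat calls from the same
     * attempt keep returning true; any other attempt is refused.
     */
    boolean canCommit(String attemptId) {
        return committer.compareAndSet(null, attemptId)
                || attemptId.equals(committer.get());
    }

    /** The attempt whose output is canonical, or null if nobody has committed. */
    String committedAttempt() {
        return committer.get();
    }

    public static void main(String[] args) {
        CommitArbiterSketch arbiter = new CommitArbiterSketch();
        System.out.println("original:    " + arbiter.canCommit("attempt_0"));
        System.out.println("speculative: " + arbiter.canCommit("attempt_1"));
    }
}
```

Answering true to a repeated request from the winner matters in practice: a task that retries the question after a lost response must not be told it lost to itself.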

TaskAttemptTerminationCause — the policy enum

sed -n '1,200p' \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptTerminationCause.java

Categories (the exact enum is long):

| Cause | Counts as failure? | Typical trigger |
|---|---|---|
| TERMINATED_BY_CLIENT | No | User killed DAG |
| TERMINATED_AT_SHUTDOWN | No | AM shutting down |
| TERMINATED_INEFFECTIVE_SPECULATION | No | Lost the speculation race |
| INTERNAL_PREEMPTION | No | AM preempted it (e.g., for higher-priority work) |
| EXTERNAL_PREEMPTION | No | YARN preempted the container |
| CONTAINER_EXITED | Yes (default) | Container died mid-run |
| NODE_FAILED | Yes | NM died |
| TASK_HEARTBEAT_ERROR | Yes | Heartbeat timeout |
| OUTPUT_LOST | Yes | Downstream reported output gone (rerun) |
| APPLICATION_ERROR | Yes | Processor threw |

TaskImpl uses cause.causesFailure() (or equivalent) to decide whether to bump the failure counter.
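
A minimal sketch of that accounting, assuming the categories in the table above. The enum here is a hypothetical subset that copies the table's yes/no column; countsAsFailure() and TaskFailureCounter are invented names, not the Tez API, and the real enum on your branch is longer.

```java
/** Illustrative only: cause names come from the table, behaviour is a simplified model. */
final class FailureAccountingSketch {
    enum Cause {
        TERMINATED_BY_CLIENT(false),
        TERMINATED_AT_SHUTDOWN(false),
        TERMINATED_INEFFECTIVE_SPECULATION(false),
        INTERNAL_PREEMPTION(false),
        EXTERNAL_PREEMPTION(false),
        CONTAINER_EXITED(true),
        NODE_FAILED(true),
        TASK_HEARTBEAT_ERROR(true),
        OUTPUT_LOST(true),
        APPLICATION_ERROR(true);

        private final boolean countsAsFailure;

        Cause(boolean countsAsFailure) { this.countsAsFailure = countsAsFailure; }

        boolean countsAsFailure() { return countsAsFailure; }
    }

    /** Per-task counter, standing in for what TaskImpl tracks. */
    static final class TaskFailureCounter {
        private final int maxFailedAttempts; // plays the role of tez.am.task.max.failed.attempts
        private int failedAttempts;

        TaskFailureCounter(int maxFailedAttempts) { this.maxFailedAttempts = maxFailedAttempts; }

        /** Records an ended attempt; returns true while the task may still be retried. */
        boolean onAttemptEnded(Cause cause) {
            if (cause.countsAsFailure()) {
                failedAttempts++;
            }
            return failedAttempts < maxFailedAttempts;
        }

        int failedAttempts() { return failedAttempts; }
    }

    public static void main(String[] args) {
        TaskFailureCounter counter = new TaskFailureCounter(2);
        System.out.println(counter.onAttemptEnded(Cause.EXTERNAL_PREEMPTION)); // preemption is free
        System.out.println(counter.onAttemptEnded(Cause.CONTAINER_EXITED));
        System.out.println(counter.onAttemptEnded(Cause.APPLICATION_ERROR));
    }
}
```

The design point is that the decision lives on the cause rather than on the event: the same TA_CONTAINER_TERMINATED event can be free (preemption) or expensive (container crash) depending on which cause was recorded.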


Reading exercise

sed -n '1,160p' tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
grep -n "TaskAttemptStateInternal\." \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head -20
grep -n "TerminationCause" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head -20

# Heartbeat timeout path
grep -n "TA_TIMED_OUT\|heartbeatTimeout" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java | head

Answer:

  1. What event arrives when an attempt's container heartbeat times out? What issues it?
  2. What is the difference between TA_FAILED and TA_CONTAINER_TERMINATED? When does each fire?
  3. Which TaskAttemptTerminationCause values are not counted toward tez.am.task.max.failed.attempts?
  4. In what state does an attempt sit during container provisioning?
  5. What does outputCommitted track, and how is it used by the AM to choose the canonical attempt?
  6. Why are there separate FAIL_IN_PROGRESS and FAILED states (likewise for kill)?

Common bugs and symptoms

| Symptom | Root cause | Where to look |
|---|---|---|
| Attempt stuck in START_WAIT for minutes | Scheduler can't satisfy locality/resource | TaskSchedulerManager log; relax locality |
| Attempt marked FAILED when container was preempted | TerminationCause set incorrectly | Check the TA_CONTAINER_TERMINATED handler |
| Two attempts both commit outputs (data corruption) | setOutputCommitted race; speculative commit | Run TestSpeculation; ensure committer is idempotent |
| TaskAttempt heartbeat timeout fires even though task was running | AM GC pause; clock skew | Tune AM heap; check NM/AM clock drift |
| Recovery comes back with all attempts FAILED | Recovery log lacks TaskAttemptStartedEvent for last attempt | Force flush before submitting next event |
| KILL_IN_PROGRESS lingers | TA_CONTAINER_TERMINATED never arrives | NM is dead; AM eventually times out container |
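
The spurious heartbeat timeout symptom is easier to reason about with a model of the check itself. The sketch below assumes the simplest possible design, where the AM records the time at which it processed each heartbeat and a periodic sweep compares that against a timeout. HeartbeatMonitorSketch and its methods are hypothetical; the real monitor on your branch may differ in structure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a single-threaded model of an AM-side heartbeat sweep. */
final class HeartbeatMonitorSketch {
    private final long timeoutMs; // plays the role of tez.task.timeout-ms
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    HeartbeatMonitorSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Called when the AM processes a heartbeat; nowMs is the AM's clock, not the task's. */
    void heartbeat(String attemptId, long nowMs) {
        lastHeartbeatMs.put(attemptId, nowMs);
    }

    /** Returns the attempts whose last processed heartbeat is older than the timeout. */
    List<String> findTimedOut(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatMs.entrySet()) {
            if (nowMs - entry.getValue() > timeoutMs) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(1000);
        monitor.heartbeat("attempt_0", 0);
        System.out.println("at 500 ms:  " + monitor.findTimedOut(500));
        System.out.println("at 1501 ms: " + monitor.findTimedOut(1501));
    }
}
```

Under this model, a healthy task can still be expired: if the AM stalls (for example in a long GC pause) and the sweep runs before the queued heartbeats are processed, the recorded timestamps are stale even though the task never stopped sending.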

Validation: prove you understand this

  1. Without running code: given an attempt in RUNNING and event TA_CONTAINER_TERMINATED with cause INTERNAL_PREEMPTION, what is the next state and does the failure counter increment?
  2. From the enum, list every TaskAttemptTerminationCause and tag each "counts" / "does not count".
  3. Reproduce a heartbeat timeout on MiniTezCluster by suspending a task JVM. Identify the exact log line that transitions the attempt.
  4. Walk the path from TaskCommunicatorManager.heartbeat returning a LATEST_RESPONSE_TIMEOUT to TaskAttemptImpl.handle(TA_TIMED_OUT).
  5. Verify that a speculative-loser attempt does not corrupt counters by reading the kill-handler code.