TaskImpl Lifecycle

TaskImpl is the AM-side representation of one logical task within a vertex. It is a relatively small state machine, but it owns a critical piece of policy: which attempt of this task is the "winner." This chapter walks through the states, the attempt management rules, speculation, and the max-failed threshold at which a task's failure escalates to "this whole vertex must fail."

After this chapter you should be able to explain why a task with three failed attempts may still be RUNNING while another with one failed attempt is already FAILED.


File

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java

Tests:

tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java

The states

grep -n "TaskState\." tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java | head
grep -rn "enum TaskState" --include="*.java" \
  tez-api/src/main/java tez-dag/src/main/java

| State | Meaning |
|---|---|
| NEW | Constructed; no attempts yet |
| SCHEDULED | First attempt requested from scheduler |
| RUNNING | At least one attempt is RUNNING |
| SUCCEEDED | Terminal: one attempt succeeded; task complete |
| KILLED | Terminal: explicitly killed (vertex termination, user) |
| FAILED | Terminal: max attempts exceeded |

TaskImpl does not have INITIALIZING or TERMINATING — those concerns belong to the vertex.


State × event matrix

| State | Event | Next state | Action |
|---|---|---|---|
| NEW | T_SCHEDULE | SCHEDULED | create first TaskAttemptImpl, send TA_SCHEDULE |
| SCHEDULED | T_ATTEMPT_LAUNCHED | RUNNING | mark first attempt as running |
| RUNNING | T_ATTEMPT_SUCCEEDED | SUCCEEDED | pick this attempt as the winner; kill others (if speculating) |
| RUNNING | T_ATTEMPT_FAILED | RUNNING (retry) or FAILED (exceeded) | spawn new attempt or terminate |
| RUNNING | T_ATTEMPT_KILLED | RUNNING | no-op unless this was last attempt |
| RUNNING | T_ADD_SPEC_ATTEMPT | RUNNING | spawn a duplicate attempt |
| RUNNING | T_TERMINATE | KILLED | kill all attempts |
| any | T_RECOVER_* | recovered state | replay events |
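
To make the matrix concrete, here is a minimal table-driven sketch of the single-target arcs. It is not the real TaskImpl code: the class name, the nested enums, and the `next` helper are all hypothetical, and T_ATTEMPT_FAILED and T_RECOVER_* are left out because their target state depends on runtime data rather than on the (state, event) pair alone.

```java
import java.util.EnumMap;
import java.util.Map;

// Illustrative only: state and event names mirror the matrix above,
// but this class and its methods are hypothetical, not Tez APIs.
final class TaskStateMachineSketch {
    enum State { NEW, SCHEDULED, RUNNING, SUCCEEDED, KILLED, FAILED }

    enum Event {
        T_SCHEDULE, T_ATTEMPT_LAUNCHED, T_ATTEMPT_SUCCEEDED,
        T_ATTEMPT_KILLED, T_ADD_SPEC_ATTEMPT, T_TERMINATE
    }

    // One row per source state, one column per event.
    private static final Map<State, Map<Event, State>> TABLE = new EnumMap<>(State.class);

    private static void add(State from, Event on, State to) {
        TABLE.computeIfAbsent(from, s -> new EnumMap<>(Event.class)).put(on, to);
    }

    static {
        add(State.NEW, Event.T_SCHEDULE, State.SCHEDULED);
        add(State.SCHEDULED, Event.T_ATTEMPT_LAUNCHED, State.RUNNING);
        add(State.RUNNING, Event.T_ATTEMPT_SUCCEEDED, State.SUCCEEDED);
        add(State.RUNNING, Event.T_ATTEMPT_KILLED, State.RUNNING);
        add(State.RUNNING, Event.T_ADD_SPEC_ATTEMPT, State.RUNNING);
        add(State.RUNNING, Event.T_TERMINATE, State.KILLED);
    }

    // Looks up the next state; rejects any (state, event) pair not in the table.
    static State next(State current, Event event) {
        Map<Event, State> row = TABLE.get(current);
        if (row == null || !row.containsKey(event)) {
            throw new IllegalStateException("Invalid event " + event + " in state " + current);
        }
        return row.get(event);
    }
}
```

Terminal states have no row in this sketch, so any event delivered to them is rejected; whether the real state machine ignores or rejects such events is something to confirm against the addTransition calls.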

Count transitions:

grep -c "addTransition" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java

Retry: how max-failed-attempts works

The config:

grep -n "TASK_MAX_FAILED_ATTEMPTS\|tez.am.task.max.failed.attempts" \
  tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java

The default is 4 in most branches. With the limit set to N, a task becomes FAILED only once N of its attempts have failed; attempts that were killed do not count.
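
The threshold rule also answers the puzzle from the chapter opening. The sketch below assumes the check is a plain "failed attempts have reached the limit" comparison; `isTaskFailed` is a hypothetical helper, not a TaskImpl method, and the limit of 1 for the second task is an assumed configuration.

```java
// Hypothetical helper illustrating the threshold rule; not part of Tez.
final class FailureBudgetSketch {
    // True once the number of failed (not killed) attempts reaches the limit.
    static boolean isTaskFailed(int failedAttempts, int maxFailedAttempts) {
        return failedAttempts >= maxFailedAttempts;
    }

    public static void main(String[] args) {
        // Limit 4, three failures so far: the task keeps retrying.
        System.out.println(isTaskFailed(3, 4)); // prints "false"
        // Assumed limit of 1, one failure: the task is FAILED immediately.
        System.out.println(isTaskFailed(1, 1)); // prints "true"
    }
}
```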

Failed vs killed distinction:

| Outcome | Counts toward max.failed.attempts? |
|---|---|
| TaskAttempt failed (own crash, processor exception) | yes |
| TaskAttempt killed by speculation (lost the race) | no |
| TaskAttempt killed because vertex terminated | no |
| TaskAttempt killed because container preempted | no |

The classification is owned by TaskAttemptTerminationCause (see task-attempt-lifecycle.md). TaskImpl.handle consults the cause when deciding whether to retry or fail.
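
One way to picture that consultation is a counter that only advances for causes flagged as failures. The `Cause` enum here is a hypothetical stand-in with one member per row of the table above; the real TaskAttemptTerminationCause has different members, and `onAttemptEnded` is an invented name.

```java
// Sketch of cause-aware retry accounting. All names are hypothetical.
final class TerminationCauseSketch {
    // Stand-in for TaskAttemptTerminationCause: one member per table row.
    enum Cause {
        PROCESSOR_EXCEPTION(true),
        LOST_SPECULATION_RACE(false),
        VERTEX_TERMINATED(false),
        CONTAINER_PREEMPTED(false);

        final boolean countsAsFailure;

        Cause(boolean countsAsFailure) {
            this.countsAsFailure = countsAsFailure;
        }
    }

    private final int maxFailedAttempts;
    private int failedAttempts;

    TerminationCauseSketch(int maxFailedAttempts) {
        this.maxFailedAttempts = maxFailedAttempts;
    }

    // Returns true if the task should now be FAILED, false if it may retry.
    boolean onAttemptEnded(Cause cause) {
        if (cause.countsAsFailure) {
            failedAttempts++; // kills leave the budget untouched
        }
        return failedAttempts >= maxFailedAttempts;
    }

    int failedAttemptCount() {
        return failedAttempts;
    }
}
```

With a limit of 2, any number of preemptions or lost speculation races leaves the task retryable, while the second processor exception fails it.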

grep -n "TerminationCause\|isFailureCause" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java | head

Speculation

Speculation runs a second copy of a task before the first finishes, hoping the second wins. Implementation:

tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculate/
  LegacySpeculator.java
  SimpleSpeculator.java                    (varies by version)
  legacy/RuntimeTaskStatsEstimator.java

The speculator emits T_ADD_SPEC_ATTEMPT events into the dispatcher; the task spawns an additional attempt. The first attempt to succeed wins; the others are killed with cause TERMINATED_BY_OWNER (or similar). Killed attempts do not count toward max.failed.attempts.

Enabled by:

grep -n "tez.am.speculation.enabled\|speculation" \
  tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java | head

"Best attempt" selection

When multiple attempts of the same task exist, the first to send TA_DONE (successful completion) wins. The handler:

  1. Marks that attempt as the canonical one (cached in TaskImpl).
  2. Iterates remaining attempts, sending each a kill event.
  3. Transitions task to SUCCEEDED.

grep -n "successfulAttempt\|setWinnerAttempt\|markSuccessful" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java | head

Downstream vertices reading from this task's output use the winner's outputLocationHint for shuffle (see shuffle-sort.md).
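
The three handler steps can be sketched as follows. This is a simplified model under stated assumptions, not the TaskImpl implementation: attempts are plain integer ids, "sending a kill event" is modeled as flipping a local state, and every name in the class is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of first-success-wins attempt selection.
final class WinnerSelectionSketch {
    enum AttemptState { RUNNING, SUCCEEDED, KILL_REQUESTED }

    private final Map<Integer, AttemptState> attempts = new LinkedHashMap<>();
    private Integer winner; // null until some attempt succeeds

    void addAttempt(int attemptId) {
        attempts.put(attemptId, AttemptState.RUNNING);
    }

    // Returns the ids of attempts asked to stop; empty if a winner already exists.
    List<Integer> onAttemptSucceeded(int attemptId) {
        List<Integer> killed = new ArrayList<>();
        if (winner != null) {
            return killed; // a late success never displaces the first winner
        }
        winner = attemptId;                              // step 1: cache the winner
        attempts.put(attemptId, AttemptState.SUCCEEDED);
        for (Map.Entry<Integer, AttemptState> e : attempts.entrySet()) {
            if (e.getValue() == AttemptState.RUNNING) {  // step 2: kill the rest
                e.setValue(AttemptState.KILL_REQUESTED);
                killed.add(e.getKey());
            }
        }
        return killed; // step 3, the move to SUCCEEDED, is left to the caller
    }

    Integer winner() {
        return winner;
    }
}
```

Guarding on an existing winner is a design choice of this sketch: it keeps the cached winner stable if a second success report arrives before the kill takes effect.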


Reading exercise

# Surface
sed -n '1,120p' tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java

# State machine block
grep -n "addTransition" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java | head -40

# Retry logic
grep -n "addAttempt\|nextAttemptNumber\|createAttempt" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java | head

# Speculation hook
grep -rn "T_ADD_SPEC_ATTEMPT" tez-dag/src/main/java/org/apache/tez/dag/app/ | head

Answer:

  1. What is the precise condition for transitioning from RUNNING to FAILED on a T_ATTEMPT_FAILED event? Cite the line.
  2. Where is a new TaskAttemptImpl constructed? Is it a public method or private to TaskImpl?
  3. How does TaskImpl know whether a failed attempt should count toward the failure budget?
  4. In what state can T_ADD_SPEC_ATTEMPT arrive? What does the handler do?
  5. Why does TaskImpl not own its own scheduling? Who does?
  6. When a task succeeds with two parallel attempts, which one becomes the downstream input? How is the loser cleaned up?

Common bugs and symptoms

| Symptom | Root cause | Where to look |
|---|---|---|
| Task retries forever and never fails | max.failed.attempts set absurdly high; or all failures classified as "kill" | Check config; verify TerminationCause for each failure |
| Speculation kills the original just after it succeeds (lost work) | Race on markSuccessful and speculative-attempt kill | Ensure the speculator backs off while the task is completing |
| Task SUCCEEDED but a sibling attempt still appears as RUNNING for a long time | Container slow to acknowledge kill | Look at ContainerHeartbeatHandler and TA_KILL_REQUEST |
| Task reported as SUCCEEDED but downstream cannot fetch outputs | Race between TA_DONE and output ready event | Check ordering of outputReady umbilical calls |
| Recovery brings task back as RUNNING even though it had finished | Missing TaskFinishedEvent in recovery log | Investigate RecoveryService flush boundaries |

Validation: prove you understand this

  1. Draw the TaskImpl state machine from memory, including all six states.
  2. From TestTaskImpl, identify a test that drives a task to FAILED. Walk the events it sends.
  3. List the TaskAttemptTerminationCause categories that do not count toward max.failed.attempts. Cite the enum and the consumer.
  4. Trace, line by line, what TaskImpl does when T_ATTEMPT_SUCCEEDED arrives for the second of two concurrent attempts.
  5. Modify TaskImpl to log the winner's attempt number explicitly at the INFO level. Run a MiniTezCluster job and observe.