Level 4: DAG State Machine Internals
This level takes you inside VertexImpl — the most complex class in the Tez codebase. You
will read the full state machine, understand every major state and the conditions that drive
transitions, learn how VertexManager plugs in to control scheduling, and understand how
speculative execution works. After this level you are capable of diagnosing vertex-level
failures from AM log output and writing patches to the state machine.
Learning Objectives
By the end of Level 4 you must be able to:
- Read a StateMachineFactory definition and produce a transition table from code
- Explain the full INITIALIZING → INITED → RUNNING → SUCCEEDED path with all preconditions
- Describe what VertexManager does and when it is invoked
- Explain the difference between ImmediateStartVertexManager and ShuffleVertexManager
- Describe the speculative execution trigger conditions and what they cause
- Trace a vertex failure from the first task failure to DAGImpl receiving V_COMPLETED
- Explain what vertex groups are and why they exist
Reading a StateMachineFactory
Tez uses Hadoop's StateMachineFactory (package org.apache.hadoop.yarn.state, shipped in hadoop-yarn-common). The pattern, abbreviated from VertexImpl:
```java
private static final StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
    stateMachineFactory =
        // Explicit type arguments: the diamond operator cannot infer them
        // through the chained addTransition(...) calls
        new StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>(
                VertexState.NEW)
            // From NEW
            .addTransition(VertexState.NEW,
                VertexState.INITIALIZING,
                VertexEventType.V_INIT,
                new InitTransition())
            // From INITIALIZING
            .addTransition(VertexState.INITIALIZING,
                EnumSet.of(VertexState.INITED, VertexState.FAILED),
                VertexEventType.V_INIT_DONE,
                new InitedTransition())
            // ... remaining transitions omitted
            .installTopology();
```
Reading rules
- First argument: the source state (where we are now)
- Second argument: the destination state(s). If it is an EnumSet, the transition handler decides which destination to return.
- Third argument: the event type that triggers this transition
- Fourth argument: the SingleArcTransition or MultipleArcTransition handler
A SingleArcTransition always goes to the same destination state. Its transition() method
returns void.
A MultipleArcTransition can go to different states. Its transition() method returns the
next VertexState.
When you see an EnumSet as the second argument, look for a MultipleArcTransition
implementation — the logic inside that class decides which state to move to.
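The difference between the two handler kinds can be shown with a toy state machine. This is a minimal sketch, not Hadoop's StateMachineFactory. ToyStateMachine, SingleArc, MultipleArc, addSingle, addMultiple and handle are all hypothetical names chosen for illustration. A single-arc hook is wrapped so that it always lands on its fixed destination. A multiple-arc hook returns the destination itself.

```java
import java.util.EnumMap;
import java.util.Map;

// Toy model of the SingleArc / MultipleArc distinction. NOT Hadoop's
// StateMachineFactory: every name here is hypothetical.
class ToyStateMachine {
  enum State { NEW, INITIALIZING, INITED, FAILED }
  enum EventType { V_INIT, V_INIT_DONE }

  // Single arc: destination fixed at registration; the hook returns void.
  interface SingleArc { void transition(ToyStateMachine m); }

  // Multiple arc: the hook itself returns the destination state.
  interface MultipleArc { State transition(ToyStateMachine m); }

  private final Map<State, Map<EventType, MultipleArc>> table = new EnumMap<>(State.class);
  private State current = State.NEW;
  int failedInitializers = 0;

  ToyStateMachine addSingle(State from, State to, EventType ev, SingleArc hook) {
    // Wrap the void hook so it always lands on the fixed destination.
    return addMultiple(from, ev, m -> { hook.transition(m); return to; });
  }

  ToyStateMachine addMultiple(State from, EventType ev, MultipleArc hook) {
    table.computeIfAbsent(from, s -> new EnumMap<>(EventType.class)).put(ev, hook);
    return this;
  }

  State handle(EventType ev) {
    Map<EventType, MultipleArc> arcs = table.get(current);
    MultipleArc arc = (arcs == null) ? null : arcs.get(ev);
    if (arc == null) {
      // An unregistered (state, event) pair is an invalid transition.
      throw new IllegalStateException("Invalid event " + ev + " in state " + current);
    }
    current = arc.transition(this);
    return current;
  }

  static ToyStateMachine build() {
    return new ToyStateMachine()
        // NEW -> INITIALIZING: single arc, no decision to make
        .addSingle(State.NEW, State.INITIALIZING, EventType.V_INIT, m -> { })
        // INITIALIZING -> INITED or FAILED: the hook decides
        .addMultiple(State.INITIALIZING, EventType.V_INIT_DONE,
            m -> m.failedInitializers > 0 ? State.FAILED : State.INITED);
  }

  public static void main(String[] args) {
    ToyStateMachine sm = build();
    sm.handle(EventType.V_INIT);
    System.out.println(sm.handle(EventType.V_INIT_DONE)); // INITED
  }
}
```

Wrapping single arcs as multiple arcs keeps the lookup table uniform. This choice is made for the sketch only; it does not imply how Hadoop's implementation is structured.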
How to extract the full transition table
```shell
# Count the addTransition calls in VertexImpl
grep -n "addTransition" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java \
  | wc -l

# Print them all
grep -n "addTransition" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
```
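The grep output still has to be turned into a table by hand. A rough Java sketch can do this with a regular expression. It assumes that every call starts with VertexState.X, followed by a destination (a single state or an EnumSet.of(...)) and then an event (a single type or an EnumSet.of(...)), as in the excerpt above. Calls formatted differently are silently skipped, so cross-check the row count against the wc -l count. TransitionTable is a hypothetical helper name.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Rough addTransition extractor. Assumes "VertexState.X, <dest>, <event>" ordering;
// anything formatted differently is skipped, not reported.
class TransitionTable {
  // group(1) = source, group(2) = destination(s), group(3) = event(s)
  private static final Pattern CALL = Pattern.compile(
      "addTransition\\(\\s*VertexState\\.(\\w+)\\s*,\\s*"
      + "(VertexState\\.\\w+|EnumSet\\.of\\([^)]*\\))\\s*,\\s*"
      + "(VertexEventType\\.\\w+|EnumSet\\.of\\([^)]*\\))");

  static List<String[]> extract(String source) {
    List<String[]> rows = new ArrayList<>();
    Matcher m = CALL.matcher(source);
    while (m.find()) {
      rows.add(new String[] {m.group(1), clean(m.group(2)), clean(m.group(3))});
    }
    return rows;
  }

  // Strip enum prefixes and collapse whitespace (EnumSets may span lines).
  private static String clean(String s) {
    return s.replaceAll("VertexState\\.|VertexEventType\\.", "")
            .replaceAll("\\s+", " ").trim();
  }

  public static void main(String[] args) throws IOException {
    // args[0]: path to VertexImpl.java
    String src = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
    System.out.println("| Source | Destination(s) | Event(s) |");
    System.out.println("|---|---|---|");
    for (String[] r : extract(src)) {
      System.out.println("| " + r[0] + " | " + r[1] + " | " + r[2] + " |");
    }
  }
}
```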
The Full Vertex State Machine
NEW → INITIALIZING (event: V_INIT)
Triggered by DAGImpl.InitTransition when the DAG is initializing.
Handler: InitTransition
What happens:
- VertexImpl sets up inputsWithInitializers: inputs that require a RootInputInitializer
- Registers event handlers for root input initializer completion events
- If there are no root input initializers, immediately posts V_INIT_DONE (transitions to INITED in the same logical step)
Precondition for INITIALIZING → INITED:
- All RootInputInitializers have reported completion
- VertexManager.initialize() has completed
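The precondition amounts to a gate with two inputs. The sketch below models it as a multiple-arc style decision that stays in INITIALIZING, advances to INITED, or fails. InitGate and its methods are hypothetical and much simpler than VertexImpl, whose actual bookkeeping differs. The failure branch corresponds to the INITIALIZING → FAILED arc in the excerpt above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the INITIALIZING -> INITED precondition.
// Names are illustrative only, not VertexImpl's fields or methods.
class InitGate {
  enum State { INITIALIZING, INITED, FAILED }

  private final Set<String> pendingInitializers;
  private boolean vertexManagerInitialized = false;
  private boolean initializerFailed = false;

  InitGate(Set<String> inputsWithInitializers) {
    this.pendingInitializers = new HashSet<>(inputsWithInitializers);
  }

  // One root input initializer has reported back.
  State onInitializerDone(String inputName, boolean succeeded) {
    pendingInitializers.remove(inputName);
    if (!succeeded) initializerFailed = true;
    return evaluate();
  }

  // VertexManager.initialize() has returned.
  State onVertexManagerInitialized() {
    vertexManagerInitialized = true;
    return evaluate();
  }

  // Multiple-arc style decision: stay, advance, or fail.
  State evaluate() {
    if (initializerFailed) return State.FAILED;
    if (pendingInitializers.isEmpty() && vertexManagerInitialized) return State.INITED;
    return State.INITIALIZING;
  }
}
```

A vertex with no initializers (an empty set) reaches INITED as soon as the VertexManager is initialized. This matches the "same logical step" shortcut described above.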
INITED → RUNNING (event: V_START)
Triggered by DAGImpl when all source vertices of this vertex have started or when the
vertex has no source edges (it is a root vertex).
Handler: StartTransition
What happens:
- Calls vertexManager.onVertexStarted()
- The VertexManager decides when to schedule tasks
Important: the V_START event does not directly schedule tasks. The VertexManager does, by calling back into its VertexManagerPluginContext (scheduleTasks(), or scheduleVertexTasks() in older releases).
RUNNING task completion handling
Each task completion (success or failure) generates a V_TASK_COMPLETED event.
The TaskCompletedTransition handler:
- Increments the succeeded/failed task counter
- Checks whether all tasks are done; if so, triggers V_COMPLETE_EVENT
- Checks speculative execution conditions
- Checks whether the failure count exceeds the tolerable-failure threshold
Key configuration: tez.vertex.failure-tasks-percent.to-fail-vertex sets the percentage of failed tasks that causes the entire vertex to fail. A task counts as failed only after TaskImpl has exhausted its retry attempts. Default: 0 (any failed task fails the vertex). Setting it above 0 enables partial failure tolerance.
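A percentage-based tolerance check reduces to a single comparison. The helper name and its float arithmetic are assumptions made for illustration, not VertexImpl's code. The point is that a tolerance of 0 turns the first failed task into a vertex failure.

```java
// Hypothetical tolerance check: fail the vertex once the share of failed
// tasks exceeds maxFailurePercent. VertexImpl's exact arithmetic may differ.
class FailureThreshold {
  static boolean shouldFailVertex(int failedTasks, int totalTasks, float maxFailurePercent) {
    if (failedTasks == 0) return false;
    // "Failed" here means the task exhausted its retries in TaskImpl.
    float failedPercent = 100f * failedTasks / totalTasks;
    return failedPercent > maxFailurePercent;
  }

  public static void main(String[] args) {
    System.out.println(shouldFailVertex(1, 100, 0f)); // true: zero tolerance
    System.out.println(shouldFailVertex(4, 100, 5f)); // false: 4% <= 5%
    System.out.println(shouldFailVertex(6, 100, 5f)); // true: 6% > 5%
  }
}
```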
RUNNING → COMMITTING (all tasks succeeded)
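Before a vertex is marked SUCCEEDED, its output committers run. This vertex-level commit applies when outputs are committed per vertex. With tez.am.commit-all-outputs-on-dag-success set to true (the default), commits are deferred until the whole DAG succeeds.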
Handler: VertexCommitCallback
What happens:
- OutputCommitter.commitOutput() is called for each output that has a committer
- Commit is all-or-nothing: either every output commits or the vertex fails
- An AM failure between task completion and output commit is a window that AM recovery has to handle
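The all-or-nothing rule can be sketched as a loop that aborts every output on the first failed commit. OutputCommitterLike is a hypothetical stand-in for Tez's OutputCommitter, and only the control flow is meant to carry over. The handler name VertexCommitCallback suggests that the real commit completes asynchronously rather than in a plain loop like this one.

```java
import java.util.List;

// All-or-nothing commit sketch. OutputCommitterLike is a hypothetical
// stand-in for Tez's OutputCommitter; only the control flow carries over.
class VertexCommit {
  interface OutputCommitterLike {
    void commitOutput() throws Exception;
    void abortOutput();
  }

  /** Returns true if every output committed; otherwise aborts all outputs and returns false. */
  static boolean commitAll(List<OutputCommitterLike> committers) {
    for (OutputCommitterLike c : committers) {
      try {
        c.commitOutput();
      } catch (Exception e) {
        // One failed commit fails the vertex: abort every output, including
        // ones that already committed, so each committer can clean up.
        for (OutputCommitterLike other : committers) {
          other.abortOutput();
        }
        return false;
      }
    }
    return true;
  }
}
```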
RUNNING → FAILED
Triggers:
- Failed tasks exceed the failure threshold (V_TASK_COMPLETED reporting a failed task)
- A container dies without a task completion report
- VertexManager reports an error
- A downstream vertex fails and error propagation is configured
RECOVERING states
When the AM restarts (e.g., due to a node failure), VertexImpl enters RECOVERING states. Recovery replays the history events that the previous AM attempt wrote to its recovery log in the application's staging directory. From them it reconstructs which tasks completed before the AM died, so already-succeeded tasks are not re-run.
This is the most complex part of VertexImpl. Recovery bugs are a major category of
contributor-fixable issues.
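Conceptually, replaying the recovery log means keeping the last recorded event for each task. The sketch below assumes a simplified record type. HistoryRecord and its event kinds are hypothetical; Tez's real history events carry far more state. The real recovery path must also deal with in-flight attempts and vertices whose parallelism changed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of a recovery log; HistoryRecord is illustrative only.
class RecoveryReplay {
  enum Kind { TASK_STARTED, TASK_SUCCEEDED, TASK_FAILED }

  static final class HistoryRecord {
    final String taskId;
    final Kind kind;
    HistoryRecord(String taskId, Kind kind) { this.taskId = taskId; this.kind = kind; }
  }

  /** Tasks whose last recorded event is TASK_SUCCEEDED need not run again. */
  static List<String> succeededTasks(List<HistoryRecord> log) {
    Map<String, Kind> last = new LinkedHashMap<>();
    for (HistoryRecord r : log) {
      last.put(r.taskId, r.kind); // later events overwrite earlier ones
    }
    List<String> done = new ArrayList<>();
    for (Map.Entry<String, Kind> e : last.entrySet()) {
      if (e.getValue() == Kind.TASK_SUCCEEDED) done.add(e.getKey());
    }
    return done;
  }
}
```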
VertexManager
VertexManager is the plugin interface that controls task scheduling within a vertex.
It sits between the AM framework and the actual task scheduler.
Interface (simplified)
```java
public abstract class VertexManagerPlugin {
  // Called when the vertex is initialized; the plugin configures itself
  public abstract void initialize() throws Exception;

  // Called when the vertex starts (V_START); the plugin decides when to schedule tasks.
  // completions: source task attempts that finished before the vertex started
  public abstract void onVertexStarted(List<TaskAttemptIdentifier> completions)
      throws Exception;

  // Called each time a source vertex task completes;
  // the plugin uses this to update scheduling decisions (for slow start)
  public abstract void onSourceTaskCompleted(TaskAttemptIdentifier completedSrcTaskAttempt)
      throws Exception;

  // Called when a VertexManagerEvent arrives, e.g. output-size statistics that
  // source tasks send to ShuffleVertexManager for auto-parallelism
  public abstract void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
      throws Exception;
}
```
ImmediateStartVertexManager
The default for root vertices and vertices with no special scheduling requirements.
Behavior:
- Schedules all tasks immediately when onVertexStarted() is called
- Does not wait for any source task completion
- Used by: the Tokenizer vertex in OrderedWordCount
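The division of labour between V_START and the plugin fits in a few lines. SchedulingContext and Plugin below are hypothetical, heavily simplified stand-ins for VertexManagerPluginContext and VertexManagerPlugin; the real scheduling calls take richer request objects than bare task indexes. The start handler only notifies the plugin. An ImmediateStart-style plugin then schedules every task at once.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "V_START notifies the plugin; the plugin schedules".
// All types here are hypothetical simplifications.
class StartDelegation {
  interface SchedulingContext {
    int getVertexNumTasks();
    void scheduleTasks(List<Integer> taskIndexes);
  }

  abstract static class Plugin {
    final SchedulingContext ctx;
    Plugin(SchedulingContext ctx) { this.ctx = ctx; }
    abstract void onVertexStarted();
  }

  // ImmediateStart-style behaviour: schedule every task at once.
  static final class ImmediateStart extends Plugin {
    ImmediateStart(SchedulingContext ctx) { super(ctx); }

    @Override
    void onVertexStarted() {
      List<Integer> all = new ArrayList<>();
      for (int i = 0; i < ctx.getVertexNumTasks(); i++) all.add(i);
      ctx.scheduleTasks(all);
    }
  }

  // The vertex's start handler never schedules directly; it only notifies.
  static void startVertex(Plugin plugin) {
    plugin.onVertexStarted();
  }
}
```

A slow-start plugin would instead do little in onVertexStarted() and schedule from its source-completion callback.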
ShuffleVertexManager
Used for vertices that receive SCATTER_GATHER input from a source vertex.
Behavior:
- Implements slow start: waits until a configurable fraction of source tasks have completed before scheduling downstream tasks
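- Configuration keys: tez.shuffle-vertex-manager.min-src-fraction (default 0.25) and tez.shuffle-vertex-manager.max-src-fraction (default 0.75). Scheduling of downstream tasks begins once the min fraction of source tasks has completed. All downstream tasks are scheduled by the time the max fraction has completed.
- Implements auto-parallelism (enabled with tez.shuffle-vertex-manager.enable.auto-parallel): can reduce the downstream vertex's parallelism based on the actual size of shuffle data
- When auto-parallelism reduces parallelism, it calls context.reconfigureVertex(), which posts a V_PARALLELISM_UPDATED event to VertexImpl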
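The min/max pair defines a ramp. The minimal sketch below assumes linear interpolation between the two fractions, with at least one task released as soon as the minimum is reached. The method name is hypothetical, and ShuffleVertexManager's exact rounding may differ.

```java
// Slow-start ramp sketch: linear interpolation between the min and max
// source-completion fractions. Name is hypothetical; Tez's rounding may differ.
class SlowStart {
  static int tasksToHaveScheduled(int completedSrc, int totalSrc, int totalTasks,
                                  float minFraction, float maxFraction) {
    float done = (totalSrc == 0) ? 1f : (float) completedSrc / totalSrc;
    if (done < minFraction) return 0;           // not enough source output yet
    if (done >= maxFraction) return totalTasks; // past the max: release everything
    float ramp = (done - minFraction) / (maxFraction - minFraction);
    // Release at least one task once the min fraction is reached.
    return Math.min(totalTasks, Math.max(1, (int) (ramp * totalTasks)));
  }

  public static void main(String[] args) {
    // 100 source tasks, 20 downstream tasks, default fractions 0.25 / 0.75
    System.out.println(tasksToHaveScheduled(10, 100, 20, 0.25f, 0.75f)); // 0
    System.out.println(tasksToHaveScheduled(50, 100, 20, 0.25f, 0.75f)); // 10
    System.out.println(tasksToHaveScheduled(80, 100, 20, 0.25f, 0.75f)); // 20
  }
}
```

Take the defaults with 100 source tasks and 20 downstream tasks. Nothing is scheduled at 10 completions, half the downstream tasks are scheduled at 50, and all of them are scheduled from 75 onwards.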
Why VertexManager Matters for Contributors
Auto-parallelism and slow-start bugs are a major category of Tez issues. The interaction
between ShuffleVertexManager and VertexImpl involves:
- Parallelism changes after task scheduling
- Race conditions between task completion events and parallelism updates
- Recovery of vertices that had parallelism changed before AM death
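The auto-parallelism decision can be sketched as extrapolate, divide, then group. The names and rounding in this model are assumptions, loosely following the ShuffleVertexManager behaviour described above. Total shuffle bytes are extrapolated from the completed source tasks and divided by a desired per-task input size. Parallelism is only ever reduced, by giving each new task a contiguous range of the original partitions. The grouping step is why the result can land above the naive estimate: 100 partitions grouped 3 at a time give 34 tasks, not 30.

```java
// Simplified auto-parallelism estimate. Names and rounding are assumptions,
// loosely modeled on the behaviour described for ShuffleVertexManager.
class AutoParallelism {
  /** Returns the new parallelism, or currentParallelism if no change is warranted. */
  static int newParallelism(long bytesFromCompletedSrc, int completedSrc, int totalSrc,
                            int currentParallelism, long desiredBytesPerTask,
                            int minParallelism) {
    if (completedSrc == 0) return currentParallelism;   // nothing to extrapolate from
    // Assume the remaining source tasks produce as much as the completed ones did.
    long expectedTotal = bytesFromCompletedSrc * totalSrc / completedSrc;
    int desired = (int) ((expectedTotal + desiredBytesPerTask - 1) / desiredBytesPerTask);
    desired = Math.max(desired, Math.max(1, minParallelism));
    if (desired >= currentParallelism) return currentParallelism; // only ever shrink
    // Each new task reads a contiguous range of the original partitions.
    int range = currentParallelism / desired;
    if (range <= 1) return currentParallelism;
    return (currentParallelism + range - 1) / range;    // ceil(current / range)
  }
}
```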
Speculative Execution
Speculative execution launches a duplicate task attempt when the original attempt is slow.
Trigger conditions
VertexImpl checks speculation conditions in TaskCompletedTransition and on a periodic
timer:
- At least one task has completed (we have a baseline for "normal" task duration)
- The running attempt has been running longer than speculative_threshold * median_time
- The running attempt's progress is lower than expected for its elapsed time
Configuration:
tez.am.speculation.enabled = true (default: false)
tez.am.speculation.interval-ms = 5000 (check interval)
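The following sketch gives one concrete reading of these conditions. It is a simplified model, not Tez's speculator. Three choices are assumptions made for illustration: the median baseline, the threshold multiplier, and the test used for condition 3 (whether a fresh attempt would likely finish sooner).

```java
import java.util.Arrays;

// Simplified model of the trigger conditions; NOT Tez's speculator.
// Median baseline, threshold multiplier and remaining-time test are assumptions.
class SpeculationCheck {
  static double median(long[] durations) {
    long[] s = durations.clone();
    Arrays.sort(s);
    int n = s.length;
    return (n % 2 == 1) ? s[n / 2] : (s[n / 2 - 1] + s[n / 2]) / 2.0;
  }

  static boolean shouldSpeculate(long[] completedDurationsMs, long elapsedMs,
                                 float progress, double threshold) {
    // Condition 1: need at least one completed task as a baseline.
    if (completedDurationsMs.length == 0) return false;
    double med = median(completedDurationsMs);
    // Condition 2: the attempt has run longer than threshold * median.
    if (elapsedMs <= threshold * med) return false;
    // Condition 3: extrapolate remaining time assuming a constant progress rate;
    // speculate only if a fresh attempt (about median time) would likely finish first.
    if (progress <= 0f) return true;
    double remainingMs = elapsedMs * (1.0 - progress) / progress;
    return remainingMs > med;
  }
}
```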
What happens
- VertexImpl posts a TaskEventType.T_ADD_SPEC_ATTEMPT event to TaskImpl
- TaskImpl creates a new TaskAttemptImpl
- Both attempts run concurrently
- The first to succeed wins; the other is killed
- The winning attempt's output is committed; the losing attempt's output is discarded
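The first-success-wins rule is easy to get subtly wrong when a second success arrives late. Here is a toy model. AttemptRace and its methods are hypothetical; in Tez this bookkeeping belongs to TaskImpl.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "first success wins, the rest are killed" (hypothetical names).
class AttemptRace {
  private final List<String> running = new ArrayList<>();
  private String winner = null;
  final List<String> killed = new ArrayList<>();

  void launch(String attemptId) { running.add(attemptId); }

  /** Returns true if this attempt became the winner. */
  boolean onAttemptSucceeded(String attemptId) {
    // A late success (after a winner exists) or an unknown attempt is ignored.
    if (winner != null || !running.contains(attemptId)) return false;
    winner = attemptId;
    for (String other : running) {
      if (!other.equals(attemptId)) killed.add(other); // losers' output is discarded
    }
    running.clear();
    return true;
  }

  String winner() { return winner; }
}
```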
Interaction with ShuffleVertexManager
If a speculative attempt completes, ShuffleVertexManager receives an onSourceTaskCompleted
callback for the winning attempt. It must de-duplicate: the task's output should only be
counted once regardless of which attempt succeeded.
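De-duplication works if completion is keyed by task rather than by attempt. The minimal sketch below assumes a callback that receives a task index and an attempt number; the real callback receives a TaskAttemptIdentifier. SourceCompletionTracker is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical tracker: completion is keyed by task, so a speculative
// duplicate of an already-completed task does not change the count.
class SourceCompletionTracker {
  private final Set<Integer> completedTasks = new HashSet<>();
  private final int totalSourceTasks;

  SourceCompletionTracker(int totalSourceTasks) { this.totalSourceTasks = totalSourceTasks; }

  // attemptNumber is deliberately ignored; Set.add is a no-op for a repeat task.
  void onSourceTaskCompleted(int taskIndex, int attemptNumber) {
    completedTasks.add(taskIndex);
  }

  float completedFraction() {
    return (float) completedTasks.size() / totalSourceTasks;
  }
}
```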
Vertex Groups
Vertex groups (VertexGroup in the API) allow multiple vertices to be treated as a single
logical vertex for downstream consumption.
Use case: merging the output of multiple Map vertices before a single Reduce vertex, without
an intermediate shuffle. This is used in the Hive UnionAll operator implementation.
Key classes:
- VertexGroup API: tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
- GroupInputEdge: an edge from a VertexGroup to a regular vertex
- The downstream vertex sees a single MergedLogicalInput that combines all group members
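From the downstream task's point of view, a merged input behaves like one reader over several member inputs. The sketch below models only that idea, using plain lists. GroupMergedInput is hypothetical and is not the MergedLogicalInput API, which works with readers rather than materialized records. Concatenation in member order is just one possible merge strategy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only: one logical input backed by one input per group member.
class GroupMergedInput {
  private final Map<String, List<String>> perMemberRecords = new LinkedHashMap<>();

  void addMemberInput(String memberVertex, List<String> records) {
    perMemberRecords.put(memberVertex, records);
  }

  // Concatenate member inputs in the order the members were added.
  List<String> readAll() {
    List<String> merged = new ArrayList<>();
    for (List<String> records : perMemberRecords.values()) {
      merged.addAll(records);
    }
    return merged;
  }
}
```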
Key Classes for This Level
| Class | Path | Focus |
|---|---|---|
VertexImpl | tez-dag/.../dag/impl/VertexImpl.java | The entire state machine; 6000+ lines |
ShuffleVertexManager | tez-runtime-library/.../library/vertexmanager/ShuffleVertexManager.java | Slow start and auto-parallelism |
ImmediateStartVertexManager | tez-dag/.../vertexmanager/ImmediateStartVertexManager.java | Simple baseline |
VertexManagerPlugin | tez-api/.../VertexManagerPlugin.java | The interface |
VertexManagerPluginContext | tez-api/.../VertexManagerPluginContext.java | What the plugin can call back into |
TaskImpl | tez-dag/.../dag/impl/TaskImpl.java | Manages attempt lifecycle |
```shell
# Find the VertexManager implementations across all modules
# (ShuffleVertexManager lives in tez-runtime-library, not tez-dag)
find . -path "*/src/main/java/*" -name "*VertexManager*.java"
```
JIRA Categories for Level 4 Contributors
You are now ready to investigate and submit patches for:
- Vertex failure handling bugs — incorrect state transitions, wrong error messages
- VertexManager logic bugs — slow-start fraction calculation, auto-parallelism edge cases
- Recovery bugs — vertices that fail to recover correctly after AM restart
- Speculation bugs — duplicate completions, wrong trigger conditions
- Test improvements: TestVertexImpl has hundreds of tests; add coverage for edge cases
Approach:
- Find a TestVertexImpl test marked @Ignore and read the comment explaining why
- If the underlying bug has been fixed, the @Ignore can be removed (a trivial but real contribution)
- Or find a state machine transition that has no test coverage (grep for the transition, then grep for the handler class name in test files)
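The "grep for the handler class name" step can be automated with a short program. This sketch collects every new XxxTransition( occurrence from the state machine source and reports handler names that never appear in the test source. UntestedHandlers is a hypothetical name. Treat the output as leads only. Tests usually drive transitions by sending events, so a handler name missing from TestVertexImpl does not prove the transition is untested. Likewise, a name that does appear is no proof that the transition is exercised.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Rough coverage hint: handler classes instantiated in the state machine
// source whose names never appear in the test source.
class UntestedHandlers {
  private static final Pattern HANDLER = Pattern.compile("new\\s+(\\w+Transition)\\s*\\(");

  static List<String> untested(String stateMachineSrc, String testSrc) {
    Set<String> handlers = new TreeSet<>(); // sorted, de-duplicated
    Matcher m = HANDLER.matcher(stateMachineSrc);
    while (m.find()) handlers.add(m.group(1));
    List<String> missing = new ArrayList<>();
    for (String h : handlers) {
      if (!testSrc.contains(h)) missing.add(h);
    }
    return missing;
  }

  public static void main(String[] args) throws IOException {
    // args[0]: VertexImpl.java, args[1]: TestVertexImpl.java
    String impl = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
    String test = new String(Files.readAllBytes(Paths.get(args[1])), StandardCharsets.UTF_8);
    untested(impl, test).forEach(System.out::println);
  }
}
```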
Deliverables
- Extract the complete VertexImpl state transition table (all source states, event types, destination states) from the code
- Explain ShuffleVertexManager slow start in your own words, with the relevant config keys
- Trace a vertex failure through TaskImpl → VertexImpl → DAGImpl using event type names
- Identify one @Ignore'd test in TestVertexImpl and read why it is ignored
- Lab 4.1 completed: full state machine map documented
- Lab 4.2 completed: VertexManager walkthrough complete
Common Mistakes
| Mistake | Impact | Correct understanding |
|---|---|---|
Assuming V_START schedules tasks | Code changes that bypass VertexManager break auto-parallelism | V_START calls VertexManager.onVertexStarted(); the manager schedules |
Ignoring RECOVERING states | Patches that forget about recovery cause AM restart failures | Every new state or transition must handle the RECOVERING_* path |
Confusing TaskImpl failure handling with VertexImpl | Retry logic is in TaskImpl; failure threshold is in VertexImpl | Read both classes before touching failure handling code |
Reading VertexImpl in isolation | Many transitions involve callbacks to DAGImpl | Always trace events both ways: into the state machine AND back out |