Level 4: DAG State Machine Internals

This level takes you inside VertexImpl, the most complex class in the Tez codebase. You will read the full state machine, understand every major state and the conditions that drive transitions, learn how VertexManager plugs in to control scheduling, and understand how speculative execution works. After this level you should be able to diagnose vertex-level failures from AM log output and write patches to the state machine.


Learning Objectives

By the end of Level 4 you must be able to:

  1. Read a StateMachineFactory definition and produce a transition table from code
  2. Explain the full INITIALIZING → INITED → RUNNING → SUCCEEDED path with all preconditions
  3. Describe what VertexManager does and when it is invoked
  4. Explain the difference between ImmediateStartVertexManager and ShuffleVertexManager
  5. Describe the speculative execution trigger conditions and what it causes
  6. Trace a vertex failure from first task failure to DAGImpl receiving V_COMPLETED
  7. Explain what vertex groups are and why they exist

Reading a StateMachineFactory

Tez uses Hadoop's StateMachineFactory (package org.apache.hadoop.yarn.state, shipped in hadoop-yarn-common). The following simplified excerpt shows the pattern:

private static final StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
    stateMachineFactory =
        new StateMachineFactory<>(VertexState.NEW)

        // From NEW
        .addTransition(VertexState.NEW,
            VertexState.INITIALIZING,
            VertexEventType.V_INIT,
            new InitTransition())

        // From INITIALIZING
        .addTransition(VertexState.INITIALIZING,
            EnumSet.of(VertexState.INITED, VertexState.FAILED),
            VertexEventType.V_INIT_DONE,
            new InitedTransition())
        ...
        .installTopology();

Reading rules

  1. First argument — the source state (where we are now)
  2. Second argument — the destination state(s). If an EnumSet, the transition handler decides which destination to return.
  3. Third argument — the event type that triggers this transition
  4. Fourth argument — the SingleArcTransition or MultipleArcTransition handler

A SingleArcTransition always goes to the same destination state. Its transition() method returns void.

A MultipleArcTransition can go to different states. Its transition() method returns the next VertexState.

When you see an EnumSet as the second argument, look for a MultipleArcTransition implementation — the logic inside that class decides which state to move to.
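The following self-contained toy version of the pattern makes the single-arc versus multiple-arc distinction concrete. It does not use Hadoop's StateMachineFactory. MiniStateMachine, DemoState, DemoEvent, DemoVertex, SingleArc and MultiArc are hypothetical names invented for this sketch. The demo() wiring mirrors the VertexImpl excerpt above.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Toy states and events (hypothetical; VertexImpl has many more).
enum DemoState { NEW, INITIALIZING, INITED, FAILED }
enum DemoEvent { V_INIT, V_INIT_DONE }

// Operand standing in for VertexImpl.
class DemoVertex {
    DemoState state = DemoState.NEW;
    int pendingInitializers;
}

// Single-arc hook: the destination is fixed, so transition() returns void.
interface SingleArc { void transition(DemoVertex v); }

// Multiple-arc hook: the hook itself returns the next state.
interface MultiArc { DemoState transition(DemoVertex v); }

class MiniStateMachine {
    private static final class Arc {
        final Set<DemoState> targets;
        final SingleArc single;   // set for single-arc transitions
        final MultiArc multi;     // set for multiple-arc transitions
        Arc(Set<DemoState> targets, SingleArc single, MultiArc multi) {
            this.targets = targets;
            this.single = single;
            this.multi = multi;
        }
    }

    private final Map<DemoState, Map<DemoEvent, Arc>> arcs = new EnumMap<>(DemoState.class);

    MiniStateMachine addTransition(DemoState from, DemoState to, DemoEvent ev, SingleArc hook) {
        arcs.computeIfAbsent(from, k -> new EnumMap<DemoEvent, Arc>(DemoEvent.class))
            .put(ev, new Arc(EnumSet.of(to), hook, null));
        return this;
    }

    MiniStateMachine addTransition(DemoState from, Set<DemoState> to, DemoEvent ev, MultiArc hook) {
        arcs.computeIfAbsent(from, k -> new EnumMap<DemoEvent, Arc>(DemoEvent.class))
            .put(ev, new Arc(EnumSet.copyOf(to), null, hook));
        return this;
    }

    void handle(DemoVertex v, DemoEvent ev) {
        Map<DemoEvent, Arc> byEvent = arcs.get(v.state);
        Arc arc = byEvent == null ? null : byEvent.get(ev);
        if (arc == null) {
            throw new IllegalStateException("No transition for " + ev + " in " + v.state);
        }
        DemoState next;
        if (arc.single != null) {
            arc.single.transition(v);
            next = arc.targets.iterator().next();   // the one declared destination
        } else {
            next = arc.multi.transition(v);          // the hook picks the destination
            if (!arc.targets.contains(next)) {
                throw new IllegalStateException(next + " was not declared for " + ev);
            }
        }
        v.state = next;
    }

    // Wiring in the same style as the VertexImpl excerpt above.
    static MiniStateMachine demo() {
        return new MiniStateMachine()
            .addTransition(DemoState.NEW, DemoState.INITIALIZING, DemoEvent.V_INIT,
                v -> { /* set up initializers here */ })
            .addTransition(DemoState.INITIALIZING,
                EnumSet.of(DemoState.INITED, DemoState.FAILED), DemoEvent.V_INIT_DONE,
                v -> v.pendingInitializers == 0 ? DemoState.INITED : DemoState.FAILED);
    }
}
```

An event with no registered arc for the current state is rejected, much as the real state machine rejects invalid transitions.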

How to extract the full transition table

# Count the addTransition calls in VertexImpl
grep -n "addTransition" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java \
  | wc -l

# Print them; -A 3 adds the next three lines, because each call usually
# spreads its arguments over several lines
grep -n -A 3 "addTransition" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
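Because the arguments of each call span several lines, grep output still needs manual cleanup. The helper below turns each addTransition(...) call into one "source | destination(s) | event(s) | hook" row. TransitionTableExtractor is a hypothetical helper, and it is a text heuristic rather than a Java parser. It assumes no // sequences appear inside string literals and no generic type arguments with commas appear inside the call.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

class TransitionTableExtractor {
    private static final String MARKER = "addTransition(";

    /** Returns one row per call: the whitespace-normalized arguments, in order. */
    static List<List<String>> extract(String source) {
        // Strip // line comments so they do not leak into the arguments.
        String text = source.replaceAll("//[^\\n]*", "");
        List<List<String>> rows = new ArrayList<>();
        int at = text.indexOf(MARKER);
        while (at >= 0) {
            int start = at + MARKER.length();
            int depth = 1;
            int i = start;
            // Walk forward to the ')' that closes this addTransition(.
            while (i < text.length() && depth > 0) {
                char c = text.charAt(i++);
                if (c == '(') depth++;
                else if (c == ')') depth--;
            }
            if (depth != 0) {
                break;                      // unbalanced input: stop rather than guess
            }
            rows.add(splitTopLevel(text.substring(start, i - 1)));
            at = text.indexOf(MARKER, i);
        }
        return rows;
    }

    /** Splits on commas that are not nested inside parentheses, e.g. EnumSet.of(A, B). */
    static List<String> splitTopLevel(String args) {
        List<String> parts = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        int depth = 0;
        for (char c : args.toCharArray()) {
            if (c == '(') depth++;
            if (c == ')') depth--;
            if (c == ',' && depth == 0) {
                parts.add(normalize(cur));
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        parts.add(normalize(cur));
        return parts;
    }

    private static String normalize(CharSequence s) {
        return s.toString().replaceAll("\\s+", " ").trim();
    }

    // Usage: java TransitionTableExtractor path/to/VertexImpl.java
    public static void main(String[] args) throws Exception {
        String src = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        for (List<String> row : extract(src)) {
            System.out.println(String.join(" | ", row));
        }
    }
}
```

The hook column may show a static field name instead of new XxxTransition() when a handler instance is shared between transitions.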

The Full Vertex State Machine

NEW → INITIALIZING (event: V_INIT)

Triggered by DAGImpl.InitTransition when the DAG is initializing.

Handler: InitTransition

What happens:

  • VertexImpl sets up inputsWithInitializers — inputs that require RootInputInitializer
  • Registers event handlers for root input initializer completion events
  • If there are no root input initializers, immediately posts V_INIT_DONE (transitions to INITED in the same logical step)

Precondition for INITIALIZING → INITED:

  • All RootInputInitializers have reported completion
  • VertexManager.initialize() has completed
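The two preconditions can be pictured as a small tracker. InitTracker and its methods are hypothetical names for this sketch, not VertexImpl fields. The sketch shows only that INITED requires both conditions, and that a vertex with no initializers is ready as soon as its VertexManager has initialized.

```java
import java.util.HashSet;
import java.util.Set;

class InitTracker {
    private final Set<String> pendingInitializerInputs;
    private boolean vertexManagerInitialized;

    InitTracker(Set<String> inputsWithInitializers) {
        this.pendingInitializerInputs = new HashSet<>(inputsWithInitializers);
    }

    // An input's initializer reported completion.
    void onInitializerCompleted(String inputName) {
        pendingInitializerInputs.remove(inputName);
    }

    // VertexManager.initialize() returned.
    void onVertexManagerInitialized() {
        vertexManagerInitialized = true;
    }

    /** Both preconditions must hold before the vertex may move to INITED. */
    boolean readyForInited() {
        return pendingInitializerInputs.isEmpty() && vertexManagerInitialized;
    }
}
```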

INITED → RUNNING (event: V_START)

Triggered by DAGImpl when all source vertices of this vertex have started or when the vertex has no source edges (it is a root vertex).

Handler: StartTransition

What happens:

  • Calls vertexManager.onVertexStarted()
  • The VertexManager decides when to schedule tasks

Important: the V_START event does not directly schedule tasks. The VertexManager plugin does, by calling the task-scheduling methods exposed on its VertexManagerPluginContext.

RUNNING task completion handling

Each task completion (success or failure) generates a V_TASK_COMPLETED event.

The TaskCompletedTransition handler:

  • Increments the succeeded/failed task counter
  • Checks if all tasks are done → if yes, triggers V_COMPLETE_EVENT
  • Checks speculative execution conditions
  • Checks if failure count exceeds tolerable failures threshold

Key configuration: tez.vertex.failure-tasks-percent.to-fail-vertex — percentage of task failures that cause the entire vertex to fail. Default: 0 (any failure fails the vertex). Setting to > 0 enables partial failure tolerance.
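The counting and threshold logic described above can be sketched as follows. TaskCompletionCounter and maxFailuresPercent are hypothetical names. The comparison failed * 100 > maxFailuresPercent * numTasks is an assumption about how a percentage threshold would be applied. It is chosen so that a threshold of 0 fails the vertex on the first task failure, as the text describes. VertexImpl's exact arithmetic may differ.

```java
class TaskCompletionCounter {
    enum Outcome { CONTINUE, ALL_TASKS_DONE, FAILED }

    private final int numTasks;
    private final float maxFailuresPercent; // 0 means any task failure fails the vertex
    private int succeeded;
    private int failed;

    TaskCompletionCounter(int numTasks, float maxFailuresPercent) {
        this.numTasks = numTasks;
        this.maxFailuresPercent = maxFailuresPercent;
    }

    Outcome onTaskCompleted(boolean success) {
        if (success) {
            succeeded++;
        } else {
            failed++;
            // Fail once failures exceed the tolerated share of all tasks.
            if (failed * 100.0 > maxFailuresPercent * numTasks) {
                return Outcome.FAILED;
            }
        }
        // Every task has finished; any failures so far were tolerated.
        if (succeeded + failed == numTasks) {
            return Outcome.ALL_TASKS_DONE;
        }
        return Outcome.CONTINUE;
    }
}
```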

RUNNING → COMMITTING (all tasks succeeded)

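Before a vertex is marked SUCCEEDED, its output committers run. This vertex-level commit applies when outputs are committed per vertex. When tez.am.commit-all-outputs-on-dag-success is true (the default), commits are deferred until the whole DAG succeeds.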

Handler: VertexCommitCallback

What happens:

  • OutputCommitter.commitOutput() is called for each output with a committer
  • Commit is atomic: either all outputs commit or the vertex fails
  • The window between task completion and output commit is sensitive to AM failure; AM recovery is what handles an AM that dies inside it
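The "every committer must succeed or the vertex fails" rule can be sketched as a loop over the outputs. DemoCommitter and VertexCommitSketch are hypothetical stand-ins, not Tez's OutputCommitter API. The sketch simply stops at the first failing output and reports its name.

```java
import java.util.Map;

// Hypothetical stand-in for an output committer.
interface DemoCommitter {
    void commitOutput() throws Exception;
}

class VertexCommitSketch {
    /** Returns null if every committer succeeded, else the name of the first failing output. */
    static String commitAll(Map<String, DemoCommitter> committers) {
        for (Map.Entry<String, DemoCommitter> e : committers.entrySet()) {
            try {
                e.getValue().commitOutput();
            } catch (Exception ex) {
                return e.getKey();   // caller would move the vertex to FAILED
            }
        }
        return null;                 // caller would move the vertex to SUCCEEDED
    }
}
```

Passing an ordered map (for example a LinkedHashMap) keeps the commit order deterministic.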

RUNNING → FAILED

Triggers:

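  1. The number of failed tasks exceeds the tolerated threshold (V_TASK_COMPLETED with failure); a task only counts as failed after TaskImpl has exhausted its retry attempts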
  2. A container dies without a task completion report
  3. VertexManager reports an error
  4. A downstream vertex fails and error propagation is configured

RECOVERING states

When the AM restarts (e.g., due to a node failure), VertexImpl enters RECOVERING states. Recovery replays the history events that the previous AM attempt wrote to recovery files in the application's staging directory. This reconstructs which tasks completed before the AM died, so already-succeeded tasks are not re-run.

This is the most complex part of VertexImpl. Recovery bugs are a major category of contributor-fixable issues.
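At its core, recovery reduces to "replay the log, remember what succeeded, re-run the rest". RecoverySketch and LoggedEvent are hypothetical and far simpler than Tez's real recovery events. The numTasks argument reflects one of the pitfalls listed later: it must be the parallelism in force before the AM died, not the originally planned value.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RecoverySketch {
    static final class LoggedEvent {
        final int taskIndex;
        final boolean succeeded;
        LoggedEvent(int taskIndex, boolean succeeded) {
            this.taskIndex = taskIndex;
            this.succeeded = succeeded;
        }
    }

    /** numTasks must be the (possibly reconfigured) parallelism recorded before the AM died. */
    static List<Integer> tasksToRerun(int numTasks, List<LoggedEvent> log) {
        Set<Integer> done = new HashSet<>();
        for (LoggedEvent e : log) {
            if (e.succeeded) {
                done.add(e.taskIndex);   // this task's output survives the restart
            }
        }
        List<Integer> rerun = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            if (!done.contains(i)) {
                rerun.add(i);
            }
        }
        return rerun;
    }
}
```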


VertexManager

VertexManager is the plugin interface that controls task scheduling within a vertex. It sits between the AM framework and the actual task scheduler.

Interface (simplified)

public abstract class VertexManagerPlugin {
    // Called when vertex is initialized; plugin configures itself
    public abstract void initialize() throws Exception;

    // Called when V_START event fires; plugin decides when to schedule tasks
    public abstract void onVertexStarted(List<TaskAttemptIdentifier> completions)
        throws Exception;

    // Called each time a source vertex task completes
    // Plugin uses this to update scheduling decisions (for slow-start)
    public abstract void onSourceTaskCompleted(TaskAttemptIdentifier completedSrcTaskAttempt)
        throws Exception;

    // Called when a VertexManagerEvent arrives, e.g. a source task reporting
    // statistics about its output (used by auto-parallelism)
    public abstract void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
        throws Exception;
}

ImmediateStartVertexManager

The default for root vertices and vertices with no special scheduling requirements.

Behavior:

  • Schedules all tasks immediately when onVertexStarted() is called
  • Does not wait for any source task completion
  • Used by: Tokenizer vertex in OrderedWordCount
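The immediate-start behavior is small enough to sketch in full. It also shows the point made earlier that scheduling is the manager's decision, expressed as a call back into its context. DemoVmContext, numTasks() and scheduleTasks(List<Integer>) are hypothetical simplifications, not the real VertexManagerPluginContext signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, cut-down plugin context.
interface DemoVmContext {
    int numTasks();
    void scheduleTasks(List<Integer> taskIndices);
}

class ImmediateStartSketch {
    private final DemoVmContext context;

    ImmediateStartSketch(DemoVmContext context) {
        this.context = context;
    }

    // Vertex started: schedule every task at once.
    void onVertexStarted() {
        List<Integer> all = new ArrayList<>();
        for (int i = 0; i < context.numTasks(); i++) {
            all.add(i);
        }
        context.scheduleTasks(all);
    }

    // No slow start: source task completions never change the decision.
    void onSourceTaskCompleted(int srcTaskIndex) { }
}
```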

ShuffleVertexManager

Used for vertices that receive SCATTER_GATHER input from a source vertex.

Behavior:

  • Implements slow start: waits until a configurable fraction of source tasks have completed before scheduling downstream tasks
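  • Configuration keys: tez.shuffle-vertex-manager.min-src-fraction (default 0.25) and tez.shuffle-vertex-manager.max-src-fraction (default 0.75). Scheduling begins once the min fraction of source tasks has completed, and all downstream tasks are scheduled by the time the max fraction has completed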
  • Implements auto-parallelism (off by default; enabled with tez.shuffle-vertex-manager.enable.auto-parallel): can reduce the downstream vertex's parallelism based on the actual size of shuffle data
  • When auto-parallelism reduces parallelism, it calls context.reconfigureVertex() which posts a V_PARALLELISM_UPDATED event to VertexImpl
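The following arithmetic sketch shows the two decisions described above. It assumes a linear ramp between the min and max source fractions (defaults 0.25 and 0.75). It also assumes auto-parallelism extrapolates total shuffle bytes from the completed sources and sizes tasks to a target input size. SlowStartSketch and desiredTaskInputBytes are hypothetical names, and ShuffleVertexManager's actual rounding and adjustments may differ.

```java
class SlowStartSketch {

    /** Total downstream tasks that may be scheduled so far. */
    static int tasksAllowed(int completedSrc, int totalSrc, int downstreamTasks,
                            float minFraction, float maxFraction) {
        float done = totalSrc == 0 ? 1.0f : (float) completedSrc / totalSrc;
        if (done < minFraction) {
            return 0;                       // still inside the slow-start window
        }
        if (done >= maxFraction || maxFraction <= minFraction) {
            return downstreamTasks;         // at or past the max fraction: schedule all
        }
        // Between min and max: scale linearly from 0 to all tasks.
        float ramp = (done - minFraction) / (maxFraction - minFraction);
        return (int) (ramp * downstreamTasks);
    }

    /** Extrapolated parallelism; only ever lowers the current value, never raises it. */
    static int estimateParallelism(long bytesSoFar, int completedSrc, int totalSrc,
                                   int currentParallelism, long desiredTaskInputBytes) {
        if (completedSrc == 0) {
            return currentParallelism;      // no data yet: keep the plan
        }
        long expectedTotal = bytesSoFar * totalSrc / completedSrc;
        long desired = (expectedTotal + desiredTaskInputBytes - 1) / desiredTaskInputBytes;
        return (int) Math.max(1, Math.min(currentParallelism, desired));
    }
}
```

With the defaults, 5 of 10 source tasks done puts the ramp halfway, so 10 of 20 downstream tasks may be scheduled.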

Why VertexManager Matters for Contributors

Auto-parallelism and slow-start bugs are a major category of Tez issues. The interaction between ShuffleVertexManager and VertexImpl involves:

  • Parallelism changes after task scheduling
  • Race conditions between task completion events and parallelism updates
  • Recovery of vertices that had parallelism changed before AM death

Speculative Execution

Speculative execution launches a duplicate task attempt when the original attempt is slow.

Trigger conditions

VertexImpl checks speculation conditions in TaskCompletedTransition and on a periodic timer:

  1. At least one task has completed (we have a baseline for "normal" task duration)
  2. The running attempt has been running longer than speculative_threshold * median_time
  3. The running attempt's progress is lower than expected for its elapsed time
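The three conditions can be combined into one predicate. SpeculationCheckSketch, threshold and medianCompletedMs are hypothetical. Condition 3 is interpreted here as "the attempt's projected remaining time exceeds what a fresh attempt would need". That interpretation is an assumption, and Tez's speculator uses its own estimators.

```java
class SpeculationCheckSketch {
    static boolean shouldSpeculate(int completedTasks, long medianCompletedMs,
                                   long elapsedMs, float progress, double threshold) {
        // 1. Need at least one completed task as a baseline.
        if (completedTasks < 1) {
            return false;
        }
        // 2. Must have run longer than threshold * median.
        if (elapsedMs <= threshold * medianCompletedMs) {
            return false;
        }
        // 3. Projected time left for this attempt exceeds the median a fresh
        //    attempt would need, so a duplicate could plausibly finish sooner.
        if (progress <= 0f) {
            return true;    // no progress at all after a long run
        }
        double remainingMs = elapsedMs * (1.0 - progress) / progress;
        return remainingMs > medianCompletedMs;
    }
}
```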

Configuration:

tez.am.speculation.enabled = true   (default: false)
tez.am.speculation.interval-ms = 5000  (check interval)

What happens

  1. VertexImpl posts a TaskEventType.T_ADD_SPEC_ATTEMPT event to TaskImpl
  2. TaskImpl creates a new TaskAttemptImpl
  3. Both attempts run concurrently
  4. The first to succeed wins; the other is killed
  5. The winning attempt's output is committed; the losing attempt's output is discarded
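"First to succeed wins" comes down to recording a single winner and killing the remaining attempts. SpeculativeWinnerSketch and the string attempt IDs are hypothetical. The sketch assumes events for one task are handled one at a time, as in a dispatcher-driven state machine, so no atomic operations are used.

```java
import java.util.ArrayList;
import java.util.List;

class SpeculativeWinnerSketch {
    private final List<String> running = new ArrayList<>();
    private String winner;   // null until the first attempt succeeds

    void onAttemptLaunched(String attemptId) {
        running.add(attemptId);
    }

    /**
     * Returns the attempts to kill. The first success wins; a success that
     * arrives after a winner exists is a loser whose output is ignored.
     */
    List<String> onAttemptSucceeded(String attemptId) {
        running.remove(attemptId);
        if (winner != null) {
            return new ArrayList<>();   // late loser: nothing left to kill
        }
        winner = attemptId;
        List<String> toKill = new ArrayList<>(running);
        running.clear();
        return toKill;
    }

    String winner() {
        return winner;
    }
}
```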

Interaction with ShuffleVertexManager

If a speculative attempt completes, ShuffleVertexManager receives an onSourceTaskCompleted callback for the winning attempt. It must de-duplicate: the task's output should only be counted once regardless of which attempt succeeded.
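One way to de-duplicate is to key completions by task index and deliberately ignore the attempt number. SourceCompletionTracker is a hypothetical helper, not ShuffleVertexManager's actual bookkeeping.

```java
import java.util.BitSet;

class SourceCompletionTracker {
    private final BitSet completedTasks = new BitSet();

    /** Returns true only the first time any attempt of this task completes. */
    boolean onSourceTaskCompleted(int taskIndex, int attemptNumber) {
        // attemptNumber is intentionally ignored: the task counts once.
        if (completedTasks.get(taskIndex)) {
            return false;   // duplicate, e.g. a second attempt of a speculated task
        }
        completedTasks.set(taskIndex);
        return true;
    }

    int completedCount() {
        return completedTasks.cardinality();
    }
}
```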


Vertex Groups

Vertex groups (VertexGroup in the API) allow multiple vertices to be treated as a single logical vertex for downstream consumption.

Use case: merging the output of multiple Map vertices before a single Reduce vertex, without an intermediate shuffle. This is used in the Hive UnionAll operator implementation.

Key classes:

  • VertexGroup API: tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
  • GroupInputEdge: an edge from a VertexGroup to a regular vertex
  • The downstream vertex sees a single MergedLogicalInput that combines all group members
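Conceptually, a merged input presents several group members' records as one stream. ConcatenatedInputSketch is a hypothetical illustration that simply concatenates per-member iterators. Real MergedLogicalInput implementations are Tez classes with their own API and may merge differently.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ConcatenatedInputSketch<T> implements Iterator<T> {
    private final Iterator<Iterator<T>> members;
    private Iterator<T> current = Collections.emptyIterator();

    ConcatenatedInputSketch(List<Iterator<T>> memberInputs) {
        this.members = memberInputs.iterator();
    }

    @Override
    public boolean hasNext() {
        // Skip exhausted (or empty) members until one has data.
        while (!current.hasNext() && members.hasNext()) {
            current = members.next();
        }
        return current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}
```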

Key Classes for This Level

Class | Path | Focus
VertexImpl | tez-dag/.../dag/impl/VertexImpl.java | The entire state machine; 6000+ lines
ShuffleVertexManager | tez-runtime-library/.../dag/library/vertexmanager/ShuffleVertexManager.java | Slow start and auto-parallelism
ImmediateStartVertexManager | tez-dag/.../dag/impl/ImmediateStartVertexManager.java | Simple baseline
VertexManagerPlugin | tez-api/.../VertexManagerPlugin.java | The interface
VertexManagerPluginContext | tez-api/.../VertexManagerPluginContext.java | What the plugin can call back into
TaskImpl | tez-dag/.../dag/impl/TaskImpl.java | Manages attempt lifecycle

# Find the VertexManager implementations (ShuffleVertexManager lives in
# tez-runtime-library, so search every module, not just tez-dag)
find . -path "*/src/main/java/*" -name "*VertexManager*.java"

JIRA Categories for Level 4 Contributors

You are now ready to investigate and submit patches for:

  • Vertex failure handling bugs — incorrect state transitions, wrong error messages
  • VertexManager logic bugs — slow-start fraction calculation, auto-parallelism edge cases
  • Recovery bugs — vertices that fail to recover correctly after AM restart
  • Speculation bugs — duplicate completions, wrong trigger conditions
  • Test improvements: TestVertexImpl has hundreds of tests; adding coverage for edge cases

Approach:

  1. Find a TestVertexImpl test that is @Ignored — read the comment explaining why
  2. If the bug is fixed, the @Ignore can be removed (a trivial but real contribution)
  3. Or find a state machine transition that has no test coverage (grep for the transition, then grep for the handler class name in test files)
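Step 3 can be automated as a rough first pass. UncoveredTransitionFinder is a hypothetical helper. It collects every new XxxTransition( hook class in the main source and reports names that never appear in the test source. Many transitions are exercised by sending events rather than by naming the handler, so a missing name is only a hint, not proof of missing coverage.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class UncoveredTransitionFinder {
    // Matches hook instantiations such as "new InitTransition(".
    private static final Pattern HOOK = Pattern.compile("new\\s+(\\w+Transition)\\s*\\(");

    /** Hook class names created in mainSource that never appear in testSource. */
    static Set<String> uncovered(String mainSource, String testSource) {
        Set<String> result = new TreeSet<>();   // sorted, duplicates collapsed
        Matcher m = HOOK.matcher(mainSource);
        while (m.find()) {
            String hook = m.group(1);
            if (!testSource.contains(hook)) {
                result.add(hook);
            }
        }
        return result;
    }

    // Usage: java UncoveredTransitionFinder VertexImpl.java TestVertexImpl.java
    public static void main(String[] args) throws Exception {
        for (String hook : uncovered(read(args[0]), read(args[1]))) {
            System.out.println(hook);
        }
    }

    private static String read(String path) throws Exception {
        return new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
    }
}
```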

Deliverables

  • Extract the complete VertexImpl state transition table (all source states, event types, destination states) from the code
  • Explain ShuffleVertexManager slow-start in your own words, with the relevant config keys
  • Trace a vertex failure through TaskImpl → VertexImpl → DAGImpl using event type names
  • Identify one @Ignored test in TestVertexImpl and read why it is ignored
  • Lab 4.1 completed: full state machine map documented
  • Lab 4.2 completed: VertexManager walkthrough complete

Common Mistakes

Mistake | Impact | Correct understanding
Assuming V_START schedules tasks | Code changes that bypass VertexManager break auto-parallelism | V_START calls VertexManager.onVertexStarted(); the manager schedules
Ignoring RECOVERING states | Patches that forget about recovery cause AM restart failures | Every new state or transition must handle the RECOVERING_* path
Confusing TaskImpl failure handling with VertexImpl | Retry logic is in TaskImpl; failure threshold is in VertexImpl | Read both classes before touching failure handling code
Reading VertexImpl in isolation | Many transitions involve callbacks to DAGImpl | Always trace events both ways: into the state machine AND back out