State Machines

Tez's AM uses Hadoop's StateMachineFactory extensively: every long-lived entity (DAGImpl, VertexImpl, TaskImpl, TaskAttemptImpl, container state objects) is a state machine. This chapter explains the API, the dispatcher contract that keeps state machines correct, the AsyncDispatcher vs DrainDispatcher distinction, the common InvalidStateTransitonException bug class, and the discipline required to add a transition safely.

After this chapter you should be able to write a small state machine from scratch and review a transition-modifying patch.


The API

The factory lives in:

hadoop-yarn-common
  org/apache/hadoop/yarn/state/StateMachineFactory.java
  org/apache/hadoop/yarn/state/SingleArcTransition.java
  org/apache/hadoop/yarn/state/MultipleArcTransition.java
  org/apache/hadoop/yarn/state/InvalidStateTransitonException.java

(Yes, the exception is spelled Transiton in the Hadoop source — historical typo, preserved for compatibility. Greps that look for Transition will miss it.)

Skeleton:

private static final StateMachineFactory<MyEntity, MyState, MyEvtType, MyEvt>
    stateMachineFactory =
    new StateMachineFactory<MyEntity, MyState, MyEvtType, MyEvt>(MyState.NEW)

        // Single-arc: state, nextState, event, transition
        .addTransition(MyState.NEW, MyState.RUNNING,
            MyEvtType.START,
            new StartTransition())

        // Multiple-arc: state, set of possible next states, event, transition
        .addTransition(MyState.RUNNING,
            EnumSet.of(MyState.SUCCEEDED, MyState.FAILED),
            MyEvtType.DONE,
            new DoneTransition())

        // Self-loop: state, state, event, transition
        .addTransition(MyState.RUNNING, MyState.RUNNING,
            MyEvtType.HEARTBEAT,
            new HeartbeatTransition())

        // No-op self-loop with no transition object
        .addTransition(MyState.SUCCEEDED, MyState.SUCCEEDED,
            EnumSet.of(MyEvtType.HEARTBEAT))

        .installTopology();

installTopology() returns the finalized factory, which you store in a static field. Each instance then builds its own state machine from it:

private final StateMachine<MyState, MyEvtType, MyEvt> stateMachine =
    stateMachineFactory.make(this);

public void handle(MyEvt event) {
  writeLock.lock();
  try {
    MyState oldState = stateMachine.getCurrentState();
    try {
      stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
      // Pass the exception so the stack trace reaches the log.
      LOG.error("Invalid event " + event.getType() + " at " + oldState, e);
      // typically: re-throw or transition to ERROR
    }
  } finally {
    writeLock.unlock();
  }
}

Single-arc vs multiple-arc

Concept | When to use | Implementation
SingleArcTransition<OPERAND, EVENT> | The next state is always the same | void transition(OPERAND op, EVENT event)
MultipleArcTransition<OPERAND, EVENT, STATE> | Next state depends on event content | STATE transition(OPERAND op, EVENT event) (returns the chosen state)

You almost always start with SingleArcTransition. Promote to MultipleArcTransition only when the next state legitimately depends on runtime data (e.g., "if task count == 0 then SUCCEEDED else RUNNING").
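
To make the distinction concrete, here is a minimal, self-contained sketch. The two interfaces are local stand-ins that only mirror the shape of Hadoop's SingleArcTransition and MultipleArcTransition, so the example compiles without Hadoop on the classpath. Job, JobState, JobEvent, and the taskFailed flag are hypothetical names invented for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Local stand-ins mirroring the shape of the Hadoop interfaces (assumption:
// declared here only so the sketch compiles without Hadoop).
interface SingleArc<OPERAND, EVENT> {
  void transition(OPERAND op, EVENT event);
}

interface MultipleArc<OPERAND, EVENT, STATE extends Enum<STATE>> {
  STATE transition(OPERAND op, EVENT event);
}

// Hypothetical entity, state, and event types.
enum JobState { RUNNING, SUCCEEDED, FAILED }

class JobEvent {
  final boolean taskFailed;
  JobEvent(boolean taskFailed) { this.taskFailed = taskFailed; }
}

class Job {
  int remainingTasks;
  Job(int remainingTasks) { this.remainingTasks = remainingTasks; }
}

// Single-arc: the hook only has side effects; the next state was fixed
// when the transition was registered.
class HeartbeatHook implements SingleArc<Job, JobEvent> {
  @Override
  public void transition(Job job, JobEvent event) {
    // e.g. record a timestamp; there is nothing to return
  }
}

// Multiple-arc: the hook inspects runtime data and returns the next state.
class TaskDoneHook implements MultipleArc<Job, JobEvent, JobState> {
  // The states this hook may return; with the real factory this is the
  // EnumSet passed to addTransition.
  static final Set<JobState> POSSIBLE =
      EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED);

  @Override
  public JobState transition(Job job, JobEvent event) {
    if (event.taskFailed) {
      return JobState.FAILED;
    }
    job.remainingTasks--;
    return job.remainingTasks == 0 ? JobState.SUCCEEDED : JobState.RUNNING;
  }
}
```

With the real factory, the value returned by a multiple-arc hook must be one of the states in the set given to addTransition. Returning any other state makes the state machine throw InvalidStateTransitonException.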


Dispatcher contract

State machines are not thread-safe by themselves. Tez upholds correctness via the single-dispatcher-thread invariant:

  • All events for a DAGAppMaster's state machines flow through one AsyncDispatcher.
  • The dispatcher has one thread that pulls events and calls handle(event).
  • Therefore handlers run serially; no two handle() calls overlap.

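This invariant is the reason transition code in VertexImpl can manipulate fields without guarding against other handlers. The write lock taken in handle() protects against readers on other threads, not against concurrent handle() calls. Break the invariant and you get races no test will catch consistently.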

grep -n "AsyncDispatcher\|GenericEventHandler" \
  tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
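
The single-dispatcher-thread invariant can be illustrated without Tez. In the sketch below a single-threaded ExecutorService stands in for the dispatcher thread; it is not AsyncDispatcher, and CounterEntity and SingleThreadDispatchSketch are hypothetical names. Several producer threads enqueue events, but the handler itself only ever runs on the one worker thread, so its plain int field needs no lock.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical entity whose handler mutates a plain field with no locking.
class CounterEntity {
  private int handled; // only ever written on the dispatcher thread

  void handle(String event) { handled++; }

  int getHandled() { return handled; }
}

class SingleThreadDispatchSketch {
  // Sends producers * perProducer events from several threads through one
  // single-threaded executor and returns how many the entity handled.
  static int run(int producers, int perProducer) {
    CounterEntity entity = new CounterEntity();
    ExecutorService dispatcher = Executors.newSingleThreadExecutor();
    Thread[] threads = new Thread[producers];
    for (int p = 0; p < producers; p++) {
      threads[p] = new Thread(() -> {
        for (int i = 0; i < perProducer; i++) {
          // Enqueue the event; never call entity.handle() from this thread.
          dispatcher.execute(() -> entity.handle("EVENT"));
        }
      });
      threads[p].start();
    }
    try {
      for (Thread t : threads) {
        t.join();
      }
      dispatcher.shutdown(); // already-queued events still run
      if (!dispatcher.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("dispatcher did not drain in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      dispatcher.shutdownNow(); // no-op after a clean shutdown
    }
    return entity.getHandled();
  }
}
```

Calling SingleThreadDispatchSketch.run(4, 1000) handles all 4000 events. If the producers called entity.handle() directly instead of enqueueing, the unsynchronized increment could lose updates, which is the kind of race described above.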

AsyncDispatcher vs DrainDispatcher

Class | Where | Behavior
AsyncDispatcher | Production | Background thread; events processed asynchronously
DrainDispatcher | Tests | Same API; tests call await() to block until queue empty

Tests use DrainDispatcher so they can assert state after a known set of events has been processed:

DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.register(VertexEventType.class, vertexEventHandler);
dispatcher.init(conf);
dispatcher.start();
dispatcher.getEventHandler().handle(new VertexEvent(...));
dispatcher.await();   // blocks until queue empty
assertEquals(VertexState.RUNNING, vertex.getState());

find . -name "DrainDispatcher.java"
grep -rn "new DrainDispatcher" tez-dag/src/test/java | head
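
To see why the await() call matters, here is a small stand-alone sketch of the same idea. It is not DrainDispatcher: AwaitSketch is a hypothetical class that uses a single-threaded executor as the dispatcher thread and implements await() by queueing an empty marker task and waiting for it. Because the single worker runs tasks in FIFO order, the marker finishing implies every earlier event has been handled.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class AwaitSketch {
  // Single worker thread standing in for the dispatcher thread.
  private final ExecutorService worker = Executors.newSingleThreadExecutor();
  private final AtomicReference<String> state = new AtomicReference<>("NEW");

  // Asynchronous: returns before the event has been handled.
  void send(String newState) {
    worker.execute(() -> state.set(newState));
  }

  // Blocks until every event queued before this call has been handled.
  // Limitation of this sketch: follow-up events enqueued by a handler
  // after the marker was queued are not waited for.
  void await() {
    try {
      worker.submit(() -> { }).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  String getState() { return state.get(); }

  void stop() { worker.shutdown(); }
}
```

A test that calls send("RUNNING") and asserts on getState() immediately may still observe NEW, depending on thread scheduling. Inserting await() between the two makes the assertion deterministic.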

InvalidStateTransitonException

Thrown when doTransition(type, event) finds no transition registered for the (currentState, eventType) pair. The exception message has the form:

Invalid event: V_TASK_RESCHEDULED at SUCCEEDED

Common causes:

  1. A late event arrived after the entity reached a terminal state (race between cancellation and a completion event).
  2. A new code path emits an event but the receiving state machine forgot to register a handler.
  3. The event sender misunderstood the protocol.

The fix depends on the cause:

  • Case 1: add a no-op self-loop for that pair, i.e. addTransition(state, state, eventType).
  • Case 2: add a real transition.
  • Case 3: remove the bogus emit.

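Never silently catch and swallow the exception in production code: it indicates a real protocol violation. Do not let it escape handle() unhandled either, because an unhandled exception in the dispatch thread is a worse outcome than a graceful error. Catch it, log it, and surface it as an explicit error.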
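
The lookup-or-throw behaviour and the case 1 fix can be imitated in a few lines. The sketch below is a toy table keyed by (current state, event type), not Hadoop's implementation: ToyStateMachine, VState, and VEventType are hypothetical names, and IllegalStateException stands in for InvalidStateTransitonException while reusing its message format.

```java
import java.util.EnumMap;
import java.util.Map;

enum VState { RUNNING, SUCCEEDED }                  // hypothetical states
enum VEventType { V_COMPLETED, V_TASK_RESCHEDULED } // hypothetical event types

// Toy transition table: (current state, event type) -> next state.
class ToyStateMachine {
  private final Map<VState, Map<VEventType, VState>> table =
      new EnumMap<VState, Map<VEventType, VState>>(VState.class);
  private VState current;

  ToyStateMachine(VState initial) { this.current = initial; }

  // Same argument order as the three-argument addTransition: pre, post, event.
  ToyStateMachine addTransition(VState pre, VState post, VEventType type) {
    table.computeIfAbsent(pre, s -> new EnumMap<VEventType, VState>(VEventType.class))
        .put(type, post);
    return this;
  }

  void doTransition(VEventType type) {
    Map<VEventType, VState> arcs = table.get(current);
    VState next = (arcs == null) ? null : arcs.get(type);
    if (next == null) {
      // Stand-in for InvalidStateTransitonException, same message format.
      throw new IllegalStateException("Invalid event: " + type + " at " + current);
    }
    current = next;
  }

  VState getCurrentState() { return current; }
}
```

With only (RUNNING, SUCCEEDED, V_COMPLETED) registered, a V_TASK_RESCHEDULED that arrives after completion fails with "Invalid event: V_TASK_RESCHEDULED at SUCCEEDED". Registering (SUCCEEDED, SUCCEEDED, V_TASK_RESCHEDULED) turns the late event into a no-op. One simplification to keep in mind: the toy table can be extended at any time, whereas the real factory is fixed once installTopology() has been called, so the no-op belongs in the factory definition.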


How to add a transition safely

The usual process when modifying a state machine:

  1. Find the existing transitions for the state — read all addTransition(STATE, ...) lines.
  2. Identify the gap — confirm the event is not already handled.
  3. Add the transition in the position dictated by the file's existing grouping and ordering.
  4. Add a unit test to the corresponding Test*Impl class that triggers the new event in the relevant state.
  5. Update related no-op transitions for terminal states (a new event that can arrive late needs no-op handlers in states such as SUCCEEDED, FAILED, KILLED).
  6. Run all tests in the module before opening a PR.

The discipline "always update the test in the same patch" is enforced by reviewers. PRs that change VertexImpl without changes to TestVertexImpl are typically blocked.


Reading exercise

# Find the factory blocks
grep -n "stateMachineFactory" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/*.java

# Count transitions per entity
for f in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java \
         tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java \
         tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java \
         tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java; do
  echo "$f $(grep -c addTransition $f)"
done

# Look at one transition impl
grep -n "class StartTransition\|class InitTransition" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | head

Answer:

  1. Why is the exception named InvalidStateTransitonException (with a typo)? What would happen if you renamed it?
  2. Which Tez class uses MultipleArcTransition most heavily, and why?
  3. What does installTopology() return, and why is the factory typically a static final field?
  4. In TestVertexImpl, find the DrainDispatcher.await() calls. Why are they essential and what failure mode occurs if you forget?
  5. If two threads call vertexImpl.handle(event) concurrently — bypassing the dispatcher — what specific bug class arises?
  6. Read one MultipleArcTransition and explain how its return value determines the next state.

Common bugs and symptoms

Symptom | Root cause | Fix
InvalidStateTransitonException: Invalid event X at TERMINAL_STATE | Late event after terminal state | Add a no-op transition
Test passes locally, fails on CI intermittently | DrainDispatcher.await() missing or called too early | Always call await() between event sends and asserts
State machine mutates wrong fields | Transition class accidentally captures outer state | Make transition classes static; pass everything via the event
Dispatcher thread deadlocks | Handler is doing blocking I/O on dispatch thread | Move I/O to a worker; emit a follow-up event when done
addTransition for a no-op fails to compile | Wrong overload (argument count or types) | Use a three-argument overload: (state, state, eventType) or (state, state, EnumSet of event types)
Adding a transition silently breaks recovery | Recovery replay hits the new event in an old state | Cover the recovery test path; recovery uses the same SM

Validation: prove you understand this

  1. Implement a Light state machine with states OFF, ON, BROKEN and events TOGGLE, BREAK. Compile and unit-test.
  2. Find every SingleArcTransition in VertexImpl that is registered as static final — explain why static.
  3. Take an InvalidStateTransitonException from a real AM log; map it to the exact (state, event) pair and propose either a fix or a JIRA.
  4. Run TestVertexImpl#testKilledTasksHandling. Identify every DrainDispatcher.await() call and what it guards.
  5. Add a (SUCCEEDED, T_HEARTBEAT, SUCCEEDED) no-op to TaskImpl and the corresponding test in TestTaskImpl. Ensure all tests pass.