State Machines
Tez's AM uses Hadoop's StateMachineFactory extensively: every long-lived
entity (DAGImpl, VertexImpl, TaskImpl, TaskAttemptImpl, container
state objects) is a state machine. This chapter explains the API, the
dispatcher contract that keeps state machines correct, the AsyncDispatcher
vs DrainDispatcher distinction, the common InvalidStateTransitonException
bug class, and the discipline required to add a transition safely.
After this chapter you should be able to write a small state machine from scratch and review a transition-modifying patch.
The API
The factory lives in:
hadoop-yarn-common
org/apache/hadoop/yarn/state/StateMachineFactory.java
org/apache/hadoop/yarn/state/SingleArcTransition.java
org/apache/hadoop/yarn/state/MultipleArcTransition.java
org/apache/hadoop/yarn/state/InvalidStateTransitonException.java
(Yes, the exception is spelled Transiton in the Hadoop source — historical
typo, preserved for compatibility. Greps that look for Transition will
miss it.)
Skeleton:
private static final StateMachineFactory<MyEntity, MyState, MyEvtType, MyEvt>
    stateMachineFactory =
        new StateMachineFactory<MyEntity, MyState, MyEvtType, MyEvt>(MyState.NEW)
            // Single-arc: state, nextState, event, transition
            .addTransition(MyState.NEW, MyState.RUNNING,
                MyEvtType.START,
                new StartTransition())
            // Multiple-arc: state, set of possible next states, event, transition
            .addTransition(MyState.RUNNING,
                EnumSet.of(MyState.SUCCEEDED, MyState.FAILED),
                MyEvtType.DONE,
                new DoneTransition())
            // Self-loop: state, same state, event, transition
            .addTransition(MyState.RUNNING, MyState.RUNNING,
                MyEvtType.HEARTBEAT,
                new HeartbeatTransition())
            // No-op self-loop: state, same state, set of events, no transition object
            .addTransition(MyState.SUCCEEDED, MyState.SUCCEEDED,
                EnumSet.of(MyEvtType.HEARTBEAT))
            .installTopology();
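installTopology() returns the factory with its transition table built, which you store in the static field. Each instance then makes its own state machine from it: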
private final StateMachine<MyState, MyEvtType, MyEvt> stateMachine =
    stateMachineFactory.make(this);

public void handle(MyEvt event) {
    writeLock.lock();
    try {
        MyState oldState = stateMachine.getCurrentState();
        try {
            stateMachine.doTransition(event.getType(), event);
        } catch (InvalidStateTransitonException e) {
            LOG.error("Invalid event " + event.getType() + " at " + oldState);
            // typically: re-throw or transition to ERROR
        }
    } finally {
        writeLock.unlock();
    }
}
Single-arc vs multiple-arc
| Concept | When to use | Implementation |
|---|---|---|
| SingleArcTransition<OPERAND, EVENT> | The next state is always the same | void transition(OPERAND op, EVENT event) |
| MultipleArcTransition<OPERAND, EVENT, STATE> | Next state depends on event content | STATE transition(OPERAND op, EVENT event) (returns the chosen state) |
You almost always start with SingleArcTransition. Promote to
MultipleArcTransition only when the next state legitimately depends on
runtime data (e.g., "if task count == 0 then SUCCEEDED else RUNNING").
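The difference is easiest to see in the method signatures. The sketch below is plain Java with no Hadoop dependency: the interfaces SingleArc and MultiArc, the Job class, and the fire helper are hypothetical stand-ins that mirror the shape of the real interfaces, not the Hadoop types themselves. It also checks the returned state against a declared set, which is the role the EnumSet argument plays in the multiple-arc addTransition call.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-ins mirroring the shape of Hadoop's transition interfaces.
interface SingleArc<O, E> {
    void transition(O operand, E event);
}

interface MultiArc<O, E, S extends Enum<S>> {
    S transition(O operand, E event);
}

enum JobState { RUNNING, SUCCEEDED }

// Hypothetical operand: a job that tracks how many tasks are still outstanding.
class Job {
    int remainingTasks;
    Job(int remainingTasks) { this.remainingTasks = remainingTasks; }
}

class ArcDemo {
    // Single-arc: side effect only; the next state was fixed at registration time.
    static final SingleArc<Job, String> LOG_HEARTBEAT =
        (job, event) -> System.out.println("heartbeat: " + event);

    // Multiple-arc: the hook inspects runtime data and returns the next state.
    static final MultiArc<Job, String, JobState> TASK_DONE = (job, event) -> {
        job.remainingTasks--;
        return job.remainingTasks == 0 ? JobState.SUCCEEDED : JobState.RUNNING;
    };

    // The states the multiple-arc hook is allowed to return.
    static final Set<JobState> ALLOWED = EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED);

    static JobState fire(Job job, String event) {
        JobState next = TASK_DONE.transition(job, event);
        if (!ALLOWED.contains(next)) {
            throw new IllegalStateException("undeclared target state: " + next);
        }
        return next;
    }
}
```

With a Job of two remaining tasks, the first fire call keeps the job in RUNNING and the second moves it to SUCCEEDED; the single-arc hook never gets a say in the target state.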
Dispatcher contract
State machines are not thread-safe by themselves. Tez upholds correctness via the single-dispatcher-thread invariant:
- All events for a DAGAppMaster's state machines flow through one AsyncDispatcher.
- The dispatcher has one thread that pulls events and calls handle(event).
- Therefore handlers run serially; no two handle() calls overlap.
This invariant is the reason VertexImpl.handle can manipulate fields
without synchronization. Break the invariant and you get races no test will
catch consistently.
grep -n "AsyncDispatcher\|GenericEventHandler" \
tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
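To make the invariant concrete, here is a minimal sketch of a single-consumer dispatcher in plain Java. MiniDispatcher and DispatcherDemo are hypothetical names and this is not how AsyncDispatcher is implemented; it only illustrates why a handler that runs on exactly one thread can update plain state without locks even when many threads produce events.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical miniature dispatcher: many producers, exactly one consumer thread.
class MiniDispatcher {
    // Unique sentinel object, compared by identity, used to stop the worker.
    private static final String STOP = new String("__stop__");
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    MiniDispatcher(Consumer<String> handler) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    String event = queue.take();
                    if (event == STOP) {
                        return;
                    }
                    handler.accept(event); // only this thread ever calls the handler
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "mini-dispatcher");
        worker.start();
    }

    void dispatch(String event) {
        queue.add(event);
    }

    void stopAndJoin() throws InterruptedException {
        queue.add(STOP);
        worker.join();
    }
}

class DispatcherDemo {
    // Two producer threads send n events each; returns the handler's unsynchronized counter.
    static int run(int n) {
        int[] counter = {0}; // plain state, mutated only on the dispatcher thread
        MiniDispatcher d = new MiniDispatcher(e -> counter[0]++);
        Runnable producer = () -> {
            for (int i = 0; i < n; i++) {
                d.dispatch("tick");
            }
        };
        Thread a = new Thread(producer);
        Thread b = new Thread(producer);
        try {
            a.start();
            b.start();
            a.join();
            b.join();
            d.stopAndJoin(); // join() makes the worker's writes visible to this thread
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counter[0];
    }
}
```

If the producers called the handler directly instead of going through dispatch, the unsynchronized increment could lose updates, which is the kind of race the paragraph above warns about.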
AsyncDispatcher vs DrainDispatcher
| Class | Where | Behavior |
|---|---|---|
| AsyncDispatcher | Production | Background thread; events processed asynchronously |
| DrainDispatcher | Tests | Same API; tests call await() to block until queue empty |
Tests use DrainDispatcher so they can assert state after a known set of
events has been processed:
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.register(VertexEventType.class, vertexEventHandler);
dispatcher.init(conf);
dispatcher.start();
dispatcher.getEventHandler().handle(new VertexEvent(...));
dispatcher.await(); // blocks until queue empty
assertEquals(VertexState.RUNNING, vertex.getState());
find . -name "DrainDispatcher.java"
grep -rn "new DrainDispatcher" tez-dag/src/test/java | head
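The idea behind await() can be sketched without Hadoop. The class below is a hypothetical DrainableDispatcher, not the real DrainDispatcher; it counts events that have been dispatched but not yet fully handled, so that await() returns only after the handler has finished rather than as soon as the queue looks empty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the "drain" idea; this is not the real DrainDispatcher.
class DrainableDispatcher {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Object lock = new Object();
    private int pending = 0; // events dispatched but not yet fully handled

    DrainableDispatcher(Consumer<String> handler) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String event = queue.take();
                    try {
                        handler.accept(event);
                    } finally {
                        synchronized (lock) {
                            pending--;
                            lock.notifyAll();
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive for the demo
        worker.start();
    }

    void dispatch(String event) {
        synchronized (lock) {
            pending++;
        }
        queue.add(event);
    }

    // Blocks until every dispatched event has been handled, not merely dequeued.
    void await() {
        synchronized (lock) {
            while (pending > 0) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

class DrainDemo {
    static String stateAfterAwait() {
        String[] state = {"NEW"};
        DrainableDispatcher d = new DrainableDispatcher(event -> {
            try {
                Thread.sleep(50); // simulate a slow handler
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            state[0] = event;
        });
        d.dispatch("RUNNING");
        d.await(); // without this line the method would usually still return "NEW"
        return state[0];
    }
}
```

Dropping the await() call in this sketch reproduces the flaky-test pattern in miniature: the assertion runs before the handler thread has applied the event.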
InvalidStateTransitonException
Thrown when doTransition(type, event) finds no registered handler for the
(currentState, eventType) pair. The exception message has the form:
Invalid event: V_TASK_RESCHEDULED at SUCCEEDED
Common causes:
- A late event arrived after the entity reached a terminal state (race between cancellation and a completion event).
- A new code path emits an event but the receiving state machine forgot to register a handler.
- The event sender misunderstood the protocol.
Fixing one of these almost always requires:
- Adding a (state, eventType, sameState) no-op transition (case 1).
- Adding a real transition (case 2).
- Removing the bogus emit (case 3).
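Never silently catch and swallow the exception in production code: it indicates a real protocol violation. Do not let it escape uncaught either, because an unhandled exception in the dispatcher thread is a worse outcome than a graceful error. Catch it, log the (state, event) pair, and fail the entity gracefully, for example by transitioning to an error state.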
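The late-event case can be reproduced with a tiny table-driven machine. Everything below (TinyMachine, TState, TEvent, LateEventDemo) is hypothetical and uses IllegalStateException in place of the Hadoop exception; it only mimics the lookup-or-throw behavior and the message format quoted above.

```java
import java.util.EnumMap;
import java.util.Map;

enum TState { RUNNING, SUCCEEDED }
enum TEvent { DONE, HEARTBEAT }

// Hypothetical table-driven machine; a missing (state, event) entry is an error.
class TinyMachine {
    private final Map<TState, Map<TEvent, TState>> table = new EnumMap<>(TState.class);
    private TState current = TState.RUNNING;

    TinyMachine add(TState from, TState to, TEvent on) {
        table.computeIfAbsent(from, s -> new EnumMap<>(TEvent.class)).put(on, to);
        return this;
    }

    void doTransition(TEvent event) {
        Map<TEvent, TState> arcs = table.get(current);
        TState next = (arcs == null) ? null : arcs.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event: " + event + " at " + current);
        }
        current = next;
    }

    TState state() {
        return current;
    }
}

class LateEventDemo {
    static TinyMachine base() {
        return new TinyMachine()
            .add(TState.RUNNING, TState.RUNNING, TEvent.HEARTBEAT)
            .add(TState.RUNNING, TState.SUCCEEDED, TEvent.DONE);
    }

    // Finishes the machine, then delivers a late HEARTBEAT.
    // Returns the error message, or null if the late event was accepted.
    static String lateHeartbeat(TinyMachine m) {
        m.doTransition(TEvent.DONE);
        try {
            m.doTransition(TEvent.HEARTBEAT);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }
}
```

Calling lateHeartbeat on base() yields the message "Invalid event: HEARTBEAT at SUCCEEDED". Registering one extra self-loop, add(TState.SUCCEEDED, TState.SUCCEEDED, TEvent.HEARTBEAT), makes the same call return null and leaves the machine in SUCCEEDED.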
How to add a transition safely
Process every Tez committer follows when modifying a state machine:
- Find the existing transitions for the state: read all addTransition(STATE, ...) lines.
- Identify the gap: confirm the event is not already handled.
- Add the transition in the alphabetical or grouping order the file already uses.
- Add a unit test to the corresponding Test*Impl class that triggers the new event in the relevant state.
- Update related no-op transitions for terminal states (a new event needs no-op handlers in SUCCEEDED, FAILED, KILLED).
- Run all tests in the module before opening a PR.
The discipline "always update the test in the same patch" is enforced by
reviewers. PRs that change VertexImpl without changes to TestVertexImpl
are typically blocked.
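The terminal-state step is the easiest one to forget, so it can help to audit it mechanically. The sketch below is a hypothetical helper, not something Tez ships: it takes a hand-written map of which events each state handles and lists every terminal state and event pair with no handler. VState, VEvent, and TopologyAudit are illustrative names, and whether each gap really needs a no-op is still a judgment call for the reviewer.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

enum VState { RUNNING, SUCCEEDED, FAILED, KILLED }
enum VEvent { DONE, KILL, HEARTBEAT }

class TopologyAudit {
    // Returns "STATE/EVENT" for every terminal state that lacks a handler for some event.
    static List<String> missingTerminalHandlers(Map<VState, Set<VEvent>> handled,
                                                Set<VState> terminal) {
        List<String> gaps = new ArrayList<>();
        for (VState s : terminal) {
            Set<VEvent> events = handled.getOrDefault(s, EnumSet.noneOf(VEvent.class));
            for (VEvent e : VEvent.values()) {
                if (!events.contains(e)) {
                    gaps.add(s + "/" + e);
                }
            }
        }
        return gaps;
    }

    // Hypothetical topology in which KILLED forgot the HEARTBEAT no-op.
    static Map<VState, Set<VEvent>> sample() {
        Map<VState, Set<VEvent>> handled = new EnumMap<>(VState.class);
        handled.put(VState.RUNNING, EnumSet.allOf(VEvent.class));
        handled.put(VState.SUCCEEDED, EnumSet.allOf(VEvent.class));
        handled.put(VState.FAILED, EnumSet.allOf(VEvent.class));
        handled.put(VState.KILLED, EnumSet.of(VEvent.DONE, VEvent.KILL));
        return handled;
    }
}
```

Running missingTerminalHandlers on sample() with the terminal set EnumSet.of(SUCCEEDED, FAILED, KILLED) reports the single gap KILLED/HEARTBEAT.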
Reading exercise
# Find the factory blocks
grep -n "stateMachineFactory" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/*.java
# Count transitions per entity
for f in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java \
         tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java \
         tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java \
         tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java; do
  echo "$f $(grep -c addTransition "$f")"
done
# Look at one transition impl
grep -n "class StartTransition\|class InitTransition" \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | head
Answer:
- Why is the exception named InvalidStateTransitonException (with a typo)? What would happen if you renamed it?
- Which Tez class uses MultipleArcTransition most heavily, and why?
- What does installTopology() return, and why is the factory typically a static final field?
- In TestVertexImpl, find the DrainDispatcher.await() calls. Why are they essential and what failure mode occurs if you forget?
- If two threads call vertexImpl.handle(event) concurrently, bypassing the dispatcher, what specific bug class arises?
- Read one MultipleArcTransition and explain how its return value determines the next state.
Common bugs and symptoms
| Symptom | Root cause | Fix |
|---|---|---|
| InvalidStateTransitonException: Invalid event X at TERMINAL_STATE | Late event after terminal state | Add a no-op transition |
| Test passes locally, fails on CI intermittently | DrainDispatcher.await() missing or called too early | Always call await() between event sends and asserts |
| State machine mutates wrong fields | Transition class accidentally captures outer state | Make transition classes static; pass everything via the event |
| Dispatcher thread deadlocks | Handler is doing blocking I/O on dispatch thread | Move I/O to a worker; emit a follow-up event when done |
| addTransition for a no-op fails to compile | Wrong arity overload | Use the variant with EnumSet<EventType> |
| Adding a transition silently breaks recovery | Recovery replay hits the new event in an old state | Cover the recovery test path; recovery uses the same SM |
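The fix in the blocking I/O row, moving the work to a worker and emitting a follow-up event, can be sketched as follows. This is a hypothetical illustration in plain Java: OffloadDemo, slowIo, and the event names FETCH_REQUESTED and FETCH_DONE are made up, and the dispatch loop runs on the calling thread only to keep the example short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class OffloadDemo {
    // Stand-in for blocking I/O (hypothetical).
    static void slowIo() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs a tiny dispatch loop and returns the order in which events were handled.
    static String run() {
        BlockingQueue<String> events = new LinkedBlockingQueue<>();
        ExecutorService ioPool = Executors.newSingleThreadExecutor();
        StringBuilder trace = new StringBuilder();
        try {
            events.add("FETCH_REQUESTED");
            while (true) {
                String event = events.take();
                trace.append(event).append(';');
                if (event.equals("FETCH_REQUESTED")) {
                    // The handler returns immediately; the worker reports back with an event.
                    ioPool.execute(() -> {
                        slowIo();
                        events.add("FETCH_DONE");
                    });
                } else if (event.equals("FETCH_DONE")) {
                    return trace.toString();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return trace.toString();
        } finally {
            ioPool.shutdown();
        }
    }
}
```

The design point is that the result of the slow call re-enters the state machine as an ordinary event, so the transition that consumes it still runs serially on the dispatch thread.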
Validation: prove you understand this
- Implement a Light state machine with states OFF, ON, BROKEN and events TOGGLE, BREAK. Compile and unit-test.
- Find every SingleArcTransition in VertexImpl that is registered as static final and explain why it is static.
- Take an InvalidStateTransitonException from a real AM log; map it to the exact (state, event) pair and propose either a fix or a JIRA.
- Run TestVertexImpl#testKilledTasksHandling. Identify every DrainDispatcher.await() call and what it guards.
- Add a (SUCCEEDED, T_HEARTBEAT, SUCCEEDED) no-op to TaskImpl and the corresponding test in TestTaskImpl. Ensure all tests pass.