Event Routing
Events are the only sanctioned API for mutating any AM-side entity. This chapter catalogs the event hierarchy, explains the "events are the only mutation API" rule, walks through how a single task completion percolates up to the DAG, and shows where each event is registered and dispatched.
After this chapter you should be able to trace any state transition in the AM back through the chain of events that caused it.
The hierarchy
hadoop-yarn-common
org/apache/hadoop/yarn/event/AbstractEvent<EVT_TYPE>
org/apache/hadoop/yarn/event/EventHandler<E>
org/apache/hadoop/yarn/event/AsyncDispatcher
tez-dag
org/apache/tez/dag/app/dag/event/
DAGEvent (subclasses: DAGEventStart, DAGEventDAGAttemptStarted, ...)
VertexEvent (subclasses: VertexEventTaskCompleted, VertexEventVertexCompleted, ...)
TaskEvent (subclasses: TaskEventTAUpdate, TaskEventTermination, ...)
TaskAttemptEvent (subclasses: TaskAttemptEventStartedRemotely, ...)
AMSchedulerEvent
AMContainerEvent
AMNodeEvent
SpeculatorEvent
...
Hint to grep all event classes:
find tez-dag/src/main/java/org/apache/tez/dag/app -path "*event*" -name "*.java" \
| xargs grep -l "extends AbstractEvent\|extends DAGEvent\|extends VertexEvent" \
| head -30
The AbstractEvent<E> base has two fields: an event type (enum) and a
timestamp. Concrete event classes add payloads (e.g.,
VertexEventTaskCompleted carries the TezTaskID and the
TaskAttemptIdentifier).
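The shape described above can be sketched in a few lines. This is a minimal illustration, not the real classes: `SketchEvent`, `SketchVertexEventType`, and `SketchVertexEventTaskCompleted` are hypothetical stand-ins, and plain strings stand in for `TezTaskID` and `TaskAttemptIdentifier`.

```java
// Simplified stand-ins for illustration; these are NOT the real YARN/Tez classes.
enum SketchVertexEventType { V_TASK_COMPLETED }

// Base event: nothing but a type (an enum constant) and a timestamp.
abstract class SketchEvent<T extends Enum<T>> {
    private final T type;
    private final long timestamp;

    SketchEvent(T type, long timestamp) {
        this.type = type;
        this.timestamp = timestamp;
    }

    T getType() { return type; }
    long getTimestamp() { return timestamp; }
}

// Concrete event: fixes the type and adds an immutable payload.
final class SketchVertexEventTaskCompleted extends SketchEvent<SketchVertexEventType> {
    private final String taskId;     // stands in for TezTaskID
    private final String attemptId;  // stands in for TaskAttemptIdentifier

    SketchVertexEventTaskCompleted(String taskId, String attemptId, long timestamp) {
        super(SketchVertexEventType.V_TASK_COMPLETED, timestamp);
        this.taskId = taskId;
        this.attemptId = attemptId;
    }

    String getTaskId() { return taskId; }
    String getAttemptId() { return attemptId; }
}
```

The payload fields are final, so a handler can read an event but cannot use it as a back channel for mutation.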
The "events are the only mutation API" rule
This rule is the bedrock of correctness:
Any change to the externally observable state of a DAGImpl, VertexImpl, TaskImpl, or TaskAttemptImpl must occur inside a state-machine transition handler, triggered by an event that flowed through the AsyncDispatcher.
Concretely:
- Never call a setter directly on VertexImpl from another thread.
- Never have one entity reach into another and mutate it. Send an event.
- The only "side door" is read-only getters (intentionally not synchronized; callers tolerate slight staleness).
Why this rule:
- Concurrency safety: the dispatcher serializes everything. Direct mutation re-introduces races.
- Auditability: events appear in the AM log; field writes do not.
- Recoverability: RecoveryService writes events; replay rebuilds state. Mutations outside events are invisible to recovery.
- Testability: DrainDispatcher controls the world; bypass it and tests become non-deterministic.
A patch that calls a mutator method outside a transition handler is, by convention, immediately rejected.
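A rough sketch of why the rule buys concurrency safety: several threads emit events, but only one dispatcher thread ever touches the entity's field, so no lock is needed on the write path. Everything here is hypothetical and simplified (`MutationRuleSketch`, `Entity`, string events, and a poison-pill shutdown); it does not reproduce AsyncDispatcher itself.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: many emitters, one dispatcher thread, no locks on the entity.
final class MutationRuleSketch {
    private static final String STOP = "STOP"; // poison pill that ends the dispatch loop

    static final class Entity {
        private int succeededCount; // written only by the dispatcher thread

        void handle(String event) {
            if ("TASK_SUCCEEDED".equals(event)) {
                succeededCount++;
            }
        }

        int getSucceededCount() { return succeededCount; }
    }

    static int run(int emitters, int eventsEach) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Entity entity = new Entity();

        // The single dispatcher thread is the only caller of entity.handle(...).
        Thread dispatcher = new Thread(() -> {
            try {
                for (String e = queue.take(); !STOP.equals(e); e = queue.take()) {
                    entity.handle(e);
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.start();

        // Emitters never touch the entity; they only enqueue events.
        Thread[] producers = new Thread[emitters];
        for (int i = 0; i < emitters; i++) {
            producers[i] = new Thread(() -> {
                for (int j = 0; j < eventsEach; j++) {
                    queue.add("TASK_SUCCEEDED");
                }
            });
            producers[i].start();
        }

        try {
            for (Thread p : producers) {
                p.join();
            }
            queue.add(STOP);   // enqueued after every real event
            dispatcher.join(); // join makes the final count visible to this thread
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return entity.getSucceededCount();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```

If the emitters incremented `succeededCount` directly instead of enqueueing, the unsynchronized `++` could lose updates. That is the race the rule exists to prevent.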
Bubble-up: a task completion to the DAG
sequenceDiagram
participant TA as TaskAttemptImpl
participant T as TaskImpl
participant V as VertexImpl
participant D as DAGImpl
participant DI as Dispatcher
Note over TA: heartbeat -> done(...) on umbilical
TA->>TA: handle(TA_DONE)
TA-->>DI: emit T_ATTEMPT_SUCCEEDED
DI->>T: handle(T_ATTEMPT_SUCCEEDED)
T->>T: mark winner; check siblings
T-->>DI: emit V_TASK_COMPLETED (success)
DI->>V: handle(V_TASK_COMPLETED)
V->>V: bump succeededTaskCount
alt All tasks done
V-->>DI: emit V_COMMIT_REQUEST (if applicable)
V-->>DI: emit DAG_VERTEX_COMPLETED
DI->>D: handle(DAG_VERTEX_COMPLETED)
D->>D: bump succeededVertexCount
end
Every arrow is a state-machine transition. Every emit is an
eventHandler.handle(...) call inside the transition body.
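The percolation in the diagram can be imitated with a toy, single-threaded queue that is drained in the caller's thread. This sketch makes strong simplifying assumptions: one vertex, one DAG, routing by event type alone, and no commit step. `BubbleUpSketch` and its members are hypothetical names; only the three event-type names are taken from the diagram.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the bubble-up: each handler mutates its own counter
// and emits the next event instead of calling the parent entity directly.
final class BubbleUpSketch {
    enum Type { T_ATTEMPT_SUCCEEDED, V_TASK_COMPLETED, DAG_VERTEX_COMPLETED }

    private final Queue<Type> queue = new ArrayDeque<>();
    final List<Type> trace = new ArrayList<>(); // dispatch order, for inspection
    private final int tasksInVertex;
    private int succeededTaskCount;   // "vertex" state
    private int succeededVertexCount; // "DAG" state

    BubbleUpSketch(int tasksInVertex) {
        this.tasksInVertex = tasksInVertex;
    }

    void emit(Type type) {
        queue.add(type);
    }

    // Dispatch until the queue is empty, including events emitted by handlers.
    void drain() {
        Type type;
        while ((type = queue.poll()) != null) {
            trace.add(type);
            switch (type) {
                case T_ATTEMPT_SUCCEEDED:   // task: report completion upward
                    emit(Type.V_TASK_COMPLETED);
                    break;
                case V_TASK_COMPLETED:      // vertex: count, then maybe complete
                    if (++succeededTaskCount == tasksInVertex) {
                        emit(Type.DAG_VERTEX_COMPLETED);
                    }
                    break;
                case DAG_VERTEX_COMPLETED:  // DAG: count completed vertices
                    succeededVertexCount++;
                    break;
            }
        }
    }

    int getSucceededTaskCount() { return succeededTaskCount; }
    int getSucceededVertexCount() { return succeededVertexCount; }
}
```

Emitting two `T_ATTEMPT_SUCCEEDED` events into `new BubbleUpSketch(2)` and calling `drain()` leaves five entries in `trace`, ending with `DAG_VERTEX_COMPLETED`. The vertex-level event appears only after both task completions have been counted.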
Find the emit sites:
grep -n "eventHandler.handle\b" \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java | head
grep -n "eventHandler.handle\b" \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | head
Where events are registered
Registrations live in DAGAppMaster.serviceInit (see
dag-app-master.md):
grep -n "dispatcher.register\|register(.*\.class" \
tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Each registration maps an event type to a handler. Most handlers are inner
classes that delegate to entity.handle(event):
private class TaskEventDispatcher implements EventHandler<TaskEvent> {
  @Override
  public void handle(TaskEvent event) {
    // Resolve the target entity from the IDs carried by the event.
    DAG dag = context.getCurrentDAG();
    Task task = dag.getVertex(event.getTaskID().getVertexID())
        .getTask(event.getTaskID());
    // Forward to the entity's own state machine.
    ((EventHandler<TaskEvent>) task).handle(event);
  }
}
Why the indirection: events carry IDs, not object references. The dispatcher handler does the resolve, then forwards.
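The resolve-then-forward pattern can be shown in isolation. In this hedged sketch, `IdResolutionSketch`, `Router`, `TaskSketch`, and `TaskEventSketch` are hypothetical names, string IDs replace `TezTaskID`, and returning `false` for an unknown ID is a choice made for the sketch, not a description of what Tez does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the event names its target by ID; the router owns the lookup.
final class IdResolutionSketch {
    static final class TaskEventSketch {
        final String taskId; // an ID, never an object reference
        final String type;

        TaskEventSketch(String taskId, String type) {
            this.taskId = taskId;
            this.type = type;
        }
    }

    static final class TaskSketch {
        private String lastEventType;

        void handle(TaskEventSketch event) {
            lastEventType = event.type;
        }

        String getLastEventType() { return lastEventType; }
    }

    static final class Router {
        private final Map<String, TaskSketch> tasksById = new HashMap<>();

        void addTask(String taskId, TaskSketch task) {
            tasksById.put(taskId, task);
        }

        // Resolve the ID to the current entity, then forward the event to it.
        // Returns false when no entity is registered under that ID (sketch-only policy).
        boolean handle(TaskEventSketch event) {
            TaskSketch task = tasksById.get(event.taskId);
            if (task == null) {
                return false;
            }
            task.handle(event);
            return true;
        }
    }
}
```

Because the emitter holds only an ID, it never keeps a reference to an entity, and the lookup always lands on whatever object is currently registered under that ID.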
Per-entity event types
| Entity | Event type enum | Where emitted |
|---|---|---|
| DAGImpl | DAGEventType | Vertex completions, kill, recovery |
| VertexImpl | VertexEventType | Task completions, manager callbacks, root input events |
| TaskImpl | TaskEventType | Attempt completions, speculation, kill |
| TaskAttemptImpl | TaskAttemptEventType | Container events, umbilical events |
| TaskSchedulerManager | AMSchedulerEventType | New requests, completions, container availability |
| AMContainerImpl | AMContainerEventType | Launch, assignment, completion |
| HistoryEventHandler | HistoryEventType | Any history-loggable change |
Each enum lives next to the event class:
ls tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/*EventType.java
Reading exercise
# Catalog
find tez-dag/src/main/java/org/apache/tez/dag/app -name "*Event.java" \
| head -40
# Find a transition that emits other events
grep -B2 -A15 "class CommitCompletedTransition" \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
# Find AMSchedulerEvent emit sites
grep -rn "AMSchedulerEvent" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ | head
# Compare: emit vs direct mutation
grep -n "eventHandler.handle" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | wc -l
Answer:
- Why does the dispatcher carry IDs (e.g., TezTaskID) inside events rather than object references?
- Find an event that crosses subsystems: e.g., TaskAttemptImpl emitting an AMSchedulerEvent. What is the receiver and what action does it take?
- List the four classes of events that VertexImpl.handle reacts to and the three classes it emits.
- How does the AM ensure ordering when multiple events for the same entity are emitted in quick succession?
- What happens if a transition handler throws an uncaught exception? Which thread catches it?
- Find one event that has no consumers (dead code). If you find one, propose its removal in a JIRA.
Common bugs and symptoms
| Symptom | Root cause | Where to look |
|---|---|---|
| Inconsistent state visible to two getters | Direct mutation outside dispatcher | Audit for setters called from non-handler code |
| Event "lost" — entity never sees it | Forgot to register handler in DAGAppMaster.serviceInit | Add registration; add unit test |
| Replay during recovery diverges from original run | An event was emitted but not recorded (recovery log gap) | RecoveryService writer filter |
| Deadlock when one entity event handler tries to read another entity | Reader path uses a lock held elsewhere | Prefer event-emit over cross-entity reads |
| Test hangs in DrainDispatcher.await() | Transition emitted an event of a type with no handler in test | Register the missing handler (no-op is fine) |
| One subsystem floods the dispatcher | Storm of small events (e.g., per-heartbeat) | Batch in the emitter; or upgrade to a separate dispatcher |
Validation: prove you understand this
- Pick one transition in TaskAttemptImpl and trace every event it emits; for each, name the receiving entity.
- Open DAGAppMaster and list every event type registered, in order.
- Walk a V_KILL from DAGImpl.killDAG down to a TaskAttemptImpl actually shutting down its container.
- Write a unit test that triggers a transition with an event whose payload is malformed; verify the dispatcher logs the error without crashing.
- Explain why moving from AsyncDispatcher to a multi-threaded dispatcher would break Tez and what would have to change to support it.