Event Routing

Events are the only sanctioned API for mutating any AM-side entity. This chapter catalogs the event hierarchy, explains the "events are the only mutation API" rule, walks through how a single task completion percolates up to the DAG, and shows where each event is registered and dispatched.

After this chapter you should be able to trace any state transition in the AM back through the chain of events that caused it.


The hierarchy

hadoop-yarn-common
  org/apache/hadoop/yarn/event/AbstractEvent<EVT_TYPE>
  org/apache/hadoop/yarn/event/EventHandler<E>
  org/apache/hadoop/yarn/event/AsyncDispatcher

tez-dag
  org/apache/tez/dag/app/dag/event/
    DAGEvent (subclasses: DAGEventStart, DAGEventDAGAttemptStarted, ...)
    VertexEvent (subclasses: VertexEventTaskCompleted, VertexEventVertexCompleted, ...)
    TaskEvent (subclasses: TaskEventTAUpdate, TaskEventTermination, ...)
    TaskAttemptEvent (subclasses: TaskAttemptEventStartedRemotely, ...)
    AMSchedulerEvent
    AMContainerEvent
    AMNodeEvent
    SpeculatorEvent
    ...

To list the event classes with grep (base classes and their direct subclasses):

find tez-dag/src/main/java/org/apache/tez/dag/app -path "*event*" -name "*.java" \
  | xargs grep -l "extends AbstractEvent\|extends DAGEvent\|extends VertexEvent\|extends TaskEvent\|extends TaskAttemptEvent" \
  | head -30

The AbstractEvent<EVT_TYPE> base class has two fields: an event type (an enum value) and a timestamp. Concrete event classes add payloads (e.g., VertexEventTaskCompleted carries the TezTaskID and the task's final TaskState).
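The shape described above (an enum type plus a timestamp in the base, with payload fields added by subclasses) can be sketched in isolation. This is a minimal stand-in, not the Hadoop or Tez source: every name prefixed with Sketch is hypothetical, the task ID is a plain String rather than a TezTaskID, and the boolean payload field is invented for illustration.

```java
import java.util.Objects;

/** Stand-in sketch; none of these types are the real Hadoop or Tez classes. */
final class EventShapeSketch {

  /** Hypothetical event-type enum, standing in for an enum such as VertexEventType. */
  enum SketchVertexEventType { V_START, V_TASK_COMPLETED }

  /** Base event: exactly two fields, an enum type and a creation timestamp. */
  abstract static class SketchEvent<T extends Enum<T>> {
    private final T type;
    private final long timestamp;

    SketchEvent(T type) {
      this.type = Objects.requireNonNull(type, "type");
      this.timestamp = System.currentTimeMillis();
    }

    T getType() { return type; }
    long getTimestamp() { return timestamp; }
  }

  /** Concrete event: adds its payload as immutable fields. */
  static final class SketchTaskCompleted extends SketchEvent<SketchVertexEventType> {
    private final String taskId;      // stands in for a TezTaskID
    private final boolean succeeded;  // illustrative payload field (hypothetical)

    SketchTaskCompleted(String taskId, boolean succeeded) {
      super(SketchVertexEventType.V_TASK_COMPLETED);
      this.taskId = Objects.requireNonNull(taskId, "taskId");
      this.succeeded = succeeded;
    }

    String getTaskId() { return taskId; }
    boolean isSucceeded() { return succeeded; }
  }

  public static void main(String[] args) {
    SketchTaskCompleted e = new SketchTaskCompleted("task_3", true);
    System.out.println(e.getType() + " for " + e.getTaskId()
        + " succeeded=" + e.isSucceeded());
  }
}
```

Keeping every field final means an event can be queued, logged, and replayed without the sender being able to change it afterwards.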


The "events are the only mutation API" rule

This rule is the bedrock of correctness:

Any change to the externally observable state of a DAGImpl, VertexImpl, TaskImpl, or TaskAttemptImpl must occur inside a state-machine transition handler, triggered by an event that flowed through the AsyncDispatcher.

Concretely:

  • Never call a setter directly on VertexImpl from another thread.
  • Never have one entity reach into another and mutate. Send an event.
  • The only "side door" is read-only getters (intentionally not synchronized; callers tolerate slight staleness).

Why this rule:

  1. Concurrency safety: the dispatcher serializes everything. Direct mutation re-introduces races.
  2. Auditability: events appear in the AM log; field writes do not.
  3. Recoverability: RecoveryService writes events; replay rebuilds state. Mutations outside events are invisible to recovery.
  4. Testability: DrainDispatcher controls the world; bypass it and tests become non-deterministic.

A patch that calls a mutator method outside a transition handler is, by convention, immediately rejected.
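The concurrency argument can be illustrated with a small single-writer sketch. It assumes nothing about Tez internals: the queue, the CounterEntity class, and the STOP sentinel are all hypothetical stand-ins. Several producer threads post events, but only one dispatcher thread ever calls handle(), so the counter needs no lock.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Stand-in sketch of "mutate only inside a handler on the dispatcher thread". */
final class SingleWriterSketch {

  enum CounterEventType { INCREMENT, STOP }

  /** Hypothetical entity: its state changes only inside handle(). */
  static final class CounterEntity {
    private int succeededCount;   // written only by the dispatcher thread

    void handle(CounterEventType type) {
      if (type == CounterEventType.INCREMENT) {
        succeededCount++;
      }
    }

    int getSucceededCount() { return succeededCount; }
  }

  /** Posts events from several threads and returns the final count. */
  static int run(int producers, int eventsPerProducer) {
    BlockingQueue<CounterEventType> queue = new LinkedBlockingQueue<>();
    CounterEntity entity = new CounterEntity();

    // The only thread that ever touches the entity.
    Thread dispatcher = new Thread(() -> {
      try {
        while (true) {
          CounterEventType type = queue.take();
          if (type == CounterEventType.STOP) {
            return;
          }
          entity.handle(type);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "sketch-dispatcher");
    dispatcher.start();

    Thread[] workers = new Thread[producers];
    for (int i = 0; i < producers; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < eventsPerProducer; j++) {
          queue.add(CounterEventType.INCREMENT);   // send an event, never mutate
        }
      });
      workers[i].start();
    }

    try {
      for (Thread w : workers) {
        w.join();
      }
      queue.add(CounterEventType.STOP);   // queued behind every INCREMENT
      dispatcher.join();                  // join makes the final count visible here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return entity.getSucceededCount();
  }

  public static void main(String[] args) {
    System.out.println(run(4, 1000));   // prints 4000
  }
}
```

If the workers incremented the field directly instead of queueing events, the unsynchronized `succeededCount++` could lose updates; routing every write through one thread is what removes the race.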


Bubble-up: a task completion to the DAG

sequenceDiagram
    participant TA as TaskAttemptImpl
    participant T as TaskImpl
    participant V as VertexImpl
    participant D as DAGImpl
    participant DI as Dispatcher

    Note over TA: heartbeat -> done(...) on umbilical
    TA->>TA: handle(TA_DONE)
    TA-->>DI: emit T_ATTEMPT_SUCCEEDED
    DI->>T: handle(T_ATTEMPT_SUCCEEDED)
    T->>T: mark winner; check siblings
    T-->>DI: emit V_TASK_COMPLETED (success)
    DI->>V: handle(V_TASK_COMPLETED)
    V->>V: bump succeededTaskCount
    alt All tasks done
        V-->>DI: emit V_COMMIT_REQUEST (if applicable)
        V-->>DI: emit DAG_VERTEX_COMPLETED
        DI->>D: handle(DAG_VERTEX_COMPLETED)
        D->>D: bump succeededVertexCount
    end

Every solid arrow delivers an event that drives a state-machine transition in the receiver. Every dashed emit arrow is an eventHandler.handle(...) call made inside the transition body.

Find the emit sites:

grep -n "eventHandler.handle\b" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java | head
grep -n "eventHandler.handle\b" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | head
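The bubble-up in the sequence diagram can be reduced to a toy model to make the ordering concrete. The sketch below is an assumption-laden simplification: one vertex, one DAG, a single-threaded queue in place of the dispatcher, no commit step, and a hypothetical BubbleUpSketch class. Only the three event names are taken from the diagram.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of the attempt -> task -> vertex -> DAG bubble-up (not Tez code). */
final class BubbleUpSketch {

  enum Type { T_ATTEMPT_SUCCEEDED, V_TASK_COMPLETED, DAG_VERTEX_COMPLETED }

  static final class Event {
    final Type type;
    final int taskIndex;   // -1 when the event is not about a specific task

    Event(Type type, int taskIndex) {
      this.type = type;
      this.taskIndex = taskIndex;
    }
  }

  private final Queue<Event> queue = new ArrayDeque<>();
  private final List<Type> trace = new ArrayList<>();
  private final boolean[] taskDone;
  private int succeededTaskCount;
  private int succeededVertexCount;

  BubbleUpSketch(int numTasks) {
    this.taskDone = new boolean[numTasks];
  }

  /** Emitting only enqueues; nothing is mutated at the emit site. */
  void emit(Event e) {
    queue.add(e);
  }

  /** Delivers events in FIFO order; each case plays one entity's transition. */
  void drain() {
    Event e;
    while ((e = queue.poll()) != null) {
      trace.add(e.type);
      switch (e.type) {
        case T_ATTEMPT_SUCCEEDED:      // task: mark winner, notify vertex
          if (!taskDone[e.taskIndex]) {
            taskDone[e.taskIndex] = true;
            emit(new Event(Type.V_TASK_COMPLETED, e.taskIndex));
          }
          break;
        case V_TASK_COMPLETED:         // vertex: bump count, notify DAG when all done
          succeededTaskCount++;
          if (succeededTaskCount == taskDone.length) {
            emit(new Event(Type.DAG_VERTEX_COMPLETED, -1));
          }
          break;
        case DAG_VERTEX_COMPLETED:     // DAG: bump its own count
          succeededVertexCount++;
          break;
      }
    }
  }

  int getSucceededVertexCount() { return succeededVertexCount; }

  /** Completes every task of one vertex and returns the order of delivered events. */
  static List<Type> run(int numTasks) {
    BubbleUpSketch sketch = new BubbleUpSketch(numTasks);
    for (int i = 0; i < numTasks; i++) {
      sketch.emit(new Event(Type.T_ATTEMPT_SUCCEEDED, i));
    }
    sketch.drain();
    return sketch.trace;
  }

  public static void main(String[] args) {
    System.out.println(run(2));
  }
}
```

With two tasks, the delivered order is two T_ATTEMPT_SUCCEEDED events, two V_TASK_COMPLETED events, then one DAG_VERTEX_COMPLETED: the DAG-level event is only emitted by the transition that observes the last task finishing.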

Where events are registered

Registrations live in DAGAppMaster.serviceInit (see dag-app-master.md):

grep -n "dispatcher.register\|register(.*\.class" \
  tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Each registration maps an event-type enum class (e.g., TaskEventType.class) to a handler. Most handlers are inner classes that resolve the target entity and delegate to entity.handle(event):

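private class TaskEventDispatcher implements EventHandler<TaskEvent> {
  @SuppressWarnings("unchecked")
  @Override
  public void handle(TaskEvent event) {
    // The event carries only a TezTaskID; resolve it against the current DAG.
    DAG dag = context.getCurrentDAG();
    Task task = dag.getVertex(event.getTaskID().getVertexID())
                   .getTask(event.getTaskID());
    // Forward to the entity; its state machine performs the transition.
    ((EventHandler<TaskEvent>) task).handle(event);
  }
}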

Why the indirection: events carry IDs, not object references. The registered handler resolves the ID to the live entity, then forwards the event to it.
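The register-then-resolve pattern can be sketched without any Hadoop or Tez dependency. Everything below is a hypothetical stand-in: SketchDispatcher, TaskEventRouter, and the String task IDs are invented for illustration, and dropping events for unknown IDs is a choice made for this sketch rather than a statement about what Tez does.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in sketch; every type here is hypothetical, not the Hadoop or Tez class. */
final class RegistrationSketch {

  enum SketchTaskEventType { T_SCHEDULE, T_ATTEMPT_SUCCEEDED }

  interface SketchEvent { Enum<?> getType(); }

  interface SketchHandler { void handle(SketchEvent event); }

  /** The event carries the target's ID, never a reference to the target. */
  static final class SketchTaskEvent implements SketchEvent {
    final String taskId;
    final SketchTaskEventType type;

    SketchTaskEvent(String taskId, SketchTaskEventType type) {
      this.taskId = taskId;
      this.type = type;
    }

    @Override public SketchTaskEventType getType() { return type; }
  }

  /** The entity: counts the events that reach it. */
  static final class SketchTask {
    int handled;
    void handle(SketchTaskEvent event) { handled++; }
  }

  /** Per-type handler: resolves the ID to a live entity, then forwards. */
  static final class TaskEventRouter implements SketchHandler {
    private final Map<String, SketchTask> tasksById;

    TaskEventRouter(Map<String, SketchTask> tasksById) {
      this.tasksById = tasksById;
    }

    @Override public void handle(SketchEvent event) {
      SketchTaskEvent taskEvent = (SketchTaskEvent) event;
      SketchTask task = tasksById.get(taskEvent.taskId);
      if (task != null) {   // unknown ID: drop the event (sketch-only policy)
        task.handle(taskEvent);
      }
    }
  }

  /** Maps an event-type enum class to its handler. */
  static final class SketchDispatcher {
    private final Map<Class<?>, SketchHandler> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> eventTypeClass, SketchHandler handler) {
      handlers.put(eventTypeClass, handler);
    }

    void dispatch(SketchEvent event) {
      Class<?> key = event.getType().getDeclaringClass();
      SketchHandler handler = handlers.get(key);
      if (handler == null) {
        throw new IllegalStateException("No handler registered for " + key.getSimpleName());
      }
      handler.handle(event);
    }
  }

  /** Registers one handler, dispatches two events, returns how many reached the task. */
  static int demo() {
    Map<String, SketchTask> tasks = new HashMap<>();
    SketchTask task = new SketchTask();
    tasks.put("task_1", task);

    SketchDispatcher dispatcher = new SketchDispatcher();
    dispatcher.register(SketchTaskEventType.class, new TaskEventRouter(tasks));

    dispatcher.dispatch(new SketchTaskEvent("task_1", SketchTaskEventType.T_SCHEDULE));
    dispatcher.dispatch(new SketchTaskEvent("task_gone", SketchTaskEventType.T_SCHEDULE));
    return task.handled;
  }

  public static void main(String[] args) {
    System.out.println(demo());   // prints 1
  }
}
```

Because the lookup happens at delivery time, an event addressed to an entity that no longer exists can be detected in one place instead of dereferencing a stale object.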


Per-entity event types

| Entity | Event type enum | Where emitted |
| --- | --- | --- |
| DAGImpl | DAGEventType | Vertex completions, kill, recovery |
| VertexImpl | VertexEventType | Task completions, manager callbacks, root input events |
| TaskImpl | TaskEventType | Attempt completions, speculation, kill |
| TaskAttemptImpl | TaskAttemptEventType | Container events, umbilical events |
| TaskSchedulerManager | AMSchedulerEventType | New requests, completions, container availability |
| AMContainerImpl | AMContainerEventType | Launch, assignment, completion |
| HistoryEventHandler | HistoryEventType | Any history-loggable change |

Each enum lives next to its event class. For the DAG, vertex, task, and task-attempt enums, that is the dag/event package:

ls tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/*EventType.java

Reading exercise

# Catalog
find tez-dag/src/main/java/org/apache/tez/dag/app -name "*Event.java" \
  | head -40

# Find a transition that emits other events
grep -B2 -A15 "class CommitCompletedTransition" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java

# Find AMSchedulerEvent emit sites
grep -rn "AMSchedulerEvent" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ | head

# Compare: emit vs direct mutation
grep -n "eventHandler.handle" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | wc -l

Answer:

  1. Why does the dispatcher carry IDs (e.g., TezTaskID) inside events rather than object references?
  2. Find an event that crosses subsystems: e.g., TaskAttemptImpl emitting an AMSchedulerEvent. What is the receiver and what action does it take?
  3. List the four classes of events that VertexImpl.handle reacts to and the three classes it emits.
  4. How does the AM ensure ordering when multiple events for the same entity are emitted in quick succession?
  5. What happens if a transition handler throws an uncaught exception? Which thread catches it?
  6. Find one event that has no consumers (dead code). If you find one, propose its removal in a JIRA.

Common bugs and symptoms

| Symptom | Root cause | Where to look |
| --- | --- | --- |
| Inconsistent state visible to two getters | Direct mutation outside dispatcher | Audit for setters called from non-handler code |
| Event "lost" (entity never sees it) | Forgot to register handler in DAGAppMaster.serviceInit | Add registration; add unit test |
| Replay during recovery diverges from original run | An event was emitted but not recorded (recovery log gap) | RecoveryService writer filter |
| Deadlock when one entity event handler tries to read another entity | Reader path uses a lock held elsewhere | Prefer event-emit over cross-entity reads |
| Test hangs in DrainDispatcher.await() | Transition emitted an event of a type with no handler in test | Register the missing handler (no-op is fine) |
| One subsystem floods the dispatcher | Storm of small events (e.g., per-heartbeat) | Batch in the emitter; or upgrade to a separate dispatcher |

Validation: prove you understand this

  1. Pick one transition in TaskAttemptImpl and trace every event it emits; for each, name the receiving entity.
  2. Open DAGAppMaster and list every event type registered, in order.
  3. Walk a V_KILL from DAGImpl.killDAG down to a TaskAttemptImpl actually shutting down its container.
  4. Write a unit test that triggers a transition with an event whose payload is malformed; verify the dispatcher logs the error without crashing.
  5. Explain why moving from AsyncDispatcher to a multi-threaded dispatcher would break Tez and what would have to change to support it.