DAGAppMaster

DAGAppMaster is Tez's YARN ApplicationMaster: a single JVM, launched by the YARN ResourceManager, that owns one or more DAGs over its lifetime. This chapter describes its bring-up, its dispatcher topology, its YARN-facing heartbeats, and the recovery service that lets it restart after a crash.

After this chapter you should be able to map any AM log line in the first 60 seconds of operation to a method in DAGAppMaster.java.


Files to open

tez-dag/src/main/java/org/apache/tez/dag/app/
  DAGAppMaster.java                          (the AM main class)
  TaskCommunicatorManager.java               (task umbilical multiplexer)
  ContainerHeartbeatHandler.java             (container liveness)
  rm/
    TaskSchedulerManager.java                (one per scheduler instance)
    YarnTaskSchedulerService.java            (the default scheduler impl)
    container/
      AMContainerImpl.java                   (container state machine)
  launcher/
    ContainerLauncherManager.java
    DagContainerLauncher.java                (varies by version)
    LocalContainerLauncher.java              (in-process)
  recovery/
    RecoveryService.java                     (event log; restart path)
  dag/impl/
    DAGImpl.java
    VertexImpl.java
    TaskImpl.java
    TaskAttemptImpl.java

Bring-up: serviceInit and serviceStart

DAGAppMaster extends AbstractService. YARN launches the JVM through its main() method, and control then flows as follows:

main()
  -> DAGAppMaster.create / new DAGAppMaster(...)
  -> init(conf)
       -> serviceInit(conf)
          - parse appAttemptId
          - load credentials
          - construct AsyncDispatcher
          - construct + register child services: TaskSchedulerManager,
              ContainerLauncherManager, TaskCommunicatorManager,
              RecoveryService (if enabled), HistoryEventHandler, ATSHook
          - register event handlers on the dispatcher
  -> start()
       -> serviceStart()
          - start child services (they each start their own threads)
          - if not session mode: handle the inline DAG plan
          - if session mode: enter idle loop, wait for submitDAG RPC
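
The call chain above follows the service-lifecycle template pattern: the public init() and start() methods are wrappers that invoke the protected serviceInit() and serviceStart() hooks, and a parent service drives its children through the same two phases. The following is a minimal, dependency-free sketch of that pattern. MiniService and LifecycleSketch are hypothetical stand-ins for AbstractService and DAGAppMaster, and the child names and start order are assumptions made for illustration; the real AM may order its children differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for AbstractService: public wrappers call protected hooks. */
abstract class MiniService {
    private final String name;
    private String state = "NOTINITED";

    MiniService(String name) { this.name = name; }

    final void init() { serviceInit(); state = "INITED"; }

    final void start() {
        if (!state.equals("INITED")) {
            throw new IllegalStateException(name + " must be inited before start");
        }
        serviceStart();
        state = "STARTED";
    }

    String state() { return state; }

    protected void serviceInit() {}
    protected void serviceStart() {}
}

/** Hypothetical AM-like parent that owns child services and records the call order. */
class LifecycleSketch extends MiniService {
    final List<String> trace = new ArrayList<>();
    private final List<MiniService> children = new ArrayList<>();

    LifecycleSketch() { super("am"); }

    // Builds a child whose hooks only append to the shared trace.
    private MiniService child(String childName) {
        return new MiniService(childName) {
            @Override protected void serviceInit() { trace.add("init:" + childName); }
            @Override protected void serviceStart() { trace.add("start:" + childName); }
        };
    }

    @Override protected void serviceInit() {
        trace.add("init:am");
        children.add(child("scheduler"));   // assumed child, for illustration
        children.add(child("launcher"));    // assumed child, for illustration
        for (MiniService c : children) { c.init(); }
    }

    @Override protected void serviceStart() {
        // Children start first; only then does the parent count as started.
        for (MiniService c : children) { c.start(); }
        trace.add("start:am");
    }

    public static void main(String[] args) {
        LifecycleSketch am = new LifecycleSketch();
        am.init();
        am.start();
        System.out.println(am.trace);
    }
}
```

Comparing the recorded trace with the order of log lines in a real AM log is one way to approach the first reading-exercise question.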

Inspect the boundaries:

grep -n "serviceInit\|serviceStart\|serviceStop" \
  tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

The AsyncDispatcher and registered handlers

DAGAppMaster builds one central AsyncDispatcher (the hadoop-yarn-common class or, depending on the Tez version, Tez's own copy in tez-common) and registers a handler per event type. The contract is:

  • All handlers run on the one dispatcher thread, so events are processed strictly one at a time.
  • Handlers must be fast (no blocking I/O); they should mutate state and emit follow-on events.
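
The contract above can be illustrated with a small, self-contained sketch: producers on any thread enqueue events, a single thread drains the queue, and a handler reacts by recording state and emitting a follow-on event. MiniDispatcher, its nested Event class, and the two enum types are hypothetical names invented for this example; this is not the AsyncDispatcher API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical miniature of an async dispatcher: many producers, one consumer thread. */
class MiniDispatcher {
    enum TaskEventType { T_SCHEDULE }
    enum AttemptEventType { TA_START }

    /** An event carries the enum constant used to look up its handler. */
    static final class Event {
        final Enum<?> type;
        final String payload;
        Event(Enum<?> type, String payload) { this.type = type; this.payload = payload; }
    }

    private final Map<Class<?>, Consumer<Event>> handlers = new ConcurrentHashMap<>();
    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

    /** One handler per event-type enum class. */
    void register(Class<? extends Enum<?>> eventType, Consumer<Event> handler) {
        handlers.put(eventType, handler);
    }

    /** Safe to call from any thread: it only enqueues. */
    void dispatch(Event e) { queue.add(e); }

    void start() {
        Thread loop = new Thread(this::drain, "dispatcher");
        loop.setDaemon(true);
        loop.start();
    }

    // The only place handlers run, so they never race with each other.
    private void drain() {
        try {
            while (true) {
                Event e = queue.take();
                Consumer<Event> h = handlers.get(e.type.getDeclaringClass());
                if (h != null) { h.accept(e); }
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queues two task events, starts the loop, and returns what the handlers saw. */
    static List<String> demo() {
        MiniDispatcher d = new MiniDispatcher();
        List<String> seen = new ArrayList<>();        // written only on the dispatcher thread
        CountDownLatch done = new CountDownLatch(2);
        d.register(TaskEventType.class, e -> {
            seen.add(Thread.currentThread().getName() + " " + e.type + " " + e.payload);
            d.dispatch(new Event(AttemptEventType.TA_START, e.payload + "_0")); // follow-on event
        });
        d.register(AttemptEventType.class, e -> {
            seen.add(Thread.currentThread().getName() + " " + e.type + " " + e.payload);
            done.countDown();
        });
        d.dispatch(new Event(TaskEventType.T_SCHEDULE, "task_1"));
        d.dispatch(new Event(TaskEventType.T_SCHEDULE, "task_2"));
        d.start();
        try {
            done.await();                             // also makes `seen` safely visible here
        } catch (InterruptedException ie) {
            throw new IllegalStateException(ie);
        }
        return seen;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because both task events are queued before the loop starts, the follow-on attempt events land behind them in the queue. A handler that blocked here would stall every event type at once, which is why the second rule matters.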

Find the registrations:

grep -n "dispatcher.register\|register(.*\.class" \
  tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Typical registrations (names approximate by version):

| Event type | Handler class | Owned subsystem |
| --- | --- | --- |
| DAGEventType | DAGEventDispatcher (forwards to DAGImpl.handle) | DAG lifecycle |
| VertexEventType | VertexEventDispatcher (forwards to VertexImpl.handle) | Vertex lifecycle |
| TaskEventType | TaskEventDispatcher (forwards to TaskImpl.handle) | Task lifecycle |
| TaskAttemptEventType | TaskAttemptEventDispatcher (forwards to TaskAttemptImpl.handle) | Attempt lifecycle |
| AMSchedulerEventType | AMSchedulerEventDispatcher (forwards to TaskSchedulerManager) | Scheduling |
| AMContainerEventType | container event dispatcher | Container state |
| AMNodeEventType | node event dispatcher | Node tracking |
| ContainerLauncherEventType | launcher dispatcher | Launch/stop containers |
| TaskCommunicatorEventType | comms dispatcher | Per-launcher umbilical |
| HistoryEventType | history event dispatcher | ATS/log publication |
| SpeculatorEventType | speculator dispatcher | Speculation (if enabled) |
| DAGAppMasterEventType | AM itself | Lifecycle (e.g., shutdown) |
| RecoveryEventType | recovery dispatcher | Recovery log |

The handlers themselves are inner classes or top-level dispatchers found in:

grep -rn "extends EventHandler\|implements EventHandler" \
  tez-dag/src/main/java/org/apache/tez/dag/app/ | head -20

Event flow diagram

flowchart TB
    subgraph "Sources of events"
        TC[Task heartbeat]
        SCH[Scheduler callback]
        TL[Container launcher]
        UC[User: submitDAG/killDAG]
        RC[Recovery on restart]
    end
    TC --> D
    SCH --> D
    TL --> D
    UC --> D
    RC --> D
    D[AsyncDispatcher] --> DH[DAGEventDispatcher]
    D --> VH[VertexEventDispatcher]
    D --> TH[TaskEventDispatcher]
    D --> AH[TaskAttemptEventDispatcher]
    D --> SH[AMSchedulerEventDispatcher]
    D --> HH[HistoryEventDispatcher]
    DH --> DI[DAGImpl]
    VH --> VI[VertexImpl]
    TH --> TI[TaskImpl]
    AH --> TAI[TaskAttemptImpl]
    SH --> TSM[TaskSchedulerManager]
    HH --> HEH[HistoryEventHandler]

Everything flows through D. There is no other way to mutate the state of a DAG, vertex, task, or attempt. See event-routing.md.


YARN-facing components

AMRM heartbeat (the resource conversation)

TaskSchedulerManager (and, underneath it, YarnTaskSchedulerService) maintains a YARN AMRMClient that heartbeats to the RM at a configurable interval (upper bound: tez.am.am-rm.heartbeat.interval-ms.max). Each heartbeat carries:

  • ContainerRequests for new tasks.
  • ContainerReleases for freed containers.
  • Progress percent (visible in yarn application -status).

Responses contain:

  • AllocatedContainers (RM granted).
  • CompletedContainersStatuses (RM tells us a container died).

grep -n "heartbeat\|AMRMClient\|allocate" \
  tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java | head
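
The request/response exchange described in this section can be modelled without YARN on the classpath. The sketch below is a toy under stated assumptions: HeartbeatSketch, Request, Response, and ToyRm are hypothetical names, the RM grants a fixed number of containers per heartbeat, and a released container is echoed back as completed in the same reply. None of this is the AMRMClient API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the AM-to-RM heartbeat; not the YARN AMRMClient API. */
class HeartbeatSketch {
    /** What the AM sends: outstanding asks, releases, and progress in [0, 1]. */
    static final class Request {
        final int asks;
        final List<String> releases;
        final float progress;
        Request(int asks, List<String> releases, float progress) {
            this.asks = asks; this.releases = releases; this.progress = progress;
        }
    }

    /** What the RM returns: newly granted and newly completed containers. */
    static final class Response {
        final List<String> allocated;
        final List<String> completed;
        Response(List<String> allocated, List<String> completed) {
            this.allocated = allocated; this.completed = completed;
        }
    }

    /** Toy RM that grants at most grantsPerBeat containers per heartbeat. */
    static final class ToyRm {
        private final int grantsPerBeat;
        private int nextId = 1;
        ToyRm(int grantsPerBeat) { this.grantsPerBeat = grantsPerBeat; }

        Response allocate(Request req) {
            List<String> granted = new ArrayList<>();
            int n = Math.min(req.asks, grantsPerBeat);
            for (int i = 0; i < n; i++) {
                granted.add("container_" + nextId++);
            }
            // Assumption of this toy: a release is reported back as completed at once.
            return new Response(granted, new ArrayList<>(req.releases));
        }
    }

    private final ToyRm rm;
    int pendingAsks;
    final List<String> pendingReleases = new ArrayList<>();
    final List<String> held = new ArrayList<>();

    HeartbeatSketch(ToyRm rm) { this.rm = rm; }

    /** One heartbeat: send what accumulated since the last one, then apply the reply. */
    Response heartbeat(float progress) {
        Request req = new Request(pendingAsks, new ArrayList<>(pendingReleases), progress);
        pendingReleases.clear();
        Response resp = rm.allocate(req);
        pendingAsks -= resp.allocated.size();
        held.addAll(resp.allocated);
        held.removeAll(resp.completed);
        return resp;
    }

    public static void main(String[] args) {
        HeartbeatSketch am = new HeartbeatSketch(new ToyRm(2));
        am.pendingAsks = 3;
        System.out.println("beat 1 allocated=" + am.heartbeat(0.0f).allocated);
        am.pendingReleases.add("container_1");
        Response second = am.heartbeat(0.5f);
        System.out.println("beat 2 allocated=" + second.allocated + " completed=" + second.completed);
        System.out.println("held=" + am.held);
    }
}
```

The point of the sketch is that asks are not answered synchronously: three requests against a two-per-beat RM take two heartbeats to satisfy, so the heartbeat interval bounds how quickly new tasks can obtain containers.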

Container heartbeat (the liveness check)

ContainerHeartbeatHandler records the wall-clock time of the most recent heartbeat() call from each running container's umbilical. If a container stays silent for longer than tez.task.timeout-ms, the AM declares the container unresponsive and kills the attempt.

grep -n "ContainerHeartbeatHandler\|tez.task.timeout" \
  tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
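
The liveness check amounts to a map of last-seen timestamps plus a periodic sweep. A minimal sketch, assuming a hypothetical LivenessSketch class with the clock passed in as a parameter so the logic can be exercised without sleeping; the 300-second timeout in main is an arbitrary illustration value, and the real handler's threading and event emission are omitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical liveness tracker; timestamps are supplied by the caller. */
class LivenessSketch {
    private final long timeoutMs;
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    LivenessSketch(long timeoutMs) { this.timeoutMs = timeoutMs; }

    /** Start tracking a container as of nowMs. */
    void register(String containerId, long nowMs) { lastSeen.put(containerId, nowMs); }

    /** Called on every umbilical heartbeat; unknown ids are ignored. */
    void pinged(String containerId, long nowMs) {
        lastSeen.computeIfPresent(containerId, (id, old) -> nowMs);
    }

    /** Periodic sweep: remove and return containers silent for longer than the timeout. */
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() > timeoutMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        LivenessSketch hb = new LivenessSketch(300_000L);
        hb.register("c1", 0L);
        hb.register("c2", 0L);
        hb.pinged("c1", 200_000L);                   // c1 checks in, c2 stays silent
        System.out.println(hb.expire(300_001L));     // only c2 has exceeded the timeout
    }
}
```

In the AM, each expired id would be turned into an event on the dispatcher rather than acted on inside the sweep, which keeps the state change on the dispatch thread.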

Task umbilical (the per-task RPC server)

TaskCommunicatorManager runs an in-AM RPC server (the umbilical) that tasks call into for:

  • getTask() — pick up assigned task.
  • statusUpdate(...) / heartbeat(...) — progress and liveness.
  • done(...) / fatalError(...) — completion.
  • outputReady(...) / inputEvents(...) — runtime data plane.

The umbilical protocol is TezTaskUmbilicalProtocol:

find . -name "TezTaskUmbilicalProtocol.java"

Recovery: surviving an AM restart

When recovery is enabled (tez.dag.recovery.enabled=true), RecoveryService writes a log of state-changing events to HDFS. If YARN permits another AM attempt (bounded by tez.am.max.app.attempts and the cluster-wide yarn.resourcemanager.am.max-attempts), a restart gives the AM a new appAttemptId but the same appId, and the new AM:

  1. Reads the recovery log that earlier attempts wrote under ${tez.staging-dir}/$appId/recovery/$attemptId/.
  2. Replays events into DAGImpl, VertexImpl, etc., to rebuild in-memory state up to the last durable point.
  3. Resumes execution: completed tasks remain completed, in-flight tasks are relaunched.

grep -rn "RecoveryService\|RecoveryEvent\|replayEvents" \
  tez-dag/src/main/java/org/apache/tez/dag/app/recovery/ | head

Note: recovery is per-DAG, not per-task. A vertex that was RUNNING becomes RUNNING again; tasks that completed stay completed; tasks that were in flight get fresh attempts.
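
The replay rule (completed work is kept, in-flight work is redone) can be shown with an append-only log and a fold over it. The sketch below uses a hypothetical RecoverySketch class with two invented entry kinds; the real recovery log has many more event types and a binary on-disk format, neither of which is modelled here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical event-log replay; entry kinds and log shape are illustrative only. */
class RecoverySketch {
    enum Kind { TASK_STARTED, TASK_FINISHED }

    static final class LogEntry {
        final Kind kind;
        final String taskId;
        LogEntry(Kind kind, String taskId) { this.kind = kind; this.taskId = taskId; }
    }

    /** Rebuild per-task state from the log: the last entry for a task wins. */
    static Map<String, Kind> replay(List<LogEntry> log) {
        Map<String, Kind> state = new LinkedHashMap<>();
        for (LogEntry e : log) {
            state.put(e.taskId, e.kind);
        }
        return state;
    }

    /** Tasks that started but never logged completion need a fresh attempt. */
    static List<String> toRelaunch(Map<String, Kind> state) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Kind> e : state.entrySet()) {
            if (e.getValue() == Kind.TASK_STARTED) {
                out.add(e.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<LogEntry> log = new ArrayList<>();
        log.add(new LogEntry(Kind.TASK_STARTED, "t1"));
        log.add(new LogEntry(Kind.TASK_STARTED, "t2"));
        log.add(new LogEntry(Kind.TASK_FINISHED, "t1"));
        log.add(new LogEntry(Kind.TASK_STARTED, "t3"));   // the AM crashed after this entry
        Map<String, Kind> state = replay(log);
        System.out.println("state=" + state);
        System.out.println("relaunch=" + toRelaunch(state));
    }
}
```

Anything that happened after the last durable log entry is invisible to the replay, which is why the text speaks of rebuilding state "up to the last durable point".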


Reading exercise

# Bring-up
sed -n '1,200p' tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
grep -n "serviceInit\|serviceStart" tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

# Handlers
grep -n "register\b" tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java | head -30

# Session vs non-session control
grep -n "isSession\|sessionMode" tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java | head -20

# Recovery hookup
grep -n "RecoveryService\|recoveryEnabled" tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java | head

Answer:

  1. In what order are the child services started in serviceStart? Why does order matter?
  2. List the first three events that flow through the dispatcher when an AM in non-session mode starts.
  3. What thread does DAGImpl.handle(DAGEvent) execute on? Is it the same thread as VertexImpl.handle(VertexEvent)?
  4. Where is the appAttemptId > 1 check that decides whether to start fresh or recover?
  5. What is the difference between DAGAppMaster.shutdown() and DAGAppMaster.serviceStop()?
  6. Find the line that emits the first "DAGAppMaster started" log statement (or its modern equivalent).

Common bugs and symptoms

| Symptom | Root cause | Where to look |
| --- | --- | --- |
| AM dies immediately with NPE in serviceInit | Missing or wrong tez.lib.uris; jars not found | NM container log; verify HDFS perms |
| AM hangs forever after serviceStart in session mode | No DAGs submitted; tez.session.am.dag.submit.timeout.secs exhausted | Increase timeout; or check why client isn't submitting |
| Tasks all fail with "container lost" after a long GC | AM GC pause exceeded heartbeat budget; RM killed AM | Tune AM heap; reduce dispatcher pressure |
| Recovery replays but stalls in INITING | Recovery log truncated mid-vertex-init | Look for SummaryEventWriter errors in prior attempt |
| Event dispatcher queue grows without bound | A handler is doing blocking I/O on the dispatch thread | Take a thread dump; verify which event is stuck |
| AM exits with ERROR and no DAG transition | An uncaught exception bubbled out of an event handler | grep "Error in dispatcher thread" in AM log |

Validation: prove you understand this

  1. From memory, list ten event-type→handler registrations in DAGAppMaster.
  2. Draw the event flow from TezTaskUmbilicalProtocol.heartbeat to TaskAttemptImpl.handle(TA_DONE).
  3. Reproduce a single-DAG, non-session AM bring-up on MiniTezCluster and identify the log line emitted by each child-service start.
  4. Read the RecoveryService writer and identify which event types are persisted vs in-memory-only.
  5. Explain why the dispatcher must be single-threaded and what would break if you parallelized it.