DAGAppMaster
DAGAppMaster is Tez's YARN ApplicationMaster: a single JVM, launched in a
container at the YARN ResourceManager's request, that owns one or more DAGs
over its lifetime. This
chapter describes its bring-up, its dispatcher topology, its YARN-facing
heartbeats, and the recovery service that lets it restart after a crash.
After this chapter you should be able to map any AM log line in the first 60
seconds of operation to a method in DAGAppMaster.java.
Files to open
tez-dag/src/main/java/org/apache/tez/dag/app/
DAGAppMaster.java (the AM main class)
TaskCommunicatorManager.java (task umbilical multiplexer)
ContainerHeartbeatHandler.java (container liveness)
rm/
TaskSchedulerManager.java (one per scheduler instance)
YarnTaskSchedulerService.java (the default scheduler impl)
container/
AMContainerImpl.java (container state machine)
launcher/
ContainerLauncherManager.java
DagContainerLauncher.java (varies by version)
LocalContainerLauncher.java (in-process)
recovery/
RecoveryService.java (event log; restart path)
dag/impl/
DAGImpl.java
VertexImpl.java
TaskImpl.java
TaskAttemptImpl.java
Bring-up: serviceInit and serviceStart
DAGAppMaster extends AbstractService. YARN launches it through main(), and
control flows as follows:
main()
  -> DAGAppMaster.create / new DAGAppMaster(...)
  -> init(conf)
       -> serviceInit(conf)
            - parse appAttemptId
            - load credentials
            - construct AsyncDispatcher
            - construct + register child services: TaskSchedulerManager,
              ContainerLauncherManager, TaskCommunicatorManager,
              RecoveryService (if enabled), HistoryEventHandler, ATSHook
            - register event handlers on the dispatcher
  -> start()
       -> serviceStart()
            - start child services (they each start their own threads)
            - if not session mode: handle the inline DAG plan
            - if session mode: enter idle loop, wait for submitDAG RPC
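The init/start split above can be sketched with a toy service hierarchy. This is a minimal illustration, not Tez or Hadoop code: SketchService, SketchAppMaster, and the child names are hypothetical, and the sketch starts children in registration order (which is how Hadoop's CompositeService behaves). The real DAGAppMaster may order or parallelize child startup differently, so check serviceStart in your version.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {
    // Hypothetical stand-in for AbstractService: init() and start() are the
    // entry points; subclasses override serviceInit() and serviceStart().
    static class SketchService {
        final String name;
        final List<String> trace; // shared record of lifecycle transitions

        SketchService(String name, List<String> trace) {
            this.name = name;
            this.trace = trace;
        }

        final void init() {
            serviceInit();
            trace.add(name + ".init");
        }

        final void start() {
            serviceStart();
            trace.add(name + ".start");
        }

        void serviceInit() { }

        void serviceStart() { }
    }

    // Hypothetical AM: builds its children in serviceInit, starts them in serviceStart.
    static class SketchAppMaster extends SketchService {
        private final List<SketchService> children = new ArrayList<>();

        SketchAppMaster(List<String> trace) {
            super("am", trace);
        }

        @Override
        void serviceInit() {
            // Child names are illustrative only.
            children.add(new SketchService("scheduler", trace));
            children.add(new SketchService("launcher", trace));
            children.add(new SketchService("communicator", trace));
            for (SketchService child : children) {
                child.init();
            }
        }

        @Override
        void serviceStart() {
            // Assumption: children start in registration order.
            for (SketchService child : children) {
                child.start();
            }
        }
    }

    // Runs init() then start() and returns the recorded transitions.
    static List<String> bringUp() {
        List<String> trace = new ArrayList<>();
        SketchAppMaster am = new SketchAppMaster(trace);
        am.init();
        am.start();
        return trace;
    }

    public static void main(String[] args) {
        for (String line : bringUp()) {
            System.out.println(line);
        }
    }
}
```

In the sketch every child reaches its inited state before the parent does, and no child starts until all of them are inited. That is the property to look for when reading the real serviceInit and serviceStart.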
Inspect the boundaries:
grep -n "serviceInit\|serviceStart\|serviceStop" \
tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
The AsyncDispatcher and registered handlers
DAGAppMaster builds one AsyncDispatcher (from hadoop-yarn-common or,
depending on the Tez version, Tez's own copy in tez-common) and
registers a handler per event type. The contract is:
- All handlers run on the same single dispatch thread, one event at a time.
- Handlers must be fast (no blocking I/O); they should mutate state and emit follow-on events.
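A minimal sketch of that contract, assuming a queue-backed dispatcher with one consumer thread. Every name here (Dispatcher, Handler, DagEvent, VertexEvent) is hypothetical and far simpler than the real AsyncDispatcher; the point is only that events of different types are handled on the same thread, in arrival order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class DispatcherSketch {
    // Hypothetical event types; the real ones are enums such as DAGEventType.
    enum DagEvent { DAG_START }
    enum VertexEvent { V_INIT }

    // Hypothetical handler interface.
    interface Handler {
        void handle(Enum<?> event);
    }

    static class Dispatcher {
        private final BlockingQueue<Enum<?>> queue = new LinkedBlockingQueue<>();
        private final Map<Class<?>, Handler> handlers = new HashMap<>();
        private final Thread loop = new Thread(this::drain, "sketch-dispatcher");
        private volatile boolean stopped = false;

        // Register before start(); one handler per event-type enum.
        void register(Class<?> eventType, Handler handler) {
            handlers.put(eventType, handler);
        }

        // Safe to call from any thread: it only enqueues.
        void dispatch(Enum<?> event) {
            queue.add(event);
        }

        void start() {
            loop.start();
        }

        void stop() throws InterruptedException {
            stopped = true;
            loop.interrupt();
            loop.join();
        }

        // The single dispatch thread: take one event, run its handler, repeat.
        private void drain() {
            while (!stopped) {
                try {
                    Enum<?> event = queue.take();
                    handlers.get(event.getDeclaringClass()).handle(event);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    // Dispatches two events of different types and records the handling thread.
    static List<String> run() {
        List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch done = new CountDownLatch(2);
        Handler recorder = event -> {
            seen.add(event.name() + "@" + Thread.currentThread().getName());
            done.countDown();
        };
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.register(DagEvent.class, recorder);
        dispatcher.register(VertexEvent.class, recorder);
        dispatcher.start();
        dispatcher.dispatch(DagEvent.DAG_START);
        dispatcher.dispatch(VertexEvent.V_INIT);
        try {
            done.await();
            dispatcher.stop();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [DAG_START@sketch-dispatcher, V_INIT@sketch-dispatcher]
    }
}
```

Because a slow handler holds the only consumer thread, every queued event waits behind it. That is why blocking I/O inside a handler shows up as an ever-growing queue.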
Find the registrations:
grep -n "dispatcher.register\|register(.*\.class" \
tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Typical registrations (names approximate by version):
| Event type | Handler class | Owned subsystem |
|---|---|---|
| DAGEventType | DAGEventDispatcher (forwards to DAGImpl.handle) | DAG lifecycle |
| VertexEventType | VertexEventDispatcher (forwards to VertexImpl.handle) | Vertex lifecycle |
| TaskEventType | TaskEventDispatcher (forwards to TaskImpl.handle) | Task lifecycle |
| TaskAttemptEventType | TaskAttemptEventDispatcher (forwards to TaskAttemptImpl.handle) | Attempt lifecycle |
| AMSchedulerEventType | AMSchedulerEventDispatcher (forwards to TaskSchedulerManager) | Scheduling |
| AMContainerEventType | container event dispatcher | Container state |
| AMNodeEventType | node event dispatcher | Node tracking |
| ContainerLauncherEventType | launcher dispatcher | Launch/stop containers |
| TaskCommunicatorEventType | comms dispatcher | Per-launcher umbilical |
| HistoryEventType | history event dispatcher | ATS/log publication |
| SpeculatorEventType | speculator dispatcher | Speculation (if enabled) |
| DAGAppMasterEventType | AM itself | Lifecycle (e.g., shutdown) |
| RecoveryEventType | recovery dispatcher | Recovery log |
The handlers themselves are inner classes or top-level dispatchers found in:
grep -rn "extends EventHandler\|implements EventHandler" \
tez-dag/src/main/java/org/apache/tez/dag/app/ | head -20
Event flow diagram
flowchart TB
subgraph "Sources of events"
TC[Task heartbeat]
SCH[Scheduler callback]
TL[Container launcher]
UC[User: submitDAG/killDAG]
RC[Recovery on restart]
end
TC --> D
SCH --> D
TL --> D
UC --> D
RC --> D
D[AsyncDispatcher] --> DH[DAGEventDispatcher]
D --> VH[VertexEventDispatcher]
D --> TH[TaskEventDispatcher]
D --> AH[TaskAttemptEventDispatcher]
D --> SH[AMSchedulerEventDispatcher]
D --> HH[HistoryEventDispatcher]
DH --> DI[DAGImpl]
VH --> VI[VertexImpl]
TH --> TI[TaskImpl]
AH --> TAI[TaskAttemptImpl]
SH --> TSM[TaskSchedulerManager]
HH --> HEH[HistoryEventHandler]
Everything flows through D. There is no other way to mutate the state
of a DAG, vertex, task, or attempt. See event-routing.md.
YARN-facing components
AMRM heartbeat (the resource conversation)
TaskSchedulerManager (and underneath, YarnTaskSchedulerService) maintains
an AMRMClient (from YARN). The client heartbeats to the RM at a configurable
interval, capped by tez.am.am-rm.heartbeat.interval-ms.max, and each
heartbeat carries:
- ContainerRequests for new tasks.
- ContainerReleases for freed containers.
- Progress percent (visible in yarn application -status).
Responses contain:
- AllocatedContainers (RM granted).
- CompletedContainersStatuses (RM tells us a container died).
grep -n "heartbeat\|AMRMClient\|allocate" \
tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java | head
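The request/response bookkeeping described above can be modelled in a few lines. This is a hypothetical sketch of the AM-side accounting only: it does not use the AMRMClient API, and the class, field, and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical bookkeeping for AM-RM heartbeats; not the AMRMClient API.
class AmRmHeartbeatSketch {
    int pendingRequests = 0;                                 // asks not yet granted
    final Set<String> held = new LinkedHashSet<>();          // containers the AM owns
    final List<String> pendingReleases = new ArrayList<>();  // sent on the next heartbeat

    // Queue asks for n more containers.
    void requestContainers(int n) {
        pendingRequests += n;
    }

    // Give up a held container; the release rides on the next heartbeat.
    void release(String containerId) {
        if (held.remove(containerId)) {
            pendingReleases.add(containerId);
        }
    }

    // Apply one heartbeat response: 'allocated' were granted by the RM,
    // 'completed' are containers the RM reports as exited.
    void onResponse(List<String> allocated, List<String> completed) {
        pendingReleases.clear(); // releases went out with this heartbeat
        held.addAll(allocated);
        pendingRequests = Math.max(0, pendingRequests - allocated.size());
        held.removeAll(completed);
    }

    public static void main(String[] args) {
        AmRmHeartbeatSketch am = new AmRmHeartbeatSketch();
        am.requestContainers(3);
        am.onResponse(Arrays.asList("c1", "c2"), Collections.<String>emptyList());
        am.release("c1");
        am.onResponse(Arrays.asList("c3"), Arrays.asList("c2"));
        System.out.println(am.pendingRequests + " " + am.held); // 0 [c3]
    }
}
```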
Container heartbeat (the liveness check)
ContainerHeartbeatHandler tracks the wall time of the last
heartbeat() call from each running container's umbilical. If a container
goes silent past tez.task.timeout-ms, the AM declares the container
unresponsive and kills the attempt.
grep -n "ContainerHeartbeatHandler\|tez.task.timeout" \
tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
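The liveness check amounts to a last-seen timestamp per container plus a periodic sweep. The sketch below assumes that shape; LivenessSketch and its methods are hypothetical names, not the ContainerHeartbeatHandler API, and the timeout value is illustrative rather than a claimed default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical liveness tracker; not the real ContainerHeartbeatHandler API.
class LivenessSketch {
    private final Map<String, Long> lastSeenMs = new ConcurrentHashMap<>();
    private final long timeoutMs;

    LivenessSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called whenever a container's umbilical heartbeat arrives.
    void pinged(String containerId, long nowMs) {
        lastSeenMs.put(containerId, nowMs);
    }

    // Called when a container finishes normally, so it is no longer watched.
    void unregister(String containerId) {
        lastSeenMs.remove(containerId);
    }

    // One pass of the periodic check: returns the containers that have been
    // silent for longer than the timeout and stops tracking them.
    List<String> expire(long nowMs) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeenMs.entrySet()) {
            if (nowMs - entry.getValue() > timeoutMs) {
                timedOut.add(entry.getKey());
            }
        }
        for (String id : timedOut) {
            lastSeenMs.remove(id);
        }
        return timedOut;
    }

    public static void main(String[] args) {
        LivenessSketch tracker = new LivenessSketch(300_000L); // illustrative timeout
        tracker.pinged("container_1", 0L);
        tracker.pinged("container_2", 0L);
        tracker.pinged("container_2", 200_000L);
        System.out.println(tracker.expire(300_001L)); // [container_1]
    }
}
```

Passing the clock in as a parameter keeps the sketch deterministic. A real handler would read a clock and run the sweep on its own timer thread.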
Task umbilical (the per-task RPC server)
TaskCommunicatorManager runs an in-AM RPC server (the umbilical) that tasks
call into for:
- getTask(): pick up assigned task.
- statusUpdate(...) / heartbeat(...): progress and liveness.
- done(...) / fatalError(...): completion.
- outputReady(...) / inputEvents(...): runtime data plane.
The umbilical protocol is TezTaskUmbilicalProtocol:
find . -name "TezTaskUmbilicalProtocol.java"
Recovery: surviving an AM restart
When recovery is enabled (tez.dag.recovery.enabled=true), RecoveryService
writes a log of state-changing events to HDFS. If YARN permits another
application attempt (tez.am.max.app.attempts, capped by the cluster's
yarn.resourcemanager.am.max-attempts), the restarted AM gets a new
appAttemptId but the same appId, and the new AM:
- Reads the recovery log under ${tez.staging-dir}/$appId/recovery/$attemptId/.
- Replays events into DAGImpl, VertexImpl, etc., to rebuild in-memory state up to the last durable point.
- Resumes execution: completed tasks remain completed, in-flight tasks are relaunched.
grep -rn "RecoveryService\|RecoveryEvent\|replayEvents" \
tez-dag/src/main/java/org/apache/tez/dag/app/recovery/ | head
Note: recovery is per-DAG, not per-task. A vertex that was RUNNING
becomes RUNNING again; tasks that completed stay completed; tasks that
were in flight get fresh attempts.
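The replay rule in the note can be sketched as a fold over an ordered event log. The event kinds, LoggedEvent, and TaskState below are hypothetical simplifications; the real recovery log carries history events with far more detail, and this sketch only illustrates "completed stays completed, in-flight gets a fresh attempt".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RecoveryReplaySketch {
    // Hypothetical event kinds; the real log stores richer history events.
    enum Kind { TASK_STARTED, TASK_FINISHED }

    enum TaskState { RUNNING, SUCCEEDED }

    static final class LoggedEvent {
        final Kind kind;
        final String taskId;

        LoggedEvent(Kind kind, String taskId) {
            this.kind = kind;
            this.taskId = taskId;
        }
    }

    // Replays the log in order and returns each task's state as of the last
    // durable event.
    static Map<String, TaskState> replay(List<LoggedEvent> log) {
        Map<String, TaskState> state = new LinkedHashMap<>();
        for (LoggedEvent event : log) {
            if (event.kind == Kind.TASK_STARTED) {
                state.put(event.taskId, TaskState.RUNNING);
            } else {
                state.put(event.taskId, TaskState.SUCCEEDED);
            }
        }
        return state;
    }

    // Tasks that were still running when the log ended need fresh attempts.
    static List<String> tasksToRelaunch(Map<String, TaskState> state) {
        List<String> relaunch = new ArrayList<>();
        for (Map.Entry<String, TaskState> entry : state.entrySet()) {
            if (entry.getValue() == TaskState.RUNNING) {
                relaunch.add(entry.getKey());
            }
        }
        return relaunch;
    }

    public static void main(String[] args) {
        List<LoggedEvent> log = Arrays.asList(
            new LoggedEvent(Kind.TASK_STARTED, "t0"),
            new LoggedEvent(Kind.TASK_STARTED, "t1"),
            new LoggedEvent(Kind.TASK_FINISHED, "t0"));
        Map<String, TaskState> state = replay(log);
        System.out.println(state);                  // {t0=SUCCEEDED, t1=RUNNING}
        System.out.println(tasksToRelaunch(state)); // [t1]
    }
}
```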
Reading exercise
# Bring-up
sed -n '1,200p' tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
grep -n "serviceInit\|serviceStart" tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
# Handlers
grep -n "register\b" tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java | head -30
# Session vs non-session control
grep -n "isSession\|sessionMode" tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java | head -20
# Recovery hookup
grep -n "RecoveryService\|recoveryEnabled" tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java | head
Answer:
- In what order are the child services started in serviceStart? Why does order matter?
- List the first three events that flow through the dispatcher when an AM in non-session mode starts.
- What thread does DAGImpl.handle(DAGEvent) execute on? Is it the same thread as VertexImpl.handle(VertexEvent)?
- Where is the appAttemptId > 1 check that decides whether to start fresh or recover?
- What is the difference between DAGAppMaster.shutdown() and DAGAppMaster.serviceStop()?
- Find the line that emits the first "DAGAppMaster started" log statement (or its modern equivalent).
Common bugs and symptoms
| Symptom | Root cause | Where to look |
|---|---|---|
| AM dies immediately with NPE in serviceInit | Missing or wrong tez.lib.uris; jars not found | NM container log; verify HDFS perms |
| AM hangs forever after serviceStart in session mode | No DAGs submitted; tez.session.am.dag.submit.timeout.secs exhausted | Increase timeout; or check why client isn't submitting |
| Tasks all fail with "container lost" after a long GC | AM GC pause exceeded heartbeat budget; RM killed AM | Tune AM heap; reduce dispatcher pressure |
| Recovery replays but stalls in INITING | Recovery log truncated mid-vertex-init | Look for SummaryEventWriter errors in prior attempt |
| Event dispatcher queue grows without bound | A handler is doing blocking I/O on the dispatch thread | Take a thread dump; verify which event is stuck |
| AM exits with ERROR and no DAG transition | An uncaught exception bubbled out of an event handler | grep "Error in dispatcher thread" in AM log |
Validation: prove you understand this
- From memory, list ten event-type→handler registrations in DAGAppMaster.
- Draw the event flow from TezTaskUmbilicalProtocol.heartbeat to TaskAttemptImpl.handle(TA_DONE).
- Reproduce a single-DAG, non-session AM bring-up on MiniTezCluster and identify the log line emitted by each child-service start.
- Read the RecoveryService writer and identify which event types are persisted vs in-memory-only.
- Explain why the dispatcher must be single-threaded and what would break if you parallelized it.