YARN Integration

The Tez AM is, from YARN's perspective, an ordinary YARN application: an ApplicationMaster running in a container, talking to the ResourceManager to request more containers, talking to NodeManagers to launch them, and writing events to a Timeline Server.

This chapter walks every YARN-facing interface Tez touches.


DAGAppMaster as a YARN AM

find tez-dag/src/main/java -name "DAGAppMaster.java"
wc -l $(find tez-dag/src/main/java -name "DAGAppMaster.java")
grep -n "main(\|serviceStart\|serviceInit" \
  $(find tez-dag/src/main/java -name "DAGAppMaster.java") | head

Boot sequence when YARN launches the AM container:

  1. NodeManager runs the AM command line (constructed by TezClientUtils), which is essentially java -cp <classpath> org.apache.tez.dag.app.DAGAppMaster.
  2. DAGAppMaster.main reads the environment the NM prepared: the container ID (from which it derives the ApplicationAttemptId), NM host and ports, app submit time, and so on.
  3. Constructs the DAGAppMaster service tree (state machines, dispatchers, schedulers, ATS publisher).
  4. serviceStart() registers with the RM via AMRMClientAsync.registerApplicationMaster.
  5. Starts RPC servers: one for clients (DAGClient) and one for task containers (TezTaskUmbilicalProtocol, used by TezChild).
  6. Waits for DAG submissions over the client RPC (or, for non-session mode, picks up the pre-submitted DAG from local disk).
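Step 2 can be illustrated with a small stand-alone sketch. It assumes only that the NM exports CONTAINER_ID into the container environment, and it re-implements the ID-to-attempt derivation with plain string handling. AmBootEnv and its methods are hypothetical names; real code should call ContainerId.fromString(...).getApplicationAttemptId() from hadoop-yarn-api instead.

```java
import java.util.Map;

// Sketch only (hypothetical class): derive the ApplicationAttemptId string from the
// NodeManager-provided CONTAINER_ID. Real code: ContainerId.fromString(id).getApplicationAttemptId().
class AmBootEnv {

    // Accepts "container_<clusterTs>_<appSeq>_<attempt>_<seq>" and the
    // epoch form "container_e<epoch>_<clusterTs>_<appSeq>_<attempt>_<seq>".
    static String attemptIdFromContainerId(String containerId) {
        String[] parts = containerId.split("_");
        int offset = (parts.length > 1 && parts[1].startsWith("e")) ? 1 : 0; // optional epoch
        if (!parts[0].equals("container") || parts.length != 5 + offset) {
            throw new IllegalArgumentException("Not a container ID: " + containerId);
        }
        long clusterTimestamp = Long.parseLong(parts[1 + offset]);
        int appSeq = Integer.parseInt(parts[2 + offset]);
        int attempt = Integer.parseInt(parts[3 + offset]);
        // Attempt IDs pad the app sequence to at least 4 digits and the attempt to 6.
        return String.format("appattempt_%d_%04d_%06d", clusterTimestamp, appSeq, attempt);
    }

    // The NM exports CONTAINER_ID into every container it launches, including the AM's.
    static String attemptIdFromEnv(Map<String, String> env) {
        String containerId = env.get("CONTAINER_ID");
        if (containerId == null) {
            throw new IllegalStateException("CONTAINER_ID not set: not launched by a NodeManager?");
        }
        return attemptIdFromContainerId(containerId);
    }

    public static void main(String[] args) {
        // Prints appattempt_1410901177871_0001_000001
        System.out.println(attemptIdFromContainerId("container_1410901177871_0001_01_000001"));
    }
}
```

Because the AM container is always the first container of its attempt, the attempt number embedded in its container ID is exactly the attempt the AM is running as.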

Key clients owned by the AM:

Client           Purpose                                   Library
AMRMClientAsync  RM heartbeat: request/release containers  hadoop-yarn-client
NMClientAsync    NM RPC: launch/stop containers            hadoop-yarn-client
TimelineClient   ATS event publisher                       hadoop-yarn-client
DFSClient        HDFS access for recovery & temp files     hadoop-hdfs-client

AMRMClientAsync

grep -n "AMRMClientAsync\|addContainerRequest\|releaseAssignedContainer\|allocate" \
  $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java") | head -20

The async wrapper around AMRMClient. Tez uses it instead of the sync client so allocate-callbacks fire on a dedicated thread.

Lifecycle:

  1. Register: registerApplicationMaster(host, rpcPort, trackingUrl). This is the AM telling the RM "I'm alive, here is where to find me."
  2. Allocate loop: a background thread calls allocate once per heartbeat interval. The interval is fixed when the AMRMClientAsync is created; Tez takes it from tez.am.am-rm.heartbeat.interval-ms.max (default 1000 ms). It must stay far below yarn.am.liveness-monitor.expiry-interval-ms (default 600000 ms), the silence after which the RM declares the AM dead. Each heartbeat the AMRM client sends:
    • Pending container requests (added via addContainerRequest).
    • Containers to release.
    • Application progress (0..1).
     It receives:
    • Newly allocated containers.
    • Completed container statuses.
    • Updated node reports (for blacklisting).
    • Decommissioned-node reports.
  3. Unregister: unregisterApplicationMaster(state, msg, trackingUrl) on AM shutdown.

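The batching in step 2 is the key idea: calls such as addContainerRequest do not talk to the RM; they queue work that the next heartbeat carries. The toy model below shows that batching. HeartbeatModel and AllocateRequest are hypothetical classes, not the YARN API, and the model is simplified: the real AMRMClient keeps a table of outstanding requests and sends changed totals, whereas this sketch just drains a list of whatever was queued since the last beat.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not YARN's API) of what one AMRM heartbeat carries outbound.
class HeartbeatModel {
    record AllocateRequest(List<String> asks, List<String> releases, float progress) {}

    private final List<String> pendingAsks = new ArrayList<>();
    private final List<String> pendingReleases = new ArrayList<>();
    private float progress = 0f;

    // Like addContainerRequest: queued locally, sent on the next heartbeat.
    synchronized void addContainerRequest(String ask) { pendingAsks.add(ask); }

    // Like releaseAssignedContainer: also deferred to the next heartbeat.
    synchronized void releaseContainer(String containerId) { pendingReleases.add(containerId); }

    synchronized void setProgress(float p) {
        if (p < 0f || p > 1f) throw new IllegalArgumentException("progress must be in [0, 1]");
        progress = p;
    }

    // Called by the heartbeat thread once per interval: drain everything queued so far.
    synchronized AllocateRequest nextHeartbeat() {
        AllocateRequest req = new AllocateRequest(
            List.copyOf(pendingAsks), List.copyOf(pendingReleases), progress);
        pendingAsks.clear();
        pendingReleases.clear();
        return req;
    }

    public static void main(String[] args) {
        HeartbeatModel m = new HeartbeatModel();
        m.addContainerRequest("1024MB,1vcore");
        m.releaseContainer("container_1234_0001_01_000002");
        m.setProgress(0.25f);
        System.out.println(m.nextHeartbeat()); // one ask, one release, progress 0.25
        System.out.println(m.nextHeartbeat()); // empty lists, progress still 0.25
    }
}
```

Progress is state rather than a queued item, so it is re-sent unchanged on every heartbeat until the AM updates it.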
grep -n "CallbackHandler\|onContainersAllocated\|onContainersCompleted\|onShutdownRequest\|onNodesUpdated" \
  $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java")

These callbacks run on the AMRM client's internal thread; Tez keeps them short by forwarding to its own dispatcher.
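The "keep callbacks short" pattern can be sketched with a plain blocking queue: handler methods invoked on the client's callback thread only enqueue an event, and a separate dispatcher thread drains the queue and does the slow scheduling work. ForwardingCallbacks and its event records are hypothetical names; the real interface is AMRMClientAsync.CallbackHandler (AbstractCallbackHandler in newer Hadoop), whose methods receive Container and ContainerStatus objects rather than strings.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch (hypothetical names): callbacks enqueue, a dispatcher thread does the real work.
class ForwardingCallbacks {
    interface Event {}
    record Allocated(List<String> containerIds) implements Event {}
    record Completed(List<String> containerIds) implements Event {}

    private final BlockingQueue<Event> dispatcherQueue = new LinkedBlockingQueue<>();

    // Callback-thread side: constant work, never waits on scheduler locks.
    void onContainersAllocated(List<String> ids) { dispatcherQueue.add(new Allocated(List.copyOf(ids))); }
    void onContainersCompleted(List<String> ids) { dispatcherQueue.add(new Completed(List.copyOf(ids))); }

    // Dispatcher-thread side: returns null if nothing arrives within the timeout.
    Event takeNext(long timeoutMs) {
        try {
            return dispatcherQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ForwardingCallbacks cb = new ForwardingCallbacks();
        Thread dispatcher = new Thread(() -> {
            Event e;
            // Exits after 200 ms with no new events.
            while ((e = cb.takeNext(200)) != null) {
                System.out.println("dispatcher handling " + e);
            }
        });
        dispatcher.start();
        cb.onContainersAllocated(List.of("container_1234_0001_01_000002"));
        cb.onContainersCompleted(List.of("container_1234_0001_01_000002"));
        dispatcher.join();
    }
}
```

The queue preserves callback order, so the dispatcher sees an allocation before the completion of the same container.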


NMClientAsync and ContainerLauncherImpl

find tez-dag/src/main/java -name "ContainerLauncherImpl.java"
wc -l $(find tez-dag/src/main/java -name "ContainerLauncherImpl.java")
grep -n "NMClientAsync\|startContainerAsync\|stopContainerAsync" \
  $(find tez-dag/src/main/java -name "ContainerLauncherImpl.java")

After the RM allocates a container, Tez must tell the relevant NM to actually launch the JVM. ContainerLauncherImpl uses NMClientAsync to send startContainerAsync(container, containerLaunchContext).
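The important property of startContainerAsync is that it returns immediately; success or failure arrives later through callbacks (NMClientAsync's handler has onContainerStarted and onStartContainerError). The model below mimics that contract with a thread pool. LauncherModel, its State enum, and the Predicate standing in for the NM RPC are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

// Model (hypothetical names) of the async launch contract: request now, outcome via callback.
class LauncherModel {
    enum State { LAUNCHING, RUNNING, FAILED }

    final Map<String, State> states = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final Predicate<String> nmAccepts; // stands in for the NM's startContainers RPC

    LauncherModel(Predicate<String> nmAccepts) { this.nmAccepts = nmAccepts; }

    // Returns at once; the caller may wait on the future but does not have to.
    CompletableFuture<Void> startContainerAsync(String containerId) {
        states.put(containerId, State.LAUNCHING);
        return CompletableFuture.runAsync(() -> {
            if (nmAccepts.test(containerId)) {
                onContainerStarted(containerId);
            } else {
                onStartContainerError(containerId, new IllegalStateException("NM refused " + containerId));
            }
        }, pool);
    }

    private void onContainerStarted(String id) { states.put(id, State.RUNNING); }

    private void onStartContainerError(String id, Throwable cause) {
        // Roughly where a real launcher would report the failure back to the AM's state machines.
        states.put(id, State.FAILED);
    }

    void shutdown() { pool.shutdown(); }

    public static void main(String[] args) {
        LauncherModel launcher = new LauncherModel(id -> !id.endsWith("000003"));
        CompletableFuture.allOf(
            launcher.startContainerAsync("container_1234_0001_01_000002"),
            launcher.startContainerAsync("container_1234_0001_01_000003")).join();
        System.out.println(launcher.states);
        launcher.shutdown();
    }
}
```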

ContainerLaunchContext

grep -n "buildContainerLaunchContext\|ContainerLaunchContext\|setLocalResources\|setEnvironment\|setCommands" \
  $(find tez-dag/src/main/java -name "ContainerLauncherImpl.java" \
                                -o -name "AMContainerHelpers.java")

The CLC is what NM uses to fork the JVM. It carries:

Field           What Tez puts there
commands        java <jvm opts> -Dlog4j.configuration=... org.apache.tez.runtime.task.TezChild <args>
environment     CLASSPATH, JVM_PID, container ID, AM host/port
localResources  Tez tarball, user JARs, any HDFS-distributed resources
tokens          Delegation tokens (HDFS, HMS, etc.) for the container to use
serviceData     Per-aux-service payload (e.g. mapreduce_shuffle job secret)

grep -n "ServiceData\|JobTokenSecretManager\|shuffleSecret" \
  $(find tez-dag/src/main/java -name "*.java") | head

The serviceData map entry under key mapreduce_shuffle carries the serialized JobToken that NM's ShuffleHandler will use to authorize fetch requests — this is why mapreduce_shuffle must be configured as an NM aux-service even for Tez DAGs.
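To make the CLC fields and the serviceData entry concrete, here is a plain-Java stand-in for a ContainerLaunchContext. ClcSketch, its record, and the token byte layout are hypothetical: the real serviceData value is the JobToken serialized with Hadoop's Token.write(), and the real CLC holds LocalResource objects rather than URL strings. The TezChild arguments are left as a caller-supplied list because the exact argument order is not given here.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for ContainerLaunchContext (hypothetical; not the YARN class).
class ClcSketch {
    static final String SHUFFLE_SERVICE_ID = "mapreduce_shuffle";

    record Clc(List<String> commands, Map<String, String> environment,
               Map<String, String> localResources, ByteBuffer tokens,
               Map<String, ByteBuffer> serviceData) {}

    // Assumed layout for illustration: length-prefixed identifier, then password.
    static ByteBuffer serializeJobToken(byte[] identifier, byte[] password) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(identifier.length);
            out.write(identifier);
            out.writeInt(password.length);
            out.write(password);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ByteBuffer.wrap(bytes.toByteArray());
    }

    static Clc build(String jvmOpts, List<String> tezChildArgs, Map<String, String> env,
                     Map<String, String> localResources, ByteBuffer tokens, ByteBuffer jobToken) {
        // <LOG_DIR> is expanded by the NM to the container's log directory.
        String command = "$JAVA_HOME/bin/java " + jvmOpts
            + " org.apache.tez.runtime.task.TezChild " + String.join(" ", tezChildArgs)
            + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr";
        return new Clc(List.of(command), Map.copyOf(env), Map.copyOf(localResources),
            tokens.asReadOnlyBuffer(),
            // Keyed by aux-service name; the NM hands this payload to ShuffleHandler.
            Map.of(SHUFFLE_SERVICE_ID, jobToken.asReadOnlyBuffer()));
    }

    public static void main(String[] args) {
        ByteBuffer jobToken = serializeJobToken(new byte[] {1, 2, 3}, new byte[] {42});
        Clc clc = build("-Xmx800m", List.of("<TezChild args>"), Map.of("CLASSPATH", "./*"),
            Map.of("tezlib", "hdfs:///apps/tez/tez.tar.gz"), ByteBuffer.allocate(0), jobToken);
        System.out.println(clc.commands().get(0));
        System.out.println(clc.serviceData().keySet());
    }
}
```

The HDFS path in main is a made-up example value. The point of the design is that serviceData is keyed by aux-service name, which is why the NM must have an aux-service registered under exactly that key.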


Tokens

grep -rn "AMRMToken\|ClientToAMToken\|TimelineDelegationToken" \
  tez-dag/src/main/java | head

Token                    Issued by                                       Used for                     Where it lives
AMRMToken                RM, auto-injected into AM's Credentials         AM ↔ RM RPC                  AM JVM credentials
ClientToAMToken          RM; client reads it from the ApplicationReport  Client (DAGClient) ↔ AM RPC  Client credentials
TimelineDelegationToken  Timeline Server                                 AM → Timeline publisher      AM credentials, refreshed periodically
HDFS delegation token    NN                                              Tasks reading/writing HDFS   Container credentials
Hive Metastore token     HMS                                             Tasks calling HMS            Container credentials, via Hive code path

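Delegation tokens are collected on the client side at submit time (TezClientUtils does this) and shipped in the AM's launch context; the AM then passes them on to task containers in each CLC. Tokens included with the application submission are renewed by the RM's DelegationTokenRenewer, which delegates to each token kind's TokenRenewer, until the application finishes. Renewal can never extend a token past its maximum lifetime, so a DAG that outlives that lifetime fails the next time it uses the token.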
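The interaction between renewal and maximum lifetime can be shown with a small scheduling sketch. It assumes a policy of renewing when 90% of the remaining validity has elapsed, and that each renewal extends expiry by one renew interval, capped at the max date; both are illustrative assumptions, not a description of DelegationTokenRenewer's exact code. TokenRenewalSketch and its methods are hypothetical.

```java
// Hypothetical renewal-scheduling sketch: renew with 10% of validity left, stop at max lifetime.
class TokenRenewalSketch {

    // Absolute time (ms) at which to try renewing, or -1 if renewal is pointless
    // (already expired, or expiry already equals the max date and cannot be extended).
    static long nextRenewalAt(long nowMs, long expiryMs, long maxDateMs) {
        if (nowMs >= expiryMs) return -1;
        if (expiryMs >= maxDateMs) return -1;
        long remaining = expiryMs - nowMs;
        return nowMs + remaining - remaining / 10;
    }

    // Simulates a token issued at t=0 and counts successful renewals before it must expire.
    static int renewalsUntilDead(long renewIntervalMs, long maxLifetimeMs) {
        long now = 0;
        long expiry = renewIntervalMs;
        int renewals = 0;
        long at;
        while ((at = nextRenewalAt(now, expiry, maxLifetimeMs)) >= 0) {
            now = at;
            expiry = Math.min(now + renewIntervalMs, maxLifetimeMs); // capped by max lifetime
            renewals++;
        }
        return renewals;
    }

    public static void main(String[] args) {
        long day = 86_400_000L;
        System.out.println(renewalsUntilDead(day, 7 * day)); // 7 under these assumptions
    }
}
```

With HDFS's default delegation-token renew interval (1 day) and max lifetime (7 days), the sketch renews seven times and then lets the token expire at day 7, which is why a week-long session AM eventually hits token failures no matter how diligently it renews.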


Log aggregation

grep -rln "log-aggregation\|LogAggregationService" ~/hadoop-src --include="*.java" 2>/dev/null | head

YARN log aggregation is configured in yarn-site.xml:

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.nodemanager.remote-app-log-dir</name>
  <value>/app-logs</value>
</property>

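When enabled, each NM uploads the files in the application's container log directories (stdout, stderr, syslog, and any others) to HDFS under /app-logs/<user>/logs/<applicationId>/<nodeAddress> after the application finishes (or periodically, if rolling aggregation is configured). Retrieve them with: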

yarn logs -applicationId application_1234_0001 -containerId container_..._01
yarn logs -applicationId application_1234_0001 -appOwner alice

Without aggregation, logs sit in ${yarn.nodemanager.log-dirs}/application_.../container_.../ on each NM until cleaned by yarn.nodemanager.log.retain-seconds.
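Both locations follow fixed patterns, sketched below. LogPaths is a hypothetical helper; the aggregated layout shown is the classic one, where the subdirectory comes from yarn.nodemanager.remote-app-log-dir-suffix (default logs) and the per-node file is named <host>_<port>. Newer Hadoop releases may use a different, bucketed layout, so treat this as the classic case only. yarn.nodemanager.log-dirs can list several directories; a given container's logs live under one of them.

```java
// Hypothetical helper building the two log locations described above (classic layout).
class LogPaths {
    private static String trim(String dir) {
        return dir.endsWith("/") ? dir.substring(0, dir.length() - 1) : dir;
    }

    // <remote-app-log-dir>/<user>/<suffix>/<appId>/<nodeHost>_<nodePort>
    static String aggregated(String remoteAppLogDir, String user, String suffix,
                             String appId, String nodeHost, int nodePort) {
        return String.join("/", trim(remoteAppLogDir), user, suffix, appId, nodeHost + "_" + nodePort);
    }

    // <one of yarn.nodemanager.log-dirs>/<appId>/<containerId>/<file>
    static String local(String nmLogDir, String appId, String containerId, String file) {
        return String.join("/", trim(nmLogDir), appId, containerId, file);
    }

    public static void main(String[] args) {
        // Example values only.
        System.out.println(aggregated("/app-logs", "alice", "logs",
            "application_1234_0001", "nm1.example.com", 45454));
        System.out.println(local("/var/log/hadoop-yarn/containers",
            "application_1234_0001", "container_1234_0001_01_000002", "stderr"));
    }
}
```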


Timeline Server (ATS)

Tez publishes a rich event stream to ATS for post-mortem debugging.

find tez-plugins -type d -name "tez-yarn-timeline*"
ls tez-plugins/

Three flavors exist in the wild:

Version  Tez plugin module                                                          Notes
ATSv1    tez-yarn-timeline-history                                                  Original; LevelDB-backed Timeline Server. Deprecated.
ATSv1.5  tez-yarn-timeline-history-with-acls and tez-yarn-timeline-history-with-fs  Adds entity-file staging to HDFS; reduces ATS write load.
ATSv2    tez-yarn-timeline-history-with-fs + ATSv2 reader configuration             HBase-backed, scalable; requires Hadoop 3.x.

grep -rn "TimelineClient\|TIMELINE_HISTORY\|HistoryEventHandler" \
  tez-plugins/tez-yarn-timeline-history*/src/main/java | head

What gets published:

  • AppLaunchedEvent
  • DAGSubmittedEvent, DAGInitializedEvent, DAGStartedEvent, DAGFinishedEvent
  • VertexInitializedEvent, VertexStartedEvent, VertexFinishedEvent
  • TaskStartedEvent, TaskFinishedEvent
  • TaskAttemptStartedEvent, TaskAttemptFinishedEvent
  • ContainerLaunchedEvent, ContainerStoppedEvent

The Tez UI (hosted as an Ambari view or standalone) reads these events to render the DAG view, vertex graphs, task swimlanes, and counter trees.

ls tez-ui/src/main 2>/dev/null
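The published events group naturally by the entity they describe, and that grouping is what lets the UI drill from a DAG down to a single attempt. The sketch below maps the event names listed above onto entity types. The enum names are assumed to mirror the entity types Tez's timeline plugins use (verify against EntityTypes in your Tez source), and the prefix-matching rule is a simplification; HistoryEntityTypes is a hypothetical class.

```java
// Hypothetical sketch: which timeline entity each history event attaches to.
class HistoryEntityTypes {
    enum EntityType { TEZ_APPLICATION, TEZ_DAG_ID, TEZ_VERTEX_ID, TEZ_TASK_ID,
                      TEZ_TASK_ATTEMPT_ID, TEZ_CONTAINER_ID }

    static EntityType entityFor(String eventName) {
        if (eventName.startsWith("App")) return EntityType.TEZ_APPLICATION;
        if (eventName.startsWith("DAG")) return EntityType.TEZ_DAG_ID;
        if (eventName.startsWith("Vertex")) return EntityType.TEZ_VERTEX_ID;
        // "TaskAttempt..." must be tested before the shorter "Task..." prefix.
        if (eventName.startsWith("TaskAttempt")) return EntityType.TEZ_TASK_ATTEMPT_ID;
        if (eventName.startsWith("Task")) return EntityType.TEZ_TASK_ID;
        if (eventName.startsWith("Container")) return EntityType.TEZ_CONTAINER_ID;
        throw new IllegalArgumentException("Unmapped event: " + eventName);
    }

    public static void main(String[] args) {
        String[] events = {"AppLaunchedEvent", "DAGSubmittedEvent", "VertexStartedEvent",
            "TaskStartedEvent", "TaskAttemptFinishedEvent", "ContainerStoppedEvent"};
        for (String e : events) {
            System.out.println(e + " -> " + entityFor(e));
        }
    }
}
```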

Configuration cheat sheet

grep -n "YARN\|ATS\|TIMELINE\|LOG_AGG" \
  tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java | head -20

Key                                     Default   Effect
tez.am.am-rm.heartbeat.interval-ms.max  1000      Cap on the AMRM heartbeat interval (ms).
tez.am.client.am.port-range             auto      Port range for the AM's client RPC server.
tez.am.container.lookup.timeout-ms      30000     How long to wait for an NM ack before failing the launch.
tez.history.logging.service.class       (varies)  Which ATS plugin to use.
tez.am.tez-ui.history-url.template      template  Where the UI is hosted; surfaced in DAGStatus.
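The heartbeat cap only matters relative to the RM's liveness expiry, and a quick calculation shows how much headroom the defaults give. HeartbeatBudget is a hypothetical helper; the keys and defaults are tez.am.am-rm.heartbeat.interval-ms.max (1000 ms) and yarn.am.liveness-monitor.expiry-interval-ms (600000 ms). The pause check treats a stop-the-world GC pause as silencing the heartbeat thread, with one extra interval of slack for the heartbeat that was due just before the pause began.

```java
import java.util.Map;

// Hypothetical back-of-the-envelope check relating the Tez heartbeat cap to the RM's AM expiry.
class HeartbeatBudget {
    static final String TEZ_HB_MAX = "tez.am.am-rm.heartbeat.interval-ms.max";
    static final String YARN_AM_EXPIRY = "yarn.am.liveness-monitor.expiry-interval-ms";

    static long heartbeatMs(Map<String, String> conf) {
        long hb = Long.parseLong(conf.getOrDefault(TEZ_HB_MAX, "1000"));
        if (hb <= 0) throw new IllegalArgumentException(TEZ_HB_MAX + " must be positive");
        return hb;
    }

    static long expiryMs(Map<String, String> conf) {
        return Long.parseLong(conf.getOrDefault(YARN_AM_EXPIRY, "600000"));
    }

    // Roughly how many consecutive heartbeats may be missed before the RM expires the AM.
    static long missableHeartbeats(Map<String, String> conf) {
        return expiryMs(conf) / heartbeatMs(conf);
    }

    // A full GC pause stops the heartbeat thread too; survive only if pause + one interval < expiry.
    static boolean survivesPause(Map<String, String> conf, long pauseMs) {
        return pauseMs + heartbeatMs(conf) < expiryMs(conf);
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of();
        System.out.println(missableHeartbeats(defaults));     // 600
        System.out.println(survivesPause(defaults, 30_000));  // true
        System.out.println(survivesPause(defaults, 700_000)); // false
    }
}
```

Lowering the heartbeat cap makes the AM react faster to allocations but does nothing for GC-induced expiry; only shorter pauses or a longer expiry interval help there.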

yarn CLI behaviors for Tez apps

Command                           Behavior on a Tez app
yarn application -list            Lists Tez AMs alongside MR/Spark apps; the application type is TEZ.
yarn application -status <appId>  Shows AM state, RM tracking URL, ATS tracking URL (if configured).
yarn application -kill <appId>    The RM has the NM kill the AM container (SIGTERM, then SIGKILL after a short delay). The app ends as KILLED and no new AM attempt starts, so anything RecoveryService wrote is never replayed.
yarn logs -applicationId <appId>  Streams the aggregated logs of all containers: the AM and every TezChild.
yarn node -list                   Lists NMs and their states (RUNNING, UNHEALTHY, ...). It does not show aux-services; confirm mapreduce_shuffle is up on an NM from its logs or by probing the shuffle port.

Reading exercise

  1. Find every AM-lifecycle call: grep -n "registerApplicationMaster\|unregisterApplicationMaster" $(find tez-dag/src/main/java -name "*.java")
  2. Which environment variables does the AM pass to each container? grep -rn "setupContainerEnvironment\|buildContainerEnvironment" tez-dag/src/main/java tez-api/src/main/java | head
  3. Read the launch path: cat $(find tez-dag/src/main/java -name "ContainerLauncherImpl.java") | head -200
  4. Check whether the aux-service name is hard-coded or read from configuration: grep -rn "mapreduce_shuffle" tez-dag/src/main/java tez-api/src/main/java
  5. Which classes assemble ATS entities? find tez-plugins -name "*.java" | xargs grep -l "TimelineEntity" | head -3
  6. Locate serviceInit and list every service added to the composite: cat $(find tez-dag/src/main/java -name "DAGAppMaster.java") | head -300

Common bugs and symptoms

Symptom                                                 Likely cause
AM exits with InvalidApplicationMasterRequestException  AM tried to register twice or after unregistering; usually a re-init bug.
Auxiliary service mapreduce_shuffle not configured      yarn.nodemanager.aux-services in yarn-site.xml does not list mapreduce_shuffle (or its .class setting is missing).
ConnectionRefused from Fetcher                          NodeManager aux-service crashed or wrong shuffle port.
AM dies "RM expired"                                    AMRM heartbeat thread blocked or paused for GC > expiry interval.
ATS empty for completed app                             tez.history.logging.service.class mis-set, or ATS not running.
yarn logs returns "Logs not aggregated"                 Container did not finish cleanly, or aggregation not enabled.
ClientToAMToken auth fail                               Client and AM disagree on cluster security; check both have the same hadoop.security.authentication.

Validation: prove you understand this

  1. Trace the exact call path from DAGAppMaster.serviceStart to AMRMClientAsync.registerApplicationMaster.
  2. List the contents of the ContainerLaunchContext.serviceData map that Tez populates, and explain who reads each entry.
  3. Explain why a long full-GC pause in the AM can manifest as an "RM expired" shutdown, and which config controls the threshold.
  4. For an app with yarn.log-aggregation-enable=false and a TezChild that crashed, give the exact filesystem path on the NM where its stderr lives. Use the configured yarn.nodemanager.log-dirs as a variable.
  5. Name the three ATS plugin modules, and pick the right one for a Hadoop 3.x cluster targeting HBase-backed ATSv2.