Scheduler
The scheduler is the AM-side component that turns task launch requests into
running containers. It lives in tez-dag under
org.apache.tez.dag.app.rm.
Two-layer design:
- TaskSchedulerManager: the single dispatcher and router. It receives AMSchedulerEvents from the rest of the AM and forwards each one to the right scheduler instance.
- TaskScheduler instances: one per scheduler ID. In practice almost always YarnTaskSchedulerService (production) or LocalTaskSchedulerService (tez.local.mode=true). External pluggable schedulers (LLAP) also slot in here.
```bash
ls tez-dag/src/main/java/org/apache/tez/dag/app/rm/
```
TaskSchedulerManager
```bash
find tez-dag/src/main/java -name "TaskSchedulerManager.java"
wc -l $(find tez-dag/src/main/java -name "TaskSchedulerManager.java")
```
Implements EventHandler<AMSchedulerEvent> and is wired into the AM
AsyncDispatcher. Every scheduling decision in the AM starts by enqueuing an
AMSchedulerEvent.
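A minimal sketch of the enqueue-then-dispatch pattern, assuming a single dispatcher thread in the spirit of Hadoop's AsyncDispatcher. DispatchEvent, MiniDispatcher, and the "LAUNCH" event type are hypothetical stand-ins, not the Hadoop or Tez API; the point is only that callers enqueue and one thread runs the handlers in FIFO order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class DispatcherDemo {
    // Enqueue two launch-request-like events and collect what the handler saw.
    static List<String> run() {
        List<String> seen = new ArrayList<>();       // written only by the dispatcher thread
        MiniDispatcher d = new MiniDispatcher();
        d.register("LAUNCH", e -> seen.add("launch " + e.payload()));
        d.start();
        d.dispatch(new DispatchEvent("LAUNCH", "attempt_0"));
        d.dispatch(new DispatchEvent("LAUNCH", "attempt_1"));
        d.stopAndJoin();                             // join() makes the handler's writes visible here
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());                   // [launch attempt_0, launch attempt_1]
    }
}

// Hypothetical event: a type tag used for routing plus an opaque payload.
record DispatchEvent(String type, String payload) {}

// Hypothetical single-threaded dispatcher: any thread may enqueue, one thread runs handlers.
class MiniDispatcher {
    private static final String STOP = "__STOP__";   // poison pill that ends the loop
    private final BlockingQueue<DispatchEvent> queue = new LinkedBlockingQueue<>();
    private final Map<String, Consumer<DispatchEvent>> handlers = new HashMap<>();
    private final Thread worker = new Thread(this::loop, "mini-dispatcher");

    // Register before start(); the handler map is not synchronized.
    void register(String type, Consumer<DispatchEvent> handler) { handlers.put(type, handler); }

    void start() { worker.start(); }

    // Callers only enqueue; they never execute handler code themselves.
    void dispatch(DispatchEvent e) { queue.add(e); }

    void stopAndJoin() {
        queue.add(new DispatchEvent(STOP, ""));
        try {
            worker.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    private void loop() {
        try {
            while (true) {
                DispatchEvent e = queue.take();       // blocks until an event arrives
                if (e.type().equals(STOP)) return;
                Consumer<DispatchEvent> h = handlers.get(e.type());
                if (h != null) h.accept(e);           // events are handled strictly in FIFO order
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because every handler runs on the one dispatcher thread, handlers never race with each other; the cost is that a slow handler delays every event queued behind it.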
Event types
```bash
find tez-dag/src/main/java -name "AMSchedulerEvent*.java"
grep -rn "extends AMSchedulerEvent" tez-dag/src/main/java
```
| Event | Source | Purpose |
|---|---|---|
| AMSchedulerEventTALaunchRequest | TaskAttemptImpl after a TA is ready to schedule | Ask scheduler to launch this attempt. |
| AMSchedulerEventTAStateUpdated | TaskAttemptImpl on completion | Notify scheduler the container is now releasable. |
| AMSchedulerEventContainerCompleted | YARN RM callback | RM told us a container died. |
| AMSchedulerEventDeallocateContainer | various | Force-release a held container. |
| AMSchedulerEventNodeBlacklistUpdate | NodeTracker | Add/remove node from blacklist. |
| AMSchedulerEventDAGStart, AMSchedulerEventVertexStateUpdated | DAGImpl, VertexImpl | DAG lifecycle hints (drives priority adjustments). |
TaskSchedulerManager.handle(event) switches on event type and forwards via
getTaskScheduler(event.getSchedulerId()).handleEvent(...).
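A sketch of that routing step, assuming scheduler IDs are simply list indices. SchedulerEvent, TaskSchedulerLike, RecordingScheduler, and the event-kind strings are hypothetical simplifications of the real event classes; the real handle() does more per event type than forward it.

```java
import java.util.ArrayList;
import java.util.List;

class SchedulerRouter {
    // Index in the list = scheduler ID, mirroring "one instance per scheduler ID".
    private final List<TaskSchedulerLike> schedulers;

    SchedulerRouter(List<TaskSchedulerLike> schedulers) { this.schedulers = List.copyOf(schedulers); }

    TaskSchedulerLike getTaskScheduler(int schedulerId) {
        if (schedulerId < 0 || schedulerId >= schedulers.size()) {
            throw new IllegalArgumentException("Unknown scheduler id " + schedulerId);
        }
        return schedulers.get(schedulerId);
    }

    // Equivalent of handle(event): switch on the event kind, then forward to the owning scheduler.
    void handle(SchedulerEvent e) {
        switch (e.kind()) {
            case "TA_LAUNCH_REQUEST", "TA_STATE_UPDATED", "DEALLOCATE_CONTAINER" ->
                getTaskScheduler(e.schedulerId()).handleEvent(e);
            default -> throw new IllegalArgumentException("Unexpected event kind " + e.kind());
        }
    }

    public static void main(String[] args) {
        RecordingScheduler yarn = new RecordingScheduler();
        RecordingScheduler custom = new RecordingScheduler();
        SchedulerRouter router = new SchedulerRouter(List.of(yarn, custom));
        router.handle(new SchedulerEvent(0, "TA_LAUNCH_REQUEST", "a0"));
        router.handle(new SchedulerEvent(1, "TA_LAUNCH_REQUEST", "b0"));
        System.out.println(yarn.log + " " + custom.log);
    }
}

// Hypothetical, simplified event: target scheduler ID, event kind, task attempt ID.
record SchedulerEvent(int schedulerId, String kind, String attemptId) {}

// Hypothetical minimal scheduler contract.
interface TaskSchedulerLike {
    void handleEvent(SchedulerEvent event);
}

// Records every event it receives, so routing decisions can be observed.
class RecordingScheduler implements TaskSchedulerLike {
    final List<String> log = new ArrayList<>();

    @Override
    public void handleEvent(SchedulerEvent e) { log.add(e.kind() + ":" + e.attemptId()); }
}
```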
YarnTaskSchedulerService
```bash
find tez-dag/src/main/java -name "YarnTaskSchedulerService.java"
wc -l $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java")
```
This is where Tez talks to YARN. It owns:
- AMRMClientAsync: the async RM heartbeat client.
- Map<Priority, BlockingQueue<CookieContainerRequest>>: outstanding requests, bucketed by priority.
- Map<ContainerId, HeldContainer>: currently assigned containers (see container-reuse.md).
- A DelayedContainerManager thread that releases idle reused containers.
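A sketch of the outstanding-request and held-container bookkeeping, assuming plain int priorities and string IDs in place of YARN's Priority and ContainerId, and a simple deque per bucket in place of the real BlockingQueue. SchedulerState and PendingRequest are hypothetical. A TreeMap keeps the buckets sorted, so the numerically smallest (most urgent, in YARN terms) priority is always firstKey().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class SchedulerState {
    // Outstanding requests bucketed by priority; TreeMap keeps the keys sorted ascending.
    final TreeMap<Integer, Deque<PendingRequest>> outstanding = new TreeMap<>();
    // Container ID -> attempt currently assigned to it (stand-in for HeldContainer).
    final Map<String, String> held = new HashMap<>();

    void addRequest(PendingRequest r) {
        outstanding.computeIfAbsent(r.priority(), p -> new ArrayDeque<>()).addLast(r);
    }

    // Oldest request at this priority, or null; empty buckets are dropped.
    PendingRequest takeAt(int priority) {
        Deque<PendingRequest> bucket = outstanding.get(priority);
        if (bucket == null) return null;
        PendingRequest r = bucket.pollFirst();
        if (bucket.isEmpty()) outstanding.remove(priority);
        return r;
    }

    void assign(String containerId, PendingRequest r) { held.put(containerId, r.attemptId()); }

    // Smallest numeric priority still waiting (served first by YARN), or null if none.
    Integer mostUrgentPriority() { return outstanding.isEmpty() ? null : outstanding.firstKey(); }

    public static void main(String[] args) {
        SchedulerState s = new SchedulerState();
        s.addRequest(new PendingRequest("v2_t0_a0", 6));
        s.addRequest(new PendingRequest("v1_t0_a0", 3));
        System.out.println(s.mostUrgentPriority());   // 3
    }
}

// Hypothetical stand-in for CookieContainerRequest: which attempt wants a container, at what priority.
record PendingRequest(String attemptId, int priority) {}
```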
Request flow
```mermaid
sequenceDiagram
    participant TA as TaskAttemptImpl
    participant TSM as TaskSchedulerManager
    participant Y as YarnTaskSchedulerService
    participant RM as YARN RM
    TA->>TSM: AMSchedulerEventTALaunchRequest
    TSM->>Y: allocateTask(...)
    Y->>Y: build CookieContainerRequest (priority, resource, locality)
    Y->>RM: addContainerRequest (via AMRMClientAsync)
    Note over RM: scheduler matches request to a node
    RM-->>Y: onContainersAllocated([Container])
    Y->>Y: assignContainer() matches it to a pending request
    Y->>TSM: containerAllocated(taskAttempt, container)
    TSM->>TA: TAEventContainerAssigned
```
Matching: priority + locality
```bash
grep -n "assignContainer\|matchContainerToRequest\|getMatchingRequests" \
  $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java") | head -20
```
When a container arrives, assignContainer walks pending requests at the
container's priority. For each:
- NODE_LOCAL — container's node matches a hint host of the request.
- RACK_LOCAL — same rack but different host.
- ANY — locality wildcard.
Locality is primarily enforced on the YARN side, where the RM scheduler decides which node each container lands on; this pass is the AM-side tiebreaker when multiple pending requests are eligible for the same container.
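A sketch of that AM-side tiebreak, assuming each pending request carries host and rack hints, that the strongest level wins (NODE_LOCAL, then RACK_LOCAL, then ANY), and that the older request is kept on a tie. LocalityMatcher and Req are hypothetical; the real assignContainer also deals with reuse, delays, and blacklisting.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

class LocalityMatcher {
    // Declared strongest first, so ordinal() doubles as a rank (lower is better).
    enum Locality { NODE_LOCAL, RACK_LOCAL, ANY }

    // Hypothetical pending request at one priority; empty sets mean "no hint".
    record Req(String attemptId, Set<String> hosts, Set<String> racks) {}

    static Locality classify(Req r, String containerHost, String containerRack) {
        if (r.hosts().contains(containerHost)) return Locality.NODE_LOCAL;
        if (r.racks().contains(containerRack)) return Locality.RACK_LOCAL;
        return Locality.ANY;
    }

    // Best locality wins; on a tie the earlier (older) request in the list is kept.
    static Optional<Req> pick(List<Req> pendingAtPriority, String host, String rack) {
        Req best = null;
        Locality bestLevel = null;
        for (Req r : pendingAtPriority) {
            Locality level = classify(r, host, rack);
            if (best == null || level.ordinal() < bestLevel.ordinal()) {
                best = r;
                bestLevel = level;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        Req noHint = new Req("a0", Set.of(), Set.of());
        Req onH1 = new Req("a1", Set.of("h1"), Set.of("/rack1"));
        // A container on h1 goes to the node-local request even though a0 is older.
        System.out.println(pick(List.of(noHint, onH1), "h1", "/rack1").map(Req::attemptId).orElse("none"));
    }
}
```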
| Hint level | YARN request | Tez match |
|---|---|---|
| NODE_LOCAL | host + rack + * | accepts container on the exact host |
| RACK_LOCAL | rack + * | accepts container on the same rack |
| ANY | * only | accepts any container at this priority |
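The "YARN request" column can be sketched as expanding one hint into the ordered set of resource names a request is issued at, assuming relaxLocality=true and a caller-supplied host-to-rack map (YARN itself resolves racks from the cluster's topology configuration). RequestExpansion and rackOf are hypothetical; the "*" string matches the value of YARN's ResourceRequest.ANY.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RequestExpansion {
    static final String ANY = "*";   // same string as YARN's ResourceRequest.ANY

    // Expand host/rack hints into the resource names for one request, most specific first.
    static Set<String> resourceNames(List<String> hosts, List<String> racks, Map<String, String> rackOf) {
        Set<String> names = new LinkedHashSet<>();
        names.addAll(hosts);                          // NODE_LOCAL level
        for (String h : hosts) {                      // each host hint also implies its rack
            String rack = rackOf.get(h);
            if (rack != null) names.add(rack);
        }
        names.addAll(racks);                          // explicit RACK_LOCAL hints
        names.add(ANY);                               // relaxLocality=true: always fall back to *
        return names;
    }

    public static void main(String[] args) {
        Map<String, String> rackOf = Map.of("h1", "/rack1");
        System.out.println(resourceNames(List.of("h1"), List.of(), rackOf));   // [h1, /rack1, *]
    }
}
```

A null hint (no TaskLocationHint) therefore degenerates to a request at "*" only, which is the ANY row.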
TaskLocationHint is set on TaskAttemptImpl either from the input split
(MRInput), the VertexLocationHint (provided by VertexManager), or left
null.
Priorities
```bash
grep -n "Priority\|priorityForVertex" \
  $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java" \
    -o -name "DAGImpl.java") | head
```
Tez assigns each vertex a priority class derived from its topological order in the DAG. Downstream vertices get a larger numeric priority value, which YARN treats as a lower priority (a lower value is served first), so source tasks complete first and free their containers for downstream consumers. Priority is also the primary key for container reuse matching.
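A sketch of deriving per-vertex priorities from DAG depth, where depth is the longest path from any root vertex, computed with Kahn's topological sort. The formula (depth + 1) * 3 is a hypothetical placeholder chosen only to show the shape; the actual Tez computation lives in the AM source (see the grep above) and may differ. The property that matters is that a consumer always gets a larger number, i.e. a lower YARN priority, than every one of its producers.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class VertexPriorities {
    // edges: producer -> consumers. Returns vertex -> priority (lower number = served first by YARN).
    static Map<String, Integer> compute(List<String> vertices, Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new HashMap<>();
        for (String v : vertices) inDegree.put(v, 0);
        for (List<String> outs : edges.values())
            for (String c : outs) inDegree.merge(c, 1, Integer::sum);

        // Kahn's algorithm: a vertex is processed only after all its producers, so its depth is final.
        Map<String, Integer> depth = new HashMap<>();
        Deque<String> ready = new ArrayDeque<>();
        for (String v : vertices) {
            if (inDegree.get(v) == 0) { ready.add(v); depth.put(v, 0); }
        }

        Map<String, Integer> priority = new LinkedHashMap<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            int d = depth.get(v);
            priority.put(v, (d + 1) * 3);   // HYPOTHETICAL formula: only monotonicity in depth matters
            for (String c : edges.getOrDefault(v, List.of())) {
                depth.merge(c, d + 1, Math::max);                       // keep the longest path
                if (inDegree.merge(c, -1, Integer::sum) == 0) ready.add(c);
            }
        }
        if (priority.size() != vertices.size()) throw new IllegalArgumentException("graph has a cycle");
        return priority;
    }

    public static void main(String[] args) {
        // Vertex A (producer) feeds vertex B (consumer).
        System.out.println(compute(List.of("A", "B"), Map.of("A", List.of("B"))));   // {A=3, B=6}
    }
}
```

Note that task counts play no part here: a 100-task producer and a 10-task consumer are ordered purely by their position in the DAG.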
RM callbacks
```bash
grep -n "AMRMClientAsync.CallbackHandler\|onContainersAllocated\|onContainersCompleted\|onShutdownRequest" \
  $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java")
```
YarnTaskSchedulerService implements AMRMClientAsync.CallbackHandler:
- onContainersAllocated(List<Container>): enqueue for assignment.
- onContainersCompleted(List<ContainerStatus>): translate the exit status into AMSchedulerEventContainerCompleted.
- onShutdownRequest(): the RM asked the AM to shut down (e.g. a lost AM attempt).
- onNodesUpdated(List<NodeReport>): update node health for blacklisting.
- getProgress(): the AM reports its overall DAG progress to the RM.
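A sketch of keeping the allocation callback light using only java.util.concurrent: the callback just enqueues container IDs, and a separate thread does the assignment work. LightCallbacks and its methods are hypothetical and do not implement the real AMRMClientAsync.CallbackHandler interface; a String stands in for Container.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class LightCallbacks {
    private static final String STOP = "__STOP__";                  // poison pill for the assigner
    private final BlockingQueue<String> toAssign = new LinkedBlockingQueue<>();
    final List<String> assigned = new CopyOnWriteArrayList<>();      // readable from any thread
    private final Thread assigner = new Thread(this::assignLoop, "container-assigner");

    void start() { assigner.start(); }

    // Runs on the RM client's callback thread: an unbounded, non-blocking enqueue, nothing more.
    void onContainersAllocated(List<String> containerIds) { toAssign.addAll(containerIds); }

    // Runs on its own thread; slow matching and bookkeeping belong here.
    private void assignLoop() {
        try {
            for (String id = toAssign.take(); !id.equals(STOP); id = toAssign.take()) {
                assigned.add(id);                                    // stand-in for assignContainer(id)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void shutdown() {
        toAssign.add(STOP);
        try {
            assigner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        LightCallbacks cb = new LightCallbacks();
        cb.start();
        cb.onContainersAllocated(List.of("container_1", "container_2"));
        cb.shutdown();
        System.out.println(cb.assigned);   // [container_1, container_2]
    }
}
```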
LocalTaskSchedulerService
```bash
find tez-dag/src/main/java -name "LocalTaskSchedulerService.java"
wc -l $(find tez-dag/src/main/java -name "LocalTaskSchedulerService.java")
```
Same contract as YarnTaskSchedulerService but bypasses YARN:
- A bounded ExecutorService of LocalContainer worker threads stands in for the YARN cluster.
- allocateTask instantly synthesizes a fake Container and dispatches containerAllocated.
- The container launcher (LocalContainerLauncher) runs TezChild in the same JVM on the executor.
Used when tez.local.mode=true, and by certain flavors of MiniTezCluster tests.
See local-mode.md.
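A sketch of the local-mode idea, assuming a fixed pool size and strings for containers: allocation completes immediately with a synthetic container ID, and "launching" just submits the work to a bounded executor in the same JVM. LocalSchedulerSketch and its method names are hypothetical, not the LocalTaskSchedulerService API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class LocalSchedulerSketch {
    private final ExecutorService pool;                       // bounded pool stands in for the cluster
    private final AtomicInteger nextId = new AtomicInteger();

    LocalSchedulerSketch(int maxConcurrentTasks) {
        pool = Executors.newFixedThreadPool(maxConcurrentTasks);
    }

    // No RM round trip: a synthetic container ID is handed back immediately.
    String allocateTask(String attemptId) {
        return "local_container_" + nextId.incrementAndGet() + "_for_" + attemptId;
    }

    // "Launching" runs the task body on a pool thread inside this JVM. containerId is unused
    // here; it is kept only to mirror the shape of a launch call.
    <T> Future<T> launch(String containerId, Callable<T> taskBody) {
        return pool.submit(taskBody);
    }

    void shutdown() { pool.shutdown(); }

    public static void main(String[] args) throws Exception {
        LocalSchedulerSketch s = new LocalSchedulerSketch(2);
        String c = s.allocateTask("attempt_0");
        Future<Integer> result = s.launch(c, () -> 21 * 2);
        System.out.println(c + " -> " + result.get());         // local_container_1_for_attempt_0 -> 42
        s.shutdown();
    }
}
```

The pool size plays the role of cluster capacity: with two threads, a third launched task simply waits in the executor's queue.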
Pluggable schedulers
```bash
grep -n "tez.am.task.scheduler.classes\|TASK_SCHEDULER_SERVICE_CLASS" \
  tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
```
Configuration:
```
tez.am.task.scheduler.classes = <comma-separated FQNs>
```
TaskSchedulerManager instantiates one per ID. Hive's LLAP plugs in a
custom scheduler that talks to LLAP daemons instead of YARN.
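Loading schedulers from a comma-separated class list can be sketched with plain reflection, assuming each plugin implements a hypothetical PluginScheduler interface and has a public no-argument constructor. The real Tez plugin contract may differ; in this sketch the position in the list becomes the scheduler ID. PluginLoader, PluginScheduler, and FifoPlugin are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class PluginLoader {
    // "a.B, c.D" -> [new a.B(), new c.D()]; in this sketch, list index = scheduler ID.
    static List<PluginScheduler> load(String classesCsv) {
        List<PluginScheduler> out = new ArrayList<>();
        for (String raw : classesCsv.split(",")) {
            String fqn = raw.trim();
            if (fqn.isEmpty()) continue;                     // tolerate "a.B,,c.D" and trailing commas
            try {
                Class<? extends PluginScheduler> cls =
                    Class.forName(fqn).asSubclass(PluginScheduler.class);   // ClassCastException if wrong type
                out.add(cls.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalArgumentException("Cannot load scheduler " + fqn, e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<PluginScheduler> schedulers = load(FifoPlugin.class.getName());
        for (int id = 0; id < schedulers.size(); id++) {
            System.out.println(id + " -> " + schedulers.get(id).describe());
        }
    }
}

// Hypothetical plugin contract; not the Tez service-plugin API.
interface PluginScheduler {
    String describe();
}

// Example plugin with the public no-argument constructor this loader requires.
class FifoPlugin implements PluginScheduler {
    public FifoPlugin() {}

    @Override
    public String describe() { return "fifo"; }
}
```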
Walkthrough: launching a single task attempt
1. VertexImpl decides to schedule task T.k (via VertexManager or scaling).
2. TaskImpl creates TaskAttemptImpl for attempt 0 → state NEW.
3. TaskAttemptImpl transitions to START_WAIT and dispatches AMSchedulerEventTALaunchRequest with the TaskLocationHint and capability.
4. TaskSchedulerManager.handle routes the event to the configured scheduler.
5. YarnTaskSchedulerService.allocateTask constructs CookieContainerRequest(priority, capability, hosts, racks, relaxLocality=true) and calls AMRMClientAsync.addContainerRequest.
6. The RM schedules → callback onContainersAllocated([c]).
7. assignContainer(c) finds the matching pending request and calls informAppAboutAssignment → TaskSchedulerManager.containerAllocated.
8. TaskSchedulerManager dispatches AMContainerEventAssignTA to AMContainerImpl, then TAEventContainerAssigned to TaskAttemptImpl.
9. AMContainerImpl asks ContainerLauncherImpl to launch the container (or reuse a held one).
10. TezChild starts (or accepts the new task via its reuse loop). The umbilical comes up and the attempt transitions to RUNNING.
```mermaid
sequenceDiagram
    participant V as VertexImpl
    participant TA as TaskAttemptImpl
    participant TSM as TaskSchedulerManager
    participant Y as YarnTaskSchedulerService
    participant AC as AMContainerImpl
    participant RM as YARN RM
    participant NM as YARN NM
    V->>TA: schedule
    TA->>TSM: AMSchedulerEventTALaunchRequest
    TSM->>Y: allocateTask
    Y->>RM: addContainerRequest
    RM-->>Y: onContainersAllocated
    Y->>TSM: containerAllocated
    TSM->>AC: AMContainerEventAssignTA
    TSM->>TA: TAEventContainerAssigned
    AC->>NM: start container (via ContainerLauncher / NMClient)
```
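The attempt-side view of the walkthrough can be sketched as a tiny state machine that only permits the transitions described above: NEW to START_WAIT when the launch request goes out, then START_WAIT to RUNNING once a container has been assigned and the umbilical is up. It deliberately omits the other states the real TaskAttemptImpl has; AttemptLifecycle and its event names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class AttemptLifecycle {
    enum State { NEW, START_WAIT, RUNNING }
    enum Event { LAUNCH_REQUESTED, CONTAINER_ASSIGNED, UMBILICAL_UP }

    private State state = State.NEW;
    private String containerId;                      // set once a container is assigned
    final List<String> history = new ArrayList<>();

    State state() { return state; }

    void on(Event e, String arg) {
        switch (e) {
            case LAUNCH_REQUESTED -> {               // launch request sent to the scheduler
                require(State.NEW, e);
                state = State.START_WAIT;
            }
            case CONTAINER_ASSIGNED -> {             // container bound; attempt still waits to start
                require(State.START_WAIT, e);
                containerId = arg;
            }
            case UMBILICAL_UP -> {                   // task is now running inside the container
                require(State.START_WAIT, e);
                if (containerId == null) throw new IllegalStateException("no container assigned");
                state = State.RUNNING;
            }
        }
        history.add(e + " -> " + state);
    }

    private void require(State expected, Event e) {
        if (state != expected) throw new IllegalStateException(e + " not allowed in " + state);
    }

    public static void main(String[] args) {
        AttemptLifecycle a = new AttemptLifecycle();
        a.on(Event.LAUNCH_REQUESTED, null);
        a.on(Event.CONTAINER_ASSIGNED, "container_1");
        a.on(Event.UMBILICAL_UP, null);
        System.out.println(a.history);
    }
}
```

Rejecting out-of-order events loudly is the point of the design: a container assignment arriving for an attempt that never requested one is exactly the kind of race listed under Common bugs and symptoms.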
Reading exercise
1. List the event types handled:
   ```bash
   head -n 200 $(find tez-dag/src/main/java -name "TaskSchedulerManager.java")
   ```
2. Find all RM client interactions:
   ```bash
   grep -n "addContainerRequest\|removeContainerRequest\|releaseAssignedContainer" \
     $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java")
   ```
3. How is locality classified?
   ```bash
   grep -n "NODE_LOCAL\|RACK_LOCAL\|OFF_SWITCH\|ANY" \
     $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java")
   ```
4. What is the "cookie"? (Hint: an opaque payload used to thread reuse data through AMRMClient.)
   ```bash
   grep -rn "CookieContainerRequest" tez-dag/src/main/java/org/apache/tez/dag/app/rm
   ```
5. Which file dominates? Likely YarnTaskSchedulerService ≫ everything.
   ```bash
   wc -l $(find tez-dag/src/main/java/org/apache/tez/dag/app/rm -name "*.java") | sort -n
   ```
6. Where is per-vertex priority computed?
   ```bash
   grep -n "Priority.newInstance\|priority(" \
     $(find tez-dag/src/main/java -name "VertexImpl.java" -o -name "DAGImpl.java")
   ```
Common bugs and symptoms
| Symptom | Likely cause |
|---|---|
| AM stuck "0 containers running" | The RM has no capacity for the requested resources, or the queue is at capacity. Check yarn application -status. |
| All tasks scheduled OFF_SWITCH | TaskLocationHint not propagated through VertexManager. |
| Tasks fail with Container released by AM | YarnTaskSchedulerService released a container that an attempt still owned, usually a state-machine race; see failure-handling.md. |
| Reuse not happening | Priorities mismatch between completed and pending tasks; check tez.am.container.reuse.locality.delay-allocation-millis. |
| AM heartbeat thread blocked | A scheduler callback (onContainersAllocated) ran a slow blocking op on the RM client thread. Keep callbacks light. |
| IllegalStateException: Priority N not registered | allocateTask called for a vertex whose priority class was never bootstrapped. |
Validation: prove you understand this
- Walk an AMSchedulerEventTALaunchRequest from dispatch in TaskAttemptImpl to a YARN AMRMClient.addContainerRequest call. Cite file paths.
- Explain the difference between priority (YARN concept) and DAG priority (Tez concept) and where Tez sets each.
- Given a 100-task Vertex A followed by a 10-task Vertex B, what priority class does each get and why?
- Describe how YarnTaskSchedulerService decides between two pending requests at the same priority when a container arrives.
- Identify the single method on YarnTaskSchedulerService that the RM callback thread invokes when containers become available. Cite file:line.