Scheduler

The scheduler is the AM-side component that turns task launch requests into running containers. It lives in tez-dag under org.apache.tez.dag.app.rm.

Two-layer design:

  • TaskSchedulerManager — single dispatcher and router. Receives AMSchedulerEvents from the rest of the AM, forwards to the right scheduler instance.
  • TaskScheduler instances — one per scheduler ID. In practice almost always YarnTaskSchedulerService (production) or LocalTaskSchedulerService (tez.local.mode=true). External pluggable schedulers (LLAP) also slot in here.
ls tez-dag/src/main/java/org/apache/tez/dag/app/rm/

TaskSchedulerManager

find tez-dag/src/main/java -name "TaskSchedulerManager.java"
wc -l $(find tez-dag/src/main/java -name "TaskSchedulerManager.java")

Implements EventHandler<AMSchedulerEvent> and is wired into the AM AsyncDispatcher. Every scheduling decision in the AM starts by enqueuing an AMSchedulerEvent.
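The enqueue-then-handle pattern can be sketched with a queue and a single consumer thread. This is a minimal standalone model, not Hadoop's AsyncDispatcher. MiniDispatcher, the STOP sentinel and the demo event strings are hypothetical. One thread drains the queue, so handlers run one at a time in enqueue order, and producers never wait for a handler to finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for an AM-style async dispatcher: producers enqueue
// events and return immediately; one worker thread delivers them to the handler.
class MiniDispatcher<E> {
    private static final Object STOP = new Object(); // sentinel that ends the loop
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    MiniDispatcher(Consumer<E> handler) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    Object next = queue.take();
                    if (next == STOP) return;
                    @SuppressWarnings("unchecked") E event = (E) next;
                    handler.accept(event); // serial, FIFO delivery
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
    }

    void dispatch(E event) { queue.add(event); } // never blocks the caller

    void stopAndJoin() {
        queue.add(STOP);
        try {
            worker.join(); // join makes the handler's writes visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static List<String> demo() {
        List<String> handled = new ArrayList<>();
        MiniDispatcher<String> d = new MiniDispatcher<>(handled::add);
        d.dispatch("AMSchedulerEventTALaunchRequest");
        d.dispatch("AMSchedulerEventDeallocateContainer");
        d.dispatch("AMSchedulerEventNodeBlacklistUpdate");
        d.stopAndJoin();
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```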

Event types

find tez-dag/src/main/java -name "AMSchedulerEvent*.java"
grep -rn "extends AMSchedulerEvent" tez-dag/src/main/java
| Event | Source | Purpose |
|---|---|---|
| AMSchedulerEventTALaunchRequest | TaskAttemptImpl, after a TA is ready to schedule | Ask the scheduler to launch this attempt. |
| AMSchedulerEventTAStateUpdated | TaskAttemptImpl, on completion | Notify the scheduler that the container is now releasable. |
| AMSchedulerEventContainerCompleted | YARN RM callback | The RM reported that a container died. |
| AMSchedulerEventDeallocateContainer | various | Force-release a held container. |
| AMSchedulerEventNodeBlacklistUpdate | NodeTracker | Add/remove a node from the blacklist. |
| AMSchedulerEventDAGStart, AMSchedulerEventVertexStateUpdated | DAGImpl, VertexImpl | DAG lifecycle hints (drive priority adjustments). |

TaskSchedulerManager.handle(event) switches on event type and forwards via getTaskScheduler(event.getSchedulerId()).handleEvent(...).
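The routing step can be modeled as a switch over event kinds plus a lookup by scheduler ID. This is a hypothetical sketch. EventKind, SchedulerEvent and ToyScheduler are illustrative types, not Tez classes, and using a list index as the scheduler ID is an assumption. The router itself makes no scheduling decisions; it only picks a target and a method.

```java
import java.util.ArrayList;
import java.util.List;

class SchedulerRouterSketch {
    enum EventKind { TA_LAUNCH_REQUEST, TA_STATE_UPDATED, DEALLOCATE_CONTAINER, NODE_BLACKLIST_UPDATE }

    // Every event names the scheduler that owns it; payload is an attempt or node ID.
    record SchedulerEvent(EventKind kind, int schedulerId, String payload) {}

    // Second layer: a toy scheduler that just records what it was asked to do.
    static final class ToyScheduler {
        final List<String> log = new ArrayList<>();
        void allocateTask(String attempt) { log.add("allocate " + attempt); }
        void deallocateTask(String attempt) { log.add("release " + attempt); }
        void updateBlacklist(String node) { log.add("blacklist " + node); }
    }

    // First layer: one router holding every scheduler, indexed by scheduler ID.
    private final List<ToyScheduler> schedulers;

    SchedulerRouterSketch(List<ToyScheduler> schedulers) { this.schedulers = schedulers; }

    void handle(SchedulerEvent e) {
        ToyScheduler target = schedulers.get(e.schedulerId()); // unknown ID -> IndexOutOfBoundsException
        switch (e.kind()) {
            case TA_LAUNCH_REQUEST -> target.allocateTask(e.payload());
            case TA_STATE_UPDATED, DEALLOCATE_CONTAINER -> target.deallocateTask(e.payload());
            case NODE_BLACKLIST_UPDATE -> target.updateBlacklist(e.payload());
        }
    }

    public static void main(String[] args) {
        ToyScheduler yarn = new ToyScheduler();   // ID 0
        ToyScheduler plugin = new ToyScheduler(); // ID 1, e.g. an external scheduler
        SchedulerRouterSketch router = new SchedulerRouterSketch(List.of(yarn, plugin));
        router.handle(new SchedulerEvent(EventKind.TA_LAUNCH_REQUEST, 0, "attempt_0"));
        router.handle(new SchedulerEvent(EventKind.TA_LAUNCH_REQUEST, 1, "attempt_1"));
        System.out.println(yarn.log + " " + plugin.log); // prints [allocate attempt_0] [allocate attempt_1]
    }
}
```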


YarnTaskSchedulerService

find tez-dag/src/main/java -name "YarnTaskSchedulerService.java"
wc -l $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java")

This is where Tez talks to YARN. Owns:

  • AMRMClientAsync — async RM heartbeat client.
  • Map<Priority, BlockingQueue<CookieContainerRequest>> — outstanding requests, bucketed by priority.
  • Map<ContainerId, HeldContainer> — currently-assigned containers (see container-reuse.md).
  • A DelayedContainerManager thread that releases idle reused containers.
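Part of that bookkeeping can be modeled as pending requests bucketed by priority plus a map of held containers. This is a rough sketch under assumptions. PendingRequest, the string container IDs and the TreeMap choice are illustrative. A TreeMap keeps the buckets sorted, so the numerically smallest priority value comes first; YARN treats that value as the most important. The cookie field mirrors the idea behind CookieContainerRequest: an opaque payload carried along with each request.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.TreeMap;

class SchedulerBookkeepingSketch {
    // Hypothetical request: priority, memory ask, and an opaque cookie (e.g. an attempt ID).
    record PendingRequest(int priority, int memoryMb, Object cookie) {}

    // Outstanding requests, bucketed by priority; iteration starts at the smallest value.
    final NavigableMap<Integer, Queue<PendingRequest>> pending = new TreeMap<>();
    // Containers currently assigned to an attempt, keyed by container ID.
    final Map<String, Object> heldContainers = new HashMap<>();

    void addRequest(PendingRequest r) {
        pending.computeIfAbsent(r.priority(), p -> new ArrayDeque<>()).add(r);
    }

    // Removes and returns the oldest request at a priority, or null if none is waiting.
    PendingRequest takeRequest(int priority) {
        Queue<PendingRequest> bucket = pending.get(priority);
        if (bucket == null) return null;
        PendingRequest r = bucket.poll();
        if (bucket.isEmpty()) pending.remove(priority); // drop empty buckets
        return r;
    }

    void assign(String containerId, PendingRequest r) {
        heldContainers.put(containerId, r.cookie());
    }

    public static void main(String[] args) {
        SchedulerBookkeepingSketch s = new SchedulerBookkeepingSketch();
        s.addRequest(new PendingRequest(6, 1024, "attempt_B_0"));
        s.addRequest(new PendingRequest(3, 1024, "attempt_A_0"));
        s.addRequest(new PendingRequest(3, 1024, "attempt_A_1"));
        System.out.println("most important priority: " + s.pending.firstKey());
        s.assign("container_01", s.takeRequest(3));
        System.out.println(s.heldContainers);
    }
}
```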

Request flow

sequenceDiagram
  participant TA as TaskAttemptImpl
  participant TSM as TaskSchedulerManager
  participant Y as YarnTaskSchedulerService
  participant RM as YARN RM
  TA->>TSM: AMSchedulerEventTALaunchRequest
  TSM->>Y: allocateTask(...)
  Y->>Y: build CookieContainerRequest (priority, resource, locality)
  Y->>RM: addContainerRequest (via AMRMClientAsync)
  Note over RM: scheduler matches request to a node
  RM-->>Y: onContainersAllocated([Container])
  Y->>Y: assignContainer() — match to a pending request
  Y->>TSM: containerAllocated(taskAttempt, container)
  TSM->>TA: TAEventContainerAssigned
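The asynchronous round trip in the diagram can be simulated in-process. FakeResourceManager, AllocationCallback and heartbeat are hypothetical. The real client is AMRMClientAsync, which runs its own heartbeat thread and delivers results to a callback handler. The point shown is the decoupling: addContainerRequest only records demand, and assignment happens later, when the RM side calls back. Matching here is by priority only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class RequestFlowSketch {
    record Request(int priority, String attempt) {}
    record Container(String id, int priority) {}

    // Hypothetical stand-in for the callback side of an async RM client.
    interface AllocationCallback { void onContainersAllocated(List<Container> containers); }

    // Fake RM: remembers asks and grants one container per ask on each heartbeat.
    static final class FakeResourceManager {
        private final List<Request> asks = new ArrayList<>();
        private int nextId = 1;

        void addContainerRequest(Request r) { asks.add(r); }

        void heartbeat(AllocationCallback callback) {
            List<Container> granted = new ArrayList<>();
            for (Request r : asks) granted.add(new Container("container_" + nextId++, r.priority()));
            asks.clear();
            callback.onContainersAllocated(granted); // results arrive via the callback
        }
    }

    // AM side: records pending asks, then matches granted containers back to them.
    static final class ToyYarnScheduler implements AllocationCallback {
        private final FakeResourceManager rm;
        private final List<Request> pending = new ArrayList<>();
        private final BiConsumer<String, Container> containerAllocated; // notifies the "manager"

        ToyYarnScheduler(FakeResourceManager rm, BiConsumer<String, Container> containerAllocated) {
            this.rm = rm;
            this.containerAllocated = containerAllocated;
        }

        void allocateTask(String attempt, int priority) {
            Request r = new Request(priority, attempt);
            pending.add(r);
            rm.addContainerRequest(r); // only records demand; nothing is assigned yet
        }

        @Override
        public void onContainersAllocated(List<Container> containers) {
            for (Container c : containers) {
                for (int i = 0; i < pending.size(); i++) {
                    if (pending.get(i).priority() == c.priority()) { // priority-only match
                        containerAllocated.accept(pending.remove(i).attempt(), c);
                        break;
                    }
                }
            }
        }
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        FakeResourceManager rm = new FakeResourceManager();
        ToyYarnScheduler y = new ToyYarnScheduler(rm, (attempt, c) -> events.add(attempt + "->" + c.id()));
        y.allocateTask("attempt_0", 3);
        y.allocateTask("attempt_1", 3);
        events.add("requested");
        rm.heartbeat(y);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [requested, attempt_0->container_1, attempt_1->container_2]
    }
}
```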

Matching: priority + locality

grep -n "assignContainer\|matchContainerToRequest\|getMatchingRequests" \
  $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java") | head -20

When a container arrives, assignContainer walks the pending requests at the container's priority and tries each locality level in order:

  1. NODE_LOCAL — container's node matches a hint host of the request.
  2. RACK_LOCAL — same rack but different host.
  3. ANY — locality wildcard.

AMRMClientAsync already biases matches by locality on the YARN side; this pass is the AM-side tiebreaker when multiple requests are eligible.
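The AM-side tiebreak can be sketched as three passes over the pending requests at the container's priority: exact host, then rack, then anything. LocalityRequest, Match and match are hypothetical names. The sketch returns the first request in queue order at the most specific level that fits. It ignores locality-delay settings such as tez.am.container.reuse.locality.delay-allocation-millis.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

class LocalityMatchSketch {
    enum Level { NODE_LOCAL, RACK_LOCAL, ANY }

    record LocalityRequest(String attempt, Set<String> hosts, Set<String> racks) {}
    record Match(LocalityRequest request, Level level) {}

    // samePriority is assumed to be already filtered to the container's priority.
    static Optional<Match> match(String host, String rack, List<LocalityRequest> samePriority) {
        for (LocalityRequest r : samePriority)            // pass 1: exact host
            if (r.hosts().contains(host)) return Optional.of(new Match(r, Level.NODE_LOCAL));
        for (LocalityRequest r : samePriority)            // pass 2: same rack
            if (r.racks().contains(rack)) return Optional.of(new Match(r, Level.RACK_LOCAL));
        return samePriority.stream().findFirst()          // pass 3: oldest remaining request
                .map(r -> new Match(r, Level.ANY));
    }

    public static void main(String[] args) {
        List<LocalityRequest> reqs = List.of(
                new LocalityRequest("a0", Set.of("h9"), Set.of("r2")),
                new LocalityRequest("a1", Set.of("h1"), Set.of("r1")));
        // A container on h1 goes to a1 even though a0 was queued first.
        System.out.println(match("h1", "r1", reqs).orElseThrow());
    }
}
```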

| Hint level | YARN request | Tez match |
|---|---|---|
| NODE_LOCAL | host + rack + * | accepts a container on the exact host |
| RACK_LOCAL | rack + * | accepts a container on the same rack |
| ANY | * only | accepts any container at this priority |
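The "YARN request" column can be expressed as a small function that builds the list of resource names a request is registered under, most specific first. resourceNames is a hypothetical helper. "*" is the value of YARN's ResourceRequest.ANY constant.

```java
import java.util.ArrayList;
import java.util.List;

class ResourceNamesSketch {
    static final String ANY = "*"; // same value as YARN's ResourceRequest.ANY

    // Expands location hints into the names a request is listed under.
    static List<String> resourceNames(List<String> hosts, List<String> racks) {
        List<String> names = new ArrayList<>(hosts); // NODE_LOCAL: host entries
        names.addAll(racks);                         // RACK_LOCAL: rack entries
        names.add(ANY);                              // every request is also listed under ANY
        return names;
    }

    public static void main(String[] args) {
        System.out.println(resourceNames(List.of("h1"), List.of("/rack1"))); // prints [h1, /rack1, *]
        System.out.println(resourceNames(List.of(), List.of("/rack1")));     // prints [/rack1, *]
        System.out.println(resourceNames(List.of(), List.of()));             // prints [*]
    }
}
```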

TaskLocationHint is set on TaskAttemptImpl from the input split (MRInput) or from the VertexLocationHint provided by the VertexManager; when neither supplies one, it is left null.

Priorities

grep -n "Priority\|priorityForVertex" \
  $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java" \
                                -o -name "DAGImpl.java") | head

Tez assigns each vertex a priority class derived from its topological position in the DAG. Downstream vertices get a larger numeric priority value, which YARN treats as a lower priority. Source tasks are therefore served first and can free their containers for downstream consumers. Priority is also the primary key for container reuse matching.
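Depth-based priorities can be sketched by assuming a vertex's priority value grows with its longest distance from a root vertex. The SPACING constant and the (depth + 1) * SPACING formula are illustrative assumptions; DAGImpl or VertexImpl may compute priorities differently. Smaller values are served earlier by YARN, so roots come first. Task counts play no role in this model.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class VertexPrioritySketch {
    static final int SPACING = 3; // hypothetical gap between consecutive priority classes

    // edges maps each vertex to its downstream vertices; the graph must be acyclic.
    static Map<String, Integer> priorities(Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new HashMap<>();
        edges.keySet().forEach(v -> inDegree.putIfAbsent(v, 0));
        edges.values().forEach(outs -> outs.forEach(v -> inDegree.merge(v, 1, Integer::sum)));

        Map<String, Integer> depth = new HashMap<>();
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((v, d) -> {
            if (d == 0) { ready.add(v); depth.put(v, 0); } // roots have depth 0
        });

        Map<String, Integer> result = new LinkedHashMap<>();
        while (!ready.isEmpty()) {                  // Kahn's algorithm: parents before children
            String v = ready.poll();
            int vDepth = depth.get(v);              // final once all parents are processed
            result.put(v, (vDepth + 1) * SPACING);  // smaller value = served earlier by YARN
            for (String child : edges.getOrDefault(v, List.of())) {
                depth.merge(child, vDepth + 1, Integer::max); // longest path from any root
                if (inDegree.merge(child, -1, Integer::sum) == 0) ready.add(child);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Vertex A (100 tasks) feeds vertex B (10 tasks).
        System.out.println(priorities(Map.of("A", List.of("B")))); // prints {A=3, B=6}
    }
}
```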

RM callbacks

grep -n "AMRMClientAsync.CallbackHandler\|onContainersAllocated\|onContainersCompleted\|onShutdownRequest" \
  $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java")

YarnTaskSchedulerService implements AMRMClientAsync.CallbackHandler:

  • onContainersAllocated(List<Container>): enqueue for assignment.
  • onContainersCompleted(List<ContainerStatus>): translate the exit status into AMSchedulerEventContainerCompleted.
  • onShutdownRequest(): the RM told the AM to shut down (e.g. a lost AM attempt).
  • onNodesUpdated(List<NodeReport>): update node health for blacklisting.
  • getProgress(): the AM reports its overall DAG progress to the RM.
  • onError(Throwable): RM communication failed, or a callback threw.
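The callback surface can be sketched with the "keep callbacks light" rule applied: each callback only records or enqueues, and AM-side code drains the queue and does the real work. RmCallbacks mirrors the method names listed above but uses strings instead of YARN's Container, ContainerStatus and NodeReport types, so it compiles without Hadoop. It is a hypothetical interface, not AMRMClientAsync.CallbackHandler itself.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class LightCallbacksSketch {
    // Hypothetical mirror of the callback methods, with simplified parameter types.
    interface RmCallbacks {
        void onContainersAllocated(List<String> containerIds);
        void onContainersCompleted(List<String> containerIds);
        void onShutdownRequest();
        void onNodesUpdated(List<String> nodes);
        float getProgress();
        void onError(Throwable t);
    }

    // Work items handed from the RM callback thread to the AM's own processing.
    record Work(String kind, List<String> items) {}

    static final class QueueingHandler implements RmCallbacks {
        final BlockingQueue<Work> work = new LinkedBlockingQueue<>();
        volatile float progress;           // updated by AM progress tracking
        volatile boolean shutdownRequested;

        // Each callback only enqueues or sets a flag: no RPCs, no matching, no scheduler locks.
        public void onContainersAllocated(List<String> ids) { work.add(new Work("allocated", ids)); }
        public void onContainersCompleted(List<String> ids) { work.add(new Work("completed", ids)); }
        public void onShutdownRequest() { shutdownRequested = true; }
        public void onNodesUpdated(List<String> nodes) { work.add(new Work("nodes", nodes)); }
        public float getProgress() { return progress; }
        public void onError(Throwable t) { work.add(new Work("error", List.of(String.valueOf(t)))); }
    }

    public static void main(String[] args) throws InterruptedException {
        QueueingHandler h = new QueueingHandler();
        h.onContainersAllocated(List.of("container_1", "container_2")); // returns immediately
        h.progress = 0.25f;
        Work w = h.work.take(); // an AM-side worker would loop on this
        System.out.println(w + " progress=" + h.getProgress());
    }
}
```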

LocalTaskSchedulerService

find tez-dag/src/main/java -name "LocalTaskSchedulerService.java"
wc -l $(find tez-dag/src/main/java -name "LocalTaskSchedulerService.java")

Same contract as YarnTaskSchedulerService but bypasses YARN:

  • A bounded ExecutorService of LocalContainer worker threads stands in for the YARN cluster.
  • allocateTask instantly synthesizes a fake Container and dispatches containerAllocated.
  • The container launcher (LocalContainerLauncher) runs TezChild in the same JVM on the executor.

Used when tez.local.mode=true, and by certain flavors of MiniTezCluster tests. See local-mode.md.
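The local-mode shortcut can be modeled minimally. Allocation synthesizes a container ID immediately, with no resource manager round trip, and "launch" runs the task on a bounded thread pool in the same JVM. LocalSchedulerSketch, the ID format and the pool size are assumptions for illustration. This is not the LocalTaskSchedulerService or LocalContainerLauncher code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LocalSchedulerSketch implements AutoCloseable {
    private final AtomicInteger nextContainer = new AtomicInteger(1);
    private final ExecutorService pool; // bounded pool stands in for cluster capacity

    LocalSchedulerSketch(int maxConcurrentTasks) {
        pool = Executors.newFixedThreadPool(maxConcurrentTasks);
    }

    // "Allocation" is instant: fabricate an ID instead of asking a resource manager.
    String allocateTask() {
        return "local_container_" + nextContainer.getAndIncrement();
    }

    // "Launch" runs the task body on a pool thread in this JVM; containerId is only a label here.
    <T> Future<T> launch(String containerId, Supplier<T> taskBody) {
        Callable<T> call = taskBody::get;
        return pool.submit(call);
    }

    @Override
    public void close() { pool.shutdown(); }

    static List<String> demo() {
        List<Future<String>> running = new ArrayList<>();
        try (LocalSchedulerSketch s = new LocalSchedulerSketch(2)) {
            for (String task : List.of("t0", "t1", "t2")) {
                String cid = s.allocateTask();
                running.add(s.launch(cid, () -> task + "@" + cid));
            }
            List<String> done = new ArrayList<>();
            for (Future<String> f : running) done.add(f.get()); // collected in submit order
            return done;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```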


Pluggable schedulers

grep -n "tez.am.task.scheduler.classes\|TASK_SCHEDULER_SERVICE_CLASS" \
  tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java

Configuration:

tez.am.task.scheduler.classes = <comma-separated FQNs>

TaskSchedulerManager instantiates one scheduler instance per configured scheduler ID. Hive's LLAP plugs in a custom scheduler that talks to LLAP daemons instead of YARN.
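Instantiating one object per entry in a comma-separated class list can be sketched with plain reflection. loadPlugins is a hypothetical helper, and treating the list index as the scheduler ID is an assumption. Real scheduler plugins may require constructor arguments; this sketch only handles public no-arg constructors. The demo uses JDK classes in place of scheduler plugins so that it runs anywhere.

```java
import java.util.ArrayList;
import java.util.List;

class PluginLoaderSketch {
    // Parses "a.B, c.D" and instantiates each class; the index in the result acts as the ID.
    static <T> List<T> loadPlugins(String classList, Class<T> type) {
        List<T> plugins = new ArrayList<>();
        for (String raw : classList.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) continue; // tolerate stray commas
            try {
                Class<?> cls = Class.forName(name);
                plugins.add(type.cast(cls.getDeclaredConstructor().newInstance()));
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalArgumentException("cannot load plugin " + name, e);
            }
        }
        return plugins;
    }

    public static void main(String[] args) {
        List<CharSequence> p = loadPlugins("java.lang.StringBuilder, java.lang.StringBuffer", CharSequence.class);
        for (int id = 0; id < p.size(); id++)
            System.out.println("scheduler id " + id + " -> " + p.get(id).getClass().getName());
    }
}
```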


Walkthrough: launching a single task attempt

  1. VertexImpl decides to schedule task T.k (via VertexManager or scaling).
  2. TaskImpl creates TaskAttemptImpl for attempt 0 → state NEW.
  3. TaskAttemptImpl transitions to START_WAIT, dispatches AMSchedulerEventTALaunchRequest with TaskLocationHint and capability.
  4. TaskSchedulerManager.handle routes to the configured scheduler.
  5. YarnTaskSchedulerService.allocateTask constructs CookieContainerRequest(priority, capability, hosts, racks, relaxLocality=true) and calls AMRMClientAsync.addContainerRequest.
  6. RM schedules → callback onContainersAllocated([c]).
  7. assignContainer(c) finds the matching pending request and calls informAppAboutAssignment → TaskSchedulerManager.containerAllocated.
  8. TaskSchedulerManager dispatches AMContainerEventAssignTA to AMContainerImpl, then TAEventContainerAssigned to TaskAttemptImpl.
  9. AMContainerImpl asks ContainerLauncherImpl to launch the container (or reuse a held one).
  10. TezChild starts (or, in a reused container, picks up the new task through its reuse loop). Once the umbilical connection is up, the attempt transitions to RUNNING.
sequenceDiagram
  participant V as VertexImpl
  participant TA as TaskAttemptImpl
  participant TSM as TaskSchedulerManager
  participant Y as YarnTaskSchedulerService
  participant AC as AMContainerImpl
  participant RM as YARN RM
  participant NM as YARN NM
  V->>TA: schedule
  TA->>TSM: AMSchedulerEventTALaunchRequest
  TSM->>Y: allocateTask
  Y->>RM: addContainerRequest
  RM-->>Y: onContainersAllocated
  Y->>TSM: containerAllocated
  TSM->>AC: AMContainerEventAssignTA
  TSM->>TA: TAEventContainerAssigned
  AC->>NM: start container (via NMClient)
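The attempt-side half of the walkthrough can be reduced to a small transition table. This is a simplified, hypothetical model. It keeps the states the steps name (NEW, START_WAIT, RUNNING) and adds an ASSIGNED marker that is not a real Tez state. It rejects any event that has no transition from the current state, which is one way a state machine surfaces ordering bugs such as an event arriving too early.

```java
import java.util.EnumMap;
import java.util.Map;

class AttemptLifecycleSketch {
    enum State { NEW, START_WAIT, ASSIGNED, RUNNING } // ASSIGNED is hypothetical, not a Tez state
    enum Event { SCHEDULE, CONTAINER_ASSIGNED, UMBILICAL_CONNECTED }

    // Allowed transitions: (current state, event) -> next state.
    private static final Map<State, Map<Event, State>> TABLE = new EnumMap<>(State.class);
    static {
        TABLE.put(State.NEW, Map.of(Event.SCHEDULE, State.START_WAIT));
        TABLE.put(State.START_WAIT, Map.of(Event.CONTAINER_ASSIGNED, State.ASSIGNED));
        TABLE.put(State.ASSIGNED, Map.of(Event.UMBILICAL_CONNECTED, State.RUNNING));
    }

    private State state = State.NEW;

    State state() { return state; }

    // Applies an event, or throws if the current state has no transition for it.
    State handle(Event e) {
        State next = TABLE.getOrDefault(state, Map.of()).get(e);
        if (next == null)
            throw new IllegalStateException("invalid event " + e + " in state " + state);
        state = next;
        return next;
    }

    public static void main(String[] args) {
        AttemptLifecycleSketch ta = new AttemptLifecycleSketch();
        ta.handle(Event.SCHEDULE);            // step 3: launch request dispatched
        ta.handle(Event.CONTAINER_ASSIGNED);  // step 8: TAEventContainerAssigned
        ta.handle(Event.UMBILICAL_CONNECTED); // step 10: task is running
        System.out.println(ta.state());       // prints RUNNING
    }
}
```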

Reading exercise

  1. cat $(find tez-dag/src/main/java -name "TaskSchedulerManager.java") | head -200 — list the event types handled.
  2. grep -n "addContainerRequest\|removeContainerRequest\|releaseAssignedContainer" $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java") — find all RM client interactions.
  3. grep -n "NODE_LOCAL\|RACK_LOCAL\|OFF_SWITCH\|ANY" $(find tez-dag/src/main/java -name "YarnTaskSchedulerService.java") — how is locality classified?
  4. grep -n "CookieContainerRequest" $(find tez-dag/src/main/java -name "*.java" | grep rm) — what is the "cookie"? (Hint: opaque payload to thread reuse data through AMRMClient.)
  5. wc -l $(find tez-dag/src/main/java/org/apache/tez/dag/app/rm -name "*.java") — which file dominates? Likely YarnTaskSchedulerService ≫ everything.
  6. grep -n "Priority.newInstance\|priority(" $(find tez-dag/src/main/java -name "VertexImpl.java" -o -name "DAGImpl.java") — where is per-vertex priority computed?

Common bugs and symptoms

| Symptom | Likely cause |
|---|---|
| AM stuck at "0 containers running" | The RM has no capacity at the requested priority (queue at capacity). Check yarn application -status <appId>. |
| All tasks scheduled OFF_SWITCH | TaskLocationHint is not propagated through the VertexManager. |
| Tasks fail with "Container released by AM" | YarnTaskSchedulerService released a container that an attempt still owned; usually a state machine race. See failure-handling.md. |
| Reuse not happening | Priority mismatch between completed and pending tasks; check tez.am.container.reuse.locality.delay-allocation-millis. |
| AM heartbeat thread blocked | A scheduler callback (onContainersAllocated) ran a slow blocking operation on the RM client thread. Keep callbacks light. |
| IllegalStateException: Priority N not registered | allocateTask was called for a vertex whose priority class was never bootstrapped. |

Validation: prove you understand this

  1. Walk an AMSchedulerEventTALaunchRequest from dispatch in TaskAttemptImpl to a YARN AMRMClient.addContainerRequest call. Cite file paths.
  2. Explain the difference between priority (YARN concept) and DAG priority (Tez concept) and where Tez sets each.
  3. Given a 100-task Vertex A followed by a 10-task Vertex B, what priority class does each get and why?
  4. Describe how YarnTaskSchedulerService decides between two pending requests at the same priority when a container arrives.
  5. Identify the single method on YarnTaskSchedulerService that the RM callback thread invokes when containers become available. Cite file:line.