Stage 5 — Scheduler Bugs
What this stage teaches
Stage 5 takes you out of the per-vertex event loop and into the AM-wide scheduling layer. You learn:
- The split between TaskSchedulerManager (the multi-scheduler dispatch shim) and the concrete YarnTaskSchedulerService (the AMRMClient-backed scheduler used in production), plus the alternative LocalTaskSchedulerService used by local mode and tests.
- How container requests, allocations, and releases flow through AMRMClient, including the heldContainer lifecycle and the canonical leak: a held container that is never returned to YARN after an onError callback fires.
- Locality miscounts: the bookkeeping mistake where a node-local allocation is charged as rack-local in getAvailableContainers, distorting the affinity signal sent back to the AMRM protocol.
- Priority inversion: a high-priority request stuck behind a low-priority pending list because the request was added to the wrong queue.
- Container behaviour across AM failover: what should and should not be re-claimed when the AM restarts and its AM-RM heartbeat (tez.am.am-rm.heartbeat.interval-ms) retries.
- How to write a MiniTezCluster-backed integration test, and when the cheaper AMRMClient stub pattern is sufficient.
Patches are 50–500 lines, often with a non-trivial test that needs
MiniTezCluster or MiniYARNCluster. Reviewers are strict: a scheduler patch
without a deterministic test is rejected on sight.
Reading order
- tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
- tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java (the deep-dive scheduler)
- tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
- tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
cd ~/tez-src
wc -l tez-dag/src/main/java/org/apache/tez/dag/app/rm/*.java
If YarnTaskSchedulerService.java is over 2000 lines, that is expected.
JIRA filter to find candidates
project = TEZ
AND component in ("tez-dag")
AND resolution = Unresolved
AND (text ~ "container leak" OR text ~ "scheduler" OR text ~ "locality"
OR text ~ "priority" OR text ~ "AMRMClient" OR text ~ "heldContainer"
OR description ~ "onError")
ORDER BY priority DESC, updated DESC
A second filter for AM-failover-related candidates:
project = TEZ AND resolution = Unresolved AND (text ~ "failover" OR text ~ "AM restart")
AND component in ("tez-dag")
Walked example A — heldContainer never released after onError
Symptom: an operator reports their long-running session AM holds onto containers
indefinitely after a transient RM disconnect. yarn application -status shows
allocated containers far above what the running DAG should need.
Step 1 — Locate the leak path
cd ~/tez-src
grep -n "onError\|heldContainer\|releaseContainer" \
tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java | head -30
You find a class field:
private final Map<ContainerId, HeldContainer> heldContainers = new HashMap<>();
and an onError(Throwable t) callback (declared by AMRMClientAsync.CallbackHandler, which the scheduler implements):
@Override
public void onError(Throwable t) {
  LOG.error("AMRMClient error", t);
  appContext.getEventHandler().handle(
      new DAGAppMasterEventSchedulingServiceError(t));
}
The bug: heldContainers is populated by onContainersAllocated but never
drained in onError. When the AM recovers and the RM reissues the same
container IDs, the map already has stale entries, and the new allocations are
silently dropped (the bookkeeping path checks heldContainers.containsKey(id)).
The containers are effectively leaked.
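The failure mode is easier to see in isolation. The following is a hypothetical, stripped-down model, not Tez code: container IDs are plain strings, allocate() stands in for the containsKey dedup in the allocation path, and the two onError variants stand in for the callback before and after the fix (release calls omitted). It shows a stale entry causing a reissued ID to be dropped, and that draining the map on error lets the reissued allocation through.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the leak (not Tez code): IDs are plain strings and a
// "held" container is just a map entry.
class HeldContainerLeakModel {
    // Stands in for the heldContainers field.
    final Map<String, String> heldContainers = new HashMap<>();

    // Stands in for the dedup check in the allocation path: an ID that is
    // already present is treated as a duplicate and dropped.
    boolean allocate(String containerId) {
        if (heldContainers.containsKey(containerId)) {
            return false; // silently dropped; nothing ever uses or releases it
        }
        heldContainers.put(containerId, "held");
        return true;
    }

    // Buggy onError: escalates the error but leaves heldContainers untouched.
    void onErrorWithoutCleanup() {
        // no-op with respect to heldContainers
    }

    // Fixed onError (release calls omitted here): drains the map so a
    // reissued ID is accepted again.
    void onErrorWithCleanup() {
        heldContainers.clear();
    }
}
```

With the buggy variant, allocate("container_1") succeeds, the error fires, and the reissued allocate("container_1") returns false; with the fixed variant the second call succeeds.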
Step 2 — Diff
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@
   @Override
   public void onError(Throwable t) {
     LOG.error("AMRMClient error", t);
+    // Before we tear down, release any containers we still hold. If we don't,
+    // a recovering RM will re-issue the same ContainerIds and the dedup
+    // bookkeeping in onContainersAllocated will silently drop the new
+    // allocations. See TEZ-XXXX.
+    synchronized (heldContainers) {
+      for (HeldContainer hc : heldContainers.values()) {
+        try {
+          amRmClient.releaseAssignedContainer(hc.getContainer().getId());
+        } catch (Exception releaseErr) {
+          LOG.warn("Failed to release {} during onError cleanup: {}",
+              hc.getContainer().getId(), releaseErr.getMessage());
+        }
+      }
+      heldContainers.clear();
+    }
     appContext.getEventHandler().handle(
         new DAGAppMasterEventSchedulingServiceError(t));
   }
Rules in this diff:
- The cleanup runs before the event is dispatched. Once the event fires, the AM may shut down handlers, and any release call would race.
- The cleanup is synchronized on the same monitor that other writers to heldContainers use. Find that monitor first; if there is none, you have a second bug to file separately. Do not introduce a new lock in this patch.
- Each release is wrapped individually. One failure must not prevent the others from being released.
- Logged failures are WARN, not ERROR. The AM is already in an error path; doubling the severity drowns the originating cause.
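The shape of the cleanup loop can be checked without YARN at all. This sketch is hypothetical: the release callback stands in for amRmClient.releaseAssignedContainer(...), and the warnings list stands in for LOG.warn. It exercises two of the rules above: one failing release does not stop the others, and the map is cleared under the same monitor that guards writes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the cleanup loop (not Tez code).
class ReleaseAllSketch {
    // Insertion-ordered so the release order is predictable in the example.
    private final Map<String, String> heldContainers = new LinkedHashMap<>();
    // Stands in for LOG.warn: failures are recorded, never rethrown.
    final List<String> warnings = new ArrayList<>();

    void hold(String id) {
        synchronized (heldContainers) { // same monitor as the cleanup
            heldContainers.put(id, "held");
        }
    }

    // 'release' stands in for amRmClient.releaseAssignedContainer(id).
    void releaseAll(Consumer<String> release) {
        synchronized (heldContainers) {
            for (String id : heldContainers.keySet()) {
                try {
                    release.accept(id);
                } catch (RuntimeException e) {
                    // One failure is recorded and the loop moves on.
                    warnings.add("Failed to release " + id + ": " + e.getMessage());
                }
            }
            heldContainers.clear();
        }
    }

    int heldCount() {
        synchronized (heldContainers) {
            return heldContainers.size();
        }
    }
}
```

If the release of the second of three held containers throws, the first and third are still released, one warning is recorded, and the map ends up empty.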
Step 3 — Test with AMRMClient stub
A full MiniTezCluster test for this is overkill. Stub the client:
@Test(timeout = 10000)
public void testOnErrorReleasesHeldContainers() throws Exception {
  AMRMClientAsync<CookieContainerRequest> mockRm =
      mock(AMRMClientAsync.class);
  YarnTaskSchedulerService scheduler =
      new YarnTaskSchedulerService(mockAppCallbackHandler, appContext, mockRm);
  scheduler.serviceInit(new Configuration());
  scheduler.serviceStart();

  // simulate two allocations
  Container c1 = newContainer("container_1");
  Container c2 = newContainer("container_2");
  scheduler.onContainersAllocated(Arrays.asList(c1, c2));

  // fire onError
  scheduler.onError(new RuntimeException("RM gone"));

  // verify both were released
  verify(mockRm).releaseAssignedContainer(c1.getId());
  verify(mockRm).releaseAssignedContainer(c2.getId());
  assertTrue(scheduler.getHeldContainersForTest().isEmpty());
}
The pattern uses Mockito on the AMRM client (AMRMClientAsync is an abstract class, which Mockito mocks as readily as an interface), not on YarnTaskSchedulerService itself. getHeldContainersForTest() is a package-private accessor you add in the same patch, annotated with @VisibleForTesting.
Step 4 — Build, test, sign off
cd ~/tez-src
mvn -pl tez-dag test -Dtest=TestYarnTaskSchedulerService -q 2>&1 | tail -40
mvn -pl tez-tests test -Dtest=TestExternalTezServices -q 2>&1 | tail -10
The integration test (tez-tests) takes 5–10 minutes; skip it on the first
local iteration but run it before the patch submission.
Walked example B — locality miscount
Symptom: a debug log shows node-local: 4, rack-local: 12, off-switch: 0 for a
vertex whose input splits should give 14 node-local containers. The bookkeeping
is off.
Locating the counter
cd ~/tez-src
grep -n "nodeLocal\|rackLocal\|offSwitch" \
tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java | head -20
You find an assignContainer(...) path that compares the allocated host against
the request's preferred host. The bug: the comparison is
host.equals(request.getHosts()[0]), but host arrives from the RM as the FQDN
node-1.cluster.local while the request carries the short name node-1. The
exact-string comparison fails, the allocation is miscounted as rack-local, and
the affinity penalty cascades into the next request.
Diff
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@
-    if (host.equals(request.getHosts()[0])) {
+    // Hosts may be reported as FQDNs by the RM but as short names by the
+    // caller-supplied hint. Compare on the leading label to keep both forms
+    // equivalent. See TEZ-XXXX.
+    if (hostMatches(host, request.getHosts()[0])) {
       nodeLocalCount.incrementAndGet();
     } else if (rackOf(host).equals(rackOf(request.getHosts()[0]))) {
       rackLocalCount.incrementAndGet();
     } else {
       offSwitchCount.incrementAndGet();
     }
   }
+
+  static boolean hostMatches(String a, String b) {
+    if (a == null || b == null) return false;
+    return a.equals(b)
+        || leadingLabel(a).equals(leadingLabel(b));
+  }
+
+  private static String leadingLabel(String h) {
+    int dot = h.indexOf('.');
+    return dot < 0 ? h : h.substring(0, dot);
+  }
The accompanying test asserts the counter under both FQDN and short-name forms.
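A standalone check of the helpers needs nothing beyond the JDK. In this sketch hostMatches and leadingLabel are copied from the diff, while classify(...) and the host-to-rack map are hypothetical stand-ins for the counting branch and rackOf(host).

```java
import java.util.Map;

// hostMatches/leadingLabel copied from the diff; classify() and the rack map
// are hypothetical stand-ins for the counting branch and rackOf(host).
class LocalitySketch {
    enum Locality { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    static boolean hostMatches(String a, String b) {
        if (a == null || b == null) return false;
        return a.equals(b) || leadingLabel(a).equals(leadingLabel(b));
    }

    private static String leadingLabel(String h) {
        int dot = h.indexOf('.');
        return dot < 0 ? h : h.substring(0, dot);
    }

    // Mirrors the node-local / rack-local / off-switch decision. The rack map
    // is keyed by short host name; unknown hosts fall through to off-switch.
    static Locality classify(String allocated, String requested, Map<String, String> rackOf) {
        if (hostMatches(allocated, requested)) {
            return Locality.NODE_LOCAL;
        }
        String allocRack = rackOf.get(leadingLabel(allocated));
        String reqRack = rackOf.get(leadingLabel(requested));
        if (allocRack != null && allocRack.equals(reqRack)) {
            return Locality.RACK_LOCAL;
        }
        return Locality.OFF_SWITCH;
    }
}
```

One design consequence is worth a test of its own: comparing leading labels treats node-1.dc1.example and node-1.dc2.example as the same host. That is only safe if short names are unique across the cluster.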
Walked example C — priority inversion
Symptom: a high-priority request (priority 2, AM speculation) waits indefinitely behind a long queue of priority-10 requests, even though the scheduler has capacity.
Root cause: the request was added to a queue keyed by the priority's string form, not
its int value. Strings compare lexicographically, so "10" sorts before "2", and the
numerically larger (lower-priority) requests are served first. The fix is to use
an Integer key or a TreeMap with a numeric comparator. The diff and test
follow the same pattern as above; the file is
tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
near the requestsByPriority field.
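The ordering bug reproduces with two TreeMaps. This hypothetical sketch keys the same pending priorities by String and by Integer (lower number means higher priority, as in the example above) and reports which priority a "serve the first key" loop would pick.

```java
import java.util.TreeMap;

// Hypothetical illustration (not Tez code): the same pending priorities keyed
// by String vs. by Integer, counting requests per priority.
class PriorityOrderingSketch {
    // Priority a "serve the first key" loop picks when keys are Strings.
    static String firstServedByString(int... priorities) {
        TreeMap<String, Integer> byString = new TreeMap<>();
        for (int p : priorities) {
            byString.merge(String.valueOf(p), 1, Integer::sum);
        }
        return byString.firstKey(); // lexicographic: "10" < "2"
    }

    // Same loop with Integer keys: numeric order, so 2 comes before 10.
    static int firstServedByInt(int... priorities) {
        TreeMap<Integer, Integer> byInt = new TreeMap<>();
        for (int p : priorities) {
            byInt.merge(p, 1, Integer::sum);
        }
        return byInt.firstKey();
    }
}
```

With three priority-10 requests and one priority-2 request, the String-keyed map serves "10" first, while the Integer-keyed map serves 2 first.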
MiniTezCluster pattern
For bugs that only manifest end-to-end:
cd ~/tez-src
find tez-tests/src/test/java -name "TestMRRJobsDAGApi.java"
That file is the canonical worked example. The setup pattern:
private static MiniTezCluster tezCluster;

@BeforeClass
public static void setup() throws Exception {
  Configuration conf = new Configuration();
  tezCluster = new MiniTezCluster("TEZ-XXXX", 1, 1, 1);
  tezCluster.init(conf);
  tezCluster.start();
}

@AfterClass
public static void teardown() {
  if (tezCluster != null) {
    tezCluster.stop();
  }
}
Tests should:
- Submit a small DAG (an OrderedWordCount derivative is fine).
- Assert on DAGStatus and VertexStatus via the client.
- Set tight tez.am.am-rm.heartbeat.interval-ms and tez.task.am.heartbeat.interval-ms overrides so retries fire quickly.
A MiniTezCluster test takes 30s+ per run; do not add more than one per JIRA.
Pitfalls
- Don't mock the AppContext or the EventHandler if you can avoid it. Scheduler bugs often live in the handoff between scheduler and dispatcher. Mocking the dispatcher hides the bug.
- Don't add Thread.sleep to scheduler tests. Use DrainDispatcher.await() or poll the scheduler's getHeldContainersForTest() view with a timeout.
- Don't introduce a new lock to fix a race. Most scheduler races are fixed by moving an existing line inside an existing synchronized block. Adding a new lock is a Stage 11 patch.
- Don't change the AMRM heartbeat interval to make a test pass. That hides timing bugs that bite in production. Use the existing test helpers that drive the heartbeat synchronously.
- Don't release containers in onContainersCompleted to "be safe". Hadoop's AMRMClient documentation forbids that; the container is already released by the RM, and a second release fires a confusing log line.
- Don't fix a locality miscount by changing the comparison everywhere. The bug is usually a single inconsistency. Pin it down with a focused unit test before broadening the change.
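For the polling alternative to Thread.sleep, a small helper like the one below is enough. It is a hypothetical sketch, not an existing Tez or Hadoop utility: it re-checks a condition until a deadline and returns false on timeout, so the test fails with a clear assertion instead of depending on a guessed sleep duration. The short pause between polls only paces the loop.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a Tez or Hadoop API): poll a condition until it
// holds or a hard deadline passes, instead of sleeping for a guessed duration.
final class PollUntil {
    private PollUntil() {}

    /** Returns true as soon as the condition holds, false on timeout or interrupt. */
    static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out: the caller should fail the test
            }
            try {
                Thread.sleep(pollMillis); // paces the loop; not a guessed wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
        return true;
    }
}
```

In a scheduler test, the call could look like assertTrue(PollUntil.waitFor(() -> scheduler.getHeldContainersForTest().isEmpty(), 5000, 10)), using the accessor from example A.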
Exit criteria — when you're ready for the next stage
Move to Stage 6 when:
- You have shipped one scheduler patch with a passing MiniTezCluster or AMRM stub regression test.
- You can read YarnTaskSchedulerService.assignContainer without referring to external docs.
- You have written a MiniTezCluster test from scratch, and it runs locally in under a minute.
- You can explain the heldContainer lifecycle to another contributor in five sentences.
Stage 6 moves you into the runtime: ShuffleManager, Fetcher, MergeManager.