Stage 5 — Scheduler Bugs

What this stage teaches

Stage 5 takes you out of the per-vertex event loop and into the AM-wide scheduling layer. You learn:

  • The split between TaskSchedulerManager (the multi-scheduler dispatch shim) and the concrete YarnTaskSchedulerService (the AMRMClient-backed scheduler used in production), plus the alternative LocalTaskSchedulerService used by local mode and tests.
  • How container requests, allocations, and releases flow through AMRMClient, including the heldContainer lifecycle and the canonical leak: a held container that is never returned to YARN after an onError callback fires.
  • Locality miscounts: the bookkeeping mistake where a node-local allocation is counted as rack-local on the container-assignment path, distorting the affinity signal that later requests rely on.
  • Priority inversion: a high-priority request stuck behind a low-priority pending list because the request was added to the wrong queue.
  • Container behaviour across AM failover: when the AM restarts as a new application attempt, which containers should and should not be re-claimed.
  • How to write a MiniTezCluster-backed integration test, and when the cheaper AMRMClient stub pattern is sufficient.

Patches are 50–500 lines, often with a non-trivial test that needs MiniTezCluster or MiniYARNCluster. Reviewers are strict: a scheduler patch without a deterministic test is rejected on sight.

Reading order

  1. tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
  2. tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
  3. tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
  4. tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
  5. The scheduler deep dive.

To gauge the size of the package:

cd ~/tez-src
wc -l tez-dag/src/main/java/org/apache/tez/dag/app/rm/*.java

If YarnTaskSchedulerService.java is over 2000 lines, that is expected.

JIRA filter to find candidates

project = TEZ
  AND component in ("tez-dag")
  AND resolution = Unresolved
  AND (text ~ "container leak" OR text ~ "scheduler" OR text ~ "locality"
       OR text ~ "priority" OR text ~ "AMRMClient" OR text ~ "heldContainer"
       OR description ~ "onError")
ORDER BY priority DESC, updated DESC

A second filter for AM-failover-related candidates:

project = TEZ AND resolution = Unresolved AND (text ~ "failover" OR text ~ "AM restart")
  AND component in ("tez-dag")

Walked example A — heldContainer never released after onError

Symptom: an operator reports that their long-running session AM holds onto containers indefinitely after a transient RM disconnect; yarn application -status shows far more allocated containers than the running DAG should need.

Step 1 — Locate the leak path

cd ~/tez-src
grep -n "onError\|heldContainer\|releaseContainer" \
  tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java | head -30

You find a class field:

private final Map<ContainerId, HeldContainer> heldContainers = new HashMap<>();

and an onError(Throwable t) callback (declared by AMRMClientAsync.CallbackHandler, which the scheduler implements):

@Override
public void onError(Throwable t) {
  LOG.error("AMRMClient error", t);
  appContext.getEventHandler().handle(
      new DAGAppMasterEventSchedulingServiceError(t));
}

The bug: heldContainers is populated by onContainersAllocated but never drained in onError. When the AM recovers and the RM reissues the same container IDs, the map already has stale entries, and the new allocations are silently dropped (the bookkeeping path checks heldContainers.containsKey(id)). The containers are effectively leaked.
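To see why a stale entry turns into a leak, here is a deliberately simplified model with no YARN dependency. HeldContainerBook, onAllocated and onErrorWithoutCleanup are hypothetical names, container IDs are plain strings, and the containsKey guard stands in for the dedup check described above; it sketches the failure mode, not Tez's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the heldContainers bookkeeping (not Tez code). */
public class HeldContainerBook {
  // Container ID -> host. Stands in for Map<ContainerId, HeldContainer>.
  private final Map<String, String> held = new HashMap<>();
  // Allocations dropped by the dedup guard; in the real AM these are leaked.
  private final List<String> dropped = new ArrayList<>();

  /** Mirrors the dedup path: an ID already present is treated as a duplicate. */
  public synchronized void onAllocated(String containerId, String host) {
    if (held.containsKey(containerId)) {
      dropped.add(containerId);
      return;
    }
    held.put(containerId, host);
  }

  /** The buggy onError: reports the error but never drains the map. */
  public synchronized void onErrorWithoutCleanup() {
    // Intentionally empty: stale entries survive the error.
  }

  public synchronized List<String> dropped() {
    return new ArrayList<>(dropped);
  }

  public static void main(String[] args) {
    HeldContainerBook book = new HeldContainerBook();
    book.onAllocated("container_1", "node-1");
    book.onAllocated("container_2", "node-2");
    book.onErrorWithoutCleanup();
    // The recovering RM re-issues the same IDs; both are silently dropped.
    book.onAllocated("container_1", "node-1");
    book.onAllocated("container_2", "node-2");
    System.out.println(book.dropped()); // [container_1, container_2]
  }
}
```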

Step 2 — Diff

--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@
   @Override
   public void onError(Throwable t) {
     LOG.error("AMRMClient error", t);
+    // Before we tear down, release any containers we still hold. If we don't,
+    // a recovering RM will re-issue the same ContainerIds and the dedup
+    // bookkeeping below will silently drop the new allocations. See TEZ-XXXX.
+    synchronized (heldContainers) {
+      for (HeldContainer hc : heldContainers.values()) {
+        try {
+          amRmClient.releaseAssignedContainer(hc.getContainer().getId());
+        } catch (Exception releaseErr) {
+          LOG.warn("Failed to release {} during onError cleanup: {}",
+              hc.getContainer().getId(), releaseErr.getMessage());
+        }
+      }
+      heldContainers.clear();
+    }
     appContext.getEventHandler().handle(
         new DAGAppMasterEventSchedulingServiceError(t));
   }

Rules in this diff:

  1. The cleanup runs before the event is dispatched. Once the event fires, the AM may shut down handlers, and any release call would race.
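  2. The cleanup is synchronized on the same monitor that other writers to heldContainers use. Find that monitor first; if there is none, you have a second bug to file separately. Do not introduce a new lock in this patch. The diff assumes the monitor is heldContainers itself; if the existing writers lock something else (for example, synchronized methods that lock on this), synchronize on that instead.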
  3. Each release is wrapped individually. One failure must not prevent the others from being released.
  4. Logged failures are WARN, not ERROR. The AM is already in an error path; doubling the severity drowns the originating cause.
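A framework-free sketch of the same cleanup shape is handy for checking rules 1, 3 and 4 in isolation. OnErrorCleanupSketch, ReleaseClient and EventSink are hypothetical stand-ins for the scheduler, the AMRM client and the AM event handler, and the sketch assumes the map itself is the monitor the other writers use (rule 2); on a branch where they lock something else, use that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OnErrorCleanupSketch {
  /** Hypothetical stand-in for the AMRM client's release call. */
  interface ReleaseClient {
    void release(String containerId);
  }

  /** Hypothetical stand-in for the AM event handler. */
  interface EventSink {
    void dispatchError(Throwable t);
  }

  private final Map<String, String> heldContainers = new HashMap<>();
  private final ReleaseClient client;
  private final EventSink sink;

  OnErrorCleanupSketch(ReleaseClient client, EventSink sink) {
    this.client = client;
    this.sink = sink;
  }

  void hold(String containerId, String host) {
    synchronized (heldContainers) {
      heldContainers.put(containerId, host);
    }
  }

  int heldCount() {
    synchronized (heldContainers) {
      return heldContainers.size();
    }
  }

  void onError(Throwable t) {
    // Rule 1: drain before dispatching, so no release races AM shutdown.
    synchronized (heldContainers) {
      for (String id : heldContainers.keySet()) {
        try {
          client.release(id);
        } catch (RuntimeException e) {
          // Rule 3: one failed release must not stop the others.
          // Rule 4: warn only; the original error is the real story.
          System.err.println("WARN failed to release " + id + ": " + e.getMessage());
        }
      }
      heldContainers.clear();
    }
    sink.dispatchError(t);
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    ReleaseClient flaky = id -> {
      log.add("release " + id);
      if (id.equals("c1")) {
        throw new IllegalStateException("RM unreachable");
      }
    };
    EventSink sink = t -> log.add("dispatch " + t.getMessage());
    OnErrorCleanupSketch s = new OnErrorCleanupSketch(flaky, sink);
    s.hold("c1", "node-1");
    s.hold("c2", "node-2");
    s.onError(new RuntimeException("RM gone"));
    System.out.println(log);
    System.out.println(s.heldCount()); // 0
  }
}
```

The shared log is what makes rule 1 testable: the dispatch entry must come after every release attempt, including the one that threw.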

Step 3 — Test with AMRMClient stub

A full MiniTezCluster test for this is overkill. Stub the client:

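// mockAppCallbackHandler, appContext and newContainer(...) are fixtures of the enclosing test class.
@Test(timeout = 10000)
@SuppressWarnings("unchecked")
public void testOnErrorReleasesHeldContainers() throws Exception {
  AMRMClientAsync<CookieContainerRequest> mockRm =
      mock(AMRMClientAsync.class);
  YarnTaskSchedulerService scheduler =
      new YarnTaskSchedulerService(mockAppCallbackHandler, appContext, mockRm);
  scheduler.serviceInit(new Configuration());
  scheduler.serviceStart();

  // simulate two allocations
  Container c1 = newContainer("container_1");
  Container c2 = newContainer("container_2");
  scheduler.onContainersAllocated(Arrays.asList(c1, c2));

  // guard against a false pass: both containers must be held, and nothing
  // released, before onError fires
  assertEquals(2, scheduler.getHeldContainersForTest().size());
  verify(mockRm, never()).releaseAssignedContainer(any(ContainerId.class));

  // fire onError
  scheduler.onError(new RuntimeException("RM gone"));

  // verify both were released and the map was drained
  verify(mockRm).releaseAssignedContainer(c1.getId());
  verify(mockRm).releaseAssignedContainer(c2.getId());
  assertTrue(scheduler.getHeldContainersForTest().isEmpty());
}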

The pattern uses Mockito on the AMRM client (AMRMClientAsync is an abstract class, not an interface), not on YarnTaskSchedulerService itself. getHeldContainersForTest() is a package-private accessor you add in the same patch, annotated with Guava's @VisibleForTesting as elsewhere in Tez.

Step 4 — Build, test, sign off

cd ~/tez-src
mvn -pl tez-dag test -Dtest=TestYarnTaskSchedulerService -q 2>&1 | tail -40
mvn -pl tez-tests test -Dtest=TestExternalTezServices -q 2>&1 | tail -10

The integration test (tez-tests) takes 5–10 minutes; skip it on the first local iteration, but run it before submitting the patch.

Walked example B — locality miscount

Symptom: a debug log shows node-local: 4, rack-local: 12, off-switch: 0 for a vertex whose input splits should give 14 node-local containers. The bookkeeping is off.

Locating the counter

cd ~/tez-src
grep -n "nodeLocal\|rackLocal\|offSwitch" \
  tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java | head -20

You find an assignContainer(...) path that compares the allocated host against the request's preferred host. The bug: the comparison is host.equals(req.host), but host arrives as node-1.cluster.local while req.host is node-1. The exact string comparison fails, the allocation is miscounted as rack-local, and the affinity penalty cascades into the next request.
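The miscount reproduces in a few lines. LocalityMiscountDemo and its single-rack rackOf topology are hypothetical; classify mirrors the three-way node/rack/off-switch branch with the exact host.equals comparison described above.

```java
public class LocalityMiscountDemo {
  int nodeLocal;
  int rackLocal;
  int offSwitch;

  // Hypothetical topology: every host in this demo lives on one rack.
  static String rackOf(String host) {
    return "/rack-a";
  }

  // The buggy classification: exact string comparison of host names.
  void classify(String allocatedHost, String requestedHost) {
    if (allocatedHost.equals(requestedHost)) {
      nodeLocal++;
    } else if (rackOf(allocatedHost).equals(rackOf(requestedHost))) {
      rackLocal++;
    } else {
      offSwitch++;
    }
  }

  public static void main(String[] args) {
    LocalityMiscountDemo d = new LocalityMiscountDemo();
    // The RM reports the FQDN; the split hint carries the short name.
    d.classify("node-1.cluster.local", "node-1");
    System.out.println(d.nodeLocal + " node-local, " + d.rackLocal + " rack-local");
    // prints "0 node-local, 1 rack-local"
  }
}
```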

Diff

--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@
-    if (host.equals(request.getHosts()[0])) {
+    // Hosts may be reported as FQDNs by the RM but as short names by the
+    // caller-supplied hint. Compare on the leading label to keep both forms
+    // equivalent. See TEZ-XXXX.
+    if (hostMatches(host, request.getHosts()[0])) {
       nodeLocalCount.incrementAndGet();
     } else if (rackOf(host).equals(rackOf(request.getHosts()[0]))) {
       rackLocalCount.incrementAndGet();
     } else {
       offSwitchCount.incrementAndGet();
     }
   }
+
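+  static boolean hostMatches(String a, String b) {
+    if (a == null || b == null) return false;
+    if (a.equals(b)) return true;
+    // Only strip the domain when one side is a short name: two FQDNs, or two
+    // dotted IP addresses such as 10.0.0.1 and 10.0.0.2, must match exactly.
+    boolean oneSideShort = a.indexOf('.') < 0 || b.indexOf('.') < 0;
+    return oneSideShort && leadingLabel(a).equals(leadingLabel(b));
+  }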
+
+  private static String leadingLabel(String h) {
+    int dot = h.indexOf('.');
+    return dot < 0 ? h : h.substring(0, dot);
+  }

The accompanying test asserts the counter under both FQDN and short-name forms.
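The matcher is worth testing on its own, including the cases where it must not match. HostMatchSketch is a hypothetical standalone class; its hostMatches falls back to the leading label only when one side is unqualified, an assumption made here so that two distinct FQDNs (node-1.dc1.local vs node-1.dc2.local) or two dotted IP addresses never collapse to the same label.

```java
public final class HostMatchSketch {
  private HostMatchSketch() {}

  static boolean hostMatches(String a, String b) {
    if (a == null || b == null) {
      return false;
    }
    if (a.equals(b)) {
      return true;
    }
    // Strip the domain only when one side is a short name; two fully
    // qualified names (or two dotted IPs) must match exactly.
    boolean oneSideShort = a.indexOf('.') < 0 || b.indexOf('.') < 0;
    return oneSideShort && leadingLabel(a).equals(leadingLabel(b));
  }

  private static String leadingLabel(String h) {
    int dot = h.indexOf('.');
    return dot < 0 ? h : h.substring(0, dot);
  }

  public static void main(String[] args) {
    System.out.println(hostMatches("node-1.cluster.local", "node-1"));       // true
    System.out.println(hostMatches("node-1", "node-1"));                     // true
    System.out.println(hostMatches("node-1.dc1.local", "node-1.dc2.local")); // false
    System.out.println(hostMatches("10.0.0.1", "10.0.0.2"));                 // false
    System.out.println(hostMatches("node-1.cluster.local", "node-2"));       // false
  }
}
```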

Walked example C — priority inversion

Symptom: a high-priority request (priority 2, AM speculation) waits indefinitely behind a long queue of priority-10 requests, even though the scheduler has capacity.

Root cause: the request was added to the queue keyed by the priority string, not the priority int. "10" sorts before "2" in string ordering, so every priority-10 request is served first. The fix is to use an Integer key or a TreeMap with a numeric comparator. The diff and test follow the same pattern as above; the file is tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java near the requestsByPriority field.
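The ordering difference, and both fixes named above, fit in a self-contained sketch. PriorityKeyOrderDemo and its helper methods are hypothetical; lower numbers mean higher priority, as in the symptom, so the first key of each map is the request that would be served first. Priorities 2 and 10 are used because lexicographic order puts "10" ahead of "2".

```java
import java.util.Comparator;
import java.util.TreeMap;

public class PriorityKeyOrderDemo {
  // Buggy: priorities stored as strings sort lexicographically.
  static String firstByStringKey(int... priorities) {
    TreeMap<String, String> queue = new TreeMap<>();
    for (int p : priorities) {
      queue.put(String.valueOf(p), "request@" + p);
    }
    return queue.firstKey();
  }

  // Fix 1: use the numeric value as the key.
  static int firstByIntKey(int... priorities) {
    TreeMap<Integer, String> queue = new TreeMap<>();
    for (int p : priorities) {
      queue.put(p, "request@" + p);
    }
    return queue.firstKey();
  }

  // Fix 2: keep string keys but order them numerically.
  static String firstByNumericComparator(int... priorities) {
    TreeMap<String, String> queue =
        new TreeMap<>(Comparator.comparingInt((String s) -> Integer.parseInt(s)));
    for (int p : priorities) {
      queue.put(String.valueOf(p), "request@" + p);
    }
    return queue.firstKey();
  }

  public static void main(String[] args) {
    System.out.println(firstByStringKey(10, 10, 2));         // 10
    System.out.println(firstByIntKey(10, 10, 2));            // 2
    System.out.println(firstByNumericComparator(10, 10, 2)); // 2
  }
}
```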

MiniTezCluster pattern

For bugs that only manifest end-to-end:

cd ~/tez-src
find tez-tests/src/test/java -name "TestMRRJobsDAGApi.java"

That file is the canonical worked example. The setup pattern:

private static MiniTezCluster tezCluster;

@BeforeClass
public static void setup() throws Exception {
  Configuration conf = new Configuration();
  tezCluster = new MiniTezCluster("TEZ-XXXX", 1, 1, 1);
  tezCluster.init(conf);
  tezCluster.start();
}

@AfterClass
public static void teardown() {
  if (tezCluster != null) {
    tezCluster.stop();
  }
}

Tests should:

  • Submit a small DAG (an OrderedWordCount derivative is fine).
  • Assert on DAGStatus and VertexStatus via the client.
  • Set tight tez.am.am-rm.heartbeat.interval-ms.max and tez.task.am.heartbeat.interval-ms.max overrides so retries fire quickly.

A MiniTezCluster test takes 30s+ per run; do not add more than one per JIRA.

Pitfalls

  • Don't mock the AppContext or the EventHandler if you can avoid it. Scheduler bugs often live in the handoff between scheduler and dispatcher. Mocking the dispatcher hides the bug.
  • Don't add Thread.sleep to scheduler tests. Use DrainDispatcher.await(), or poll a test accessor such as getHeldContainersForTest() with a timeout.
  • Don't introduce a new lock to fix a race. Most scheduler races are fixed by moving an existing line inside an existing synchronized block. Adding a new lock is a Stage 11 patch.
  • Don't change the AMRM heartbeat interval to make a test pass. That hides timing bugs that bite in production. Use the existing test helpers that drive the heartbeat synchronously.
  • Don't release containers in onContainersCompleted to "be safe". By the time that callback fires, the RM has already released the container; a second release achieves nothing and fires a confusing log line.
  • Don't fix a locality miscount by changing the comparison everywhere. The bug is usually a single inconsistency. Pin it down with a focused unit test before broadening the change.
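The polling alternative to Thread.sleep from the list above, as a small helper. PollUntil.waitFor is a hypothetical name; Hadoop's test utilities ship a comparable GenericTestUtils.waitFor, so prefer that if your branch already depends on it, and check its signature there. The only pause is the bounded poll interval, and the wait ends as soon as the condition holds rather than after a fixed worst-case delay.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public final class PollUntil {
  private PollUntil() {}

  /** Re-checks cond every intervalMs until it is true or timeoutMs elapses. */
  public static void waitFor(BooleanSupplier cond, long intervalMs, long timeoutMs)
      throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (!cond.getAsBoolean()) {
      // Overflow-safe deadline check.
      if (System.nanoTime() - deadline >= 0) {
        throw new TimeoutException("condition not met within " + timeoutMs + " ms");
      }
      TimeUnit.MILLISECONDS.sleep(intervalMs);
    }
  }

  public static void main(String[] args) throws Exception {
    // Stand-in for "held containers drained by another thread".
    AtomicInteger held = new AtomicInteger(2);
    Thread releaser = new Thread(() -> held.set(0));
    releaser.start();
    waitFor(() -> held.get() == 0, 10, 5000);
    System.out.println("drained");
  }
}
```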

Exit criteria — when you're ready for the next stage

Move to Stage 6 when:

  • You have shipped one scheduler patch with a passing MiniTezCluster or AMRM stub regression test.
  • You can read YarnTaskSchedulerService.assignContainer without referring to external docs.
  • You have written a MiniTezCluster test from scratch and it runs locally in under a minute.
  • You can explain the heldContainer lifecycle to another contributor in five sentences.

Stage 6 moves you into the runtime: ShuffleManager, Fetcher, MergeManager.