Stage 6 — Shuffle and Runtime

What this stage teaches

Stage 6 is the runtime stage. You learn:

  • The shuffle pipeline: how ShuffleManager schedules Fetcher threads against the upstream task outputs, how MergeManager consolidates fetched segments, and how the result is presented to the downstream processor as a KeyValuesReader.
  • The on-disk IFile format and the off-by-one EOF bugs that haunt every serialiser written against it.
  • FetchedInput and the in-memory vs on-disk decision: how tez.runtime.shuffle.memory.limit.percent interacts with MergeManager.canShuffleToMemory.
  • Fetch-failure retry storms: when a single bad NodeManager triggers cascading fetcher restarts that swamp the AM event queue.
  • How to inject deterministic faults using the FaultInjectionFetcher pattern (or, where it does not exist, the equivalent test double).

Patches are 80–600 lines and almost always come with a MiniTezCluster test because the runtime contracts are too subtle for unit tests alone.

Reading order

  1. tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleManager.java
  2. tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Fetcher.java
  3. tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
  4. tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
  5. The deep dive shuffle-sort.

Check how much code you are about to read:

cd ~/tez-src
wc -l tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/*.java
wc -l tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java

JIRA filter to find candidates

project = TEZ
  AND component in ("tez-runtime-library", "tez-runtime-internals")
  AND resolution = Unresolved
  AND (text ~ "shuffle" OR text ~ "fetcher" OR text ~ "MergeManager"
       OR text ~ "IFile" OR text ~ "FetchedInput" OR text ~ "spill")
ORDER BY priority DESC, updated DESC

A second filter for fetch-failure storms specifically:

project = TEZ AND text ~ "fetch failure" AND text ~ "retry" AND resolution = Unresolved
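If you would rather script the search than paste the filter into the JIRA UI, the JQL has to be URL-encoded first. The sketch below only builds the request URL. It assumes the ASF JIRA instance exposes the standard JIRA REST v2 search endpoint at the base URL shown; the class name JiraSearchUrl and the maxResults value are illustrative, not something from the Tez tooling.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Illustrative helper: turns a JQL filter into a REST search URL. */
class JiraSearchUrl {
  // Assumption: standard JIRA REST v2 search endpoint on the ASF instance.
  static final String SEARCH_ENDPOINT =
      "https://issues.apache.org/jira/rest/api/2/search";

  static String build(String jql, int maxResults) {
    // URLEncoder produces form encoding: spaces become '+', '=' becomes %3D.
    // The Charset overload needs Java 10 or later.
    String encoded = URLEncoder.encode(jql, StandardCharsets.UTF_8);
    return SEARCH_ENDPOINT + "?jql=" + encoded + "&maxResults=" + maxResults;
  }

  public static void main(String[] args) {
    String jql = "project = TEZ AND text ~ \"fetch failure\" "
        + "AND text ~ \"retry\" AND resolution = Unresolved";
    System.out.println(build(jql, 50));
  }
}
```

Fetch the printed URL with whatever HTTP client you already use; the response is JSON with one entry per matching issue.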

Walked example A — fetch-failure retry storm

Symptom: a 5k-task vertex runs on a cluster where one NodeManager goes bad. Within minutes the AM logs are flooded with INPUT_READ_ERROR events. The DAG eventually succeeds but takes hours instead of minutes. The AM event queue backs up to 100k+ pending events.

Step 1 — Trace the path

cd ~/tez-src
grep -n "INPUT_READ_ERROR\|reportReadError\|fetchFailures" \
  tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleManager.java

You find ShuffleManager.reportReadError(...), which sends an InputReadErrorEvent to the AM for every failed fetch. With 5k downstream tasks each trying to fetch from the bad source, the AM receives up to 5k events per retry cycle. The AM dedupes by source attempt, but only after the events are already on the queue, so the queue itself still grows.

Step 2 — Identify the fix

The minimal fix is a client-side debounce: a ShuffleManager should not report the same source attempt failure more than once per tez.runtime.shuffle.fetch-failure.report.cooldown-ms window. The Tez convention is to add the config key together with a sensible default (here 5,000 ms, held in reportCooldownMs).
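The debounce itself can be tried out in isolation before touching ShuffleManager. The following is a minimal sketch, not Tez code: ReportDebouncer, shouldReport, and the string source ids are hypothetical names, and the clock is a plain LongSupplier so the example needs no Tez classes. It uses putIfAbsent and replace so that two threads racing on the same source cannot both decide to report.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongSupplier;

/** Illustrative only: a per-source cooldown gate, not a Tez class. */
class ReportDebouncer {
  private final ConcurrentMap<String, Long> lastReportedAt = new ConcurrentHashMap<>();
  private final long cooldownMs;
  private final LongSupplier clockMs;

  ReportDebouncer(long cooldownMs, LongSupplier clockMs) {
    this.cooldownMs = cooldownMs;
    this.clockMs = clockMs;
  }

  /** Returns true if the caller should send a report for this source now. */
  boolean shouldReport(String srcAttempt) {
    long now = clockMs.getAsLong();
    Long prev = lastReportedAt.putIfAbsent(srcAttempt, now);
    if (prev == null) {
      return true;                       // first failure seen for this source
    }
    if (now - prev < cooldownMs) {
      return false;                      // still inside the cooldown window
    }
    // Window elapsed: only the thread that wins the swap gets to report.
    return lastReportedAt.replace(srcAttempt, prev, now);
  }

  public static void main(String[] args) {
    long[] now = {0L};                   // hand-driven clock for the demo
    ReportDebouncer debouncer = new ReportDebouncer(5_000L, () -> now[0]);
    System.out.println(debouncer.shouldReport("attempt_0")); // first report
    System.out.println(debouncer.shouldReport("attempt_0")); // inside the window
    now[0] = 6_000L;
    System.out.println(debouncer.shouldReport("attempt_0")); // window elapsed
  }
}
```

Injecting the clock is the design choice that matters here: it is what lets a test advance time explicitly instead of sleeping.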

Step 3 — Diff

--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleManager.java
@@
+  private final ConcurrentMap<InputAttemptIdentifier, Long> lastReportedAt =
+      new ConcurrentHashMap<>();
+  private final long reportCooldownMs;
@@
   public void reportReadError(InputAttemptIdentifier srcAttempt, IOException e) {
+    long now = clock.getTime();
+    // putIfAbsent/replace keep check-and-update atomic across Fetcher threads.
+    Long prev = lastReportedAt.putIfAbsent(srcAttempt, now);
+    if (prev != null && (now - prev < reportCooldownMs
+        || !lastReportedAt.replace(srcAttempt, prev, now))) {
+      LOG.debug("Debouncing read-error report for {} (last={}ms ago)",
+          srcAttempt, now - prev);
+      return;
+    }
     inputContext.sendEvents(Collections.singletonList(
         createInputReadErrorEvent(srcAttempt, e)));
   }

Add the config key to TezRuntimeConfiguration:

+  public static final String TEZ_RUNTIME_SHUFFLE_FETCH_FAILURE_REPORT_COOLDOWN_MS =
+      TEZ_RUNTIME_PREFIX + "shuffle.fetch-failure.report.cooldown-ms";
+  public static final long
+      TEZ_RUNTIME_SHUFFLE_FETCH_FAILURE_REPORT_COOLDOWN_MS_DEFAULT = 5_000L;

Then register it in the same file's tezRuntimeKeys set. The runtime filters configuration against that allowlist, so an unregistered key is silently dropped and the default is used instead.
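To see why registration matters, here is a simplified model of allowlist filtering. It is a sketch under assumptions: RuntimeKeyAllowlist, register, and filter are hypothetical names, and the real filtering code in Tez is organised differently. The point is only that a key missing from the set disappears without an error.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Illustrative model of an allowlist-filtered configuration; not Tez code. */
class RuntimeKeyAllowlist {
  private static final Set<String> REGISTERED_KEYS = new HashSet<>();

  static {
    REGISTERED_KEYS.add("tez.runtime.shuffle.memory.limit.percent");
    REGISTERED_KEYS.add("tez.runtime.shuffle.fetch.buffer.percent");
  }

  static void register(String key) {
    REGISTERED_KEYS.add(key);
  }

  /** Keeps only registered keys; anything else is dropped without warning. */
  static Map<String, String> filter(Map<String, String> userConf) {
    Map<String, String> kept = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : userConf.entrySet()) {
      if (REGISTERED_KEYS.contains(e.getKey())) {
        kept.put(e.getKey(), e.getValue());
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    String newKey = "tez.runtime.shuffle.fetch-failure.report.cooldown-ms";
    Map<String, String> userConf = new LinkedHashMap<>();
    userConf.put(newKey, "1000");

    System.out.println("before registering: " + filter(userConf));
    register(newKey);
    System.out.println("after registering:  " + filter(userConf));
  }
}
```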

Step 4 — Test with FaultInjectionFetcher pattern

There is no production FaultInjectionFetcher; the test pattern is to subclass ShuffleManager and override createFetcher to return a Fetcher that throws IOException on every call. The repro test sits in tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleManager.java:

@Test(timeout = 10000)
public void testReadErrorReportDebounce() throws Exception {
  // Declared as ControlledClock so the test can advance time without a cast.
  ControlledClock clock = new ControlledClock();
  TezConfiguration conf = new TezConfiguration();
  conf.setLong(TEZ_RUNTIME_SHUFFLE_FETCH_FAILURE_REPORT_COOLDOWN_MS, 1000);

  ShuffleManager sm = createShuffleManager(conf, clock);
  InputAttemptIdentifier src = newInputAttempt(0);

  sm.reportReadError(src, new IOException("first"));
  sm.reportReadError(src, new IOException("second (debounced)"));
  sm.reportReadError(src, new IOException("third (debounced)"));

  // Only the first event should reach the inputContext
  verify(inputContext, times(1)).sendEvents(anyList());

  // Advance the clock past the cooldown
  clock.setTime(clock.getTime() + 2000);
  sm.reportReadError(src, new IOException("after cooldown"));
  verify(inputContext, times(2)).sendEvents(anyList());
}

Then add a MiniTezCluster integration test that runs OrderedWordCount with a fault injected on a single Fetcher, and confirm that the AM event queue stays bounded.

Walked example B — off-by-one in IFile EOF

Symptom: a reader of IFile-format data occasionally returns one extra zero-length record at the end of a segment. Downstream processors see a null/empty key and either throw or silently insert a bogus row.

Step 1 — Locate

cd ~/tez-src
grep -n "EOF_MARKER\|readNextKeyValue\|nextRawKey" \
  tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java

Read the Reader.nextRawKey loop and the EOF_MARKER constant. The classic bug shape: the loop tests bytesRead >= segmentLength after a successful read instead of before the next one, which allows one extra iteration when the segment ends exactly on a record boundary.
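The effect of a missing bound check is easy to reproduce outside Tez. The sketch below is a toy, not the IFile format: it uses fixed 4-byte length prefixes instead of VInts, has no checksum, and every name in it (BoundedRecordReader, countRecords, sampleData) is hypothetical. It writes two records that fill a 12-byte segment exactly, followed by bytes that belong to whatever comes next, and counts records with and without a guard before the read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Toy length-prefixed reader; deliberately simpler than IFile. */
class BoundedRecordReader {
  static final int EOF_MARKER = -1;

  /** Counts records, optionally checking the segment bound before each read. */
  static int countRecords(byte[] data, long segmentLength, boolean guardBeforeRead) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      long bytesRead = 0;
      int records = 0;
      while (true) {
        if (guardBeforeRead && bytesRead >= segmentLength) {
          return records;              // stop exactly at the segment boundary
        }
        int len = in.readInt();        // without the guard this reads past the segment
        bytesRead += Integer.BYTES;
        if (len == EOF_MARKER) {
          return records;
        }
        in.readFully(new byte[len]);
        bytesRead += len;
        records++;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Two 2-byte records (12 bytes in total), then bytes outside the segment. */
  static byte[] sampleData() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      for (String s : new String[] {"ab", "cd"}) {
        byte[] payload = s.getBytes(StandardCharsets.US_ASCII);
        out.writeInt(payload.length);
        out.write(payload);
      }
      out.writeInt(0);                 // zero bytes that follow the segment
      out.writeInt(EOF_MARKER);        // the marker an unguarded reader finally hits
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] data = sampleData();
    System.out.println("guarded:   " + countRecords(data, 12, true));
    System.out.println("unguarded: " + countRecords(data, 12, false));
  }
}
```

The unguarded run interprets the zero bytes after the segment as a zero-length record, which is the phantom record from the symptom; the guarded run stops after two.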

Diff

--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@
   public boolean nextRawKey(DataInputBuffer key) throws IOException {
+    if (bytesRead >= segmentLength) {
+      return false;
+    }
     int recordLength = readVInt(dataIn);
     if (recordLength == EOF_MARKER) {
       return false;
     }
     ...
   }

The fix is a three-line guard placed before the read. The harder part is the test.

Step 2 — Test

Add to tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java:

@Test
public void testReaderStopsAtExactSegmentBoundary() throws Exception {
  // Write exactly two records, capture the byte length, construct a Reader
  // bounded to that byte length, and assert the third nextRawKey() returns
  // false without throwing.
  Path p = writeRecords(2);
  long segLen = fs.getFileStatus(p).getLen();
  Reader r = new Reader(conf, fs, p, codec, /*ifileReadAhead*/false, 0, segLen);
  assertTrue(r.nextRawKey(keyBuf));
  assertTrue(r.nextRawKey(keyBuf));
  assertFalse("must not return phantom third record",
      r.nextRawKey(keyBuf));
  r.close();
}

Run:

mvn -pl tez-runtime-library test -Dtest=TestIFile -q 2>&1 | tail -30

A reviewer will also ask for a check that bytesRead does not advance past segmentLength on a malformed input — add it.

Walked example C — MergeManager unexpected spill

Symptom: a small DAG that fits comfortably in memory still spills to disk. Investigation: MergeManager.canShuffleToMemory returns false for inputs smaller than the configured threshold because it compares against the total memory budget rather than the per-input share.

The bug shape is in MergeManager.canShuffleToMemory(long size) — the comparison uses usedMemory + size > maxMemory * memoryLimitPercent where it should be >= plus a fairness check against singleShuffleLimit.

The repro: a tiny OrderedWordCount on MiniTezCluster with tez.runtime.shuffle.memory.limit.percent=0.95 and a single 100KB input. The counter MERGED_MAP_OUTPUTS_DISK should be 0 and is not.

The fix and test follow the same pattern as the previous two examples.
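Before reading the real comparison, it helps to have the budget arithmetic written down. The sketch below is a simplified model under stated assumptions, not a copy of MergeManager: it assumes tez.runtime.shuffle.fetch.buffer.percent sizes the overall shuffle budget as a fraction of the memory available to the task, and tez.runtime.shuffle.memory.limit.percent caps a single input as a fraction of that budget. ShuffleMemoryBudget and fitsInMemory are hypothetical names, and the 512 MB figure is made up for the demo. Verify the actual formula against MergeManager in your checkout.

```java
/** Simplified model of the shuffle memory budget; verify against MergeManager. */
class ShuffleMemoryBudget {
  final long memoryLimit;          // total bytes the shuffle may hold in memory
  final long singleShuffleLimit;   // largest single input allowed in memory

  ShuffleMemoryBudget(long availableBytes, double fetchBufferPercent,
      double memoryLimitPercent) {
    // Assumption: the fetch-buffer fraction applies to task-available memory.
    this.memoryLimit = (long) (availableBytes * fetchBufferPercent);
    // Assumption: the memory-limit fraction applies to the shuffle budget.
    this.singleShuffleLimit = (long) (memoryLimit * memoryLimitPercent);
  }

  /** Per-input check: an input goes to memory only if it is under the cap. */
  boolean fitsInMemory(long inputBytes) {
    return inputBytes < singleShuffleLimit;
  }

  public static void main(String[] args) {
    long available = 512L * 1024 * 1024;      // hypothetical task memory
    ShuffleMemoryBudget budget = new ShuffleMemoryBudget(available, 0.90, 0.95);
    System.out.println("memoryLimit        = " + budget.memoryLimit);
    System.out.println("singleShuffleLimit = " + budget.singleShuffleLimit);
    System.out.println("100KB in memory?   = " + budget.fitsInMemory(100L * 1024));
  }
}
```

Under this model a 100KB input is far below the per-input cap, so any spill in the repro has to come from the comparison itself rather than from the configured limits.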

Pitfalls

  • Don't add Thread.sleep to a shuffle test. Use DrainDispatcher, the ControlledClock pattern, or a CountDownLatch driven by the production callback. Sleep-based shuffle tests are the #1 source of flakes in tez-runtime-library (see Stage 9).
  • Don't relax MergeManager thresholds to "fix" a memory error. The thresholds are a contract with the AM scheduler. If MergeManager runs out of memory, the bug is usually upstream — a Fetcher that should have used disk and went to memory.
  • Don't add a config key without registering it in tezRuntimeKeys. The runtime validates against an allowlist; an unregistered key is silently ignored.
  • Don't fix the IFile reader by widening the boundary check. Boundary bugs in IFile usually have a sibling bug in the writer. Read both before patching either.
  • Don't add a Fetcher retry loop that does not respect the AM's already-scheduled retry policy. Two retry loops in series turn a 3x retry into a 9x retry. Confirm via dispatcher trace that the AM is the only retry authority.
  • Don't change the on-disk IFile format without bumping IFile.VERSION. That is a Stage 11 patch and requires explicit back-compat shims.
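As an illustration of the first pitfall, the latch-based alternative to Thread.sleep looks roughly like this. It is a generic sketch: FetchCallback stands in for whatever production callback your test can hook, and LatchDrivenWait and fetchAndAwait are hypothetical names. The test thread blocks until the callback fires or the timeout expires, so a fast run returns immediately and a slow run is not cut short by a guessed sleep length.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative: wait on a callback-driven latch instead of sleeping. */
class LatchDrivenWait {
  /** Hypothetical stand-in for a production completion callback. */
  interface FetchCallback {
    void fetchSucceeded(String srcAttempt);
  }

  /** Returns true if the callback fired before the timeout. */
  static boolean fetchAndAwait(long timeoutMs) {
    CountDownLatch fetched = new CountDownLatch(1);
    FetchCallback callback = srcAttempt -> fetched.countDown();

    ExecutorService fetcher = Executors.newSingleThreadExecutor();
    try {
      // Stand-in for the asynchronous work that eventually invokes the callback.
      fetcher.submit(() -> callback.fetchSucceeded("attempt_0"));
      return fetched.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      fetcher.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("callback observed: " + fetchAndAwait(5_000L));
  }
}
```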

Exit criteria — when you're ready for the next stage

Move to Stage 7 when:

  • You have shipped one shuffle or runtime patch with a deterministic MiniTezCluster regression test that passes in under two minutes.
  • You can recite the relationship between tez.runtime.shuffle.memory.limit.percent, tez.runtime.shuffle.fetch.buffer.percent, and the JVM heap.
  • You have read MergeManager.merge() end to end and can explain the on-disk vs in-memory branches.
  • A reviewer has accepted your fix without asking "is this the same bug as TEZ-XXXX?" — meaning you have learned to grep for prior art before patching.

Stage 7 takes you out of Tez code and into the Hive-on-Tez attribution skill.