Stage 6 — Shuffle and Runtime
What this stage teaches
Stage 6 is the runtime stage. You learn:
- The shuffle pipeline: how ShuffleManager schedules Fetcher threads against the upstream task outputs, how MergeManager consolidates fetched segments, and how the result is presented to the downstream processor as a KeyValuesReader.
- The on-disk IFile format and the off-by-one EOF bugs that haunt every serialiser written against it.
- FetchedInput and the in-memory vs on-disk decision: how tez.runtime.shuffle.memory.limit.percent interacts with MergeManager.canShuffleToMemory.
- Fetch-failure retry storms: when a single bad NodeManager triggers cascading fetcher restarts that swamp the AM event queue.
- How to inject deterministic faults using the FaultInjectionFetcher pattern (or, where it does not exist, the equivalent test double).
Patches are 80–600 lines and almost always come with a MiniTezCluster test
because the runtime contracts are too subtle for unit tests alone.
Reading order
- tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleManager.java
- tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Fetcher.java
- tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
- tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
- The deep dive shuffle-sort.
cd ~/tez-src
wc -l tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/*.java
wc -l tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
JIRA filter to find candidates
project = TEZ
AND component in ("tez-runtime-library", "tez-runtime-internals")
AND resolution = Unresolved
AND (text ~ "shuffle" OR text ~ "fetcher" OR text ~ "MergeManager"
OR text ~ "IFile" OR text ~ "FetchedInput" OR text ~ "spill")
ORDER BY priority DESC, updated DESC
A second filter for fetch-failure storms specifically:
project = TEZ AND text ~ "fetch failure" AND text ~ "retry" AND resolution = Unresolved
Walked example A — fetch-failure retry storm
Symptom: a 5k-task vertex runs on a cluster where one NodeManager goes bad.
Within minutes the AM logs are flooded with INPUT_READ_ERROR events. The DAG
eventually succeeds but takes hours instead of minutes. The AM event queue
backs up to 100k+ pending events.
Step 1 — Trace the path
cd ~/tez-src
grep -n "INPUT_READ_ERROR\|reportReadError\|fetchFailures" \
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleManager.java
You find ShuffleManager.reportReadError(...), which sends an
InputReadErrorEvent to the AM for every failed fetch. With 5k downstream tasks
each trying to fetch from the bad source, the AM receives 5k events per cycle.
The AM dedupes by source attempt, but only after the events are on the queue.
Step 2 — Identify the fix
The minimal fix is client-side debounce: a ShuffleManager should not
re-report the same source attempt failure more than once per
tez.runtime.shuffle.fetch-failure.report.cooldown-ms window. The TEZ
convention is to add the config key with a sensible default
(reportCooldownMs = 5_000).
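The debounce idea can be sketched in isolation before touching ShuffleManager. The class below is a hypothetical helper, not part of Tez: ReportDebouncer and shouldReport are illustrative names, and the caller passes in the current time so the behaviour stays deterministic under test. It uses ConcurrentHashMap.compute so that the check and the timestamp update happen atomically per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical helper (not a Tez class): suppresses repeat reports per key within a cooldown window. */
final class ReportDebouncer<K> {
  private final ConcurrentMap<K, Long> lastReportedAt = new ConcurrentHashMap<>();
  private final long cooldownMs;

  ReportDebouncer(long cooldownMs) {
    this.cooldownMs = cooldownMs;
  }

  /**
   * Returns true if a report for this key should be sent at time nowMs.
   * compute() runs atomically per key, so two threads racing on the same
   * key cannot both pass the cooldown check.
   */
  boolean shouldReport(K key, long nowMs) {
    boolean[] allowed = {false};
    lastReportedAt.compute(key, (k, prev) -> {
      if (prev != null && nowMs - prev < cooldownMs) {
        return prev;   // still cooling down: keep the old timestamp
      }
      allowed[0] = true;
      return nowMs;    // record the time of this report
    });
    return allowed[0];
  }
}
```

With a 1000 ms cooldown, a report at t=0 is allowed, a repeat for the same key at t=500 is suppressed, and a report at t=1000 is allowed again. Other keys are unaffected.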
Step 3 — Diff
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleManager.java
@@
+  private final ConcurrentMap<InputAttemptIdentifier, Long> lastReportedAt =
+      new ConcurrentHashMap<>();
+  private final long reportCooldownMs;
@@
   public void reportReadError(InputAttemptIdentifier srcAttempt, IOException e) {
+    long now = clock.getTime();
+    Long prev = lastReportedAt.get(srcAttempt);
+    if (prev != null && now - prev < reportCooldownMs) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Debouncing read-error report for {} (last={}ms ago)",
+            srcAttempt, now - prev);
+      }
+      return;
+    }
+    lastReportedAt.put(srcAttempt, now);
     inputContext.sendEvents(Collections.singletonList(
         createInputReadErrorEvent(srcAttempt, e)));
   }
Add the config key to TezRuntimeConfiguration:
+  public static final String TEZ_RUNTIME_SHUFFLE_FETCH_FAILURE_REPORT_COOLDOWN_MS =
+      TEZ_RUNTIME_PREFIX + "shuffle.fetch-failure.report.cooldown-ms";
+  public static final long
+      TEZ_RUNTIME_SHUFFLE_FETCH_FAILURE_REPORT_COOLDOWN_MS_DEFAULT = 5_000L;
And register it in the same file's tezRuntimeKeys set so the validator does
not reject it.
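To see why registration matters, here is a minimal model of allowlist filtering. It assumes that keys missing from the allowlist are dropped without an error. RuntimeKeyAllowlist and filter are hypothetical names used only for illustration; this is not the actual TezRuntimeConfiguration code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of allowlist filtering; not the real Tez validator. */
final class RuntimeKeyAllowlist {
  private final Set<String> allowedKeys;

  RuntimeKeyAllowlist(Set<String> allowedKeys) {
    this.allowedKeys = new HashSet<>(allowedKeys);
  }

  /** Copies only allowlisted entries; every other entry is dropped without an error. */
  Map<String, String> filter(Map<String, String> conf) {
    Map<String, String> out = new HashMap<>();
    for (Map.Entry<String, String> e : conf.entrySet()) {
      if (allowedKeys.contains(e.getKey())) {
        out.put(e.getKey(), e.getValue());
      }
    }
    return out;
  }
}
```

Under this model, a cooldown key that was never registered does not reach the runtime, so the code falls back to the default and the setting appears to have no effect.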
Step 4 — Test with FaultInjectionFetcher pattern
There is no production FaultInjectionFetcher; the test pattern is to subclass
ShuffleManager and override createFetcher to return a Fetcher that throws
IOException on every call. The repro test sits in
tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleManager.java:
@Test(timeout = 10000)
public void testReadErrorReportDebounce() throws Exception {
  // Declare the concrete type so setTime() is available without a cast.
  ControlledClock clock = new ControlledClock();
  TezConfiguration conf = new TezConfiguration();
  conf.setLong(TEZ_RUNTIME_SHUFFLE_FETCH_FAILURE_REPORT_COOLDOWN_MS, 1000);
  ShuffleManager sm = createShuffleManager(conf, clock);
  InputAttemptIdentifier src = newInputAttempt(0);

  sm.reportReadError(src, new IOException("first"));
  sm.reportReadError(src, new IOException("second (debounced)"));
  sm.reportReadError(src, new IOException("third (debounced)"));
  // Only the first event should reach the inputContext
  verify(inputContext, times(1)).sendEvents(anyList());

  // Advance the clock past the cooldown
  clock.setTime(clock.getTime() + 2000);
  sm.reportReadError(src, new IOException("after cooldown"));
  verify(inputContext, times(2)).sendEvents(anyList());
}
Then add a MiniTezCluster integration test that runs OrderedWordCount with a
fault injected into a single Fetcher, and confirm that the AM event queue
stays bounded.
Walked example B — off-by-one in IFile EOF
Symptom: a reader of IFile-format data occasionally returns one extra
zero-length record at the end of a segment. Downstream processors see a
null/empty key and either throw or silently insert a bogus row.
Step 1 — Locate
cd ~/tez-src
grep -n "EOF_MARKER\|readNextKeyValue\|nextRawKey" \
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
Read the Reader.nextRawKey loop and the EOF_MARKER constant. The classic
bug shape: the loop tests bytesRead >= length after a successful read
instead of before, allowing one extra iteration when the segment ends exactly
on a record boundary.
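The bug shape is easier to see on a toy format. The sketch below is a hypothetical length-prefixed reader, a simplified stand-in and not the IFile implementation: each record is a one-byte length followed by the payload, and the segment is a prefix of a larger buffer. BoundedRecordReader and its fields are illustrative names.

```java
import java.util.Arrays;

/** Hypothetical reader over a one-byte-length-prefixed format; not IFile.Reader. */
final class BoundedRecordReader {
  private final byte[] buffer;
  private final int segmentLength;
  private int bytesRead = 0;

  BoundedRecordReader(byte[] buffer, int segmentLength) {
    if (segmentLength < 0 || segmentLength > buffer.length) {
      throw new IllegalArgumentException("segmentLength out of range");
    }
    this.buffer = buffer;
    this.segmentLength = segmentLength;
  }

  /** Returns the next record payload, or null once the segment is exhausted. */
  byte[] next() {
    // Guard BEFORE reading: if the segment ended exactly on a record
    // boundary, the bytes that follow belong to something else.
    if (bytesRead >= segmentLength) {
      return null;
    }
    int len = buffer[bytesRead] & 0xFF;   // one-byte length prefix (simplification)
    int end = bytesRead + 1 + len;
    if (end > segmentLength) {
      // Malformed input: never advance bytesRead past the segment.
      throw new IllegalStateException("record overruns segment");
    }
    byte[] payload = Arrays.copyOfRange(buffer, bytesRead + 1, end);
    bytesRead = end;
    return payload;
  }
}
```

For a buffer holding two records in five bytes followed by zero padding, the third call returns null. Without the leading guard, the reader would interpret the first padding byte as a zero-length record, which is the phantom record described in the symptom.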
Diff
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@
   public boolean nextRawKey(DataInputBuffer key) throws IOException {
+    if (bytesRead >= segmentLength) {
+      return false;
+    }
     int recordLength = readVInt(dataIn);
     if (recordLength == EOF_MARKER) {
       return false;
     }
     ...
   }
The fix is a three-line guard placed before the read. The harder part is the test.
Step 2 — Test
Add to tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java:
@Test
public void testReaderStopsAtExactSegmentBoundary() throws Exception {
  // Write exactly two records, capture the byte length, construct a Reader
  // bounded to that byte length, and assert the third nextRawKey() returns
  // false without throwing.
  Path p = writeRecords(2);
  long segLen = fs.getFileStatus(p).getLen();
  Reader r = new Reader(conf, fs, p, codec, /*ifileReadAhead*/false, 0, segLen);
  assertTrue(r.nextRawKey(keyBuf));
  assertTrue(r.nextRawKey(keyBuf));
  assertFalse("must not return phantom third record",
      r.nextRawKey(keyBuf));
  r.close();
}
Run:
mvn -pl tez-runtime-library test -Dtest=TestIFile -q 2>&1 | tail -30
A reviewer will also ask for a check that bytesRead does not advance past
segmentLength on a malformed input — add it.
Walked example C — MergeManager unexpected spill
Symptom: a small DAG that fits comfortably in memory still spills to disk.
Investigation: MergeManager.canShuffleToMemory returns false for inputs
smaller than the configured threshold because it compares against the total
memory budget rather than the per-input share.
The bug shape is in MergeManager.canShuffleToMemory(long size) — the
comparison uses usedMemory + size > maxMemory * memoryLimitPercent where it
should be >= plus a fairness check against singleShuffleLimit.
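The intended decision can be modelled as two independent checks. The sketch below is a hypothetical model, not MergeManager's code: ShuffleMemoryPolicy, totalBudgetBytes, and singleShuffleLimitBytes are illustrative names, the two limits are passed in directly instead of being derived from configuration, and the class is single-threaded for simplicity.

```java
/** Hypothetical model of the in-memory vs on-disk decision; not MergeManager. */
final class ShuffleMemoryPolicy {
  private final long totalBudgetBytes;
  private final long singleShuffleLimitBytes;
  private long usedMemoryBytes = 0;

  ShuffleMemoryPolicy(long totalBudgetBytes, long singleShuffleLimitBytes) {
    this.totalBudgetBytes = totalBudgetBytes;
    this.singleShuffleLimitBytes = singleShuffleLimitBytes;
  }

  /** True if an input of this size may be fetched into memory right now. */
  boolean canShuffleToMemory(long sizeBytes) {
    // Fairness check: one input may not take more than its per-input share.
    if (sizeBytes > singleShuffleLimitBytes) {
      return false;
    }
    // Budget check: go to disk once the reservation would reach the budget
    // (the ">=" boundary described above).
    return usedMemoryBytes + sizeBytes < totalBudgetBytes;
  }

  /** Reserves memory for an input; returns false if it must go to disk. */
  boolean reserve(long sizeBytes) {
    if (!canShuffleToMemory(sizeBytes)) {
      return false;
    }
    usedMemoryBytes += sizeBytes;
    return true;
  }
}
```

With a 1000-byte budget and a 250-byte per-input limit, a 100-byte input on an empty buffer is admitted and a 300-byte input is sent to disk regardless of how much memory is free. Keeping the two checks separate makes it clear which limit caused a spill.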
The repro: a tiny OrderedWordCount on MiniTezCluster with
tez.runtime.shuffle.memory.limit.percent=0.95 and a single 100KB input. The
counter MERGED_MAP_OUTPUTS_DISK should be 0 and is not.
The fix and test follow the same pattern as the previous two examples.
Pitfalls
- Don't add Thread.sleep to a shuffle test. Use DrainDispatcher, the ControlledClock pattern, or a CountDownLatch driven by the production callback. Sleep-based shuffle tests are the #1 source of flakes in tez-runtime-library (see Stage 9).
- Don't relax MergeManager thresholds to "fix" a memory error. The thresholds are a contract with the AM scheduler. If MergeManager runs out of memory, the bug is usually upstream: a Fetcher that should have used disk and went to memory.
- Don't add a config key without registering it in tezRuntimeKeys. The runtime validates against an allowlist; an unregistered key is silently ignored.
- Don't fix the IFile reader by widening the boundary check. Boundary bugs in IFile usually have a sibling bug in the writer. Read both before patching either.
- Don't add a Fetcher retry loop that does not respect the AM's already-scheduled retry policy. Two retry loops in series turn a 3x retry into a 9x retry. Confirm via dispatcher trace that the AM is the only retry authority.
- Don't change the on-disk IFile format without bumping IFile.VERSION. That is a Stage 11 patch and requires explicit back-compat shims.
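The 3x-to-9x multiplication in the retry pitfall can be checked with a small counting sketch. RetryDemo and its methods are hypothetical names for illustration only; the sketch counts how many times an always-failing operation runs when one retry loop wraps another.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical demo (not Tez code): counts calls made by stacked retry loops. */
final class RetryDemo {
  /** Runs op up to maxAttempts times and returns true on the first success. */
  static boolean retry(int maxAttempts, BooleanSupplier op) {
    for (int i = 0; i < maxAttempts; i++) {
      if (op.getAsBoolean()) {
        return true;
      }
    }
    return false;
  }

  /** Number of calls an always-failing fetch receives under two stacked retry loops. */
  static int callsWithStackedRetries(int outerAttempts, int innerAttempts) {
    int[] calls = {0};
    BooleanSupplier alwaysFails = () -> {
      calls[0]++;
      return false;
    };
    // The outer loop retries the inner loop, which itself retries the fetch.
    retry(outerAttempts, () -> retry(innerAttempts, alwaysFails));
    return calls[0];
  }
}
```

Calling callsWithStackedRetries(3, 3) performs nine fetch attempts, while callsWithStackedRetries(3, 1), where only the outer loop retries, performs three.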
Exit criteria — when you're ready for the next stage
Move to Stage 7 when:
- You have shipped one shuffle or runtime patch with a deterministic MiniTezCluster regression test that passes in under two minutes.
- You can recite the relationship between tez.runtime.shuffle.memory.limit.percent, tez.runtime.shuffle.fetch.buffer.percent, and the JVM heap.
- You have read MergeManager.merge() end to end and can explain the on-disk vs in-memory branches.
- A reviewer has accepted your fix without asking "is this the same bug as TEZ-XXXX?", meaning you have learned to grep for prior art before patching.
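The relationship named in the second criterion can be written out as arithmetic. This is a sketch under stated assumptions, not the MergeManager source: it assumes tez.runtime.shuffle.fetch.buffer.percent scales the memory available to the task into the shuffle buffer, that tez.runtime.shuffle.memory.limit.percent caps the share of that buffer a single input may take, and it ignores any memory scaling applied before the task starts. ShuffleMemoryMath is a hypothetical name and the percentages in the usage note are illustrative values, not defaults.

```java
/** Hypothetical arithmetic helper; verify against MergeManager before relying on it. */
final class ShuffleMemoryMath {
  /** Bytes of the heap set aside for in-memory shuffle data. */
  static long shuffleBufferBytes(long heapBytes, double fetchBufferPercent) {
    return (long) (heapBytes * fetchBufferPercent);
  }

  /** Largest single input that may be fetched into memory. */
  static long singleInputLimitBytes(long shuffleBufferBytes, double memoryLimitPercent) {
    return (long) (shuffleBufferBytes * memoryLimitPercent);
  }
}
```

For a 1 GiB heap with an illustrative fetch.buffer.percent of 0.5, the shuffle buffer is 512 MiB; with an illustrative memory.limit.percent of 0.25, any single input larger than 128 MiB goes to disk.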
Stage 7 takes you out of Tez code and into the Hive-on-Tez attribution skill.