Shuffle and Sort
The shuffle layer is where Tez moves data between vertices. It splits into two
halves, both living in tez-runtime-library:
- Sort path (producer side): partition, sort, spill, merge.
  OrderedPartitionedKVOutput → PipelinedSorter/DefaultSorter → IFile segments on local disk.
- Shuffle path (consumer side): fetch, merge, iterate.
  ShuffleManager → Fetcher → FetchedInput → MergeManager → ValuesIterator.
Between them sits the YARN ShuffleHandler aux service inside the NodeManager
that serves spilled segments over HTTP.
ls tez-runtime-library/src/main/java/org/apache/tez/runtime/library/
The producer side
OrderedPartitionedKVOutput
find tez-runtime-library/src/main/java -name "OrderedPartitionedKVOutput.java"
wc -l $(find tez-runtime-library/src/main/java -name "OrderedPartitionedKVOutput.java")
The output that powers MapReduce-style shuffles. Lifecycle:
- initialize(): creates a Sorter (Pipelined or Default), allocates tez.runtime.io.sort.mb of byte buffer, and registers as a MemoryUpdateCallback with the MemoryDistributor.
- getWriter(): returns a KeyValueWriter that delegates to the sorter.
- close(): calls sorter.flush() to merge spills into the final output file and its index, then emits a CompositeDataMovementEvent that tells consumers where to fetch from. Tez expands it into one DataMovementEvent per partition; the per-partition offsets into the merged file live in the index file.
Two sorter implementations
find tez-runtime-library/src/main/java -name "PipelinedSorter.java" \
-o -name "DefaultSorter.java"
| Sorter | Strategy | When to pick |
|---|---|---|
| DefaultSorter | Single in-memory accumulator; quicksort by (partition, key); spill when the buffer crosses tez.runtime.sort.spill.percent; final merge of all spills. | MapReduce parity, conservative memory. |
| PipelinedSorter | Multi-buffer accumulator; concurrent spill thread; per-partition sort and merge; final spill writes the merged output in one pass. | Large outputs, faster; the Tez default (tez.runtime.sorter.class=PIPELINED), and therefore what Hive on Tez uses unless overridden. |
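To make the DefaultSorter row concrete, here is a toy, single-threaded Java model of its collect, sort, and spill cycle. SpillModel and its methods are hypothetical names, not Tez classes; the model sorts Java objects rather than serialized bytes, approximates record size as key length plus value length, and has no background spill thread, so it only illustrates the spill trigger and the (partition, key) ordering.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical toy model of DefaultSorter-style collection; not Tez code.
class SpillModel {
    static final class Kv {
        final int partition;
        final String key;
        final String value;

        Kv(int partition, String key, String value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        // Approximate serialized size: ignores length prefixes and per-record metadata.
        int bytes() {
            return key.length() + value.length();
        }
    }

    private final long spillThresholdBytes;
    private final List<Kv> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    final List<List<Kv>> spills = new ArrayList<>(); // each entry is one sorted spill

    SpillModel(long sortBufferBytes, double spillPercent) {
        // Spill once the buffer is spillPercent full.
        this.spillThresholdBytes = (long) (sortBufferBytes * spillPercent);
    }

    void write(int partition, String key, String value) {
        Kv kv = new Kv(partition, key, value);
        buffer.add(kv);
        bufferedBytes += kv.bytes();
        if (bufferedBytes >= spillThresholdBytes) {
            spill();
        }
    }

    // Sort buffered records by (partition, key) and emit them as one run.
    void spill() {
        if (buffer.isEmpty()) {
            return;
        }
        List<Kv> run = new ArrayList<>(buffer);
        run.sort(Comparator.comparingInt((Kv kv) -> kv.partition).thenComparing(kv -> kv.key));
        spills.add(run);
        buffer.clear();
        bufferedBytes = 0;
    }

    // Equivalent of flush() at close(): whatever is still buffered becomes the last spill.
    void flush() {
        spill();
    }
}
```

With the defaults in the table below (100 MB buffer, 0.8 spill percent), the same trigger would fire at roughly 80 MB of buffered data.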
Configuration knobs:
| Key | Default | Effect |
|---|---|---|
| tez.runtime.io.sort.mb | 100 | Sort buffer size in MB; used by both sorters. |
| tez.runtime.sort.spill.percent | 0.8 | Buffer fill fraction that starts a spill (DefaultSorter). |
| tez.runtime.sorter.class | PIPELINED | PIPELINED (PipelinedSorter) or LEGACY (DefaultSorter). |
| tez.runtime.compress | false | Compress intermediate IFile segments. |
| tez.runtime.compress.codec | DefaultCodec | Codec class, e.g. SnappyCodec, Lz4Codec, GzipCodec. |
| tez.runtime.combiner.class | unset | Combiner run over sorted runs during spill and merge. |
IFile on-disk format
IFile is the segment format both sorters write.
find tez-runtime-library/src/main/java -name "IFile.java"
grep -n "class Writer\|class Reader\|EOF_MARKER\|writeKVPair" \
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java | head -30
Per-record layout:
+---------------+---------------+----------------+------------------+
| keyLen (VInt) | valLen (VInt) | key bytes (KL) | value bytes (VL) |
+---------------+---------------+----------------+------------------+
End of segment:
keyLen = -1, valLen = -1 // EOF_MARKER
If compression is enabled, each segment's record stream, including the EOF marker, is written through the codec, so the record framing lives inside the compressed stream.
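The framing above can be sketched in a few lines of Java. This is a minimal, uncompressed sketch that assumes Hadoop's variable-length integer scheme (the one behind WritableUtils.writeVLong) for the length fields. IFileFramingSketch is a hypothetical class, and it omits any file header, checksum trailer, and Tez-specific optimisations (such as repeated-key encoding) that the real IFile may add.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of IFile record framing: no header, checksum, or compression.
class IFileFramingSketch {
    static final int EOF_MARKER = -1;

    // Hadoop-style zero-compressed VLong: values in [-112, 127] take one byte,
    // others a length byte followed by big-endian magnitude bytes.
    private static void writeVLong(DataOutputStream out, long i) throws IOException {
        if (i >= -112 && i <= 127) {
            out.writeByte((int) i);
            return;
        }
        int len = -112;
        if (i < 0) {
            i ^= -1L; // negative values are stored as their one's complement
            len = -120;
        }
        for (long tmp = i; tmp != 0; tmp >>= 8) {
            len--;
        }
        out.writeByte(len);
        int size = (len < -120) ? -(len + 120) : -(len + 112);
        for (int idx = size; idx != 0; idx--) {
            out.writeByte((int) ((i >> ((idx - 1) * 8)) & 0xFF));
        }
    }

    private static long readVLong(DataInputStream in) throws IOException {
        byte first = in.readByte();
        if (first >= -112) {
            return first; // single-byte value
        }
        boolean negative = first < -120;
        int size = negative ? -(first + 120) : -(first + 112);
        long i = 0;
        for (int idx = 0; idx < size; idx++) {
            i = (i << 8) | (in.readByte() & 0xFF);
        }
        return negative ? (i ^ -1L) : i;
    }

    // Each record: keyLen, valLen, key bytes, value bytes; then two -1 markers.
    static byte[] writeSegment(List<String[]> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String[] kv : records) {
                byte[] key = kv[0].getBytes(StandardCharsets.UTF_8);
                byte[] value = kv[1].getBytes(StandardCharsets.UTF_8);
                writeVLong(out, key.length);
                writeVLong(out, value.length);
                out.write(key);
                out.write(value);
            }
            writeVLong(out, EOF_MARKER); // keyLen = -1
            writeVLong(out, EOF_MARKER); // valLen = -1
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads records until the EOF marker; a truncated segment surfaces as an EOFException.
    static List<String[]> readSegment(byte[] segment) {
        List<String[]> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(segment))) {
            while (true) {
                int keyLen = (int) readVLong(in);
                int valLen = (int) readVLong(in);
                if (keyLen == EOF_MARKER && valLen == EOF_MARKER) {
                    break;
                }
                byte[] key = new byte[keyLen];
                byte[] value = new byte[valLen];
                in.readFully(key);
                in.readFully(value);
                records.add(new String[] {
                    new String(key, StandardCharsets.UTF_8), new String(value, StandardCharsets.UTF_8)});
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```

Under these assumptions, three records with 2-byte keys and values take 3 × (1 + 1 + 2 + 2) = 18 bytes, plus one 0xFF byte for each -1 marker, so the segment is 20 bytes.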
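A sorter writes one IFile segment per partition per spill. After the final
merge, the output is a single file.out with a file.out.index sibling that
records (startOffset, rawLength, partLength) per partition: where the
partition's segment starts in file.out, its uncompressed length, and its
on-disk (possibly compressed) length.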
find tez-runtime-library/src/main/java -name "TezSpillRecord.java"
grep -in "startOffset\|rawLength\|partLength" \
$(find tez-runtime-library/src/main/java -name "TezSpillRecord.java")
The index file is what ShuffleHandler reads when a fetcher asks for partition
p of source attempt (vertex, task, attempt).
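The lookup can be sketched as follows, assuming the MapReduce-style layout in which each index entry holds startOffset, rawLength and partLength as three big-endian longs (24 bytes per partition). SpillIndexSketch is hypothetical, and the real TezSpillRecord may also append a checksum after the entries. Serving partition p then amounts to reading entry p and streaming bytes [startOffset, startOffset + partLength) of file.out.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a spill index with fixed-size entries; not Tez code.
class SpillIndexSketch {
    static final int ENTRY_BYTES = 3 * Long.BYTES; // startOffset, rawLength, partLength

    static final class IndexEntry {
        final long startOffset; // where the partition's segment starts in file.out
        final long rawLength;   // uncompressed segment length
        final long partLength;  // on-disk (possibly compressed) segment length

        IndexEntry(long startOffset, long rawLength, long partLength) {
            this.startOffset = startOffset;
            this.rawLength = rawLength;
            this.partLength = partLength;
        }
    }

    // Serialize entries in partition order as big-endian longs.
    static byte[] encode(IndexEntry[] entries) {
        ByteBuffer buf = ByteBuffer.allocate(entries.length * ENTRY_BYTES);
        for (IndexEntry e : entries) {
            buf.putLong(e.startOffset).putLong(e.rawLength).putLong(e.partLength);
        }
        return buf.array();
    }

    // Partition p's entry sits at byte offset p * 24, so a lookup is a single seek.
    static IndexEntry lookup(byte[] index, int partition) {
        ByteBuffer buf = ByteBuffer.wrap(index, partition * ENTRY_BYTES, ENTRY_BYTES);
        return new IndexEntry(buf.getLong(), buf.getLong(), buf.getLong());
    }
}
```

Fixed-size entries are what make the lookup O(1): the server never scans the index, it seeks straight to the requested partition.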
Combiner integration
Both sorters honor tez.runtime.combiner.class. The combiner is not invoked
while records accumulate in the buffer; it runs over sorted runs, when a
buffer is spilled and, depending on the number of spills, again during the merge:
grep -n "combiner\|combineAndSpill\|runCombiner" \
$(find tez-runtime-library/src/main/java -name "DefaultSorter.java")
A correct combiner is associative and commutative on the value space; Tez gives no guarantee on how many merge phases run it.
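Why the number of combiner runs matters can be shown with a word-count style simulation. All names below are hypothetical: combine passes are modeled as one run per spill followed by extra runs over the merged stream, and the reducer simply sums whatever values reach it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical simulation of combiner passes; not Tez code.
class CombinerSafety {
    // Safe: re-summing partial sums gives the same total however often it runs.
    static List<Long> sumCombiner(List<Long> values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return List.of(sum);
    }

    // Unsafe: emits how many values it saw, so a second pass counts partial counts.
    static List<Long> countCombiner(List<Long> values) {
        return List.of((long) values.size());
    }

    // passes = 0: no combining; 1: combine each spill; >= 2: also combine the merged stream.
    static long wordCount(List<List<Long>> spills, UnaryOperator<List<Long>> combiner, int passes) {
        List<List<Long>> runs = new ArrayList<>(spills);
        if (passes >= 1) {
            runs.replaceAll(combiner);
        }
        List<Long> merged = new ArrayList<>();
        for (List<Long> run : runs) {
            merged.addAll(run);
        }
        for (int p = 2; p <= passes; p++) {
            merged = combiner.apply(merged);
        }
        long total = 0; // the reducer sums the values it receives
        for (long v : merged) {
            total += v;
        }
        return total;
    }
}
```

For spills [1, 1, 1] and [1, 1], both combiners give 5 after one pass, which is why a broken combiner often survives small tests; only the summing combiner still gives 5 once a merge runs it a second time.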
Spill walkthrough
sequenceDiagram
participant P as Processor
participant W as KeyValueWriter
participant S as Sorter (Pipelined)
participant D as Local disk
P->>W: write(K, V) [N times]
W->>S: collect into KV buffer
S->>S: buffer fills (spill threshold reached)
S->>D: spill_0.out (partitioned, sorted)
S->>D: spill_0.out.index
Note over S: continue accepting writes into next buffer
P->>W: close()
W->>S: flush()
S->>D: merge spill_0..spill_N -> file.out + file.out.index
S-->>P: CompositeDataMovementEvent per partition
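The final merge step (spill_0..spill_N into file.out) is a k-way merge of sorted runs. Below is a minimal Java sketch using a min-heap over the head of each run. KWayMerge is hypothetical; it compares whole values, whereas the real merger compares serialized (partition, key) bytes and streams segments from disk.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge of sorted runs using a min-heap of run heads.
class KWayMerge {
    static <T extends Comparable<T>> List<T> merge(List<List<T>> runs) {
        PriorityQueue<Head<T>> heap = new PriorityQueue<>();
        for (List<T> run : runs) {
            Iterator<T> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head<>(it.next(), it)); // seed with each run's first record
            }
        }
        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head<T> smallest = heap.poll(); // smallest remaining record across all runs
            out.add(smallest.value);
            if (smallest.rest.hasNext()) { // refill from the run it came from
                heap.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }

    private static final class Head<T extends Comparable<T>> implements Comparable<Head<T>> {
        final T value;
        final Iterator<T> rest;

        Head(T value, Iterator<T> rest) {
            this.value = value;
            this.rest = rest;
        }

        @Override
        public int compareTo(Head<T> other) {
            return value.compareTo(other.value);
        }
    }
}
```

With k runs the heap never holds more than k entries, so each record costs O(log k) comparisons regardless of total output size.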
The consumer side
OrderedGroupedKVInput and ShuffleManager
find tez-runtime-library/src/main/java -name "OrderedGroupedKVInput.java"
find tez-runtime-library/src/main/java -name "ShuffleManager.java"
find tez-runtime-library/src/main/java -name "Shuffle.java"
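OrderedGroupedKVInput.initialize() constructs Shuffle, which holds:
- ShuffleManager: the pool of Fetcher threads and the inbound event queue.
- MergeManager: receives FetchedInputs, decides in-memory vs. disk placement, and kicks off background merges.
- ValuesIterator: the reader the processor sees.
In the source tree, the ordered path's scheduler and fetcher are named ShuffleScheduler and FetcherOrderedGrouped (package common.shuffle.orderedgrouped) and fetched data is held as MapOutput, while ShuffleManager, Fetcher and FetchedInput (package common.shuffle) serve UnorderedKVInput; the roles described in this section are the same.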
Fetcher
find tez-runtime-library/src/main/java -name "Fetcher.java"
wc -l $(find tez-runtime-library/src/main/java -name "Fetcher.java")
A Fetcher is a thread that connects via HTTP to the NodeManager
ShuffleHandler running on the source task's node:
GET /mapOutput?job=<jobId>&dag=<dagId>&reduce=<partition>&map=<attempt1,attempt2,...>
Multi-map response: ShuffleHandler streams all requested attempts back-to-back,
each prefixed with a ShuffleHeader (map attempt id, compressed length,
uncompressed length, partition). The Fetcher reads the header, decides whether
the payload fits in memory (MergeManager.reserve), and either writes it to an
in-memory buffer or directly to disk.
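How a fetcher might batch attempts into the request URL shown above, as a sketch: FetchUrlBatcher is a hypothetical helper, the base URI, port and attempt ids are placeholders, and it assumes every attempt in the list lives on the same host, since one request goes to one ShuffleHandler. maxPerRequest plays the role of tez.runtime.shuffle.fetch.max.task.output.at.once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not Tez's actual URL-building code.
class FetchUrlBatcher {
    // Splits one host's attempts into batches of at most maxPerRequest; one URL per batch.
    static List<String> buildUrls(String baseUri, int partition, List<String> attempts, int maxPerRequest) {
        if (maxPerRequest < 1) {
            throw new IllegalArgumentException("maxPerRequest must be positive");
        }
        List<String> urls = new ArrayList<>();
        for (int from = 0; from < attempts.size(); from += maxPerRequest) {
            int to = Math.min(from + maxPerRequest, attempts.size());
            String maps = String.join(",", attempts.subList(from, to));
            urls.add(baseUri + "&reduce=" + partition + "&map=" + maps);
        }
        return urls;
    }
}
```

For example, 45 attempts on one host with a limit of 20 produce three requests carrying 20, 20 and 5 attempts.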
Key configs:
| Key | Default | Effect |
|---|---|---|
| tez.runtime.shuffle.parallel.copies | 20 | Fetcher thread count per task. |
| tez.runtime.shuffle.connect.timeout | 30000 | HTTP connect timeout (ms). |
| tez.runtime.shuffle.read.timeout | 180000 | HTTP socket read timeout (ms). |
| tez.runtime.shuffle.fetch.max.task.output.at.once | 20 | Max source attempts per HTTP request. |
| tez.runtime.shuffle.fetch.buffer.percent | 0.9 | Fraction of task memory requested for the in-memory shuffle buffer. |
| tez.runtime.shuffle.memory.limit.percent | 0.25 | Max size of a single fetched input, as a fraction of the shuffle buffer; larger inputs go straight to disk. |
| tez.runtime.shuffle.merge.percent | 0.9 | Fraction of the shuffle buffer that, once used, triggers an in-memory merge. |
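A simplified model of how these knobs interact in the memory-versus-disk decision. It is modeled on the reserve logic of Hadoop MapReduce's shuffle merge manager, from which Tez's MergeManager is derived, but ShuffleReserveModel is hypothetical: availableMemory stands for the memory granted to the input (not raw -Xmx), the buffer is assumed to be tez.runtime.shuffle.fetch.buffer.percent (default 0.9) of it, and merges that free memory are not modeled.

```java
// Hypothetical model of the shuffle memory-vs-disk decision; not Tez code.
class ShuffleReserveModel {
    enum Decision { MEMORY, DISK, WAIT }

    final long memoryLimit;           // in-memory shuffle buffer
    final long maxSingleShuffleLimit; // largest single input kept in memory
    final long mergeThreshold;        // buffer usage that starts an in-memory merge
    private long usedMemory = 0;

    ShuffleReserveModel(long availableMemory, double fetchBufferPercent,
                        double memoryLimitPercent, double mergePercent) {
        this.memoryLimit = (long) (availableMemory * fetchBufferPercent);
        this.maxSingleShuffleLimit = (long) (memoryLimit * memoryLimitPercent);
        this.mergeThreshold = (long) (memoryLimit * mergePercent);
    }

    Decision reserve(long size) {
        if (size >= maxSingleShuffleLimit) {
            return Decision.DISK; // too large to ever be held in memory
        }
        if (usedMemory > memoryLimit) {
            return Decision.WAIT; // buffer over its limit: fetcher stalls until a merge frees memory
        }
        usedMemory += size; // may overshoot memoryLimit by at most one input
        return Decision.MEMORY;
    }

    boolean shouldStartInMemoryMerge() {
        return usedMemory >= mergeThreshold;
    }
}
```

With 4 GiB available and the defaults, the buffer is about 3.6 GiB and the single-input limit about 0.9 GiB, so a 1 GiB input goes straight to disk while 500 MiB inputs stay in memory until the buffer passes its limit, after which fetchers wait for a merge.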
FetchedInput
grep -n "abstract class FetchedInput\|MemoryFetchedInput\|DiskFetchedInput" \
$(find tez-runtime-library/src/main/java -name "FetchedInput.java")
A FetchedInput is one source partition payload. The two main subclasses:
- MemoryFetchedInput: bytes held in an in-memory buffer.
- DiskFetchedInput: bytes on local disk under tez.runtime.shuffle.tmp.directory.
The MergeManager decides which based on size and current in-memory budget.
MergeManager
find tez-runtime-library/src/main/java -name "MergeManager.java"
Three merge tracks:
- In-memory merge: N in-memory inputs are merged into one in-memory buffer or spilled to disk.
- On-disk merge: N on-disk inputs are merged into a single larger on-disk segment.
- Final merge: at processor pull time, the remaining in-memory and on-disk inputs are merged into a unified KeyValuesReader.
grep -n "InMemoryMerger\|OnDiskMerger\|finalMerge\|mergeFactor" \
$(find tez-runtime-library/src/main/java -name "MergeManager.java") | head -20
tez.runtime.io.sort.factor (default 100) is the maximum number of segments
merged in one pass; more segments than that trigger multiple passes.
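The pass count can be estimated with a short Java model. MergePassModel is hypothetical; passFactor reproduces the first-pass sizing rule of Hadoop MapReduce's Merger (getPassFactor), from which Tez's TezMerger is derived, and the model ignores in-memory segments and counts the final streaming pass.

```java
// Hypothetical model of multi-pass merging with fan-in `factor`; not Tez code.
class MergePassModel {
    // Passes needed to merge numSegments sorted runs, counting the final pass.
    static int mergePasses(int numSegments, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("factor must be at least 2");
        }
        int segments = numSegments;
        for (int passNo = 1; ; passNo++) {
            int f = passFactor(factor, passNo, segments);
            if (segments <= f) {
                return passNo; // everything fits in this final pass
            }
            segments = segments - f + 1; // f runs become one larger run
        }
    }

    // Pass 1 merges just enough runs that every later pass can use the full factor.
    static int passFactor(int factor, int passNo, int numSegments) {
        if (passNo > 1 || numSegments <= factor || factor == 1) {
            return factor;
        }
        int mod = (numSegments - 1) % (factor - 1);
        return mod == 0 ? factor : mod + 1;
    }
}
```

For example, 250 segments with the default factor of 100 need three passes: 52 runs first (leaving 199), then 100 (leaving 100), then a final pass over the last 100.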
ValuesIterator
find tez-runtime-library/src/main/java -name "ValuesIterator.java"
grep -n "next\|groupingKey\|valuesIter" \
$(find tez-runtime-library/src/main/java -name "ValuesIterator.java") | head
Wraps the merged sorted stream, presenting (key, Iterable<value>) pairs to
the processor — the classic reducer API.
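The grouping itself is simple once the stream is sorted: consecutive records with equal keys form one group. A minimal sketch follows; GroupingIterator is hypothetical, groups by equals() rather than a raw-byte grouping comparator, and materializes each group's values in a list, whereas the real iterator can stream them.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical simplified ValuesIterator: groups a key-sorted stream into (key, values).
class GroupingIterator<K, V> implements Iterator<Map.Entry<K, List<V>>> {
    private final Iterator<Map.Entry<K, V>> sorted;
    private Map.Entry<K, V> pending; // first record of the next group, already consumed

    GroupingIterator(Iterator<Map.Entry<K, V>> sorted) {
        this.sorted = sorted;
        this.pending = sorted.hasNext() ? sorted.next() : null;
    }

    @Override
    public boolean hasNext() {
        return pending != null;
    }

    @Override
    public Map.Entry<K, List<V>> next() {
        if (pending == null) {
            throw new NoSuchElementException();
        }
        K key = pending.getKey();
        List<V> values = new ArrayList<>();
        values.add(pending.getValue());
        pending = null;
        // Consume while the key repeats; the first different key starts the next group.
        while (sorted.hasNext()) {
            Map.Entry<K, V> kv = sorted.next();
            if (!kv.getKey().equals(key)) {
                pending = kv;
                break;
            }
            values.add(kv.getValue());
        }
        return new AbstractMap.SimpleImmutableEntry<>(key, values);
    }
}
```

This only works because the merge upstream guarantees sorted order; on unsorted input the same key would appear as several separate groups.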
Shuffle walkthrough
sequenceDiagram
participant T as Task processor
participant SM as ShuffleManager
participant F as Fetcher
participant NM as Source NM (ShuffleHandler)
participant MM as MergeManager
SM->>F: assign source attempt + partition
F->>NM: GET /mapOutput?...
NM-->>F: stream attempt headers + IFile bytes
F->>MM: reserve(size)
alt fits in memory
MM-->>F: MemoryFetchedInput
else too big
MM-->>F: DiskFetchedInput
end
F->>MM: commit FetchedInput
MM->>MM: kick InMemoryMerger / OnDiskMerger when thresholds crossed
T->>SM: getReader() (blocks until all inputs done)
SM->>MM: finalMerge()
MM-->>T: KeyValuesReader (ValuesIterator)
ShuffleHandler is YARN's, not Tez's
ls /opt/hadoop/share/hadoop/mapreduce/ | grep shuffle   # hadoop-mapreduce-client-shuffle jar; install path varies
org.apache.hadoop.mapred.ShuffleHandler lives in Hadoop (the
hadoop-mapreduce-client-shuffle module). NodeManagers load it as an aux
service via yarn-site.xml:
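<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>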
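By default Tez piggybacks on this service rather than running its own; the aux
service it targets is set by tez.am.shuffle.auxiliary-service.id, which
defaults to mapreduce_shuffle. Newer Tez releases also ship an optional
Tez-specific ShuffleHandler (the tez-aux-services module), which must be
registered with the NodeManager the same way. Misconfigured aux services are a
common cause of ConnectException in Fetcher.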
Reading exercise
- grep -n "EOF_MARKER\|writeRecord" $(find tez-runtime-library/src/main/java -name "IFile.java"): verify the EOF sentinel value.
- wc -l $(find tez-runtime-library/src/main/java -name "PipelinedSorter.java" -o -name "DefaultSorter.java"): which is larger? Hypothesize why.
- grep -rn "TEZ_RUNTIME_IO_SORT_MB\|TEZ_RUNTIME_SORT_SPILL_PERCENT" tez-runtime-library/src/main/java: find every read site for tez.runtime.io.sort.mb and tez.runtime.sort.spill.percent (the code refers to the TezRuntimeConfiguration constants, not the literal key strings).
- grep -n "GET /mapOutput\|reduce=\|map=" $(find ~ -name ShuffleHandler.java 2>/dev/null | head -1): read the exact request format.
- head -200 $(find tez-runtime-library/src/main/java -name "ShuffleManager.java"): how is back-pressure on Fetcher threads applied?
- grep -rn "combiner" tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/: at what phases does the combiner run? (DefaultSorter lives in the dflt/ subpackage, so search recursively.)
Common bugs and symptoms
| Symptom | Likely cause |
|---|---|
| Fetcher: java.net.ConnectException | mapreduce_shuffle aux service not configured or NM not running. |
| Shuffle error: java.io.IOException: Failed on local exception: org.apache.hadoop.security.AccessControlException | ShuffleSecret missing or stale; check JobTokenSecretManager. |
| OOM during sort | tez.runtime.io.sort.mb too high relative to the container JVM heap. |
| OOM during shuffle | tez.runtime.shuffle.fetch.buffer.percent or tez.runtime.shuffle.memory.limit.percent too high; in-memory inputs starve the heap. |
| Premature EOF from inputStream | Source task wrote a partial IFile (killed mid-spill); destination retries from another attempt. |
| Wrong reducer output count | Combiner not safe to run zero, one, or many times (e.g. it counts records instead of summing partial counts). |
| OnDiskMerger thrashing | tez.runtime.io.sort.factor too low; many tiny segments force many merge passes. |
| Long shuffle plateau | One source NM saturated, e.g. many source tasks ran on one node so most fetches hit it. |
Validation: prove you understand this
- Sketch the byte layout of an IFile segment containing 3 records and a single partition. Show key/value lengths and the EOF marker.
- A reducer task reads from 200 mappers. With tez.runtime.shuffle.parallel.copies=20 and tez.runtime.shuffle.fetch.max.task.output.at.once=20, what is the minimum number of HTTP requests the fetcher pool must make? Justify.
- Explain why PipelinedSorter reduces wall time but not CPU time.
- Given a 10 GB shuffle into a 4 GB heap reducer with tez.runtime.shuffle.memory.limit.percent=0.25, predict which inputs go to disk versus memory and why.
- Identify the exact file and method where the URL pattern ?reduce=&map= is constructed on the Tez fetcher side. Use grep.