Shuffle and Sort

The shuffle layer is where Tez moves data between vertices. It splits into two halves, both living in tez-runtime-library:

  • Sort path (producer side): partition, sort, spill, merge. OrderedPartitionedKVOutput -> PipelinedSorter / DefaultSorter -> IFile segments on local disk.
  • Shuffle path (consumer side): fetch, merge, iterate. ShuffleManager -> Fetcher -> FetchedInput -> MergeManager -> ValuesIterator.

Between them sits the YARN ShuffleHandler, an aux service inside the NodeManager that serves the producer's output segments over HTTP.

ls tez-runtime-library/src/main/java/org/apache/tez/runtime/library/

The producer side

OrderedPartitionedKVOutput

find tez-runtime-library/src/main/java -name "OrderedPartitionedKVOutput.java"
wc -l $(find tez-runtime-library/src/main/java -name "OrderedPartitionedKVOutput.java")

The output that powers MapReduce-style shuffles. Lifecycle:

  1. initialize(): creates a Sorter (Pipelined or Default), allocates a sort buffer of tez.runtime.io.sort.mb megabytes, and registers as a MemoryUpdateCallback with the MemoryDistributor.
  2. getWriter(): returns a KeyValueWriter that delegates to the sorter.
  3. close(): calls sorter.flush() to merge spills into the final output file, then emits a CompositeDataMovementEvent that covers all partitions and tells consumers where to fetch the output. Per-partition offsets live in the index file, not in the event.

Two sorter implementations

find tez-runtime-library/src/main/java -name "PipelinedSorter.java" \
                                       -o -name "DefaultSorter.java"

| Sorter | Strategy | When to pick |
|---|---|---|
| DefaultSorter | Single in-memory accumulator; quicksort by (partition, key); spill when buffer crosses tez.runtime.sort.spill.percent; final merge of all spills. | MapReduce parity, conservative memory. |
| PipelinedSorter | Multi-buffer accumulator; concurrent spill thread; per-partition sort and merge; final spill writes the merged output in one pass. | Large outputs, faster; default in Hive. |
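
The DefaultSorter strategy described above (accumulate, sort by (partition, key), spill at a threshold, merge all spills at the end) can be illustrated with a toy model. This is a minimal Python sketch, not Tez code: the class name ToySorter, the record-count threshold (standing in for a byte threshold), and the CRC32 partitioner are all assumptions made for illustration.

```python
import heapq
import zlib


def _sort_key(record):
    # Records are (partition, key, value); order by partition, then key.
    return (record[0], record[1])


class ToySorter:
    """Hypothetical model: collect records, spill sorted runs, merge on flush."""

    def __init__(self, num_partitions, buffer_records, spill_percent=0.8):
        self.num_partitions = num_partitions
        # Spill once the buffer holds this many records.
        self.spill_at = max(1, int(buffer_records * spill_percent))
        self.buffer = []
        self.spills = []  # each spill is a list sorted by (partition, key)

    def write(self, key, value):
        # Deterministic stand-in for a partitioner.
        partition = zlib.crc32(key.encode("utf-8")) % self.num_partitions
        self.buffer.append((partition, key, value))
        if len(self.buffer) >= self.spill_at:
            self._spill()

    def _spill(self):
        self.spills.append(sorted(self.buffer, key=_sort_key))
        self.buffer = []

    def flush(self):
        """Spill whatever is left, then k-way merge all sorted spills."""
        if self.buffer:
            self._spill()
        return list(heapq.merge(*self.spills, key=_sort_key))


sorter = ToySorter(num_partitions=3, buffer_records=5, spill_percent=0.8)
for i, k in enumerate(["d", "a", "c", "b", "e", "a", "f", "g", "h"]):
    sorter.write(k, i)
merged = sorter.flush()
```

Because each spill is already sorted, the final step is a merge rather than a re-sort, which is why the number of spills matters more than the total record count at flush time.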

Configuration knobs:

| Key | Default | Effect |
|---|---|---|
| tez.runtime.io.sort.mb | 100 | Sort buffer size in MB. Used by both sorters. |
| tez.runtime.sort.spill.percent | 0.8 | Buffer fill fraction at which spilling starts (DefaultSorter). |
| tez.runtime.sorter.class | PIPELINED | PIPELINED or LEGACY (DefaultSorter). |
| tez.runtime.compress | false | Per-segment compression. |
| tez.runtime.compress.codec | DefaultCodec | Codec class, e.g. Snappy, Lz4, Gzip. |
| tez.runtime.combiner.class | unset | Combiner class to run while merging spills. |

IFile on-disk format

IFile is the segment format both sorters write.

find tez-runtime-library/src/main/java -name "IFile.java"
grep -n "class Writer\|class Reader\|EOF_MARKER\|writeKVPair" \
  tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java | head -30

Per-record layout:

+---------------+---------------+----------------+------------------+
| keyLen (VInt) | valLen (VInt) | key bytes (KL) | value bytes (VL) |
+---------------+---------------+----------------+------------------+

End of segment:

keyLen = -1, valLen = -1   // EOF_MARKER

If compression is enabled, the bytes between the partition header and EOF_MARKER are compressed; the record framing is inside the compressed stream.
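
The record framing above can be made concrete with a small encoder and decoder. This is a simplified Python sketch, not the IFile implementation: it assumes the lengths use Hadoop's variable-length integer encoding (as in WritableUtils.writeVLong), and it deliberately leaves out the file header, checksums, compression, and any other markers the real format may use. The function names are illustrative.

```python
import io

EOF_MARKER = -1


def write_vint(out, i):
    """Hadoop-style VInt: one byte for -112..127, else a length byte plus payload."""
    if -112 <= i <= 127:
        out.write(bytes([i & 0xFF]))
        return
    length = -112
    if i < 0:
        i = ~i  # one's complement for negative values
        length = -120
    tmp = i
    while tmp != 0:
        tmp >>= 8
        length -= 1
    out.write(bytes([length & 0xFF]))
    nbytes = -(length + 120) if length < -120 else -(length + 112)
    out.write(i.to_bytes(nbytes, "big"))


def read_vint(inp):
    first_byte = inp.read(1)
    if not first_byte:
        raise EOFError("premature EOF while reading a VInt")
    first = int.from_bytes(first_byte, "big", signed=True)
    if first >= -112:
        return first
    negative = first < -120
    nbytes = (-120 - first) if negative else (-112 - first)
    value = int.from_bytes(inp.read(nbytes), "big")
    return ~value if negative else value


def write_segment(records):
    """Frame (key, value) byte pairs and terminate with the EOF marker pair."""
    out = io.BytesIO()
    for key, value in records:
        write_vint(out, len(key))
        write_vint(out, len(value))
        out.write(key)
        out.write(value)
    write_vint(out, EOF_MARKER)
    write_vint(out, EOF_MARKER)
    return out.getvalue()


def read_segment(data):
    inp = io.BytesIO(data)
    records = []
    while True:
        key_len = read_vint(inp)
        val_len = read_vint(inp)
        if key_len == EOF_MARKER and val_len == EOF_MARKER:
            return records
        records.append((inp.read(key_len), inp.read(val_len)))


segment = write_segment([(b"k1", b"v1"), (b"k2", b"")])
```

Under this encoding, -1 fits in a single byte (0xFF), so the end-of-segment marker occupies two bytes. A reader that hits the end of the stream before seeing the marker raises EOFError here, which mirrors the truncated-segment failure mode.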

A sorter writes one IFile segment per partition per spill. After the final merge, an IFile.Writer produces one file per output with an *.index sibling that records (startOffset, rawLength, partLength) per partition; partLength is the on-disk length, i.e. the compressed length when compression is enabled.

find tez-runtime-library/src/main/java -name "TezSpillRecord.java"
grep -n "rawLength\|partLength\|compressedLength" \
  $(find tez-runtime-library/src/main/java -name "TezSpillRecord.java")

The index file is what ShuffleHandler reads when a fetcher asks for partition p of source attempt (vertex, task, attempt).
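
To show why a per-partition index is enough to serve a fetch, here is a minimal Python sketch of the lookup. It is not ShuffleHandler code: the IndexEntry type, its field names, and the helper functions are hypothetical, and the sketch assumes uncompressed segments stored back-to-back in one data file.

```python
from typing import NamedTuple


class IndexEntry(NamedTuple):
    start_offset: int  # where the partition's bytes begin in the data file
    raw_length: int    # uncompressed size of the partition
    part_length: int   # on-disk size (same as raw_length without compression)


def build_output(segments):
    """Concatenate per-partition segments and record one index entry each."""
    data = bytearray()
    index = []
    for seg in segments:
        index.append(IndexEntry(len(data), len(seg), len(seg)))
        data += seg
    return bytes(data), index


def read_partition(data, index, p):
    """Serve partition p by slicing the byte range the index points at."""
    entry = index[p]
    return data[entry.start_offset:entry.start_offset + entry.part_length]


data, index = build_output([b"aaa", b"", b"cc"])
partition_two = read_partition(data, index, 2)
```

The server never parses records: it only needs the offset and length for the requested partition, so an empty partition costs one index entry and zero data bytes.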

Combiner integration

Both sorters honor tez.runtime.combiner.class. The combiner is invoked during the merge step (not during accumulation), running over sorted runs:

grep -n "combiner\|combineAndSpill\|runCombiner" \
  $(find tez-runtime-library/src/main/java -name "DefaultSorter.java")

A correct combiner is associative and commutative on the value space; Tez gives no guarantee on how many merge phases run it.
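
The requirement can be checked with a small experiment: apply a combiner in groups of different sizes and see whether the final value changes. The Python sketch below is illustrative only; combine_in_passes is a hypothetical helper that imitates merge passes of varying width, not a Tez API.

```python
from functools import reduce


def combine_in_passes(values, combiner, group_size):
    """Repeatedly combine groups of `group_size` values until one remains."""
    if group_size < 2:
        raise ValueError("group_size must be at least 2")
    values = list(values)
    while len(values) > 1:
        values = [
            reduce(combiner, values[i:i + group_size])
            for i in range(0, len(values), group_size)
        ]
    return values[0]


def add(a, b):
    return a + b  # associative and commutative: safe as a combiner


def pairwise_mean(a, b):
    return (a + b) / 2  # not associative: result depends on grouping


counts = [1, 2, 3, 4, 6]
sum_narrow = combine_in_passes(counts, add, group_size=2)
sum_wide = combine_in_passes(counts, add, group_size=5)
mean_narrow = combine_in_passes(counts, pairwise_mean, group_size=2)
mean_wide = combine_in_passes(counts, pairwise_mean, group_size=5)
```

The sum is identical for both group sizes, while the pairwise mean differs. A combiner that behaves like the second one produces output that changes with the number of spills and merge passes.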

Spill walkthrough

sequenceDiagram
  participant P as Processor
  participant W as KeyValueWriter
  participant S as Sorter (Pipelined)
  participant D as Local disk
  P->>W: write(K, V) [N times]
  W->>S: collect into KV buffer
  S->>S: buffer crosses sort.spill.percent
  S->>D: spill_0.out (partitioned, sorted)
  S->>D: spill_0.out.index
  Note over S: continue accepting writes into next buffer
  P->>W: close()
  W->>S: flush()
  S->>D: merge spill_0..spill_N -> file.out + file.out.index
  S-->>P: CompositeDataMovementEvent covering all partitions

The consumer side

OrderedGroupedKVInput and ShuffleManager

find tez-runtime-library/src/main/java -name "OrderedGroupedKVInput.java"
find tez-runtime-library/src/main/java -name "ShuffleManager.java"
find tez-runtime-library/src/main/java -name "Shuffle.java"

OrderedGroupedKVInput.initialize() constructs Shuffle, which holds:

  • ShuffleManager — pool of Fetcher threads and inbound event queue.
  • MergeManager — receives FetchedInputs, decides in-memory vs disk placement, kicks off background merges.
  • ValuesIterator — the reader the processor sees.

Fetcher

find tez-runtime-library/src/main/java -name "Fetcher.java"
wc -l $(find tez-runtime-library/src/main/java -name "Fetcher.java")

A Fetcher is a thread that connects via HTTP to the NodeManager ShuffleHandler running on the source task's node:

GET /mapOutput?job=<jobId>&dag=<dagId>&reduce=<partition>&map=<attempt1,attempt2,...>

Multi-map response: ShuffleHandler streams all requested attempts back-to-back, each prefixed with a header (ShuffleHeader) that carries the attempt ID, the compressed and uncompressed lengths, and the partition. The Fetcher reads the header, asks the MergeManager whether the payload fits in memory (MergeManager.reserve), and writes it either to an in-memory buffer or directly to disk.
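
As an illustration of the request shape, the Python sketch below batches source attempts into URLs following the pattern shown above. It is not the Tez fetcher: the function name, the host, the port value 13562, and the attempt IDs are assumptions for the example, and it assumes every attempt in the list ran on the same node.

```python
from urllib.parse import urlencode


def fetch_urls(host, port, job_id, dag_id, partition, attempts, max_at_once):
    """Build one /mapOutput URL per batch of at most `max_at_once` attempts."""
    urls = []
    for i in range(0, len(attempts), max_at_once):
        batch = attempts[i:i + max_at_once]
        query = urlencode(
            {
                "job": job_id,
                "dag": dag_id,
                "reduce": partition,
                "map": ",".join(batch),
            },
            safe=",",  # keep the attempt list comma-separated, not %2C-escaped
        )
        urls.append(f"http://{host}:{port}/mapOutput?{query}")
    return urls


urls = fetch_urls(
    host="nm1",
    port=13562,
    job_id="job_1",
    dag_id="1",
    partition=3,
    attempts=["a_0", "a_1", "a_2", "a_3", "a_4"],
    max_at_once=2,
)
```

Five attempts with a batch size of two yield three requests. Batching trades fewer connections against larger responses that must be re-fetched as a unit if the stream breaks partway.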

Key configs:

| Key | Default | Effect |
|---|---|---|
| tez.runtime.shuffle.parallel.copies | 20 | Fetcher thread count per task. |
| tez.runtime.shuffle.connect.timeout | 30000 | HTTP connect timeout (ms). |
| tez.runtime.shuffle.read.timeout | 180000 | HTTP socket read timeout (ms). |
| tez.runtime.shuffle.fetch.max.task.output.at.once | 20 | Max attempts per HTTP request. |
| tez.runtime.shuffle.memory.limit.percent | 0.25 | Max fraction of heap held in-memory before forcing disk. |
| tez.runtime.shuffle.merge.percent | 0.9 | When the in-memory buffer crosses this fraction, kick off a merge. |

FetchedInput

grep -n "abstract class FetchedInput\|MemoryFetchedInput\|DiskFetchedInput" \
  $(find tez-runtime-library/src/main/java -name "FetchedInput.java")

A FetchedInput is one source partition payload. Two subclasses:

  • MemoryFetchedInput — bytes held in a ByteBuffer.
  • DiskFetchedInput — bytes on local disk under tez.runtime.shuffle.tmp.directory.

The MergeManager decides which based on size and current in-memory budget.
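
A toy version of that placement decision is sketched below in Python. It is a deliberately simplified model under stated assumptions, not the MergeManager logic: the class ToyMergeManager, its two limits, and the string return values are hypothetical, and the real implementation may behave differently, for example by making a fetcher wait when memory is full instead of falling back to disk.

```python
class ToyMergeManager:
    """Hypothetical model of the memory-versus-disk choice for fetched inputs."""

    def __init__(self, memory_budget, max_single_input):
        self.memory_budget = memory_budget        # total bytes allowed in memory
        self.max_single_input = max_single_input  # largest input kept in memory
        self.used = 0

    def reserve(self, size):
        """Return 'memory' and account for the bytes, or 'disk' if it won't fit."""
        too_big = size > self.max_single_input
        over_budget = self.used + size > self.memory_budget
        if too_big or over_budget:
            return "disk"
        self.used += size
        return "memory"


mm = ToyMergeManager(memory_budget=100, max_single_input=40)
placements = [mm.reserve(size) for size in (30, 50, 40, 40)]
```

In this model the second input goes to disk because it exceeds the per-input limit, and the fourth goes to disk because the budget is already 70% consumed.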

MergeManager

find tez-runtime-library/src/main/java -name "MergeManager.java"

Three merge tracks:

  1. In-memory merge — N in-memory inputs are merged into one in-memory buffer or spilled to disk.
  2. On-disk merge — N on-disk inputs are merged into a single larger on-disk segment.
  3. Final merge — at processor pull time, remaining in-memory and on-disk inputs are merged into a unified KeyValuesReader.

grep -n "InMemoryMerger\|OnDiskMerger\|finalMerge\|mergeFactor" \
  $(find tez-runtime-library/src/main/java -name "MergeManager.java") | head -20

tez.runtime.io.sort.factor (default 100) caps the number of segments merged in one pass; when more segments than that are pending, the merge takes multiple passes.
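
To get a feel for how the merge factor drives the number of passes, the Python sketch below counts intermediate merges under a simple model: each pass merges exactly `factor` segments into one until at most `factor` remain for the final merge. This is an assumption for illustration; the real merger may size its passes differently.

```python
def intermediate_merges(num_segments, factor):
    """Count merges needed before a single final merge of <= factor segments."""
    if factor < 2:
        raise ValueError("factor must be at least 2")
    merges = 0
    while num_segments > factor:
        # One pass replaces `factor` segments with a single merged segment.
        num_segments = num_segments - factor + 1
        merges += 1
    return merges


passes_small = intermediate_merges(100, 100)
passes_large = intermediate_merges(250, 100)
passes_low_factor = intermediate_merges(250, 10)
```

With 250 segments, a factor of 100 needs two intermediate merges (250 -> 151 -> 52), while a factor of 10 needs many more, each of which rereads and rewrites data on disk.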

ValuesIterator

find tez-runtime-library/src/main/java -name "ValuesIterator.java"
grep -n "next\|groupingKey\|valuesIter" \
  $(find tez-runtime-library/src/main/java -name "ValuesIterator.java") | head

Wraps the merged sorted stream, presenting (key, Iterable<value>) pairs to the processor — the classic reducer API.
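
The grouping behavior can be sketched in a few lines of Python: merge several sorted runs, then group adjacent records that share a key. This is an analogy using the standard library, not the ValuesIterator implementation, and grouped_values is a hypothetical name.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def grouped_values(*sorted_runs):
    """Merge runs of (key, value) sorted by key and yield (key, [values])."""
    merged = heapq.merge(*sorted_runs, key=itemgetter(0))
    for key, group in groupby(merged, key=itemgetter(0)):
        yield key, [value for _, value in group]


run_a = [("a", 1), ("c", 3)]
run_b = [("a", 2), ("b", 5)]
groups = list(grouped_values(run_a, run_b))
```

Grouping only works because the merged stream is sorted: equal keys are adjacent, so one forward pass with no lookup table is enough.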

Shuffle walkthrough

sequenceDiagram
  participant T as Task processor
  participant SM as ShuffleManager
  participant F as Fetcher
  participant NM as Source NM (ShuffleHandler)
  participant MM as MergeManager
  SM->>F: assign source attempt + partition
  F->>NM: GET /mapOutput?...
  NM-->>F: stream attempt headers + IFile bytes
  F->>MM: reserve(size)
  alt fits in memory
    MM-->>F: MemoryFetchedInput
  else too big
    MM-->>F: DiskFetchedInput
  end
  F->>MM: commit FetchedInput
  MM->>MM: kick InMemoryMerger / OnDiskMerger when thresholds crossed
  T->>SM: getReader() (blocks until all inputs done)
  SM->>MM: finalMerge()
  MM-->>T: KeyValuesReader (ValuesIterator)

ShuffleHandler is YARN's, not Tez's

ls /opt/hadoop/share/hadoop/yarn/lib/ | grep shuffle    # cluster path varies

org.apache.hadoop.mapred.ShuffleHandler lives in Hadoop. NodeManagers load it as an aux service via yarn-site.xml:

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>

Tez piggybacks on this — Tez ships no NodeManager-side fetch service. Misconfigured aux services are a common cause of ConnectException in Fetcher.


Reading exercise

  1. grep -n "EOF_MARKER\|writeRecord" $(find tez-runtime-library/src/main/java -name "IFile.java") — verify the EOF sentinel value.
  2. wc -l $(find tez-runtime-library/src/main/java -name "PipelinedSorter.java" -o -name "DefaultSorter.java") — which is larger? Hypothesize why.
  3. grep -rn "tez.runtime.io.sort.mb\|tez.runtime.sort.spill.percent" tez-runtime-library/src/main/java — find every read site for these keys.
  4. grep -n "GET /mapOutput\|reduce=\|map=" $(find ~ -name ShuffleHandler.java 2>/dev/null | head -1) — read the exact request format.
  5. cat $(find tez-runtime-library/src/main/java -name "ShuffleManager.java") | head -200 — how is back-pressure on Fetcher threads applied?
  6. grep -n "combiner" tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/*.java — at what phases does the combiner run?

Common bugs and symptoms

| Symptom | Likely cause |
|---|---|
| Fetcher: java.net.ConnectException | mapreduce_shuffle aux service not configured or NM not running. |
| Shuffle error: java.io.IOException: Failed on local exception: org.apache.hadoop.security.AccessControlException | ShuffleSecret missing or stale; check JobTokenSecretManager. |
| OOM during sort | tez.runtime.io.sort.mb too high relative to container JVM heap. |
| OOM during shuffle | tez.runtime.shuffle.memory.limit.percent too high; in-memory inputs starve heap. |
| Premature EOF from inputStream | Source task wrote partial IFile (killed mid-spill); destination retries from another attempt. |
| Wrong reducer output count | Combiner not associative and commutative, so the result depends on how many merge passes run it. |
| OnDiskMerger thrashing | tez.runtime.io.sort.factor too low; many tiny segments forcing many merge passes. |
| Long shuffle plateau | One source NM saturated; HDFS-local fetch concentration. |

Validation: prove you understand this

  1. Sketch the byte layout of an IFile segment containing 3 records and a single partition. Show key/val lengths and the EOF marker.
  2. A reducer task reads from 200 mappers. With tez.runtime.shuffle.parallel.copies=20 and tez.runtime.shuffle.fetch.max.task.output.at.once=20, what is the minimum number of HTTP requests the fetcher pool must make? Justify.
  3. Explain why PipelinedSorter reduces wall time but not CPU time.
  4. Given a 10 GB shuffle into a 4 GB heap reducer with tez.runtime.shuffle.memory.limit.percent=0.25, predict which inputs go to disk versus memory and why.
  5. Identify the exact file and method where the URL pattern ?reduce=&map= is constructed on the Tez fetcher side. Use grep.