Lab 3.2: Understand the IPO Abstraction
Background
Every task in Tez runs a Processor that reads from one or more Input objects and writes
to one or more Output objects. This Input-Processor-Output (IPO) model is the fundamental
abstraction for how data moves through a DAG. Edge properties in the API (EdgeProperty,
DataMovementType) determine which I/O classes are instantiated in the container.
This lab walks through the IPO model from the API layer to the runtime, tracing how an
ORDERED_PARTITIONED_KV_OUTPUT configuration becomes actual bytes in a shuffle buffer.
The IPO Interface Hierarchy
tez-runtime-api (in tez-api module):
AbstractLogicalInput
└── AbstractInput
AbstractLogicalOutput
└── AbstractOutput
AbstractProcessor
tez-runtime-library (implementations):
OrderedPartitionedKVOutput extends AbstractLogicalOutput
OrderedGroupedKVInput extends AbstractLogicalInput
UnorderedKVOutput extends AbstractLogicalOutput
UnorderedKVInput extends AbstractLogicalInput
UnorderedPartitionedKVOutput extends AbstractLogicalOutput
BroadcastKVInput extends AbstractLogicalInput (alias for UnorderedKVInput)
The key interface chain:
AbstractLogicalOutput.initialize() → called by LogicalIOProcessorRuntimeTask
AbstractLogicalOutput.start() → called when the processor is started
AbstractLogicalOutput.getWriter() → returns KeyValueWriter for the processor to use
AbstractLogicalOutput.commit() → called after processor.run() completes
AbstractLogicalOutput.close() → cleanup
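To make the ordering concrete, here is a minimal sketch of the initialize, start, getWriter, and close steps from the chain above. Everything in it (SketchOutput, SketchWriter, IpoLifecycleSketch) is a hypothetical stand-in, not a real Tez type, and the method signatures are simplified assumptions. Check the real signatures in the source during Step 1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a key-value writer; not the Tez KeyValueWriter.
interface SketchWriter {
    void write(String key, String value);
}

// Hypothetical stand-in for an output; records which lifecycle calls it saw.
class SketchOutput {
    final List<String> calls = new ArrayList<>();
    final List<String> records = new ArrayList<>();

    void initialize() { calls.add("initialize"); }

    void start() { calls.add("start"); }

    SketchWriter getWriter() {
        calls.add("getWriter");
        return (k, v) -> records.add(k + "=" + v);
    }

    void close() { calls.add("close"); }
}

class IpoLifecycleSketch {
    // Plays the role the text assigns to the runtime task: it drives the
    // output through its lifecycle around the processor's work.
    static SketchOutput runTask() {
        SketchOutput out = new SketchOutput();
        out.initialize();
        out.start();
        SketchWriter writer = out.getWriter(); // the "processor" asks for a writer
        writer.write("hello", "1");
        writer.write("world", "1");
        out.close();
        return out;
    }

    public static void main(String[] args) {
        SketchOutput out = runTask();
        System.out.println(out.calls);   // [initialize, start, getWriter, close]
        System.out.println(out.records); // [hello=1, world=1]
    }
}
```

The point of the sketch is only the separation of concerns: setup happens before the processor sees the output, and the processor interacts with it solely through the writer.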
Step-by-Step Tasks
Step 1: Read the AbstractLogicalOutput Interface
Open tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java (the runtime API classes live in the tez-api module, as noted above).
Answer:
- What is the purpose of the initialize() method? What does it return?
- What is the difference between start() and initialize()? Why are they separate?
- What method does a Processor call to get a writer to write records?
Step 2: Trace OrderedPartitionedKVOutput.initialize()
Open tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java.
Find the initialize() method.
Answer:
- What configuration key controls the buffer size for sorting?
- What class is created in initialize() to handle the actual sort-and-spill?
- How is the Partitioner class determined at runtime?
# Find sort buffer configuration
grep -n "SORT_MB\|sortmb\|buffer" \
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java \
| head -10
# Find the writer/sorter creation
grep -n "new.*Writer\|new.*Sorter\|ExternalSorter" \
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
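As background for the Partitioner question, the sketch below shows what a hash-based partitioner typically computes. It uses the same arithmetic as Hadoop's HashPartitioner (clear the sign bit, then take the remainder). Whether OrderedPartitionedKVOutput ends up with a partitioner like this in a given job is exactly what Step 2 asks you to find out, so treat the class name HashPartitionSketch and the sample keys as illustrative assumptions.

```java
class HashPartitionSketch {
    // Clear the sign bit so negative hash codes cannot yield a negative
    // partition, then map the result into [0, numPartitions).
    static int getPartition(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        String[] words = {"tez", "dag", "shuffle", "sort"};
        for (String w : words) {
            System.out.println(w + " -> partition " + getPartition(w, 3));
        }
    }
}
```

Every record with the same key lands in the same partition, which is what lets a single downstream task see all values for that key.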
Step 3: Trace the Write Path
When a processor calls writer.write(key, value), the data takes this path:
KeyValueWriter.write(key, value)
→ ExternalSorter.collect(key, value, partition)
→ SpillThread triggers when buffer is full
→ IFile.Writer writes sorted partition to local disk
→ On close(): merges all spills into final output file
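The write path above can be modeled with a toy in-memory version. This is a sketch under simplifying assumptions, not Tez's ExternalSorter: the buffer is a plain list rather than a byte buffer, spills are kept in memory instead of being written through IFile.Writer, spilling is synchronous rather than done by a separate thread, and the names SortSpillSketch and Rec are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SortSpillSketch {
    // One buffered record: partition number, key, and value.
    static final class Rec {
        final int partition;
        final String key;
        final String value;

        Rec(int partition, String key, String value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return partition + ":" + key + "=" + value;
        }
    }

    // Sort by partition first, then by key within a partition.
    static final Comparator<Rec> ORDER = (a, b) -> {
        int c = Integer.compare(a.partition, b.partition);
        return c != 0 ? c : a.key.compareTo(b.key);
    };

    private final int capacity;
    private final List<Rec> buffer = new ArrayList<>();
    final List<List<Rec>> spills = new ArrayList<>(); // stands in for spill files

    SortSpillSketch(int capacity) {
        this.capacity = capacity;
    }

    void collect(String key, String value, int partition) {
        buffer.add(new Rec(partition, key, value));
        if (buffer.size() >= capacity) {
            spill();
        }
    }

    private void spill() {
        buffer.sort(ORDER);
        spills.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    // On close: flush what is left, then combine all spills into one sorted run.
    List<Rec> close() {
        if (!buffer.isEmpty()) {
            spill();
        }
        List<Rec> merged = new ArrayList<>();
        for (List<Rec> s : spills) {
            merged.addAll(s);
        }
        merged.sort(ORDER); // a real merger would stream the sorted spills instead
        return merged;
    }
}
```

With a capacity of 2, collecting ("b", partition 1), ("a", partition 0), and ("c", partition 0) produces two spills, and close() returns the records grouped by partition and sorted by key within each partition.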
Find the ExternalSorter class:
find tez-runtime-library/src/main/java -name "ExternalSorter.java"
Answer:
- What data structure holds records before they are spilled?
- What algorithm is used to sort records in the buffer?
- How is the sort key computed for (K, V) pairs with a custom Comparator?
Step 4: Read OrderedGroupedKVInput.initialize()
Open tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java.
Find initialize().
Answer:
- What class handles the shuffle (fetching data from remote nodes)?
- How does the input know which upstream tasks it needs to fetch from?
- What event type does the input consume to discover shuffle locations?
grep -n "Shuffle\|ShuffleManager\|DataMovementEvent" \
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java \
| head -15
Step 5: Trace the Read Path
When a processor calls keyValueReader.next(), the data flow is:
KeyValueReader.next()
→ MergedKeyValueIterator.next() [merging multiple sorted partitions]
→ TezRawKeyValueIterator [from TezMerger]
→ IFile.Reader reads from local merged file
But before the merge can happen, the shuffle must fetch data:
DataMovementEvent arrives (from AM, routed from upstream task)
→ ShuffleManager records: "partition P is at host H:port/path"
→ Fetcher.fetch() downloads the partition file
→ stores locally
→ When all partitions fetched: MergeManager merges them
→ final sorted output available for KeyValueReader
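Both flows above end in a merge of several already-sorted inputs. A common way to do that is a k-way merge driven by a priority queue, sketched below. This illustrates the general technique only; it is not the TezMerger implementation, and KWayMergeSketch and Head are hypothetical names. Keys are plain strings here, whereas the real code works on serialized bytes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class KWayMergeSketch {
    // Pairs the current smallest unread key of a run with the rest of that run.
    private static final class Head {
        final String key;
        final Iterator<String> rest;

        Head(String key, Iterator<String> rest) {
            this.key = key;
            this.rest = rest;
        }
    }

    static List<String> merge(List<List<String>> sortedRuns) {
        PriorityQueue<Head> heap =
            new PriorityQueue<>((a, b) -> a.key.compareTo(b.key));
        // Seed the heap with the first key of every non-empty run.
        for (List<String> run : sortedRuns) {
            Iterator<String> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<String> out = new ArrayList<>();
        // Repeatedly emit the smallest head and refill from the same run.
        while (!heap.isEmpty()) {
            Head h = heap.poll();
            out.add(h.key);
            if (h.rest.hasNext()) {
                heap.add(new Head(h.rest.next(), h.rest));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> runs = Arrays.asList(
            Arrays.asList("apple", "pear"),
            Arrays.asList("banana", "zebra"),
            Arrays.asList("cherry"));
        System.out.println(merge(runs)); // [apple, banana, cherry, pear, zebra]
    }
}
```

Because only one record per run is held in the heap at a time, the merge needs memory proportional to the number of runs, not the number of records.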
Find the Fetcher class:
find tez-runtime-library/src/main/java -name "Fetcher.java"
wc -l $(find tez-runtime-library/src/main/java -name "Fetcher.java")
Answer:
- What HTTP endpoint does Fetcher call to retrieve partition data?
- What does Fetcher do if the HTTP request fails?
- How many simultaneous fetch connections does Fetcher allow by default?
Step 6: Understand DataMovementEvent Routing
The DataMovementEvent is what connects output to input. When a task completes its output:
- The ShuffleHandler (shuffle server) registers the output location
- The task sends a DataMovementEvent via the TezTaskUmbilicalProtocol to the AM
- The AM routes the event to the downstream tasks that need it
- The downstream input receives it and knows to fetch from that location
Find the DataMovementEvent class:
find . -name "DataMovementEvent.java" | grep -v test
Answer:
- What fields does DataMovementEvent carry?
- Why is the payload (userPayload) a byte array and not a typed field?
- How does the AM know which downstream tasks to route the event to?
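One way to think about the byte-array question: the component that forwards an event does not need to understand what is inside it, only the output that wrote it and the input that reads it do. The sketch below illustrates that idea with a made-up payload layout (host, port, path component) written with DataOutputStream. The layout, the sample values, and the class name PayloadSketch are all assumptions for illustration; compare them with the encoding you actually find in the Tez source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class PayloadSketch {
    // Producer side: pack location details into opaque bytes.
    static byte[] encode(String host, int port, String pathComponent) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(host);
            out.writeInt(port);
            out.writeUTF(pathComponent);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Consumer side: only code that knows the layout can unpack it.
    static String decode(byte[] payload) {
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(payload))) {
            String host = in.readUTF();
            int port = in.readInt();
            String path = in.readUTF();
            return host + ":" + port + "/" + path;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = encode("node7", 13562, "attempt_x");
        // A router in the middle would forward `payload` without parsing it.
        System.out.println(decode(payload)); // node7:13562/attempt_x
    }
}
```

An output with different needs could change the layout without any change to the code that routes the event.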
# Find the AM-side routing logic
grep -rn "DataMovementEvent\|routeEvent" \
tez-dag/src/main/java/org/apache/tez/dag/app/ --include="*.java" | grep -v test | head -15
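Before reading the AM-side code, it can help to have a mental model of what routing has to decide. The sketch below encodes the usual meaning of three data movement types: one-to-one sends a source task's output to the destination task with the same index, broadcast sends it to every destination task, and scatter-gather sends partition p to destination task p. RoutingSketch and its route method are hypothetical, and the real routing code handles more cases (custom edge managers, for example), so use this only as a baseline to compare against.

```java
import java.util.ArrayList;
import java.util.List;

class RoutingSketch {
    enum Movement { ONE_TO_ONE, BROADCAST, SCATTER_GATHER }

    // Returns the destination task indices for one output of one source task.
    // For SCATTER_GATHER, sourceOutputIndex is the partition number.
    static List<Integer> route(Movement type, int sourceTask,
                               int sourceOutputIndex, int numDestTasks) {
        List<Integer> dests = new ArrayList<>();
        switch (type) {
            case ONE_TO_ONE:
                dests.add(sourceTask);
                break;
            case BROADCAST:
                for (int d = 0; d < numDestTasks; d++) {
                    dests.add(d);
                }
                break;
            case SCATTER_GATHER:
                dests.add(sourceOutputIndex);
                break;
        }
        return dests;
    }

    public static void main(String[] args) {
        System.out.println(route(Movement.SCATTER_GATHER, 5, 2, 4)); // [2]
        System.out.println(route(Movement.BROADCAST, 0, 0, 3));      // [0, 1, 2]
        System.out.println(route(Movement.ONE_TO_ONE, 3, 0, 10));    // [3]
    }
}
```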
Step 7: Edge Properties → I/O Classes
The EdgeProperty object in the API specifies which I/O classes to use. Trace how
EdgeProperty becomes actual I/O class instantiation in the container.
Starting point:
# Find EdgeProperty
find tez-api/src/main/java -name "EdgeProperty.java"
Then trace:
# How does VertexImpl use EdgeProperty to configure I/O for a vertex?
grep -n "EdgeProperty\|getInputDescriptor\|getOutputDescriptor" \
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | head -15
Answer:
- What field in EdgeProperty specifies the Input class for the destination vertex?
- What field specifies the Output class for the source vertex?
- How is the class name passed to the container so it can instantiate the correct I/O class?
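A general pattern for the last question is to ship a class name as a string and instantiate it reflectively on the other side. The sketch below shows that pattern in isolation. Descriptor, ToyOutput, and SortedToyOutput are hypothetical types invented for this example, and whether Tez follows this pattern exactly (and what else travels alongside the class name) is for you to confirm in the code.

```java
class DescriptorSketch {
    // Hypothetical descriptor: nothing but a class name, which is easy to
    // serialize and send to another JVM.
    static final class Descriptor {
        final String className;

        Descriptor(String className) {
            this.className = className;
        }
    }

    interface ToyOutput {
        String describe();
    }

    public static final class SortedToyOutput implements ToyOutput {
        public SortedToyOutput() {}

        @Override
        public String describe() {
            return "sorted, partitioned output";
        }
    }

    // "Container side": turn the class name back into a live object.
    static ToyOutput instantiate(Descriptor d) {
        try {
            Class<?> clazz = Class.forName(d.className);
            return (ToyOutput) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot load " + d.className, e);
        }
    }

    public static void main(String[] args) {
        Descriptor d = new Descriptor(SortedToyOutput.class.getName());
        System.out.println(instantiate(d).describe());
    }
}
```

The side that builds the descriptor never needs the implementation class to be loaded; only the side that instantiates it does.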
Build the IPO Map
For OrderedWordCount, fill in this table by reading the code:
| Edge | Source vertex | Dest vertex | Output class | Input class | DataMovementType |
|---|---|---|---|---|---|
| Tokenizer → SumReducer | Tokenizer | SumReducer | ? | ? | ? |
| SumReducer → Sorter | SumReducer | Sorter | ? | ? | ? |
Read tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java to fill in
the ? cells.
grep -n "EdgeProperty\|KVOutput\|KVInput\|DataMovementType" \
tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
Expected Output
By the end of this lab you should have:
- The IPO map table for OrderedWordCount completed
- An answer for each step question (from code, not from documentation)
- An understanding of what DataMovementEvent carries and why it exists
- Knowledge of which configuration key controls sort buffer size
Stretch Goals
- Find the shuffle HTTP server that serves partition data to Fetcher:
  find . -name "ShuffleHandler.java" | grep -v test
  What HTTP framework does it use? What is the URL pattern for fetching a partition?
- Trace what happens when Fetcher receives corrupted data (a checksum mismatch). Does the task fail immediately? Or does it retry from a different source?
- Find the EdgeManagerPlugin interface and read its contract:
  find tez-api/src/main/java -name "EdgeManagerPlugin.java"
  What three methods must a custom edge manager implement, and what do they do? Why would you use a custom edge manager instead of SCATTER_GATHER?
- Look at IntersectExample.java in tez-examples. It uses BROADCAST for one edge. Explain why: what is the semantic meaning of broadcasting in a join operation?