Lab 3.2: Understand the IPO Abstraction

Background

Every task in Tez runs a Processor that reads from its Input objects and writes to its Output objects. This Input-Processor-Output (IPO) model is the fundamental abstraction for how data moves through a DAG. On each edge, the EdgeProperty names the Output class instantiated in the source vertex's containers and the Input class instantiated in the destination vertex's containers. Its DataMovementType determines how data is routed between them.

This lab walks through the IPO model from the API layer to the runtime. It traces how an edge configured with OrderedPartitionedKVOutput turns records written by a processor into sorted, partitioned bytes that downstream tasks fetch during the shuffle.


The IPO Interface Hierarchy

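tez-api module (package org.apache.tez.runtime.api):
  AbstractLogicalInput        implements LogicalInput        (LogicalInput extends Input)
  AbstractLogicalOutput       implements LogicalOutput       (LogicalOutput extends Output)
  AbstractLogicalIOProcessor  implements LogicalIOProcessor  (LogicalIOProcessor extends Processor)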

tez-runtime-library (implementations):
  OrderedPartitionedKVOutput    extends AbstractLogicalOutput
  OrderedGroupedKVInput         extends AbstractLogicalInput
  UnorderedKVOutput             extends AbstractLogicalOutput
  UnorderedKVInput              extends AbstractLogicalInput
  UnorderedPartitionedKVOutput  extends AbstractLogicalOutput
  BroadcastKVInput              extends AbstractLogicalInput (alias for UnorderedKVInput)

The key interface chain:

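AbstractLogicalOutput.initialize()   → called by LogicalIOProcessorRuntimeTask before the processor runs
AbstractLogicalOutput.start()        → called when the processor is ready to use the output
AbstractLogicalOutput.getWriter()    → returns the Writer the processor writes records to (a KeyValueWriter for KV outputs)
AbstractLogicalOutput.handleEvents() → receives events the framework routes to this output
AbstractLogicalOutput.close()        → called after processor.run() completes; finalizes the output and returns events for downstream consumers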
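
The lifecycle ordering can be modeled in a few lines of Java. This is a minimal sketch, not Tez code. SketchOutput, SketchWriter, SketchProcessor, and runTask are hypothetical stand-ins for AbstractLogicalOutput, KeyValueWriter, the processor, and LogicalIOProcessorRuntimeTask, and plain strings replace Tez Event objects. It only shows the order of calls. initialize() runs before the processor. start() and getWriter() are called from inside the processor. close() runs after run() returns.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the output lifecycle; these are not Tez types.
interface SketchWriter {
  void write(String key, String value);
}

interface SketchOutput {
  List<String> initialize();  // Tez returns List<Event>; strings stand in for events
  void start();
  SketchWriter getWriter();
  List<String> close();       // events emitted once the output is finished
}

interface SketchProcessor {
  void run(SketchOutput output);
}

final class LifecycleSketch {
  // An output that logs each lifecycle call so the ordering can be inspected.
  static final class RecordingOutput implements SketchOutput {
    private final List<String> log;
    private final List<String> records = new ArrayList<>();

    RecordingOutput(List<String> log) { this.log = log; }

    public List<String> initialize() { log.add("initialize"); return List.of(); }
    public void start() { log.add("start"); }
    public SketchWriter getWriter() {
      log.add("getWriter");
      return (k, v) -> records.add(k + "=" + v);
    }
    public List<String> close() {
      log.add("close");
      // A shuffle output would report where downstream tasks can fetch its data.
      return List.of("data-ready:" + records.size());
    }
  }

  // Plays the runtime task's role: initialize, run the processor, then close.
  static List<String> runTask(SketchOutput output, SketchProcessor processor) {
    List<String> events = new ArrayList<>(output.initialize());
    processor.run(output);          // in this sketch the processor calls start() itself
    events.addAll(output.close());  // close() only after run() has returned
    return events;
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    List<String> events = runTask(new RecordingOutput(log), out -> {
      out.start();
      SketchWriter writer = out.getWriter();
      writer.write("hello", "1");
      writer.write("world", "1");
    });
    System.out.println(log + " " + events);
  }
}
```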

Step-by-Step Tasks

Step 1: Read the AbstractLogicalOutput Interface

Open tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java.

Answer:

  1. What is the purpose of the initialize() method? What does it return?
  2. What is the difference between start() and initialize()? Why are they separate?
  3. What method does a Processor call to get a writer to write records?

Step 2: Trace OrderedPartitionedKVOutput.initialize()

Open tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java.

Find the initialize() method.

Answer:

  1. What configuration key controls the buffer size for sorting?
  2. What class is created in initialize() to handle the actual sort-and-spill?
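  3. How is the Partitioner class determined at runtime?

# Find sort buffer configuration (the key may be read by the sorter rather than by the output itself)
grep -n "SORT_MB\|sortmb\|buffer" \
  tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java \
  | head -10
grep -rn "SORT_MB" tez-runtime-library/src/main/java | head -10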

# Find the writer/sorter creation
grep -n "new.*Writer\|new.*Sorter\|ExternalSorter" \
  tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
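
Question 3 asks how the partitioner is chosen. What a hash partitioner computes once it has been chosen is small enough to show directly. This sketch assumes the common Hadoop-style formula: mask the sign bit of hashCode(), then take the remainder. SketchPartitioner and HashPartitionSketch are hypothetical names. Compare the sketch against the partitioner class you find in tez-runtime-library rather than assuming the two are identical.

```java
import java.util.List;

// Hypothetical partitioner contract: map a record to an index in [0, numPartitions).
interface SketchPartitioner<K, V> {
  int getPartition(K key, V value, int numPartitions);
}

final class HashPartitionSketch<K, V> implements SketchPartitioner<K, V> {
  @Override
  public int getPartition(K key, V value, int numPartitions) {
    // Masking the sign bit keeps the result non-negative even when hashCode() < 0.
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }

  public static void main(String[] args) {
    SketchPartitioner<String, Integer> partitioner = new HashPartitionSketch<>();
    int numDownstreamTasks = 3;
    for (String word : List.of("apple", "banana", "apple", "cherry")) {
      // Equal keys always map to the same partition, so one downstream task sees all of them.
      int p = partitioner.getPartition(word, 1, numDownstreamTasks);
      System.out.println(word + " -> partition " + p);
    }
  }
}
```

The value argument is ignored. Only the key decides the partition, which guarantees that every value for a given key reaches the same downstream task.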

Step 3: Trace the Write Path

When a processor calls writer.write(key, value), the data goes:

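KeyValueWriter.write(key, value)
  → ExternalSorter.collect(key, value, partition)   [partition chosen by the Partitioner]
    → buffer reaches its spill threshold: contents are sorted and spilled
      → IFile.Writer writes the sorted partitions to local disk
        → On close(): merges all spills into the final output file

ExternalSorter is abstract. The concrete sorters are DefaultSorter, which spills from a background SpillThread, and PipelinedSorter.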
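
The sort-and-spill step can be sketched without any Tez classes. This is a simplified model under a few stated assumptions:

- a record count stands in for the byte-sized sort buffer;
- spillPercent is a hypothetical parameter for the spill threshold;
- a "spill" appends a sorted run to a list instead of writing an IFile;
- the sketch spills synchronously inside collect().

Merging the runs at close() is left out. Ordering by partition first and then by key is what makes each run contiguous per partition.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified sort-and-spill model; names are hypothetical, not Tez classes.
final class SpillSketch {
  static final class Rec {
    final int partition;
    final String key;
    final String value;

    Rec(int partition, String key, String value) {
      this.partition = partition;
      this.key = key;
      this.value = value;
    }

    int partition() { return partition; }
    String key() { return key; }

    @Override
    public String toString() { return partition + ":" + key; }
  }

  // Partition first, then key: each run is grouped by partition and sorted within it.
  static final Comparator<Rec> ORDER =
      Comparator.comparingInt(Rec::partition).thenComparing(Rec::key);

  private final int spillThreshold;
  private final List<Rec> buffer = new ArrayList<>();
  final List<List<Rec>> spills = new ArrayList<>(); // each entry stands in for one spill file

  SpillSketch(int capacityRecords, double spillPercent) {
    // e.g. capacity 4 at 0.75 -> spill whenever 3 records are buffered
    this.spillThreshold = Math.max(1, (int) (capacityRecords * spillPercent));
  }

  void collect(String key, String value, int partition) {
    buffer.add(new Rec(partition, key, value));
    if (buffer.size() >= spillThreshold) {
      spill();
    }
  }

  // Called when the output closes: whatever is still buffered becomes the last run.
  void flush() {
    if (!buffer.isEmpty()) {
      spill();
    }
  }

  private void spill() {
    List<Rec> run = new ArrayList<>(buffer);
    run.sort(ORDER);
    spills.add(run);
    buffer.clear();
  }

  public static void main(String[] args) {
    SpillSketch sorter = new SpillSketch(4, 0.75);
    for (String word : new String[] {"pear", "fig", "apple", "kiwi", "date"}) {
      sorter.collect(word, "1", word.length() % 2); // toy partitioner with 2 partitions
    }
    sorter.flush();
    sorter.spills.forEach(System.out::println);
  }
}
```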

Find the ExternalSorter class:

find tez-runtime-library/src/main/java -name "ExternalSorter.java"

Answer:

  1. What data structure holds records before they are spilled?
  2. What algorithm is used to sort records in the buffer?
  3. How is the sort key computed for (K, V) pairs with a custom Comparator?

Step 4: Read OrderedGroupedKVInput.initialize()

Open tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java.

Find initialize().

Answer:

  1. What class handles the shuffle (fetching data from remote nodes)?
  2. How does the input know which upstream tasks it needs to fetch from?
  3. What event type does the input consume to discover shuffle locations?

grep -n "Shuffle\|ShuffleManager\|DataMovementEvent" \
  tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java \
  | head -15

Step 5: Trace the Read Path

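When a processor reads from an OrderedGroupedKVInput, it gets a grouped KeyValuesReader that returns one key with all of its values per next() call. The data flow is:

KeyValuesReader.next()
  → ValuesIterator                 [groups adjacent records that have equal keys]
    → TezRawKeyValueIterator       [from TezMerger, merging the sorted segments fetched from upstream tasks]
      → IFile readers over each segment (in memory or on local disk)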
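
The merge-then-group behavior on the reader side is easy to model with a heap. This sketch assumes each segment is one upstream task's sorted output for this partition, held as String entries. TezMerger and ValuesIterator do the equivalent over serialized bytes with a RawComparator, and the class and method names here are hypothetical. Ties are broken by segment index only to make the output deterministic.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical read-path model: k-way merge of sorted segments, then key grouping.
final class MergeGroupSketch {
  // Each segment is already sorted by key (e.g. one fetched upstream output).
  static List<Map.Entry<String, String>> kWayMerge(List<List<Map.Entry<String, String>>> segments) {
    // Heap entries are {segmentIndex, position}, ordered by the key at that position.
    PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> {
      int c = keyAt(segments, a).compareTo(keyAt(segments, b));
      return c != 0 ? c : Integer.compare(a[0], b[0]); // ties: lower segment first
    });
    for (int s = 0; s < segments.size(); s++) {
      if (!segments.get(s).isEmpty()) {
        heap.add(new int[] {s, 0});
      }
    }
    List<Map.Entry<String, String>> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      List<Map.Entry<String, String>> seg = segments.get(top[0]);
      merged.add(seg.get(top[1]));
      if (top[1] + 1 < seg.size()) {
        heap.add(new int[] {top[0], top[1] + 1}); // advance within the same segment
      }
    }
    return merged;
  }

  private static String keyAt(List<List<Map.Entry<String, String>>> segments, int[] pos) {
    return segments.get(pos[0]).get(pos[1]).getKey();
  }

  // The input is sorted, so all values for a key are adjacent and one pass suffices.
  static List<Map.Entry<String, List<String>>> group(List<Map.Entry<String, String>> sorted) {
    List<Map.Entry<String, List<String>>> groups = new ArrayList<>();
    for (Map.Entry<String, String> kv : sorted) {
      if (groups.isEmpty() || !groups.get(groups.size() - 1).getKey().equals(kv.getKey())) {
        groups.add(new AbstractMap.SimpleEntry<String, List<String>>(kv.getKey(), new ArrayList<>()));
      }
      groups.get(groups.size() - 1).getValue().add(kv.getValue());
    }
    return groups;
  }

  public static void main(String[] args) {
    List<List<Map.Entry<String, String>>> segments = List.of(
        List.of(Map.entry("apple", "1"), Map.entry("fig", "1")),
        List.of(Map.entry("apple", "2"), Map.entry("kiwi", "1")),
        List.of(Map.entry("fig", "3")));
    for (Map.Entry<String, List<String>> g : group(kWayMerge(segments))) {
      System.out.println(g.getKey() + " " + g.getValue());
    }
  }
}
```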

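But before the merge can happen, the shuffle must fetch data:

DataMovementEvent arrives (from the AM, routed from an upstream task)
  → ShuffleScheduler records: "the output of source task S is at host H:port/path"
  → fetcher threads (FetcherOrderedGrouped for ordered inputs) download the data
    → stored in memory or on local disk, depending on size
  → MergeManager merges segments as they arrive; after the last fetch, a final merge runs
    → final sorted output available to the reader

(For unordered inputs, ShuffleManager and Fetcher play the scheduler and fetcher roles.)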
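
The fetch side can be modeled as a bounded pool of workers draining a list of locations. In this sketch, Location, fetch(), and fetchAll() are hypothetical. fetch() sleeps and returns a label instead of issuing an HTTP request. parallelFetches stands in for the configurable limit on concurrent fetches that question 3 below asks about. fetchAll() returns only when every location has been fetched, which mirrors the point at which the final merge can start.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of shuffle fetch scheduling; not the Tez fetcher classes.
final class FetchSchedulerSketch {
  // What an input learns from one routed event: where a source task's output lives.
  static final class Location {
    final String host;
    final int sourceTask;

    Location(String host, int sourceTask) {
      this.host = host;
      this.sourceTask = sourceTask;
    }
  }

  final AtomicInteger inFlight = new AtomicInteger();
  final AtomicInteger maxInFlight = new AtomicInteger();

  // Stand-in for an HTTP download: sleeps briefly and returns a label, not real bytes.
  String fetch(Location loc) throws InterruptedException {
    int now = inFlight.incrementAndGet();
    maxInFlight.accumulateAndGet(now, Math::max); // track peak concurrency
    try {
      Thread.sleep(10);
      return "segment-from-task-" + loc.sourceTask + "@" + loc.host;
    } finally {
      inFlight.decrementAndGet();
    }
  }

  // Fetches every location with at most parallelFetches downloads in flight and
  // returns only after all of them finish, which is when a final merge could start.
  List<String> fetchAll(List<Location> locations, int parallelFetches)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(parallelFetches);
    try {
      List<Future<String>> pending = new ArrayList<>();
      for (Location loc : locations) {
        pending.add(pool.submit(() -> fetch(loc)));
      }
      List<String> segments = new ArrayList<>();
      for (Future<String> f : pending) {
        segments.add(f.get()); // a failed fetch surfaces here as an ExecutionException
      }
      return segments;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    List<Location> locations = new ArrayList<>();
    for (int task = 0; task < 6; task++) {
      locations.add(new Location("node" + (task % 2), task));
    }
    FetchSchedulerSketch shuffle = new FetchSchedulerSketch();
    List<String> segments = shuffle.fetchAll(locations, 2);
    System.out.println(segments.size() + " segments, peak concurrent fetches = "
        + shuffle.maxInFlight.get());
  }
}
```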

Find the fetcher classes (the shared Fetcher serves unordered inputs; ordered inputs use FetcherOrderedGrouped):

find tez-runtime-library/src/main/java -name "Fetcher*.java"
wc -l $(find tez-runtime-library/src/main/java -name "Fetcher*.java")

Answer:

  1. What HTTP endpoint does Fetcher call to retrieve partition data?
  2. What does Fetcher do if the HTTP request fails?
  3. How many fetches can run in parallel by default, and which configuration key controls that limit?

Step 6: Understand DataMovementEvent Routing

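The DataMovementEvent is what connects output to input. When a task completes its output:

  1. The output's close() leaves the final file on local disk, where the ShuffleHandler (shuffle server) can serve it, and returns a DataMovementEvent (a CompositeDataMovementEvent for partitioned outputs) describing where the data lives
  2. The task sends that event to the AM over the TezTaskUmbilicalProtocol heartbeat
  3. The AM routes the event, according to the edge's DataMovementType or EdgeManagerPlugin, to the downstream tasks that need it
  4. The downstream input receives it and schedules a fetch from that location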
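
The routing rules of the three built-in DataMovementTypes can be written down as a function of the source task index. This is a simplified sketch. MovementType, Route, and route() are hypothetical names, not the EdgeManagerPlugin API. The sketch assumes that scatter-gather sources produce one partition per destination task, and that destination tasks number their inputs by source task index. In Tez, the real routing runs in the AM (tez-dag) inside the edge manager implementations.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of built-in routing rules; not the EdgeManagerPlugin API.
final class RoutingSketch {
  enum MovementType { ONE_TO_ONE, BROADCAST, SCATTER_GATHER }

  // "Output index sourceOutput of this source task feeds input index destInput of task destTask."
  static final class Route {
    final int sourceOutput;
    final int destTask;
    final int destInput;

    Route(int sourceOutput, int destTask, int destInput) {
      this.sourceOutput = sourceOutput;
      this.destTask = destTask;
      this.destInput = destInput;
    }

    @Override
    public String toString() {
      return "out" + sourceOutput + "->task" + destTask + ".in" + destInput;
    }
  }

  static List<Route> route(MovementType type, int sourceTask, int numDestTasks) {
    List<Route> routes = new ArrayList<>();
    switch (type) {
      case ONE_TO_ONE:
        // Source task i feeds only destination task i (assumes equal task counts).
        routes.add(new Route(0, sourceTask, 0));
        break;
      case BROADCAST:
        // The single output goes to every destination task; its input slot is the source index.
        for (int d = 0; d < numDestTasks; d++) {
          routes.add(new Route(0, d, sourceTask));
        }
        break;
      case SCATTER_GATHER:
        // One partition per destination: partition p goes to task p, in this source's input slot.
        for (int p = 0; p < numDestTasks; p++) {
          routes.add(new Route(p, p, sourceTask));
        }
        break;
    }
    return routes;
  }

  public static void main(String[] args) {
    for (MovementType type : MovementType.values()) {
      System.out.println(type + ": " + route(type, 1, 3));
    }
  }
}
```

Seen from the destination side, scatter-gather task p ends up with one input per source task, each holding partition p.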

Find the DataMovementEvent class:

find . -name "DataMovementEvent.java" | grep -v test

Answer:

  1. What fields does DataMovementEvent carry?
  2. Why is the payload (userPayload) an opaque ByteBuffer rather than a typed field?
  3. How does the AM know which downstream tasks to route the event to?

# Find the AM-side routing logic
grep -rn "DataMovementEvent\|routeEvent" \
  tez-dag/src/main/java/org/apache/tez/dag/app/ --include="*.java" | grep -v test | head -15
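
One way to think about the opaque payload: only the producing Output and the consuming Input need to agree on its format, while the AM just moves bytes. This sketch uses a hypothetical ShuffleLocation type and a hand-rolled DataOutputStream encoding. The Tez shuffle library defines its own payload format (protobuf-based), so inspect that rather than assuming these fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical payload format for illustration; not the Tez shuffle payload.
final class PayloadSketch {
  static final class ShuffleLocation {
    final String host;
    final int port;
    final String pathComponent;

    ShuffleLocation(String host, int port, String pathComponent) {
      this.host = host;
      this.port = port;
      this.pathComponent = pathComponent;
    }
  }

  // Producer side (the Output): serialize into bytes that the AM never interprets.
  static ByteBuffer encode(ShuffleLocation loc) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(loc.host);
      out.writeInt(loc.port);
      out.writeUTF(loc.pathComponent);
    } catch (IOException e) {
      throw new UncheckedIOException(e); // not expected for an in-memory stream
    }
    return ByteBuffer.wrap(bytes.toByteArray());
  }

  // Consumer side (the Input): only code that knows the format can decode it.
  static ShuffleLocation decode(ByteBuffer payload) {
    ByteBuffer view = payload.duplicate(); // read without moving the caller's position
    byte[] raw = new byte[view.remaining()];
    view.get(raw);
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
      return new ShuffleLocation(in.readUTF(), in.readInt(), in.readUTF());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // Example values only.
    ByteBuffer payload = encode(new ShuffleLocation("node7", 12345, "attempt_0_out"));
    // A router can copy or forward `payload` without knowing anything about its layout.
    ShuffleLocation loc = decode(payload);
    System.out.println(loc.host + ":" + loc.port + "/" + loc.pathComponent);
  }
}
```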

Step 7: Edge Properties → I/O Classes

The EdgeProperty object in the API specifies which I/O classes to use. Trace how EdgeProperty becomes actual I/O class instantiation in the container.

Starting point:

# Find EdgeProperty
find tez-api/src/main/java -name "EdgeProperty.java"

Then trace:

# How does VertexImpl use EdgeProperty to configure I/O for a vertex?
grep -n "EdgeProperty\|getInputDescriptor\|getOutputDescriptor" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | head -15

Answer:

  1. What field in EdgeProperty specifies the Input class for the destination vertex?
  2. What field specifies the Output class for the source vertex?
  3. How is the class name passed to the container so it can instantiate the correct I/O class?
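
Only a class name, plus an opaque configuration payload, travels from the DAG definition to the container. The container therefore has to instantiate I/O classes reflectively. This sketch shows that pattern with hypothetical SketchIO and SortedSketchOutput types and a single int constructor argument. Real Tez I/O classes are constructed with a context object and a count of physical inputs or outputs, so check the runtime code for the exact constructor shape.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for an I/O base class; not a Tez type.
abstract class SketchIO {
  final int numPhysical;

  SketchIO(int numPhysical) { this.numPhysical = numPhysical; }

  abstract String describe();
}

final class SortedSketchOutput extends SketchIO {
  public SortedSketchOutput(int numPhysical) { super(numPhysical); }

  @Override
  String describe() { return "sorted output with " + numPhysical + " partitions"; }
}

final class ReflectiveFactorySketch {
  // Only a name crosses the wire, so the container looks the class up and calls
  // a constructor with an agreed-upon shape.
  static SketchIO create(String className, int numPhysical) throws ReflectiveOperationException {
    Class<? extends SketchIO> cls = Class.forName(className).asSubclass(SketchIO.class);
    Constructor<? extends SketchIO> ctor = cls.getConstructor(int.class);
    return ctor.newInstance(numPhysical);
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    // In a real DAG the name would come from the edge's Input or Output descriptor.
    SketchIO io = create(SortedSketchOutput.class.getName(), 4);
    System.out.println(io.describe());
  }
}
```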

Build the IPO Map

For OrderedWordCount, fill in this table by reading the code:

Edge                    | Source vertex | Dest vertex | Output class | Input class | DataMovementType
Tokenizer → SumReducer  | Tokenizer     | SumReducer  | ?            | ?           | ?
SumReducer → Sorter     | SumReducer    | Sorter      | ?            | ?           | ?

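Read tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java to fill in the ? cells. If the edges are built with an *EdgeConfig helper rather than a hand-written EdgeProperty, open that helper class to see which Output, Input, and DataMovementType it sets.

grep -n "EdgeProperty\|EdgeConfig\|KVOutput\|KVInput\|DataMovementType" \
  tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java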

Expected Output

By the end of this lab you should have:

  1. The IPO map table for OrderedWordCount completed
  2. An answer to each step's questions, taken from the code rather than from documentation
  3. An understanding of what DataMovementEvent carries and why it exists
  4. Knowledge of which configuration key controls sort buffer size

Stretch Goals

  1. Find the shuffle HTTP server that serves partition data to Fetcher:

    find . -name "ShuffleHandler.java" | grep -v test

    What HTTP framework does it use? What is the URL pattern for fetching a partition?

  2. Trace what happens when Fetcher receives corrupted data (a checksum mismatch). Does the task fail immediately, or does it retry from a different source?

  3. Find the EdgeManagerPlugin abstract class and read its contract:

    find tez-api/src/main/java -name "EdgeManagerPlugin.java"

    Which abstract methods must a custom edge manager implement, and what does each one do? Why would you use a custom edge manager instead of a built-in DataMovementType such as SCATTER_GATHER?

  4. Look at IntersectExample.java in tez-examples. It uses BROADCAST for one edge. Explain why: what is the semantic meaning of broadcasting in a join operation?
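
If you want something to compare your answer against, here is a minimal sketch of a broadcast-style intersect. Every downstream task receives the complete small input and only its own share of the large input. The names are hypothetical and this is not the code of IntersectExample. It only illustrates why broadcasting the small side lets each task produce its matches without repartitioning the large side.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical broadcast-intersect model; each intersectTask call stands for one task.
final class BroadcastIntersectSketch {
  // broadcastSide is the full small input (identical on every task);
  // localPartition is only this task's share of the large input.
  static List<String> intersectTask(List<String> broadcastSide, List<String> localPartition) {
    Set<String> lookup = new HashSet<>(broadcastSide); // build side, rebuilt on each task
    List<String> matches = new ArrayList<>();
    for (String key : localPartition) {
      if (lookup.contains(key)) {
        matches.add(key); // probe side streams through once
      }
    }
    return matches;
  }

  public static void main(String[] args) {
    List<String> small = List.of("b", "d");
    List<List<String>> largePartitions = List.of(List.of("a", "b"), List.of("c", "d", "e"));
    List<String> result = new ArrayList<>();
    for (List<String> part : largePartitions) {
      result.addAll(intersectTask(small, part));
    }
    System.out.println(result);
  }
}
```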