IPO Abstractions
Input, Processor, Output (collectively "IPO") are the three core
runtime contracts. A Tez task is built from one processor, zero or more
inputs, and zero or more outputs. This chapter walks through the abstractions, the
distinction between the LogicalInput/LogicalOutput layer (rich, modern)
and the plain Input/Output layer (used for raw byte pipelines), the
lifecycle methods, merged inputs, root vs intermediate inputs, and the
minimum skeleton needed to write a new input or output.
After this chapter you should be able to read any concrete IPO class in
tez-runtime-library and explain what each lifecycle method is for.
The interfaces
```text
tez-api/src/main/java/org/apache/tez/runtime/api/
  Input.java
  Output.java
  Processor.java
  LogicalInput.java               (extends Input)
  LogicalOutput.java              (extends Output)
  LogicalIOProcessor.java         (extends Processor)
  AbstractLogicalInput.java       (base class for custom inputs)
  AbstractLogicalOutput.java
  AbstractLogicalIOProcessor.java
  Reader.java                     (the byte/record stream interface)
  Writer.java
  MergedLogicalInput.java         (combines multiple inputs)
  InputContext.java
  OutputContext.java
  ProcessorContext.java
  Event.java                      (DataMovementEvent, etc.)
```

```bash
grep -n "^public " tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInput.java
```
Plain Input/Output vs LogicalInput/LogicalOutput
| Layer | Class | Why it exists |
|---|---|---|
| Low-level | Input | Bare contract: provides a Reader |
| Low-level | Output | Bare contract: provides a Writer |
| High-level | LogicalInput | Adds events, lifecycle, knowledge of upstream completion |
| High-level | LogicalOutput | Adds events (to AM and downstream) |
Almost all production inputs/outputs are LogicalInput/LogicalOutput. The
plain layer exists for primitive byte-stream cases (rarely used directly).
Lifecycle methods (LogicalInput)
```java
// Simplified view: the real constructor takes (InputContext, int numPhysicalInputs),
// and some of these methods are inherited from interfaces rather than declared here.
public abstract class AbstractLogicalInput implements LogicalInput {
  // Called by the runtime when the task starts. Setup; no I/O yet.
  public abstract List<Event> initialize() throws Exception;
  // Called after `initialize` has completed for *all* inputs.
  // Begin actively pulling data.
  public abstract void start() throws Exception;
  // The processor calls this to get a Reader for this input.
  public abstract Reader getReader() throws Exception;
  // Handle data movement / control events from the AM (e.g., upstream task done).
  public abstract void handleEvents(List<Event> inputEvents) throws Exception;
  // Final cleanup; close streams; return any final events.
  public abstract List<Event> close() throws Exception;
}
```
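Order in a task's life:

constructor(context, numPhysicalInputs) -> initialize -> start -> getReader -> close

handleEvents is not a fixed step in that sequence: the runtime calls it whenever the AM routes events to the input.

initialize returns events to the AM (for example, InputInitializerEvents that ask the AM to do more split work). Most inputs return an empty list.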
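To make the calling order concrete, here is a toy, Tez-free model. ToyInput, TracingInput and LifecycleDemo.run are hypothetical names, not Tez classes. Strings stand in for events and readers, and the point at which this model delivers events is a simplification, not a statement about when the real runtime does so.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Tez-free model of the input lifecycle; none of these names are Tez classes.
class LifecycleDemo {

  // Mirrors the shape of the lifecycle methods; Strings stand in for events and readers.
  interface ToyInput {
    List<String> initialize();
    void start();
    void handleEvents(List<String> events);
    String getReader();
    List<String> close();
  }

  // Records each call so the calling order can be inspected afterwards.
  static final class TracingInput implements ToyInput {
    private final String name;
    private final List<String> trace;

    TracingInput(String name, List<String> trace) {
      this.name = name;
      this.trace = trace;
    }

    public List<String> initialize() { trace.add(name + ".initialize"); return new ArrayList<>(); }
    public void start() { trace.add(name + ".start"); }
    public void handleEvents(List<String> events) { trace.add(name + ".handleEvents"); }
    public String getReader() { trace.add(name + ".getReader"); return name + "-reader"; }
    public List<String> close() { trace.add(name + ".close"); return new ArrayList<>(); }
  }

  // Toy driver: every input finishes initialize() before any input is started.
  // Delivering events right after start() is a choice of this model only.
  static List<String> run(int numInputs) {
    List<String> trace = new ArrayList<>();
    List<ToyInput> inputs = new ArrayList<>();
    for (int i = 0; i < numInputs; i++) {
      inputs.add(new TracingInput("in" + i, trace));
    }
    for (ToyInput in : inputs) in.initialize();   // setup only, no I/O
    for (ToyInput in : inputs) in.start();        // begin fetching
    for (ToyInput in : inputs) in.handleEvents(List.of("DataMovementEvent"));
    for (ToyInput in : inputs) in.getReader();    // processor asks for readers
    for (ToyInput in : inputs) in.close();        // final cleanup
    return trace;
  }

  public static void main(String[] args) {
    run(2).forEach(System.out::println);
  }
}
```

Running main prints ten calls, with both initialize calls appearing before either start call. That separation is what lets every input finish cheap setup before any of them begins I/O.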
Lifecycle methods (LogicalOutput)
Mirror of input:
```java
public abstract class AbstractLogicalOutput implements LogicalOutput {
  public abstract List<Event> initialize() throws Exception;
  public abstract void start() throws Exception;
  public abstract Writer getWriter() throws Exception;
  public abstract void handleEvents(List<Event> outputEvents) throws Exception;
  public abstract List<Event> close() throws Exception;
}
```
The close of an output is the most consequential call: it flushes pending
data and returns a CompositeDataMovementEvent (or a VertexManagerEvent, or
both) telling the AM, and thus the downstream vertices, what this output
produced.
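The hypothetical sketch below (not Tez code) models why close() matters. write() only buffers records, and nothing is announced until close() returns its events. CompositeEvent and StatsEvent are simplified stand-ins for CompositeDataMovementEvent and VertexManagerEvent. The hash partitioning and the "flush" (here, just measuring bytes per partition) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an output whose close() flushes and announces its data.
class OutputCloseDemo {

  // Stand-in for CompositeDataMovementEvent: covers partitions [sourceIndexStart, sourceIndexStart + count).
  record CompositeEvent(int sourceIndexStart, int count) {}

  // Stand-in for VertexManagerEvent statistics: bytes written per partition.
  record StatsEvent(long[] bytesPerPartition) {}

  static final class ToyPartitionedOutput {
    private final List<List<String>> buffers = new ArrayList<>();
    private boolean closed = false;

    ToyPartitionedOutput(int numPartitions) {
      for (int i = 0; i < numPartitions; i++) buffers.add(new ArrayList<>());
    }

    // Buffers the record; downstream learns nothing about it yet.
    void write(String key, String value) {
      if (closed) throw new IllegalStateException("write after close");
      int partition = Math.floorMod(key.hashCode(), buffers.size());
      buffers.get(partition).add(key + "=" + value);
    }

    // "Flushes" by measuring each partition, then returns the events describing the output.
    List<Object> close() {
      closed = true;
      long[] bytes = new long[buffers.size()];
      for (int p = 0; p < buffers.size(); p++) {
        for (String rec : buffers.get(p)) bytes[p] += rec.length();
      }
      List<Object> events = new ArrayList<>();
      events.add(new StatsEvent(bytes));
      events.add(new CompositeEvent(0, buffers.size()));
      return events;
    }
  }
}
```

If close() returned an empty list instead, the buffered records would exist but no consumer would ever be told about them.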
Root inputs vs intermediate inputs
| Kind | Source of data | Initializer runs where? |
|---|---|---|
| Root input | External (HDFS, HBase, Kafka) | AM-side: InputInitializer enumerates splits, emits InputDataInformationEvents |
| Intermediate input | Upstream Tez vertex output | No initializer; fetch locations arrive via DataMovementEvents routed by the AM |
MRInput is the canonical root input. Its AM-side initializer
(MRInputAMSplitGenerator) calls InputFormat.getSplits(...) and pushes
the resulting splits to tasks.
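Here is a hypothetical model of that root-input path (not MRInputAMSplitGenerator itself). enumerateSplits stands in for InputFormat.getSplits(...) by cutting one file into fixed-size byte ranges. initialize then emits one SplitEvent, a stand-in for InputDataInformationEvent, per split, addressed to task i. Real split generation may group splits, so the one-split-per-task mapping is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an AM-side root-input initializer.
class RootInputDemo {

  // Stand-in for InputDataInformationEvent: one split addressed to one task.
  record SplitEvent(int targetTaskIndex, String split) {}

  // Stand-in for InputFormat.getSplits(...): fixed-size byte ranges over a single file.
  static List<String> enumerateSplits(String path, long fileLength, long splitSize) {
    if (splitSize <= 0) throw new IllegalArgumentException("splitSize must be positive");
    List<String> splits = new ArrayList<>();
    for (long offset = 0; offset < fileLength; offset += splitSize) {
      long length = Math.min(splitSize, fileLength - offset);
      splits.add(path + ":" + offset + "+" + length);
    }
    return splits;
  }

  // One event per split; in this model the vertex parallelism equals the number of events.
  static List<SplitEvent> initialize(List<String> splits) {
    List<SplitEvent> events = new ArrayList<>();
    for (int i = 0; i < splits.size(); i++) {
      events.add(new SplitEvent(i, splits.get(i)));
    }
    return events;
  }
}
```

For example, enumerateSplits("/data/f", 250, 100) yields three splits, the last being /data/f:200+50. initialize then produces three events, for tasks 0, 1 and 2.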
Intermediate inputs (e.g., OrderedGroupedKVInput) receive their data
descriptors from the AM via DataMovementEvents — one event per upstream
task completion, carrying the upstream task's location and partition.
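The routing can be pictured with a hypothetical scatter-gather model (not Tez's edge manager code). Each upstream task produces one partition per downstream task, and partition p of every upstream task is routed to downstream task p. RoutedEvent stands in for a DataMovementEvent. The sketch assumes the downstream parallelism equals the partition count, with no auto-parallelism adjustment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical scatter-gather routing model.
class ShuffleRoutingDemo {

  // Stand-in for a DataMovementEvent as delivered to one downstream input.
  record RoutedEvent(int sourceTask, int partition, int targetTask) {}

  // Each upstream task announces all of its partitions when it closes; routing
  // expands that into one event per destination: partition p goes to task p.
  static List<List<RoutedEvent>> route(int upstreamTasks, int partitions) {
    List<List<RoutedEvent>> perTarget = new ArrayList<>();
    for (int t = 0; t < partitions; t++) perTarget.add(new ArrayList<>());
    for (int src = 0; src < upstreamTasks; src++) {
      for (int p = 0; p < partitions; p++) {
        perTarget.get(p).add(new RoutedEvent(src, p, p));
      }
    }
    return perTarget;
  }
}
```

route(3, 2) gives each of the two downstream tasks three events, one per upstream task. In this model, a downstream input has complete data only after it has an event from every upstream task.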
MergedLogicalInput
When a vertex has multiple physical inputs that should look like one to the
processor (e.g., a vertex group union), Tez wraps them in a
MergedLogicalInput:
```bash
grep -n "MergedLogicalInput\|getInputs\|getReader" \
  tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
```
The processor calls getReader() once; the merged input combines all
underlying readers. Common subclasses live in tez-runtime-library:

- OrderedGroupedMergedKVInput: merges K/V streams, preserving sort order.
- ConcatenatedMergedKeyValueInput: concatenates the underlying streams.
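The two merge strategies differ in ordering. This hypothetical sketch shows that difference on plain strings rather than key/value readers, and neither method is Tez code. concatenate drains each constituent in turn. mergeSorted performs a k-way merge with a min-heap, so constituents that are each already sorted produce a globally sorted stream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration of the two merge strategies; not Tez code.
class MergedInputDemo {

  // Concatenation: drain each constituent in turn, keeping each one's internal order.
  static List<String> concatenate(List<List<String>> inputs) {
    List<String> out = new ArrayList<>();
    for (List<String> in : inputs) out.addAll(in);
    return out;
  }

  // Current head of one sorted constituent plus the iterator over its remainder.
  private record Head(String value, Iterator<String> rest) {}

  // Sorted merge (k-way merge): assumes every constituent is already sorted.
  static List<String> mergeSorted(List<List<String>> inputs) {
    PriorityQueue<Head> heap = new PriorityQueue<>((a, b) -> a.value().compareTo(b.value()));
    for (List<String> in : inputs) {
      Iterator<String> it = in.iterator();
      if (it.hasNext()) heap.add(new Head(it.next(), it));
    }
    List<String> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Head smallest = heap.poll();   // smallest head across all constituents
      out.add(smallest.value());
      if (smallest.rest().hasNext()) {
        heap.add(new Head(smallest.rest().next(), smallest.rest()));
      }
    }
    return out;
  }
}
```

With constituents [a, d] and [b, c, e], concatenation yields a, d, b, c, e, while the sorted merge yields a, b, c, d, e.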
Events flowing between AM and task
| Event class | Direction | Carries |
|---|---|---|
| DataMovementEvent | AM → task input | Source task index, source URL/path, partition |
| InputReadErrorEvent | task input → AM | "Fetching from this source failed"; repeated reports can make the AM re-run the producing task |
| CompositeDataMovementEvent | task output → AM (then forwarded) | Bulk version of DataMovementEvent covering a contiguous range of partitions |
| InputDataInformationEvent | AM → task input | Concrete split (root inputs only) |
| InputInitializerEvent | task → AM (initializer) | Custom signal to the initializer |
| VertexManagerEvent | task output → AM (vertex manager) | Stats for auto-parallelism (ShuffleVertexManager) |
```bash
ls tez-api/src/main/java/org/apache/tez/runtime/api/events/
```
Minimal LogicalInput skeleton
```java
package com.example;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValueReader;

public class HelloLogicalInput extends AbstractLogicalInput {

  public HelloLogicalInput(InputContext ctx, int physicalInputCount) {
    super(ctx, physicalInputCount);
  }

  @Override
  public List<Event> initialize() throws IOException {
    // Allocate resources here. Do not do I/O.
    // Declare that this input needs no runtime-managed memory.
    getContext().requestInitialMemory(0L, null);
    return Collections.emptyList();
  }

  @Override
  public void start() throws IOException {
    // Begin background fetch threads if any. Must return promptly.
  }

  @Override
  public Reader getReader() throws IOException {
    // Simplest useful reader: one record (key "hello", no value), then EOF.
    return new KeyValueReader() {
      private boolean consumed = false;

      @Override
      public boolean next() {
        if (consumed) {
          return false;   // EOF
        }
        consumed = true;
        return true;
      }

      @Override
      public Object getCurrentKey() { return "hello"; }

      @Override
      public Object getCurrentValue() { return null; }
    };
  }

  @Override
  public void handleEvents(List<Event> events) throws IOException {
    // Receive DataMovementEvents from the AM. Build internal routing.
  }

  @Override
  public List<Event> close() throws IOException {
    return Collections.emptyList();
  }
}
```
Real implementations to read for reference:
```bash
find tez-runtime-library/src/main/java -name "OrderedGrouped*Input*.java"
find tez-runtime-library/src/main/java -name "Unordered*Input.java"
```
Reading exercise
```bash
sed -n '1,140p' tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInput.java
sed -n '1,140p' tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
grep -rn "extends AbstractLogicalInput" tez-runtime-library/src/main/java | head
grep -rn "extends AbstractLogicalOutput" tez-runtime-library/src/main/java | head
# Event flow
ls tez-api/src/main/java/org/apache/tez/runtime/api/events/
```
Answer:

- What is the ordering guarantee between `initialize()` calls across the multiple inputs/outputs of a task?
- When does `start()` get called relative to `getReader()`?
- What's the difference in return semantics between `getReader()` of a `LogicalInput` vs a `MergedLogicalInput`?
- Find one concrete `LogicalOutput`; identify what event types its `close()` returns and what downstream effect each has.
- Why does `initialize()` return `List<Event>` instead of `void`?
- What is the difference between `InputInitializerEvent` and `InputDataInformationEvent`? Who emits each?
Common bugs and symptoms
| Symptom | Root cause | Fix / where to look |
|---|---|---|
| Task hangs in getReader() | Input's start() never returned; deadlock with the event handler | Always make start() non-blocking |
| NullPointerException in handleEvents | Events arrived before initialize(); you're using a field not yet set | Allocate state in initialize() |
| Downstream sees half the data | close() returned Collections.emptyList() when it should have emitted a DataMovementEvent | Always emit completion events |
| Custom input never receives DataMovementEvents | EdgeManager on the upstream side not aware of your partitioning | Check that the edge property's OutputDescriptor matches your InputDescriptor |
| Root input never starts | Initializer's handleInputInitializerEvent not implemented | Provide a default; never silently drop events |
| Task succeeds but produces no output | Writer was never flushed (forgot close()) | Verify with IFile size = 0 in logs |
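The fix in the first row can be sketched in plain Java. start() only launches a background fetcher and returns, and the reader side waits on a queue for data. NonBlockingStartDemo, the EOF sentinel and the simulated "data-from-" records are hypothetical. A real input would fetch from upstream hosts instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical input whose start() never blocks; the waiting happens on the reader side.
class NonBlockingStartDemo {
  private static final String EOF = "\u0000EOF";   // sentinel marking end of data
  private final BlockingQueue<String> fetched = new LinkedBlockingQueue<>();
  private final ExecutorService fetcher = Executors.newSingleThreadExecutor();
  private final List<String> sources;

  NonBlockingStartDemo(List<String> sources) {
    this.sources = List.copyOf(sources);
  }

  // Hands the (simulated) fetch to a background thread and returns immediately.
  void start() {
    fetcher.submit(() -> {
      for (String source : sources) {
        fetched.add("data-from-" + source);
      }
      fetched.add(EOF);
    });
  }

  // Blocks on the queue, not inside start(), until the fetcher signals EOF.
  List<String> readAll() {
    List<String> out = new ArrayList<>();
    try {
      for (String rec = fetched.take(); !rec.equals(EOF); rec = fetched.take()) {
        out.add(rec);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();   // preserve the interrupt for the caller
    }
    return out;
  }

  void close() {
    fetcher.shutdownNow();
  }
}
```

Because start() returns at once, the thread that delivers events stays free while data is still being fetched. Only the reader waits.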
Validation: prove you understand this
- Write a minimal `LogicalInput` that produces 100 fixed strings via its `Reader`. Wire it into a one-vertex DAG and run on `MiniTezCluster`.
- From `OrderedGroupedKVInput`, identify exactly when `handleEvents` is called and what it does with each event.
- List the event classes in `org.apache.tez.runtime.api.events`, including those missing from the event table above (for example, `InputFailedEvent`).
- Diagram the events flowing from one upstream task's `LogicalOutput.close()` to a downstream task's `LogicalInput.handleEvents()`.
- Explain why `initialize()` is split from `start()` rather than collapsed into a single method.