IPO Abstractions

Input, Processor, and Output (collectively "IPO") are the three core runtime contracts. A Tez task is built from one processor, zero or more inputs, and zero or more outputs. This chapter walks through the abstractions; the distinction between the LogicalInput/LogicalOutput layer (rich, modern) and the plain Input/Output layer (used for raw byte pipelines); the lifecycle methods; merged inputs; root vs intermediate inputs; and the minimum skeleton needed to write a new input or output.

After this chapter you should be able to read any concrete IPO class in tez-runtime-library and explain what each lifecycle method is for.


The interfaces

tez-api/src/main/java/org/apache/tez/runtime/api/
  Input.java
  Output.java
  Processor.java
  LogicalInput.java                          (extends Input)
  LogicalOutput.java                         (extends Output)
  LogicalIOProcessor.java                    (extends Processor)
  AbstractLogicalInput.java                  (base class for custom inputs)
  AbstractLogicalOutput.java
  AbstractLogicalIOProcessor.java
  Reader.java                                (the byte/record stream interface)
  Writer.java
  MergedLogicalInput.java                    (combines multiple inputs)
  InputContext.java
  OutputContext.java
  ProcessorContext.java
  Event.java                                 (base class; concrete events such as DataMovementEvent live in events/)

grep -n "^public " tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInput.java

Plain Input/Output vs LogicalInput/LogicalOutput

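| Layer      | Class         | Why it exists                                            |
|------------|---------------|----------------------------------------------------------|
| Low-level  | Input         | Bare contract: provides a Reader                         |
| Low-level  | Output        | Bare contract: provides a Writer                         |
| High-level | LogicalInput  | Adds events, lifecycle, knowledge of upstream completion |
| High-level | LogicalOutput | Adds events (to AM and downstream)                       |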

Almost all production inputs/outputs are LogicalInput/LogicalOutput. The plain layer exists for primitive byte-stream cases (rarely used directly).


Lifecycle methods (LogicalInput)

public abstract class AbstractLogicalInput implements LogicalInput {
  // Called by the runtime when the task starts. Setup; no I/O yet.
  public abstract List<Event> initialize() throws Exception;

  // Called after `initialize` for *all* inputs has completed.
  // Begin actively pulling data.
  public abstract void start() throws Exception;

  // The processor calls this to get a Reader for this input.
  public abstract Reader getReader() throws Exception;

  // Handle data movement / control events from the AM (e.g., upstream task done).
  public abstract void handleEvents(List<Event> inputEvents) throws Exception;

  // Final cleanup; close streams; return any final events.
  public abstract List<Event> close() throws Exception;
}

Order in a task's life:

constructor (receives the context) -> initialize -> start -> getReader -> close

initialize returns events to the AM (for example, InputInitializerEvents that ask the AM to do more split work). Most inputs return an empty list.
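The call order above can be made concrete with a small state machine. The sketch below is self-contained and uses stand-in types only: GuardedInput, State, and the String placeholders are hypothetical names for illustration and are not Tez classes. It assumes a strict initialize -> start -> getReader -> close order and rejects anything else.

```java
import java.util.Collections;
import java.util.List;

// Stand-in types for illustration only; none of these are Tez classes.
final class LifecycleGuardSketch {

  enum State { CREATED, INITIALIZED, STARTED, CLOSED }

  /** Hypothetical input that rejects out-of-order lifecycle calls. */
  static final class GuardedInput {
    private State state = State.CREATED;

    List<String> initialize() {
      require(State.CREATED, "initialize");
      state = State.INITIALIZED;
      return Collections.emptyList(); // most inputs have nothing to report
    }

    void start() {
      require(State.INITIALIZED, "start");
      state = State.STARTED;
    }

    String getReader() {
      require(State.STARTED, "getReader");
      return "reader"; // placeholder for a real Reader object
    }

    List<String> close() {
      if (state == State.CLOSED) {
        throw new IllegalStateException("close called twice");
      }
      state = State.CLOSED;
      return Collections.emptyList();
    }

    State state() {
      return state;
    }

    private void require(State expected, String call) {
      if (state != expected) {
        throw new IllegalStateException(call + " called in state " + state);
      }
    }
  }

  public static void main(String[] args) {
    GuardedInput in = new GuardedInput();
    in.initialize();
    in.start();
    in.getReader();
    in.close();
    System.out.println("final state: " + in.state());

    try {
      new GuardedInput().getReader(); // skipped initialize and start
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

A guard like this is a debugging aid for custom inputs: a misordered call fails immediately with a clear message instead of surfacing later as a hang or a NullPointerException.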


Lifecycle methods (LogicalOutput)

Mirror of input:

public abstract class AbstractLogicalOutput implements LogicalOutput {
  public abstract List<Event> initialize() throws Exception;
  public abstract void start() throws Exception;
  public abstract Writer getWriter() throws Exception;
  public abstract void handleEvents(List<Event> outputEvents) throws Exception;
  public abstract List<Event> close() throws Exception;
}

The close of an output is the most consequential call: it flushes pending data and returns a CompositeDataMovementEvent (or VertexManagerEvent) that tells the AM, and thus downstream vertices, what this output produced.
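To illustrate the flush-then-report shape of close(), here is a minimal self-contained sketch. BufferingOutput and CompletionEvent are hypothetical stand-ins, not Tez classes, and the hash partitioning is an assumption made only so the example has something to count. The point is that close() both flushes buffered data and returns an event describing what was produced.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in types for illustration only; none of these are Tez classes.
final class OutputCloseSketch {

  /** Hypothetical completion event summarising what an output produced. */
  static final class CompletionEvent {
    final int partitionCount;
    final long recordCount;

    CompletionEvent(int partitionCount, long recordCount) {
      this.partitionCount = partitionCount;
      this.recordCount = recordCount;
    }
  }

  /** Hypothetical output that buffers per partition until close(). */
  static final class BufferingOutput {
    private final List<List<String>> partitions = new ArrayList<>();
    private final List<String> flushed = new ArrayList<>();
    private boolean closed;

    BufferingOutput(int numPartitions) {
      for (int i = 0; i < numPartitions; i++) {
        partitions.add(new ArrayList<>());
      }
    }

    void write(String key, String value) {
      if (closed) {
        throw new IllegalStateException("write after close");
      }
      int p = Math.floorMod(key.hashCode(), partitions.size());
      partitions.get(p).add(key + "=" + value);
    }

    /** Flushes every partition, then reports the totals as one event. */
    List<CompletionEvent> close() {
      closed = true;
      long total = 0;
      for (List<String> partition : partitions) {
        flushed.addAll(partition);
        total += partition.size();
        partition.clear();
      }
      return Collections.singletonList(
          new CompletionEvent(partitions.size(), total));
    }

    List<String> flushedRecords() {
      return flushed;
    }
  }

  public static void main(String[] args) {
    BufferingOutput out = new BufferingOutput(2);
    out.write("a", "1");
    out.write("b", "2");
    out.write("c", "3");
    CompletionEvent e = out.close().get(0);
    System.out.println(e.recordCount + " records in "
        + e.partitionCount + " partitions");
  }
}
```

Returning an empty list from a close() like this one would leave downstream consumers with no way to learn that data exists, which is why the completion event is built unconditionally.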


Root inputs vs intermediate inputs

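| Kind               | Source of data                | Initializer runs where?                                                        |
|--------------------|-------------------------------|--------------------------------------------------------------------------------|
| Root input         | External (HDFS, HBase, Kafka) | AM-side: InputInitializer enumerates splits, emits InputDataInformationEvents  |
| Intermediate input | Upstream Tez vertex output    | No initializer; data arrives via DataMovementEvent from the AM                 |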

MRInput is the canonical root input. Its AM-side initializer (MRInputAMSplitGenerator) calls InputFormat.getSplits(...) and pushes the resulting splits to tasks.

Intermediate inputs (e.g., OrderedGroupedKVInput) receive their data descriptors from the AM via DataMovementEvents — one event per upstream task completion, carrying the upstream task's location and partition.
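The bookkeeping an intermediate input does with these events can be sketched as follows. This is a self-contained illustration with stand-in types: MoveEvent and TrackingInput are hypothetical, not Tez classes, and the rule that a later event for the same upstream task replaces the earlier one is an assumption made for the example (it models an upstream task being re-run elsewhere).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-in types for illustration only; none of these are Tez classes.
final class UpstreamTrackerSketch {

  /** Hypothetical event: which upstream task finished and where its data is. */
  static final class MoveEvent {
    final int sourceTaskIndex;
    final String location;

    MoveEvent(int sourceTaskIndex, String location) {
      this.sourceTaskIndex = sourceTaskIndex;
      this.location = location;
    }
  }

  /** Hypothetical input that records one location per upstream task. */
  static final class TrackingInput {
    private final int physicalInputCount;
    private final Map<Integer, String> locations = new TreeMap<>();

    TrackingInput(int physicalInputCount) {
      this.physicalInputCount = physicalInputCount;
    }

    void handleEvents(List<MoveEvent> events) {
      for (MoveEvent e : events) {
        if (e.sourceTaskIndex < 0 || e.sourceTaskIndex >= physicalInputCount) {
          throw new IllegalArgumentException(
              "unknown upstream task " + e.sourceTaskIndex);
        }
        // Assumption: a newer event for the same task supersedes the old one.
        locations.put(e.sourceTaskIndex, e.location);
      }
    }

    boolean allUpstreamKnown() {
      return locations.size() == physicalInputCount;
    }

    String locationOf(int sourceTaskIndex) {
      return locations.get(sourceTaskIndex);
    }
  }

  public static void main(String[] args) {
    TrackingInput in = new TrackingInput(2);
    in.handleEvents(Arrays.asList(new MoveEvent(0, "host-a:13562")));
    System.out.println("complete after one event: " + in.allUpstreamKnown());
    in.handleEvents(Arrays.asList(new MoveEvent(1, "host-b:13562")));
    System.out.println("complete after two events: " + in.allUpstreamKnown());
  }
}
```

The host names and port in main are made-up sample values; a real input would take its routing details from the event payload.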


MergedLogicalInput

When a vertex has multiple logical inputs that should look like one to the processor (e.g., a vertex group union), Tez wraps them in a MergedLogicalInput:

grep -n "MergedLogicalInput\|getInputs\|getReader" \
  tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java

The processor calls getReader() once; the merged input combines all underlying readers. Common subclasses live in tez-runtime-library:

  • OrderedGroupedMergedKVInput: merges K/V streams while preserving sort order.
  • ConcatenatedMergedKeyValueInput: concatenates the underlying streams one after another.
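The two merge strategies differ only in how the underlying readers are drained. The sketch below shows both over plain java.util.Iterator values; it is a simplified illustration, not the Tez implementation, and MergedReaderSketch and its method names are hypothetical. The ordered variant assumes each underlying reader is already sorted and uses a heap-based k-way merge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Illustration only: plain iterators stand in for Tez readers.
final class MergedReaderSketch {

  /** Concatenation: drain each underlying reader fully, in order. */
  static List<String> concatenate(List<Iterator<String>> readers) {
    List<String> out = new ArrayList<>();
    for (Iterator<String> reader : readers) {
      while (reader.hasNext()) {
        out.add(reader.next());
      }
    }
    return out;
  }

  /** One pending element plus the reader it came from. */
  private static final class Head {
    final String value;
    final Iterator<String> source;

    Head(String value, Iterator<String> source) {
      this.value = value;
      this.source = source;
    }
  }

  /** Ordered merge: assumes every underlying reader is already sorted. */
  static List<String> mergeSorted(List<Iterator<String>> readers) {
    PriorityQueue<Head> heap =
        new PriorityQueue<>((a, b) -> a.value.compareTo(b.value));
    for (Iterator<String> reader : readers) {
      if (reader.hasNext()) {
        heap.add(new Head(reader.next(), reader));
      }
    }
    List<String> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Head smallest = heap.poll();
      out.add(smallest.value);
      // Refill the heap from the reader that just supplied an element.
      if (smallest.source.hasNext()) {
        heap.add(new Head(smallest.source.next(), smallest.source));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> a = Arrays.asList("a", "d");
    List<String> b = Arrays.asList("b", "c");
    System.out.println(concatenate(Arrays.asList(a.iterator(), b.iterator())));
    System.out.println(mergeSorted(Arrays.asList(a.iterator(), b.iterator())));
  }
}
```

With the sample data in main, concatenation yields a, d, b, c while the ordered merge yields a, b, c, d, which is the difference a downstream processor would observe.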

Events flowing between AM and task

| Event class               | Direction                         | Carries                                           |
|---------------------------|-----------------------------------|---------------------------------------------------|
| DataMovementEvent         | AM → task input                   | Source task index, source URL/path, partition     |
| InputReadErrorEvent       | task input → AM                   | "This source URL is broken, please re-route"      |
| CompositeDataMovementEvent| task output → AM (then forwarded) | Bulk version of DataMovementEvent                 |
| InputDataInformationEvent | AM → task input                   | Concrete split (root inputs only)                 |
| InputInitializerEvent     | task → AM (initializer)           | Custom signal to the initializer                  |
| VertexManagerEvent        | task output → AM (vertex manager) | Stats for auto-parallelism (ShuffleVertexManager) |

ls tez-api/src/main/java/org/apache/tez/runtime/api/events/

Minimal LogicalInput skeleton

package com.example;

import org.apache.tez.runtime.api.*;
import org.apache.tez.runtime.api.events.*;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class HelloLogicalInput extends AbstractLogicalInput {

  public HelloLogicalInput(InputContext ctx, int physicalInputCount) {
    super(ctx, physicalInputCount);
  }

  @Override
  public List<Event> initialize() throws IOException {
    // Allocate resources here. Do not do I/O.
    return Collections.emptyList();
  }

  @Override
  public void start() throws IOException {
    // Begin background fetch threads if any.
  }

  @Override
  public Reader getReader() throws IOException {
    // Return a Reader. SimpleStringReader is a placeholder for your own
    // Reader implementation (it is not part of Tez); here it would serve "hello".
    return new SimpleStringReader("hello");
  }

  @Override
  public void handleEvents(List<Event> events) throws IOException {
    // Receive DataMovementEvents from the AM. Build internal routing.
  }

  @Override
  public List<Event> close() throws IOException {
    return Collections.emptyList();
  }
}

Real implementations to read for reference:

find tez-runtime-library/src/main/java -name "OrderedGrouped*Input*.java"
find tez-runtime-library/src/main/java -name "Unordered*Input.java"

Reading exercise

sed -n '1,140p' tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInput.java
sed -n '1,140p' tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
grep -rn "extends AbstractLogicalInput" tez-runtime-library/src/main/java | head
grep -rn "extends AbstractLogicalOutput" tez-runtime-library/src/main/java | head

# Event flow
ls tez-api/src/main/java/org/apache/tez/runtime/api/events/

Answer:

  1. What is the ordering guarantee between initialize() calls across the multiple inputs/outputs of a task?
  2. When does start() get called relative to getReader()?
  3. What's the difference in return semantics between getReader() of a LogicalInput vs a MergedLogicalInput?
  4. Find one concrete LogicalOutput; identify what event types its close() returns and what downstream effect each has.
  5. Why does initialize() return List<Event> instead of void?
  6. What is the difference between InputInitializerEvent and InputDataInformationEvent? Who emits each?

Common bugs and symptoms

| Symptom | Root cause | Fix / where to look |
|---------|------------|---------------------|
| Task hangs in getReader() | Input's start() never returned; deadlock with handler | Always make start() non-blocking |
| NullPointerException in handleEvents | Events arrived before initialize(); you're using a field not yet set | Allocate state in initialize() |
| Downstream sees half the data | close() returned Collections.emptyList() when it should have emitted a DataMovementEvent | Always emit completion events |
| Custom input never receives DataMovementEvents | EdgeManager on the upstream side not aware of your partitioning | Check that the edge property's OutputDescriptor matches your InputDescriptor |
| Root input never starts | Initializer's handleInputInitializerEvent not implemented | Provide a default; never silently drop |
| Task succeeds but produces no output | Writer was never flushed (forgot close()) | Verify with IFile size = 0 in logs |
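The first row's advice, a non-blocking start(), can be sketched like this. The example is self-contained and uses a hypothetical BackgroundInput, not a Tez class. Note also that the timeout parameter on getReader is an addition for the sketch, so that a missing start() fails loudly instead of hanging; the Tez getReader() described earlier takes no arguments.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Stand-in for illustration only; not a Tez class.
final class NonBlockingStartSketch {

  static final class BackgroundInput {
    private final List<String> fetched = new CopyOnWriteArrayList<>();
    private final CountDownLatch fetchDone = new CountDownLatch(1);

    /** Returns immediately; the simulated fetch runs on a daemon thread. */
    void start() {
      Thread fetcher = new Thread(() -> {
        try {
          for (int i = 0; i < 3; i++) {
            fetched.add("record-" + i);
          }
        } finally {
          fetchDone.countDown(); // always release waiters, even on failure
        }
      }, "sketch-fetcher");
      fetcher.setDaemon(true);
      fetcher.start();
    }

    /** Waits for the fetch, but only up to the given timeout. */
    List<String> getReader(long timeoutMillis) {
      try {
        if (!fetchDone.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
          throw new IllegalStateException(
              "fetch did not finish; was start() called?");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for fetch", e);
      }
      return fetched;
    }
  }

  public static void main(String[] args) {
    BackgroundInput in = new BackgroundInput();
    in.start();
    System.out.println(in.getReader(5_000));
  }
}
```

The design choice is to keep all waiting in getReader, where the caller expects it, and to release the latch in a finally block so a failed fetch cannot strand the waiting thread.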

Validation: prove you understand this

  1. Write a minimal LogicalInput that produces 100 fixed strings via its Reader. Wire it into a one-vertex DAG and run on MiniTezCluster.
  2. From OrderedGroupedKVInput, identify exactly when handleEvents is called and what it does with each event.
  3. List the event classes in org.apache.tez.runtime.api.events and note which ones the table in "Events flowing between AM and task" omits.
  4. Diagram the events flowing from one upstream task's LogicalOutput.close() to a downstream task's LogicalInput.handleEvents().
  5. Explain why initialize() is split from start() rather than collapsed into a single method.