Tez Runtime Internals

The Tez runtime is the code that runs inside the container, not inside the AM. Its job: boot a JVM, accept tasks from the AM over umbilical RPC, run them to completion, and report status.

Three modules collaborate:

  • tez-runtime-internals — process boot, task driver, umbilical client, memory broker.
  • tez-runtime-library — concrete Input / Processor / Output implementations (KV, shuffle, etc.).
  • tez-api — the SPI the user implements (AbstractLogicalInput, AbstractLogicalOutput, AbstractLogicalIOProcessor).

ls tez-runtime-internals/src/main/java/org/apache/tez/runtime/

The container process: TezChild

TezChild.main() is the JVM entry point for every Tez task container.

find tez-runtime-internals/src/main/java -name "TezChild.java"
grep -n "public static void main\|new TezChild\|run()" \
  $(find tez-runtime-internals/src/main/java -name "TezChild.java")

Boot sequence (paraphrased from TezChild.java):

  1. Read JVM args: AM host, AM port, container ID, application attempt ID, PID, JVM identifier.
  2. Read the security tokens from $HADOOP_TOKEN_FILE_LOCATION.
  3. Construct a TezTaskUmbilicalProtocol RPC proxy pointing at the AM's TaskAttemptListenerImpl.
  4. Enter TezChild.run(), a loop that only ends when the AM says so:
       a. umbilical.getTask(containerContext) blocks until the AM hands over a ContainerTask.
       b. If ContainerTask.shouldDie(), exit cleanly.
       c. Otherwise, build a TezTaskRunner2 for the assigned attempt and call runner.run().
       d. Loop back to (a). This is container reuse: same JVM, next task.

flowchart TD
  S[JVM start] --> P[Parse args + tokens]
  P --> R[RPC connect to AM]
  R --> L{umbilical.getTask}
  L -- shouldDie --> X[exit]
  L -- new task --> T[TezTaskRunner2.run]
  T --> L
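
The loop in step 4 can be sketched in plain Java. This is a minimal sketch, not TezChild's code: Umbilical, ContainerTask, and runLoop are hypothetical stand-ins, and recording the attempt ID stands in for building and running a TezTaskRunner2.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ContainerLoopSketch {

    // Hypothetical stand-in for the AM's reply: a task to run, or an order to die.
    record ContainerTask(String attemptId, boolean shouldDie) {}

    // Hypothetical stand-in for the umbilical RPC proxy.
    interface Umbilical {
        ContainerTask getTask();   // in the real runtime this blocks until the AM answers
    }

    // Keeps asking for work until told to die; returns the attempts run in this JVM.
    static List<String> runLoop(Umbilical umbilical) {
        List<String> executed = new ArrayList<>();
        while (true) {
            ContainerTask task = umbilical.getTask();
            if (task.shouldDie()) {
                return executed;              // clean exit: the container can be released
            }
            executed.add(task.attemptId());   // stands in for new TezTaskRunner2(...).run()
        }
    }

    public static void main(String[] args) {
        // Two tasks reuse the same JVM before the AM tells the container to die.
        Deque<ContainerTask> replies = new ArrayDeque<>(List.of(
            new ContainerTask("attempt_0", false),
            new ContainerTask("attempt_1", false),
            new ContainerTask(null, true)));
        System.out.println(runLoop(replies::poll));   // [attempt_0, attempt_1]
    }
}
```

Every iteration after the first skips JVM startup entirely, which is the saving the next section describes.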

Why container reuse needs this loop

Allocating a YARN container costs hundreds of ms; starting a JVM costs seconds. Tez amortizes both by running multiple tasks in the same TezChild process. See container-reuse.md for the AM side.


TezTaskRunner2 — the task driver

find tez-runtime-internals/src/main/java -name "TezTaskRunner2.java"
wc -l $(find tez-runtime-internals/src/main/java -name "TezTaskRunner2.java")

Per-attempt driver. Owns:

  • a LogicalIOProcessorRuntimeTask (the actual task body),
  • the input/output initialization thread pool,
  • abort hooks (kill, fatal error, timeout).

Lifecycle of a single attempt:

sequenceDiagram
  participant TC as TezChild
  participant TR as TezTaskRunner2
  participant T as LogicalIOProcessorRuntimeTask
  participant IO as Inputs / Outputs
  participant P as Processor
  TC->>TR: new + run()
  TR->>T: initialize()
  T->>IO: input.initialize() (parallel)
  T->>IO: output.initialize() (parallel)
  T->>P: processor.initialize()
  TR->>T: run() => processor.run(inputs, outputs)
  P->>IO: read inputs, write outputs
  T->>IO: output.close() (parallel)
  T->>IO: input.close() (parallel)
  TR->>TC: result (success/failure)

Failure routes:

  • Input init throws → TaskFailedException to AM, attempt fails.
  • Processor throws → same.
  • AM sends kill via umbilical heartbeat reply → TezTaskRunner2.killTask() interrupts the processor thread.
  • Fatal error on any IO → TaskReporter.notifyFatalError() short-circuits the run.
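
The kill and failure routes can be illustrated with standard java.util.concurrent primitives. This is a hypothetical sketch, not TezTaskRunner2: AttemptDriverSketch, Result, run, and kill are illustrative names. The processor runs on its own thread; an exception maps to FAILED, and kill() cancels the task with interruption, the same effect the text attributes to killTask().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Single-use: one driver instance per attempt.
public class AttemptDriverSketch {

    enum Result { SUCCEEDED, FAILED, KILLED }

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile FutureTask<Void> running;
    private volatile boolean killRequested;

    // Runs the processor body on its own thread and classifies how it ended.
    Result run(Callable<Void> processor) throws InterruptedException {
        FutureTask<Void> task = new FutureTask<>(processor);
        running = task;                 // published before the task starts, so kill() can see it
        try {
            if (killRequested) {
                return Result.KILLED;   // kill arrived before the processor started
            }
            executor.execute(task);
            task.get();
            return Result.SUCCEEDED;
        } catch (CancellationException e) {
            return Result.KILLED;       // kill() cancelled (and interrupted) the processor
        } catch (ExecutionException e) {
            // The processor threw: report a failure unless a kill caused it.
            return killRequested ? Result.KILLED : Result.FAILED;
        } finally {
            executor.shutdownNow();
        }
    }

    // Analogous to a kill arriving on a heartbeat reply: interrupt the processor thread.
    void kill() {
        killRequested = true;
        FutureTask<Void> task = running;
        if (task != null) {
            task.cancel(true);          // cancel(true) interrupts the thread running the task
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new AttemptDriverSketch().run(() -> null));   // SUCCEEDED
        System.out.println(new AttemptDriverSketch().run(() -> {
            throw new IllegalStateException("processor bug");
        }));                                                              // FAILED

        AttemptDriverSketch driver = new AttemptDriverSketch();
        CountDownLatch started = new CountDownLatch(1);
        ExecutorService caller = Executors.newSingleThreadExecutor();
        Future<Result> outcome = caller.submit(() -> driver.run(() -> {
            started.countDown();
            Thread.sleep(60_000);       // a long-running processor
            return null;
        }));
        started.await();
        driver.kill();
        System.out.println(outcome.get());                                // KILLED
        caller.shutdown();
    }
}
```

Interruption only works if the processor's blocking calls respond to it, which is why a processor stuck in a non-interruptible loop can outlive a kill.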

LogicalIOProcessorRuntimeTask — orchestrator

find tez-runtime-internals/src/main/java -name "LogicalIOProcessorRuntimeTask.java"
wc -l $(find tez-runtime-internals/src/main/java -name "LogicalIOProcessorRuntimeTask.java")

This is the class that actually instantiates the user's Input/Processor/Output (IPO) triple.

initialize() does, in order:

  1. Build the TezConfiguration for this task from the AM-provided TaskSpec.
  2. Build the MemoryDistributor (next section) over all IOs.
  3. For each InputSpec: instantiate the input class, set its InputContext, call initialize() on a worker thread.
  4. Same for each OutputSpec.
  5. Instantiate the processor; call processor.initialize(processorContext).
  6. Wait for all input/output initialize() calls to complete (parallel).
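
The field name initializerCompletionService (below) suggests a java.util.concurrent.ExecutorCompletionService. Under that assumption, steps 3, 4, and 6 reduce to: submit every initialize() to a pool, then take one completed future per submission. A minimal sketch; initializeAll and the thread count are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelInitSketch {

    // Submits every initializer, then waits for all of them. Returns how many completed;
    // a failed initializer surfaces as an ExecutionException from get().
    static int initializeAll(List<Callable<Void>> initializers, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CompletionService<Void> completions = new ExecutorCompletionService<>(pool);
        try {
            for (Callable<Void> init : initializers) {
                completions.submit(init);
            }
            for (int i = 0; i < initializers.size(); i++) {
                completions.take().get();   // futures arrive in completion order
            }
            return initializers.size();
        } finally {
            pool.shutdownNow();             // abandon stragglers if one init failed
        }
    }

    public static void main(String[] args) throws Exception {
        Callable<Void> slowInit = () -> { Thread.sleep(200); return null; };   // e.g. a slow input
        long start = System.nanoTime();
        int done = initializeAll(List.of(slowInit, slowInit, slowInit, slowInit), 4);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // With 4 threads this takes roughly one init's time, not four.
        System.out.println(done + " IOs initialized in " + elapsedMs + " ms");
    }
}
```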

run():

  1. Block until every input reports it has data (or signals empty).
  2. Call processor.run(inputs, outputs) on the main thread.
  3. On return, call output.close() for every output (parallel), then input.close() for every input (parallel).
  4. Collect counters; hand the final TaskStatus back to TezTaskRunner2.
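
Steps 2 and 3 impose an order: processor.run returns first, then every output closes, and only then do inputs close. A hypothetical sketch of that ordering: each IO's close() is simulated by appending to a log, closes within a group run in parallel, and each group acts as a barrier before the next starts. RunAndCloseSketch, closeGroup, and the pool size are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RunAndCloseSketch {

    // Closes one group of IOs in parallel and waits for all of them before returning.
    static void closeGroup(List<String> ioNames, List<String> log, ExecutorService pool) throws Exception {
        List<Future<?>> pending = new ArrayList<>();
        for (String name : ioNames) {
            pending.add(pool.submit(() -> log.add("close " + name)));   // stands in for io.close()
        }
        for (Future<?> f : pending) {
            f.get();   // rethrows a close() failure
        }
    }

    // Processor on the calling thread, then every output, then every input.
    static List<String> runAndClose(Runnable processor, List<String> outputs, List<String> inputs)
            throws Exception {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            processor.run();
            log.add("processor.run returned");
            closeGroup(outputs, log, pool);   // outputs first: they flush spills and emit events
            closeGroup(inputs, log, pool);
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws Exception {
        // Order within a group may vary; the group boundaries do not.
        System.out.println(runAndClose(() -> { }, List.of("out1", "out2"), List.of("in1")));
    }
}
```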

Key field:

grep -n "initializerCompletionService\|runInputRunnable\|runOutputRunnable" \
  $(find tez-runtime-internals/src/main/java -name "LogicalIOProcessorRuntimeTask.java") | head

Parallel init is what makes Tez fast for processors with many inputs (e.g. multi-input joins): total init time tracks the slowest IO rather than the sum of all of them.


MemoryDistributor

find tez-runtime-internals/src/main/java -name "MemoryDistributor.java"
cat $(find tez-runtime-internals/src/main/java -name "MemoryDistributor.java") | head -160

A single broker that hands out portions of the task's JVM heap to IOs that ask for memory.

Flow:

  1. At task init, each Input / Output calls context.requestInitialMemory(size, callback) with what it would like to reserve (e.g. OrderedPartitionedKVOutput requests tez.runtime.io.sort.mb MB).
  2. MemoryDistributor.makeInitialAllocations() runs an InitialMemoryAllocator plugin (default: WeightedScalingMemoryDistributor) to scale requests down to fit the container's available heap.
  3. Allocations are dispatched to callbacks; each IO learns its actual budget via MemoryUpdateCallback.memoryAssigned(long).
  4. IO classes resize their buffers accordingly.
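
A minimal sketch of this request, allocate, and callback flow. It uses plain proportional scaling, which is a simplification: the real WeightedScalingMemoryDistributor applies per-IO-class weights (see tez.task.scale.memory.ratios below), and the way this sketch applies a reserve percentage is an assumption. ScalingBrokerSketch and its signatures are hypothetical; only the names requestInitialMemory, makeInitialAllocations, and memoryAssigned are borrowed from the text. The sketch also rejects requests that arrive after allocation, the situation behind the IllegalStateException listed under common bugs.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ScalingBrokerSketch {

    // Mirrors the callback described above; not the real Tez interface.
    interface MemoryUpdateCallback {
        void memoryAssigned(long size);
    }

    private final long available;
    private final Map<String, Long> requests = new LinkedHashMap<>();
    private final Map<String, MemoryUpdateCallback> callbacks = new HashMap<>();
    private boolean allocated;

    // Assumption: a fixed percentage of heap is held back for the processor and JVM.
    ScalingBrokerSketch(long heap, int reservePercent) {
        this.available = heap * (100 - reservePercent) / 100;
    }

    void requestInitialMemory(String io, long size, MemoryUpdateCallback callback) {
        if (allocated) {
            throw new IllegalStateException("allocations already made; late request from " + io);
        }
        requests.put(io, size);
        callbacks.put(io, callback);
    }

    // Grants requests verbatim if they fit; otherwise scales each one by available / total.
    Map<String, Long> makeInitialAllocations() {
        allocated = true;
        long total = requests.values().stream().mapToLong(Long::longValue).sum();
        Map<String, Long> grants = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : requests.entrySet()) {
            long grant = total <= available ? e.getValue() : e.getValue() * available / total;
            grants.put(e.getKey(), grant);
            callbacks.get(e.getKey()).memoryAssigned(grant);
        }
        return grants;
    }

    public static void main(String[] args) {
        ScalingBrokerSketch broker = new ScalingBrokerSketch(1000, 30);   // sizes in MB; 700 usable
        broker.requestInitialMemory("sortedOutput", 600, mb -> System.out.println("sortedOutput: " + mb));
        broker.requestInitialMemory("shuffleInput", 400, mb -> System.out.println("shuffleInput: " + mb));
        broker.makeInitialAllocations();   // 1000 requested > 700 usable: prints 420 and 280
        try {
            broker.requestInitialMemory("lateInput", 100, mb -> { });
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```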

Configuration knobs:

| Key | Effect |
|---|---|
| tez.task.scale.memory.allocator.class | Plugin to use. Default: WeightedScalingMemoryDistributor. |
| tez.task.scale.memory.enabled | Master toggle. |
| tez.task.scale.memory.ratios | Per-IO-class weight overrides. |
| tez.task.scale.memory.reserve-fraction | Fraction of heap reserved for the processor/JVM. |

grep -n "requestInitialMemory\|memoryAssigned" \
  $(find tez-runtime-library/src/main/java -name "OrderedPartitionedKVOutput.java")

Without the distributor, each IO would get its configured size verbatim; summed across IOs, those sizes can exceed the heap and OOM the container.


TaskReporter and the umbilical

find tez-runtime-internals/src/main/java -name "TaskReporter*.java"
find . -name "TezTaskUmbilicalProtocol.java"

TaskReporter runs a heartbeat thread per task attempt. Each cycle:

  1. Collect outbound events (counter updates, completion events from completed IOs).
  2. Call umbilical.heartbeat(request) where request contains attempt ID, counters, status messages, and the outbound TezEvent list.
  3. Decode the reply: AM may push back inbound TezEvents (e.g. DataMovementEvents from upstream tasks), a shouldDie flag, or a shouldReset flag.
  4. Dispatch inbound events into the appropriate Input via LogicalIOProcessorRuntimeTask.handleEvents().

Intervals: the heartbeat fires every tez.task.am.heartbeat.interval-ms (default 100 ms), while counter updates follow a slower cadence, tez.task.am.heartbeat.counter.interval-ms (default 4000 ms).
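
The cycle above can be sketched as a single heartbeatOnce step driven by a timer. Everything here is hypothetical: Request, Response, Umbilical, and heartbeatOnce are simplified stand-ins for TaskReporter and TezTaskUmbilicalProtocol, events are plain strings, and the clock is passed in so the counter interval can be tested. It shows the two intervals interacting: queued events ride every heartbeat, counters only once the counter interval has elapsed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class HeartbeatSketch {

    // Hypothetical request/response shapes; counters == null means "not sent this cycle".
    record Request(String attemptId, List<String> outboundEvents, Map<String, Long> counters) {}
    record Response(List<String> inboundEvents, boolean shouldDie) {}

    interface Umbilical {
        Response heartbeat(Request request);
    }

    private final String attemptId;
    private final Umbilical umbilical;
    private final Consumer<List<String>> eventSink;            // stands in for handleEvents()
    private final Supplier<Map<String, Long>> counterSource;
    private final long counterIntervalMs;
    private final Queue<String> outbound = new ConcurrentLinkedQueue<>();
    private boolean countersSent;                              // touched by the heartbeat thread only
    private long lastCounterSendMs;

    HeartbeatSketch(String attemptId, Umbilical umbilical, Consumer<List<String>> eventSink,
                    Supplier<Map<String, Long>> counterSource, long counterIntervalMs) {
        this.attemptId = attemptId;
        this.umbilical = umbilical;
        this.eventSink = eventSink;
        this.counterSource = counterSource;
        this.counterIntervalMs = counterIntervalMs;
    }

    // Called by IOs (any thread) to queue an event for the next heartbeat.
    void queueEvent(String event) {
        outbound.add(event);
    }

    // One cycle: drain events, attach counters if due, call the AM, dispatch its reply.
    // Returns false when the AM says the task should die.
    boolean heartbeatOnce(long nowMs) {
        List<String> events = new ArrayList<>();
        for (String e = outbound.poll(); e != null; e = outbound.poll()) {
            events.add(e);
        }
        Map<String, Long> counters = null;
        if (!countersSent || nowMs - lastCounterSendMs >= counterIntervalMs) {
            counters = counterSource.get();
            countersSent = true;
            lastCounterSendMs = nowMs;
        }
        Response response = umbilical.heartbeat(new Request(attemptId, events, counters));
        if (response.shouldDie()) {
            return false;
        }
        eventSink.accept(response.inboundEvents());
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        long intervalMs = 100;   // cf. tez.task.am.heartbeat.interval-ms
        int[] beats = {0};
        HeartbeatSketch reporter = new HeartbeatSketch("attempt_0",
            request -> new Response(List.of("event-" + beats[0]), ++beats[0] >= 5),   // fake AM
            events -> System.out.println("inbound: " + events),
            () -> Map.of("RECORDS_OUT", 42L),
            4000);               // cf. tez.task.am.heartbeat.counter.interval-ms
        while (reporter.heartbeatOnce(System.currentTimeMillis())) {
            Thread.sleep(intervalMs);
        }
        System.out.println("AM said shouldDie after " + beats[0] + " heartbeats");
    }
}
```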

Why heartbeats carry events

Tez has no separate "event bus" between AM and containers. Everything piggybacks on the umbilical heartbeat. This means:

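  • Event latency is tied to heartbeat.interval-ms: an event queued just after a heartbeat waits for the next cycle, so raising the interval raises worst-case delivery latency.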
  • A wedged umbilical (network partition) blocks all task communication; tez.task.timeout-ms (default 5 minutes) eventually fires and the AM considers the attempt lost.
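
On the AM side, the timeout amounts to comparing each attempt's last heartbeat time against tez.task.timeout-ms. A hypothetical sketch of that bookkeeping (class and method names are illustrative, not the AM's real liveness tracking); it shows why a wedged umbilical is noticed only after the full timeout has passed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeartbeatTimeoutSketch {

    private final long timeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    HeartbeatTimeoutSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called whenever a heartbeat from this attempt reaches the AM.
    void onHeartbeat(String attemptId, long nowMs) {
        lastHeartbeatMs.put(attemptId, nowMs);
    }

    // Attempts silent for longer than the timeout are treated as lost.
    List<String> lostAttempts(long nowMs) {
        List<String> lost = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeatMs.entrySet()) {
            if (nowMs - e.getValue() > timeoutMs) {
                lost.add(e.getKey());
            }
        }
        lost.sort(null);   // natural order, for deterministic output
        return lost;
    }

    public static void main(String[] args) {
        HeartbeatTimeoutSketch am = new HeartbeatTimeoutSketch(5 * 60 * 1000);   // 5 minutes
        am.onHeartbeat("healthy", 0);
        am.onHeartbeat("wedged", 0);              // last heartbeat before a network partition
        am.onHeartbeat("healthy", 299_900);       // keeps beating
        System.out.println(am.lostAttempts(300_001));   // [wedged]
    }
}
```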

End-to-end task lifecycle inside the JVM

grep -n "phase\|TaskRunnerPhase" $(find tez-runtime-internals/src/main/java -name "TezTaskRunner2.java") | head

| Phase | Owner | What happens |
|---|---|---|
| 1 Receive | TezChild.run | umbilical.getTask returns a ContainerTask. |
| 2 Build | TezTaskRunner2 | Construct LogicalIOProcessorRuntimeTask, hook up TaskReporter. |
| 3 Init | LogicalIOProcessorRuntimeTask.initialize | MemoryDistributor + parallel IO init + processor init. |
| 4 Run | LogicalIOProcessorRuntimeTask.run | processor.run(inputs, outputs). |
| 5 Close | LogicalIOProcessorRuntimeTask | Outputs close (flush spills, emit DataMovementEvents), then inputs close. |
| 6 Report | TaskReporter (final tick) | Send counters + completion event; the AM transitions the attempt to SUCCEEDED. |
| 7 Loop | TezChild.run | Discard the task object, request the next one. |

Reading exercise

  1. grep -n "shouldDie\|exit(" $(find tez-runtime-internals/src/main/java -name "TezChild.java") — list every termination path.
  2. grep -n "initialize()\|run()\|close()" $(find tez-runtime-internals/src/main/java -name "LogicalIOProcessorRuntimeTask.java") | head -40: verify the lifecycle order.
  3. cat $(find tez-runtime-internals/src/main/java -name "MemoryDistributor.java") | head -100 — how does it handle the case where summed requests exceed available?
  4. grep -n "heartbeat\|TezTaskUmbilical" $(find tez-runtime-internals/src/main/java -name "TaskReporter.java") | head — find the heartbeat loop body.
  5. cat tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java — read the user-facing processor contract.
  6. find tez-runtime-internals/src/main/java -name "*.java" -exec wc -l {} + | sort -n | tail -20: find the biggest classes in the runtime module (ignore the "total" line).

Common bugs and symptoms

| Symptom | Likely cause |
|---|---|
| Container OOM during init | MemoryDistributor disabled, or summed IO requests exceed the heap. Enable tez.task.scale.memory.enabled. |
| TaskAttempt timed out after 5 min of no heartbeat | TaskReporter thread died (uncaught exception) or the RPC hung. |
| Processor sees zero events | Inbound events not delivered; check the TaskReporter.heartbeat reply path. Common when tez.task.am.heartbeat.interval-ms is raised too high. |
| Container reuse not happening; JVMs constantly spinning up | The AM answers getTask with shouldDie too eagerly, so TezChild.run exits; check the AM-side AMContainerImpl reuse decision. |
| IllegalStateException: Cannot reserve more memory | An IO called requestInitialMemory after makeInitialAllocations had already run. |
| Outputs never close (process hangs) | Processor never returned from run(); usually an infinite loop on a KeyValuesReader. |

Validation: prove you understand this

  1. Trace, with file:method references, the path from TezChild.main to processor.run for a single attempt.
  2. Explain in two sentences why LogicalIOProcessorRuntimeTask.initialize parallelizes input/output init. Cite the field name.
  3. Given a container with 1 GB heap, one OrderedPartitionedKVOutput requesting 512 MB and two OrderedGroupedKVInputs requesting 256 MB each, compute the actual allocations under the default WeightedScalingMemoryDistributor.
  4. Identify the single umbilical method that delivers inbound TezEvents to the task. Cite the file and the field on the response object.
  5. Sketch the smallest possible AbstractLogicalIOProcessor that prints the class names of all configured inputs and exits. Include initialize, handleEvents, run, close.