Tez Runtime Internals
The Tez runtime is the code that runs inside the container, not inside the AM. Its job: boot a JVM, accept tasks from the AM over umbilical RPC, run them to completion, and report status.
Three modules collaborate:
- tez-runtime-internals: process boot, task driver, umbilical client, memory broker.
- tez-runtime-library: concrete Input/Processor/Output implementations (KV, shuffle, etc.).
- tez-api: the SPI the user implements (AbstractLogicalInput, AbstractLogicalOutput, AbstractLogicalIOProcessor).
ls tez-runtime-internals/src/main/java/org/apache/tez/runtime/
The container process: TezChild
TezChild.main() is the JVM entry point for every Tez task container.
find tez-runtime-internals/src/main/java -name "TezChild.java"
grep -n "public static void main\|new TezChild\|run()" \
$(find tez-runtime-internals/src/main/java -name "TezChild.java")
Boot sequence (paraphrased from TezChild.java):
- Read JVM args: AM host, AM port, container ID, application attempt ID, PID, JVM identifier.
- Read the security tokens from $HADOOP_TOKEN_FILE_LOCATION.
- Construct a TezTaskUmbilicalProtocol RPC proxy pointing at the AM TaskAttemptListenerImpl.
- Enter TezChild.run(), an infinite loop:
  a. umbilical.getTask(containerContext) blocks until the AM hands us a ContainerTask.
  b. If ContainerTask.shouldDie(), exit cleanly.
  c. Otherwise build a TezTaskRunner2 for the assigned attempt and call runner.run().
  d. Loop. This is container reuse: same JVM, next task.
flowchart TD
S[JVM start] --> P[Parse args + tokens]
P --> R[RPC connect to AM]
R --> L{umbilical.getTask}
L -- shouldDie --> X[exit]
L -- new task --> T[TezTaskRunner2.run]
T --> L
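The getTask loop above can be modeled in a few lines. This is a minimal Python sketch, not the Java in TezChild: FakeUmbilical is a hypothetical in-memory stand-in for the RPC proxy, and the ContainerTask dataclass only mimics the shouldDie flag plus an attempt ID.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ContainerTask:
    """Stand-in for what the AM returns from getTask (hypothetical shape)."""
    should_die: bool
    attempt_id: Optional[str] = None


class FakeUmbilical:
    """In-memory replacement for the umbilical RPC proxy (assumption)."""

    def __init__(self, attempt_ids):
        self._queue = deque(attempt_ids)

    def get_task(self) -> ContainerTask:
        # The real call blocks until the AM has work; this fake never blocks.
        if self._queue:
            return ContainerTask(should_die=False, attempt_id=self._queue.popleft())
        return ContainerTask(should_die=True)


def child_run(umbilical: FakeUmbilical, run_attempt: Callable[[str], None]) -> int:
    """Run attempts in one process until told to die; return how many ran."""
    executed = 0
    while True:
        task = umbilical.get_task()
        if task.should_die:
            return executed            # step b: clean exit
        run_attempt(task.attempt_id)   # step c: one runner per attempt
        executed += 1                  # step d: loop, same process


ran = []
count = child_run(FakeUmbilical(["attempt_0", "attempt_1", "attempt_2"]), ran.append)
print(count, ran)
```

All three attempts run inside a single call to child_run, which is the property container reuse relies on.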
Why container reuse needs this loop
Allocating a YARN container costs hundreds of ms; starting a JVM costs
seconds. Tez amortizes both by running multiple tasks in the same TezChild
process. See container-reuse.md for the AM side.
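To put rough numbers on the amortization, the sketch below uses illustrative costs in line with the figures above: 300 ms per container allocation and 2000 ms per JVM start. Both values and the helper name are assumptions, not measurements.

```python
def startup_overhead_ms(num_tasks: int, tasks_per_container: int,
                        alloc_ms: int, jvm_start_ms: int) -> int:
    """Total container-allocation plus JVM-start time paid across all tasks."""
    containers = -(-num_tasks // tasks_per_container)  # ceiling division
    return containers * (alloc_ms + jvm_start_ms)


ALLOC_MS = 300        # assumed: "hundreds of ms"
JVM_START_MS = 2000   # assumed: "seconds"

no_reuse = startup_overhead_ms(100, 1, ALLOC_MS, JVM_START_MS)
with_reuse = startup_overhead_ms(100, 10, ALLOC_MS, JVM_START_MS)
print(no_reuse, with_reuse)  # 230000 23000
```

Under these assumed costs, running 10 tasks per container cuts the startup overhead for 100 tasks from 230 s to 23 s.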
TezTaskRunner2 — the task driver
find tez-runtime-internals/src/main/java -name "TezTaskRunner2.java"
wc -l $(find tez-runtime-internals/src/main/java -name "TezTaskRunner2.java")
Per-attempt driver. Owns:
- a LogicalIOProcessorRuntimeTask (the actual task body),
- the input/output initialization thread pool,
- abort hooks (kill, fatal error, timeout).
Lifecycle of a single attempt:
sequenceDiagram
participant TC as TezChild
participant TR as TezTaskRunner2
participant T as LogicalIOProcessorRuntimeTask
participant IO as Inputs / Outputs
participant P as Processor
TC->>TR: new + run()
TR->>T: initialize()
T->>IO: input.initialize() (parallel)
T->>IO: output.initialize() (parallel)
T->>P: processor.initialize()
TR->>T: run() => processor.run(inputs, outputs)
P->>IO: read inputs, write outputs
T->>IO: output.close() (parallel)
T->>IO: input.close() (parallel)
TR->>TC: result (success/failure)
Failure routes:
- Input init throws → TaskFailedException to AM, attempt fails.
- Processor throws → same.
- AM sends kill via umbilical heartbeat reply → TezTaskRunner2.killTask() interrupts the processor thread.
- Fatal error on any IO → TaskReporter.notifyFatalError() short-circuits the run.
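The four routes can be pictured as a small dispatcher. The Python sketch below is a simplification under stated assumptions: the outcome labels and FatalIOError are hypothetical names, and the kill is modeled as a flag the processor polls, whereas the runtime described above interrupts the processor thread.

```python
import threading
import time


class FatalIOError(Exception):
    """Hypothetical marker for an error an Input/Output reports as fatal."""


def run_attempt(init_inputs, processor, kill_requested: threading.Event) -> str:
    """Run one attempt and return a hypothetical outcome label."""
    try:
        init_inputs()
        processor(kill_requested)
    except FatalIOError:
        return "FATAL_ERROR"
    except Exception:
        return "FAILED"  # input init or the processor threw
    return "KILLED" if kill_requested.is_set() else "SUCCEEDED"


def polling_processor(kill_requested: threading.Event) -> None:
    # Works in small slices and checks the kill flag between them.
    for _ in range(200):
        if kill_requested.is_set():
            return
        time.sleep(0.01)


def failing_init():
    raise ValueError("input init failed")


kill = threading.Event()
threading.Timer(0.05, kill.set).start()  # stands in for the AM's kill reply
print(run_attempt(lambda: None, polling_processor, kill))
print(run_attempt(failing_init, polling_processor, threading.Event()))
```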
LogicalIOProcessorRuntimeTask — orchestrator
find tez-runtime-internals/src/main/java -name "LogicalIOProcessorRuntimeTask.java"
wc -l $(find tez-runtime-internals/src/main/java -name "LogicalIOProcessorRuntimeTask.java")
This is the class that actually instantiates the user's Input/Processor/Output (IPO) triple.
initialize() does, in order:
- Build the TezConfiguration for this task from the AM-provided TaskSpec.
- Build the MemoryDistributor (next section) over all IOs.
- For each InputSpec: instantiate the input class, set its InputContext, call initialize() on a worker thread.
- Same for each OutputSpec.
- Instantiate the processor; call processor.initialize(processorContext).
- Wait for all input/output initialize() calls to complete (parallel).
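The submit-then-wait pattern in these steps can be sketched with a thread pool. This Python model is only an analogy for the Java runtime: initialize_all and slow_init are hypothetical helpers, and the sleeps stand in for whatever work a real input does during initialize().

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def initialize_all(ios, max_workers=8):
    """ios maps a name to a zero-arg init callable; returns names in completion order."""
    done = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(init): name for name, init in ios.items()}
        for future in as_completed(futures):
            future.result()  # re-raises here if that initialize() failed
            done.append(futures[future])
    return done


def slow_init(seconds):
    def _init():
        time.sleep(seconds)  # placeholder for real setup work
    return _init


ios = {f"input_{i}": slow_init(0.2) for i in range(4)}
start = time.monotonic()
finished = initialize_all(ios)
elapsed = time.monotonic() - start
print(sorted(finished), f"{elapsed:.2f}s")
```

Because the four inits overlap, the wall time should be close to one init rather than the sum of all four, and a failure in any single init surfaces to the caller while it is waiting.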
run():
- Block until every input reports it has data (or signals empty).
- Call processor.run(inputs, outputs) on the main thread.
- On return, call output.close() for every output (parallel), then input.close() for every input (parallel).
- Collect counters; hand the final TaskStatus back to TezTaskRunner2.
Key field:
grep -n "initializerCompletionService\|runInputRunnable\|runOutputRunnable" \
$(find tez-runtime-internals/src/main/java -name "LogicalIOProcessorRuntimeTask.java") | head
Parallel init is what makes Tez fast for processors with many inputs (e.g. multi-input joins).
MemoryDistributor
find tez-runtime-internals/src/main/java -name "MemoryDistributor.java"
cat $(find tez-runtime-internals/src/main/java -name "MemoryDistributor.java") | head -160
A single broker that hands out portions of the task's JVM heap to IOs that ask for memory.
Flow:
- At task init, each Input/Output calls context.requestInitialMemory(size, callback) with what it would like to reserve (e.g. OrderedPartitionedKVOutput requests tez.runtime.io.sort.mb MB).
- MemoryDistributor.makeInitialAllocations() runs an InitialMemoryAllocator plugin (default: WeightedScalingMemoryDistributor) to scale requests down to fit the container's available heap.
- Allocations are dispatched to callbacks; each IO learns its actual budget via MemoryUpdateCallback.memoryAssigned(long).
- IO classes resize their buffers accordingly.
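The request, scale, and callback flow can be sketched as follows. This is a deliberately simplified Python model: it scales every request by the same factor, which is plain proportional scaling rather than the weighted default allocator, and it ignores the reserve fraction. MemoryBroker and its method names are hypothetical.

```python
MB = 1024 * 1024


class MemoryBroker:
    """Toy broker: collect requests, then scale them to fit (assumption)."""

    def __init__(self, available_bytes: int):
        self.available = available_bytes
        self._requests = []      # list of (requested_bytes, callback)
        self._allocated = False

    def request_initial_memory(self, size: int, callback) -> None:
        if self._allocated:
            # Hypothetical guard: requests are only valid before allocation.
            raise RuntimeError("allocations already made")
        self._requests.append((size, callback))

    def make_initial_allocations(self) -> None:
        total = sum(size for size, _ in self._requests)
        factor = 1.0 if total <= self.available else self.available / total
        for size, callback in self._requests:
            callback(int(size * factor))  # each IO learns its real budget
        self._allocated = True


assigned = {}


def remember(name):
    def callback(nbytes):
        assigned[name] = nbytes
    return callback


broker = MemoryBroker(available_bytes=400 * MB)
broker.request_initial_memory(400 * MB, remember("sorted_output"))
broker.request_initial_memory(200 * MB, remember("input_a"))
broker.request_initial_memory(200 * MB, remember("input_b"))
broker.make_initial_allocations()
print({name: nbytes // MB for name, nbytes in assigned.items()})
```

Requests totalling 800 MB against 400 MB available are each halved, so the three IOs receive 200, 100, and 100 MB.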
Configuration knobs:
| Key | Effect |
|---|---|
| tez.task.scale.memory.allocator.class | Plugin to use. Default WeightedScalingMemoryDistributor. |
| tez.task.scale.memory.enabled | Master toggle. |
| tez.task.scale.memory.ratios | Per-IO-class weight overrides. |
| tez.task.scale.memory.reserve-fraction | Reserved for processor/JVM. |
grep -n "requestInitialMemory\|memoryAssigned" \
$(find tez-runtime-library/src/main/java -name "OrderedPartitionedKVOutput.java")
Without the distributor an output would request its configured size verbatim, potentially OOMing the container when summed across IOs.
TaskReporter and the umbilical
find tez-runtime-internals/src/main/java -name "TaskReporter*.java"
find . -name "TezTaskUmbilicalProtocol.java"
TaskReporter runs a heartbeat thread per task attempt. Each cycle:
- Collect outbound events (counter updates, completion events from completed IOs).
- Call umbilical.heartbeat(request) where request contains attempt ID, counters, status messages, and the outbound TezEvent list.
- Decode the reply: AM may push back inbound TezEvents (e.g. DataMovementEvents from upstream tasks), a shouldDie flag, or a shouldReset flag.
- Dispatch inbound events into the appropriate Input via LogicalIOProcessorRuntimeTask.handleEvents().
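One heartbeat cycle, as described above, can be sketched like this. The Python model uses hypothetical request and response dataclasses and a FakeAM that replays canned inbound batches. It covers only outbound events, inbound events, and the shouldDie flag.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class HeartbeatRequest:
    attempt_id: str
    counters: Dict[str, int]
    events: List[str]


@dataclass
class HeartbeatResponse:
    events: List[str] = field(default_factory=list)
    should_die: bool = False


class FakeAM:
    """In-memory stand-in for the AM side of the umbilical (assumption)."""

    def __init__(self, inbound_batches: List[List[str]]):
        self._batches = list(inbound_batches)
        self.received: List[str] = []

    def heartbeat(self, request: HeartbeatRequest) -> HeartbeatResponse:
        self.received.extend(request.events)
        if not self._batches:
            return HeartbeatResponse(should_die=True)
        return HeartbeatResponse(events=self._batches.pop(0))


def heartbeat_once(am, attempt_id, outbound, counters, handle_events) -> bool:
    """One cycle: drain outbound events, call the AM, dispatch the reply."""
    request = HeartbeatRequest(attempt_id, dict(counters), list(outbound))
    outbound.clear()
    response = am.heartbeat(request)
    if response.events:
        handle_events(response.events)
    return not response.should_die  # False means stop heartbeating


am = FakeAM([["dme_from_upstream_0"], [], ["dme_from_upstream_1"]])
outbound = ["output_ready"]
delivered: List[str] = []
cycles = 0
while heartbeat_once(am, "attempt_0", outbound, {"RECORDS_OUT": 10}, delivered.extend):
    cycles += 1
print(cycles, am.received, delivered)
```

The same call carries events in both directions: the outbound event reaches the AM on the first cycle, and inbound events arrive only as part of a reply.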
Interval: tez.task.am.heartbeat.interval-ms.max (default 100 ms), plus a counter-update interval tez.task.am.heartbeat.counter.interval-ms.max (default 4000 ms).
Why heartbeats carry events
Tez has no separate "event bus" between AM and containers. Everything piggybacks on the umbilical heartbeat. This means:
- Event latency is governed by the heartbeat interval: an event can wait up to one full interval for the next heartbeat before it is delivered.
- A wedged umbilical (network partition) blocks all task communication; tez.task.timeout-ms (default 5 minutes) eventually fires and the AM considers the attempt lost.
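Both consequences reduce to simple arithmetic. The sketch below assumes heartbeats fire at exact multiples of the interval and ignores RPC time. The function names are hypothetical, and the inputs are the defaults quoted above (100 ms interval, 5 minute timeout).

```python
def wait_until_next_heartbeat_ms(event_time_ms: int, interval_ms: int) -> int:
    """How long an event sits queued before the next heartbeat tick."""
    remainder = event_time_ms % interval_ms
    return 0 if remainder == 0 else interval_ms - remainder


def heartbeats_in_timeout(timeout_ms: int, interval_ms: int) -> int:
    """Number of consecutive heartbeats that fit inside the timeout window."""
    return timeout_ms // interval_ms


print(wait_until_next_heartbeat_ms(230, 100))        # 70
print(wait_until_next_heartbeat_ms(201, 100))        # 99
print(heartbeats_in_timeout(5 * 60 * 1000, 100))     # 3000
```

An event produced just after a tick waits almost the whole interval, and with the defaults roughly 3000 heartbeats must go missing before the timeout fires.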
End-to-end task lifecycle inside the JVM
grep -n "phase\|TaskRunnerPhase" $(find tez-runtime-internals/src/main/java -name "TezTaskRunner2.java") | head
| Phase | Owner | What happens |
|---|---|---|
| 1 Receive | TezChild.run | umbilical.getTask returns a ContainerTask. |
| 2 Build | TezTaskRunner2 | Construct LogicalIOProcessorRuntimeTask, hook up TaskReporter. |
| 3 Init | LogicalIOProcessorRuntimeTask.initialize | MemoryDistributor + parallel IO init + processor init. |
| 4 Run | LogicalIOProcessorRuntimeTask.run | processor.run(inputs, outputs). |
| 5 Close | same | Outputs close (flush spills, emit DataMovementEvents), inputs close. |
| 6 Report | TaskReporter final tick | Send counters + completion event. AM transitions attempt to SUCCEEDED. |
| 7 Loop | TezChild.run | Discard task object, request next. |
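The table reads as a cycle: phase 7 hands control back to phase 1 in the same JVM. The sketch below encodes that as a small enum walker. The Phase names mirror the table, and next_phase and walk_attempts are hypothetical helpers, not runtime classes.

```python
from enum import Enum


class Phase(Enum):
    RECEIVE = 1
    BUILD = 2
    INIT = 3
    RUN = 4
    CLOSE = 5
    REPORT = 6
    LOOP = 7


def next_phase(current: Phase) -> Phase:
    """Phase that follows `current`; LOOP wraps back to RECEIVE."""
    return Phase.RECEIVE if current is Phase.LOOP else Phase(current.value + 1)


def walk_attempts(num_attempts: int):
    """Phases visited by one container that runs `num_attempts` attempts."""
    visited = []
    phase = Phase.RECEIVE
    for _ in range(num_attempts * len(Phase)):
        visited.append(phase)
        phase = next_phase(phase)
    return visited


trace = walk_attempts(2)
print([p.name for p in trace])
```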
Reading exercise
- List every termination path: grep -n "shouldDie\|exit(" $(find tez-runtime-internals/src/main/java -name "TezChild.java")
- Verify the lifecycle order: grep -n "initialize()\|run()\|close()" $(find tez-runtime-internals/src/main/java -name "LogicalIOProcessorRuntimeTask.java") | head -40
- Work out how it handles the case where summed requests exceed what is available: cat $(find tez-runtime-internals/src/main/java -name "MemoryDistributor.java") | head -100
- Find the heartbeat loop body: grep -n "heartbeat\|TezTaskUmbilical" $(find tez-runtime-internals/src/main/java -name "TaskReporter.java") | head
- Read the user-facing processor contract: cat tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
- Find the biggest classes in the runtime module: find tez-runtime-internals/src/main/java -name "*.java" | xargs wc -l | sort -n | tail -20
Common bugs and symptoms
| Symptom | Likely cause |
|---|---|
| Container OOM during init | MemoryDistributor disabled or summed IO requests exceed heap. Enable tez.task.scale.memory.enabled. |
| TaskAttempt timed out after 5 min of no heartbeat | TaskReporter thread died (uncaught exception) or RPC hung. |
| Processor sees zero events | Inbound events not delivered; check the TaskReporter.heartbeat reply path. Common when tez.task.am.heartbeat.interval-ms.max is raised too high. |
| Container reuse off, JVMs constantly spinning up | getTask in the TezChild.run loop receives shouldDie too eagerly; check the AM-side AMContainerImpl reuse decision. |
| IllegalStateException: Cannot reserve more memory | IO requesting after makeInitialAllocations already ran. |
| Outputs never close (process hangs) | Processor never returned from run(); usually an infinite loop on a KeyValuesReader. |
Validation: prove you understand this
- Trace, with file:method references, the path from TezChild.main to processor.run for a single attempt.
- Explain in two sentences why LogicalIOProcessorRuntimeTask.initialize parallelizes input/output init. Cite the field name.
- Given a container with 1 GB heap, one OrderedPartitionedKVOutput requesting 512 MB and two OrderedGroupedKVInputs requesting 256 MB each, compute the actual allocations under the default WeightedScalingMemoryDistributor.
- Identify the single umbilical method that delivers inbound TezEvents to the task. Cite the file and the field on the response object.
- Sketch the smallest possible AbstractLogicalIOProcessor that prints the class names of all configured inputs and exits. Include initialize, handleEvents, run, close.