Logical vs Physical Plan
Tez exposes two planes to the user and to the runtime:
- Logical plan: what the application author writes (vertices, edges, edge properties). Lives in `tez-api`. Immutable once submitted (mostly).
- Physical plan: what the AM actually schedules (task instances per vertex, per-edge routing decisions, container assignments). Lives in `tez-dag`. Mutable at runtime via `VertexManager` reconfiguration and `EdgeManagerPlugin` routing.
This chapter walks the boundary between them.
The logical plane
A logical DAG is a DAG object containing Vertex objects connected by Edge
objects, each carrying an EdgeProperty.
```bash
ls tez-api/src/main/java/org/apache/tez/dag/api/ | head -30
```
Key classes:
| Class | File | Purpose |
|---|---|---|
| DAG | tez-api/src/main/java/org/apache/tez/dag/api/DAG.java | The DAG builder. |
| Vertex | tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java | Logical vertex with processor + target parallelism. |
| Edge | tez-api/src/main/java/org/apache/tez/dag/api/Edge.java | Logical edge between two vertices. |
| EdgeProperty | tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java | Routing + scheduling + storage characteristics. |
EdgeProperty — four orthogonal axes
```bash
grep -n "enum DataMovementType\|enum DataSourceType\|enum SchedulingType" \
  tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
```

```java
public enum DataMovementType { ONE_TO_ONE, BROADCAST, SCATTER_GATHER, CUSTOM }
public enum DataSourceType { PERSISTED, PERSISTED_RELIABLE, EPHEMERAL }
public enum SchedulingType { SEQUENTIAL, CONCURRENT }
```
| Axis | Values | What it controls |
|---|---|---|
| DataMovementType | ONE_TO_ONE, BROADCAST, SCATTER_GATHER, CUSTOM | How source outputs map to destination inputs. |
| DataSourceType | PERSISTED, PERSISTED_RELIABLE, EPHEMERAL | How long and how reliably source output stays available after the producing task exits; determines whether losing it forces the producer to re-run. |
| SchedulingType | SEQUENTIAL, CONCURRENT | Whether destination tasks are scheduled after source tasks have started or completed (SEQUENTIAL) or must run concurrently with them (CONCURRENT). |
| OutputDescriptor / InputDescriptor | class names + payloads | The IO classes wired on each side of the edge. |
A logical edge says nothing about which destination task index reads from which source task index. That decision belongs to the EdgeManagerPlugin.
The physical plane
When the AM initializes a DAG it builds:
- `VertexImpl` per logical `Vertex` (tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java)
- `TaskImpl[]` per vertex, sized by parallelism (tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java)
- `TaskAttemptImpl` per attempt of each task (tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java)
- `Edge` runtime objects, each with an active `EdgeManagerPlugin` (tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java)
```bash
wc -l tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/{VertexImpl,TaskImpl,TaskAttemptImpl,Edge}.java
```
Mapping logical to physical
```mermaid
flowchart LR
  subgraph logical[Logical]
    LV1[Vertex A parallelism=3]
    LV2[Vertex B parallelism=2]
    LV1 -- "EdgeProperty SCATTER_GATHER" --> LV2
  end
  subgraph physical[Physical]
    A0[A.0] --> B0[B.0]
    A0 --> B1[B.1]
    A1[A.1] --> B0
    A1 --> B1
    A2[A.2] --> B0
    A2 --> B1
  end
  logical --> physical
```
On a SCATTER_GATHER edge every source task produces one partition per destination task, and the
EdgeManagerPlugin decides which output partition goes to which destination input.
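To make the expansion concrete, the sketch below enumerates the physical task-to-task connections that one logical edge implies for each built-in movement type. It is plain Java with no Tez dependency; `PhysicalExpansion` and its `Movement` enum are hypothetical stand-ins, and the model covers only which tasks talk to each other, not partitions or events.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which physical task pairs a logical edge connects.
final class PhysicalExpansion {
    enum Movement { ONE_TO_ONE, BROADCAST, SCATTER_GATHER }

    /** Returns "A.i -> B.j" strings, one per physical connection. */
    static List<String> expand(String src, int srcPar, String dst, int dstPar, Movement m) {
        List<String> links = new ArrayList<>();
        if (m == Movement.ONE_TO_ONE) {
            if (srcPar != dstPar) {
                throw new IllegalArgumentException("ONE_TO_ONE needs equal parallelism");
            }
            for (int i = 0; i < srcPar; i++) {
                links.add(src + "." + i + " -> " + dst + "." + i);
            }
            return links;
        }
        // BROADCAST and SCATTER_GATHER both connect every source task to every
        // destination task; they differ in what each connection carries
        // (the whole output vs. one partition of it).
        for (int i = 0; i < srcPar; i++) {
            for (int j = 0; j < dstPar; j++) {
                links.add(src + "." + i + " -> " + dst + "." + j);
            }
        }
        return links;
    }

    public static void main(String[] args) {
        List<String> links = expand("A", 3, "B", 2, Movement.SCATTER_GATHER);
        links.forEach(System.out::println);
        System.out.println(links.size() + " physical connections");
    }
}
```

Running `main` prints the six arrows of the diagram (A.0 -> B.0 through A.2 -> B.1) followed by "6 physical connections".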
EdgeManagerPlugin — the routing brain
```bash
find tez-api/src/main/java -name "EdgeManagerPlugin.java"
cat tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
```
The contract (paraphrased):
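```java
import java.util.List;
import java.util.Map;
import org.apache.tez.runtime.api.events.DataMovementEvent;

public abstract class EdgeManagerPlugin {
  // Fills destTaskAndInputIndices: destination task index -> input indices on that task.
  public abstract void routeDataMovementEventToDestination(
      DataMovementEvent event,
      int srcTaskIndex,
      int outputIndex,
      Map<Integer, List<Integer>> destTaskAndInputIndices);
  // Number of destination tasks that consume this source task's outputs.
  public abstract int getNumDestinationConsumerTasks(int srcTaskIndex);
  public abstract int getDestinationConsumerTaskNumber(int srcTaskIndex,
      int srcOutputIndex);
  // Input slots on one destination task that routed events can target.
  public abstract int getNumDestinationTaskPhysicalInputs(int destTaskIndex);
  // Physical outputs (partitions) produced by one source task.
  public abstract int getNumSourceTaskPhysicalOutputs(int srcTaskIndex);
  // Initialization and error/failure routing callbacks omitted.
}
```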
Built-in implementations
```bash
find tez-dag/src/main/java -name "*EdgeManager*.java"
```
| Plugin | DataMovementType | Routing rule |
|---|---|---|
| ScatterGatherEdgeManager | SCATTER_GATHER | Source task i produces one partition per destination task; destination d reads partition d from every source task. |
| BroadcastEdgeManager | BROADCAST | Every source output is consumed by every destination task. |
| OneToOneEdgeManager | ONE_TO_ONE | Requires srcParallelism == destParallelism. Source i → destination i. |
| User-supplied | CUSTOM | Anything. Cartesian product, range partitioning, etc. |
Inspecting routing on a live AM
```bash
grep -n "EdgeManager\|setCustomEdgeManager" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java | head -20
```
When a source task emits an event, Edge.sendTezEventToDestinationTasks() consults the
plugin to expand it into per-destination input events. A destination task receives
one DataMovementEvent per physical input, that is, per input index the plugin routed to it.
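A rough model of that expansion step, assuming a routing callback with the same out-parameter shape as `routeDataMovementEventToDestination` (destination task index mapped to a list of input indices). `EventFanOut` and its `Router` interface are hypothetical; this is not the code in `Edge.java`, and it ignores event payloads and batching.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the AM-side fan-out; not the code in Edge.java.
final class EventFanOut {
    /** Same out-parameter shape as routeDataMovementEventToDestination. */
    interface Router {
        void route(int srcTaskIndex, int srcOutputIndex,
                   Map<Integer, List<Integer>> destTaskAndInputIndices);
    }

    /** Expands every output of one source task into "dest=D input=K" deliveries. */
    static List<String> fanOut(Router router, int srcTaskIndex, int numSrcOutputs) {
        List<String> deliveries = new ArrayList<>();
        for (int out = 0; out < numSrcOutputs; out++) {
            Map<Integer, List<Integer>> targets = new HashMap<>();
            router.route(srcTaskIndex, out, targets);
            // One delivery per (destination task, input index) pair the router chose.
            for (Map.Entry<Integer, List<Integer>> e : targets.entrySet()) {
                for (int input : e.getValue()) {
                    deliveries.add("dest=" + e.getKey() + " input=" + input);
                }
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        // Scatter-gather style rule: partition p goes to destination task p,
        // into the input slot numbered after the source task.
        Router scatterGather = (src, out, targets) -> targets.put(out, List.of(src));
        System.out.println(fanOut(scatterGather, 1, 2)); // [dest=0 input=1, dest=1 input=1]
    }
}
```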
SCATTER_GATHER walkthrough
Source: A with parallelism 3; each task emits one partition per destination task (N = 2).
Destination: B with parallelism 2.
```bash
cat tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
```
For source task A.1 emitting partitions [0, 1]:
| Call | Returns |
|---|---|
| getNumSourceTaskPhysicalOutputs(1) | 2 (= destination parallelism) |
| getNumDestinationTaskPhysicalInputs(0) | 3 (= source parallelism) |
| getNumDestinationConsumerTasks(1) | 2 |
| routeDataMovementEventToDestination(event, 1, 0, out) | out = { 0 -> [1] } |
| routeDataMovementEventToDestination(event, 1, 1, out) | out = { 1 -> [1] } |
Invariant: numSrcOutputs == destParallelism, numDestInputs == srcParallelism.
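The table can be reproduced with a standalone model of the SCATTER_GATHER rule. `ScatterGatherModel` is hypothetical: its method names echo `EdgeManagerPlugin`, but it drops the `DataMovementEvent` parameter and is not Tez's `ScatterGatherEdgeManager`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone model of SCATTER_GATHER routing (not Tez code).
final class ScatterGatherModel {
    final int srcParallelism;
    final int destParallelism;

    ScatterGatherModel(int srcParallelism, int destParallelism) {
        this.srcParallelism = srcParallelism;
        this.destParallelism = destParallelism;
    }

    // Each source task writes one partition per destination task.
    int getNumSourceTaskPhysicalOutputs(int srcTaskIndex) { return destParallelism; }

    // Each destination task reads one partition from every source task.
    int getNumDestinationTaskPhysicalInputs(int destTaskIndex) { return srcParallelism; }

    int getNumDestinationConsumerTasks(int srcTaskIndex) { return destParallelism; }

    void routeDataMovementEventToDestination(int srcTaskIndex, int outputIndex,
                                             Map<Integer, List<Integer>> out) {
        // Partition p goes to destination task p; the source task index
        // becomes the input index on that destination.
        out.put(outputIndex, List.of(srcTaskIndex));
    }

    public static void main(String[] args) {
        ScatterGatherModel m = new ScatterGatherModel(3, 2); // A = 3, B = 2
        System.out.println(m.getNumSourceTaskPhysicalOutputs(1));     // 2
        System.out.println(m.getNumDestinationTaskPhysicalInputs(0)); // 3
        System.out.println(m.getNumDestinationConsumerTasks(1));      // 2
        for (int out = 0; out < 2; out++) {
            Map<Integer, List<Integer>> dest = new HashMap<>();
            m.routeDataMovementEventToDestination(1, out, dest);
            System.out.println(dest); // {0=[1]}, then {1=[1]}
        }
    }
}
```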
ONE_TO_ONE walkthrough
```bash
cat tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
```
Requires numSrcTasks == numDestTasks. Each source produces exactly one
partition consumed by exactly one destination of the same index.
Common bug: changing destination parallelism via reconfigureVertex while a
ONE_TO_ONE edge feeds it. Tez throws at edge initialization.
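A minimal sketch of the ONE_TO_ONE rule and the invariant that such a resize violates, assuming a simplified routing method that fills a destination-task-to-input-indices map. `OneToOneModel` and its exception message are hypothetical; Tez's real check and error text differ.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of ONE_TO_ONE routing (not Tez's OneToOneEdgeManager).
final class OneToOneModel {
    private final int numTasks;

    OneToOneModel(int srcParallelism, int destParallelism) {
        // The invariant the resize bug breaks: task counts must match on both sides.
        if (srcParallelism != destParallelism) {
            throw new IllegalStateException(
                "ONE_TO_ONE needs equal parallelism, got " + srcParallelism + " vs " + destParallelism);
        }
        this.numTasks = srcParallelism;
    }

    int getNumSourceTaskPhysicalOutputs(int srcTaskIndex) { return 1; }

    int getNumDestinationTaskPhysicalInputs(int destTaskIndex) { return 1; }

    void route(int srcTaskIndex, int outputIndex, Map<Integer, List<Integer>> out) {
        if (srcTaskIndex < 0 || srcTaskIndex >= numTasks || outputIndex != 0) {
            throw new IndexOutOfBoundsException("no such source output: " + srcTaskIndex + "/" + outputIndex);
        }
        // The single output of source i becomes input 0 of destination i.
        out.put(srcTaskIndex, List.of(0));
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> out = new HashMap<>();
        new OneToOneModel(4, 4).route(2, 0, out);
        System.out.println(out); // {2=[0]}
        try {
            new OneToOneModel(100, 17);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```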
BROADCAST walkthrough
```bash
cat tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
```
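Each source task emits a single logical output. Every destination task receives one
input event per source task and fetches every source task's full output, so routed
events scale as srcParallelism × destParallelism and transferred bytes as total source
output × destParallelism. Large broadcast vertices are an antipattern.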
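To put numbers on that cost, here is a back-of-the-envelope model. The parallelism (200 sources, 1,000 destinations) and the 5 MiB per-source output are made-up figures, and `BroadcastCost` is a hypothetical helper, not part of Tez.

```java
// Hypothetical back-of-the-envelope BROADCAST cost model (not Tez code).
final class BroadcastCost {
    /** Events the AM routes: every destination gets one per source task. */
    static long totalEvents(int srcParallelism, int destParallelism) {
        return (long) srcParallelism * destParallelism;
    }

    /** Bytes each destination fetches: the full output of every source task. */
    static long bytesPerDestination(int srcParallelism, long bytesPerSourceOutput) {
        return srcParallelism * bytesPerSourceOutput;
    }

    public static void main(String[] args) {
        int src = 200, dest = 1000;   // made-up parallelism
        long perOutput = 5L << 20;    // made-up 5 MiB per source output
        long perDest = bytesPerDestination(src, perOutput);
        System.out.println(totalEvents(src, dest)); // 200000 routed events
        System.out.println(perDest >> 20);          // 1000 MiB fetched by each destination
        System.out.println(dest * perDest >> 20);   // 1000000 MiB fetched cluster-wide
    }
}
```

The per-destination figure is a full copy of the broadcast data, so the cluster-wide transfer grows linearly with destination parallelism even when the broadcast side itself stays fixed.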
CUSTOM walkthrough — CartesianProductEdgeManager
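In recent Tez releases the cartesian-product classes live in tez-runtime-library (package org.apache.tez.runtime.library.cartesianproduct) rather than tez-dag, so search from the repository root:

```bash
find . -name "CartesianProductEdgeManager*.java"
wc -l $(find . -name "CartesianProductEdgeManager*.java")
```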
CartesianProductVertexManager chunks source outputs and creates a 2D grid of
destination tasks; the edge manager projects (srcChunkX, srcChunkY) → destIndex.
The CUSTOM movement type is the contract by which Hive ships its own routing
for unconventional joins.
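A hedged illustration of the projection idea for two sources: the left side split into `numChunksX` chunks, the right into `numChunksY`, and one destination task per (x, y) pair in row-major order. `CartesianGrid` is hypothetical and simplifies away how CartesianProductVertexManager actually chunks inputs and numbers its tasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical 2D cartesian-product projection (illustrative only).
final class CartesianGrid {
    private final int numChunksX;
    private final int numChunksY;

    CartesianGrid(int numChunksX, int numChunksY) {
        this.numChunksX = numChunksX;
        this.numChunksY = numChunksY;
    }

    int numDestinationTasks() { return numChunksX * numChunksY; }

    /** Row-major projection of a (left chunk, right chunk) pair onto a task index. */
    int destIndex(int x, int y) { return x * numChunksY + y; }

    /** Left chunk x must reach every task in row x. */
    List<Integer> consumersOfLeftChunk(int x) {
        List<Integer> tasks = new ArrayList<>();
        for (int y = 0; y < numChunksY; y++) tasks.add(destIndex(x, y));
        return tasks;
    }

    /** Right chunk y must reach every task in column y. */
    List<Integer> consumersOfRightChunk(int y) {
        List<Integer> tasks = new ArrayList<>();
        for (int x = 0; x < numChunksX; x++) tasks.add(destIndex(x, y));
        return tasks;
    }

    public static void main(String[] args) {
        CartesianGrid g = new CartesianGrid(2, 3);
        System.out.println(g.numDestinationTasks());    // 6
        System.out.println(g.consumersOfLeftChunk(1));  // [3, 4, 5]
        System.out.println(g.consumersOfRightChunk(2)); // [2, 5]
    }
}
```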
Runtime mutation: parallelism reconfiguration
A logical Vertex declares a target parallelism; the physical parallelism
can change before the vertex starts running, via the VertexManager:
```bash
grep -n "reconfigureVertex" tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
```
`reconfigureVertex(int parallelism, VertexLocationHint, Map<String, EdgeProperty>)`
does three things in one atomic step inside `VertexImpl`:
- Resizes the `TaskImpl[]` array (must happen before any task is scheduled).
- Re-installs `EdgeManagerPlugin` instances on incoming edges.
- Updates location hints used by the scheduler.
Read the state machine guard:
```bash
grep -n "reconfigureVertex\|VertexState.INITED\|VertexState.INITIALIZING" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | head -30
```
Reconfiguration is illegal once any task has been scheduled.
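A toy model of that ordering rule. `ReconfigGuard` is hypothetical and collapses VertexImpl's state machine into a single "any task scheduled" flag; it only shows why the resize must precede scheduling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the "resize before scheduling" rule (not VertexImpl).
final class ReconfigGuard {
    private final String vertexName;
    private final List<String> tasks = new ArrayList<>();
    private boolean anyTaskScheduled = false;

    ReconfigGuard(String vertexName, int initialParallelism) {
        this.vertexName = vertexName;
        resize(initialParallelism);
    }

    /** Resize the task list; only legal while no task has been scheduled. */
    void reconfigure(int newParallelism) {
        if (anyTaskScheduled) {
            throw new IllegalStateException(
                "cannot change parallelism of " + vertexName + " after tasks were scheduled");
        }
        resize(newParallelism);
    }

    /** From this point on the task list is frozen. */
    void scheduleTasks() { anyTaskScheduled = true; }

    int parallelism() { return tasks.size(); }

    private void resize(int n) {
        if (n <= 0) throw new IllegalArgumentException("parallelism must be positive");
        tasks.clear();
        for (int i = 0; i < n; i++) tasks.add(vertexName + "." + i);
    }

    public static void main(String[] args) {
        ReconfigGuard r = new ReconfigGuard("R", 100);
        r.reconfigure(17);     // allowed: nothing scheduled yet
        r.scheduleTasks();
        try {
            r.reconfigure(20); // rejected: tasks already scheduled
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(r.parallelism()); // 17
    }
}
```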
Worked example: ShuffleVertexManager auto-parallelism
- Vertex `R` is declared with parallelism = 100 (a pessimistic upper bound).
- Upstream tasks emit `VertexManagerEvent` payloads with byte counts per partition; `ShuffleVertexManager.onVertexManagerEventReceived` accumulates the totals.
- After the slow-start threshold, it computes `target = ceil(totalBytes / desiredTaskInputSize)`, clamped to `[minParallelism, originalParallelism]`.
- It calls `reconfigureVertex(target, null, updatedEdgeProps)`.
- `VertexImpl` resizes from 100 to, e.g., 17 task instances.
- The edge manager on the incoming `SCATTER_GATHER` edge is rebuilt to route 100-partition outputs into 17 destinations (each destination task reads and merges several of the original partitions).
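The arithmetic of this walkthrough, sketched in Java. `targetParallelism` follows the ceil-and-clamp formula in the steps; the step that gives each new task a whole number of contiguous original partitions, and recomputes the task count from it, is an assumption made for the sketch rather than a transcription of ShuffleVertexManager. All names and the 1,700 MiB / 100 MiB figures are hypothetical.

```java
// Hypothetical sketch of auto-parallelism arithmetic (not ShuffleVertexManager code).
final class AutoParallelism {
    /** ceil(totalBytes / desired), clamped to [minParallelism, originalParallelism]. */
    static int targetParallelism(long totalBytes, long desiredTaskInputBytes,
                                 int minParallelism, int originalParallelism) {
        long raw = (totalBytes + desiredTaskInputBytes - 1) / desiredTaskInputBytes; // ceil
        return (int) Math.max(minParallelism, Math.min(originalParallelism, raw));
    }

    /** Original partitions per new destination task, rounded up so all are covered. */
    static int partitionRange(int originalParallelism, int target) {
        return (originalParallelism + target - 1) / target;
    }

    /** Task count once each task takes `range` partitions (the last may take fewer). */
    static int finalParallelism(int originalParallelism, int range) {
        return (originalParallelism + range - 1) / range;
    }

    /** Half-open [start, end) range of original partitions read by destination d. */
    static int[] partitionsFor(int d, int range, int originalParallelism) {
        int start = d * range;
        return new int[] {start, Math.min(start + range, originalParallelism)};
    }

    public static void main(String[] args) {
        long mib = 1L << 20;
        int target = targetParallelism(1700 * mib, 100 * mib, 1, 100); // 17
        int range = partitionRange(100, target);                       // 6
        int fin = finalParallelism(100, range);                        // 17
        int[] last = partitionsFor(fin - 1, range, 100);               // [96, 100)
        System.out.printf("target=%d range=%d final=%d last=[%d,%d)%n",
            target, range, fin, last[0], last[1]);
    }
}
```

Under these assumptions each of the 17 destination tasks reads 6 of the 100 original partitions from every source task, except the last, which reads partitions 96 to 99.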
Reading exercise
- `grep -n "createEdgeManager\|edgeManager =" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java`: when is the `EdgeManagerPlugin` instantiated?
- `cat tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java | head -100`: list all factory methods on `EdgeProperty`. Which require an `EdgeManagerPluginDescriptor`?
- `grep -n "setParallelism\|setVertexParallelism" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | head`: which state transitions accept a parallelism change?
- `grep -rn "OneToOneEdgeManager\|ScatterGatherEdgeManager\|BroadcastEdgeManager" tez-dag/src/test`: list the unit tests covering each built-in routing.
- `cat $(find . -name CartesianProductEdgeManager.java) | head -120`: what state must this plugin keep across destination task initializations?
- `grep -n "EdgeProperty\." ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java | head`: which edge property combinations does Hive build?
Answer these:
- For `SCATTER_GATHER`, what is the size of the output partition array of one source task?
- For `ONE_TO_ONE`, what happens if the upstream vertex auto-parallelizes from 100 → 17 after the destination has been initialized?
- For `BROADCAST`, what is the data volume amplification?
- Which `EdgeManager` methods are called on every destination task init, and which once per edge?
Common bugs and symptoms
| Symptom | Likely cause |
|---|---|
| Vertex failed: Cannot change parallelism after tasks scheduled | VertexManager.reconfigureVertex invoked after scheduleTasks. Fix ordering. |
| OneToOneEdgeManager: srcParallelism != destParallelism | Auto-parallelism broke the ONE_TO_ONE invariant. Forbid auto-parallelism on ONE_TO_ONE edges. |
| Destination task receives 0 DataMovementEvents | Custom EdgeManagerPlugin returned 0 from getNumDestinationTaskPhysicalInputs. |
| Hive query produces wrong row counts after a custom join | CUSTOM EdgeManagerPlugin mis-routed partitions; fence-post bug in routeDataMovementEventToDestination. |
| BROADCAST edge OOMs the destination | Source parallelism × payload size exceeds destination heap. Shrink the broadcast side or switch to a SCATTER_GATHER (shuffle) join. |
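Several rows in this table (zero inputs, fence-post mis-routing) can be caught before a job runs by exhaustively exercising a custom routing rule offline. The checker below is a hypothetical test helper, not a Tez utility: it calls a routing function for every (source task, output index) pair and verifies that each destination input slot is filled exactly once and that nothing lands out of range.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntUnaryOperator;

// Hypothetical offline checker for a custom routing rule (not part of Tez).
final class RoutingChecker {
    interface Router {
        void route(int srcTask, int srcOutput, Map<Integer, List<Integer>> out);
    }

    static List<String> check(int srcPar, int destPar, IntUnaryOperator numSrcOutputs,
                              IntUnaryOperator numDestInputs, Router router) {
        List<String> problems = new ArrayList<>();
        int[][] hits = new int[destPar][];
        for (int d = 0; d < destPar; d++) {
            int n = numDestInputs.applyAsInt(d);
            if (n == 0) problems.add("dest " + d + " declares 0 inputs");
            hits[d] = new int[n];
        }
        // Route every output of every source task and count slot hits.
        for (int s = 0; s < srcPar; s++) {
            for (int o = 0; o < numSrcOutputs.applyAsInt(s); o++) {
                Map<Integer, List<Integer>> out = new HashMap<>();
                router.route(s, o, out);
                for (Map.Entry<Integer, List<Integer>> e : out.entrySet()) {
                    int d = e.getKey();
                    for (int in : e.getValue()) {
                        if (d < 0 || d >= destPar || in < 0 || in >= hits[d].length) {
                            problems.add("src " + s + " output " + o + " -> out-of-range dest " + d + " input " + in);
                        } else {
                            hits[d][in]++;
                        }
                    }
                }
            }
        }
        // Every declared input slot must be filled exactly once.
        for (int d = 0; d < destPar; d++) {
            for (int i = 0; i < hits[d].length; i++) {
                if (hits[d][i] != 1) problems.add("dest " + d + " input " + i + " filled " + hits[d][i] + " times");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        IntUnaryOperator outputsPerSource = s -> 2; // 2 destination tasks
        IntUnaryOperator inputsPerDest = d -> 3;    // 3 source tasks
        Router correct = (s, o, out) -> out.put(o, List.of(s));
        Router offByOne = (s, o, out) -> out.put(o + 1, List.of(s));
        System.out.println(check(3, 2, outputsPerSource, inputsPerDest, correct)); // []
        check(3, 2, outputsPerSource, inputsPerDest, offByOne).forEach(System.out::println);
    }
}
```

Applied to a correct scatter-gather rule it reports no problems. Shifting the destination index by one (`o + 1`) with 3 sources and 2 destinations yields six problems: three out-of-range routes and three never-filled inputs on destination 0.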
Validation: prove you understand this
- Given `Vertex A (parallelism=4)` `SCATTER_GATHER` → `Vertex B (parallelism=3)`, compute the number of `DataMovementEvent`s `B.1` receives. Show the work.
- Explain in one sentence each: when does `EdgeManagerPlugin` get re-instantiated, and when does it survive across reconfiguration?
- Write a one-paragraph rejection of "let's just use `BROADCAST` for our 500-task lookup vertex" referencing concrete cost.
- Identify the exact line in `VertexImpl.java` where `reconfigureVertex` is rejected if tasks have been scheduled. Cite path + line number from `grep -n`.
- Sketch a `CUSTOM` `EdgeManagerPlugin` for range-partitioned merge: source task `i` emits keys in range `[i*R, (i+1)*R)`; the destination is K tasks where K may differ from source parallelism. Define `getNumDestinationTaskPhysicalInputs` and the routing rule in code.