Logical vs Physical Plan

Tez exposes two planes to the user and to the runtime:

  • Logical plan: what the application author writes (vertices, edges, edge properties). Lives in tez-api. Effectively immutable once submitted; the main exception is that a VertexManager may supply replacement edge properties when it reconfigures a vertex.
  • Physical plan: what the AM actually schedules (task instances per vertex, per-edge routing decisions, container assignments). Lives in tez-dag. Mutable at runtime via VertexManager reconfiguration and EdgeManagerPlugin routing.

This chapter walks the boundary between them.


The logical plane

A logical DAG is a DAG object containing Vertex objects connected by Edge objects, each carrying an EdgeProperty.

ls tez-api/src/main/java/org/apache/tez/dag/api/ | head -30

Key classes:

Class | File | Purpose
--- | --- | ---
DAG | tez-api/src/main/java/org/apache/tez/dag/api/DAG.java | The DAG builder.
Vertex | tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java | Logical vertex with processor + target parallelism.
Edge | tez-api/src/main/java/org/apache/tez/dag/api/Edge.java | Logical edge between two vertices.
EdgeProperty | tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java | Routing + scheduling + storage characteristics.

EdgeProperty — four orthogonal axes

grep -n "enum DataMovementType\|enum DataSourceType\|enum SchedulingType" \
  tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
public enum DataMovementType { ONE_TO_ONE, BROADCAST, SCATTER_GATHER, CUSTOM }
public enum DataSourceType   { PERSISTED, PERSISTED_RELIABLE, EPHEMERAL }
public enum SchedulingType   { SEQUENTIAL, CONCURRENT }
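Axis | Values | What it controls
--- | --- | ---
DataMovementType | ONE_TO_ONE, BROADCAST, SCATTER_GATHER, CUSTOM | How source outputs map to destination inputs.
DataSourceType | PERSISTED, PERSISTED_RELIABLE, EPHEMERAL | How long source outputs stay available (EPHEMERAL: only while the producer runs; PERSISTED: after it completes, but may be lost with its node; PERSISTED_RELIABLE: survives node loss). Determines when lost output forces source re-execution.
SchedulingType | SEQUENTIAL, CONCURRENT | When destination tasks may run: SEQUENTIAL makes them eligible once source tasks have started or completed; CONCURRENT requires them to run alongside the source tasks.
OutputDescriptor / InputDescriptor | class names + payloads | The IO classes wired on each side of the edge.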

A logical edge says nothing about which destination task index reads from which source task index. That decision belongs to the EdgeManagerPlugin.


The physical plane

When the AM initializes a DAG it builds:

  • VertexImpl per logical Vertex (tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java)
  • TaskImpl[] per vertex, sized by parallelism (tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java)
  • TaskAttemptImpl per attempt of each task (tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java)
  • Edge runtime objects with an active EdgeManagerPlugin (tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java)
wc -l tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/{VertexImpl,TaskImpl,TaskAttemptImpl,Edge}.java

Mapping logical to physical

flowchart LR
  subgraph logical[Logical]
    LV1[Vertex A parallelism=3]
    LV2[Vertex B parallelism=2]
    LV1 -- "EdgeProperty SCATTER_GATHER" --> LV2
  end
  subgraph physical[Physical]
    A0[A.0] --> B0[B.0]
    A0 --> B1[B.1]
    A1[A.1] --> B0
    A1 --> B1
    A2[A.2] --> B0
    A2 --> B1
  end
  logical --> physical

On a SCATTER_GATHER edge every source task produces one partition for every destination task. The EdgeManagerPlugin decides which output partition is routed to which destination input.


EdgeManagerPlugin — the routing brain

find tez-api/src/main/java -name "EdgeManagerPlugin.java"
cat tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java

The contract (paraphrased):

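import java.util.List;
import java.util.Map;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;

// Abridged; see the real file for the constructor (takes an EdgeManagerPluginContext)
// and any version-specific methods.
public abstract class EdgeManagerPlugin {
  public abstract void initialize() throws Exception;

  // Sizing: physical outputs per source task, physical inputs per destination task.
  public abstract int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception;
  public abstract int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception;
  // How many destination tasks consume the output of one source task.
  public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception;

  // Routing: fill destinationTaskAndInputIndices with destTask -> [destInputIndex, ...].
  public abstract void routeDataMovementEventToDestination(DataMovementEvent event,
      int sourceTaskIndex, int sourceOutputIndex,
      Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception;
  // Failure paths: a source attempt failed; a destination could not read an input.
  public abstract void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
      Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception;
  public abstract int routeInputErrorEventToSource(InputReadErrorEvent event,
      int destinationTaskIndex, int destinationFailedInputIndex) throws Exception;
}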

Built-in implementations

find tez-dag/src/main/java -name "*EdgeManager*.java"
Plugin | DataMovementType | Routing rule
--- | --- | ---
ScatterGatherEdgeManager | SCATTER_GATHER | Source task i produces N partitions, where N = destination parallelism; destination d reads partition d from every source.
BroadcastEdgeManager | BROADCAST | Every source output is consumed by every destination task.
OneToOneEdgeManager | ONE_TO_ONE | Requires srcParallelism == destParallelism. Source i → destination i.
User-supplied | CUSTOM | Anything: Cartesian product, range partitioning, etc.

Inspecting routing on a live AM

grep -n "EdgeManager\|setCustomEdgeManager" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java | head -20

When a source task emits a DataMovementEvent, Edge.sendTezEventToDestinationTasks() consults the plugin to expand it into per-destination input events. Each destination task ends up with one DataMovementEvent per physical input.
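To make the expansion concrete, here is a hypothetical, Tez-free model in plain Java. Router, EdgeExpansion and eventsPerDestination are illustrative names, not Tez classes. Each lambda encodes one built-in routing rule in the same destTask -> [destInputIndex] shape that routeDataMovementEventToDestination fills in, and eventsPerDestination counts what each destination task would receive. The real Edge does considerably more (failure routing, for one).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical routing function: (srcTask, outputIndex) -> {destTask -> [destInputIndex, ...]}.
// Same shape as the map EdgeManagerPlugin fills in, but this is not the Tez API.
@FunctionalInterface
interface Router {
  Map<Integer, List<Integer>> route(int srcTask, int outputIndex);
}

final class EdgeExpansion {
  // SCATTER_GATHER: partition p goes to destination p, landing on input slot srcTask.
  static Router scatterGather() {
    return (src, partition) -> Map.of(partition, List.of(src));
  }

  // BROADCAST: the single output goes to every destination, on input slot srcTask.
  static Router broadcast(int destParallelism) {
    return (src, output) -> {
      Map<Integer, List<Integer>> m = new TreeMap<>();
      for (int d = 0; d < destParallelism; d++) {
        m.put(d, List.of(src));
      }
      return m;
    };
  }

  // ONE_TO_ONE: the single output of task i goes to input 0 of task i.
  static Router oneToOne() {
    return (src, output) -> Map.of(src, List.of(0));
  }

  // Expand every (source task, output) pair and count the events each
  // destination task would receive; a loose model of the AM-side expansion.
  static int[] eventsPerDestination(Router router, int srcParallelism,
                                    int outputsPerSource, int destParallelism) {
    int[] counts = new int[destParallelism];
    for (int s = 0; s < srcParallelism; s++) {
      for (int o = 0; o < outputsPerSource; o++) {
        for (Map.Entry<Integer, List<Integer>> e : router.route(s, o).entrySet()) {
          counts[e.getKey()] += e.getValue().size();
        }
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    // A (3 tasks) -> B (2 tasks); scatter-gather sources write 2 partitions each.
    System.out.println(Arrays.toString(eventsPerDestination(scatterGather(), 3, 2, 2)));
    System.out.println(Arrays.toString(eventsPerDestination(broadcast(2), 3, 1, 2)));
    System.out.println(Arrays.toString(eventsPerDestination(oneToOne(), 3, 1, 3)));
  }
}
```

For A(3) → B(2), SCATTER_GATHER (two partitions per source) and BROADCAST (one output per source) both deliver three events to each destination task; the difference is that a broadcast destination reads each source's whole output rather than one partition of it.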


SCATTER_GATHER walkthrough

Source: A with parallelism 3; each task emits one partition per destination task, so N = 2. Destination: B with parallelism 2.

cat tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java

For source task A.1 emitting partitions [0, 1]:

Call | Returns
--- | ---
getNumSourceTaskPhysicalOutputs(1) | 2 (= destination parallelism)
getNumDestinationTaskPhysicalInputs(0) | 3 (= source parallelism)
getNumDestinationConsumerTasks(1) | 2
routeDataMovementEventToDestination(event, 1, 0, out) | out = { 0 -> [1] }
routeDataMovementEventToDestination(event, 1, 1, out) | out = { 1 -> [1] }

Invariant: numSrcOutputs == destParallelism, numDestInputs == srcParallelism.
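The walkthrough table can be reproduced with a plain-Java stand-in. ScatterGatherModel is hypothetical: its method names echo EdgeManagerPlugin, but it drops the DataMovementEvent argument, the plugin context and the checked exceptions, and only encodes the arithmetic shown above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the ScatterGatherEdgeManager arithmetic in the table
// above. Method names echo EdgeManagerPlugin; the event argument is omitted.
final class ScatterGatherModel {
  private final int srcParallelism;
  private final int destParallelism;

  ScatterGatherModel(int srcParallelism, int destParallelism) {
    this.srcParallelism = srcParallelism;
    this.destParallelism = destParallelism;
  }

  // One partition per destination task.
  int getNumSourceTaskPhysicalOutputs(int srcTaskIndex) {
    return destParallelism;
  }

  // One input per source task.
  int getNumDestinationTaskPhysicalInputs(int destTaskIndex) {
    return srcParallelism;
  }

  // Every destination task consumes one partition of every source task.
  int getNumDestinationConsumerTasks(int srcTaskIndex) {
    return destParallelism;
  }

  // Partition p of source s goes to destination p, input slot s.
  void routeDataMovementEventToDestination(int srcTaskIndex, int outputIndex,
                                           Map<Integer, List<Integer>> out) {
    out.put(outputIndex, List.of(srcTaskIndex));
  }

  public static void main(String[] args) {
    ScatterGatherModel m = new ScatterGatherModel(3, 2); // A (3) -> B (2)
    System.out.println(m.getNumSourceTaskPhysicalOutputs(1));     // 2
    System.out.println(m.getNumDestinationTaskPhysicalInputs(0)); // 3
    System.out.println(m.getNumDestinationConsumerTasks(1));      // 2
    for (int p = 0; p < 2; p++) {
      Map<Integer, List<Integer>> out = new TreeMap<>();
      m.routeDataMovementEventToDestination(1, p, out);
      System.out.println(out); // {0=[1]}, then {1=[1]}
    }
  }
}
```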


ONE_TO_ONE walkthrough

cat tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java

Requires numSrcTasks == numDestTasks. Each source produces exactly one partition consumed by exactly one destination of the same index.

Common bug: changing destination parallelism via reconfigureVertex while a ONE_TO_ONE edge feeds it. Tez throws at edge initialization.
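A minimal sketch of the ONE_TO_ONE invariant, assuming a hypothetical OneToOneModel that validates parallelism up front. In Tez the corresponding check runs inside the AM during initialization, and its error text differs from the message used here.

```java
import java.util.List;
import java.util.Map;

// Hypothetical model of the ONE_TO_ONE invariant; in Tez the check runs inside
// the AM and reports its own error text.
final class OneToOneModel {
  private final int parallelism;

  OneToOneModel(int srcParallelism, int destParallelism) {
    if (srcParallelism != destParallelism) {
      throw new IllegalStateException("ONE_TO_ONE needs equal parallelism, got src="
          + srcParallelism + " dest=" + destParallelism);
    }
    this.parallelism = srcParallelism;
  }

  // The single output of source task i lands on input 0 of destination task i.
  Map<Integer, List<Integer>> route(int srcTaskIndex) {
    if (srcTaskIndex < 0 || srcTaskIndex >= parallelism) {
      throw new IndexOutOfBoundsException("source task " + srcTaskIndex);
    }
    return Map.of(srcTaskIndex, List.of(0));
  }

  public static void main(String[] args) {
    System.out.println(new OneToOneModel(4, 4).route(2)); // {2=[0]}
    try {
      new OneToOneModel(100, 17); // destination resized, source unchanged
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```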


BROADCAST walkthrough

cat tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java

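The source emits a single logical output, and every destination task receives one input event per source task. The AM therefore routes srcParallelism × destParallelism events, and because every destination task fetches the full source output, bytes transferred scale as destParallelism × total source output. Large broadcast vertices are an antipattern.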
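A back-of-the-envelope calculator for the broadcast cost. BroadcastCost is a hypothetical helper, and the task counts and the 1 GiB output size in main are made-up illustration values, not measurements.

```java
// Hypothetical helper for estimating BROADCAST edge cost; plug in your own
// task counts and measured source output size.
final class BroadcastCost {
  // One DataMovementEvent per (source task, destination task) pair.
  static long events(int srcParallelism, int destParallelism) {
    return (long) srcParallelism * destParallelism;
  }

  // Every destination task fetches the complete output of the source vertex.
  static long bytesFetched(long totalSourceOutputBytes, int destParallelism) {
    return totalSourceOutputBytes * destParallelism;
  }

  public static void main(String[] args) {
    int src = 50;
    int dest = 400;
    long sourceBytes = 1L << 30; // 1 GiB of total source output (illustrative)
    System.out.println(events(src, dest));               // 20000
    System.out.println(bytesFetched(sourceBytes, dest)); // 429496729600
  }
}
```

The byte amplification depends only on destination parallelism: doubling the number of source tasks doubles the event count but not the bytes fetched, as long as the total source output stays the same.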


CUSTOM walkthrough — CartesianProductEdgeManager

find . -path "*/src/main/java/*" -name "CartesianProductEdgeManager*.java"
wc -l $(find . -path "*/src/main/java/*" -name "CartesianProductEdgeManager*.java")

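CartesianProductVertexManager chunks each source's output and arranges destination tasks as a grid with one dimension per source (a 2D grid for two sources). Each destination task owns one cell (x, y), and the edge manager on each incoming edge projects the grid onto its source: chunk x of the first source is routed to every destination in row x, chunk y of the second source to every destination in column y.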
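A sketch of the two-source grid arithmetic, assuming row-major numbering of destination tasks. CartesianGrid and its methods are hypothetical; Tez's CartesianProductEdgeManager has its own configuration and indexing, so treat this as the idea only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-source cartesian-product grid with row-major destination
// numbering. Not Tez's CartesianProductEdgeManager; it only shows the idea.
final class CartesianGrid {
  private final int numChunksX; // chunks of the first source (rows)
  private final int numChunksY; // chunks of the second source (columns)

  CartesianGrid(int numChunksX, int numChunksY) {
    this.numChunksX = numChunksX;
    this.numChunksY = numChunksY;
  }

  // Grid cell (x, y) -> destination task index.
  int destIndex(int x, int y) {
    return x * numChunksY + y;
  }

  // Chunk x of the first source is needed by every destination in row x.
  List<Integer> consumersOfFirstSourceChunk(int x) {
    List<Integer> dests = new ArrayList<>();
    for (int y = 0; y < numChunksY; y++) {
      dests.add(destIndex(x, y));
    }
    return dests;
  }

  // Chunk y of the second source is needed by every destination in column y.
  List<Integer> consumersOfSecondSourceChunk(int y) {
    List<Integer> dests = new ArrayList<>();
    for (int x = 0; x < numChunksX; x++) {
      dests.add(destIndex(x, y));
    }
    return dests;
  }

  public static void main(String[] args) {
    CartesianGrid g = new CartesianGrid(2, 3); // 2 x 3 = 6 destination tasks
    System.out.println(g.consumersOfFirstSourceChunk(1));  // [3, 4, 5]
    System.out.println(g.consumersOfSecondSourceChunk(2)); // [2, 5]
  }
}
```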

The CUSTOM movement type is the contract by which Hive ships its own routing; for example, Hive's bucket map join uses a CUSTOM edge backed by its CustomPartitionEdge plugin.


Runtime mutation: parallelism reconfiguration

A logical Vertex declares a target parallelism; the physical parallelism can change before the vertex starts running, via the VertexManager:

grep -n "reconfigureVertex" tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java

reconfigureVertex(int parallelism, VertexLocationHint, Map<String,EdgeProperty>) does three things in one atomic step inside VertexImpl:

  1. Resizes the TaskImpl[] array (must happen before any task is scheduled).
  2. Re-installs EdgeManagerPlugin instances on incoming edges.
  3. Updates location hints used by the scheduler.

Read the state machine guard:

grep -n "reconfigureVertex\|VertexState.INITED\|VertexState.INITIALIZING" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | head -30

Reconfiguration is illegal once any task has been scheduled.
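The scheduling guard reduces to a simple rule, sketched here with a hypothetical VertexModel. VertexImpl enforces it through its state machine, which has many more states and events than this two-field model.

```java
// Hypothetical two-field model of "no reconfiguration once tasks are scheduled".
// VertexImpl enforces this through its state machine, not a boolean flag.
final class VertexModel {
  private int parallelism;
  private boolean tasksScheduled;

  VertexModel(int declaredParallelism) {
    this.parallelism = declaredParallelism;
  }

  int parallelism() {
    return parallelism;
  }

  // Stands in for reconfigureVertex: only legal before any task is scheduled.
  void reconfigure(int newParallelism) {
    if (tasksScheduled) {
      throw new IllegalStateException("reconfigure called after scheduleTasks");
    }
    if (newParallelism <= 0) {
      throw new IllegalArgumentException("parallelism must be positive");
    }
    // The real AM would also resize TaskImpl[], rebuild edge managers and hints.
    parallelism = newParallelism;
  }

  void scheduleTasks() {
    tasksScheduled = true;
  }

  public static void main(String[] args) {
    VertexModel v = new VertexModel(100);
    v.reconfigure(17);   // fine: nothing scheduled yet
    v.scheduleTasks();
    try {
      v.reconfigure(20); // rejected
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
    System.out.println(v.parallelism()); // 17
  }
}
```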

Worked example: ShuffleVertexManager auto-parallelism

  1. Vertex R declared with parallelism = 100 (pessimistic upper bound).
  2. Upstream tasks emit VertexManagerEvent payloads with byte counts per partition.
  3. ShuffleVertexManager.onVertexManagerEventReceived accumulates totals.
  4. After the slow-start threshold, it computes target = ceil(totalBytes / desiredTaskInputSize) clamped to [minParallelism, originalParallelism].
  5. Calls reconfigureVertex(target, null, updatedEdgeProps).
  6. VertexImpl resizes from 100 → e.g. 17 task instances.
  7. The edge manager on the incoming SCATTER_GATHER edge is rebuilt so that each of the 17 destination tasks reads a contiguous range of the original 100 partitions from every source task (the merge happens at the destination).
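Step 4 and the range assignment from step 7 can be sketched as follows. The target formula follows the description above; rangeStart, an even split of partitions across destinations, is an illustrative choice. ShuffleVertexManager's own range arithmetic may differ and can round the final task count, so check the source before relying on exact numbers. The 1700 MiB and 100 MiB inputs in main are made-up values.

```java
// Hypothetical sketch of auto-parallelism: the target formula from step 4 plus
// an even contiguous split of partitions (not necessarily Tez's exact math).
final class AutoParallelism {
  // ceil(totalBytes / desiredTaskInputSize), clamped to [min, original].
  static int target(long totalBytes, long desiredTaskInputSize,
                    int minParallelism, int originalParallelism) {
    long t = (totalBytes + desiredTaskInputSize - 1) / desiredTaskInputSize;
    t = Math.max(t, minParallelism);
    t = Math.min(t, originalParallelism);
    return (int) t;
  }

  // Destination j reads partitions [rangeStart(j), rangeStart(j + 1)) from
  // every source task; rangeStart(numDest) == numPartitions.
  static int rangeStart(int j, int numPartitions, int numDest) {
    return (int) ((long) j * numPartitions / numDest);
  }

  public static void main(String[] args) {
    long mib = 1024L * 1024;
    int t = target(1700 * mib, 100 * mib, 1, 100);
    System.out.println(t); // 17
    for (int j = 0; j < t; j++) {
      System.out.println("dest " + j + " <- partitions ["
          + rangeStart(j, 100, t) + ", " + rangeStart(j + 1, 100, t) + ")");
    }
  }
}
```

With these inputs the sketch yields 17 tasks, each reading 5 or 6 consecutive partitions from every source task.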

Reading exercise

  1. grep -n "createEdgeManager\|edgeManager =" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java — when is the EdgeManagerPlugin instantiated?
  2. cat tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java | head -100 — list all factory methods on EdgeProperty. Which require an EdgeManagerPluginDescriptor?
  3. grep -n "setParallelism\|setVertexParallelism" tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | head — which state transitions accept a parallelism change?
  4. grep -rn "OneToOneEdgeManager\|ScatterGatherEdgeManager\|BroadcastEdgeManager" tez-dag/src/test — list the unit tests covering each built-in routing.
  5. cat $(find . -path "*/src/main/java/*" -name "CartesianProductEdgeManager.java") | head -120. What state must this plugin keep across destination task initializations?
  6. grep -n "EdgeProperty\." ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java | head — which edge property combinations does Hive build?

Answer these:

  • For SCATTER_GATHER what is the size of the output partition array of one source task?
  • For ONE_TO_ONE, what happens if the upstream vertex auto-parallelizes from 100 → 17 after the destination has been initialized?
  • For BROADCAST, what is the data volume amplification?
  • Which EdgeManager methods are called on every destination task init, and which once per edge?

Common bugs and symptoms

Symptom | Likely cause
--- | ---
Vertex failed: Cannot change parallelism after tasks scheduled | VertexManagerPluginContext.reconfigureVertex invoked after scheduleTasks. Fix the ordering.
OneToOneEdgeManager: srcParallelism != destParallelism | Auto-parallelism broke the ONE_TO_ONE invariant. Forbid auto-parallelism on ONE_TO_ONE edges.
Destination task receives 0 DataMovementEvents | Custom EdgeManagerPlugin returned 0 from getNumDestinationTaskPhysicalInputs.
Hive query produces wrong row counts after a custom join | CUSTOM EdgeManagerPlugin mis-routed partitions; fence-post bug in routeDataMovementEventToDestination.
BROADCAST edge OOMs the destination | Source parallelism × payload size exceeds destination heap; switch to PERSISTED source type and stream from disk.
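Several of these bugs can be caught before deploying a custom plugin by exhaustively checking its routing on small parallelisms. The sketch below is a hypothetical harness, not a Tez API: Rule mirrors the shape of EdgeManagerPlugin's sizing and routing methods, and validate assumes every destination physical input is fed by exactly one source output, which holds for the built-in rules.

```java
import java.util.List;
import java.util.Map;

// Hypothetical harness for exhaustively checking a routing rule on small
// parallelisms. Not a Tez API.
final class RoutingValidator {
  interface Rule {
    int numSourceOutputs(int srcTask);
    int numDestinationInputs(int destTask);
    Map<Integer, List<Integer>> route(int srcTask, int outputIndex);
  }

  // Throws if a destination declares no inputs, if routing leaves the valid
  // range, or if any destination input slot is not fed exactly once.
  static void validate(Rule rule, int srcParallelism, int destParallelism) {
    int[][] fed = new int[destParallelism][];
    for (int d = 0; d < destParallelism; d++) {
      int n = rule.numDestinationInputs(d);
      if (n <= 0) {
        throw new IllegalStateException("destination " + d + " declares " + n + " inputs");
      }
      fed[d] = new int[n];
    }
    for (int s = 0; s < srcParallelism; s++) {
      for (int o = 0; o < rule.numSourceOutputs(s); o++) {
        for (Map.Entry<Integer, List<Integer>> e : rule.route(s, o).entrySet()) {
          int d = e.getKey();
          if (d < 0 || d >= destParallelism) {
            throw new IllegalStateException("src " + s + " output " + o + " routed to destination " + d);
          }
          for (int input : e.getValue()) {
            if (input < 0 || input >= fed[d].length) {
              throw new IllegalStateException("destination " + d + " has no input " + input);
            }
            fed[d][input]++;
          }
        }
      }
    }
    for (int d = 0; d < destParallelism; d++) {
      for (int i = 0; i < fed[d].length; i++) {
        if (fed[d][i] != 1) {
          throw new IllegalStateException("destination " + d + " input " + i + " fed " + fed[d][i] + " times");
        }
      }
    }
  }

  // Scatter-gather rule; destShift != 0 injects a fence-post routing bug.
  static Rule scatterGather(int srcParallelism, int destParallelism, int destShift) {
    return new Rule() {
      public int numSourceOutputs(int srcTask) { return destParallelism; }
      public int numDestinationInputs(int destTask) { return srcParallelism; }
      public Map<Integer, List<Integer>> route(int srcTask, int outputIndex) {
        return Map.of(outputIndex + destShift, List.of(srcTask));
      }
    };
  }

  public static void main(String[] args) {
    validate(scatterGather(3, 2, 0), 3, 2);
    System.out.println("correct rule passes");
    try {
      validate(scatterGather(3, 2, 1), 3, 2);
    } catch (IllegalStateException e) {
      System.out.println("buggy rule rejected: " + e.getMessage());
    }
  }
}
```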

Validation: prove you understand this

  1. Given Vertex A (parallelism=4) connected to Vertex B (parallelism=3) by a SCATTER_GATHER edge, compute the number of DataMovementEvents B.1 receives. Show the work.
  2. Explain in one sentence each: when does EdgeManagerPlugin get re-instantiated, and when does it survive across reconfiguration?
  3. Write a one-paragraph rejection of "let's just use BROADCAST for our 500-task lookup vertex" referencing concrete cost.
  4. Identify the exact line in VertexImpl.java where reconfigureVertex is rejected if tasks have been scheduled. Cite path + line number from grep -n.
  5. Sketch a CUSTOM EdgeManagerPlugin for range-partitioned merge: source task i emits keys in range [i*R, (i+1)*R); the destination is K tasks where K may differ from source parallelism. Define getNumDestinationTaskPhysicalInputs and the routing rule in code.