DAG Model

A Tez DAG is an immutable plan for a distributed computation. This chapter describes the model classes (DAG, Vertex, Edge, EdgeProperty, DataSourceDescriptor, DataSinkDescriptor, *Descriptor), the protobuf representation that crosses the wire, and the validation rules that turn a "DAG you wrote" into a "DAG the AM will accept."

After this chapter you should be able to write a small DAG by hand, predict which EdgeManager implementation will be picked for each edge, and locate each validation rule in the source.


The classes you actually call from a client

All of these live in tez-api:

tez-api/src/main/java/org/apache/tez/dag/api/
  DAG.java
  Vertex.java
  Edge.java
  EdgeProperty.java
  InputDescriptor.java
  OutputDescriptor.java
  ProcessorDescriptor.java
  VertexManagerPluginDescriptor.java
  DataSourceDescriptor.java
  DataSinkDescriptor.java
  EntityDescriptor.java          (base class for all *Descriptors)
  GroupInputEdge.java            (multi-source unioning edge)
  VertexGroup.java               (group of vertices for grouped commits)

Use this command to inspect the API surface:

grep -n "^public " tez-api/src/main/java/org/apache/tez/dag/api/DAG.java | head -40

Every class above is immutable by convention once handed to TezClient. Before submission you may still change the plan through the builder methods (addVertex, addEdge, addDataSource). After submission the only way to change the plan is via VertexManagerPlugin callbacks (see vertex-lifecycle.md and the Level 4 lab on VertexManager).


EdgeProperty — three orthogonal axes

EdgeProperty.create(DataMovementType, DataSourceType, SchedulingType, OutputDescriptor, InputDescriptor) is the single most important factory method in the API.

grep -n "enum " tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java

The three enums:

| Enum | Values | What it controls |
|---|---|---|
| DataMovementType | ONE_TO_ONE, BROADCAST, SCATTER_GATHER, CUSTOM | How outputs are routed from src to dst tasks |
| DataSourceType | PERSISTED, PERSISTED_RELIABLE, EPHEMERAL | Durability of intermediate data |
| SchedulingType | SEQUENTIAL, CONCURRENT | Whether dst tasks must wait for src to finish |
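
To make the DataMovementType row concrete, here is a minimal JDK-only sketch of the routing each built-in movement type implies. It is a model, not Tez code: the enum is a local stand-in for EdgeProperty.DataMovementType, route is a hypothetical helper, and the sketch assumes a SCATTER_GATHER source writes one partition per destination task.

```java
import java.util.ArrayList;
import java.util.List;

class RoutingSketch {
    // Local stand-in for org.apache.tez.dag.api.EdgeProperty.DataMovementType.
    enum DataMovementType { ONE_TO_ONE, BROADCAST, SCATTER_GATHER, CUSTOM }

    /**
     * Returns the destination task indices that read the given partition
     * of the output written by source task srcTask.
     */
    static List<Integer> route(DataMovementType type, int srcTask, int partition, int numDestTasks) {
        List<Integer> dests = new ArrayList<>();
        switch (type) {
            case ONE_TO_ONE:
                // Task i feeds task i; the partition argument is ignored.
                dests.add(srcTask);
                break;
            case BROADCAST:
                // Every destination task reads the whole output.
                for (int d = 0; d < numDestTasks; d++) {
                    dests.add(d);
                }
                break;
            case SCATTER_GATHER:
                // Partition p of every source task goes to destination task p.
                dests.add(partition);
                break;
            case CUSTOM:
                throw new UnsupportedOperationException("routing is defined by the user plugin");
        }
        return dests;
    }

    public static void main(String[] args) {
        System.out.println(route(DataMovementType.ONE_TO_ONE, 3, 0, 4));     // [3]
        System.out.println(route(DataMovementType.BROADCAST, 3, 0, 4));      // [0, 1, 2, 3]
        System.out.println(route(DataMovementType.SCATTER_GATHER, 3, 1, 4)); // [1]
    }
}
```

The other two axes do not change who talks to whom; they only affect how durable the data is and when the destination may start.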

Edge type matrix (movement × scheduling)

| Movement | Scheduling | Typical use | EdgeManager impl |
|---|---|---|---|
| SCATTER_GATHER | SEQUENTIAL | Map → Reduce shuffle | ScatterGatherEdgeManager (the AM-internal default) |
| ONE_TO_ONE | SEQUENTIAL | Sorted reducer → re-sorter (rare) | OneToOneEdgeManager |
| BROADCAST | SEQUENTIAL | Small-side join broadcast | BroadcastEdgeManager |
| CUSTOM | SEQUENTIAL | Hive cartesian product, custom partitioner | User-supplied EdgeManagerPlugin |
| BROADCAST | CONCURRENT | Streaming push between long-running tasks | BroadcastEdgeManager |
| SCATTER_GATHER | CONCURRENT | (Unusual; generally invalid for shuffles) | |

Locate the actual EdgeManager implementations:

find tez-dag/src/main/java -name "*EdgeManager*"

Key files (exact names vary slightly by branch):

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/
  OneToOneEdgeManagerOnDemand.java
  ScatterGatherEdgeManager.java
  BroadcastEdgeManager.java

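Read the AM-side Edge.java in tez-dag (not the client-side Edge in tez-api, which only carries the two vertices and the EdgeProperty) to see how it wires the right manager based on EdgeProperty:

grep -n "EdgeManager\|edgeManager\|createEdgeManager" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java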
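
The selection can be pictured as a switch on DataMovementType. The sketch below is a JDK-only model and not the AM's code: the enum is a local stand-in for EdgeProperty.DataMovementType, managerFor is a hypothetical name, and the returned strings are the class names from the file listing above (some branches use the OneToOneEdgeManagerOnDemand variant for ONE_TO_ONE).

```java
class EdgeManagerSelectionSketch {
    // Local stand-in for org.apache.tez.dag.api.EdgeProperty.DataMovementType.
    enum DataMovementType { ONE_TO_ONE, BROADCAST, SCATTER_GATHER, CUSTOM }

    /**
     * Returns the simple class name of the edge manager this model picks.
     * customPluginClass is only consulted for CUSTOM edges.
     */
    static String managerFor(DataMovementType type, String customPluginClass) {
        switch (type) {
            case ONE_TO_ONE:
                return "OneToOneEdgeManager";
            case BROADCAST:
                return "BroadcastEdgeManager";
            case SCATTER_GATHER:
                return "ScatterGatherEdgeManager";
            case CUSTOM:
                // A CUSTOM edge has no built-in manager; the user must name one.
                if (customPluginClass == null) {
                    throw new IllegalStateException("CUSTOM edge needs a user-supplied EdgeManagerPlugin");
                }
                return customPluginClass;
            default:
                throw new IllegalArgumentException("unknown movement type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(managerFor(DataMovementType.SCATTER_GATHER, null)); // ScatterGatherEdgeManager
        System.out.println(managerFor(DataMovementType.CUSTOM, "com.example.MyPlugin")); // com.example.MyPlugin
    }
}
```

Note that only the movement axis drives the choice in this model; compare that against what the real Edge.java does on your branch.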

DataSourceDescriptor vs Input

Beginners frequently confuse these two:

| Concept | Class | Defined in | Lives during |
|---|---|---|---|
| Plan-time root-input definition | DataSourceDescriptor | tez-api | Client + AM (planning) |
| Runtime input attached to a task | Input (interface) | tez-api | Task JVM (execution) |

A DataSourceDescriptor describes "how to materialize splits for this vertex": an InputDescriptor, an optional InputInitializerDescriptor, and optional credentials. The AM may run an InputInitializer (e.g., MRInputAMSplitGenerator) to enumerate splits before the vertex starts. The result of that initialization becomes InputDataInformationEvents pushed to tasks (see ipo-abstractions.md and event-routing.md).
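
As a rough picture of what an initializer produces, the following JDK-only sketch cuts a file of a given length into fixed-size splits and emits one record per task. Everything here is hypothetical: SplitEvent is a stand-in for the per-task information an InputDataInformationEvent would carry, and enumerateSplits is not a Tez method.

```java
import java.util.ArrayList;
import java.util.List;

class SplitEnumerationSketch {
    /** Hypothetical stand-in for the payload of one per-task event. */
    static final class SplitEvent {
        final int targetTaskIndex;
        final long offset;
        final long length;

        SplitEvent(int targetTaskIndex, long offset, long length) {
            this.targetTaskIndex = targetTaskIndex;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            return "task " + targetTaskIndex + ": [" + offset + ", " + (offset + length) + ")";
        }
    }

    /** Plan-time step: one event per split, each addressed to one task. */
    static List<SplitEvent> enumerateSplits(long fileLength, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<SplitEvent> events = new ArrayList<>();
        int task = 0;
        for (long offset = 0; offset < fileLength; offset += splitSize) {
            // The last split may be shorter than splitSize.
            long length = Math.min(splitSize, fileLength - offset);
            events.add(new SplitEvent(task++, offset, length));
        }
        return events;
    }

    public static void main(String[] args) {
        // A 250-byte file with 100-byte splits yields three events.
        for (SplitEvent e : enumerateSplits(250, 100)) {
            System.out.println(e);
        }
    }
}
```

In this model the number of events also fixes the vertex parallelism, which is why a vertex with an initializer can be created without a static task count.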

At task time the input class is instantiated from the InputDescriptor and called with initialize() → start() → getReader() → close(). The task never sees the DataSourceDescriptor.
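
The call order above can be sketched without any Tez dependency. SketchInput below is a hypothetical stand-in that only mirrors the four method names from the text (it is not the Tez Input interface), and runTask plays the role of the framework driving one task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class InputLifecycleSketch {
    /** Hypothetical stand-in for the runtime input contract. */
    interface SketchInput {
        void initialize();
        void start();
        Iterator<String> getReader();
        void close();
    }

    /** In-memory input that records the order in which it is called. */
    static final class ListInput implements SketchInput {
        final List<String> calls = new ArrayList<>();
        private final List<String> records;
        private boolean started;

        ListInput(List<String> records) {
            this.records = records;
        }

        @Override public void initialize() { calls.add("initialize"); }

        @Override public void start() { started = true; calls.add("start"); }

        @Override public Iterator<String> getReader() {
            if (!started) {
                throw new IllegalStateException("getReader() called before start()");
            }
            calls.add("getReader");
            return records.iterator();
        }

        @Override public void close() { calls.add("close"); }
    }

    /** Drives one input through initialize, start, getReader, close. */
    static List<String> runTask(SketchInput input) {
        List<String> seen = new ArrayList<>();
        input.initialize();
        input.start();
        try {
            Iterator<String> reader = input.getReader();
            while (reader.hasNext()) {
                seen.add(reader.next());
            }
        } finally {
            // close() runs even if reading fails.
            input.close();
        }
        return seen;
    }

    public static void main(String[] args) {
        ListInput input = new ListInput(Arrays.asList("r1", "r2"));
        System.out.println(runTask(input)); // [r1, r2]
        System.out.println(input.calls);    // [initialize, start, getReader, close]
    }
}
```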


The DAGPlan protobuf — the wire format

tez-api/src/main/proto/DAGApiRecords.proto

Inspect:

grep -n "^message " tez-api/src/main/proto/DAGApiRecords.proto

Key messages:

  • DAGPlan — root: name, vertices, edges, plan-level configs, credentials, ACLs.
  • VertexPlan — name, processor descriptor, parallelism, location hints, associated edges, root inputs.
  • EdgePlan — source/dest vertex names, edge properties, edge manager descriptor.
  • TezEntityDescriptorProto{class_name, user_payload, history_text} — the serialized form of any *Descriptor.
  • RootInputLeafOutputProto — the protobuf encoding of DataSourceDescriptor and DataSinkDescriptor.

The conversion from API classes to protobuf happens in:

grep -rn "createDag\|DagTypeConverters" tez-api/src/main/java/org/apache/tez/dag/api/ | head

The two places to read are DAG.createDag(...), which assembles the DAGPlan, and DagTypeConverters, a kitchen-sink class of to/from helpers.


Validation — what DAG.verify() checks

grep -n "verify(" \
  tez-api/src/main/java/org/apache/tez/dag/api/DAG.java

With restricted set to true, DAG.verify(boolean restricted) enforces, at minimum:

  1. Name uniqueness — vertex names and DAG name are unique.
  2. No cycles — DFS over the edge graph; throws IllegalStateException ("DAG contains a cycle") if any back-edge is found.
  3. Parallelism rules:
    • ONE_TO_ONE edges require source.parallelism == dest.parallelism if both are statically set.
    • Vertices with BROADCAST outputs must have a finite parallelism (since each downstream task receives every output).
  4. Descriptor non-null for required slots (Processor, Output for vertices that produce, Input for vertices that consume).
  5. No "dangling" data sources — every root input is on a real vertex.
  6. VertexManagerPlugin specified explicitly for vertices that need dynamic reconfig (else a default is chosen — see vertex-lifecycle.md for the default rules).
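
Three of these checks (name uniqueness, the ONE_TO_ONE parallelism rule, and cycle detection) fit in a short JDK-only sketch. This is a simplified model under stated assumptions, not the body of DAG.verify: V and E are hypothetical stand-ins for Vertex and Edge, a parallelism of -1 means "not statically set", and the cycle check is a plain depth-first search that may differ from the algorithm the real code uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class VerifySketch {
    static final class V {
        final String name;
        final int parallelism; // -1 means "decided at runtime"
        V(String name, int parallelism) { this.name = name; this.parallelism = parallelism; }
    }

    static final class E {
        final String src;
        final String dst;
        final boolean oneToOne;
        E(String src, String dst, boolean oneToOne) { this.src = src; this.dst = dst; this.oneToOne = oneToOne; }
    }

    static void verify(List<V> vertices, List<E> edges) {
        // Check 1: vertex names are unique.
        Map<String, V> byName = new HashMap<>();
        for (V v : vertices) {
            if (byName.put(v.name, v) != null) {
                throw new IllegalStateException("Duplicate vertex name: " + v.name);
            }
        }
        Map<String, List<String>> out = new HashMap<>();
        for (E e : edges) {
            V s = byName.get(e.src);
            V d = byName.get(e.dst);
            if (s == null || d == null) {
                throw new IllegalStateException("Edge references unknown vertex: " + e.src + " -> " + e.dst);
            }
            // Check 3: ONE_TO_ONE needs equal parallelism when both sides are static.
            if (e.oneToOne && s.parallelism >= 0 && d.parallelism >= 0 && s.parallelism != d.parallelism) {
                throw new IllegalStateException("ONE_TO_ONE parallelism mismatch: " + e.src + " -> " + e.dst);
            }
            out.computeIfAbsent(e.src, k -> new ArrayList<>()).add(e.dst);
        }
        // Check 2: no cycles.
        Set<String> done = new HashSet<>();
        Set<String> onPath = new HashSet<>();
        for (V v : vertices) {
            visit(v.name, out, done, onPath);
        }
    }

    private static void visit(String name, Map<String, List<String>> out, Set<String> done, Set<String> onPath) {
        if (done.contains(name)) {
            return;
        }
        // Reaching a vertex that is already on the current path is a back-edge.
        if (!onPath.add(name)) {
            throw new IllegalStateException("DAG contains a cycle: " + name);
        }
        for (String next : out.getOrDefault(name, Collections.<String>emptyList())) {
            visit(next, out, done, onPath);
        }
        onPath.remove(name);
        done.add(name);
    }

    public static void main(String[] args) {
        List<V> vs = new ArrayList<>();
        vs.add(new V("map", 4));
        vs.add(new V("reduce", 2));
        List<E> es = new ArrayList<>();
        es.add(new E("map", "reduce", false));
        verify(vs, es); // passes silently
        es.add(new E("reduce", "map", false));
        try {
            verify(vs, es);
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage()); // DAG contains a cycle: map
        }
    }
}
```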

Read the body of verify(...) line-by-line; the comments cite the JIRA that added each check.


How a DAG becomes a plan, end-to-end

flowchart LR
    A[User code: new DAG] --> B[addVertex/addEdge/addDataSource]
    B --> C[TezClient.submitDAG]
    C --> D[DAG.verify]
    D -->|ok| E[DAG.createDag -> DAGPlan proto]
    E --> F[RPC DAGClientAMProtocol.submitDAG]
    F --> G[DAGAppMaster: DAGImpl init]
    G --> H[VertexImpl per VertexPlan]
    H --> I[Edge per EdgePlan; EdgeManager selected]

Each arrow has a citation:

  • verify: DAG.verify(...).
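  • createDag: DAG.createDag(Configuration tezConf, Credentials extraCredentials, Map<String,LocalResource> tezJarResources, LocalResource binaryConfig, boolean tezLrsAsArchive). Some branches add an overload with extra parameters, so confirm the signature in your checkout.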
  • AM-side: DAGImpl.init() and VertexImpl.constructInputDescriptors(), Edge.<init> (in tez-dag, not the tez-api Edge).

Reading exercise

# Top-level surface
sed -n '1,80p' tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
sed -n '1,80p' tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
sed -n '1,80p' tez-api/src/main/java/org/apache/tez/dag/api/Edge.java

# All the places where DAGPlan is constructed
grep -rn "DAGPlan.newBuilder" tez-api/src/main/java | head

# Cycle detection
grep -n "cycle\|cycleFound\|visit" \
  tez-api/src/main/java/org/apache/tez/dag/api/DAG.java

Answer:

  1. What exception class does DAG.verify() throw on a cycle, and what does its message contain that helps a user diagnose the offending vertex?
  2. Which method on Vertex is used to attach a DataSourceDescriptor? Which to attach a DataSinkDescriptor?
  3. What is the role of DagTypeConverters and why is it preferred over each class owning its own toProto/fromProto methods?
  4. When you call Edge.create(srcV, dstV, EdgeProperty.create(...)), where is the resulting Edge registered? On the source vertex? Destination? The DAG itself?
  5. Suppose you call dag.addVertex(v) twice with the same v instance. What happens, and where in DAG.java is the protection?
  6. What is the difference between DataSourceType.PERSISTED and DataSourceType.PERSISTED_RELIABLE? Find the consumer (search tez-dag for uses of DataSourceType).

Common bugs and symptoms

| Symptom | Root cause | Where to look |
|---|---|---|
| IllegalStateException: DAG contains a cycle at submission | Accidentally added a back-edge | DAG.verify |
| Vertex starts with parallelism -1 and never runs | setParallelism(-1) and no VertexManagerPlugin to reconfigure | VertexImpl.initialize; check for "parallelism not set" |
| Job hangs with all vertices in INITED | A DataSourceDescriptor has an initializer that never emits events | Search AM log for InputInitializerEvent; cross-reference initializer impl |
| ClassNotFoundException at task start for your Processor | The class is in client classpath but not uploaded as a local resource | TezClient.addAppMasterLocalFiles not called; see tez-client.md |
| EdgeManager mismatch between sides; task hangs reading | Custom EdgeManagerPlugin returns inconsistent partition counts | Always run TestEdgeManagerSelf on your plugin |
| DAGPlan proto exceeds 64 MB | Encoding huge userPayload directly into the plan | Use a side file via LocalResource; payload is byte[] not free-form storage |
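
For the last row, one way to stay clear of the plan size limit is to decide up front whether a blob belongs in the payload at all. The sketch below is a JDK-only illustration: INLINE_LIMIT_BYTES is a hypothetical threshold chosen for the example, not a Tez constant, and placementFor is not a Tez method.

```java
class PayloadPlacementSketch {
    /** Hypothetical per-descriptor budget; pick a value that suits your plan. */
    static final long INLINE_LIMIT_BYTES = 1024L * 1024L;

    enum Placement { INLINE_USER_PAYLOAD, LOCAL_RESOURCE_SIDE_FILE }

    /** Small blobs travel inside the plan; large ones go to a side file. */
    static Placement placementFor(long payloadBytes) {
        if (payloadBytes < 0) {
            throw new IllegalArgumentException("payloadBytes must be non-negative");
        }
        return payloadBytes <= INLINE_LIMIT_BYTES
                ? Placement.INLINE_USER_PAYLOAD
                : Placement.LOCAL_RESOURCE_SIDE_FILE;
    }

    public static void main(String[] args) {
        System.out.println(placementFor(4 * 1024));          // INLINE_USER_PAYLOAD
        System.out.println(placementFor(200L * 1024 * 1024)); // LOCAL_RESOURCE_SIDE_FILE
    }
}
```

Because every descriptor's payload is copied into the same DAGPlan message, the budget that matters is the sum across all vertices and edges, not any single payload.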

Validation: prove you understand this

  1. Write, on a whiteboard, a 4-vertex DAG with two SCATTER_GATHER edges and one BROADCAST edge. Annotate each edge with its three EdgeProperty enums. Justify each choice.
  2. Given an edge with (SCATTER_GATHER, PERSISTED, SEQUENTIAL), name the EdgeManager class that will be selected at runtime and the source file where the selection logic lives.
  3. From memory, list the five required arguments to EdgeProperty.create(...).
  4. Open DAG.verify() and identify the first five checks. For each, propose a one-line DAG that would fail it.
  5. In a new method getAllRootInputs(DAG), walk the DAG and return all DataSourceDescriptor objects across all vertices. Compile it; check against DAG.java's own helpers.