DAG Model
A Tez DAG is an immutable plan for a distributed computation. This chapter
describes the model classes (DAG, Vertex, Edge, EdgeProperty,
DataSourceDescriptor, DataSinkDescriptor, *Descriptor), the protobuf
representation that crosses the wire, and the validation rules that turn a
"DAG you wrote" into a "DAG the AM will accept."
After this chapter you should be able to write a small DAG by hand, predict
which EdgeManager implementation will be picked for each edge, and find any
classification rule in the source.
The classes you actually call from a client
All of these live in tez-api:
```text
tez-api/src/main/java/org/apache/tez/dag/api/
  DAG.java
  Vertex.java
  Edge.java
  EdgeProperty.java
  InputDescriptor.java
  OutputDescriptor.java
  ProcessorDescriptor.java
  VertexManagerPluginDescriptor.java
  DataSourceDescriptor.java
  DataSinkDescriptor.java
  EntityDescriptor.java       (base class for all *Descriptors)
  GroupInputEdge.java         (multi-source unioning edge)
  VertexGroup.java            (group of vertices for grouped commits)
```
Use this command to inspect the API surface:
```bash
grep -n "^public " tez-api/src/main/java/org/apache/tez/dag/api/DAG.java | head -40
```
Every class above is immutable by convention once handed to TezClient.
You may mutate via the builder methods (addVertex, addEdge, addDataSource)
before submission. After submission the only way to change the plan is via
VertexManagerPlugin callbacks (see vertex-lifecycle.md
and the Level 4 lab on VertexManager).
EdgeProperty — three orthogonal axes
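EdgeProperty.create(DataMovementType, DataSourceType, SchedulingType, OutputDescriptor, InputDescriptor)
is the single most important factory method in the API. CUSTOM edges use an overload that takes an EdgeManagerPluginDescriptor in place of the DataMovementType.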
```bash
grep -n "enum " tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
```
The three enums:
| Enum | Values | What it controls |
|---|---|---|
| DataMovementType | ONE_TO_ONE, BROADCAST, SCATTER_GATHER, CUSTOM | How outputs are routed from src to dst tasks |
| DataSourceType | PERSISTED, PERSISTED_RELIABLE, EPHEMERAL | Durability of intermediate data |
| SchedulingType | SEQUENTIAL, CONCURRENT | Whether dst tasks must wait for src to finish |
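A small routing function makes the "What it controls" column for DataMovementType concrete. This is a toy model written for this chapter, not Tez code. ToyRouter and MovementType are hypothetical names. CUSTOM is left out because its routing is whatever the plugin decides. SCATTER_GATHER assumes exactly one output partition per destination task.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of how a DataMovementType routes one source output; not the Tez API.
final class ToyRouter {
    private ToyRouter() {}

    /**
     * Returns the destination task indices that receive output partition
     * `partition` of source task `srcTask` on an edge whose destination
     * vertex has `numDstTasks` tasks.
     */
    static List<Integer> route(MovementType type, int srcTask, int partition, int numDstTasks) {
        List<Integer> targets = new ArrayList<>();
        switch (type) {
            case ONE_TO_ONE:
                // Source task i feeds exactly destination task i.
                targets.add(srcTask);
                break;
            case BROADCAST:
                // Every destination task receives the whole output.
                for (int t = 0; t < numDstTasks; t++) {
                    targets.add(t);
                }
                break;
            case SCATTER_GATHER:
                // Partition p of every source task goes to destination task p
                // (assumes one partition per destination task).
                if (partition < 0 || partition >= numDstTasks) {
                    throw new IllegalArgumentException("partition out of range: " + partition);
                }
                targets.add(partition);
                break;
            default:
                throw new IllegalArgumentException("unsupported movement type: " + type);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(route(MovementType.ONE_TO_ONE, 3, 0, 10));    // [3]
        System.out.println(route(MovementType.BROADCAST, 3, 0, 4));      // [0, 1, 2, 3]
        System.out.println(route(MovementType.SCATTER_GATHER, 3, 2, 4)); // [2]
    }
}

// Mirrors three of the four DataMovementType values; CUSTOM is left out.
enum MovementType { ONE_TO_ONE, BROADCAST, SCATTER_GATHER }
```

BROADCAST fan-out grows with the destination parallelism. ONE_TO_ONE and SCATTER_GATHER, by contrast, each send a given partition to a single task.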
Edge type matrix (movement × scheduling)
| Movement | Scheduling | Typical use | EdgeManager impl |
|---|---|---|---|
| SCATTER_GATHER | SEQUENTIAL | Map → Reduce shuffle | ScatterGatherEdgeManager (built into the AM) |
| ONE_TO_ONE | SEQUENTIAL | Sorted reducer → re-sorter (rare) | OneToOneEdgeManager |
| BROADCAST | SEQUENTIAL | Small-side join broadcast | BroadcastEdgeManager |
| CUSTOM | SEQUENTIAL | Hive cartesian product, custom partitioner | User-supplied EdgeManagerPlugin |
| BROADCAST | CONCURRENT | Streaming push between long-running tasks | BroadcastEdgeManager |
| SCATTER_GATHER | CONCURRENT | (Unusual — generally invalid for shuffles) | — |
Locate the actual EdgeManager implementations:
```bash
find tez-dag/src/main/java -name "*EdgeManager*"
```
Key files (exact names vary slightly by branch):
```text
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/
  OneToOneEdgeManagerOnDemand.java
  ScatterGatherEdgeManager.java
  BroadcastEdgeManager.java
```
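The tez-api Edge only records the source vertex, the destination vertex, and the EdgeProperty. The manager is chosen on the AM side. Read the Edge.java in tez-dag to see how it wires the right manager based on the EdgeProperty:

```bash
grep -n "EdgeManager\|edgeManager\|createEdgeManager" \
  tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
```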
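The decision that file makes can be summarized as a switch on the movement type. The sketch below is a hypothetical, self-contained paraphrase of the edge type matrix, not the tez-dag code. The real Edge instantiates manager objects, which for CUSTOM edges means the user's EdgeManagerPlugin class. Depending on branch and configuration, it may also pick an on-demand variant such as OneToOneEdgeManagerOnDemand. Rejecting SCATTER_GATHER with CONCURRENT is this sketch's reading of the "generally invalid" row, not a documented Tez rule.

```java
// Toy paraphrase of EdgeManager selection; not the tez-dag Edge code.
final class ToyEdgeManagerSelector {
    private ToyEdgeManagerSelector() {}

    /**
     * Returns the EdgeManager class name this chapter's matrix associates with
     * an edge. customPluginClass is consulted only for CUSTOM edges.
     */
    static String select(Movement movement, Scheduling scheduling, String customPluginClass) {
        if (movement == Movement.SCATTER_GATHER && scheduling == Scheduling.CONCURRENT) {
            // The matrix marks this combination as generally invalid for shuffles.
            throw new IllegalArgumentException("SCATTER_GATHER with CONCURRENT scheduling is rejected here");
        }
        switch (movement) {
            case ONE_TO_ONE:
                return "OneToOneEdgeManager";
            case BROADCAST:
                return "BroadcastEdgeManager";
            case SCATTER_GATHER:
                return "ScatterGatherEdgeManager";
            case CUSTOM:
                if (customPluginClass == null || customPluginClass.isEmpty()) {
                    throw new IllegalArgumentException("CUSTOM edge needs an EdgeManagerPlugin class");
                }
                return customPluginClass;
            default:
                throw new AssertionError("unknown movement type: " + movement);
        }
    }

    public static void main(String[] args) {
        System.out.println(select(Movement.SCATTER_GATHER, Scheduling.SEQUENTIAL, null));
        // "com.example.MyEdgeManager" is a hypothetical user plugin class.
        System.out.println(select(Movement.CUSTOM, Scheduling.SEQUENTIAL, "com.example.MyEdgeManager"));
    }
}

// Mirrors the DataMovementType and SchedulingType values listed in this chapter.
enum Movement { ONE_TO_ONE, BROADCAST, SCATTER_GATHER, CUSTOM }

enum Scheduling { SEQUENTIAL, CONCURRENT }
```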
DataSourceDescriptor vs Input
Beginners frequently confuse these two:
| Concept | Class | Defined in | Lives during |
|---|---|---|---|
| Plan-time root-input definition | DataSourceDescriptor | tez-api | Client + AM (planning) |
| Runtime input attached to a task | Input (interface) | tez-api | Task JVM (execution) |
A DataSourceDescriptor describes "how to materialize splits for this vertex"
(an InputDescriptor plus an optional InputInitializerDescriptor, the "controller" that enumerates splits). The AM may run
an InputInitializer (e.g., MRInputAMSplitGenerator) to enumerate splits
before the vertex starts. The result of that initialization becomes
InputDataInformationEvents pushed to tasks (see
ipo-abstractions.md and
event-routing.md).
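The sketch below is a rough model of that hand-off and plays the initializer's role. It cuts input files into fixed-size byte ranges and turns each range into one event addressed to one task, so the vertex parallelism falls out of the split count. Everything here is hypothetical (ToySplitInitializer, Split, DataInfoEvent). The real MRInputAMSplitGenerator asks the Hadoop InputFormat for splits and may group them, so its output differs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a root-input initializer; not MRInputAMSplitGenerator.
final class ToySplitInitializer {
    private ToySplitInitializer() {}

    /** A byte range of one input file, standing in for an input split. */
    static final class Split {
        final int fileIndex;
        final long offset;
        final long length;

        Split(int fileIndex, long offset, long length) {
            this.fileIndex = fileIndex;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            return "file" + fileIndex + "[" + offset + "+" + length + "]";
        }
    }

    /** Stand-in for an InputDataInformationEvent: one split aimed at one task. */
    static final class DataInfoEvent {
        final int targetTaskIndex;
        final Split split;

        DataInfoEvent(int targetTaskIndex, Split split) {
            this.targetTaskIndex = targetTaskIndex;
            this.split = split;
        }
    }

    /** Cuts each file into consecutive chunks of at most splitSize bytes. */
    static List<Split> computeSplits(long[] fileSizes, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<Split> splits = new ArrayList<>();
        for (int f = 0; f < fileSizes.length; f++) {
            for (long off = 0; off < fileSizes[f]; off += splitSize) {
                splits.add(new Split(f, off, Math.min(splitSize, fileSizes[f] - off)));
            }
        }
        return splits;
    }

    /** One event per split; split i goes to task i, so parallelism = splits.size(). */
    static List<DataInfoEvent> toEvents(List<Split> splits) {
        List<DataInfoEvent> events = new ArrayList<>();
        for (int i = 0; i < splits.size(); i++) {
            events.add(new DataInfoEvent(i, splits.get(i)));
        }
        return events;
    }

    public static void main(String[] args) {
        // 250 bytes -> 3 splits, 100 bytes -> 1 split: the vertex gets 4 tasks.
        for (DataInfoEvent e : toEvents(computeSplits(new long[] {250, 100}, 100))) {
            System.out.println("task " + e.targetTaskIndex + " <- " + e.split);
        }
    }
}
```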
At task time the input class is instantiated from the InputDescriptor and
called with initialize() → start() → getReader() → close(). The task never
sees the DataSourceDescriptor.
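The task-side call order can be sketched with a hypothetical ToyInput interface that has the same four methods. This is not Tez's Input or LogicalInput interface. In Tez these calls are split between the framework and the Processor; for brevity, a single runTask method makes all of them here. The DataSourceDescriptor never appears in the sketch, which matches the point above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the task-side Input lifecycle; not the Tez runtime API.
final class ToyInputLifecycle {
    private ToyInputLifecycle() {}

    /** Hypothetical input contract with the four calls named in the text. */
    interface ToyInput {
        void initialize();
        void start();
        Iterable<String> getReader();
        void close();
    }

    /** In-memory input that records every lifecycle call it receives. */
    static final class RecordingInput implements ToyInput {
        final List<String> calls = new ArrayList<>();
        private final List<String> records;

        RecordingInput(List<String> records) {
            this.records = records;
        }

        @Override public void initialize() { calls.add("initialize"); }
        @Override public void start() { calls.add("start"); }
        @Override public Iterable<String> getReader() { calls.add("getReader"); return records; }
        @Override public void close() { calls.add("close"); }
    }

    /** Drives an input in lifecycle order and returns how many records were read. */
    static int runTask(ToyInput input) {
        input.initialize();
        input.start();
        int count = 0;
        try {
            for (String rec : input.getReader()) {
                count++; // a real Processor would consume `rec` here
            }
        } finally {
            input.close(); // close even if reading throws
        }
        return count;
    }

    public static void main(String[] args) {
        RecordingInput input = new RecordingInput(Arrays.asList("a", "b", "c"));
        System.out.println(runTask(input)); // 3
        System.out.println(input.calls);    // [initialize, start, getReader, close]
    }
}
```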
The DAGPlan protobuf — the wire format
tez-api/src/main/proto/DAGApiRecords.proto
Inspect:
```bash
grep -n "^message " tez-api/src/main/proto/DAGApiRecords.proto
```
Key messages:
- DAGPlan: root; name, vertices, edges, plan-level configs, credentials, ACLs.
- VertexPlan: name, processor descriptor, parallelism, location hints, associated edges, root inputs.
- EdgePlan: source/dest vertex names, edge properties, edge manager descriptor.
- TezEntityDescriptorProto: {class_name, user_payload, history_text}; the serialized form of any *Descriptor.
- RootInputLeafOutputProto: the protobuf encoding of DataSourceDescriptor and DataSinkDescriptor.
The conversion from API classes to protobuf happens in:
```bash
grep -rn "createDAGPlan\|toProtoFormat" tez-api/src/main/java/org/apache/tez/dag/api/ | head
```
Specifically DAG.createDag(...) and DagTypeConverters (a kitchen-sink
class of to/from helpers).
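One property of that conversion is worth seeing in isolation. The client holds Vertex and Edge objects that point at each other, while EdgePlan refers to its endpoints by vertex name, so the AM must rebuild the graph by name lookup. The sketch below models just that round trip with hypothetical classes (ToyPlanConverter, ToyVertex, ToyEdge, EdgeRecord). It produces plain Java objects, not protobuf messages, and is not how DagTypeConverters is written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of "object graph -> name-referenced plan -> rebuilt graph"; not DagTypeConverters.
final class ToyPlanConverter {
    private ToyPlanConverter() {}

    /** Client-side vertex (hypothetical stand-in for tez-api's Vertex). */
    static final class ToyVertex {
        final String name;
        ToyVertex(String name) { this.name = name; }
    }

    /** Client-side edge: holds direct references to both endpoint objects. */
    static final class ToyEdge {
        final ToyVertex src;
        final ToyVertex dst;
        ToyEdge(ToyVertex src, ToyVertex dst) { this.src = src; this.dst = dst; }
    }

    /** Plan-side edge: endpoints flattened to vertex names, as in EdgePlan. */
    static final class EdgeRecord {
        final String srcName;
        final String dstName;
        EdgeRecord(String srcName, String dstName) { this.srcName = srcName; this.dstName = dstName; }
    }

    /** Client -> plan: replace object references with names. */
    static List<EdgeRecord> toEdgeRecords(List<ToyEdge> edges) {
        List<EdgeRecord> out = new ArrayList<>();
        for (ToyEdge e : edges) {
            out.add(new EdgeRecord(e.src.name, e.dst.name));
        }
        return out;
    }

    /** Plan -> AM: rebuild adjacency lists by name; an unknown name is an error. */
    static Map<String, List<String>> rebuild(List<String> vertexNames, List<EdgeRecord> edges) {
        Map<String, List<String>> adjacency = new LinkedHashMap<>();
        for (String name : vertexNames) {
            adjacency.put(name, new ArrayList<>());
        }
        for (EdgeRecord r : edges) {
            if (!adjacency.containsKey(r.srcName) || !adjacency.containsKey(r.dstName)) {
                throw new IllegalStateException("edge references unknown vertex: " + r.srcName + " -> " + r.dstName);
            }
            adjacency.get(r.srcName).add(r.dstName);
        }
        return adjacency;
    }

    public static void main(String[] args) {
        ToyVertex map = new ToyVertex("map");
        ToyVertex reduce = new ToyVertex("reduce");
        List<EdgeRecord> plan = toEdgeRecords(Arrays.asList(new ToyEdge(map, reduce)));
        System.out.println(rebuild(Arrays.asList("map", "reduce"), plan)); // {map=[reduce], reduce=[]}
    }
}
```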
Validation — what DAG.verify() checks
```bash
grep -n "private void.*verify\|public void verify" \
  tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
```
DAG.verify(restricted=true) enforces, at minimum:
- Name uniqueness: vertex names and the DAG name are unique.
- No cycles: DFS over the edge graph; throws IllegalStateException("DAG contains a cycle") if any back-edge is found.
- Parallelism rules:
  - ONE_TO_ONE edges require source.parallelism == dest.parallelism if both are statically set.
  - Vertices with BROADCAST outputs must have a finite parallelism (since each downstream task receives every output).
- Descriptor non-null for required slots (Processor, Output for vertices that produce, Input for vertices that consume).
- No "dangling" data sources: every root input is on a real vertex.
- VertexManagerPlugin specified explicitly for vertices that need dynamic reconfiguration (otherwise a default is chosen; see vertex-lifecycle.md for the default rules).
Read the body of verify(...) line-by-line; the comments cite the JIRA that
added each check.
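Three of these checks fit in a short self-contained sketch: name uniqueness, the ONE_TO_ONE parallelism rule, and cycle detection. ToyDagVerifier and EdgeSpec are hypothetical, and the exception messages are illustrative rather than copied from Tez. The real DAG.verify may use a different traversal. This one is a plain three-color DFS, in which reaching a vertex that is still on the current path means a back-edge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy re-implementation of three verify() checks; not Tez's DAG.verify.
final class ToyDagVerifier {
    private ToyDagVerifier() {}

    private static final int WHITE = 0; // not visited yet
    private static final int GREY = 1;  // on the current DFS path
    private static final int BLACK = 2; // fully explored

    /** Toy edge: endpoint names plus a flag for ONE_TO_ONE movement. */
    static final class EdgeSpec {
        final String src;
        final String dst;
        final boolean oneToOne;
        EdgeSpec(String src, String dst, boolean oneToOne) {
            this.src = src;
            this.dst = dst;
            this.oneToOne = oneToOne;
        }
    }

    /** Throws IllegalStateException on the first violation; -1 parallelism means "set at runtime". */
    static void verify(List<String> names, Map<String, Integer> parallelism, List<EdgeSpec> edges) {
        Set<String> seen = new HashSet<>();
        for (String n : names) {
            if (!seen.add(n)) {
                throw new IllegalStateException("Duplicate vertex name: " + n);
            }
        }
        Map<String, List<String>> adjacency = new HashMap<>();
        for (String n : names) {
            adjacency.put(n, new ArrayList<>());
        }
        for (EdgeSpec e : edges) {
            if (!adjacency.containsKey(e.src) || !adjacency.containsKey(e.dst)) {
                throw new IllegalStateException("Edge references unknown vertex: " + e.src + " -> " + e.dst);
            }
            adjacency.get(e.src).add(e.dst);
            int ps = parallelism.getOrDefault(e.src, -1);
            int pd = parallelism.getOrDefault(e.dst, -1);
            // ONE_TO_ONE: only compare when both sides are statically set.
            if (e.oneToOne && ps >= 0 && pd >= 0 && ps != pd) {
                throw new IllegalStateException("ONE_TO_ONE edge " + e.src + " -> " + e.dst
                        + " needs equal parallelism, got " + ps + " and " + pd);
            }
        }
        Map<String, Integer> color = new HashMap<>();
        for (String n : names) {
            if (color.getOrDefault(n, WHITE) == WHITE) {
                visit(n, adjacency, color);
            }
        }
    }

    /** DFS; meeting a GREY vertex means we followed a back-edge, i.e. a cycle. */
    private static void visit(String v, Map<String, List<String>> adjacency, Map<String, Integer> color) {
        color.put(v, GREY);
        for (String w : adjacency.get(v)) {
            int c = color.getOrDefault(w, WHITE);
            if (c == GREY) {
                throw new IllegalStateException("DAG contains a cycle: " + v + " -> " + w);
            }
            if (c == WHITE) {
                visit(w, adjacency, color);
            }
        }
        color.put(v, BLACK);
    }

    public static void main(String[] args) {
        Map<String, Integer> p = new HashMap<>();
        p.put("a", 2);
        p.put("b", 2);
        try {
            verify(Arrays.asList("a", "b"), p,
                    Arrays.asList(new EdgeSpec("a", "b", true), new EdgeSpec("b", "a", false)));
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage()); // DAG contains a cycle: b -> a
        }
    }
}
```

In main, the ONE_TO_ONE edge passes because both sides have parallelism 2. The b -> a edge then closes a cycle, which the DFS reports.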
How a DAG becomes a plan, end-to-end
```mermaid
flowchart LR
  A[User code: new DAG] --> B[addVertex/addEdge/addDataSource]
  B --> C[TezClient.submitDAG]
  C --> D[DAG.verify]
  D -->|ok| E[DAG.createDag -> DAGPlan proto]
  E --> F[RPC DAGClientAMProtocol.submitDAG]
  F --> G[DAGAppMaster: DAGImpl init]
  G --> H[VertexImpl per VertexPlan]
  H --> I[Edge per EdgePlan; EdgeManager selected]
```
Each arrow has a citation:
- verify: DAG.verify(...).
- createDag: DAG.createDag(BinaryConfig, Credentials, Map<String,LocalResource>, JobTokenSecretManager, boolean tezLrsAsArchive). The exact overloads and parameter lists differ between Tez releases.
- AM side: DAGImpl.init() and VertexImpl.constructInputDescriptors(), plus the Edge constructor in tez-dag (not the tez-api Edge).
Reading exercise
```bash
# Top-level surface
sed -n '1,80p' tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
sed -n '1,80p' tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
sed -n '1,80p' tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
# All the places where DAGPlan is constructed
grep -rn "DAGPlan.newBuilder" tez-api/src/main/java | head
# Cycle detection
grep -n "cycle\|cycleFound\|visit" \
  tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
```
Answer:
- What exception class does DAG.verify() throw on a cycle, and what does its message contain that helps a user diagnose the offending vertex?
- Which method on Vertex is used to attach a DataSourceDescriptor? Which attaches a DataSinkDescriptor?
- What is the role of DagTypeConverters, and why is it preferred over each class owning its own toProto/fromProto methods?
- When you call Edge.create(srcV, dstV, EdgeProperty.create(...)), where is the resulting Edge registered? On the source vertex? The destination? The DAG itself?
- Suppose you call dag.addVertex(v) twice with the same v instance. What happens, and where in DAG.java is the protection?
- What is the difference between DataSourceType.PERSISTED and DataSourceType.PERSISTED_RELIABLE? Find the consumer (search tez-dag for uses of DataSourceType).
Common bugs and symptoms
| Symptom | Root cause | Where to look |
|---|---|---|
| IllegalStateException: DAG contains a cycle at submission | Accidentally added a back-edge | DAG.verify |
| Vertex starts with parallelism -1 and never runs | setParallelism(-1) and no VertexManagerPlugin to reconfigure | VertexImpl.initialize; check for "parallelism not set" |
| Job hangs with all vertices in INITIALIZING | A DataSourceDescriptor has an initializer that never emits events | Search AM log for InputInitializerEvent; cross-reference initializer impl |
| ClassNotFoundException at task start for your Processor | The class is on the client classpath but not uploaded as a local resource | TezClient.addAppMasterLocalFiles not called (for task-side jars, also check DAG.addTaskLocalFiles / Vertex.addTaskLocalFiles); see tez-client.md |
| EdgeManager mismatch between sides; task hangs reading | Custom EdgeManagerPlugin returns inconsistent partition counts | Always run TestEdgeManagerSelf on your plugin |
| DAGPlan proto exceeds 64 MB | Encoding huge userPayload directly into the plan | Use a side file via LocalResource; payload is byte[], not free-form storage |
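A cheap lint can catch two of these rows before submission. The sketch below is hypothetical; ToyPreSubmitLint is not part of Tez. You would have to derive canSetParallelism yourself from the vertex's VertexManagerPlugin and data-source initializers. Summing payload bytes gives only a lower bound on the serialized DAGPlan size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-submission lint for two rows of the table; not part of Tez.
final class ToyPreSubmitLint {
    private ToyPreSubmitLint() {}

    /** The 64 MB plan-size figure from the table above. */
    static final long PLAN_SIZE_LIMIT = 64L * 1024 * 1024;

    /** What the lint needs to know about one vertex. */
    static final class VertexInfo {
        final String name;
        final int parallelism;           // -1 means "decided at runtime"
        final boolean canSetParallelism; // a VertexManagerPlugin or initializer will set it
        final long payloadBytes;         // userPayload bytes carried by this vertex's descriptors

        VertexInfo(String name, int parallelism, boolean canSetParallelism, long payloadBytes) {
            this.name = name;
            this.parallelism = parallelism;
            this.canSetParallelism = canSetParallelism;
            this.payloadBytes = payloadBytes;
        }
    }

    static List<String> lint(List<VertexInfo> vertices) {
        List<String> warnings = new ArrayList<>();
        long totalPayload = 0;
        for (VertexInfo v : vertices) {
            if (v.parallelism == -1 && !v.canSetParallelism) {
                warnings.add(v.name + ": parallelism is -1 and nothing will set it");
            }
            totalPayload += v.payloadBytes;
        }
        // Payload bytes are only part of the plan, so this is a lower bound on its size.
        if (totalPayload > PLAN_SIZE_LIMIT) {
            warnings.add("payloads total " + totalPayload + " bytes; ship bulk data as a LocalResource instead");
        }
        return warnings;
    }

    public static void main(String[] args) {
        List<VertexInfo> dag = Arrays.asList(
                new VertexInfo("map", -1, true, 1024),
                new VertexInfo("reduce", -1, false, 1024));
        System.out.println(lint(dag)); // [reduce: parallelism is -1 and nothing will set it]
    }
}
```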
Validation: prove you understand this
- Write, on a whiteboard, a 4-vertex DAG with two SCATTER_GATHER edges and one BROADCAST edge. Annotate each edge with its three EdgeProperty enums. Justify each choice.
- Given an edge with (SCATTER_GATHER, PERSISTED, SEQUENTIAL), name the EdgeManager class that will be selected at runtime and the source file where the selection logic lives.
- From memory, list the five required arguments to EdgeProperty.create(...).
- Open DAG.verify() and identify the first five checks. For each, propose a one-line DAG that would fail it.
- In a new method getAllRootInputs(DAG), walk the DAG and return all DataSourceDescriptor objects across all vertices. Compile it; check against DAG.java's own helpers.