DAGClient
DAGClient is the client-side handle to a submitted DAG. It is
returned by TezClient.submitDAG(...) and is used until the DAG completes (or
the user kills it). Apart from tryKillDAG(), every call on it only reads status.
This chapter covers status polling, the StatusGetOpts options, the RPC vs ATS
backends, error reporting, and the contract DAGClient exposes to callers such as
Hive, Pig, and CLI drivers. After this chapter you should know which backend a
given DAGClient instance is using, which fields will be populated, and which
calls block and which poll.
Files to open
tez-api/src/main/java/org/apache/tez/dag/api/client/
DAGClient.java (abstract base)
DAGStatus.java (the snapshot type)
VertexStatus.java
Progress.java
StatusGetOpts.java (enum: GET_COUNTERS, GET_MEMORY_USAGE)
rpc/
DAGClientRPCImpl.java (talks to the AM via DAGClientAMProtocol)
DAGClientImplLocal.java (in-process; for LocalClient)
registry/ (service discovery if applicable)
ATS-backed variant:
tez-plugins/tez-yarn-timeline-history-with-fs/ or
tez-plugins/tez-yarn-timeline-history/
src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
(Module names vary across versions; locate with find . -name "DAGClientTimelineImpl.java".)
Core API
```java
public abstract class DAGClient implements Closeable {
  public abstract String getExecutionContext();
  public abstract DAGStatus getDAGStatus(Set<StatusGetOpts> opts) throws ...;
  public abstract DAGStatus getDAGStatus(Set<StatusGetOpts> opts, long timeoutMillis) throws ...;
  public abstract VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> opts) throws ...;
  public abstract DAGStatus waitForCompletion() throws ...;
  public abstract DAGStatus waitForCompletionWithStatusUpdates(Set<StatusGetOpts> opts) throws ...;
  public abstract void tryKillDAG() throws ...;
  // ...
}
```

```bash
grep -n "public abstract\|public " \
  tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
```
DAGStatus — what callers actually consume
```bash
grep -n "public " tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
```
Fields you'll see in production triage:
| Field | Populated by | Notes |
|---|---|---|
| state (DAGStatus.State) | Always | SUBMITTED/INITING/RUNNING/SUCCEEDED/FAILED/KILLED/ERROR |
| progress | RPC backend; ATS backend may lag | Progress per vertex + aggregate |
| diagnostics | On terminal states | Newline-joined messages |
| counters | Only if StatusGetOpts.GET_COUNTERS is passed | Expensive over RPC |
| memoryUsage | Only if StatusGetOpts.GET_MEMORY_USAGE is passed | Aggregated across containers |
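To make the "only if passed" rows concrete, here is a minimal sketch of an opts-gated status snapshot. Everything in it is hypothetical: Opt stands in for StatusGetOpts (mirroring the two values named in the file list), Snapshot stands in for DAGStatus, and a flat counter map replaces the real TezCounters tree. It illustrates only the contract the table describes: optional fields stay empty unless requested, and the result is a copy. It does not show how Tez actually builds DAGStatus.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical stand-ins for StatusGetOpts and DAGStatus; not the Tez classes. */
final class StatusOptsSketch {
  /** Mirrors the two StatusGetOpts values named in the file list. */
  enum Opt { GET_COUNTERS, GET_MEMORY_USAGE }

  /** An immutable snapshot: optional fields stay null unless they were requested. */
  static final class Snapshot {
    final String state;
    final Map<String, Long> counters; // null unless GET_COUNTERS was requested
    final Long memoryUsage;           // null unless GET_MEMORY_USAGE was requested

    Snapshot(String state, Map<String, Long> counters, Long memoryUsage) {
      this.state = state;
      this.counters = counters;
      this.memoryUsage = memoryUsage;
    }
  }

  /**
   * Builds a snapshot from live state, copying only what the caller asked for.
   * A null option set is treated as "no extras".
   */
  static Snapshot build(String state, Map<String, Long> liveCounters, long liveMemory,
                        Set<Opt> opts) {
    Set<Opt> requested = (opts == null) ? EnumSet.noneOf(Opt.class) : opts;
    Map<String, Long> counters = requested.contains(Opt.GET_COUNTERS)
        ? Collections.unmodifiableMap(new TreeMap<>(liveCounters)) // copy: later updates are not visible
        : null;
    Long memory = requested.contains(Opt.GET_MEMORY_USAGE) ? Long.valueOf(liveMemory) : null;
    return new Snapshot(state, counters, memory);
  }
}
```

Because the counters are copied at build time, a caller holding the snapshot sees a fixed picture. Getting fresh numbers means asking again and paying the cost again.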
Note: DAGStatus.State is a different enum from VertexStatus.State. Vertex states are
richer (INITED, RUNNING, COMMITTING, SUCCEEDED, etc.; see
vertex-lifecycle.md). The DAG state is a roll-up of them.
RPC backend: DAGClientRPCImpl
```bash
grep -n "DAGClientAMProtocol\|proxy" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
```
Behavior:
- Each getDAGStatus(opts) call is a synchronous RPC to the AM.
- The default timeout per call is governed by tez.dag.am.client.am-connect-timeout-secs.
- If GET_COUNTERS is set, the AM serializes the entire TezCounters tree (potentially MBs), so avoid it in tight loops.
- waitForCompletion() is implemented as a polling loop with backoff.

Find the loop:

```bash
grep -n "waitForCompletion\|sleep\|poll" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
```
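A client-side wait loop of this kind can be sketched as follows. This version also adds the overall time budget that the "Common bugs" table recommends, so a dead AM cannot make the caller wait forever. This is not the DAGClientRPCImpl code. PollLoopSketch, Sleeper, and the double-then-cap backoff policy are assumptions made for illustration. Read the real loop to find Tez's actual intervals.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of a client-side wait loop; not the DAGClientRPCImpl code. */
final class PollLoopSketch {
  /** Mirrors the DAGStatus.State values listed in the field table. */
  enum State { SUBMITTED, INITING, RUNNING, SUCCEEDED, FAILED, KILLED, ERROR }

  static boolean isTerminal(State s) {
    return s == State.SUCCEEDED || s == State.FAILED || s == State.KILLED || s == State.ERROR;
  }

  /** Injected so a test can record sleeps instead of waiting on the wall clock. */
  interface Sleeper { void sleep(long millis); }

  /** Real sleeper: restores the interrupt flag and aborts the wait if interrupted. */
  static final Sleeper THREAD_SLEEP = millis -> {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for DAG", e);
    }
  };

  /**
   * Polls until a terminal state is seen or until budgetMs of total sleep has elapsed
   * (time spent inside poll itself is not counted). The delay doubles after each poll,
   * capped at maxMs. Returns the last state seen, which is non-terminal if the budget
   * ran out, for example because the AM died.
   */
  static State waitForTerminal(Supplier<State> poll, long initialMs, long maxMs,
                               long budgetMs, Sleeper sleeper) {
    long delay = initialMs;
    long slept = 0;
    State last = poll.get();
    while (!isTerminal(last) && slept < budgetMs) {
      long step = Math.min(delay, budgetMs - slept); // never overshoot the budget
      sleeper.sleep(step);
      slept += step;
      delay = Math.min(delay * 2, maxMs);
      last = poll.get();
    }
    return last;
  }
}
```

A real caller would wrap getDAGStatus(...) inside the Supplier, translating its checked exceptions, and pass THREAD_SLEEP as the sleeper.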
ATS backend: DAGClientTimelineImpl
When the AM has exited but ATS retains history, status is fetched from the ATS REST API (or RM web UI) instead. This is critical for post-mortem and "why did my job fail" UIs.
Behavior differences from RPC:
- Eventually consistent (ATS publication is async; see counters-diagnostics.md).
- state is the last state recorded in ATS; intermediate states between two ATS events are invisible.
- Counters are available only if ATSHistoryLoggingService was active and the event made it past the publisher queue.
Search for the fallback path that picks ATS when RPC fails:
```bash
grep -rn "DAGClientTimelineImpl\|getDAGAndAMURL\|RPCFailed\|amProxyFailed" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/ \
  tez-api/src/main/java/org/apache/tez/client/
```
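Before reading the real code, it helps to picture what such a fallback looks like from the caller's side. The sketch below is hypothetical: StatusSource, Result, and fetch are invented names. Whether Tez does anything like this internally is exactly what the validation exercises ask you to confirm. It tries the AM first and only then the history store. It tags each answer with its source because the two backends can disagree, for example during AM shutdown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical caller-side fallback; not Tez's internal logic. */
final class FallbackSketch {
  /** Stand-in for "ask one backend for the DAG's state". */
  interface StatusSource { String fetchState(String dagId) throws IOException; }

  /** The state plus which backend produced it, so callers can judge freshness. */
  static final class Result {
    final String state;
    final String source;
    Result(String state, String source) { this.state = state; this.source = source; }
  }

  /** Ask the AM first; if it is unreachable, ask the history store, whose answer may lag. */
  static Result fetch(String dagId, StatusSource am, StatusSource history) {
    try {
      return new Result(am.fetchState(dagId), "AM");
    } catch (IOException amFailure) {
      try {
        return new Result(history.fetchState(dagId), "HISTORY");
      } catch (IOException historyFailure) {
        historyFailure.addSuppressed(amFailure); // keep both causes for triage
        throw new UncheckedIOException("no backend could report " + dagId, historyFailure);
      }
    }
  }
}
```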
tryKillDAG() — the only mutation
DAGClient has exactly one method that changes DAG state: tryKillDAG.
It asks the AM to start the kill path but does not block until the
DAG is dead.
```bash
grep -n "tryKillDAG\|killDAG" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java \
  tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
```
To wait for the kill to take effect:
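```java
client.tryKillDAG();                           // asks the AM to start killing; does not wait
DAGStatus status = client.waitForCompletion(); // blocks until the DAG reaches a terminal state
// status.getState() is usually KILLED, but it can be SUCCEEDED, FAILED, or ERROR
// if the DAG reached that state before the kill took effect.
```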
Status populate flow
```mermaid
sequenceDiagram
  participant U as User code
  participant DC as DAGClientRPCImpl
  participant AM as DAGAppMaster
  participant DH as DAGClientHandler
  participant DI as DAGImpl
  U->>DC: getDAGStatus(opts)
  DC->>AM: RPC: getDAGStatus(dagId, opts)
  AM->>DH: dispatch
  DH->>DI: dagImpl.getDAGStatus(opts)
  DI-->>DH: DAGStatusProto
  DH-->>AM: response
  AM-->>DC: response bytes
  DC-->>U: DAGStatus
```
The conversion DAGImpl → DAGStatusProto happens in DAGImpl.getDAGStatus()
(in tez-dag). For GET_COUNTERS, the AM walks the counter aggregation
tree — expensive.
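Here is a rough model of why GET_COUNTERS is costly. Aggregation touches every counter in every group of every vertex, so the work grows with (vertices × counters) before serialization even starts. The sketch below uses nested maps (group name → counter name → value) as a hypothetical stand-in for TezCounters. It does not reproduce DAGImpl's aggregation code.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of DAG-level counter aggregation; not DAGImpl's code. */
final class CounterAggregationSketch {
  /** Sums per-vertex counters (group -> counter -> value) into DAG-level totals. */
  static Map<String, Map<String, Long>> aggregate(List<Map<String, Map<String, Long>>> perVertex) {
    Map<String, Map<String, Long>> total = new TreeMap<>();
    for (Map<String, Map<String, Long>> vertex : perVertex) {                // every vertex
      for (Map.Entry<String, Map<String, Long>> group : vertex.entrySet()) { // every group
        Map<String, Long> dst = total.computeIfAbsent(group.getKey(), k -> new TreeMap<>());
        for (Map.Entry<String, Long> c : group.getValue().entrySet()) {      // every counter
          dst.merge(c.getKey(), c.getValue(), Long::sum);
        }
      }
    }
    return total;
  }
}
```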
Reading exercise
```bash
# Surface
sed -n '1,80p' tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
# State enum
grep -n "public enum State\b" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
# RPC polling loop
grep -n "waitForCompletion\|backoff\|sleep" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
```
Answer:
- What is the difference between waitForCompletion() and waitForCompletionWithStatusUpdates(opts)?
- What happens if GET_COUNTERS is requested but the DAG is still INITING?
- List the exact DAGStatus.State enum values and the terminal subset.
- From the polling loop, what is the maximum sleep between polls?
- When tryKillDAG() is called after the DAG has already finished, what does the RPC return? Is it an error?
- In DAGClientTimelineImpl, how is the "I don't see a SUCCEEDED event yet" case distinguished from "the DAG is still running"?
Common bugs and symptoms
| Symptom | Root cause | Fix |
|---|---|---|
| waitForCompletion() never returns; status stays RUNNING | AM crashed and RPC keeps timing out | Add a timeout; check the AM log; fall back to ATS |
| Counters are stale by ~30s | AM aggregation interval | Tune tez.am.aggregate.counters.interval-secs |
| tryKillDAG() returns immediately but the DAG keeps running for minutes | Kill is async; tasks must drain | Always follow with waitForCompletion() |
| Hive sees DAGStatus.State=ERROR with no diagnostics | AM crashed before publishing | Check the NM container log for the AM |
| ATS-backed status missing for a recently completed DAG | ATS publisher queue backed up | Wait, or query the ATS REST API directly |
| Inconsistent state between RPC and ATS for the same DAG | Race during AM shutdown; ATS publishes after the final RPC | Trust RPC while the AM lives, ATS after |
Validation: prove you understand this
- Write a 20-line program that polls getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS)) once a second and prints the FILE_BYTES_WRITTEN counter from each snapshot.
- List the StatusGetOpts enum values in your Tez version and what each adds to the payload. Check the source: the file list above names two, and releases may differ.
- From DAGClient.java, draw the inheritance/factory diagram for how a DAGClient instance is actually constructed (look at TezClient.submitDAG to see which subclass is returned in YARN vs local mode).
- Force the RPC backend to fail and confirm whether or not Tez falls back to the ATS backend automatically. Cite the line that performs the fallback, if there is one.
- Explain why DAGStatus is a snapshot rather than an observable.