DAGClient

DAGClient is the client-side handle to a submitted DAG. It is returned by TezClient.submitDAG(...) and tracks the DAG until it completes or is killed. Apart from tryKillDAG(), it is read-only. This chapter covers status polling, the StatusGetOpts flags, the RPC and ATS backends, error reporting, and the contract DAGClient exposes to callers such as Hive, Pig, and CLI drivers.

After this chapter you should know which backend a given DAGClient instance is using, which DAGStatus fields are populated, and which calls block and which poll.


Files to open

tez-api/src/main/java/org/apache/tez/dag/api/client/
  DAGClient.java                       (abstract base)
  DAGStatus.java                       (the snapshot type)
  VertexStatus.java
  Progress.java
  StatusGetOpts.java                   (enum: GET_COUNTERS, GET_MEMORY_USAGE)

  rpc/
    DAGClientRPCImpl.java              (talks to the AM via DAGClientAMProtocol)
    DAGClientImplLocal.java            (in-process; for LocalClient)

  registry/                            (service discovery if applicable)

ATS-backed variant:

tez-plugins/tez-yarn-timeline-history-with-fs/  or
tez-plugins/tez-yarn-timeline-history/
  src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java

(Module names vary across versions; locate with find . -name "DAGClientTimelineImpl.java".)


Core API

public abstract class DAGClient implements Closeable {
  public abstract String getExecutionContext();
  public abstract DAGStatus getDAGStatus(Set<StatusGetOpts> opts) throws IOException, TezException;
  public abstract DAGStatus getDAGStatus(Set<StatusGetOpts> opts, long timeoutMillis) throws IOException, TezException;
  public abstract VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> opts) throws IOException, TezException;
  public abstract DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException;
  public abstract DAGStatus waitForCompletionWithStatusUpdates(Set<StatusGetOpts> opts) throws IOException, TezException, InterruptedException;
  public abstract void tryKillDAG() throws IOException, TezException;
  // ... (abridged; exact throws clauses and extra methods vary by Tez version)
}

grep -n "public " \
  tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java

DAGStatus — what callers actually consume

grep -n "public " tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java

Fields you'll see in production triage:

| Field | Populated by | Notes |
|---|---|---|
| state (DAGStatus.State) | Always | SUBMITTED / INITING / RUNNING / SUCCEEDED / FAILED / KILLED / ERROR |
| progress | RPC backend; ATS backend may lag | Progress per vertex + aggregate |
| diagnostics | On terminal states | Newline-joined messages |
| counters | Only if StatusGetOpts.GET_COUNTERS is passed | Expensive over RPC |
| memoryUsage | Only if StatusGetOpts.GET_MEMORY_USAGE is passed | Aggregated across containers |

Note: DAGStatus.State is a different enum from VertexStatus.State. Vertex states are richer (INITED, RUNNING, COMMITTING, SUCCEEDED, etc.; see vertex-lifecycle.md), while the DAG state is a roll-up across its vertices.
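When writing a poll loop, it helps to pin down which of these states are terminal. A minimal sketch, assuming the values in the table above are complete for your version: DagState is a hypothetical stand-in for DAGStatus.State, and the terminal subset (SUCCEEDED, FAILED, KILLED, ERROR) is an assumption to verify against DAGStatus.java.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical mirror of DAGStatus.State, copied from the table above.
// Verify the values and the terminal subset against DAGStatus.java.
enum DagState {
  SUBMITTED, INITING, RUNNING, SUCCEEDED, FAILED, KILLED, ERROR;

  // Assumed terminal states: once reached, no further transition is expected.
  static final Set<DagState> TERMINAL = EnumSet.of(SUCCEEDED, FAILED, KILLED, ERROR);

  boolean isTerminal() {
    return TERMINAL.contains(this);
  }
}
```

A loop that stops on isTerminal() rather than on SUCCEEDED alone cannot spin forever on a DAG that ended in KILLED or ERROR.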


RPC backend: DAGClientRPCImpl

grep -n "DAGClientAMProtocol\|proxy" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java

Behavior:

  • Each getDAGStatus(opts) is a synchronous RPC to the AM.
  • Default timeout per call is governed by tez.dag.am.client.am-connect-timeout-secs.
  • If GET_COUNTERS is set, the AM serializes the entire TezCounters tree (potentially MBs); avoid in tight loops.
  • waitForCompletion() is implemented as a polling loop with backoff. Find the loop:
grep -n "waitForCompletion\|sleep\|poll" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
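The loop shape described above can be sketched without Tez on the classpath. This is a minimal sketch of polling with capped exponential backoff. BackoffPoller, Sleeper, and the interval values are hypothetical, and the real DAGClientRPCImpl loop may use different intervals or even a fixed sleep (exercise 4 below asks for its actual maximum).

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Sketch of a waitForCompletion()-style loop: poll once, and if the DAG is not
// done, sleep with exponential backoff capped at maxSleepMs. All names are
// hypothetical; the initial interval and cap are placeholders, not Tez defaults.
final class BackoffPoller {
  /** Injected so tests can record sleeps instead of actually sleeping. */
  interface Sleeper {
    void sleep(long ms) throws InterruptedException;
  }

  private final long initialSleepMs;
  private final long maxSleepMs;
  private final Sleeper sleeper;

  BackoffPoller(long initialSleepMs, long maxSleepMs, Sleeper sleeper) {
    this.initialSleepMs = initialSleepMs;
    this.maxSleepMs = maxSleepMs;
    this.sleeper = sleeper;
  }

  /** Polls until isDone accepts a snapshot, then returns that snapshot. */
  <S> S waitFor(Supplier<S> poll, Predicate<S> isDone) throws InterruptedException {
    long sleepMs = initialSleepMs;
    while (true) {
      S status = poll.get(); // stands in for one synchronous getDAGStatus(opts) RPC
      if (isDone.test(status)) {
        return status;
      }
      sleeper.sleep(sleepMs);
      sleepMs = Math.min(sleepMs * 2, maxSleepMs); // double the wait, never past the cap
    }
  }
}
```

In real code the Sleeper would be Thread::sleep and poll would wrap a getDAGStatus(opts) call; injecting the sleeper keeps the backoff schedule testable. Since GET_COUNTERS makes each RPC heavier, a caller following this pattern would pass it only on occasional polls.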

ATS backend: DAGClientTimelineImpl

When the AM has exited but ATS retains history, status is fetched from the ATS REST API (or RM web UI) instead. This is critical for post-mortem and "why did my job fail" UIs.

Behavior differences from RPC:

  • Eventually consistent (ATS publication is async; see counters-diagnostics.md).
  • state is the final state recorded; intermediate states between two ATS events are invisible.
  • Counters are available if ATSHistoryLoggingService was active and the event made it past the publisher queue.

Search for the fallback path that picks ATS when RPC fails:

grep -rn "DAGClientTimelineImpl\|getDAGAndAMURL\|RPCFailed\|amProxyFailed" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/ \
  tez-api/src/main/java/org/apache/tez/client/
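The rule of thumb from the bugs table below (trust RPC while the AM lives, ATS after) can be expressed as a caller-side fallback. This is a hypothetical sketch, not Tez's own fallback path: LiveSource and HistorySource are invented interfaces standing in for DAGClientRPCImpl and DAGClientTimelineImpl. Whether Tez performs this switch itself is what exercise 4 asks you to verify.

```java
import java.io.IOException;
import java.util.Optional;

// Hypothetical caller-side fallback: ask the live AM first, then a history store.
// LiveSource and HistorySource are NOT Tez interfaces; they stand in for the
// RPC backend and the ATS backend respectively.
final class FallbackStatusReader {
  interface LiveSource {
    String dagState(String dagId) throws IOException; // fails once the AM is gone
  }

  interface HistorySource {
    Optional<String> dagState(String dagId); // empty if nothing has been published yet
  }

  private final LiveSource live;
  private final HistorySource history;

  FallbackStatusReader(LiveSource live, HistorySource history) {
    this.live = live;
    this.history = history;
  }

  /** Live state while the AM answers; otherwise the last state history recorded. */
  String read(String dagId) throws IOException {
    try {
      return live.dagState(dagId);
    } catch (IOException amUnreachable) {
      // History is eventually consistent: an empty result means "not published yet",
      // not "no such DAG", so surface the original RPC failure instead of guessing.
      return history.dagState(dagId).orElseThrow(() -> amUnreachable);
    }
  }
}
```

Rethrowing the RPC error when history is empty avoids reporting a stale or missing ATS record as if it were the DAG's real state.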

tryKillDAG() — the only mutation

Although DAGClient is otherwise read-only, it has exactly one mutating method: tryKillDAG(). It asks the AM to start the kill path but does not block until the DAG is dead.

grep -n "tryKillDAG\|killDAG" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java \
  tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java

To wait for the kill to take effect:

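client.tryKillDAG();                            // asynchronous: does not wait for the DAG to die
DAGStatus status = client.waitForCompletion();  // blocks until a terminal state
// status.getState() is normally KILLED, but may be SUCCEEDED, FAILED, or ERROR
// if the DAG reached that state before the kill took effect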
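The race in the last comment can be made concrete with a toy model. ToyDag is hypothetical and deliberately simplified. A kill needs a few polls to drain, and the DAG may finish on its own first. tryKill() on a finished DAG is assumed to be a silent no-op; what the real RPC returns in that case is exercise 5.

```java
// Toy model (not Tez code) of an asynchronous kill: tryKill() only records the
// request, and the state changes on later polls once running work has drained.
final class ToyDag {
  enum State { RUNNING, SUCCEEDED, KILLED }

  private State state = State.RUNNING;
  private int pollsUntilSuccess;      // polls left before the DAG finishes on its own
  private int pollsUntilKilled = -1;  // -1 means no kill has been requested
  private final int drainPolls;       // polls a kill needs before it takes effect

  ToyDag(int pollsUntilSuccess, int drainPolls) {
    this.pollsUntilSuccess = pollsUntilSuccess;
    this.drainPolls = drainPolls;
  }

  /** Like tryKillDAG(): returns at once; assumed to be a no-op on a terminal DAG. */
  void tryKill() {
    if (state == State.RUNNING && pollsUntilKilled < 0) {
      pollsUntilKilled = drainPolls;
    }
  }

  /** Like one getDAGStatus() call: advances the toy clock by one step. */
  State poll() {
    if (state != State.RUNNING) {
      return state;
    }
    if (pollsUntilKilled >= 0 && pollsUntilKilled-- == 0) {
      state = State.KILLED;     // the kill landed first
    } else if (pollsUntilSuccess-- == 0) {
      state = State.SUCCEEDED;  // finished on its own, possibly ahead of a pending kill
    }
    return state;
  }

  /** Like waitForCompletion(): polls until the state is terminal. */
  State waitForCompletion() {
    State s;
    do {
      s = poll();
    } while (s == State.RUNNING);
    return s;
  }
}
```

For example, new ToyDag(10, 2) killed immediately ends KILLED, while new ToyDag(1, 5) killed immediately ends SUCCEEDED because it finishes before the kill drains.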

Status populate flow

sequenceDiagram
    participant U as User code
    participant DC as DAGClientRPCImpl
    participant AM as DAGAppMaster
    participant DH as DAGClientHandler
    participant DI as DAGImpl

    U->>DC: getDAGStatus(opts)
    DC->>AM: RPC: getDAGStatus(dagId, opts)
    AM->>DH: dispatch
    DH->>DI: dagImpl.getDAGStatus(opts)
    DI-->>DH: DAGStatusProto
    DH-->>AM: response
    AM-->>DC: response bytes
    DC-->>U: DAGStatus

The conversion DAGImpl → DAGStatusProto happens in DAGImpl.getDAGStatus() (in tez-dag). With GET_COUNTERS, the AM also walks the counter aggregation tree, which is what makes counter requests expensive.
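The cost difference between a plain status call and one with GET_COUNTERS comes from that aggregation step. A hypothetical sketch of the idea, not the DAGImpl code: SnapshotBuilder always fills the cheap fields and merges per-vertex counters into DAG totals only when GET_COUNTERS is requested. GET_MEMORY_USAGE is declared but ignored here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the AM-side snapshot step (not Tez code): cheap fields
// are always filled; counters are aggregated only when the caller asked for them.
final class SnapshotBuilder {
  enum Opt { GET_COUNTERS, GET_MEMORY_USAGE } // GET_MEMORY_USAGE is ignored in this sketch

  static final class Snapshot {
    final String state;
    final float progress;
    final Map<String, Long> counters; // null when GET_COUNTERS was not requested

    Snapshot(String state, float progress, Map<String, Long> counters) {
      this.state = state;
      this.progress = progress;
      this.counters = counters;
    }
  }

  private final Map<String, Map<String, Long>> perVertexCounters; // vertex -> counter -> value

  SnapshotBuilder(Map<String, Map<String, Long>> perVertexCounters) {
    this.perVertexCounters = perVertexCounters;
  }

  Snapshot build(String state, float progress, Set<Opt> opts) {
    if (opts == null || !opts.contains(Opt.GET_COUNTERS)) {
      return new Snapshot(state, progress, null); // cheap path: no counter walk
    }
    // Expensive path: visit every vertex and sum counters by name.
    Map<String, Long> totals = new HashMap<>();
    for (Map<String, Long> vertex : perVertexCounters.values()) {
      vertex.forEach((name, value) -> totals.merge(name, value, Long::sum));
    }
    return new Snapshot(state, progress, Collections.unmodifiableMap(totals));
  }
}
```

The work on the expensive path grows with the number of vertices and counters, which is why polling with GET_COUNTERS in a tight loop is discouraged above.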


Reading exercise

# Surface
sed -n '1,80p' tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java

# State enum
grep -n "public enum State\b" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java

# RPC polling loop
grep -n "waitForCompletion\|backoff\|sleep" \
  tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java

Answer:

  1. What is the difference between waitForCompletion() and waitForCompletionWithStatusUpdates(opts)?
  2. What happens if GET_COUNTERS is requested but the DAG is still INITING?
  3. List the exact DAGStatus.State enum values and the terminal subset.
  4. From the polling loop, what is the maximum sleep between polls?
  5. When tryKillDAG() is called after the DAG already finished, what does the RPC return? Is it an error?
  6. In DAGClientTimelineImpl, how is the "I don't see a SUCCEEDED event yet" case distinguished from "the DAG is still running"?

Common bugs and symptoms

| Symptom | Root cause | Fix |
|---|---|---|
| waitForCompletion() never returns; the DAG keeps reporting RUNNING | AM crashed, RPC keeps timing out | Add a timeout; check the AM log; fall back to ATS |
| Counters are stale by ~30s | AM aggregation interval | tez.am.aggregate.counters.interval-secs |
| tryKillDAG() returns immediately but the DAG keeps running for minutes | Kill is async; tasks must drain | Always follow with waitForCompletion() |
| Hive sees DAGStatus.State=ERROR with no diagnostics | AM crashed before publishing | Check the NM container log for the AM |
| ATS-backed status missing for a recently completed DAG | ATS publisher queue backed up | Wait, or query the ATS REST API directly |
| Inconsistent state between RPC and ATS for the same DAG | Race during AM shutdown; ATS publishes after the final RPC | Trust RPC while the AM lives, ATS afterwards |
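The "add timeout" fix for a hung waitForCompletion() can be sketched generically. BoundedWait is a hypothetical helper, not a Tez API. It runs any blocking call on a daemon thread and bounds it with Future.get(timeout, unit). A caller might pass () -> dagClient.waitForCompletion() as the Callable and, on TimeoutException, go check the AM log or the ATS-backed status.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: put a deadline on any blocking wait (such as
// waitForCompletion()) so a dead AM cannot hang the caller forever.
final class BoundedWait {
  /** Runs blockingWait on a daemon thread; returns its result or throws TimeoutException. */
  static <T> T await(Callable<T> blockingWait, long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    ExecutorService exec = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "bounded-wait");
      t.setDaemon(true); // do not keep the JVM alive if the wait never returns
      return t;
    });
    try {
      Future<T> f = exec.submit(blockingWait);
      try {
        return f.get(timeout, unit);
      } catch (TimeoutException e) {
        f.cancel(true); // interrupt the stuck wait before reporting the timeout
        throw e;
      }
    } finally {
      exec.shutdownNow();
    }
  }
}
```

Failures inside the wrapped call surface as ExecutionException, so a caller can tell "the AM answered with an error" apart from "nothing answered in time".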

Validation: prove you understand this

  1. Write a 20-line program that calls getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS)) once a second and prints the FILE_BYTES_WRITTEN counter from each snapshot.
  2. List every StatusGetOpts enum value (check the source: the file list above names two, but your version may have more or fewer) and what each adds to the payload.
  3. From DAGClient.java, draw the inheritance/factory diagram for how a DAGClient instance is actually constructed (look at TezClient.submitDAG to see which subclass is returned in YARN vs local mode).
  4. Force the RPC backend to fail and confirm whether Tez falls back to the ATS backend automatically. Cite the line that performs the fallback, if there is one.
  5. Explain why DAGStatus is a snapshot rather than an observable.