TezClient

TezClient is the client-side API: the class your driver code instantiates to start an AM, submit DAGs, and (optionally) keep the AM alive across DAGs. This chapter walks through bring-up, the session vs non-session distinction, local resource staging, RPC submission, and ATS hookup.

After this chapter you should be able to point at every line of code that runs between TezClient.create(...) and the moment a DAG appears inside the AM, ready to be started.


Files to open

tez-api/src/main/java/org/apache/tez/client/
  TezClient.java
  TezClientUtils.java
  TezSessionImpl.java
  FrameworkClient.java
  TezYarnClient.java            (YARN-backed FrameworkClient)
  LocalClient.java              (in-process FrameworkClient for local mode; depending on the release it may live under tez-dag, in the same package)

Plus the client-side DAGClient interface and the client-to-AM RPC protocol definition:

tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
tez-api/src/main/proto/DAGClientAMProtocol.proto

Two modes: session and non-session

The mode is chosen at TezClient.create(...):

TezClient client = TezClient.create(
    "MyApp",
    tezConf,
    isSession  /* true = session mode */);
| Property | Non-session | Session |
|---|---|---|
| AM lifetime | Per DAG | Across many DAGs |
| start() semantics | No-op (AM launched at submitDAG) | Submits the AM application to YARN and returns; waitTillReady() (or the first submitDAG) blocks until the AM can accept DAGs |
| Allowed DAGs in flight | 1 | 1 (sequential within a session by default) |
| Keep-alive | n/a | tez.session.am.dag.submit.timeout.secs |
| Use case | One-shot jobs (CLI tools, scheduled batch) | Latency-sensitive (Hive, Pig, interactive) |

The AM keep-alive timer matters because an idle session AM still holds a YARN container. In session mode, after a DAG completes the AM waits up to tez.session.am.dag.submit.timeout.secs for a new DAG; if none arrives, it shuts down to free YARN resources. Find the timer:

grep -n "AMSessionDAGSubmitTimeout\|TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS\|dag.submit.timeout" \
  tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
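The keep-alive rule fits in a few lines. What follows is a hypothetical, stdlib-only sketch, not DAGAppMaster's code. SessionIdleTimer and shouldShutdown are invented names. The sketch assumes three things: the idle clock starts at AM launch, it restarts when a DAG finishes, and it is suspended while a DAG is running.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical model of the session-mode keep-alive rule: an idle AM shuts
 * down once no DAG has arrived within the configured timeout
 * (tez.session.am.dag.submit.timeout.secs). Not Tez's implementation.
 */
public class SessionIdleTimer {
    private final Duration timeout;
    private Instant lastActivity;   // AM start, or completion of the last DAG
    private boolean dagRunning;

    public SessionIdleTimer(Duration timeout, Instant amStart) {
        this.timeout = timeout;
        this.lastActivity = amStart;
    }

    public void dagStarted() {
        dagRunning = true;
    }

    public void dagFinished(Instant now) {
        dagRunning = false;
        lastActivity = now;         // the idle clock restarts when a DAG completes
    }

    /** True if the AM has been idle for longer than the timeout. */
    public boolean shouldShutdown(Instant now) {
        if (dagRunning) {
            return false;           // never time out while a DAG is in flight
        }
        return Duration.between(lastActivity, now).compareTo(timeout) > 0;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        SessionIdleTimer timer = new SessionIdleTimer(Duration.ofSeconds(300), t0);
        timer.dagStarted();
        System.out.println(timer.shouldShutdown(t0.plusSeconds(1000))); // false: DAG running
        timer.dagFinished(t0.plusSeconds(1000));
        System.out.println(timer.shouldShutdown(t0.plusSeconds(1200))); // false: idle 200 s
        System.out.println(timer.shouldShutdown(t0.plusSeconds(1301))); // true: idle 301 s
    }
}
```

Validation exercise 5 asks you to observe the same transition on MiniTezCluster. Use that run to check whether the real AM matches these assumptions.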

Bring-up control flow

sequenceDiagram
    participant U as User code
    participant TC as TezClient
    participant TCU as TezClientUtils
    participant YC as TezYarnClient
    participant RM as YARN RM
    participant AM as DAGAppMaster

    U->>TC: TezClient.create(name, conf, isSession)
    U->>TC: addAppMasterLocalFiles(map)
    U->>TC: start()
    TC->>TCU: createApplicationSubmissionContext(...)
    TCU->>TCU: stage local resources to HDFS
    TCU->>TCU: build classpath & env
    TC->>YC: submitApplication(appSubmissionContext)
    YC->>RM: submitApplication
    RM-->>YC: appId
    Note over RM,AM: RM launches AM container
    AM->>AM: serviceInit, serviceStart
    AM->>RM: registerApplicationMaster
    TC->>AM: poll AM status until it can accept DAGs
    U->>TC: submitDAG(dag)
    TC->>AM: DAGClientAMProtocol.submitDAG(rpcCall)
    AM-->>TC: dagId
    TC-->>U: DAGClient

Where each call lives:

  • TezClient.start() → TezClientUtils.createFinalConfProtoForApp() → TezClientUtils.createApplicationSubmissionContext() → frameworkClient.submitApplication(...).
  • TezClient.submitDAG(dag) → getSessionAMProxy() → dagAMProtocol.submitDAG(submitRequest) (the protobuf RPC proxy to the session AM).

grep -n "submitApplication\|submitDAG\|dagAMProtocol" \
  tez-api/src/main/java/org/apache/tez/client/TezClient.java
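After start() in session mode, the client still has to wait until the AM can accept a DAG. TezClient.waitTillReady() exposes that wait, and Reading exercise 3 asks which key bounds it. The loop below is a hypothetical poll-with-deadline model of that wait, not the Tez implementation. ReadinessPoller, waitUntilReady, and the AmStatus values are assumptions loosely patterned on TezAppMasterStatus. The clock is injected so the timeout path can be exercised without real sleeping.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical model of the client-side "wait for the AM" loop: poll a status
 * source until the AM can accept a DAG, fail fast if it shut down, and give
 * up at a deadline. Names and states are illustrative, not Tez's API.
 */
public class ReadinessPoller {
    public enum AmStatus { INITIALIZING, READY, RUNNING, SHUTDOWN }

    public static AmStatus waitUntilReady(Supplier<AmStatus> status, long timeoutMillis,
                                          long pollMillis, LongSupplier clock)
            throws TimeoutException, InterruptedException {
        long deadline = clock.getAsLong() + timeoutMillis;
        while (true) {
            AmStatus s = status.get();
            if (s == AmStatus.READY || s == AmStatus.RUNNING) {
                return s;                        // AM can accept a DAG
            }
            if (s == AmStatus.SHUTDOWN) {
                throw new IllegalStateException("AM shut down before becoming ready");
            }
            if (clock.getAsLong() >= deadline) {
                throw new TimeoutException("AM not ready after " + timeoutMillis + " ms");
            }
            Thread.sleep(pollMillis);            // back off before polling again
        }
    }

    public static void main(String[] args) throws Exception {
        // Fake AM that reports INITIALIZING twice, then READY.
        AmStatus[] script = {AmStatus.INITIALIZING, AmStatus.INITIALIZING, AmStatus.READY};
        int[] calls = {0};
        Supplier<AmStatus> fake = () -> script[Math.min(calls[0]++, script.length - 1)];
        AmStatus result = waitUntilReady(fake, 5_000, 10, System::currentTimeMillis);
        System.out.println(result + " after " + calls[0] + " polls"); // prints "READY after 3 polls"
    }
}
```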

Local resources that TezClientUtils uploads

A YARN container starts with a clean working directory plus whatever local resources the AM submission context declares. For Tez, that includes:

  1. Tez framework tarball — pointed to by tez.lib.uris (or a local jar list). Contains tez-api.jar, tez-dag.jar, tez-runtime-*.jar, etc.
  2. User application jars: anything you added via TezClient.addAppMasterLocalFiles(Map<String, LocalResource>), plus task-side resources attached with DAG.addTaskLocalFiles or Vertex.addTaskLocalFiles.
  3. The DAGPlan is not a local resource in session mode: it is sent in the submitDAG RPC payload.
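These resources end up in a YARN container launch context. Its local-resource map is keyed by the name under which each resource is linked into the container's working directory, so two resources cannot share a key without one silently replacing the other. The stdlib-only sketch below merges framework and user resources with an explicit collision check. ResourceSpec and merge are hypothetical stand-ins for YARN's LocalResource handling, not Tez code. The link names and HDFS paths in main are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch: AM resources form a Map keyed by link name in the
 * container's working directory. ResourceSpec stands in for YARN's
 * LocalResource; it is not a real Tez or YARN type.
 */
public class AmResourceMerge {
    public static final class ResourceSpec {
        final String url;      // where the bytes live, e.g. an HDFS URI
        final boolean archive; // archives are unpacked; files are linked as-is

        public ResourceSpec(String url, boolean archive) {
            this.url = url;
            this.archive = archive;
        }

        @Override
        public String toString() {
            return (archive ? "ARCHIVE " : "FILE ") + url;
        }
    }

    /** Merges user resources into the framework set, rejecting link-name clashes. */
    public static Map<String, ResourceSpec> merge(Map<String, ResourceSpec> framework,
                                                  Map<String, ResourceSpec> user) {
        Map<String, ResourceSpec> merged = new LinkedHashMap<>(framework);
        for (Map.Entry<String, ResourceSpec> e : user.entrySet()) {
            ResourceSpec prev = merged.putIfAbsent(e.getKey(), e.getValue());
            if (prev != null) {
                throw new IllegalArgumentException(
                    "Link name '" + e.getKey() + "' already maps to " + prev);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, ResourceSpec> framework = new LinkedHashMap<>();
        framework.put("tezlib", new ResourceSpec("hdfs:///apps/tez/tez.tar.gz", true));
        Map<String, ResourceSpec> user = new LinkedHashMap<>();
        user.put("myudfs.jar", new ResourceSpec("hdfs:///user/me/myudfs.jar", false));
        merge(framework, user).forEach((name, r) -> System.out.println(name + " -> " + r));
    }
}
```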

Inspect:

grep -n "tez.lib.uris\|TezConfiguration.TEZ_LIB_URIS\|addAppMasterLocalFiles" \
  tez-api/src/main/java/org/apache/tez/client/TezClient.java \
  tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java

The AMRM token is delivered by YARN when the container starts; Tez does not manage it directly.
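tez.lib.uris is a comma-separated list, and Reading exercise 6 asks how TezClientUtils.setupTezJarsLocalResources treats a directory versus a tarball. As a warm-up before reading that method, the sketch below classifies each entry under one simplifying assumption. An entry ending in .tar.gz, .tgz, or .zip is treated as an archive to unpack; anything else is treated as a directory or jar to link file by file. The extension rule and the LibUrisClassifier/classify names are hypothetical, and the real method may apply different rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/**
 * Hypothetical classifier for a tez.lib.uris value. The extension-based rule
 * is an assumption for illustration, not TezClientUtils' actual logic.
 */
public class LibUrisClassifier {
    public enum Kind { ARCHIVE, FILE_OR_DIRECTORY }

    public static final class LibEntry {
        public final String uri;
        public final Kind kind;

        LibEntry(String uri, Kind kind) {
            this.uri = uri;
            this.kind = kind;
        }

        @Override
        public String toString() {
            return kind + " " + uri;
        }
    }

    /** Splits a comma-separated value and classifies each non-empty entry. */
    public static List<LibEntry> classify(String libUris) {
        List<LibEntry> out = new ArrayList<>();
        for (String raw : libUris.split(",")) {
            String uri = raw.trim();
            if (uri.isEmpty()) {
                continue;                  // tolerate "a,,b" and trailing commas
            }
            String lower = uri.toLowerCase(Locale.ROOT);
            boolean archive = lower.endsWith(".tar.gz") || lower.endsWith(".tgz")
                || lower.endsWith(".zip");
            out.add(new LibEntry(uri, archive ? Kind.ARCHIVE : Kind.FILE_OR_DIRECTORY));
        }
        return out;
    }

    public static void main(String[] args) {
        classify("hdfs:///apps/tez/tez.tar.gz, hdfs:///apps/extra-jars/")
            .forEach(System.out::println);
    }
}
```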


The submission RPC

The protocol is defined in:

tez-api/src/main/proto/DAGClientAMProtocol.proto
grep -n "rpc " tez-api/src/main/proto/DAGClientAMProtocol.proto

Key RPCs:

| RPC | What it does |
|---|---|
| submitDAG | Submit a new DAG to a running AM |
| getDAGStatus | Poll status (also used by DAGClient) |
| getVertexStatus | Poll a specific vertex |
| tryKillDAG | Initiate kill |
| shutdownSession | Stop the AM in session mode |
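The RPC table reads more concretely as an interface. What follows is a hypothetical in-memory stand-in for the client-to-AM surface. The method names mirror the RPCs above, but the Java signatures, string DAG ids, and exception types are invented for illustration and are not the generated protobuf API. The model encodes two behaviors from this chapter: a session runs one DAG at a time, and calls after shutdownSession fail in the spirit of SessionNotRunning. When a second DAG arrives while one is running, the model rejects it. Reading exercise 2 asks what real Tez does in that case.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the client-to-AM RPC surface; not Tez code. */
public class InMemoryAmProtocol {
    public enum DagState { RUNNING, SUCCEEDED, KILLED }

    /** Method names mirror DAGClientAMProtocol's RPCs; signatures are invented. */
    public interface AmProtocol {
        String submitDAG(String dagName);
        DagState getDAGStatus(String dagId);
        void tryKillDAG(String dagId);
        void shutdownSession();
    }

    /** Stand-in for SessionNotRunning: the session AM is gone. */
    public static final class SessionGoneException extends RuntimeException {
        SessionGoneException(String msg) { super(msg); }
    }

    public static final class FakeSessionAm implements AmProtocol {
        private final Map<String, DagState> dags = new HashMap<>();
        private String current;        // id of the most recently submitted DAG
        private boolean shutdown;
        private int nextId = 1;

        private void checkAlive() {
            if (shutdown) throw new SessionGoneException("session AM has shut down");
        }

        @Override public String submitDAG(String dagName) {
            checkAlive();
            if (current != null && dags.get(current) == DagState.RUNNING) {
                // One DAG at a time: this model rejects rather than queues.
                throw new IllegalStateException("a DAG is already running: " + current);
            }
            current = "dag_" + (nextId++) + "_" + dagName;
            dags.put(current, DagState.RUNNING);
            return current;
        }

        @Override public DagState getDAGStatus(String dagId) {
            checkAlive();
            DagState s = dags.get(dagId);
            if (s == null) throw new IllegalArgumentException("unknown DAG " + dagId);
            return s;
        }

        @Override public void tryKillDAG(String dagId) {
            checkAlive();
            dags.computeIfPresent(dagId, (id, s) -> s == DagState.RUNNING ? DagState.KILLED : s);
        }

        @Override public void shutdownSession() { shutdown = true; }

        /** Test hook: pretend the running DAG finished successfully. */
        void completeCurrent() {
            if (current != null && dags.get(current) == DagState.RUNNING) {
                dags.put(current, DagState.SUCCEEDED);
            }
        }
    }

    public static void main(String[] args) {
        FakeSessionAm am = new FakeSessionAm();
        String first = am.submitDAG("wordcount");
        am.completeCurrent();
        String second = am.submitDAG("join");        // allowed: the first DAG is done
        System.out.println(first + " " + am.getDAGStatus(first));
        System.out.println(second + " " + am.getDAGStatus(second));
        am.shutdownSession();
    }
}
```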

The RPC server lives in the AM (DAGClientHandler and its Protobuf implementation):

grep -rn "DAGClientAMProtocol\|submitDAG" \
  tez-dag/src/main/java/org/apache/tez/dag/api/client/ 2>/dev/null | head

ATS / Timeline Service integration

When tez.history.logging.service.class is set to ATSHistoryLoggingService (the default in many distributions), TezClient does not publish history events itself; the AM does, via the HistoryEventHandler. TezClient does, however:

  • Pass tez.history.logging.service.class to the AM as part of the AM's configuration.
  • Provide ATS credentials in the application submission context.
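On the client side, the ATS question reduces to a configuration check: does this submission need timeline credentials at all? Below is a hypothetical, stdlib-only version of that check over a plain Map. The rule is an assumption to verify against TezClient, not a transcription of it: credentials are needed only when the history service is ATSHistoryLoggingService and yarn.timeline-service.enabled is true. needsTimelineToken is an invented name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical client-side check for whether ATS credentials are needed. */
public class AtsCredentialCheck {
    static final String HISTORY_CLASS_KEY = "tez.history.logging.service.class";
    static final String TIMELINE_ENABLED_KEY = "yarn.timeline-service.enabled";
    static final String ATS_SERVICE =
        "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";

    /** Assumed rule: timeline enabled AND the ATS history logger selected. */
    public static boolean needsTimelineToken(Map<String, String> conf) {
        String historyClass = conf.getOrDefault(HISTORY_CLASS_KEY, "").trim();
        boolean timelineEnabled =
            Boolean.parseBoolean(conf.getOrDefault(TIMELINE_ENABLED_KEY, "false"));
        return timelineEnabled && historyClass.equals(ATS_SERVICE);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(HISTORY_CLASS_KEY, ATS_SERVICE);
        conf.put(TIMELINE_ENABLED_KEY, "true");
        System.out.println(needsTimelineToken(conf));
    }
}
```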

Read:

grep -rn "ATSHistoryLoggingService\|YARN_TIMELINE_SERVICE" \
  tez-api/src/main/java/org/apache/tez/client/

For the AM-side, see counters-diagnostics.md.


TezSessionImpl vs TezClient

TezSessionImpl is the older, session-specific name; modern Tez uses TezClient with isSession=true, but TezSessionImpl still appears in some codepaths, and the two are largely interchangeable. Check whether one subclasses the other in your checkout:

grep -n "class TezClient\|class TezSessionImpl" \
  tez-api/src/main/java/org/apache/tez/client/*.java

Reading exercise

sed -n '1,120p' tez-api/src/main/java/org/apache/tez/client/TezClient.java
grep -n "submitDAG\b" tez-api/src/main/java/org/apache/tez/client/TezClient.java
grep -n "stopSession\|stop\|close" \
  tez-api/src/main/java/org/apache/tez/client/TezClient.java
grep -rn "submitApplication" tez-api/src/main/java/org/apache/tez/client/

Answer:

  1. What is the difference between TezClient.stop() in session vs non-session mode?
  2. When TezClient.submitDAG() is called for a DAG that conflicts with one currently running in the session, what happens?
  3. Find the timeout used while waiting for the AM to reach RUNNING after start(). Which config key controls it?
  4. What pre-condition does submitDAG enforce on the DAG's vertex names with respect to previously-submitted DAGs in the same session?
  5. Trace addAppMasterLocalFiles(...) end-to-end. Where do those files end up on HDFS?
  6. Why is tez.lib.uris sometimes a directory and sometimes a tarball? What does TezClientUtils.setupTezJarsLocalResources do for each case?

Common bugs and symptoms

| Symptom | Root cause | Fix |
|---|---|---|
| AM never reaches RUNNING; client hangs in start() | tez.lib.uris points to a path the NodeManager can't read | Verify HDFS permissions; check NM logs |
| submitDAG throws SessionNotRunning | AM died (idle timeout, crash) | Catch, recreate TezClient, resubmit |
| submitDAG blocks forever | Previous DAG still in flight in the session | Don't reuse a session for parallel DAGs; or wait |
| IOException: Failed to submit application | RM rejected the application (queue full, ACL) | Inspect RM logs; verify queue config |
| AM starts but cannot talk back to client | Client behind NAT; AM cannot reach the client's RPC server | Use a polling-only DAGClient; avoid callbacks |
| Tasks fail with ClassNotFoundException for user code | addTaskLocalFiles not called for that jar | Add jars via both addAppMasterLocalFiles and addTaskLocalFiles if they are used in tasks |
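The SessionNotRunning fix (catch, recreate the client, resubmit) is a bounded-retry pattern. Here is a stdlib-only sketch using hypothetical stand-in types:

- Session stands in for a session-mode TezClient.
- SessionDeadException stands in for org.apache.tez.dag.api.SessionNotRunning.
- A Supplier builds a fresh session.

Retrying is only safe if resubmitting the DAG is acceptable for your job, for example when its outputs are overwritten rather than appended.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch of "catch SessionNotRunning, recreate the client,
 * resubmit". Session and SessionDeadException are stand-ins, not Tez types.
 */
public class ResubmitOnDeadSession {
    /** Stand-in for org.apache.tez.dag.api.SessionNotRunning. */
    public static final class SessionDeadException extends RuntimeException {
        public SessionDeadException(String msg) { super(msg); }
    }

    /** Stand-in for a session-mode client that can submit a DAG and be stopped. */
    public interface Session extends AutoCloseable {
        String submit(String dagName);
        @Override void close();
    }

    /** The session that accepted the DAG, plus the id it returned. */
    public static final class Result<S> {
        public final S session;
        public final String dagId;
        Result(S session, String dagId) { this.session = session; this.dagId = dagId; }
    }

    /** Tries up to maxAttempts sessions, replacing each one that turns out dead. */
    public static <S extends Session> Result<S> submitWithRecreate(
            S session, Supplier<S> factory, String dagName, int maxAttempts) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        S current = session;
        SessionDeadException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return new Result<>(current, current.submit(dagName));
            } catch (SessionDeadException e) {
                last = e;
                current.close();                 // release what is left of the dead client
                if (attempt < maxAttempts) {
                    current = factory.get();     // fresh session for the next attempt
                }
            }
        }
        throw last;                              // every attempt hit a dead session
    }

    /** Fake session: either alive (accepts DAGs) or dead (always throws). */
    public static final class FakeSession implements Session {
        private final int id;
        private final boolean alive;
        public FakeSession(int id, boolean alive) { this.id = id; this.alive = alive; }
        @Override public String submit(String dagName) {
            if (!alive) throw new SessionDeadException("session " + id + " is not running");
            return "session" + id + "/" + dagName;
        }
        @Override public void close() { }
    }

    public static void main(String[] args) {
        int[] created = {0};
        Supplier<FakeSession> factory = () -> new FakeSession(++created[0], true);
        Result<FakeSession> r = submitWithRecreate(new FakeSession(0, false), factory, "dag2", 3);
        System.out.println(r.dagId); // prints "session1/dag2"
    }
}
```

The method returns the session that accepted the DAG. That lets the caller keep using the replacement for later submissions instead of the dead original.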

Validation: prove you understand this

  1. Write a 30-line Java driver that creates a TezClient in session mode, submits two DAGs back-to-back, prints both DAGClient.getDAGStatus() results, and shuts down cleanly.
  2. From TezClient.java, list every method that ultimately reaches dagAMProtocol.
  3. Explain why addAppMasterLocalFiles is a Map<String, LocalResource> and not a List<Path>.
  4. From the proto file DAGClientAMProtocol.proto, write the exact request message used by submitDAG.
  5. Reproduce the "AM idle timeout" path on MiniTezCluster: submit one DAG, wait past the configured timeout, attempt a second submit, observe the exception class and message.