Hive on Tez

Hive is the largest single consumer of Tez. Roughly 70% of bug reports filed against Tez originate in a Hive query; many "Tez bugs" turn out to be Hive bugs, and vice versa. This chapter walks the compile boundary, explains how Hive operators map to Tez Inputs, Processors, and Outputs (IPO), and gives a triage tree for attributing a failure to the right project.


The compile boundary

Hive's query compiler produces a TezWork, a graph of BaseWork nodes (MapWork, ReduceWork, MergeJoinWork, etc). TezTask.execute walks TezWork and constructs a Tez DAG.

ls ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/

Key files:

| File | Role |
|---|---|
| TezTask.java | Hive's Task impl; builds the DAG and submits via TezSessionState. |
| DagUtils.java | DAG construction helpers (createVertex, createEdge, etc.). |
| TezSessionPoolManager.java | Warm session pool; keeps AMs alive between queries. |
| TezSessionState.java | One Hive session ↔ one Tez AM. |
| TezProcessor.java | The LogicalIOProcessor that runs Hive operator pipelines inside a Tez task. |

wc -l ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/{TezTask,DagUtils,TezSessionPoolManager,TezProcessor}.java

TezTask.execute — high-level flow

grep -n "execute\|build\|submitDAG" \
  ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java | head -30

Steps:

  1. Acquire a TezSessionState from TezSessionPoolManager (or open a new one).
  2. build(jobConf, work, scratchDir, ...): calls DagUtils to turn each BaseWork into a Tez Vertex and each TezEdgeProperty into a Tez Edge.
  3. submit(dag, sessionState) → tezClient.submitDAG(dag).
  4. Poll dagClient.getDAGStatus(...) until terminal.
  5. Surface counters + diagnostics back to Hive.

DagUtils.createVertex

grep -n "createVertex\|createEdge\|createEdgeProperty\|setVertexManagerPlugin" \
  ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java | head -30

For a MapWork:

| Hive concept | Tez vertex configuration |
|---|---|
| Operator tree starting with TableScanOperator | processor = MapTezProcessor (subclass of TezProcessor) |
| Number of input splits | parallelism = splits.length (often overridden by grouping) |
| Per-split input | DataSourceDescriptor with MRInputLegacy and the InputFormat |
| Combiner | Edge-level (downstream ReduceWork configures it as a combiner.class) |

For a ReduceWork:

| Hive concept | Tez vertex configuration |
|---|---|
| Operator tree starting at ReduceSinkOperator's consumer | processor = ReduceTezProcessor |
| Target parallelism | numReducers (from Hive's Operator tree, optionally auto-parallelized via ShuffleVertexManager) |
| Sort key codec | OrderedGroupedKVInput.KEY_CLASS, KEY_COMPARATOR_CLASS |
| setVertexManagerPlugin | ShuffleVertexManager with auto-parallelism if hive.tez.auto.reducer.parallelism=true |

For a MergeJoinWork:

  • processor = MergeJoinProcessor
  • Multiple sorted inputs (one per join side) using OrderedGroupedKVInput
  • A custom or built-in vertex manager that coordinates inputs

Operator → IPO mapping

Hive operators run inside a Tez task — they are not Tez constructs. The mapping happens at the input/output boundary of the vertex.

| Position | Hive operator | Tez wiring |
|---|---|---|
| Vertex entry (map side) | TableScanOperator | MRInputLegacy (tez-mapreduce) emits (key, value) from InputFormat |
| Vertex middle | Filter / Select / GroupBy partial / etc. | Pure in-memory operator chain inside TezProcessor.run |
| Vertex exit (shuffle producer) | ReduceSinkOperator | OrderedPartitionedKVOutput with Hive's HiveKey serializer and partitioner |
| Vertex entry (reduce side) | First operator after the boundary | OrderedGroupedKVInput provides a KeyValuesReader; ReduceRecordProcessor adapts it into Hive's tuple-at-a-time interface |
| Vertex middle (reduce) | GroupBy aggregation, Join, etc. | Operator chain |
| Vertex exit (final) | FileSinkOperator | MROutputLegacy writes to HDFS |
| Broadcast join build | HashTableSinkOperator | UnorderedKVOutput (or in newer Hive a BROADCAST-typed edge) feeding the probe vertex |
| Broadcast join probe | MapJoinOperator | UnorderedKVInput on a BROADCAST edge |

grep -rn "OrderedPartitionedKVOutput\|OrderedGroupedKVInput\|UnorderedKVOutput\|UnorderedKVInput" \
  ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez | head -20

TezProcessor adapter

grep -n "class TezProcessor\|class MapTezProcessor\|class ReduceTezProcessor\|run(\|process(" \
  ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java

TezProcessor.run(inputs, outputs):

  1. Pull the singular input (MRInputLegacy or first OrderedGroupedKVInput).
  2. Construct a RecordSource that adapts the Tez reader into Hive's Operator.process(Object row, int tag) calling convention.
  3. Run the operator tree until the input is drained.
  4. Call forward(EOF) to drain operator buffers.
  5. Close outputs in reverse order.

The processor is intentionally thin — all the interesting logic is in the Hive operator chain.


TezSessionPoolManager

find ~/hive-src/ql/src/java -name "TezSessionPoolManager.java"
wc -l ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java

A Tez session is a long-lived Tez AM that holds zero or more idle containers, ready to accept the next DAG.

| Config | Default | Effect |
|---|---|---|
| hive.server2.tez.default.queues | default | Pre-warm sessions per YARN queue. |
| hive.server2.tez.sessions.per.default.queue | 1 | Number of pre-warm sessions per queue. |
| hive.server2.tez.initialize.default.sessions | false | Start them at HS2 boot. |
| hive.tez.exec.print.summary | false | Surface Tez counters in query output. |
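
The three pool settings in the table above are ordinary Hive configuration keys, so one way to experiment with a warm pool is to pass them to HiveServer2 as --hiveconf overrides. The following is a minimal sketch, not a recommended deployment method: hs2_warm_pool_flags is a hypothetical helper name, and the queue names and session count are illustrative assumptions.

```shell
# hs2_warm_pool_flags is a hypothetical helper, not part of Hive.
# It prints --hiveconf overrides for the three session-pool keys.
#   $1 = comma-separated list of YARN queues
#   $2 = number of pre-warmed sessions per queue
hs2_warm_pool_flags() {
  printf '%s ' "--hiveconf hive.server2.tez.default.queues=$1"
  printf '%s ' "--hiveconf hive.server2.tez.sessions.per.default.queue=$2"
  printf '%s\n' "--hiveconf hive.server2.tez.initialize.default.sessions=true"
}

# Print the flags for two (illustrative) queues with two warm sessions each.
hs2_warm_pool_flags etl,adhoc 2
```

The printed flags can be appended to an HS2 start command such as hive --service hiveserver2; long-running clusters would more likely set the same keys in hive-site.xml.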

Pool flow:

  1. HS2 starts. If initialize.default.sessions=true, launches N AMs per queue.
  2. Query comes in. HS2 calls TezSessionPoolManager.getSession(queue) — gets an idle session or opens a new one.
  3. Session executes the DAG; AM holds containers across DAGs (see container-reuse.md).
  4. On session return, AM remains idle awaiting next DAG.
  5. On idle timeout (hive.server2.session.check.interval), the pool may close sessions.
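
The pool flow above can be modelled in a few lines. This is a toy sketch of steps 1, 2, and 4 only, assuming a single queue and no idle timeout; the function and variable names (prewarm, get_session, return_session, IDLE, OPENED) are hypothetical and do not correspond to methods in TezSessionPoolManager.

```shell
# Toy model of a warm session pool (illustrative only; not Hive's code).
IDLE=""      # ids of idle sessions, space separated
OPENED=0     # how many AMs have been launched so far
SESSION=""   # set by get_session

# Launch one new AM and record its id in SESSION.
open_session() { OPENED=$((OPENED + 1)); SESSION="am-$OPENED"; }

# Step 1: at boot, launch $1 sessions and park them as idle.
prewarm() {
  n=0
  while [ "$n" -lt "$1" ]; do
    open_session
    IDLE="$IDLE $SESSION"
    n=$((n + 1))
  done
}

# Step 2: hand out an idle session if one exists, otherwise open a new one.
get_session() {
  set -- $IDLE
  if [ "$#" -gt 0 ]; then
    SESSION="$1"; shift; IDLE="$*"
  else
    open_session
  fi
}

# Step 4: a returned session becomes idle again.
return_session() { IDLE="$IDLE $1"; }

prewarm 1
get_session; echo "first query  -> $SESSION"   # reuses the pre-warmed AM
get_session; echo "second query -> $SESSION"   # pool is empty, so a new AM is opened
```

The second call is the expensive path: with nothing pre-warmed, every first query pays for an AM launch.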

LLAP

LLAP (Live Long And Process) is a different execution model that replaces the per-query AM with a long-lived per-node daemon. The Tez AM still coordinates, but instead of asking YARN for containers it asks LLAP daemons for "fragments".

find ~/hive-src/llap-* -maxdepth 2 -type d 2>/dev/null | head

Key differences (do not extrapolate Tez-on-YARN debugging to LLAP):

  • Containers are replaced by LlapTaskExecutorService worker slots.
  • The shuffle path uses a Netty-based fetcher (LlapShuffleHandler).
  • The Tez scheduler plugin is LlapTaskSchedulerService (in hive-llap-server).
  • Container reuse is not relevant — LLAP slots are always "hot".

This chapter does not cover LLAP further; treat it as a separate world.


Bug attribution: where does it really live?

Triage tree. Symptom: query fails or returns wrong result.

flowchart TD
  S[Failure observed] --> Q1{Failure message mentions Hive operator?}
  Q1 -- yes --> H1[Hive bug: open against HIVE]
  Q1 -- no --> Q2{Failure in TezChild / IFile / Fetcher?}
  Q2 -- yes --> T1[Tez bug: open against TEZ]
  Q2 -- no --> Q3{Failure in container launch / RM allocation?}
  Q3 -- yes --> Y1[YARN bug: open against YARN]
  Q3 -- no --> Q4{Wrong result rather than a crash?}
  Q4 -- yes --> Q5{Reproduces without Hive on the same DAG, TestOrderedWordCount-style?}
  Q5 -- no --> H1
  Q5 -- yes --> T1
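
The same tree can be written as a small function, which makes the order of the questions explicit. This is a sketch only: triage_owner is a hypothetical helper, and its arguments are the yes/no answers to Q1 through Q5 in order. The flowchart defines no outcome for Q4 = no, so the sketch reports that case as UNRESOLVED rather than guessing.

```shell
# triage_owner is a hypothetical helper that encodes the flowchart.
# Arguments: yes/no answers to Q1..Q5, in that order.
triage_owner() {
  if [ "$1" = yes ]; then echo HIVE; return; fi   # Q1: Hive operator in the message
  if [ "$2" = yes ]; then echo TEZ; return; fi    # Q2: TezChild / IFile / Fetcher
  if [ "$3" = yes ]; then echo YARN; return; fi   # Q3: container launch / RM allocation
  if [ "$4" = yes ]; then                         # Q4: wrong result, not a crash
    # Q5: Tez only if the failure reproduces on the same DAG without Hive.
    if [ "$5" = yes ]; then echo TEZ; else echo HIVE; fi
    return
  fi
  # The flowchart has no branch for Q4 = no.
  echo UNRESOLVED
}

# A wrong-result bug that does not reproduce without Hive.
triage_owner no no no yes no
```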

Practical heuristics:

| Stack trace contains | Probably |
|---|---|
| org.apache.hadoop.hive.ql.exec.Operator | Hive |
| org.apache.tez.runtime.library | Tez |
| org.apache.tez.dag.app.rm | Tez (scheduling) |
| org.apache.hadoop.yarn | YARN |
| ShuffleHandler | YARN-side (mapreduce auxservice) |
| LlapDaemon | LLAP (Hive) |
| MapJoinOperator + OOM | Hive (join planning), even though the OOM happens in a Tez container |

Wrong-result bugs almost always live in Hive (operator semantics). Attribute one to Tez only if you can reproduce it without Hive, running the same DAG shape on synthetic data in the style of TestOrderedWordCount.
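
The stack-trace heuristics table can be applied mechanically as a first pass over a saved trace. The sketch below is a rough filter under stated assumptions, not a definitive classifier: classify_trace is a hypothetical helper, the order of the checks (most specific first) is an assumption, and "OOM" is matched as the string OutOfMemoryError.

```shell
# classify_trace is a hypothetical helper: it reads a stack trace on stdin
# and prints the probable owner according to the heuristics table.
# The priority order of the patterns is an assumption of this sketch.
classify_trace() {
  trace="$(cat)"
  case "$trace" in
    *MapJoinOperator*OutOfMemoryError*|*OutOfMemoryError*MapJoinOperator*)
      echo "Hive (join planning)" ;;
    *LlapDaemon*)                              echo "LLAP (Hive)" ;;
    *ShuffleHandler*)                          echo "YARN-side (mapreduce auxservice)" ;;
    *org.apache.hadoop.hive.ql.exec.Operator*) echo "Hive" ;;
    *org.apache.tez.dag.app.rm*)               echo "Tez (scheduling)" ;;
    *org.apache.tez.runtime.library*)          echo "Tez" ;;
    *org.apache.hadoop.yarn*)                  echo "YARN" ;;
    *)                                         echo "unknown" ;;
  esac
}

# Synthetic two-frame trace: an OOM under MapJoinOperator inside a Tez container.
printf '%s\n' \
  'java.lang.OutOfMemoryError: Java heap space' \
  '  at org.apache.hadoop.hive.ql.exec.MapJoinOperator.process(MapJoinOperator.java)' \
  '  at org.apache.tez.runtime.task.TezChild.main(TezChild.java)' | classify_trace
```

Checking the MapJoinOperator case first matters because such a trace also contains Tez frames; a plain first-match on package names would blame Tez.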


Reading exercise

  1. Read the top of TezTask.execute: head -200 ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
  2. List all edge property factories Hive uses: grep -n "createEdgeProperty\|EdgeProperty\.create" ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
  3. Find when Hive sets each vertex manager: grep -rn "ShuffleVertexManager\|RootInputVertexManager" ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez
  4. Confirm the processor is under 1000 lines (it is an adapter, not the brain): find ~/hive-src/ql/src/java -name "TezProcessor.java" -exec wc -l {} \;
  5. Find where HS2 acquires sessions: grep -rn "TezSessionPoolManager.getSession" ~/hive-src/service/src
  6. See how a session wraps a TezClient: head -100 ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java

Common bugs and symptoms

| Symptom | Likely owner |
|---|---|
| MetaException mid-query | Hive (HMS client) |
| Container OOM during reduce join | Hive operator (map-join build size); Tez cannot size around an oversized hash table |
| Wrong row counts after a query rewrite | Hive optimizer or MapJoinOperator semantics |
| Fetcher: ConnectException to nm:13562 | YARN (aux-service mis-config) |
| AM dies with org.apache.tez.dag.app.DAGAppMaster: Vertex failed, and the diagnostic mentions only TezProcessor, no Hive operator class | Tez bug; build a reproducer DAG without Hive |
| Slow first query after HS2 restart | No warm sessions; enable hive.server2.tez.initialize.default.sessions |
| Stale ACL after GRANT reissue | Hive (HMS); Tez containers cache delegation tokens, see container-reuse.md |

Validation: prove you understand this

  1. List the Hive operators on the source and destination sides of a SCATTER_GATHER shuffle edge and map each side to the Tez Input or Output class.
  2. Identify the Hive method that finally calls tezClient.submitDAG. Cite path + grep command.
  3. Given a query that succeeds in standalone HS2 but fails in HS2 with session pooling on, name two likely failure modes and where to look.
  4. Explain why a MapJoinOperator OOM is a Hive bug even though the OOM stack trace is rooted in TezChild.
  5. Show, in three lines, the conditional inside DagUtils that decides whether to install ShuffleVertexManager on a reduce vertex. (Find via grep; quote the file:line.)