Hive on Tez
Hive is the largest single consumer of Tez. Roughly 70% of bug reports filed against Tez originate in a Hive query; many "Tez bugs" turn out to be Hive bugs, and vice versa. This chapter walks the compile boundary, explains how Hive operators map to Tez Inputs, Processors, and Outputs (IPO), and gives a triage tree for attribution.
The compile boundary
Hive's query compiler produces a TezWork, a graph of BaseWork nodes
(MapWork, ReduceWork, MergeJoinWork, etc). TezTask.execute walks
TezWork and constructs a Tez DAG.
ls ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
Key files:
| File | Role |
|---|---|
| TezTask.java | Hive's Task impl; builds the DAG and submits via TezSessionState. |
| DagUtils.java | DAG construction helpers (createVertex, createEdge, etc). |
| TezSessionPoolManager.java | Warm session pool; keeps AMs alive between queries. |
| TezSessionState.java | One Hive session ↔ one Tez AM. |
| TezProcessor.java | The LogicalIOProcessor that runs Hive operator pipelines inside a Tez task. |
wc -l ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/{TezTask,DagUtils,TezSessionPoolManager,TezProcessor}.java
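The translation at the compile boundary (one Tez vertex per BaseWork, one Tez edge per connection between works) can be pictured with a toy model. This is a minimal sketch in Python, not Hive's Java code: ToyWork, ToyDag, and build_dag are hypothetical names invented for illustration, and the vertex names are only examples.

```python
from dataclasses import dataclass, field


@dataclass
class ToyWork:
    """Stand-in for a Hive BaseWork node (hypothetical, not Hive's class)."""
    name: str
    kind: str  # e.g. "MapWork", "ReduceWork", "MergeJoinWork"


@dataclass
class ToyDag:
    """Stand-in for a Tez DAG: vertex name -> work kind, plus directed edges."""
    vertices: dict = field(default_factory=dict)
    edges: list = field(default_factory=list)


def build_dag(works, connections):
    """Create one vertex per work node and one edge per (parent, child, edge_type)."""
    dag = ToyDag()
    for work in works:
        if work.name in dag.vertices:
            raise ValueError(f"duplicate work name: {work.name}")
        dag.vertices[work.name] = work.kind
    for parent, child, edge_type in connections:
        # An edge may only connect vertices that already exist in the DAG.
        if parent not in dag.vertices or child not in dag.vertices:
            raise ValueError(f"edge references unknown work: {parent} -> {child}")
        dag.edges.append((parent, child, edge_type))
    return dag


works = [ToyWork("Map 1", "MapWork"), ToyWork("Reducer 2", "ReduceWork")]
dag = build_dag(works, [("Map 1", "Reducer 2", "SCATTER_GATHER")])
print(dag.vertices)
print(dag.edges)
```

The real construction carries far more state per vertex (processor, resources, inputs, vertex manager); the sketch only shows the shape of the graph walk.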
TezTask.execute — high-level flow
grep -n "execute\|build\|submitDAG" \
~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java | head -30
Steps:
- Acquire a TezSessionState from TezSessionPoolManager (or open a new one).
- build(jobConf, work, scratchDir, ...): call DagUtils to turn each BaseWork into a Tez Vertex and each TezEdgeProperty into a Tez Edge.
- submit(dag, sessionState) → tezClient.submitDAG(dag).
- Poll dagClient.getDAGStatus(...) until terminal.
- Surface counters + diagnostics back to Hive.
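The submit-then-poll part of these steps can be sketched as a small loop. This is a Python model under stated assumptions, not the code in TezTask: run_dag, submit, and get_status are hypothetical names, and the set of terminal states is an assumption modeled on the usual DAG status values.

```python
import time

# Assumed terminal states; check the DAG status enum in your Tez version.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "KILLED", "ERROR"}


def run_dag(submit, get_status, poll_interval_s=0.0):
    """Submit once, then poll until the DAG reports a terminal state."""
    handle = submit()
    while True:
        state = get_status(handle)
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_interval_s)


# Simulated client: reports RUNNING twice, then SUCCEEDED.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
final = run_dag(submit=lambda: "dag_1", get_status=lambda handle: next(states))
print(final)
```

A production loop would also bound the wait, handle client errors, and collect counters once the terminal state is reached.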
DagUtils.createVertex
grep -n "createVertex\|createEdge\|createEdgeProperty\|setVertexManagerPlugin" \
~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java | head -30
For a MapWork:
| Hive concept | Tez vertex configuration |
|---|---|
| Operator tree starting with TableScanOperator | processor = MapTezProcessor (subclass of TezProcessor) |
| Number of input splits | parallelism = splits.length (often overridden by grouping) |
| Per-split input | DataSourceDescriptor with MRInputLegacy and the InputFormat |
| Combiner | Edge-level (downstream ReduceWork configures it as a combiner.class) |
For a ReduceWork:
| Hive concept | Tez vertex configuration |
|---|---|
| Operator tree starting at ReduceSinkOperator's consumer | processor = ReduceTezProcessor |
| Target parallelism | numReducers (from Hive's Operator tree, optionally auto-parallelized via ShuffleVertexManager) |
| Sort key codec | OrderedGroupedKVInput.KEY_CLASS, KEY_COMPARATOR_CLASS |
| setVertexManagerPlugin | ShuffleVertexManager with auto-parallelism if hive.tez.auto.reducer.parallelism=true |
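To give a feel for what auto-parallelism does to numReducers, here is a deliberately simplified Python sketch. It assumes the manager only ever shrinks the planned reducer count, targeting a desired number of input bytes per task. The function name, parameters, and formula are illustrative assumptions; the real ShuffleVertexManager logic is more involved and should be read in the Tez source.

```python
import math


def pick_reducer_count(planned, observed_bytes, desired_bytes_per_task, min_tasks=1):
    """Shrink (never grow) the planned reducer count to fit the observed shuffle size."""
    if planned < 1 or desired_bytes_per_task <= 0:
        raise ValueError("planned must be >= 1 and desired_bytes_per_task > 0")
    # Tasks needed so that each one receives roughly the desired input size.
    wanted = math.ceil(observed_bytes / desired_bytes_per_task)
    # Respect the lower bound, and never go below one task.
    wanted = max(wanted, min_tasks, 1)
    # Auto-parallelism in this sketch only reduces parallelism.
    return min(planned, wanted)


shrunk = pick_reducer_count(planned=100, observed_bytes=1_000_000_000,
                            desired_bytes_per_task=256_000_000)
print(shrunk)
```

The practical consequence for debugging: the reducer count in the query plan and the count in the running DAG can legitimately differ when this setting is on.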
For a MergeJoinWork:
- processor = MergeJoinProcessor
- Multiple sorted inputs (one per join side) using OrderedGroupedKVInput
- A custom or built-in vertex manager that coordinates inputs
Operator → IPO mapping
Hive operators run inside a Tez task — they are not Tez constructs. The mapping happens at the input/output boundary of the vertex.
| Position | Hive operator | Tez wiring |
|---|---|---|
| Vertex entry (map side) | TableScanOperator | MRInputLegacy (tez-mapreduce) emits (key, value) from InputFormat |
| Vertex middle | Filter / Select / GroupBy partial / etc | Pure in-memory operator chain inside TezProcessor.run |
| Vertex exit (shuffle producer) | ReduceSinkOperator | OrderedPartitionedKVOutput with Hive's HiveKey serializer and partitioner |
| Vertex entry (reduce side) | First operator after the boundary | OrderedGroupedKVInput provides a KeyValuesReader; ReduceRecordProcessor adapts it into Hive's tuple-at-a-time interface |
| Vertex middle (reduce) | GroupBy aggregation, Join, etc | Operator chain |
| Vertex exit (final) | FileSinkOperator | MROutputLegacy writes to HDFS |
| Broadcast join build | HashTableSinkOperator | UnorderedKVOutput (or in newer Hive a BROADCAST-typed edge) feeding the probe vertex |
| Broadcast join probe | MapJoinOperator | UnorderedKVInput on a BROADCAST edge |
grep -rn "OrderedPartitionedKVOutput\|OrderedGroupedKVInput\|UnorderedKVOutput\|UnorderedKVInput" \
~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez | head -20
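The edge-level part of the mapping table can be condensed into a lookup: the producer vertex ends in an Output class and the consumer vertex starts with the matching Input class. The sketch below encodes only the two pairings listed in the table. The edge type labels, the dictionary, and the helper name are illustrative choices, not Hive or Tez identifiers you can call.

```python
# Pairings taken from the mapping table above: (producer Output, consumer Input).
EDGE_WIRING = {
    "SCATTER_GATHER": ("OrderedPartitionedKVOutput", "OrderedGroupedKVInput"),
    "BROADCAST": ("UnorderedKVOutput", "UnorderedKVInput"),
}


def wiring_for(edge_type):
    """Return the Output/Input class names expected on each side of an edge."""
    try:
        producer_output, consumer_input = EDGE_WIRING[edge_type]
    except KeyError:
        raise ValueError(f"no wiring recorded for edge type: {edge_type}") from None
    return {"producer_output": producer_output, "consumer_input": consumer_input}


shuffle = wiring_for("SCATTER_GATHER")
print(shuffle["producer_output"], "->", shuffle["consumer_input"])
```

When a stack trace names one of these classes, the lookup tells you which side of which edge the failing task sits on.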
TezProcessor adapter
grep -n "class TezProcessor\|class MapTezProcessor\|class ReduceTezProcessor\|process(" \
~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
TezProcessor.run(inputs, outputs):
- Pull the singular input (MRInputLegacy or first OrderedGroupedKVInput).
- Construct a RecordSource that adapts the Tez reader into Hive's Operator.process(Object row, int tag) calling convention.
- Run the operator tree until the input is drained.
- Call forward(EOF) to drain operator buffers.
- Close outputs in reverse order.
The processor is intentionally thin — all the interesting logic is in the Hive operator chain.
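The adapter loop described in the steps above can be modeled in a few lines. This is a Python sketch with hypothetical classes (ToyOutput, CollectingOperator, run_processor); it mirrors only the control flow (drain the input, flush the operator, close outputs in reverse) and none of Hive's actual types.

```python
class ToyOutput:
    """Stand-in for a Tez output that can be closed."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self):
        self.closed = True


class CollectingOperator:
    """Toy root operator exposing a process(row, tag) entry point."""

    def __init__(self):
        self.seen = []
        self.closed = False

    def process(self, row, tag):
        self.seen.append((tag, row))

    def close(self):
        # A real operator would flush buffered rows downstream here.
        self.closed = True


def run_processor(reader, root_operator, outputs, tag=0):
    """Feed every row to the operator tree, flush it, then close outputs in reverse."""
    for row in reader:
        root_operator.process(row, tag)
    root_operator.close()
    closed_order = []
    for output in reversed(outputs):
        output.close()
        closed_order.append(output.name)
    return closed_order


op = CollectingOperator()
order = run_processor(iter(["r1", "r2"]), op, [ToyOutput("out_a"), ToyOutput("out_b")])
print(op.seen)
print(order)
```

Nothing in the loop knows what the operators do, which is why operator-level failures surface through the processor without being caused by it.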
TezSessionPoolManager
find ~/hive-src/ql/src/java -name "TezSessionPoolManager.java"
wc -l ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
A Tez session = a long-lived Tez AM holding zero or more idle containers ready to accept the next DAG.
| Config | Default | Effect |
|---|---|---|
| hive.server2.tez.default.queues | default | Pre-warm sessions per YARN queue. |
| hive.server2.tez.sessions.per.default.queue | 1 | Number of pre-warm sessions per queue. |
| hive.server2.tez.initialize.default.sessions | false | Start them at HS2 boot. |
| hive.tez.exec.print.summary | false | Surface Tez counters in query output. |
Pool flow:
- HS2 starts. If initialize.default.sessions=true, launches N AMs per queue.
- Query comes in. HS2 calls TezSessionPoolManager.getSession(queue), which returns an idle session or opens a new one.
- Session executes the DAG; AM holds containers across DAGs (see container-reuse.md).
- On session return, AM remains idle awaiting next DAG.
- On idle timeout (hive.server2.session.check.interval), the pool may close sessions.
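The borrow-and-return behaviour of the pool flow can be sketched as follows. This is a toy Python model, not TezSessionPoolManager: the class, its methods, and the session naming are hypothetical, and it ignores queues, idle timeouts, and concurrency.

```python
from collections import deque


class ToySessionPool:
    """Hands out pre-warmed sessions first and opens new ones on demand."""

    def __init__(self, prewarm=0):
        # Pre-warmed sessions model AMs launched at HS2 startup.
        self._idle = deque(f"session-{i}" for i in range(prewarm))
        self._opened = prewarm

    def get_session(self):
        if self._idle:
            return self._idle.popleft()
        # No idle session: open a fresh one (a cold AM start in real life).
        name = f"session-{self._opened}"
        self._opened += 1
        return name

    def return_session(self, session):
        # The AM stays alive and waits for the next DAG.
        self._idle.append(session)


pool = ToySessionPool(prewarm=1)
first = pool.get_session()    # served from the warm pool
second = pool.get_session()   # pool empty, so a new session is opened
pool.return_session(first)
third = pool.get_session()    # the returned session is reused
print(first, second, third)
```

The second call is the case to keep in mind when a query behaves differently with pooling on: it ran in a freshly opened session, while the first and third reused a warm one that may carry state from earlier queries.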
LLAP
LLAP (Live Long And Process) is a different execution model that replaces per-query YARN task containers with a long-lived per-node daemon. The Tez AM still coordinates, but instead of asking YARN for containers it asks LLAP daemons to run "fragments".
find ~/hive-src/llap-* -maxdepth 2 -type d 2>/dev/null | head
Key differences (do not extrapolate Tez-on-YARN debugging to LLAP):
- Containers are replaced by LlapTaskExecutorService worker slots.
- The shuffle path uses a Netty-based fetcher (LlapShuffleHandler).
- The Tez scheduler plugin is LlapTaskSchedulerService (in hive-llap-server).
- Container reuse is not relevant; LLAP slots are always "hot".
This chapter does not cover LLAP further; treat it as a separate world.
Bug attribution: where does it really live?
Triage tree. Symptom: query fails or returns wrong result.
flowchart TD
S[Failure observed] --> Q1{Failure message mentions Hive operator?}
Q1 -- yes --> H1[Hive bug: open against HIVE]
Q1 -- no --> Q2{Failure in TezChild / IFile / Fetcher?}
Q2 -- yes --> T1[Tez bug: open against TEZ]
Q2 -- no --> Q3{Failure in container launch / RM allocation?}
Q3 -- yes --> Y1[YARN bug: open against YARN]
Q3 -- no --> Q4{Wrong result not crash?}
Q4 -- yes --> Q5{Reproduce with same DAG via TestOrderedWordCount-style?}
Q5 -- no --> H1
Q5 -- yes --> T1
Practical heuristics:
| Stack trace contains | Probably |
|---|---|
| org.apache.hadoop.hive.ql.exec.Operator | Hive |
| org.apache.tez.runtime.library | Tez |
| org.apache.tez.dag.app.rm | Tez (scheduling) |
| org.apache.hadoop.yarn | YARN |
| ShuffleHandler | YARN-side (mapreduce auxservice) |
| LlapDaemon | LLAP (Hive) |
| MapJoinOperator + OOM | Hive (join planning), even though the OOM happens in a Tez container |
Wrong-result bugs almost always live in Hive (operator semantics) unless
you can isolate the same DAG with synthetic data in TestOrderedWordCount
style.
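The heuristics table translates directly into a first-match classifier over stack trace text. The sketch below is a Python illustration: the rule ordering (most specific first), the use of "OutOfMemoryError" as the OOM marker, and the sample traces are assumptions made for the example, and the result is a starting point for triage, not a verdict.

```python
# Each rule: substrings that must all appear in the trace, and the probable owner.
# Order matters: more specific rules come before broad package matches.
RULES = [
    (("MapJoinOperator", "OutOfMemoryError"), "Hive (join planning)"),
    (("LlapDaemon",), "LLAP (Hive)"),
    (("org.apache.hadoop.hive.ql.exec.Operator",), "Hive"),
    (("org.apache.tez.runtime.library",), "Tez"),
    (("org.apache.tez.dag.app.rm",), "Tez (scheduling)"),
    (("ShuffleHandler",), "YARN-side (mapreduce auxservice)"),
    (("org.apache.hadoop.yarn",), "YARN"),
]


def probable_owner(stack_trace):
    """Return the owner for the first rule whose substrings all occur in the trace."""
    for needles, owner in RULES:
        if all(needle in stack_trace for needle in needles):
            return owner
    return "unknown"


# Synthetic traces written for this example, not copied from real logs.
oom_trace = (
    "java.lang.OutOfMemoryError: Java heap space\n"
    "  at org.apache.hadoop.hive.ql.exec.MapJoinOperator (synthetic frame)\n"
    "  at org.apache.tez.runtime.task.TezChild (synthetic frame)\n"
)
fetch_trace = "  at org.apache.tez.runtime.library.common.shuffle (synthetic frame)\n"

print(probable_owner(oom_trace))
print(probable_owner(fetch_trace))
```

Note that the first trace is attributed to Hive even though a TezChild frame is present, which is the point of the MapJoinOperator row in the table.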
Reading exercise
- head -200 ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java  # read the top of TezTask.execute
- grep -n "createEdgeProperty\|EdgeProperty\.create" ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java  # list all edge property factories Hive uses
- grep -rn "ShuffleVertexManager\|RootInputVertexManager" ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez  # when does Hive set each manager?
- find ~/hive-src/ql/src/java -name "TezProcessor.java" -exec wc -l {} \;  # confirm the processor is < 1000 lines (it's an adapter, not the brain)
- grep -rn "TezSessionPoolManager.getSession" ~/hive-src/service/src  # when does HS2 acquire sessions?
- head -100 ~/hive-src/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java  # see how a session wraps a TezClient
Common bugs and symptoms
| Symptom | Likely owner |
|---|---|
| MetaException mid-query | Hive (HMS client) |
| Container OOM during reduce join | Hive operator (map-join build size); Tez cannot size around an oversized hash table |
| Wrong row counts after a query rewrite | Hive optimizer or MapJoinOperator semantics |
| Fetcher: ConnectException to nm:13562 | YARN (aux-service mis-config) |
| AM dies with org.apache.tez.dag.app.DAGAppMaster: Vertex failed and the diagnostic mentions only TezProcessor, no Hive class | Tez bug; open a reproducer DAG without Hive |
| Slow first query after HS2 restart | No warm sessions; enable hive.server2.tez.initialize.default.sessions |
| Stale ACL after GRANT reissue | Hive (HMS). Tez containers cache delegation tokens; see container-reuse.md |
Validation: prove you understand this
- List the Hive operators on the source and destination sides of a SCATTER_GATHER shuffle edge and map each side to the Tez Input or Output class.
- Identify the Hive method that finally calls tezClient.submitDAG. Cite path + grep command.
- Given a query that succeeds in standalone HS2 but fails in HS2 with session pooling on, name two likely failure modes and where to look.
- Explain why a MapJoinOperator OOM is a Hive bug even though the OOM stack trace is rooted in TezChild.
- Show, in three lines, the conditional inside DagUtils that decides whether to install ShuffleVertexManager on a reduce vertex. (Find via grep; quote the file:line.)