Lab 1.3: Run a Simple Tez DAG Locally

Background

Apache Tez supports a local mode that runs the entire DAG execution inside a single JVM without YARN or HDFS. This is the primary environment for rapid development and testing. Understanding how to run a DAG in local mode is essential before attempting cluster testing.

The tez-examples module contains reference DAG implementations. OrderedWordCount is the canonical example: it reads text, counts word occurrences, and sorts the words by frequency. It exercises the core Tez DAG API: TezClient, DAG, Vertex, Edge, processors, and inputs/outputs.

Why This Lab Matters for Contributors

  • Local mode is how you verify behavior changes without a cluster
  • All integration test work in tez-tests builds on the same local mode infrastructure
  • Understanding how a real DAG is constructed gives concrete context for reading state machine code
  • Every DAG execution produces log output that teaches you about the AM lifecycle

Understanding Tez Local Mode

Tez local mode is enabled by setting tez.local.mode=true in the TezConfiguration. When this is set:

  • No YARN cluster is contacted
  • No containers are launched — task execution happens in threads within the same JVM
  • The DAGAppMaster is not submitted to YARN; the local client (LocalClient) starts it in a thread inside the client JVM
  • HDFS is replaced by the local filesystem (configurable)

Key configuration for local mode:

TezConfiguration tezConf = new TezConfiguration();
tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
// Use local filesystem instead of HDFS
tezConf.set("fs.defaultFS", "file:///");
tezConf.setBoolean("tez.local.mode.without.network", true);

Anatomy of OrderedWordCount

Before running the example, read tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java.

The DAG structure:

[Tokenizer Vertex]
      |
      | (SCATTER_GATHER edge — partitioned by hash, sorted)
      v
[SumReducer Vertex]
      |
      | (SCATTER_GATHER edge — partitioned by value for sort)
      v
[Sorter Vertex] → file output (local filesystem in this lab, HDFS on a cluster)

Tokenizer: Reads input text lines, splits into words, emits (word, 1) pairs.
Processor class: TokenProcessor (inner class in OrderedWordCount)

SumReducer: Receives (word, [1, 1, 1, ...]) groups, sums counts, emits (word, count).
Processor class: SumProcessor (inner class in OrderedWordCount)

Sorter: Receives pairs keyed by count (key and value are swapped upstream so that the shuffle sorts on the count) and writes them out in the order they arrive.
Processor class: NoOpSorter — uses OrderedGroupedKVInput to do the sort during shuffle

The key insight: Tez uses edge properties and input/output configuration to control sort and partition behavior. The Sorter processor does not sort anything itself; the shuffle and merge feeding OrderedGroupedKVInput do the sorting.
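The division of labor can be illustrated without any Tez classes. The sketch below is plain Java, not the Tez API: the class OrderedWordCountSketch and its methods are hypothetical names made up for this illustration. A TreeMap stands in for the sorted, grouped input that the shuffle hands to the Sorter vertex, so the last stage only iterates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java illustration only; none of these names are Tez classes.
class OrderedWordCountSketch {

  // Stages 1 and 2 (Tokenizer, SumReducer): split lines into words, sum per word.
  static Map<String, Integer> countWords(List<String> lines) {
    Map<String, Integer> counts = new HashMap<>();
    for (String line : lines) {
      for (String word : line.trim().split("\\s+")) {
        if (!word.isEmpty()) {
          counts.merge(word, 1, Integer::sum);
        }
      }
    }
    return counts;
  }

  // Stand-in for the second edge: the count becomes the key, and the sorted map
  // plays the role of the sorted, grouped shuffle input (ascending key order).
  // A TreeSet is used for the values only to make this sketch deterministic.
  static TreeMap<Integer, TreeSet<String>> shuffleByCount(Map<String, Integer> counts) {
    TreeMap<Integer, TreeSet<String>> grouped = new TreeMap<>();
    for (Map.Entry<String, Integer> e : counts.entrySet()) {
      grouped.computeIfAbsent(e.getValue(), k -> new TreeSet<>()).add(e.getKey());
    }
    return grouped;
  }

  // Stage 3 (Sorter): no sorting here, only iteration over already-ordered groups.
  static List<String> emit(TreeMap<Integer, TreeSet<String>> grouped) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<Integer, TreeSet<String>> e : grouped.entrySet()) {
      for (String word : e.getValue()) {
        out.add(word + "\t" + e.getKey());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> lines = List.of(
        "the quick brown fox jumps over the lazy dog",
        "the dog barked at the fox",
        "quick brown dog");
    emit(shuffleByCount(countWords(lines))).forEach(System.out::println);
  }
}
```

The point of the sketch is the shape of emit(): it contains no comparison logic at all, which mirrors why the Sorter processor can be a no-op.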


Step-by-Step Tasks

Step 1: Prepare Sample Input

mkdir -p /tmp/tez-lab/input
cat > /tmp/tez-lab/input/words.txt << 'EOF'
the quick brown fox jumps over the lazy dog
the dog barked at the fox
quick brown dog
EOF

Step 2: Build tez-examples

cd /path/to/tez
mvn package -DskipTests -pl tez-examples -am -q

Locate the examples JAR:

ls tez-examples/target/tez-examples-*.jar | grep -v sources | grep -v tests

Step 3: Run OrderedWordCount in Local Mode

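The example is a standard Java main class, so it can be launched directly with java once the classpath is set. Note that this invocation runs in local mode only if the configuration on the classpath sets tez.local.mode=true; Step 4 shows how to pass the property on the command line.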

# Set classpath to include Tez JARs
TEZ_HOME=/path/to/tez

# Java expands only a bare "*" path element (dir/*), not patterns such as
# tez-api-*.jar, so list each target directory with a trailing /*
CLASSPATH=\
$TEZ_HOME/tez-examples/target/*:\
$TEZ_HOME/tez-api/target/*:\
$TEZ_HOME/tez-dag/target/*:\
$TEZ_HOME/tez-runtime-library/target/*:\
$TEZ_HOME/tez-runtime-internals/target/*:\
$TEZ_HOME/tez-mapreduce/target/*:\
$TEZ_HOME/tez-common/target/*

# Add Hadoop JARs (required for FileSystem, Configuration, etc.)
# If Hadoop is installed:
CLASSPATH=$CLASSPATH:$(hadoop classpath)
# If not, add from Maven local cache manually

java -cp "$CLASSPATH" \
  org.apache.tez.examples.OrderedWordCount \
  /tmp/tez-lab/input \
  /tmp/tez-lab/output \
  1

Tip: The easiest way to handle classpaths during development is to use Maven's exec:java goal or to build a fat JAR using the shade plugin. The tez-dist assembly includes all JARs and the bin/ scripts handle classpath setup.

Step 4: Alternative Ways to Launch the Example

If you have Hadoop installed and HADOOP_HOME set, use the launcher script from the Tez distribution:

cd $TEZ_HOME
bin/tez-examples.sh OrderedWordCount \
  /tmp/tez-lab/input \
  /tmp/tez-lab/output \
  1

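Or pass the local mode properties as Hadoop generic options. They must follow the class name: placed before it, -D sets JVM system properties, which the Hadoop Configuration does not read as configuration values. OrderedWordCount is launched through ToolRunner, which parses generic options from the program arguments:

java -cp "$CLASSPATH" \
     org.apache.tez.examples.OrderedWordCount \
     -Dtez.local.mode=true \
     -Dfs.defaultFS=file:/// \
     /tmp/tez-lab/input \
     /tmp/tez-lab/output \
     1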

Step 5: Verify Output

cat /tmp/tez-lab/output/part-*

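Expected output (sorted by count; the stock example relies on the default IntWritable ordering, so expect ascending counts, and the order among words with equal counts is not guaranteed):

...
brown	2
fox	2
quick	2
dog	3
the	4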
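To check the ordering mechanically rather than by eye, a small helper can confirm that the counts in a part file are monotonic. This is a minimal sketch, assuming each line ends with the count separated by whitespace; OutputOrderCheck is a hypothetical name, not part of Tez. It accepts either direction, so it does not depend on which comparator your build uses.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper for this lab; not part of Tez.
class OutputOrderCheck {

  // Parses the trailing count from a "word<whitespace>count" line.
  static int countOf(String line) {
    String[] parts = line.trim().split("\\s+");
    return Integer.parseInt(parts[parts.length - 1]);
  }

  // True if the counts never decrease, or never increase, from line to line.
  static boolean isOrderedByCount(List<String> lines) {
    boolean nonDecreasing = true;
    boolean nonIncreasing = true;
    Integer prev = null;
    for (String line : lines) {
      if (line.trim().isEmpty()) {
        continue; // ignore blank lines
      }
      int cur = countOf(line);
      if (prev != null) {
        if (cur < prev) nonDecreasing = false;
        if (cur > prev) nonIncreasing = false;
      }
      prev = cur;
    }
    return nonDecreasing || nonIncreasing;
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.err.println("usage: java OutputOrderCheck.java <part-file>");
      return;
    }
    List<String> lines = Files.readAllLines(Paths.get(args[0]));
    System.out.println(isOrderedByCount(lines) ? "ordered by count" : "NOT ordered by count");
  }
}
```

Run it against a single part file. With more than one part file, each file is ordered on its own, so check them separately.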

Step 6: Read the Execution Log

Examine the log output from the run. Key lines to understand (the exact wording and layout vary between Tez versions and log4j patterns):

INFO  TezClient: Submitting DAG to YARN, queueName=...
INFO  DAGAppMaster: Running DAG: [OrderedWordCount]
INFO  VertexImpl: Vertex: [Tokenizer] initialized
INFO  VertexImpl: Vertex: [Tokenizer] started
INFO  DAGImpl: DAG: [OrderedWordCount] finished with status: [SUCCEEDED]

These lines correspond directly to state machine transitions you will study in Level 4. For each log line, identify the state transition it represents.
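When the console output is long, it can help to pull out only the vertex and DAG lifecycle lines before mapping them to transitions. The following is a rough sketch, assuming the simplified "LEVEL  Logger: message" layout of the sample lines above; a real log4j pattern usually adds timestamps and thread names, so the regular expression would need adjusting. LifecycleLogFilter is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for this lab; not part of Tez.
class LifecycleLogFilter {

  // Assumed layout: LEVEL, whitespace, simple logger name, colon, message.
  private static final Pattern LINE = Pattern.compile("^(\\w+)\\s+(\\w+):\\s+(.*)$");

  // Keeps only lines logged by VertexImpl or DAGImpl, as "Logger -> message".
  static List<String> lifecycleEvents(List<String> logLines) {
    List<String> events = new ArrayList<>();
    for (String line : logLines) {
      Matcher m = LINE.matcher(line.trim());
      if (!m.matches()) {
        continue; // not in the assumed layout
      }
      String logger = m.group(2);
      if (logger.equals("VertexImpl") || logger.equals("DAGImpl")) {
        events.add(logger + " -> " + m.group(3));
      }
    }
    return events;
  }

  public static void main(String[] args) {
    List<String> sample = List.of(
        "INFO  TezClient: Submitting DAG to YARN, queueName=...",
        "INFO  VertexImpl: Vertex: [Tokenizer] initialized",
        "INFO  VertexImpl: Vertex: [Tokenizer] started",
        "INFO  DAGImpl: DAG: [OrderedWordCount] finished with status: [SUCCEEDED]");
    lifecycleEvents(sample).forEach(System.out::println);
  }
}
```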


Implementation Requirements

Modify OrderedWordCount to add a fourth vertex that filters out words with count < 2:

  1. Add a new Vertex named "Filter" after SumReducer and before Sorter
  2. Write a minimal FilterProcessor extends SimpleProcessor (the base class in tez-runtime-library that the existing example processors build on):
    • In run(): iterate the input, skip pairs where the count value < 2, forward the rest
  3. Add an edge SumReducer → Filter and Filter → Sorter
  4. Run the modified DAG and verify that single-occurrence words are removed from output

This exercise teaches you:

  • How to add a vertex to an existing DAG
  • How to write a minimal Processor implementation
  • How edges connect processors

Do not overthink the implementation — the processor body is ~20 lines.
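To show how small the filtering logic is, here is a plain-Java sketch of just the loop. It deliberately avoids Tez types: the Iterable and BiConsumer parameters are stand-ins for the reader and writer that the real processor would obtain from its input and output, and FilterSketch and MIN_COUNT are hypothetical names. Which of key and value holds the count in your processor depends on how the incoming edge is configured.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Plain-Java stand-in for the processor body; not Tez API.
class FilterSketch {
  static final int MIN_COUNT = 2;

  // Reads (word, count) pairs and forwards only those with count >= MIN_COUNT.
  static void run(Iterable<Map.Entry<String, Integer>> input,
                  BiConsumer<String, Integer> writer) {
    for (Map.Entry<String, Integer> pair : input) {
      if (pair.getValue() < MIN_COUNT) {
        continue; // drop single-occurrence words
      }
      writer.accept(pair.getKey(), pair.getValue());
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    counts.put("the", 4);
    counts.put("lazy", 1);
    counts.put("dog", 3);

    List<String> out = new ArrayList<>();
    run(counts.entrySet(), (word, count) -> out.add(word + "\t" + count));
    out.forEach(System.out::println);
  }
}
```

In the lab itself the same loop goes inside the processor's run() method, reading from the input connected to SumReducer and writing to the output connected to Sorter.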


Debugging Checklist

If the DAG fails with DAG status: FAILED:

  1. Read the log for ERROR lines — they contain the failure reason and task attempt ID
  2. Check DAGAppMaster log for VertexImpl: Vertex [...] failed
  3. The error message will include the class and method where the exception occurred
  4. Common causes:
    • Classpath missing a required JAR (NoClassDefFoundError)
    • Output directory already exists (FileAlreadyExistsException)
    • Wrong input path (FileNotFoundException)

Clean output directory before re-running:

rm -rf /tmp/tez-lab/output

Expected Output

A successful run ends with:

INFO  DAGImpl: DAG: [OrderedWordCount] finished with status: [SUCCEEDED]
INFO  TezClient: Shutting down TezSession...

Stretch Goals

  1. Enable INFO-level logging for org.apache.tez.dag.app.dag.impl and observe vertex state transitions in the console output during the DAG run.

  2. Modify the DAG to use UnorderedKVInput/UnorderedKVOutput instead of the ordered pair for the first edge. Observe the difference in output ordering.

  3. Change the parallelism of the Sorter vertex to 2 and observe the output directory structure (2 part files instead of 1).

  4. Add a timer around the TezClient.submitDAG() / DAGClient.waitForCompletion() block and measure execution time for different input sizes.
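For the timing goal, one simple approach is a wrapper based on System.nanoTime(). This is a minimal sketch with hypothetical names (RunTimer, timeMillis); in the lab, the body of the callable would contain the submitDAG() and waitForCompletion() calls, which are replaced by a sleep here so the example runs without Tez.

```java
import java.util.concurrent.Callable;

// Hypothetical timing helper; not part of Tez.
class RunTimer {

  // Runs the work once and returns the elapsed wall-clock time in milliseconds.
  static <T> long timeMillis(Callable<T> work) throws Exception {
    long start = System.nanoTime(); // monotonic clock, suitable for intervals
    work.call();
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) throws Exception {
    long elapsed = timeMillis(() -> {
      Thread.sleep(50); // placeholder for submitting the DAG and waiting for it
      return null;
    });
    System.out.println("elapsed ms: " + elapsed);
  }
}
```

A single run on small input is dominated by startup cost, so repeat each measurement a few times before comparing input sizes.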

  • Local mode-specific bugs (different from cluster mode) — contributor opportunity
  • DAG API usability issues — often exposed by example code
  • Local mode configuration issues — often reported by new users