Lab 5.3 — Build It: Integration Test with MiniTezCluster

Lab type: Build It — Maven module with a real mini-cluster integration test
Estimated time: 150 min
Maven module: book/projects/level-5-integration-test
Key class: org.apache.tez.learning.l5.TestNumberPipelineWithMiniCluster


What You Will Build

A JUnit integration test that:

  1. Starts MiniTezCluster in @BeforeClass
  2. Submits the Level 1 NumberPipelineDAG (reused from level-1-number-pipeline)
  3. Waits for the DAG to complete
  4. Reads back the counter NumberPipeline/TotalSum and asserts it equals 4950 (the sum of 0..99)
  5. Stops the cluster in @AfterClass

This is the same pattern used by TestOrderedWordCount: you are building the kind of test that Tez committers write for new DAG features.
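
The expected counter value can be checked by hand before any cluster is involved. The sketch below assumes the Level 1 source emits the integers 0 through 99, as the assertion message in the test skeleton states. ExpectedTotalSum is a hypothetical helper written for this illustration and is not part of the lab module.

```java
/** Hypothetical helper for illustration only; not part of the lab module. */
final class ExpectedTotalSum {

    /** Sums the integers 0 .. count-1 one at a time. */
    static long byLoop(int count) {
        long sum = 0;
        for (int i = 0; i < count; i++) {
            sum += i;
        }
        return sum;
    }

    /** Closed form for the same sum: count * (count - 1) / 2. */
    static long byFormula(int count) {
        return (long) count * (count - 1) / 2;
    }

    public static void main(String[] args) {
        System.out.println(byLoop(100));    // prints 4950
        System.out.println(byFormula(100)); // prints 4950
    }
}
```

Both methods agree on 4950 for 100 inputs, which is the value the integration test should assert for TotalSum under that assumption.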


Step 1 — Create the Maven Module

book/projects/level-5-integration-test/
  pom.xml
  src/test/java/org/apache/tez/learning/l5/
    TestNumberPipelineWithMiniCluster.java

The module is a test-only module (no src/main/). It depends on:

  • org.apache.tez.learning:level-1-number-pipeline:1.0-SNAPSHOT (your DAG)
  • org.apache.tez:tez-tests (for MiniTezCluster)
  • JUnit 4.13.2
  • org.apache.hadoop:hadoop-minicluster

In pom.xml:

<dependency>
  <groupId>org.apache.tez</groupId>
  <artifactId>tez-tests</artifactId>
  <version>${tez.version}</version>
  <classifier>tests</classifier>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-minicluster</artifactId>
  <version>${hadoop.version}</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.tez.learning</groupId>
  <artifactId>level-1-number-pipeline</artifactId>
  <version>1.0-SNAPSHOT</version>
  <scope>test</scope>
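</dependency>
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.13.2</version>
  <scope>test</scope>
</dependency>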

Add level-5-integration-test to the parent pom.xml modules list.


Step 2 — Write TestNumberPipelineWithMiniCluster.java

Skeleton:

package org.apache.tez.learning.l5;

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.learning.l1.NumberPipelineDAG;
import org.apache.tez.test.MiniTezCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.*;

public class TestNumberPipelineWithMiniCluster {

    private static MiniTezCluster miniTezCluster;
    private static TezClient tezClient;
    private static TezConfiguration tezConf;

    @BeforeClass
    public static void setUpClass() throws Exception {
        // Start MiniTezCluster with 1 NodeManager, 1 local dir, and 1 log dir
        miniTezCluster = new MiniTezCluster(
                TestNumberPipelineWithMiniCluster.class.getName(), 1, 1, 1);
        Configuration conf = new Configuration();
        miniTezCluster.init(conf);
        miniTezCluster.start();

        // Copy the cluster's configuration and make sure local mode is off,
        // so the DAG is submitted to the mini-cluster rather than run in-process
        tezConf = new TezConfiguration(miniTezCluster.getConfig());
        tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, false);

        tezClient = TezClient.create(
                "TestNumberPipelineClient", tezConf);
        tezClient.start();
    }

    @AfterClass
    public static void tearDownClass() throws Exception {
        // Stop the client before the cluster it talks to
        if (tezClient != null) {
            tezClient.stop();
        }
        if (miniTezCluster != null) {
            miniTezCluster.stop();
        }
    }

    @Test(timeout = 120_000)
    public void testNumberPipelineTotalSum() throws Exception {
        // Build the Level 1 DAG against the mini-cluster configuration
        DAG dag = NumberPipelineDAG.buildDAG(tezConf);

        DAGClient dagClient = tezClient.submitDAG(dag);
        dagClient.waitForCompletion();

        // A status fetched without GET_COUNTERS may carry no counters,
        // so request them explicitly once the DAG has finished
        DAGStatus dagStatus = dagClient.getDAGStatus(
                EnumSet.of(StatusGetOpts.GET_COUNTERS));

        assertEquals("DAG must succeed",
                DAGStatus.State.SUCCEEDED, dagStatus.getState());

        TezCounters counters = dagStatus.getDAGCounters();
        assertNotNull("Counters must be present", counters);

        long totalSum = counters
                .getGroup("NumberPipeline")
                .findCounter("TotalSum")
                .getValue();

        assertEquals("TotalSum for 0..99 must equal 4950", 4950L, totalSum);
    }
}

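Adapting NumberPipelineDAG: the Level 1 project is designed for local mode (TezConfiguration.TEZ_LOCAL_MODE set to true), so it builds its own configuration. To run it on the mini-cluster you need to either (a) add a static buildDAG(TezConfiguration conf) factory method that accepts an external config, or (b) create a subclass that overrides the DAG construction to accept an injected config. Choose (a): it is the smaller change, and it matches the NumberPipelineDAG.buildDAG(tezConf) call in the skeleton above.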


Step 3 — Verify the Build

cd book/projects
mvn -pl level-1-number-pipeline install -DskipTests -q
mvn -pl level-5-integration-test test -q 2>&1 | tail -20

Expected:

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0


Step 4 — Deep Questions

  1. The test reads counters through dagStatus.getDAGCounters(). Does it matter whether that DAGStatus came from dagClient.waitForCompletion() or from dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS))? Are the two equivalent?
  2. The timeout is 120_000 ms. Why does a simple 100-integer DAG need 2 minutes?
  3. If the DAG fails, dagStatus.getState() returns FAILED and the assertion fires. How would you get the failure reason from dagStatus?
  4. @BeforeClass uses static fields. What happens if two test classes in the same JVM both start MiniTezCluster? How does TestOrderedWordCount handle this?
  5. The counter group is "NumberPipeline" and the counter name is "TotalSum". If you mistype the group name, what does getGroup() return? Does the assertion fail gracefully?


Step 5 — Experiment: Add a Second Assertion

After verifying TotalSum, add an assertion on the number of input records processed (this needs an import of org.apache.tez.common.counters.TaskCounter):

long inputRecords = counters
        .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED)
        .getValue();
// How many input records do you expect?
assertEquals(???, inputRecords);

Think about the DAG topology:

  • Source vertex: 1 task, emits 100 integers
  • Sink vertex: 1 task, reads 100 records

What value do you expect for INPUT_RECORDS_PROCESSED across both vertices?


Step 6 — Tez Source Connection Table

Class used in this lab    Tez source file
MiniTezCluster
TezClient
DAGClient
DAGStatus
TezCounters