Lab 5.3 — Build It: Integration Test with MiniTezCluster
Lab type: Build It — Maven module with a real mini-cluster integration test
Estimated time: 150 min
Maven module: book/projects/level-5-integration-test
Key class: org.apache.tez.learning.l5.TestNumberPipelineWithMiniCluster
What You Will Build
A JUnit integration test that:
- Starts MiniTezCluster in @BeforeClass
- Submits the Level 1 NumberPipelineDAG (reused from level-1-number-pipeline)
- Waits for the DAG to complete
- Reads back the counter NumberPipeline/TotalSum and asserts it equals 4950
- Stops the cluster in @AfterClass
This is the same pattern used by TestOrderedWordCount — you are building the
exact kind of test that Tez committers write for new DAG features.
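The lifecycle behind this pattern can be sketched without Tez or JUnit. The following is only an illustration under stated assumptions: FakeService is a hypothetical stand-in for MiniTezCluster and TezClient, and setUpClass/tearDownClass are plain static methods playing the roles of @BeforeClass and @AfterClass. It shows shared resources being started once and then stopped in reverse order, with null guards, even if the test body throws.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {
    // Records lifecycle events so the ordering can be inspected.
    private static final List<String> EVENTS = new ArrayList<>();

    // Hypothetical stand-in for a service such as a mini cluster or a client.
    static class FakeService {
        private final String name;

        FakeService(String name) {
            this.name = name;
        }

        void start() {
            EVENTS.add("start " + name);
        }

        void stop() {
            EVENTS.add("stop " + name);
        }
    }

    private static FakeService cluster;
    private static FakeService client;

    // Plays the role of @BeforeClass: the cluster starts before the client.
    static void setUpClass() {
        cluster = new FakeService("cluster");
        cluster.start();
        client = new FakeService("client");
        client.start();
    }

    // Plays the role of @AfterClass: stop in reverse order, guarding against
    // fields that were never assigned because setup failed part-way.
    static void tearDownClass() {
        if (client != null) {
            client.stop();
        }
        if (cluster != null) {
            cluster.stop();
        }
    }

    // Runs one simulated test and returns the recorded event order.
    public static List<String> run() {
        EVENTS.clear();
        cluster = null;
        client = null;
        try {
            setUpClass();
            EVENTS.add("run test");
        } finally {
            tearDownClass();
        }
        return new ArrayList<>(EVENTS);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The null guards matter because teardown still runs when setup fails before every field has been assigned.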
Step 1 — Create the Maven Module
book/projects/level-5-integration-test/
  pom.xml
  src/test/java/org/apache/tez/learning/l5/
    TestNumberPipelineWithMiniCluster.java
The module is a test-only module (no src/main/). It depends on:
- org.apache.tez.learning:level-1-number-pipeline:1.0-SNAPSHOT (your DAG)
- org.apache.tez:tez-tests (for MiniTezCluster)
- JUnit 4.13.2
- org.apache.hadoop:hadoop-minicluster
<dependency>
  <groupId>org.apache.tez</groupId>
  <artifactId>tez-tests</artifactId>
  <version>${tez.version}</version>
  <classifier>tests</classifier>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-minicluster</artifactId>
  <version>${hadoop.version}</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.tez.learning</groupId>
  <artifactId>level-1-number-pipeline</artifactId>
  <version>1.0-SNAPSHOT</version>
  <scope>test</scope>
</dependency>
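The dependency list also names JUnit 4.13.2, which the fragment does not show. A sketch of the matching entry, assuming the JUnit version is not already managed by the parent pom (if it is, the version element can be dropped):

```xml
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.13.2</version>
  <scope>test</scope>
</dependency>
```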
Add level-5-integration-test to the parent pom.xml modules list.
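In the parent pom this amounts to one extra module entry. A sketch, assuming the parent pom lives in book/projects (the directory Step 3 runs Maven from) and already lists the Level 1 module; any other modules your parent pom declares stay as they are:

```xml
<modules>
  <!-- existing modules are left unchanged -->
  <module>level-1-number-pipeline</module>
  <module>level-5-integration-test</module>
</modules>
```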
Step 2 — Write TestNumberPipelineWithMiniCluster.java
Skeleton:
package org.apache.tez.learning.l5;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.client.DAGClient;
import org.apache.tez.dag.client.DAGStatus;
import org.apache.tez.learning.l1.NumberPipelineDAG;
import org.apache.tez.test.MiniTezCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.*;

public class TestNumberPipelineWithMiniCluster {

    private static MiniTezCluster miniTezCluster;
    private static TezClient tezClient;
    private static TezConfiguration tezConf;

    @BeforeClass
    public static void setUpClass() throws Exception {
        // Start MiniTezCluster with 1 NodeManager
        miniTezCluster = new MiniTezCluster(
            TestNumberPipelineWithMiniCluster.class.getName(), 1, 1, 1);
        Configuration conf = new Configuration();
        miniTezCluster.init(conf);
        miniTezCluster.start();

        // Run against the mini-cluster rather than in local mode
        tezConf = new TezConfiguration(miniTezCluster.getConfig());
        tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, false);

        tezClient = TezClient.create(
            "TestNumberPipelineClient", tezConf);
        tezClient.start();
    }

    @AfterClass
    public static void tearDownClass() throws Exception {
        // Stop the client before the cluster it talks to
        if (tezClient != null) {
            tezClient.stop();
        }
        if (miniTezCluster != null) {
            miniTezCluster.stop();
        }
    }

    @Test(timeout = 120_000)
    public void testNumberPipelineTotalSum() throws Exception {
        // Build the Level 1 DAG against the mini-cluster config
        // (local mode is disabled in setUpClass)
        DAG dag = NumberPipelineDAG.buildDAG(tezConf);
        DAGClient dagClient = tezClient.submitDAG(dag);
        DAGStatus dagStatus = dagClient.waitForCompletion();
        assertEquals("DAG must succeed",
            DAGStatus.State.SUCCEEDED, dagStatus.getState());

        TezCounters counters = dagStatus.getDAGCounters();
        assertNotNull("Counters must be present", counters);
        long totalSum = counters
            .getGroup("NumberPipeline")
            .findCounter("TotalSum")
            .getValue();
        assertEquals("TotalSum for 0..99 must equal 4950", 4950L, totalSum);
    }
}
Adapting NumberPipelineDAG: the Level 1 project is designed for local mode (TezConfiguration.TEZ_LOCAL_MODE = true). You will need to either (a) add a static buildDAG(TezConfiguration conf) factory method that accepts an external config, or (b) create a subclass that overrides the DAG construction to accept an injected config. Choose (a).
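The expected value in the skeleton's final assertion can be checked independently of Tez. A minimal sketch, assuming the Level 1 pipeline sums the integers 0 through 99 unchanged, as the assertion message states; the class and method names here are illustrative and not part of the lab code:

```java
import java.util.stream.LongStream;

public class ExpectedTotalSum {
    // Brute-force sum of the integers in [startInclusive, endExclusive).
    public static long sumByRange(long startInclusive, long endExclusive) {
        return LongStream.range(startInclusive, endExclusive).sum();
    }

    // Closed form for 0 + 1 + ... + (n - 1).
    public static long sumByFormula(long n) {
        return n * (n - 1) / 2;
    }

    public static void main(String[] args) {
        System.out.println(sumByRange(0, 100));  // prints 4950
        System.out.println(sumByFormula(100));   // prints 4950
    }
}
```

Both routes give 100 * 99 / 2 = 4950, which is the literal passed to assertEquals.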
Step 3 — Verify the Build
cd book/projects
mvn -pl level-1-number-pipeline install -DskipTests -q
mvn -pl level-5-integration-test test -q 2>&1 | tail -20
Expected:
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
Step 4 — Deep Questions
| # | Question |
|---|---|
| 1 | Why does the test use dagStatus.getDAGCounters() instead of dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS))? Are they equivalent? |
| 2 | The timeout is 120_000 ms. Why does a simple 100-integer DAG need 2 minutes? |
| 3 | If the DAG fails, dagStatus.getState() returns FAILED and the assertion fires. How would you get the failure reason from dagStatus? |
| 4 | @BeforeClass uses static fields. What happens if two test classes in the same JVM both start MiniTezCluster? How does TestOrderedWordCount handle this? |
| 5 | The counter group is "NumberPipeline" and the counter name is "TotalSum". If you mistype the group name, what does getGroup() return? Does the assertion fail gracefully? |
Step 5 — Experiment: Add a Second Assertion
After verifying TotalSum, add an assertion on the number of input records processed:
// Requires: import org.apache.tez.common.counters.TaskCounter;
long inputRecords = counters
    .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED)
    .getValue();
// How many input records do you expect?
assertEquals(???, inputRecords);
Think about the DAG topology:
- Source vertex: 1 task, emits 100 integers
- Sink vertex: 1 task, reads 100 records
What value do you expect for INPUT_RECORDS_PROCESSED across both vertices?
Step 6 — Tez Source Connection Table
| Class used in this lab | Tez source file |
|---|---|
| MiniTezCluster | |
| TezClient | |
| DAGClient | |
| DAGStatus | |
| TezCounters | |