Lab 1.4: Project — Number Pipeline DAG
What You Are Building
A self-contained, runnable Java project that builds and executes a 3-vertex Tez DAG entirely in local mode — no YARN cluster, no HDFS, no Docker required.
Generator (2 tasks)
│ SCATTER_GATHER shuffle
▼
Multiplier (2 tasks) [value * 2]
│ SCATTER_GATHER shuffle
▼
Sink (1 task) [sum → counter]
Numbers 0–99 flow through the pipeline. The expected final sum is:
sum(0..99) * 2 = 4950 * 2 = 9900.
This pipeline intentionally mirrors the structure of Apache Tez's own
OrderedWordCount example but with an integer domain so the math is verifiable
without a corpus.
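The expected sum can be checked without Tez at all. The sketch below is a plain-Java model of the Generator, Multiplier, and Sink arithmetic only; the class and method names are hypothetical and do not appear in the project.

```java
import java.util.stream.IntStream;

/** Plain-Java model of the Generator -> Multiplier -> Sink arithmetic (no Tez involved). */
final class ExpectedSumCheck {

    /** Sums (n * factor) for every n in [0, count). */
    static long pipelineSum(int count, int factor) {
        return IntStream.range(0, count)            // Generator: 0, 1, ..., count - 1
                .mapToLong(n -> (long) n * factor)  // Multiplier: value * factor
                .sum();                             // Sink: total of all values
    }

    public static void main(String[] args) {
        System.out.println(pipelineSum(100, 2)); // prints 9900
    }
}
```

If the real pipeline ever reports a different number, this gives you a trusted baseline to compare against.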
Project Location
book/projects/
├── pom.xml                                  ← parent; sets Tez + Hadoop versions
└── level-1-number-pipeline/
    ├── pom.xml
    └── src/main/java/org/apache/tez/learning/l1/
        ├── GeneratorProcessor.java          ← no inputs; emits integers
        ├── MultiplierProcessor.java         ← one input, one output; value * 2
        ├── SinkProcessor.java               ← sums values; publishes counter
        ├── FilterProcessor.java             ← exercise stub (incomplete)
        └── NumberPipelineDAG.java           ← main class; configures + submits DAG
Prerequisites
- Completed Lab 1.1 (Apache Tez built from source with mvn install -DskipTests)
- Java 8+ on $PATH
- Maven 3.6+ on $PATH
Step 1: Set the Tez Version
The parent pom.xml needs to reference the exact version that mvn install
installed into your local ~/.m2 repository. Find it:
# Inside your apache/tez clone:
grep -m1 '<version>' pom.xml
Open book/projects/pom.xml and set <tez.version> to match:
<tez.version>0.10.3-SNAPSHOT</tez.version> <!-- adjust to your build -->
Step 2: Compile
cd /path/to/opensource-engineer-and-contributor/apache-tez/book/projects
# Build only the level-1 module (fast; skips the other modules)
mvn -pl level-1-number-pipeline package -q
You should see no errors. The fat JAR is at:
level-1-number-pipeline/target/level-1-number-pipeline-1.0-SNAPSHOT-jar-with-dependencies.jar
If you see Could not resolve dependency org.apache.tez:tez-api:
- Verify that tez.version matches the version in ~/.m2/repository/org/apache/tez/tez-api/
- Re-run mvn install -DskipTests in your Tez clone
Step 3: Run
java -jar level-1-number-pipeline/target/level-1-number-pipeline-1.0-SNAPSHOT-jar-with-dependencies.jar
Expected output (log lines abbreviated):
TezClient started (local mode).
Submitting DAG...
[SinkProcessor] task=0 partialSum=9900
=== NumberPipeline Result ===
Expected : 9900
Actual : 9900
Result : PASS
Note: You will see a large number of INFO log lines from the Tez framework. This is normal for local mode. The important lines are the ones from [SinkProcessor] and the final === NumberPipeline Result === block.
Step 4: Read Every Source File
Before modifying anything, read each Java file carefully.
GeneratorProcessor.java
Key questions:
- Which Tez interface does it implement?
- Why is output.start() called before getWriter()? What happens if you remove it?
- How does the processor know which range of numbers to generate? What Tez API provides this?
- The key and value written are both the same integer n. Why? When would you want them to differ?
MultiplierProcessor.java
Key questions:
- OrderedGroupedKVInput vs OrderedPartitionedKVOutput: which side is the input and which is the output? Why are they named differently?
- Both input.start() and output.start() are called. What does input.start() actually trigger? (Hint: look at OrderedGroupedKVInput.start() in the Tez source.)
- FACTOR = 2 is hardcoded. The Javadoc explains how to pass it via UserPayload. What is the size in bytes of an int encoded in a ByteBuffer?
SinkProcessor.java
Key questions:
- What is the type of getContext().getCounters()?
- findCounter(group, name): what happens if the counter doesn't exist yet when first called?
- There is only one Sink task (parallelism=1). If you changed it to 2, would the counter still be correct? Why?
NumberPipelineDAG.java
Key questions:
- What does tez.local.mode=true actually change about task execution?
- OrderedPartitionedKVEdgeConfig.newBuilder(keyClass, valueClass, partitionerClass): what is HashPartitioner doing here, and where does the partition count come from?
- dagClient.waitForCompletion(): does this block on the calling thread, or is it async?
- EnumSet.of(StatusGetOpts.GET_COUNTERS): why is this extra call needed? Why aren't counters always included in DAGStatus?
Step 5: Break It and Understand It
Make each change, run the JAR, observe the failure, then revert.
Break 1: Remove output.start()
In GeneratorProcessor.run(), comment out logicalOutput.start().
Expected: NullPointerException or IllegalStateException from the Tez runtime when
getWriter() is called on an uninitialized output.
Why this matters: Tez I/O objects are lazily initialized. Calling start() triggers
buffer allocation, sort-buffer setup, and (for inputs) the shuffle fetch. Forgetting
start() is a common mistake in a first patch.
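The lazy-initialization pattern described above can be illustrated with a toy class. This is not the real OrderedPartitionedKVOutput and makes no claim about which exception Tez actually throws; LazyOutputModel and its StringBuilder "writer" are hypothetical stand-ins that only show why calling getWriter() before start() has nothing to hand back.

```java
/** Toy stand-in for a lazily initialized output; NOT a Tez class. */
final class LazyOutputModel {
    private StringBuilder buffer; // stays null until start() allocates it

    /** Allocates the buffer on first call; later calls are no-ops. */
    void start() {
        if (buffer == null) {
            buffer = new StringBuilder();
        }
    }

    /** Returns the writer, or fails if start() was never called. */
    StringBuilder getWriter() {
        if (buffer == null) {
            throw new IllegalStateException("start() must be called before getWriter()");
        }
        return buffer;
    }

    public static void main(String[] args) {
        LazyOutputModel output = new LazyOutputModel();
        try {
            output.getWriter(); // no start() yet
        } catch (IllegalStateException e) {
            System.out.println("failed as expected: " + e.getMessage());
        }
        output.start();
        output.getWriter().append("42");
        System.out.println("wrote after start(): " + output.getWriter());
    }
}
```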
Break 2: Set the wrong parallelism
Change sink parallelism from 1 to 3, run again.
Observe: does the result change? Is it still 9900? Why or why not?
Expected: the total counter is still 9900, because each Sink task emits a partial sum and the AM aggregates counters across all tasks automatically.
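To see why the total survives a change in parallelism, here is a plain-Java model of partial sums being added together. It assumes the Sink receives the doubled value as both key and value and that partitioning follows the usual (hashCode & Integer.MAX_VALUE) % numPartitions rule; both are assumptions for illustration, and PartialSumModel is a hypothetical name.

```java
/** Plain-Java model of Break 2: several Sink tasks, one aggregated total. */
final class PartialSumModel {

    /** Assumed partitioning rule: (hashCode & Integer.MAX_VALUE) % numPartitions. */
    static int partition(int key, int numPartitions) {
        return (Integer.hashCode(key) & Integer.MAX_VALUE) % numPartitions;
    }

    /** One partial sum per simulated Sink task. */
    static long[] partialSums(int sinkTasks) {
        long[] sums = new long[sinkTasks];
        for (int n = 0; n < 100; n++) {
            int doubled = n * 2; // Multiplier output, used here as key and value
            sums[partition(doubled, sinkTasks)] += doubled;
        }
        return sums;
    }

    /** Models counter aggregation: add up every task's contribution. */
    static long total(long[] partials) {
        long sum = 0;
        for (long p : partials) {
            sum += p;
        }
        return sum;
    }

    public static void main(String[] args) {
        for (int tasks : new int[] {1, 3}) {
            System.out.println(tasks + " sink task(s): total = " + total(partialSums(tasks)));
        }
    }
}
```

How the records split across tasks depends on the partitioner, but addition is associative, so the aggregated total does not.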
Break 3: Use a constant key in the Generator
Change writer.write(new IntWritable(n), new IntWritable(n)) to
writer.write(new IntWritable(0), new IntWritable(n)) (fixed key = 0).
Expected: all values route to the same Multiplier task (the one that owns partition 0).
The other Multiplier task gets no work. The result is still 9900 (correct) but the work
distribution is skewed. You can verify this by adding a counter in MultiplierProcessor
that tracks how many records each task processed.
Why this matters: key-skew (many records with the same key) is one of the most common Tez/MapReduce performance problems. This exercise makes it visible.
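The skew can be previewed in plain Java before touching the project. The sketch assumes the partitioner computes (hashCode & Integer.MAX_VALUE) % numPartitions and that an integer key hashes to its own value; KeySkewModel and its methods are hypothetical names used only for this illustration.

```java
import java.util.Arrays;

/** Plain-Java model of Break 3: records per Multiplier task under two key choices. */
final class KeySkewModel {

    /** Assumed partitioning rule: (hashCode & Integer.MAX_VALUE) % numPartitions. */
    static int partition(int key, int numPartitions) {
        return (Integer.hashCode(key) & Integer.MAX_VALUE) % numPartitions;
    }

    /** Record counts per task when each record n uses n as its key. */
    static int[] countsWithDistinctKeys(int tasks) {
        int[] counts = new int[tasks];
        for (int n = 0; n < 100; n++) {
            counts[partition(n, tasks)]++;
        }
        return counts;
    }

    /** Record counts per task when every record uses the fixed key 0. */
    static int[] countsWithConstantKey(int tasks) {
        int[] counts = new int[tasks];
        for (int n = 0; n < 100; n++) {
            counts[partition(0, tasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countsWithDistinctKeys(2))); // prints [50, 50]
        System.out.println(Arrays.toString(countsWithConstantKey(2)));  // prints [100, 0]
    }
}
```

A per-task record counter in MultiplierProcessor should show the same shape if these assumptions hold for the real partitioner.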
Step 6: Add a FilterProcessor (Exercise)
Open FilterProcessor.java. This is the skeleton for your exercise.
Your task: Insert a FilterProcessor between Multiplier and Sink that drops all
values not divisible by 4, then verify the new expected sum.
Step 6a: Implement FilterProcessor
- Add a private int threshold field.
- In initialize(), read the threshold from UserPayload:
  byte[] bytes = getContext().getUserPayload().deepCopyAsArray();
  this.threshold = ByteBuffer.wrap(bytes).getInt();
- In run(), replace if (true) with if (value.get() % threshold == 0).
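The payload handling in these steps is ordinary java.nio code, so it can be tried in isolation. The sketch below models only the byte-level round trip and the filter predicate; it does not use UserPayload, and PayloadRoundTrip with its encode, decode, and keep methods are hypothetical helpers.

```java
import java.nio.ByteBuffer;

/** Round trip of an int through a byte array, plus the filter predicate from run(). */
final class PayloadRoundTrip {

    /** Encodes the threshold the way the DAG builder side would. */
    static byte[] encode(int threshold) {
        // Absolute putInt(index, value) does not move the position, so no flip() is needed.
        return ByteBuffer.allocate(Integer.BYTES).putInt(0, threshold).array();
    }

    /** Decodes the threshold the way initialize() would. */
    static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    /** The predicate that replaces if (true) in run(). */
    static boolean keep(int value, int threshold) {
        return value % threshold == 0;
    }

    public static void main(String[] args) {
        int threshold = decode(encode(4));
        System.out.println("threshold = " + threshold);          // prints threshold = 4
        System.out.println("keep(8)   = " + keep(8, threshold)); // prints keep(8)   = true
        System.out.println("keep(6)   = " + keep(6, threshold)); // prints keep(6)   = false
    }
}
```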
Step 6b: Update NumberPipelineDAG.buildDAG()
Vertex filter = Vertex.create("filter",
    ProcessorDescriptor.create(FilterProcessor.class.getName())
        .setUserPayload(UserPayload.create(
            // Absolute putInt(index, value) leaves the position at 0, so no flip() is needed.
            // (On Java 8, flip() returns Buffer, which UserPayload.create() would not accept.)
            ByteBuffer.allocate(4).putInt(0, 4))), // threshold=4
    2); // same parallelism as multiplier
// Register the new vertex on the DAG with .addVertex(filter) before adding its edges.
// New edge chain: generator → multiplier → filter → sink
.addEdge(Edge.create(generator, multiplier, edgeConf.createDefaultEdgeProperty()))
.addEdge(Edge.create(multiplier, filter, edgeConf.createDefaultEdgeProperty()))
.addEdge(Edge.create(filter, sink, edgeConf.createDefaultEdgeProperty()));
Step 6c: Calculate the new expected sum
After multiplying by 2, the values are: 0, 2, 4, 6, 8, …, 198. After filtering (keep only values divisible by 4): 0, 4, 8, 12, …, 196. Sum of {0, 4, 8, …, 196} = 4 * sum(0, 1, 2, …, 49) = 4 * (49*50/2) = 4 * 1225 = 4900.
Update NumberPipelineDAG.expectedSum() to return 4900 and verify PASS.
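The hand calculation can be double-checked with a few lines of plain Java. This models only the arithmetic of the four-stage pipeline, not Tez itself; FilteredSumCheck is a hypothetical name.

```java
import java.util.stream.IntStream;

/** Plain-Java model of Generator -> Multiplier -> Filter -> Sink arithmetic. */
final class FilteredSumCheck {

    /** Sums n * factor over n in [0, count), keeping only multiples of divisor. */
    static long filteredSum(int count, int factor, int divisor) {
        return IntStream.range(0, count)        // Generator
                .map(n -> n * factor)           // Multiplier
                .filter(v -> v % divisor == 0)  // Filter
                .asLongStream()
                .sum();                         // Sink
    }

    public static void main(String[] args) {
        System.out.println(filteredSum(100, 2, 4)); // prints 4900
    }
}
```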
Step 7: Connect This to the Tez Source
Every class you used in this project maps to a real Tez module.
| Class | Module | Source path |
|---|---|---|
| AbstractLogicalIOProcessor | tez-api | tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java |
| OrderedPartitionedKVOutput | tez-runtime-library | tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java |
| OrderedGroupedKVInput | tez-runtime-library | tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java |
| OrderedPartitionedKVEdgeConfig | tez-runtime-library | tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java |
| TezClient | tez-api | tez-api/src/main/java/org/apache/tez/client/TezClient.java |
| TezConfiguration | tez-api | tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java |
After running the pipeline successfully, open each source file above. For each one:
- Find the method you called
- Read its implementation — what does it actually do?
- Find the unit test class for that file (usually in src/test/java/ under the same package)
Step 8: Find Related JIRA Issues
This pipeline uses OrderedPartitionedKVOutput. Search the Tez JIRA for issues in this
component to find real bugs and improvements you could work on:
project = TEZ AND component = "runtime-library" AND status in (Open, "Patch Available")
ORDER BY priority DESC
Also search specifically:
text ~ "OrderedPartitionedKVOutput" AND status in (Open, "Patch Available")
For each open issue you find, ask yourself:
- Do you understand what the bug description is saying?
- Can you locate the relevant code in the source?
- Is there a failing test, or do you need to write one?
Expected Deliverables
- Project compiles without errors
- Running the JAR prints PASS with result 9900
- You can answer all questions in Step 4 (with file:line references to the source)
- You have run all three "Break It" experiments and understand each failure
- FilterProcessor is implemented and the pipeline prints PASS with result 4900
- You have opened all 6 source files from the "Connect to Source" table
- You have found at least 2 open JIRA issues in the runtime-library component