Lab 3.3 — Build It: Multi-Input Union DAG
Lab type: Build It — real Maven project, compilable Java, run + break + fix cycle
Estimated time: 90–120 min
Maven module: book/projects/level-3-multi-input
Main class: org.apache.tez.learning.l3.MultiInputDAG
What You Will Build
EvenNumberSource(1) ──even-edge──┐
                                 ├─▶ MultiInputUnionProcessor(1) ──▶ UnionSinkProcessor(1)
OddNumberSource(1)  ──odd-edge───┘
Two source vertices emit separate streams of integers (even: 0,2,4,…,98 and odd: 1,3,5,…,99). A middle vertex receives both streams through two named input edges, unions them, and forwards everything to a terminal sink. The sink sums all values and publishes the result via a Tez counter.
Expected output: TotalSum=4950 PASS
This is about the smallest Tez program that has a multi-input vertex. The same structural pattern underlies Tez join, union, and co-group operations.
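The expected total can be checked by hand or with a few lines of plain Java before any Tez code runs. The sketch below is not part of the lab project; the class name and the COUNT constant are illustrative, with COUNT set to 50 because each stream described above holds 50 values.

```java
import java.util.stream.IntStream;

class ExpectedSumCheck {
    // Assumed stream length: 0,2,...,98 and 1,3,...,99 each contain 50 values.
    static final int COUNT = 50;

    // Sum of the even stream 0, 2, ..., 98.
    static long evenSum() {
        return IntStream.range(0, COUNT).mapToLong(i -> 2L * i).sum();
    }

    // Sum of the odd stream 1, 3, ..., 99.
    static long oddSum() {
        return IntStream.range(0, COUNT).mapToLong(i -> 2L * i + 1).sum();
    }

    public static void main(String[] args) {
        long total = evenSum() + oddSum();
        // prints: even=2450 odd=2500 total=4950
        System.out.println("even=" + evenSum() + " odd=" + oddSum() + " total=" + total);
    }
}
```

If the sink reports 2450 or 2500 instead of 4950, one of the two inputs was probably never read.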
Step 1 — Set the Tez Version
Open book/projects/pom.xml and confirm <tez.version> matches your local build:
cd ~/tez-src # your Tez clone from Lab 1.1
git log --oneline -1 | head -c 60
mvn help:evaluate -Dexpression=project.version -q -DforceStdout 2>/dev/null
If the version printed differs from what is in the POM, update <tez.version> before
continuing.
Step 2 — Compile and Run the Unit Tests
cd /path/to/apache-tez/book/projects
# Compile and test the new module only
mvn -pl level-3-multi-input test
You should see:
Tests run: 10, Failures: 0, Errors: 0, Skipped: 0
Read every test in TestMultiInputProcessors.java before moving on.
Questions
| # | Question |
|---|---|
| 1 | testEvenAndOddRangesNoOverlapNoGap simulates both sources using a boolean[]. Why is this a more rigorous check than just verifying the counts? |
| 2 | testEdgeNameConstants tests string literals. What real bug would be caught if a developer renamed the constant but not the string in buildDAG()? |
| 3 | testExpectedSum hardcodes 4950L. Could you make this test fail by changing only EvenNumberSource.COUNT? What would change? |
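As a reference point for question 1, here is a minimal sketch of what a boolean[] coverage check can look like. It is written from scratch for illustration and may differ from the actual test in TestMultiInputProcessors.java; the class and method names are hypothetical.

```java
import java.util.stream.IntStream;

class CoverageCheck {
    /** Returns true only if the two streams together hit every value in [0, size) exactly once. */
    static boolean coversExactlyOnce(int[] evens, int[] odds, int size) {
        boolean[] seen = new boolean[size];
        for (int[] stream : new int[][] {evens, odds}) {
            for (int n : stream) {
                if (n < 0 || n >= size || seen[n]) {
                    return false; // out of range, or a value emitted twice
                }
                seen[n] = true;
            }
        }
        for (boolean s : seen) {
            if (!s) {
                return false; // a value nobody emitted
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] evens = IntStream.range(0, 50).map(i -> 2 * i).toArray();
        int[] odds = IntStream.range(0, 50).map(i -> 2 * i + 1).toArray();
        System.out.println(coversExactlyOnce(evens, odds, 100));          // true
        // A broken "odd" source that emits evens: the counts still match (50 + 50).
        System.out.println(coversExactlyOnce(evens, evens.clone(), 100)); // false
    }
}
```

The second call has the same record counts as the first, which is the case a count-only assertion would miss.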
Step 3 — Build the Fat JAR and Run the DAG
mvn -pl level-3-multi-input package -q
java -jar level-3-multi-input/target/level-3-multi-input-1.0-SNAPSHOT-jar-with-dependencies.jar
Expected final line:
[MultiInputDAG] TotalSum=4950 expected=4950 PASS
If you see FAIL, the counter value is wrong — note the actual value before
proceeding to the debugging exercises.
Step 4 — Read Every Source File
Work through each file in src/main/java/org/apache/tez/learning/l3/.
EvenNumberSource.java
| # | Question |
|---|---|
| 1 | What Tez base class does it extend? What does that class provide? |
| 2 | Why does run() call output.start() before getWriter()? What happens if you skip it? (Experiment B in Step 5 tries the equivalent omission on the input side.) |
| 3 | The output is retrieved by getOutputs().values().iterator().next(). What would break if this vertex had two outputs? |
| 4 | Why are key and value declared once outside the loop rather than inside it? What allocation cost would the inner-loop placement cause? |
MultiInputUnionProcessor.java
| # | Question |
|---|---|
| 1 | Inputs are retrieved by string name: inputs.get(EVEN_EDGE). Where are these names assigned? Trace the call to setDestinationEdgeName in MultiInputDAG.java. |
| 2 | Both inputs are started before either reader is obtained. Could you start them one at a time (start even → read even → start odd → read odd)? What would happen? |
| 3 | After draining the even input, the odd input's reader is obtained separately. Is there a scenario where odd records arrive before all even records have been read? How does Tez's input buffering handle this? |
| 4 | The processor forwards records unchanged (key=value=integer). What change to run() would be needed to emit only distinct values if both sources could produce duplicates? |
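For question 4, one possible approach is to remember which values have already been forwarded. The sketch below shows the idea in plain Java, with two Iterable<Integer> arguments standing in for the even and odd readers. It is an illustration under that assumption, not the project's implementation, and it keeps every seen value in memory, which would not scale to large inputs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DistinctUnionSketch {
    /** Forwards each value the first time it appears in either input; later duplicates are dropped. */
    static List<Integer> distinctUnion(Iterable<Integer> even, Iterable<Integer> odd) {
        Set<Integer> seen = new HashSet<>();
        List<Integer> forwarded = new ArrayList<>();
        for (Iterable<Integer> input : List.of(even, odd)) {
            for (Integer n : input) {
                if (seen.add(n)) {      // add() returns false if n was already present
                    forwarded.add(n);
                }
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // prints: [0, 2, 4, 1, 3]
        System.out.println(distinctUnion(List.of(0, 2, 2, 4), List.of(1, 2, 3)));
    }
}
```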
MultiInputDAG.java
| # | Question |
|---|---|
| 1 | Both evenEdge and oddEdge use edgeCfg.createDefaultEdgeProperty(). Could you use different edge configs for the two sources? When would that be necessary? |
| 2 | Edge.setDestinationEdgeName(...) names the edge as seen by the destination vertex. Does the source vertex also see this name? Check by reading the Edge API. |
| 3 | The DAG has 4 vertices. Draw the dependency graph. Which vertices can run in parallel? |
| 4 | waitForCompletion(EnumSet.of(StatusGetOpts.GET_COUNTERS)) — what does GET_COUNTERS do? What would status.getDAGCounters() return if this option were omitted? |
Step 5 — Break It: Three Experiments
Perform each experiment, observe the failure, then revert before the next one.
Experiment A — Swap the edge names
In MultiInputDAG.buildDAG(), swap EVEN_EDGE and ODD_EDGE:
.setDestinationEdgeName(MultiInputUnionProcessor.ODD_EDGE) // was EVEN_EDGE
// ...
.setDestinationEdgeName(MultiInputUnionProcessor.EVEN_EDGE) // was ODD_EDGE
Rebuild and run.
- Does the DAG succeed or fail?
- Is the sum still 4950?
- Why does swapping the names not cause a failure here, but would cause a failure in a join operation where the left and right inputs have different schemas?
Experiment B — Remove one start() call
In MultiInputUnionProcessor.run(), remove evenInput.start().
Rebuild and run.
- What exception is thrown? On which line?
- Search the Tez source for the method that throws this exception. What is the guard condition?
Experiment C — Make one source emit duplicates
In EvenNumberSource.run(), change int n = i * 2 to int n = 0 (every write uses key=0).
Rebuild and run.
- What is the counter value now?
- Is the DAG PASS or FAIL?
- What does this reveal about how OrderedGroupedKVInput handles duplicate keys when the value type is IntWritable?
Step 6 — Implement a FilterUnionProcessor
Create a new file in the same package:
FilterUnionProcessor.java
Specification:
- Extends AbstractLogicalIOProcessor
- Has the same two named inputs as MultiInputUnionProcessor
- Accepts a threshold via UserPayload (key "threshold", default 50)
- Reads from both inputs; only forwards values >= threshold
- Increments counter UnionPipeline/FilteredCount for each record dropped
Wire it into the DAG as a replacement for MultiInputUnionProcessor:
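// Needs java.nio.ByteBuffer and java.nio.charset.StandardCharsets.
Vertex filter = Vertex.create(
    "FilterUnion",
    ProcessorDescriptor.create(FilterUnionProcessor.class.getName())
        .setUserPayload(UserPayload.create(
            // Explicit charset so the payload bytes do not depend on the platform default
            ByteBuffer.wrap("threshold=50".getBytes(StandardCharsets.UTF_8)))),
    1);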
Expected result: With threshold=50, values 0–49 are dropped, values 50–99 are
forwarded. Sum at sink = 50+51+…+99 = 3725. FilteredCount = 50.
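The payload parsing and the expected numbers can be prototyped in plain Java before touching the processor. The sketch below assumes the payload is the UTF-8 text "threshold=<int>" as in the wiring snippet; the class and method names are hypothetical, and in the real processor the ByteBuffer would come from the user payload rather than being built locally.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ThresholdFilterSketch {
    static final int DEFAULT_THRESHOLD = 50;

    /** Parses "threshold=<int>" from the payload; falls back to the default on anything else. */
    static int parseThreshold(ByteBuffer payload) {
        if (payload == null || !payload.hasRemaining()) {
            return DEFAULT_THRESHOLD;
        }
        // duplicate() leaves the caller's buffer position untouched
        String text = StandardCharsets.UTF_8.decode(payload.duplicate()).toString().trim();
        String prefix = "threshold=";
        if (!text.startsWith(prefix)) {
            return DEFAULT_THRESHOLD;
        }
        try {
            return Integer.parseInt(text.substring(prefix.length()).trim());
        } catch (NumberFormatException e) {
            return DEFAULT_THRESHOLD;
        }
    }

    /** Simulates the filter over 0..99 and returns {forwardedSum, filteredCount}. */
    static long[] simulate(int threshold) {
        long forwardedSum = 0;
        long filteredCount = 0;
        for (int n = 0; n < 100; n++) {
            if (n >= threshold) {
                forwardedSum += n;
            } else {
                filteredCount++;
            }
        }
        return new long[] {forwardedSum, filteredCount};
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("threshold=50".getBytes(StandardCharsets.UTF_8));
        int threshold = parseThreshold(payload);
        long[] result = simulate(threshold);
        // prints: threshold=50 sum=3725 filtered=50
        System.out.println("threshold=" + threshold + " sum=" + result[0] + " filtered=" + result[1]);
    }
}
```

Falling back to the default on a missing or malformed payload is a design choice made here for simplicity; failing fast with an exception is an equally reasonable alternative.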
Step 7 — Tez Source Connection Table
For each class below, locate the corresponding source file in your Tez clone and record the path:
| Class used in this project | Tez source file (relative to repo root) |
|---|---|
| OrderedPartitionedKVOutput | |
| OrderedGroupedKVInput | |
| OrderedPartitionedKVEdgeConfig | |
| HashPartitioner | |
| Edge.setDestinationEdgeName | |
Step 8 — Connect to Real Tez Data Flows
Open tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java or
OrderedWordCount.java in the Tez source tree.
- Find a DAG in the examples that has more than 2 vertices.
- Draw its topology as an ASCII diagram.
- Identify which vertex is the "union-like" vertex (if any) that receives edges from multiple sources.
- Compare its processor class to MultiInputUnionProcessor: what is similar, what is different?
Step 9 — JIRA Research
Search issues.apache.org/jira for:
project = TEZ AND text ~ "multi-input" AND resolution = Fixed
Find one resolved issue involving multiple inputs to a single vertex.
- What was the bug?
- Which class was modified?
- Was a test added? If so, what does it test?