Lab 3.3 — Build It: Multi-Input Union DAG

Lab type: Build It — real Maven project, compilable Java, run + break + fix cycle
Estimated time: 90–120 min
Maven module: book/projects/level-3-multi-input
Main class: org.apache.tez.learning.l3.MultiInputDAG


What You Will Build

EvenNumberSource(1) ──even-edge──┐
                                 ├─▶ MultiInputUnionProcessor(1) ──▶ UnionSinkProcessor(1)
OddNumberSource(1)  ──odd-edge───┘

Two source vertices emit separate streams of integers (even: 0,2,4,…,98 and odd: 1,3,5,…,99). A middle vertex receives both streams through two named input edges, unions them, and forwards everything to a terminal sink. The sink sums all values and publishes the result via a Tez counter.

Expected output: TotalSum=4950 PASS

This is a minimal Tez program with a multi-input vertex. The same structural pattern, one vertex fed by several named input edges, underlies Tez joins, unions, and co-group operations.
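The 4950 target is simply the sum of both streams. Here is a quick stdlib check of that arithmetic, assuming 50 values per source as in the ranges listed above (the project's own COUNT constants are not consulted here):

```java
// Recomputes the lab's expected total by summing each stream separately.
// Assumes 50 evens (0..98) and 50 odds (1..99), per the ranges in the lab text.
final class ExpectedSum {
    static long evenSum(int count) {
        long sum = 0;
        for (int i = 0; i < count; i++) {
            sum += 2L * i;            // 0, 2, 4, ..., 2*(count-1)
        }
        return sum;
    }

    static long oddSum(int count) {
        long sum = 0;
        for (int i = 0; i < count; i++) {
            sum += 2L * i + 1;        // 1, 3, 5, ..., 2*count-1
        }
        return sum;
    }

    public static void main(String[] args) {
        long even = evenSum(50);              // 2 * (0 + 1 + ... + 49) = 2450
        long odd = oddSum(50);                // 50 * 50 = 2500
        long closedForm = 99L * 100 / 2;      // 0 + 1 + ... + 99
        System.out.println(even + " + " + odd + " = " + (even + odd)
            + " (closed form " + closedForm + ")");
    }
}
```

Because the total depends on both counts, changing only one source's count changes the expected value.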


Step 1 — Set the Tez Version

Open book/projects/pom.xml and confirm <tez.version> matches your local build:

cd ~/tez-src         # your Tez clone from Lab 1.1
git log --oneline -1 | head -c 60    # the commit you built from
mvn help:evaluate -Dexpression=project.version -q -DforceStdout 2>/dev/null    # the Tez version of this checkout

If the version printed differs from what is in the POM, update <tez.version> before continuing.


Step 2 — Compile and Run the Unit Tests

cd /path/to/apache-tez/book/projects

# Compile and test the new module only
mvn -pl level-3-multi-input test

You should see:

Tests run: 10, Failures: 0, Errors: 0, Skipped: 0

Read every test in TestMultiInputProcessors.java before moving on.

Questions

  1. testEvenAndOddRangesNoOverlapNoGap simulates both sources using a boolean[]. Why is this a more rigorous check than just verifying the counts?
  2. testEdgeNameConstants tests string literals. What real bug would be caught if a developer renamed the constant but not the string in buildDAG()?
  3. testExpectedSum hardcodes 4950L. Could you make this test fail by changing only EvenNumberSource.COUNT? What would change?
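To see what question 1 is getting at, here is a hypothetical stdlib version of such a check. It is not copied from TestMultiInputProcessors.java, and CoverageCheck, check, and stream are made-up names. A count-only check accepts two streams of 50 values each even when they overlap; marking every value in a boolean[] catches overlaps, gaps, and out-of-range values:

```java
// Hypothetical coverage check: every value in 0..range-1 must be produced
// by exactly one of the two simulated sources.
final class CoverageCheck {
    /** Returns null on an exact cover, otherwise a description of the first problem found. */
    static String check(int[] evens, int[] odds, int range) {
        boolean[] seen = new boolean[range];
        for (int[] source : new int[][] {evens, odds}) {
            for (int v : source) {
                if (v < 0 || v >= range) return "out of range: " + v;
                if (seen[v]) return "overlap at " + v;   // produced twice
                seen[v] = true;
            }
        }
        for (int v = 0; v < range; v++) {
            if (!seen[v]) return "gap at " + v;          // never produced
        }
        return null;
    }

    /** Simulates a source emitting start, start+2, start+4, ... (count values). */
    static int[] stream(int start, int count) {
        int[] out = new int[count];
        for (int i = 0; i < count; i++) out[i] = start + 2 * i;
        return out;
    }

    public static void main(String[] args) {
        System.out.println(check(stream(0, 50), stream(1, 50), 100));  // null: exact cover
        // Same counts (50 + 50), but the second source repeats the evens.
        System.out.println(check(stream(0, 50), stream(0, 50), 100));  // overlap at 0
        // One value short on the odd side.
        System.out.println(check(stream(0, 50), stream(1, 49), 100));  // gap at 99
    }
}
```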

Step 3 — Build the Fat JAR and Run the DAG

mvn -pl level-3-multi-input package -q

java -jar level-3-multi-input/target/level-3-multi-input-1.0-SNAPSHOT-jar-with-dependencies.jar

Expected final line:

[MultiInputDAG] TotalSum=4950  expected=4950  PASS

If you see FAIL, the counter value is wrong. Note the actual value before moving on to the Break It experiments in Step 5.


Step 4 — Read Every Source File

Work through each file in src/main/java/org/apache/tez/learning/l3/.

EvenNumberSource.java

  1. What Tez base class does it extend? What does that class provide?
  2. Why does run() call output.start() before getWriter()? What happens if you skip it? (Experiment B in Step 5 removes the analogous start() call on an input.)
  3. The output is retrieved by getOutputs().values().iterator().next(). What would break if this vertex had two outputs?
  4. Why are the key and value objects created once, outside the loop, rather than on every iteration? What allocation cost would creating them inside the loop add?
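Questions 2 and 4 rest on two ideas that can be modeled with plain Java types. In this toy, ToyOutput, MutableInt, and the IllegalStateException guard are all hypothetical, not Tez classes; Experiment B in Step 5 shows what real Tez does when a start() call is missing. The model assumes, as with serializing outputs, that write() copies the value immediately, which is what makes reusing one mutable object across iterations safe:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT Tez code):
//  1. an output that refuses to hand out a writer until start() is called;
//  2. a writer that copies each value on write, so one reused mutable
//     holder (standing in for a reused IntWritable) is safe.
final class ToyOutput {
    private boolean started = false;
    private final List<Integer> written = new ArrayList<>();

    /** Mutable holder standing in for a reusable Writable. */
    static final class MutableInt {
        int value;
        void set(int v) { value = v; }
    }

    interface Writer { void write(MutableInt value); }

    void start() { started = true; }

    Writer getWriter() {
        if (!started) {
            throw new IllegalStateException("getWriter() called before start()");
        }
        // Copy the primitive out of the holder; later set() calls cannot change it.
        return value -> written.add(value.value);
    }

    List<Integer> written() { return written; }

    public static void main(String[] args) {
        ToyOutput out = new ToyOutput();
        try {
            out.getWriter();
        } catch (IllegalStateException e) {
            System.out.println("Expected failure: " + e.getMessage());
        }
        out.start();
        Writer writer = out.getWriter();
        MutableInt holder = new MutableInt();   // allocated once, outside the loop
        for (int i = 0; i < 5; i++) {
            holder.set(i * 2);
            writer.write(holder);
        }
        System.out.println(out.written());      // [0, 2, 4, 6, 8]
    }
}
```

If the toy writer stored the holder reference instead of copying its value, every entry would end up as the last value written, which is the bug that object reuse invites when an output does not copy.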

MultiInputUnionProcessor.java

  1. Inputs are retrieved by string name: inputs.get(EVEN_EDGE). Where are these names assigned? Trace the call to setDestinationEdgeName in MultiInputDAG.java.
  2. Both inputs are started before either reader is obtained. Could you start them one at a time (start even → read even → start odd → read odd)? What would happen?
  3. After the even input is drained, the odd input's reader is obtained separately. Is there a scenario where odd records arrive before all even records have been read? If so, where does Tez hold those records until the processor reads them?
  4. The processor forwards records unchanged (key=value=integer). What change to run() would be needed to emit only distinct values if both sources could produce duplicates?
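Questions 1 and 4 can be explored with plain collections before touching Tez. In this sketch a Map stands in for the processor's name-to-input map; the edge-name strings are assumptions taken from the diagram labels, not the project's real constant values, and NamedUnion is a hypothetical class:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-collections model of a two-input union keyed by edge name.
final class NamedUnion {
    static final String EVEN_EDGE = "even-edge";   // assumed value (diagram label)
    static final String ODD_EDGE = "odd-edge";     // assumed value (diagram label)

    /** Forwards every record from both inputs, even input first. */
    static List<Integer> union(Map<String, List<Integer>> inputs) {
        List<Integer> out = new ArrayList<>();
        out.addAll(require(inputs, EVEN_EDGE));
        out.addAll(require(inputs, ODD_EDGE));
        return out;
    }

    /** One possible answer to question 4: drop values that were already emitted. */
    static List<Integer> distinctUnion(Map<String, List<Integer>> inputs) {
        Set<Integer> seen = new LinkedHashSet<>(union(inputs));   // keeps first-seen order
        return new ArrayList<>(seen);
    }

    private static List<Integer> require(Map<String, List<Integer>> inputs, String name) {
        List<Integer> in = inputs.get(name);
        if (in == null) {
            // A mismatched edge name shows up as a missing map entry.
            throw new IllegalArgumentException("no input named " + name + "; have " + inputs.keySet());
        }
        return in;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> inputs = new LinkedHashMap<>();
        inputs.put(EVEN_EDGE, List.of(0, 2, 4));
        inputs.put(ODD_EDGE, List.of(1, 3, 4));     // 4 duplicated across inputs
        System.out.println(union(inputs));          // [0, 2, 4, 1, 3, 4]
        System.out.println(distinctUnion(inputs));  // [0, 2, 4, 1, 3]
    }
}
```

The set holds every distinct value in memory, which is fine for 100 integers; for large inputs a streaming approach that uses the inputs' sort order would scale better.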

MultiInputDAG.java

  1. Both evenEdge and oddEdge use edgeCfg.createDefaultEdgeProperty(). Could you use different edge configs for the two sources? When would that be necessary?
  2. Edge.setDestinationEdgeName(...) names the edge as seen by the destination vertex. Does the source vertex also see this name? Check by reading the Edge API.
  3. The DAG has 4 vertices. Draw the dependency graph. Which vertices can run in parallel?
  4. waitForCompletion(EnumSet.of(StatusGetOpts.GET_COUNTERS)): what does GET_COUNTERS do? What would status.getDAGCounters() return if this option were omitted?
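One way to check your answer to question 3 mechanically is Kahn's topological sort, grouped into waves. Vertices in the same wave have no path between them, so the topology permits them to run concurrently; whether they actually do also depends on cluster resources and Tez's scheduling. The vertex names below follow the diagram at the top of the lab and are assumptions about what MultiInputDAG.java calls them; DagLevels is a hypothetical helper:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Groups DAG vertices into waves (Kahn's algorithm) and lists multi-input vertices.
final class DagLevels {
    /** edges maps each vertex to its downstream vertices. */
    static List<List<String>> levels(Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String dst : e.getValue()) {
                inDegree.merge(dst, 1, Integer::sum);
            }
        }
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());   // sources start in wave 0
        }
        List<List<String>> waves = new ArrayList<>();
        while (!ready.isEmpty()) {
            waves.add(ready);
            List<String> next = new ArrayList<>();
            for (String v : ready) {
                for (String dst : edges.getOrDefault(v, List.of())) {
                    // A vertex becomes ready once all of its upstream vertices are done.
                    if (inDegree.merge(dst, -1, Integer::sum) == 0) next.add(dst);
                }
            }
            ready = next;
        }
        return waves;
    }

    /** Vertices with two or more incoming edges, i.e. union/join-like vertices. */
    static List<String> multiInput(Map<String, List<String>> edges) {
        Map<String, Integer> count = new LinkedHashMap<>();
        for (List<String> dsts : edges.values()) {
            for (String d : dsts) count.merge(d, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        count.forEach((v, c) -> { if (c >= 2) out.add(v); });
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("EvenNumberSource", List.of("MultiInputUnionProcessor"));
        edges.put("OddNumberSource", List.of("MultiInputUnionProcessor"));
        edges.put("MultiInputUnionProcessor", List.of("UnionSinkProcessor"));
        System.out.println(levels(edges));
        System.out.println(multiInput(edges));   // [MultiInputUnionProcessor]
    }
}
```

The same helpers work on any DAG you sketch from the Tez examples in Step 8.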

Step 5 — Break It: Three Experiments

Perform each experiment, observe the failure, then revert before the next one.

Experiment A — Swap the edge names

In MultiInputDAG.buildDAG(), swap EVEN_EDGE and ODD_EDGE:

.setDestinationEdgeName(MultiInputUnionProcessor.ODD_EDGE)   // was EVEN_EDGE
// ...
.setDestinationEdgeName(MultiInputUnionProcessor.EVEN_EDGE)  // was ODD_EDGE

Rebuild and run.

  • Does the DAG succeed or fail?
  • Is the sum still 4950?
  • Why does swapping the names not cause a failure here, but would cause a failure in a join operation where the left and right inputs have different schemas?
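A plain-Java illustration of the third question. The union adds up both inputs, so which name each stream arrives under cannot change the result. An asymmetric operation behaves differently: here a simple "left minus right" filter, chosen only for illustration (SwapSymmetry and its methods are hypothetical), returns a different answer once the names are swapped. With inputs of different record types, a swap could instead surface as a failure when records are read with the wrong type:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Compares a symmetric operation (union sum) with an asymmetric one
// (left-minus-right) under swapped input names.
final class SwapSymmetry {
    static long unionSum(Map<String, List<Integer>> in, String first, String second) {
        long sum = 0;
        for (int v : in.get(first)) sum += v;
        for (int v : in.get(second)) sum += v;
        return sum;   // order of the two inputs is irrelevant
    }

    static List<Integer> leftMinusRight(Map<String, List<Integer>> in, String left, String right) {
        // Keeps left values that do not appear on the right: depends on which is which.
        return in.get(left).stream()
            .filter(v -> !in.get(right).contains(v))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> wired = Map.of("even-edge", List.of(0, 2, 4), "odd-edge", List.of(1, 3));
        Map<String, List<Integer>> swapped = Map.of("even-edge", List.of(1, 3), "odd-edge", List.of(0, 2, 4));
        System.out.println(unionSum(wired, "even-edge", "odd-edge"));         // 10
        System.out.println(unionSum(swapped, "even-edge", "odd-edge"));       // 10
        System.out.println(leftMinusRight(wired, "even-edge", "odd-edge"));   // [0, 2, 4]
        System.out.println(leftMinusRight(swapped, "even-edge", "odd-edge")); // [1, 3]
    }
}
```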

Experiment B — Remove one start() call

In MultiInputUnionProcessor.run(), remove evenInput.start().

Rebuild and run.

  • What exception is thrown? On which line?
  • Search the Tez source for the method that throws this exception. What is the guard condition?

Experiment C — Make one source emit duplicates

In EvenNumberSource.run(), change int n = i * 2 to int n = 0, so every record from the even source carries 0 (key=0).

Rebuild and run.

  • What is the counter value now?
  • Is the DAG PASS or FAIL?
  • What does this reveal about how OrderedGroupedKVInput handles duplicate keys when the value type is IntWritable?
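Here is a toy model to compare against what you observe. It assumes grouped input delivers records sorted by key, with equal keys handed over together; it is a model, not OrderedGroupedKVInput's implementation, and GroupedModel is a hypothetical class. In this model, grouping changes how duplicates arrive, not how many there are:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of key-grouped input: sort by key, collect equal keys into one group.
final class GroupedModel {
    /** Each record is {key, value}; returns key -> all values for that key, in key order. */
    static TreeMap<Integer, List<Integer>> group(List<int[]> records) {
        TreeMap<Integer, List<Integer>> groups = new TreeMap<>();
        for (int[] kv : records) {
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<int[]> records = new ArrayList<>();
        for (int i = 0; i < 50; i++) records.add(new int[] {0, 0});                  // modified even source
        for (int i = 0; i < 50; i++) records.add(new int[] {2 * i + 1, 2 * i + 1});  // odd source unchanged
        TreeMap<Integer, List<Integer>> groups = group(records);
        int values = 0;
        for (Map.Entry<Integer, List<Integer>> g : groups.entrySet()) {
            values += g.getValue().size();
        }
        // 51 groups (key 0 plus 50 odd keys), but all 100 values are still present.
        System.out.println("groups=" + groups.size() + " values=" + values);
    }
}
```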

Step 6 — Implement a FilterUnionProcessor

Create a new file in the same package: FilterUnionProcessor.java

Specification:

  • Extends AbstractLogicalIOProcessor
  • Has the same two named inputs as MultiInputUnionProcessor
  • Accepts a threshold via UserPayload (key "threshold", default 50)
  • Reads from both inputs; only forwards values >= threshold
  • Increments counter UnionPipeline/FilteredCount for each record dropped

Wire it into the DAG in place of MultiInputUnionProcessor: create the vertex below, then point both source edges at it and connect it to the sink.

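import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;

Vertex filter = Vertex.create(
    "FilterUnion",
    ProcessorDescriptor.create(FilterUnionProcessor.class.getName())
        .setUserPayload(UserPayload.create(   // explicit charset, not the platform default
            ByteBuffer.wrap("threshold=50".getBytes(StandardCharsets.UTF_8)))),
    1);   // parallelism: one task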

Expected result: With threshold=50, values 0–49 are dropped, values 50–99 are forwarded. Sum at sink = 50+51+…+99 = 3725. FilteredCount = 50.
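The parts of FilterUnionProcessor that do not depend on Tez can be written and tested first. This sketch assumes a key=value payload like the one in the snippet above (also tolerating several comma-separated pairs, which is an assumption of this sketch); parseThreshold, filter, and FilterResult are hypothetical names. In the real processor the ByteBuffer would come from the processor context's user payload (for example getContext().getUserPayload().getPayload(); confirm against your Tez version), and each drop would increment UnionPipeline/FilteredCount:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Tez-independent core of a threshold filter: payload parsing plus filtering.
final class FilterLogic {
    static final int DEFAULT_THRESHOLD = 50;   // default from the spec

    /** Reads "threshold=<n>" from the payload; falls back to the default. */
    static int parseThreshold(ByteBuffer payload) {
        if (payload == null) {
            return DEFAULT_THRESHOLD;
        }
        // duplicate() so decoding does not move the caller's buffer position
        String text = StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
        for (String pair : text.split(",")) {
            String[] kv = pair.trim().split("=", 2);
            if (kv.length == 2 && kv[0].trim().equals("threshold")) {
                return Integer.parseInt(kv[1].trim());
            }
        }
        return DEFAULT_THRESHOLD;
    }

    static final class FilterResult {
        final long forwardedSum;   // what the sink would add up
        final long droppedCount;   // what FilteredCount would report
        FilterResult(long forwardedSum, long droppedCount) {
            this.forwardedSum = forwardedSum;
            this.droppedCount = droppedCount;
        }
    }

    /** Applies the ">= threshold" rule to every value of every input. */
    static FilterResult filter(int[][] inputs, int threshold) {
        long sum = 0;
        long dropped = 0;
        for (int[] input : inputs) {
            for (int v : input) {
                if (v >= threshold) {
                    sum += v;      // forwarded
                } else {
                    dropped++;     // the real processor would increment the counter here
                }
            }
        }
        return new FilterResult(sum, dropped);
    }

    public static void main(String[] args) {
        int[] evens = new int[50];
        int[] odds = new int[50];
        for (int i = 0; i < 50; i++) {
            evens[i] = 2 * i;
            odds[i] = 2 * i + 1;
        }
        ByteBuffer payload = ByteBuffer.wrap("threshold=50".getBytes(StandardCharsets.UTF_8));
        FilterResult r = filter(new int[][] {evens, odds}, parseThreshold(payload));
        System.out.println("sum=" + r.forwardedSum + " dropped=" + r.droppedCount);  // sum=3725 dropped=50
    }
}
```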


Step 7 — Tez Source Connection Table

For each class below, locate the corresponding source file in your Tez clone and record the path:

Class used in this project       | Tez source file (relative to repo root)
---------------------------------|----------------------------------------
OrderedPartitionedKVOutput       |
OrderedGroupedKVInput            |
OrderedPartitionedKVEdgeConfig   |
HashPartitioner                  |
Edge.setDestinationEdgeName      |
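While you are in the partitioner source, compare it with this sketch of the usual hash-partitioning formula (clear the sign bit, then take the remainder), as found in Hadoop-style HashPartitioner classes. It is not copied from Tez, so check the real class before relying on it. Integer.hashCode(v) returns v, as does IntWritable.hashCode(). With parallelism 1 on every vertex, as in this lab, every key lands in partition 0:

```java
import java.util.Arrays;

// Sketch of hash partitioning; compare with the Tez HashPartitioner you located.
final class HashPartitionSketch {
    /** Maps a key to a partition in [0, numPartitions). */
    static int partition(Object key, int numPartitions) {
        // & Integer.MAX_VALUE clears the sign bit so negative hash codes
        // still give a non-negative remainder.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int[] counts = new int[3];
        for (int v = 0; v < 100; v++) {
            counts[partition(v, 3)]++;              // Integer.hashCode(v) == v
        }
        System.out.println(Arrays.toString(counts)); // [34, 33, 33]
        System.out.println(partition(42, 1));        // 0: a single partition takes everything
    }
}
```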

Step 8 — Connect to Real Tez Data Flows

Open JoinDataGen.java or OrderedWordCount.java under tez-examples/src/main/java/org/apache/tez/examples/ in the Tez source tree.

  1. Find a DAG in the examples that has more than 2 vertices.
  2. Draw its topology as an ASCII diagram.
  3. Identify which vertex is the "union-like" vertex (if any) that receives edges from multiple sources.
  4. Compare its processor class to MultiInputUnionProcessor: what is similar, what is different?

Step 9 — JIRA Research

Search issues.apache.org/jira for:

project = TEZ AND text ~ "multi-input" AND resolution = Fixed

Find one resolved issue involving multiple inputs to a single vertex.

  • What was the bug?
  • Which class was modified?
  • Was a test added? If so, what does it test?