Lab 7.2 — Modify a Processor: Add Deduplication to UnionSinkProcessor
Lab type: Fix-It / Extend
Estimated time: 90 min
Maven module: book/projects/level-3-multi-input
Overview
UnionSinkProcessor from Level 3 sums all values it receives. In this lab
you will extend it to deduplicate records by key before summing — only
the first record for each key is counted.
This exercise teaches:
- How to modify a Processor that uses OrderedGroupedKVInput
- How counters interact with deduplication logic
- How to write a unit test for processor logic using mocks
Step 1 — Understand the Current Behavior
The current UnionSinkProcessor (Level 3) receives (Integer key, Integer value)
pairs and sums all values. For the test input (the integers 0..99), the expected sum is 4950.
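The 4950 figure is the arithmetic series 0 + 1 + ... + 99 = 99 * 100 / 2. The sketch below recomputes it and splits it by source. It rests on an assumption you should confirm in Step 1: that EvenNumberSource emits the even integers 0..98 and OddNumberSource the odd integers 1..99, each with key equal to value (as the key=5, value=5 example implies). ExpectedSum is a throwaway helper, not part of the project.

```java
import java.util.stream.IntStream;

// Throwaway helper (not part of the project) that recomputes the expected total.
class ExpectedSum {
    // Closed form for 0 + 1 + ... + (n - 1).
    static long seriesSum(int n) {
        return (long) n * (n - 1) / 2;
    }

    // Assumption: the even source emits 0, 2, ..., 98.
    static long evenSum() {
        return IntStream.rangeClosed(0, 99).filter(i -> i % 2 == 0).asLongStream().sum();
    }

    // Assumption: the odd source emits 1, 3, ..., 99.
    static long oddSum() {
        return IntStream.rangeClosed(0, 99).filter(i -> i % 2 != 0).asLongStream().sum();
    }

    public static void main(String[] args) {
        System.out.println("even  = " + evenSum());              // 2450
        System.out.println("odd   = " + oddSum());               // 2500
        System.out.println("total = " + (evenSum() + oddSum())); // 4950
        System.out.println("check = " + seriesSum(100));         // 4950
    }
}
```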
Open UnionSinkProcessor.java and answer:
- How does it iterate over input records?
- Where does it write the counter?
- What happens if the same key appears twice (e.g. key=5, value=5 arriving
from both the even source and the odd source)? Can that actually happen? Check
EvenNumberSource and OddNumberSource.
Step 2 — Add a Deduplicating Variant
Create DeduplicatingUnionSinkProcessor.java in the same package. It should:
- Maintain a Set<Integer> of seen keys
- For each (key, value) pair from the input: if the key is new, add it to the set and add the value to the sum; otherwise skip the record
- Publish the same UnionPipeline/TotalSum counter
- Also publish a new counter, UnionPipeline/DuplicatesSkipped, counting the records that were skipped
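The bookkeeping in Step 2 can be sketched independently of Tez. DedupSummer below is a hypothetical helper, not part of the project: it consumes (key, value) pairs in arrival order, keeps the first value per key, and tracks the two totals that the processor would publish as UnionPipeline/TotalSum and UnionPipeline/DuplicatesSkipped. In the real processor the pairs come from the input's reader and the totals go to counters (in Tez, typically via the processor context, e.g. getContext().getCounters().findCounter(group, name)); mirror however UnionSinkProcessor does it today.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: first-record-wins deduplication by key, plus the two totals.
class DedupSummer {
    private final Set<Integer> seenKeys = new HashSet<>();
    private long totalSum = 0;          // would back UnionPipeline/TotalSum
    private long duplicatesSkipped = 0; // would back UnionPipeline/DuplicatesSkipped

    // Feeds one (key, value) pair; returns true if the pair was counted.
    boolean accept(int key, int value) {
        if (seenKeys.add(key)) {        // Set.add returns false if the key was already present
            totalSum += value;
            return true;
        }
        duplicatesSkipped++;
        return false;
    }

    long totalSum() { return totalSum; }
    long duplicatesSkipped() { return duplicatesSkipped; }

    public static void main(String[] args) {
        int[][] input = { {1, 10}, {1, 10}, {2, 20} };
        DedupSummer summer = new DedupSummer();
        for (int[] kv : input) {
            summer.accept(kv[0], kv[1]);
        }
        // prints TotalSum=30 DuplicatesSkipped=1
        System.out.println("TotalSum=" + summer.totalSum()
                + " DuplicatesSkipped=" + summer.duplicatesSkipped());
    }
}
```

Which value counts as "first" is simply whichever arrives first; if duplicate records could carry different values, decide whether that is acceptable for your pipeline.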
Step 3 — Write a Unit Test
Create TestDeduplicatingUnionSinkProcessor.java. Use the Mockito pattern
from TestMultiInputProcessors:
```java
@Test
public void testDuplicateKeyIsSkippedOnce() {
    // Create a mock input that returns (key=1, value=10) twice
    // and (key=2, value=20) once
    // Expected TotalSum: 10 + 20 = 30
    // Expected DuplicatesSkipped: 1
}

@Test
public void testAllUniqueKeys() {
    // No duplicates: result must equal non-deduplicating sum
}
```
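To see what the mock has to emulate before wiring up Mockito, the sketch below uses a hand-written fake instead. PairReader is a hypothetical cursor-style interface (next() advances, then getCurrentKey()/getCurrentValue() read the current record), loosely modeled on the reader style Tez inputs expose; check which reader type UnionSinkProcessor actually uses and stub that one in your real test. The expected values are the ones from the test skeleton.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical cursor-style reader, loosely modeled on a next()/getCurrentKey() reader.
interface PairReader {
    boolean next();
    int getCurrentKey();
    int getCurrentValue();
}

// Hand-written fake standing in for a Mockito mock: replays fixed (key, value) pairs.
class FakePairReader implements PairReader {
    private final int[][] pairs;
    private int pos = -1;

    FakePairReader(int[][] pairs) { this.pairs = pairs; }

    public boolean next() { return ++pos < pairs.length; }
    public int getCurrentKey() { return pairs[pos][0]; }
    public int getCurrentValue() { return pairs[pos][1]; }
}

class DedupReaderTest {
    // Drains the reader with first-record-wins dedup; returns {totalSum, duplicatesSkipped}.
    static long[] drain(PairReader reader) {
        Set<Integer> seen = new HashSet<>();
        long sum = 0, skipped = 0;
        while (reader.next()) {
            if (seen.add(reader.getCurrentKey())) {
                sum += reader.getCurrentValue();
            } else {
                skipped++;
            }
        }
        return new long[] {sum, skipped};
    }

    public static void main(String[] args) {
        // testDuplicateKeyIsSkippedOnce: (1,10) twice, (2,20) once
        long[] dup = drain(new FakePairReader(new int[][] {{1, 10}, {1, 10}, {2, 20}}));
        check(dup[0] == 30 && dup[1] == 1, "duplicate case");

        // testAllUniqueKeys: with no duplicates the result equals a plain sum
        int[][] unique = {{1, 10}, {2, 20}, {3, 30}};
        long plain = 0;
        for (int[] kv : unique) plain += kv[1];
        long[] uniq = drain(new FakePairReader(unique));
        check(uniq[0] == plain && uniq[1] == 0, "all-unique case");

        System.out.println("both checks passed");
    }

    private static void check(boolean ok, String name) {
        if (!ok) throw new AssertionError(name + " failed");
    }
}
```

With Mockito, the equivalent is stubbing next() with consecutive return values, e.g. thenReturn(true, true, true, false). Be careful with consecutive stubs on the getters: they advance per call, not per record, so they only line up if the processor calls each getter exactly once per record (drain above, for instance, skips getCurrentValue() for duplicates).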
Step 4 — Run the Tests
```bash
cd book/projects
mvn -pl level-3-multi-input test -q 2>&1 | tail -10
```
Step 5 — Questions
| # | Question |
|---|---|
| 1 | If your deduplication Set grows very large (millions of keys), what would happen to the task JVM heap? |
| 2 | The input is already sorted by key (because OrderedGroupedKVInput sorts). Could you use this property to deduplicate without a Set? Rewrite DeduplicatingUnionSinkProcessor to use O(1) memory. |
| 3 | Your new counter UnionPipeline/DuplicatesSkipped — where in the Tez framework does it get propagated to the AM and eventually to DAGStatus.getDAGCounters()? |
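As a starting point for question 2, here is one way to exploit sorted input, assuming the records arrive as a single flat stream of (key, value) pairs in ascending key order: a duplicate can then only sit directly after an earlier record with the same key, so remembering the previous key is enough. SortedDedupSummer is a hypothetical name. If your processor reads through OrderedGroupedKVInput's grouped reader, where each key is delivered once together with all of its values, the same idea reduces to counting the first value of each group and skipping the rest.

```java
// Hypothetical O(1)-memory variant: relies on keys arriving in sorted order,
// so every duplicate immediately follows a record with the same key.
class SortedDedupSummer {
    private boolean hasPrevious = false;
    private int previousKey;
    private long totalSum = 0;
    private long duplicatesSkipped = 0;

    void accept(int key, int value) {
        if (hasPrevious && key == previousKey) {
            duplicatesSkipped++;      // same key as the record before: skip it
            return;
        }
        hasPrevious = true;           // new key: remember it and count its value
        previousKey = key;
        totalSum += value;
    }

    long totalSum() { return totalSum; }
    long duplicatesSkipped() { return duplicatesSkipped; }

    public static void main(String[] args) {
        SortedDedupSummer s = new SortedDedupSummer();
        int[][] sortedInput = {{1, 10}, {1, 10}, {2, 20}, {3, 30}, {3, 30}};
        for (int[] kv : sortedInput) s.accept(kv[0], kv[1]);
        System.out.println(s.totalSum() + " " + s.duplicatesSkipped()); // 60 2
    }
}
```

The guarantee holds only within one sorted stream: if the processor consumes several separately sorted inputs, a key from one input is not adjacent to its copy in another.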