Lab 7.2 — Modify a Processor: Add Deduplication to UnionSinkProcessor

Lab type: Fix-It / Extend
Estimated time: 90 min
Maven module: book/projects/level-3-multi-input


Overview

UnionSinkProcessor from Level 3 sums all values it receives. In this lab you will extend it to deduplicate records by key before summing — only the first record for each key is counted.

This exercise teaches:

  • How to modify a Processor that uses OrderedGroupedKVInput
  • How counters interact with deduplication logic
  • How to write a unit test for processor logic using mocks

Step 1 — Understand the Current Behavior

The current UnionSinkProcessor (Level 3) receives (Integer key, Integer value) pairs and sums all values. For the test input (the integers 0 through 99), the expected sum is 4950.

Open UnionSinkProcessor.java and answer:

  1. How does it iterate over input records?
  2. Where does it write the counter?
  3. What happens if the same key appears twice? For example, can (key=5, value=5) arrive from both the even source and the odd source? Check EvenNumberSource and OddNumberSource to find out.

Step 2 — Add a Deduplicating Variant

Create DeduplicatingUnionSinkProcessor.java in the same package. It should:

  1. Maintain a Set<Integer> of seen keys
  2. For each (key, value) pair from the input: if the key is new, add it to the set and add the value to the sum; otherwise skip the record
  3. Publish the same UnionPipeline/TotalSum counter
  4. Also publish a new counter UnionPipeline/DuplicatesSkipped that counts the skipped records
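
The core of the four requirements above can be sketched in plain Java before wiring it into a Tez processor. This is a minimal sketch under stated assumptions, not the book's reference solution: the class DedupSketch, its Result holder, and the entry helper are hypothetical names, and the records arrive as a flat list rather than through the processor's real input reader. Publishing the two values as counters is left out.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-alone sketch of the deduplication logic only.
// It does not depend on Tez; the real processor would read its
// records from the input reader and publish the results as counters.
public class DedupSketch {

    /** Holds the two values the lab asks you to publish as counters. */
    public static final class Result {
        public final long totalSum;
        public final long duplicatesSkipped;

        Result(long totalSum, long duplicatesSkipped) {
            this.totalSum = totalSum;
            this.duplicatesSkipped = duplicatesSkipped;
        }
    }

    /** Convenience helper (hypothetical) for building a (key, value) pair. */
    public static Map.Entry<Integer, Integer> entry(int key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /** Sums only the first value seen for each key and counts the rest. */
    public static Result sumFirstPerKey(List<Map.Entry<Integer, Integer>> records) {
        Set<Integer> seen = new HashSet<>();
        long sum = 0;
        long skipped = 0;
        for (Map.Entry<Integer, Integer> record : records) {
            // Set.add returns true only if the key was not already present.
            if (seen.add(record.getKey())) {
                sum += record.getValue();
            } else {
                skipped++;
            }
        }
        return new Result(sum, skipped);
    }

    public static void main(String[] args) {
        Result r = sumFirstPerKey(Arrays.asList(entry(1, 10), entry(1, 10), entry(2, 20)));
        // prints "TotalSum=30 DuplicatesSkipped=1"
        System.out.println("TotalSum=" + r.totalSum + " DuplicatesSkipped=" + r.duplicatesSkipped);
    }
}
```

Keeping the logic in a method that takes plain collections is one possible design choice: it lets you check the arithmetic without a mocked Tez context, and the processor then only has to feed records in and publish the two results.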

Step 3 — Write a Unit Test

Create TestDeduplicatingUnionSinkProcessor.java. Use the Mockito pattern from TestMultiInputProcessors:

@Test
public void testDuplicateKeyIsSkippedOnce() {
    // Create a mock input that returns (key=1, value=10) twice
    // and (key=2, value=20) once
    // Expected TotalSum: 10 + 20 = 30
    // Expected DuplicatesSkipped: 1
}

@Test
public void testAllUniqueKeys() {
    // No duplicates: result must equal non-deduplicating sum
}

Step 4 — Run the Tests

cd book/projects
mvn -pl level-3-multi-input test -q 2>&1 | tail -10

Step 5 — Questions

  1. If your deduplication Set grows very large (millions of keys), what would happen to the task JVM heap?
  2. The input is already sorted by key (because OrderedGroupedKVInput sorts). Could you use this property to deduplicate without a Set? Rewrite DeduplicatingUnionSinkProcessor to use O(1) memory.
  3. Your new counter UnionPipeline/DuplicatesSkipped: where in the Tez framework does it get propagated to the AM and eventually to DAGStatus.getDAGCounters()?
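
As a hint for the O(1)-memory rewrite, the idea of exploiting sorted input can be sketched as follows. Try the rewrite yourself first, then compare. This sketch assumes the records arrive as a flat stream already sorted by key; SortedDedupSketch and its method are hypothetical names, and the real processor's reader may present the data differently (for example, already grouped by key, in which case taking the first value of each group may be enough).

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: deduplicate a key-sorted stream by remembering
// only the previous key, so memory use does not grow with the input.
public class SortedDedupSketch {

    /**
     * Returns a two-element array: {totalSum, duplicatesSkipped}.
     * Correct only if equal keys are adjacent, i.e. the input is sorted by key.
     */
    public static long[] sumFirstPerSortedKey(Iterator<Map.Entry<Integer, Integer>> sorted) {
        Integer previousKey = null;
        long sum = 0;
        long skipped = 0;
        while (sorted.hasNext()) {
            Map.Entry<Integer, Integer> record = sorted.next();
            if (previousKey != null && previousKey.equals(record.getKey())) {
                // Same key as the previous record: a duplicate, skip it.
                skipped++;
            } else {
                // First record of a new run of keys: count it.
                sum += record.getValue();
                previousKey = record.getKey();
            }
        }
        return new long[] {sum, skipped};
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Integer>> records = Arrays.asList(
                new AbstractMap.SimpleEntry<Integer, Integer>(1, 10),
                new AbstractMap.SimpleEntry<Integer, Integer>(1, 10),
                new AbstractMap.SimpleEntry<Integer, Integer>(2, 20));
        long[] result = sumFirstPerSortedKey(records.iterator());
        // prints "TotalSum=30 DuplicatesSkipped=1"
        System.out.println("TotalSum=" + result[0] + " DuplicatesSkipped=" + result[1]);
    }
}
```

Unlike the Set-based variant, this approach gives wrong results on unsorted input, because a key that reappears after a different key is counted again.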