Lab 4.3 — Build It: WavingVertexManager
Lab type: Build It — VertexManagerPlugin with full JUnit + Mockito test suite
Estimated time: 120–150 min
Maven module: book/projects/level-4-waving-manager
Key class: org.apache.tez.learning.l4.WavingVertexManager
What You Will Build
A VertexManagerPlugin that schedules tasks in configurable waves:
- Wave 0: tasks 0 to waveSize-1
- Wave 1: tasks waveSize to 2×waveSize-1
- Wave N: starts only when all tasks in wave N-1 have succeeded
Wave size is read from UserPayload as "waveSize=N".
Default: WavingVertexManager.DEFAULT_WAVE_SIZE = 2.
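The wave boundaries and the payload format above can be sketched in plain Java. This is a minimal illustration, not the lab's source: the class name WaveConfigSketch and its helper methods are hypothetical, and falling back to the default on a missing or malformed payload is an assumption about how WavingVertexManager behaves. It takes a ByteBuffer because that is what Tez's UserPayload.getPayload() returns.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of Tez or the lab module. */
final class WaveConfigSketch {
    static final int DEFAULT_WAVE_SIZE = 2; // mirrors WavingVertexManager.DEFAULT_WAVE_SIZE

    /** Parses "waveSize=N"; assumed to fall back to the default on bad input. */
    static int parseWaveSize(ByteBuffer payload) {
        if (payload == null || !payload.hasRemaining()) {
            return DEFAULT_WAVE_SIZE;
        }
        // duplicate() so decoding does not move the caller's buffer position
        String text = StandardCharsets.UTF_8.decode(payload.duplicate()).toString().trim();
        String prefix = "waveSize=";
        if (!text.startsWith(prefix)) {
            return DEFAULT_WAVE_SIZE;
        }
        try {
            int n = Integer.parseInt(text.substring(prefix.length()).trim());
            return n > 0 ? n : DEFAULT_WAVE_SIZE;
        } catch (NumberFormatException e) {
            return DEFAULT_WAVE_SIZE;
        }
    }

    /** Wave that a task index belongs to. */
    static int waveOf(int taskIndex, int waveSize) {
        return taskIndex / waveSize;
    }

    /** First task index of a wave (inclusive). */
    static int firstTask(int wave, int waveSize) {
        return wave * waveSize;
    }

    /** Last task index of a wave (inclusive), clamped for a short final wave. */
    static int lastTask(int wave, int waveSize, int totalTasks) {
        return Math.min((wave + 1) * waveSize, totalTasks) - 1;
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("waveSize=3".getBytes(StandardCharsets.UTF_8));
        int waveSize = parseWaveSize(payload);
        int totalTasks = 7;
        int waves = (totalTasks + waveSize - 1) / waveSize; // ceiling division
        for (int w = 0; w < waves; w++) {
            System.out.println("wave " + w + ": tasks " + firstTask(w, waveSize)
                + ".." + lastTask(w, waveSize, totalTasks));
        }
    }
}
```

With 7 tasks and waveSize=3 the final wave holds a single task, which is why the upper bound is clamped to totalTasks.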
This is a minimal but complete VertexManagerPlugin — the same architectural
pattern used by ImmediateStartVertexManager, ShuffleVertexManager, and the
VertexManagerPlugin inside every Hive-on-Tez reduce vertex.
Step 1 — Understand the VertexManagerPlugin Contract
Before reading any code, open the Tez source:
find ~/tez-src -name "VertexManagerPlugin.java" | head -3
find ~/tez-src -name "VertexManagerPluginContext.java" | head -3
find ~/tez-src -name "ImmediateStartVertexManager.java" | head -3
Read all three files completely. Then answer:
| # | Question |
|---|---|
| 1 | What are all the lifecycle callback methods in VertexManagerPlugin? List them. |
| 2 | When does the Tez AM call initialize()? Can you call scheduleVertexTasks() from inside initialize()? |
| 3 | What does VertexManagerPluginContext.scheduleVertexTasks(List<ScheduleTaskRequest>) actually do to the DAG execution engine? |
| 4 | ImmediateStartVertexManager.onVertexStarted() calls scheduleAllTasks(). Does it call scheduleVertexTasks once (all tasks in one list) or once per task? Why does that matter for performance? |
| 5 | What is the purpose of VertexManagerPluginContext.reconfigureVertex()? Does WavingVertexManager use it? |
Step 2 — Compile and Run the Tests
cd /path/to/apache-tez/book/projects
mvn -pl level-4-waving-manager test
Expected:
Tests run: 13, Failures: 0, Errors: 0, Skipped: 0
Step 3 — Read the Source Code
Open WavingVertexManager.java and work through every section.
initialize()
| # | Question |
|---|---|
| 1 | The payload is parsed as "waveSize=N". Where in a real DAG would you set this payload? (Hint: VertexManagerPluginDescriptor.setUserPayload() in DAG.create()) |
| 2 | Why does initialize() store totalTasks from the context rather than accepting it as a constructor argument? |
| 3 | If the user sets waveSize=1000 but there are only 5 tasks, what happens? Is there a bug? |
| 4 | Why are scheduled and waveFinished BitSets rather than List<Integer>? What is the time complexity of BitSet.andNot()? |
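Before answering question 4, it may help to see how java.util.BitSet behaves. The sketch below is plain JDK code and does not come from the lab module; the class name BitSetSketch and the method outstanding() are hypothetical. It shows that andNot() modifies the BitSet it is called on, so a clone is needed if the original must be preserved.

```java
import java.util.BitSet;

/** Hypothetical demo class; only java.util.BitSet is real API here. */
final class BitSetSketch {
    /** Returns the tasks that are scheduled but not finished, leaving both inputs untouched. */
    static BitSet outstanding(BitSet scheduled, BitSet finished) {
        BitSet copy = (BitSet) scheduled.clone();
        copy.andNot(finished); // clears in copy every bit that is set in finished
        return copy;
    }

    public static void main(String[] args) {
        BitSet scheduled = new BitSet();
        scheduled.set(0, 4); // tasks 0..3 (the end index is exclusive)
        BitSet finished = new BitSet();
        finished.set(0);
        finished.set(2);

        System.out.println(outstanding(scheduled, finished)); // {1, 3}
        System.out.println(scheduled);                        // {0, 1, 2, 3}
    }
}
```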
onVertexStarted()
| # | Question |
|---|---|
| 1 | The completions map passed to onVertexStarted is ignored. Under what condition would a real plugin need to process it? |
| 2 | Why is scheduleNextWave() called here and not from initialize()? |
onTaskAttemptCompleted()
| # | Question |
|---|---|
| 1 | Failed attempts are silently ignored (if (!successful) return). What should a production plugin do instead? |
| 2 | checkAndScheduleNextWave() clones scheduled to avoid mutating it. What subtle bug would occur without the clone? |
| 3 | Trace through the state machine for 4 tasks, waveSize=2. Draw the state of scheduled and waveFinished after each callback. |
scheduleNextWave()
| # | Question |
|---|---|
| 1 | The while loop has two conditions: nextTaskToSchedule < totalTasks AND count < waveSize. Which terminates the loop for the last wave if the number of tasks is not a multiple of waveSize? |
| 2 | The scheduled.get(idx) guard protects against double-scheduling. In what scenario could idx already be set? (Hint: look at the testTaskNotScheduledTwice test.) |
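To check a hand trace of the state machine, a Tez-free model can be useful. The sketch below is an assumption-based reconstruction from the behaviour described in the questions above (early return on failure, clone plus andNot, a two-condition while loop, a scheduled.get(idx) guard). It is not the lab's WavingVertexManager: the class name WaveModel is hypothetical, and the scheduledWaves list stands in for calls to the plugin context.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

/** Simplified stand-in for the plugin's state machine; not the lab's source. */
final class WaveModel {
    private final int totalTasks;
    private final int waveSize;
    private final BitSet scheduled = new BitSet();    // tasks handed to the scheduler
    private final BitSet waveFinished = new BitSet(); // tasks that reported success
    private int nextTaskToSchedule = 0;

    /** One entry per wave; stands in for one scheduling call on the context. */
    final List<List<Integer>> scheduledWaves = new ArrayList<>();

    WaveModel(int totalTasks, int waveSize) {
        this.totalTasks = totalTasks;
        this.waveSize = waveSize;
    }

    void onVertexStarted() {
        scheduleNextWave();
    }

    void onTaskAttemptCompleted(int taskIndex, boolean successful) {
        if (!successful) {
            return; // failed attempts never advance a wave
        }
        waveFinished.set(taskIndex);
        // Work on a copy so the record of scheduled tasks survives the check.
        BitSet outstanding = (BitSet) scheduled.clone();
        outstanding.andNot(waveFinished);
        if (outstanding.isEmpty()) {
            scheduleNextWave();
        }
    }

    private void scheduleNextWave() {
        List<Integer> wave = new ArrayList<>();
        int count = 0;
        while (nextTaskToSchedule < totalTasks && count < waveSize) {
            int idx = nextTaskToSchedule++;
            if (!scheduled.get(idx)) { // guard against double-scheduling
                scheduled.set(idx);
                wave.add(idx);
                count++;
            }
        }
        if (!wave.isEmpty()) {
            scheduledWaves.add(wave);
        }
    }

    String state() {
        return "scheduled=" + scheduled + " waveFinished=" + waveFinished;
    }

    public static void main(String[] args) {
        WaveModel m = new WaveModel(4, 2);
        m.onVertexStarted();
        System.out.println("started:     " + m.state());
        for (int task = 0; task < 4; task++) {
            m.onTaskAttemptCompleted(task, true);
            System.out.println("task " + task + " done: " + m.state());
        }
        System.out.println("waves: " + m.scheduledWaves);
    }
}
```

Draw your own trace first, then run the model and compare. Changing the constructor arguments to (5, 2) exercises the short final wave.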
Step 4 — Read the Test Suite
Open TestWavingVertexManager.java. For each test, before reading the assertions:
- Read the test name
- Predict what the test will assert
- Then read the actual assertions and compare to your prediction
Pay particular attention to how Mockito is used:
| Mockito call | What it does |
|---|---|
| mock(VertexManagerPluginContext.class) | Creates a fake context that records all calls |
| when(ctx.getVertexNumTasks(...)).thenReturn(6) | Stubs a specific return value |
| verify(ctx, times(2)).scheduleVertexTasks(anyList()) | Asserts the method was called exactly twice |
| ArgumentCaptor.forClass(List.class) | Captures the actual argument for deep inspection |
Questions
| # | Question |
|---|---|
| 1 | testThreeWavesForSixTasks is an integration test of the entire scheduling lifecycle. Which individual unit tests cover the sub-cases that this test depends on? |
| 2 | testPartialWave0DoesNotTriggerWave1 verifies the negative case (wave NOT triggered). How does verify(times(1)) prove this? Could you use verifyNoMoreInteractions() instead? |
| 3 | The test class has a @Before setUp() method. What happens if you remove it and inline mockContext = mock(...) into each test instead? |
Step 5 — Break It: Three Experiments
Experiment A — Remove the if (!successful) return guard
Delete the early-return in onTaskAttemptCompleted. Run:
mvn -pl level-4-waving-manager test -Dtest=TestWavingVertexManager#testFailedAttemptDoesNotAdvanceWave
- Which test fails?
- What is the actual vs. expected scheduleVertexTasks call count?
- Why does treating failures as successes cause premature wave advancement?
Experiment B — Remove the BitSet.clone() in checkAndScheduleNextWave
Change:
BitSet scheduledCopy = (BitSet) scheduled.clone();
scheduledCopy.andNot(waveFinished);
to:
scheduled.andNot(waveFinished);
Run the full test suite.
- Which tests fail?
- What data corruption does this mutation cause? Trace through testThreeWavesForSixTasks manually.
Experiment C — Change count < waveSize to count <= waveSize
In scheduleNextWave(), change the loop condition.
- How many tasks does wave 0 now schedule?
- Which test catches this?
Step 6 — Add a New Feature: onVertexManagerEventReceived
The real ShuffleVertexManager uses onVertexManagerEventReceived to receive
partition statistics from map tasks. Add support for a simple variant:
Implement the callback. Note that the Tez API delivers one VertexManagerEvent per call, not a list:
@Override
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
  // If the event's user payload contains "skip=true", mark the sending
  // task as finished so it does not block wave advancement.
  // TODO: decode vmEvent.getUserPayload() (a ByteBuffer) and look for "skip=true";
  // if present, call onTaskAttemptCompleted(taskIndex, true) to release the wave.
  // Working out taskIndex from the event is part of the exercise.
}
Write a test for this method:
@Test
public void testSkipEventReleasesWave() throws Exception {
// set up 4 tasks, wave size 2
// trigger onVertexStarted (wave 0: tasks 0,1)
// send a VertexManagerEvent for task 0 with payload "skip=true"
// verify task 0 is treated as done for wave-completion purposes
}
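For the payload check in this step, a small JDK-only helper can be tested on its own before it is wired into the plugin. The sketch below is hypothetical (SkipFlagSketch and hasSkipFlag are not part of Tez or the lab module). It assumes the payload is UTF-8 text delivered as a ByteBuffer, which is the type VertexManagerEvent.getUserPayload() returns, and it treats a null or empty payload as "no flag".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of Tez or the lab module. */
final class SkipFlagSketch {
    /** True if the payload text contains "skip=true"; null or empty means no flag. */
    static boolean hasSkipFlag(ByteBuffer payload) {
        if (payload == null || !payload.hasRemaining()) {
            return false;
        }
        // duplicate() so decoding does not consume the caller's buffer
        String text = StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
        return text.contains("skip=true");
    }

    public static void main(String[] args) {
        ByteBuffer skip = ByteBuffer.wrap("skip=true".getBytes(StandardCharsets.UTF_8));
        ByteBuffer keep = ByteBuffer.wrap("skip=false".getBytes(StandardCharsets.UTF_8));
        System.out.println(hasSkipFlag(skip)); // true
        System.out.println(hasSkipFlag(keep)); // false
    }
}
```

Keeping the parsing in a static method means the skip logic can be unit-tested without a mocked context; the plugin callback then only has to map the event to a task index.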
Step 7 — Tez Source Connection Table
| Class used in this project | Tez source file |
|---|---|
| VertexManagerPlugin | |
| VertexManagerPluginContext | |
| ScheduleTaskRequest | |
| ImmediateStartVertexManager | |
| ShuffleVertexManager | |
Step 8 — ShuffleVertexManager Deep Dive
Open ShuffleVertexManager.java in the Tez source:
find ~/tez-src -name "ShuffleVertexManager.java"
- Read onVertexStarted(). Does it schedule tasks immediately like WavingVertexManager, or does it wait? What does it wait for?
- Find the slowStartFraction field. How does it determine when to start scheduling?
- Find where reconfigureVertex() is called. What does it change about the vertex?
- How does ShuffleVertexManager prevent double-scheduling? Compare its guard to the scheduled BitSet in WavingVertexManager.
- ShuffleVertexManager has ~700 lines. Identify the 5 most important methods (the ones that contain the core scheduling logic) and list them.
Step 9 — JIRA Research: VertexManager Bugs
Search:
project = TEZ AND component = "tez-dag" AND text ~ "VertexManager" AND resolution = Fixed
Find one resolved issue where a VertexManagerPlugin had a scheduling bug.
- What was the bug? (Race condition? Double scheduling? Wrong wave boundary?)
- What was the fix?
- Was a test added? What does it mock?