Lab 4.3 — Build It: WavingVertexManager

Lab type: Build It — VertexManagerPlugin with full JUnit + Mockito test suite
Estimated time: 120–150 min
Maven module: book/projects/level-4-waving-manager
Key class: org.apache.tez.learning.l4.WavingVertexManager


What You Will Build

A VertexManagerPlugin that schedules tasks in configurable waves:

  • Wave 0: tasks 0 to waveSize-1
  • Wave 1: tasks waveSize to 2×waveSize-1
  • Wave N: starts only when all tasks in wave N-1 have succeeded

Wave size is read from UserPayload as "waveSize=N".
Default: WavingVertexManager.DEFAULT_WAVE_SIZE = 2.

This is a minimal but complete VertexManagerPlugin. It follows the same architectural pattern as ImmediateStartVertexManager, ShuffleVertexManager, and the vertex managers that Hive-on-Tez attaches to its reduce vertices.
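
The wave boundaries and the "waveSize=N" payload format described above can be sketched in plain Java, with no Tez dependency. WavePlan and its methods are hypothetical names used only for illustration. Falling back to the default on a missing or malformed payload is an assumption; the lab's class may handle bad input differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of the lab module or of Tez. */
final class WavePlan {
    static final int DEFAULT_WAVE_SIZE = 2; // mirrors the default stated above

    /** Parses "waveSize=N". Assumption: bad or non-positive input falls back to the default. */
    static int parseWaveSize(String payload) {
        String prefix = "waveSize=";
        if (payload == null || !payload.trim().startsWith(prefix)) {
            return DEFAULT_WAVE_SIZE;
        }
        try {
            int n = Integer.parseInt(payload.trim().substring(prefix.length()).trim());
            return n > 0 ? n : DEFAULT_WAVE_SIZE;
        } catch (NumberFormatException e) {
            return DEFAULT_WAVE_SIZE;
        }
    }

    /** Returns the task indices of each wave, in scheduling order. */
    static List<List<Integer>> waves(int totalTasks, int waveSize) {
        if (waveSize <= 0) {
            throw new IllegalArgumentException("waveSize must be positive");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int start = 0; start < totalTasks; start += waveSize) {
            int end = Math.min(start + waveSize, totalTasks); // the last wave may be short
            List<Integer> wave = new ArrayList<>();
            for (int t = start; t < end; t++) {
                wave.add(t);
            }
            result.add(wave);
        }
        return result;
    }
}
```

For example, WavePlan.waves(5, 2) yields [[0, 1], [2, 3], [4]].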


Step 1 — Understand the VertexManagerPlugin Contract

Before reading the lab's code, locate the relevant files in the Tez source:

find ~/tez-src -name "VertexManagerPlugin.java" | head -3
find ~/tez-src -name "VertexManagerPluginContext.java" | head -3
find ~/tez-src -name "ImmediateStartVertexManager.java" | head -3

Read all three files completely. Then answer:

  1. What are all the lifecycle callback methods in VertexManagerPlugin? List them.
  2. When does the Tez AM call initialize()? Can you call scheduleVertexTasks() from inside initialize()?
  3. What does VertexManagerPluginContext.scheduleVertexTasks(List<ScheduleTaskRequest>) actually do to the DAG execution engine?
  4. ImmediateStartVertexManager.onVertexStarted() calls scheduleAllTasks(). Does it call scheduleVertexTasks once (all tasks in one list) or once per task? Why does that matter for performance?
  5. What is the purpose of VertexManagerPluginContext.reconfigureVertex()? Does WavingVertexManager use it?

Step 2 — Compile and Run the Tests

cd /path/to/apache-tez/book/projects
mvn -pl level-4-waving-manager test

Expected:

Tests run: 13, Failures: 0, Errors: 0, Skipped: 0

Step 3 — Read the Source Code

Open WavingVertexManager.java and work through every section.

initialize()

Questions:

  1. The payload is parsed as "waveSize=N". Where in a real DAG would you set this payload? (Hint: VertexManagerPluginDescriptor.setUserPayload(), on the descriptor passed to Vertex.setVertexManagerPlugin() when the DAG is built.)
  2. Why does initialize() store totalTasks from the context rather than accepting it as a constructor argument?
  3. If the user sets waveSize=1000 but there are only 5 tasks, what happens? Is there a bug?
  4. Why are scheduled and waveFinished BitSets rather than List<Integer>? What is the time complexity of BitSet.andNot()?

onVertexStarted()

Questions:

  1. The completions map passed to onVertexStarted is ignored. Under what condition would a real plugin need to process it?
  2. Why is scheduleNextWave() called here and not from initialize()?

onTaskAttemptCompleted()

Questions:

  1. Failed attempts are silently ignored (if (!successful) return). What should a production plugin do instead?
  2. checkAndScheduleNextWave() clones scheduled to avoid mutating it. What subtle bug would occur without the clone?
  3. Trace through the state machine for 4 tasks, waveSize=2. Draw the state of scheduled and waveFinished after each callback.

scheduleNextWave()

Questions:

  1. The while loop has two conditions: nextTaskToSchedule < totalTasks AND count < waveSize. Which terminates the loop for the last wave if the number of tasks is not a multiple of waveSize?
  2. The scheduled.get(idx) guard protects against double-scheduling. In what scenario could idx already be set? (Hint: look at the testTaskNotScheduledTwice test.)
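
To check your traces, it may help to run the state machine outside Tez. The sketch below is a hypothetical stand-in, not the lab's class. It reuses the field and method names mentioned in this step (scheduled, waveFinished, nextTaskToSchedule, count), but its exact logic is an assumption based on the descriptions above. It records each batch in a list instead of calling a Tez context.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

/** Hypothetical simulator of the wave state machine; it has no Tez dependency. */
final class WaveSimulator {
    private final int totalTasks;
    private final int waveSize;
    private final BitSet scheduled = new BitSet();
    private final BitSet waveFinished = new BitSet();
    private int nextTaskToSchedule = 0;

    /** Each entry is one batch that a real plugin would hand to its context. */
    final List<List<Integer>> batches = new ArrayList<>();

    WaveSimulator(int totalTasks, int waveSize) {
        if (waveSize <= 0) {
            throw new IllegalArgumentException("waveSize must be positive");
        }
        this.totalTasks = totalTasks;
        this.waveSize = waveSize;
    }

    void onVertexStarted() {
        scheduleNextWave();
    }

    void onTaskAttemptCompleted(int taskIndex, boolean successful) {
        if (!successful) return;               // failed attempts do not advance the wave
        if (!scheduled.get(taskIndex)) return; // assumption: ignore tasks never scheduled
        waveFinished.set(taskIndex);
        checkAndScheduleNextWave();
    }

    private void checkAndScheduleNextWave() {
        BitSet pending = (BitSet) scheduled.clone(); // work on a copy of scheduled
        pending.andNot(waveFinished);                // scheduled but not yet finished
        if (pending.isEmpty()) {
            scheduleNextWave();
        }
    }

    private void scheduleNextWave() {
        List<Integer> batch = new ArrayList<>();
        int count = 0;
        while (nextTaskToSchedule < totalTasks && count < waveSize) {
            int idx = nextTaskToSchedule++;
            if (scheduled.get(idx)) continue; // double-scheduling guard; never fires in this sketch
            scheduled.set(idx);
            batch.add(idx);
            count++;
        }
        if (!batch.isEmpty()) {
            batches.add(batch); // one batch per wave, not one per task
        }
    }
}
```

Create a simulator, call onVertexStarted(), then feed it completions in any order and inspect batches after each call.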

Step 4 — Read the Test Suite

Open TestWavingVertexManager.java. For each test, before reading the assertions:

  1. Read the test name
  2. Predict what the test will assert
  3. Then read the actual assertions and compare to your prediction

Pay particular attention to how Mockito is used:

Mockito call                                            What it does
mock(VertexManagerPluginContext.class)                  Creates a fake context that records all calls
when(ctx.getVertexNumTasks(...)).thenReturn(6)          Stubs a specific return value
verify(ctx, times(2)).scheduleVertexTasks(anyList())    Asserts the method was called exactly twice
ArgumentCaptor.forClass(List.class)                     Captures the actual argument for deep inspection
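
To make the four rows above concrete, here is a hand-written fake that does by hand roughly what Mockito generates for you. It is an illustration only, not Mockito and not the Tez API. SchedulerContext and RecordingContext are hypothetical names, and the interface uses List<Integer> in place of the real request type to stay self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, minimal stand-in for the two context methods the table mentions. */
interface SchedulerContext {
    int getVertexNumTasks(String vertexName);
    void scheduleVertexTasks(List<Integer> taskIndices);
}

/** Hand-written fake: stubs one return value and records every scheduling call. */
final class RecordingContext implements SchedulerContext {
    private final int numTasks; // plays the role of when(...).thenReturn(n)

    /** Recorded arguments, one entry per call; plays the role of an ArgumentCaptor. */
    final List<List<Integer>> calls = new ArrayList<>();

    RecordingContext(int numTasks) {
        this.numTasks = numTasks;
    }

    @Override
    public int getVertexNumTasks(String vertexName) {
        return numTasks; // same answer for every vertex name
    }

    @Override
    public void scheduleVertexTasks(List<Integer> taskIndices) {
        calls.add(new ArrayList<>(taskIndices)); // copy, so later changes by the caller are not seen
    }

    /** Similar in spirit to verify(ctx, times(n)).scheduleVertexTasks(anyList()). */
    void assertScheduleCalls(int expected) {
        if (calls.size() != expected) {
            throw new AssertionError(
                "expected " + expected + " calls but saw " + calls.size());
        }
    }
}
```

Writing the fake by hand shows why mocks suit this plugin: the only observable effect of a scheduling decision is the sequence of calls made on the context.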

Questions

  1. testThreeWavesForSixTasks is an integration test of the entire scheduling lifecycle. Which individual unit tests cover the sub-cases that this test depends on?
  2. testPartialWave0DoesNotTriggerWave1 verifies the negative case (wave NOT triggered). How does verify(times(1)) prove this? Could you use verifyNoMoreInteractions() instead?
  3. The test class has a @Before setUp() method. What happens if you remove it and inline mockContext = mock(...) into each test instead?

Step 5 — Break It: Three Experiments

Experiment A — Remove the if (!successful) return guard

Delete the early-return in onTaskAttemptCompleted. Run:

mvn -pl level-4-waving-manager test -Dtest=TestWavingVertexManager#testFailedAttemptDoesNotAdvanceWave

  • Which test fails?
  • What is the actual vs. expected scheduleVertexTasks call count?
  • Why does treating failures as successes cause premature wave advancement?

Experiment B — Remove the BitSet.clone() in checkAndScheduleNextWave

Change:

BitSet scheduledCopy = (BitSet) scheduled.clone();
scheduledCopy.andNot(waveFinished);

to:

scheduled.andNot(waveFinished);

Run the full test suite.

  • Which tests fail?
  • What data corruption does this mutation cause? Trace through testThreeWavesForSixTasks manually.
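
As background for this experiment, the snippet below isolates the java.util.BitSet behaviour involved: andNot() modifies the receiver in place, so computing a difference without side effects needs a copy first. AndNotDemo is a hypothetical name and the code is independent of the lab's classes.

```java
import java.util.BitSet;

/** Hypothetical demo of side-effect-free set difference with java.util.BitSet. */
final class AndNotDemo {
    /** Returns the bits set in a but not in b, leaving both arguments unchanged. */
    static BitSet difference(BitSet a, BitSet b) {
        BitSet copy = (BitSet) a.clone(); // clone() returns Object, hence the cast
        copy.andNot(b);                   // clears in copy every bit that is set in b
        return copy;
    }

    public static void main(String[] args) {
        BitSet scheduled = new BitSet();
        scheduled.set(0);
        scheduled.set(1);
        BitSet finished = new BitSet();
        finished.set(0);

        System.out.println(difference(scheduled, finished)); // {1}
        System.out.println(scheduled);                       // {0, 1}, unchanged

        scheduled.andNot(finished);                          // in-place variant
        System.out.println(scheduled);                       // {1}, bit 0 is gone
    }
}
```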

Experiment C — Change count < waveSize to count <= waveSize

In scheduleNextWave(), change the loop condition.

  • How many tasks does wave 0 now schedule?
  • Which test catches this?

Step 6 — Add a New Feature: onVertexManagerEventReceived

The real ShuffleVertexManager uses onVertexManagerEventReceived to receive partition statistics from map tasks. Add support for a simple variant:

Create a new callback method:

@Override
public void onVertexManagerEventReceived(
        List<VertexManagerEvent> vmEvents) throws Exception {
    // If any event's user payload contains "skip=true", mark
    // that task as finished so it does not block wave advancement.
    for (VertexManagerEvent event : vmEvents) {
        // TODO: parse UserPayload for "skip=true"; if present, call
        //       onTaskAttemptCompleted(taskIndex, true) to release the wave
    }
}

Write a test for this method:

@Test
public void testSkipEventReleasesWave() {
    // set up 4 tasks, wave size 2
    // trigger onVertexStarted (wave 0: tasks 0,1)
    // send a VertexManagerEvent for task 0 with payload "skip=true"
    // verify task 0 is treated as done for wave-completion purposes
}
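
One possible starting point for the payload check in the TODO is sketched below. SkipPayload is a hypothetical helper. It assumes the event's payload is available as a java.nio.ByteBuffer holding UTF-8 text; confirm how VertexManagerEvent exposes its payload in your Tez version before relying on this.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for the TODO above; "skip=true" is this lab's convention. */
final class SkipPayload {
    /** Returns true if the payload, decoded as UTF-8, contains "skip=true". */
    static boolean isSkip(ByteBuffer payload) {
        if (payload == null) {
            return false; // assumption: an event without a payload never skips
        }
        // duplicate() shares the bytes but has its own position and limit,
        // so decoding does not consume the caller's buffer.
        String text = StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
        return text.contains("skip=true");
    }
}
```

Keeping the parsing in a small static method lets you unit-test it without mocking any Tez types.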

Step 7 — Tez Source Connection Table

Record where each class lives in the Tez source tree:

Class used in this project      Tez source file
VertexManagerPlugin
VertexManagerPluginContext
ScheduleTaskRequest
ImmediateStartVertexManager
ShuffleVertexManager

Step 8 — ShuffleVertexManager Deep Dive

Open ShuffleVertexManager.java in the Tez source:

find ~/tez-src -name "ShuffleVertexManager.java"

  1. Read onVertexStarted(). Does it schedule tasks immediately like WavingVertexManager, or does it wait? What does it wait for?
  2. Find the slow-start fraction settings (the minimum and maximum source-completion fractions; exact field names vary by Tez version). How do they determine when scheduling starts?
  3. Find where reconfigureVertex() is called. What does it change about the vertex?
  4. How does ShuffleVertexManager prevent double-scheduling? Compare its guard to the scheduled BitSet in WavingVertexManager.
  5. ShuffleVertexManager has ~700 lines. Identify the 5 most important methods (the ones that contain the core scheduling logic) and list them.

Step 9 — JIRA Research: VertexManager Bugs

Search:

project = TEZ AND component = "tez-dag" AND text ~ "VertexManager" AND resolution = Fixed

Find one resolved issue where a VertexManagerPlugin had a scheduling bug.

  • What was the bug? (Race condition? Double scheduling? Wrong wave boundary?)
  • What was the fix?
  • Was a test added? What does it mock?