Lab 4.2: VertexManager Deep Dive
Background
VertexManager is the hook that makes Tez more than just a DAG scheduler. By plugging in
a custom VertexManagerPlugin, applications can implement dynamic parallelism, slow start,
skew handling, and custom task scheduling — without modifying the core AM.
This lab walks through the two built-in VertexManager implementations, explains their
behaviors via code reading, and ends with a minimal custom VertexManagerPlugin that you
write and unit-test.
The VertexManagerPlugin Contract
Abridged definition of the abstract class (not an interface) in
tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java. Exact signatures vary between Tez releases, so compare against your checkout:
public abstract class VertexManagerPlugin {
  private final VertexManagerPluginContext context;
  // The AM creates the plugin reflectively and passes in the context
  public VertexManagerPlugin(VertexManagerPluginContext context) { ... }
  // Subclasses reach the context through this accessor
  public final VertexManagerPluginContext getContext() { ... }
  // Lifecycle callbacks the plugin overrides:
  public abstract void initialize() throws Exception;
  public abstract void onVertexStarted(List<TaskAttemptIdentifier> completions)
      throws Exception;
  public abstract void onSourceTaskCompleted(
      TaskAttemptIdentifier completedSrcTaskAttempt) throws Exception;
  public abstract void onVertexManagerEventReceived(
      VertexManagerEvent vmEvent) throws Exception;
  // Called when a root input has been initialized (root inputs only):
  public abstract void onRootVertexInitialized(String inputName,
      InputDescriptor inputDescriptor, List<Event> events) throws Exception;
}
VertexManagerPluginContext — what the plugin can call back into
find tez-api/src/main/java -name "VertexManagerPluginContext.java"
cat $(find tez-api/src/main/java -name "VertexManagerPluginContext.java")
Key methods on the context (names and signatures differ between Tez releases, so confirm each one in the file you just printed):
| Method | What it does |
|---|---|
| scheduleTasks(List<ScheduleTaskRequest>) | Schedules the given tasks for execution (recent releases) |
| scheduleVertexTasks(List<TaskWithLocationHint>) | Older, deprecated form of scheduleTasks |
| reconfigureVertex(int parallelism, VertexLocationHint, Map<String,EdgeProperty>) | Changes parallelism and/or edge properties at runtime |
| getVertexName() | Returns this vertex's name |
| getVertexNumTasks(String vertexName) | Returns the current parallelism of a named vertex |
| getCurrentParallelism() | Returns this vertex's current parallelism, if your version provides it |
| getInputVertexEdgeProperties() | Returns the EdgeProperty for each input edge |
| sendEventToProcessor(List<Event>, String, int) | Sends a VertexManagerEvent to a task |
Reading ImmediateStartVertexManager
find tez-dag/src/main/java -name "ImmediateStartVertexManager.java"
cat $(find tez-dag/src/main/java -name "ImmediateStartVertexManager.java")
Answer these questions from the code:
- In initialize(): does ImmediateStartVertexManager do anything? If not, why does it exist?
- In onVertexStarted(): does it schedule tasks immediately or wait for anything?
- What scheduling request (ScheduleTaskRequest, or TaskWithLocationHint in older releases) does it create for each task? Does it provide any location hints?
- Does it implement onSourceTaskCompleted()? If so, what does it do?
Expected finding: ImmediateStartVertexManager is intentionally minimal. Its purpose is to
provide a named, testable implementation that schedules all tasks immediately with no
location hints. It is the baseline from which ShuffleVertexManager diverges.
Reading ShuffleVertexManager
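ShuffleVertexManager lives in tez-runtime-library rather than tez-dag. In recent releases, part of its slow-start and statistics code is shared through ShuffleVertexManagerBase.java. Search the whole tree so that both files are found:
find . -path "*/src/main/java/*" -name "ShuffleVertexManager*.java"
wc -l $(find . -path "*/src/main/java/*" -name "ShuffleVertexManager*.java")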
Slow Start
Find the slow-start logic in onSourceTaskCompleted().
grep -n "minFraction\|maxFraction\|min-src-fraction\|completedSourceTasks\|pendingTasksToSchedule" \
  $(find . -path "*/src/main/java/*" -name "ShuffleVertexManager*.java") | head -20
Answer:
- What is the variable that tracks how many source tasks have completed?
- At what fraction does ShuffleVertexManager start scheduling tasks?
- What is the formula: at fraction F between minFraction and maxFraction, what percentage of downstream tasks are scheduled?
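You can check your reading of the formula against this minimal Java sketch of one plausible rule. It schedules nothing below minFraction, schedules everything at or above maxFraction, and schedules a linearly growing share in between. SlowStartMath and its parameters are hypothetical names. The bounds 0.25 and 0.75 are sample values, not a claim about Tez defaults. Compare the arithmetic against what you find in the source.

```java
// Hypothetical helper: linear slow-start interpolation, not Tez code.
final class SlowStartMath {
  private SlowStartMath() {}

  /**
   * Number of downstream tasks that should be scheduled once `completed`
   * of `totalSource` source tasks have finished.
   */
  static int tasksToSchedule(int completed, int totalSource, int numTasks,
      float minFraction, float maxFraction) {
    if (totalSource == 0) {
      return numTasks; // no source output to wait for
    }
    float f = (float) completed / totalSource;
    if (f < minFraction) {
      return 0; // too early: keep every downstream task pending
    }
    if (f >= maxFraction) {
      return numTasks; // past the upper bound: schedule everything
    }
    // Linear ramp between the two bounds
    float progress = (f - minFraction) / (maxFraction - minFraction);
    return (int) (progress * numTasks);
  }

  public static void main(String[] args) {
    for (int done = 0; done <= 100; done += 25) {
      System.out.println(done + "% of sources done -> "
          + tasksToSchedule(done, 100, 10, 0.25f, 0.75f) + " of 10 tasks");
    }
  }
}
```

Running main prints 0, 0, 5, 10 and 10 scheduled tasks for 0%, 25%, 50%, 75% and 100% of sources done. The linear ramp lets some downstream tasks start fetching early output while most containers are held back until the upstream vertex is nearly finished.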
Auto-Parallelism
Find the auto-parallelism logic:
grep -n "reconfigureVertex\|numBipartiteSourceTasks\|desiredTaskInputSize\|targetParallelism" \
  $(find . -path "*/src/main/java/*" -name "ShuffleVertexManager*.java") | head -20
Answer:
- What configuration key enables auto-parallelism?
- What information does ShuffleVertexManager use to compute the optimal parallelism?
- When is context.reconfigureVertex() called?
- What is the minimum parallelism ShuffleVertexManager will ever set (the floor)?
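Here is a sketch of the kind of arithmetic these questions point at. It assumes the plugin does two things. First, it extrapolates total source output from the tasks that have reported so far. Second, it merges contiguous partitions until each downstream task reads about desiredTaskInputBytes. AutoParallelismSketch, estimateParallelism and minParallelism are hypothetical names. minParallelism stands in for whatever floor you find in ShuffleVertexManager.

```java
// Hypothetical sketch of output-size-driven parallelism reduction, not Tez code.
final class AutoParallelismSketch {
  private AutoParallelismSketch() {}

  static int estimateParallelism(long completedOutputBytes, int completedSourceTasks,
      int totalSourceTasks, long desiredTaskInputBytes, int currentParallelism,
      int minParallelism) {
    if (completedSourceTasks == 0 || desiredTaskInputBytes <= 0) {
      return currentParallelism; // no data yet: keep the configured parallelism
    }
    // Extrapolate total output from the source tasks that have reported
    long expectedTotalBytes =
        completedOutputBytes * totalSourceTasks / completedSourceTasks;
    // Ceiling division: enough tasks that none reads more than the target
    long desired = (expectedTotalBytes + desiredTaskInputBytes - 1) / desiredTaskInputBytes;
    desired = Math.max(desired, Math.max(1, minParallelism));
    if (desired >= currentParallelism) {
      return currentParallelism; // this sketch only ever shrinks parallelism
    }
    // Each new task reads `range` contiguous original partitions
    int range = currentParallelism / (int) desired;
    return (currentParallelism + range - 1) / range;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    // 4 of 10 source tasks finished, writing 400 MB in total; target 100 MB per task
    System.out.println(estimateParallelism(400 * mb, 4, 10, 100 * mb, 100, 1));
  }
}
```

With 4 of 10 source tasks reporting 400 MB, the estimate is 1000 MB in total, so 100 partitions collapse into 10 tasks of 10 partitions each. The sketch only merges whole partitions. A target of 6 tasks from 10 partitions therefore yields a range of 1 and leaves parallelism unchanged.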
VertexManagerEvent handling
When auto-parallelism is enabled, each upstream task sends a VertexManagerEvent to the
downstream VertexManagerPlugin containing statistics about its output (byte count,
record count, partition sizes).
grep -n "VertexManagerEvent\|onVertexManagerEventReceived\|vmEvent" \
  $(find . -path "*/src/main/java/*" -name "ShuffleVertexManager*.java") | head -15
Answer:
- What protobuf message is decoded from the event payload?
- What statistic is accumulated across all events?
- How does ShuffleVertexManager use the accumulated statistics to decide on new parallelism?
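This minimal sketch shows the accumulation step with the protobuf decoding left out. Each report adds its output size to a running total. Reports are keyed by source vertex and task index, so a duplicate report (for example from a re-run attempt) is counted only once. OutputStatsAccumulator and record() are hypothetical. Check whether ShuffleVertexManager guards against duplicate reports in the same way.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-task statistics a VertexManagerEvent carries.
final class OutputStatsAccumulator {
  private final Map<String, Long> bytesBySourceTask = new HashMap<>();
  private long totalBytes = 0;

  /** Records one report; returns false if this source task already reported. */
  boolean record(String sourceVertex, int taskIndex, long outputBytes) {
    String key = sourceVertex + "#" + taskIndex;
    if (bytesBySourceTask.putIfAbsent(key, outputBytes) != null) {
      return false; // duplicate report, e.g. from a retried attempt
    }
    totalBytes += outputBytes;
    return true;
  }

  long totalBytes() { return totalBytes; }

  int reportingTasks() { return bytesBySourceTask.size(); }

  public static void main(String[] args) {
    OutputStatsAccumulator acc = new OutputStatsAccumulator();
    acc.record("map", 0, 100);
    acc.record("map", 0, 100); // ignored: same source task
    acc.record("map", 1, 50);
    System.out.println(acc.totalBytes() + " bytes from " + acc.reportingTasks() + " tasks");
  }
}
```

main prints "150 bytes from 2 tasks". The running total and the number of reporting tasks are the two inputs needed to extrapolate total output size.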
Write a Minimal Custom VertexManager
Create a CountingVertexManager that:
- Schedules 50% of tasks immediately when onVertexStarted() is called
- Schedules the remaining tasks when all source tasks have completed
- Logs the number of scheduled tasks at each scheduling call
This is the core pattern of slow-start, stripped to its minimum.
Implementation skeleton
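package org.apache.tez.dag.library.vertexmanager;
import java.util.ArrayList;
import java.util.List;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Written against the recent plugin API (constructor-injected context, scheduleTasks);
// adjust imports and calls if your Tez version differs.
public class CountingVertexManager extends VertexManagerPlugin {
  private static final Logger LOG =
      LoggerFactory.getLogger(CountingVertexManager.class);
  private int totalSourceTasks = 0;
  private int completedSourceTasks = 0;
  private int totalTasksToSchedule = 0;
  private boolean vertexStarted = false;
  // Package-private so the unit test can assert on it
  boolean secondBatchScheduled = false;
  // The AM instantiates the plugin reflectively through this constructor
  public CountingVertexManager(VertexManagerPluginContext context) {
    super(context);
  }
  @Override
  public void initialize() {
    // Nothing to do yet: parallelism can still change before the vertex starts
  }
  @Override
  public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
    VertexManagerPluginContext ctx = getContext();
    totalTasksToSchedule = ctx.getVertexNumTasks(ctx.getVertexName());
    // Count source tasks across all input vertices
    for (String inputVertex : ctx.getInputVertexEdgeProperties().keySet()) {
      totalSourceTasks += ctx.getVertexNumTasks(inputVertex);
    }
    // Source tasks that finished before this vertex started are reported here,
    // not through onSourceTaskCompleted()
    if (completions != null) {
      completedSourceTasks += completions.size();
    }
    vertexStarted = true;
    // Schedule first 50%
    int firstBatch = totalTasksToSchedule / 2;
    LOG.info("CountingVertexManager: scheduling first batch of {} tasks", firstBatch);
    scheduleRange(0, firstBatch);
    // Covers the case where every source task finished before the vertex started
    maybeScheduleSecondBatch();
  }
  @Override
  public void onSourceTaskCompleted(TaskAttemptIdentifier completedSrcTaskAttempt) {
    // A re-executed source task is counted again; acceptable for this lab
    completedSourceTasks++;
    maybeScheduleSecondBatch();
  }
  private void maybeScheduleSecondBatch() {
    if (vertexStarted && !secondBatchScheduled
        && completedSourceTasks >= totalSourceTasks) {
      // Schedule remaining 50%
      int firstBatch = totalTasksToSchedule / 2;
      LOG.info("CountingVertexManager: scheduling second batch of {} tasks",
          totalTasksToSchedule - firstBatch);
      scheduleRange(firstBatch, totalTasksToSchedule);
      secondBatchScheduled = true;
    }
  }
  private void scheduleRange(int from, int to) {
    List<VertexManagerPluginContext.ScheduleTaskRequest> toSchedule = new ArrayList<>();
    for (int i = from; i < to; i++) {
      // null location hint: the scheduler may place the task anywhere
      toSchedule.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i, null));
    }
    if (!toSchedule.isEmpty()) {
      getContext().scheduleTasks(toSchedule);
    }
  }
  @Override
  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
    // No-op: we don't need statistics for this simple implementation
  }
  @Override
  public void onRootVertexInitialized(String inputName,
      InputDescriptor inputDescriptor, List<Event> events) {
    // No-op: this plugin targets vertices fed by edges, not root inputs
  }
}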
Implementation tasks
- Identify the correct API method: getContext().scheduleTasks() vs getContext().scheduleVertexTasks(). Check which one exists in your version of the API.
- Write a unit test using MockVertexManagerPluginContext (if it exists) or a mock:
  - Set up the context so the vertex has parallelism 10 and its inputs have 4 source tasks in total
  - Call onVertexStarted() and verify that 5 tasks are scheduled
  - Call onSourceTaskCompleted() 4 times and verify that the remaining 5 tasks are scheduled on the 4th call
  - Verify that secondBatchScheduled is true afterwards
- Register the CountingVertexManager in a DAG:
    Vertex reduceVertex = Vertex.create("reducer",
        ProcessorDescriptor.create(MyReducer.class.getName()), 10);
    reduceVertex.setVertexManagerPlugin(
        VertexManagerPluginDescriptor.create(CountingVertexManager.class.getName()));
Finding the VertexManager Test Utilities
# Find mock context for testing
find . -path "*/src/test/*" \( -name "*Mock*Vertex*.java" -o -name "Test*VertexManager*.java" \)
# Find TestShuffleVertexManager
find . -name "TestShuffleVertexManager.java" | grep test
Read TestShuffleVertexManager.java to understand how VertexManager tests are structured.
The test creates a mock context, calls lifecycle methods in order, and asserts which tasks
were scheduled.
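The same test structure can be exercised without a Tez dependency. In this sketch, SchedulingContext is a hypothetical two-method stand-in for VertexManagerPluginContext, and RecordingContext plays the role of the mock. TwoBatchPolicy reproduces the CountingVertexManager policy, so the two scheduling scenarios can be asserted directly. None of these names exist in Tez.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two context calls the policy needs.
interface SchedulingContext {
  int numTasks();
  void schedule(List<Integer> taskIndexes);
}

// Fake context that records every schedule() call, as a mock would.
final class RecordingContext implements SchedulingContext {
  private final int numTasks;
  final List<List<Integer>> calls = new ArrayList<>();

  RecordingContext(int numTasks) { this.numTasks = numTasks; }

  @Override public int numTasks() { return numTasks; }

  @Override public void schedule(List<Integer> taskIndexes) {
    calls.add(new ArrayList<>(taskIndexes));
  }
}

// The CountingVertexManager policy, detached from Tez for testing.
final class TwoBatchPolicy {
  private final SchedulingContext ctx;
  private final int totalSourceTasks;
  private int completedSourceTasks = 0;
  boolean secondBatchScheduled = false;

  TwoBatchPolicy(SchedulingContext ctx, int totalSourceTasks) {
    this.ctx = ctx;
    this.totalSourceTasks = totalSourceTasks;
  }

  void onVertexStarted(int alreadyCompleted) {
    completedSourceTasks += alreadyCompleted; // completions reported at start
    scheduleRange(0, ctx.numTasks() / 2);
    maybeScheduleSecondBatch();
  }

  void onSourceTaskCompleted() {
    completedSourceTasks++;
    maybeScheduleSecondBatch();
  }

  private void maybeScheduleSecondBatch() {
    if (!secondBatchScheduled && completedSourceTasks >= totalSourceTasks) {
      scheduleRange(ctx.numTasks() / 2, ctx.numTasks());
      secondBatchScheduled = true;
    }
  }

  private void scheduleRange(int from, int to) {
    List<Integer> batch = new ArrayList<>();
    for (int i = from; i < to; i++) {
      batch.add(i);
    }
    if (!batch.isEmpty()) {
      ctx.schedule(batch);
    }
  }
}

final class TwoBatchPolicyDemo {
  public static void main(String[] args) {
    RecordingContext ctx = new RecordingContext(10);
    TwoBatchPolicy policy = new TwoBatchPolicy(ctx, 4);
    policy.onVertexStarted(0);
    for (int i = 0; i < 4; i++) {
      policy.onSourceTaskCompleted();
    }
    System.out.println(ctx.calls);
  }
}
```

The demo prints [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]: one call for the first batch and one on the fourth source completion. Keeping the policy behind a small interface like this makes the test independent of how your Tez version names its scheduling call.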
Expected Output
- Answers to all questions in the ImmediateStartVertexManager and ShuffleVertexManager sections, with file:line references
- A working CountingVertexManager implementation that compiles
- A unit test that passes for the two scheduling scenarios
Stretch Goals
- Read CartesianProductVertexManager, the most complex VertexManager:
    find . -name "CartesianProductVertexManager.java"
  What computation does it coordinate? When is it used?
- Find a ShuffleVertexManager-related JIRA (search for "ShuffleVertexManager" in JIRA). Read the issue description and the patch. What invariant was violated?
- Implement a NoOpVertexManager that schedules no tasks (for testing DAG failure paths). Use it in a test DAG and verify that the vertex fails with FAILED status after the timeout.