Lab 4.2: VertexManager Deep Dive

Background

VertexManager is the hook that makes Tez more than just a DAG scheduler. By plugging in a custom VertexManagerPlugin, applications can implement dynamic parallelism, slow start, skew handling, and custom task scheduling — without modifying the core AM.

This lab walks through two of the built-in VertexManager implementations and explains their behavior by reading the code. It ends with a minimal custom VertexManagerPlugin that you write and unit-test.


The VertexManagerPlugin Contract

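Abridged from tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java. This is the recent-release form with deprecated overloads omitted; older releases differ.

public abstract class VertexManagerPlugin {
    private final VertexManagerPluginContext context;

    // The AM passes the context to the constructor; subclasses must call super(context)
    public VertexManagerPlugin(VertexManagerPluginContext context) { ... }

    public final VertexManagerPluginContext getContext() { ... }

    // Abstract: every plugin must implement these
    public abstract void initialize() throws Exception;
    public abstract void onVertexManagerEventReceived(
        VertexManagerEvent vmEvent) throws Exception;
    // Called when an input is initialized (root inputs only)
    public abstract void onRootVertexInitialized(String inputName,
        InputDescriptor inputDescriptor, List<Event> events) throws Exception;

    // Not abstract in recent releases (the defaults delegate to deprecated
    // overloads), but in practice every plugin implements them
    public void onVertexStarted(List<TaskAttemptIdentifier> completions)
        throws Exception { ... }
    public void onSourceTaskCompleted(
        TaskAttemptIdentifier completedSrcTaskAttempt) throws Exception { ... }
    // Optional: state changes of vertices registered through
    // getContext().registerForVertexStateUpdates()
    public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception { ... }
}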

VertexManagerPluginContext — what the plugin can call back into

find tez-api/src/main/java -name "VertexManagerPluginContext.java"
cat $(find tez-api/src/main/java -name "VertexManagerPluginContext.java")

Key methods on the context:

Method | What it does
scheduleTasks(List<ScheduleTaskRequest>) | Schedules the given tasks for execution (older releases use the now-deprecated scheduleVertexTasks(List<TaskWithLocationHint>))
reconfigureVertex(int parallelism, VertexLocationHint, Map<String, EdgeProperty>) | Changes parallelism and/or source edge properties at runtime
getVertexNumTasks(String vertexName) | Returns the current parallelism of the named vertex
getVertexName() | Returns this vertex's name; getVertexNumTasks(getVertexName()) gives this vertex's own parallelism
getInputVertexEdgeProperties() | Returns the EdgeProperty of each input edge, keyed by source vertex name
sendEventToProcessor(events, taskId) | Sends VertexManagerEvents directly to the processor of one of this vertex's tasks

Reading ImmediateStartVertexManager

find tez-dag/src/main/java -name "ImmediateStartVertexManager.java"
cat $(find tez-dag/src/main/java -name "ImmediateStartVertexManager.java")

Answer these questions from the code:

  1. In initialize(): does ImmediateStartVertexManager do anything? If not, why does it exist?
  2. In onVertexStarted(): does it schedule tasks immediately or wait for anything?
  3. What TaskWithLocation does it create for each task? Does it provide any location hints?
  4. Does it implement onSourceTaskCompleted()? If so, what does it do?

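Expected finding: ImmediateStartVertexManager is intentionally minimal. It exists to provide a named, testable implementation that schedules all of the vertex's tasks as early as it can, with no location hints. In recent releases, "as early as it can" means after onVertexStarted() has run and every source vertex that has tasks has reported CONFIGURED through onVertexStateUpdated(). Check whether your version does this. It is the baseline from which ShuffleVertexManager diverges.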
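You can model the expected behavior without any Tez dependencies. The sketch below is a hypothetical ImmediateStartModel class, not the Tez source. It assumes that, as in recent releases, scheduling waits until every source vertex that has tasks reports CONFIGURED. Each scheduled task is represented by its index, which stands in for a ScheduleTaskRequest with a null location hint.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the ImmediateStart policy (not the Tez source).
// A scheduled task is represented by its index; in Tez it would be a
// ScheduleTaskRequest with a null location hint.
class ImmediateStartModel {
    private final int numTasks;
    // Source vertices that have tasks -> whether they have reported CONFIGURED
    private final Map<String, Boolean> srcConfigured = new LinkedHashMap<>();
    private boolean started = false;
    private boolean scheduled = false;
    final List<Integer> scheduledIndices = new ArrayList<>();

    ImmediateStartModel(int numTasks, List<String> sourcesWithTasks) {
        this.numTasks = numTasks;
        for (String src : sourcesWithTasks) {
            srcConfigured.put(src, false);
        }
    }

    void onVertexStarted() {
        started = true;
        maybeScheduleAll();
    }

    // Stands in for onVertexStateUpdated(...) carrying a CONFIGURED state
    void onSourceConfigured(String src) {
        if (srcConfigured.containsKey(src)) {
            srcConfigured.put(src, true);
        }
        maybeScheduleAll();
    }

    // Schedules every task exactly once: after start, and after all tracked sources are configured
    private void maybeScheduleAll() {
        if (!started || scheduled || srcConfigured.containsValue(false)) {
            return;
        }
        for (int i = 0; i < numTasks; i++) {
            scheduledIndices.add(i);
        }
        scheduled = true;
    }

    public static void main(String[] args) {
        ImmediateStartModel vm = new ImmediateStartModel(3, List.of("map"));
        vm.onVertexStarted();
        System.out.println(vm.scheduledIndices); // [] : still waiting for "map"
        vm.onSourceConfigured("map");
        System.out.println(vm.scheduledIndices); // [0, 1, 2]
    }
}
```

A vertex with no tracked sources schedules everything as soon as it starts. That is the "immediate" case the class name refers to.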


Reading ShuffleVertexManager

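ShuffleVertexManager lives in tez-runtime-library (package org.apache.tez.dag.library.vertexmanager), not in tez-dag. Depending on the release, part of its logic may sit in a ShuffleVertexManagerBase superclass, so the commands in this section match every file whose name starts with ShuffleVertexManager.

find . -path "*/src/main/java/*" -name "ShuffleVertexManager*.java"
wc -l $(find . -path "*/src/main/java/*" -name "ShuffleVertexManager*.java")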

Slow Start

Find the slow-start logic. Start from onSourceTaskCompleted() and follow the methods it calls.

grep -in "fraction\|completedSourceTasks\|SourceTasksCompleted\|pendingTasks" \
  $(find . -path "*/src/main/java/*" -name "ShuffleVertexManager*.java") | head -30

Answer:

  1. What is the variable that tracks how many source tasks have completed?
  2. At what fraction does ShuffleVertexManager start scheduling tasks?
  3. Let F be the fraction of source tasks completed, with F between minFraction and maxFraction. What fraction of the downstream tasks has been scheduled? Express it as a formula in F, minFraction and maxFraction.
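Once you have answered question 3, you can compare your formula with this dependency-free model. The model itself is an assumption to check against the source: it uses a linear ramp between the two fractions and rounds up. The class and method names are hypothetical, and 0.25/0.75 are just example fractions.

```java
// Hypothetical model of slow start as a linear ramp (an assumption to check
// against ShuffleVertexManager). Returns how many downstream tasks should have
// been scheduled in total once completedSrc of totalSrc source tasks are done.
class SlowStartModel {
    static int targetScheduled(int completedSrc, int totalSrc, int totalTasks,
                               double minFraction, double maxFraction) {
        if (totalSrc == 0) {
            return totalTasks;                       // nothing to wait for
        }
        double f = (double) completedSrc / totalSrc; // completed-source fraction F
        if (f < minFraction) {
            return 0;                                // ramp has not started
        }
        if (f >= maxFraction || maxFraction <= minFraction) {
            return totalTasks;                       // ramp finished (or degenerate range)
        }
        double progress = (f - minFraction) / (maxFraction - minFraction);
        return (int) Math.ceil(progress * totalTasks); // rounding up is an assumption
    }

    public static void main(String[] args) {
        // 8 source tasks, 10 downstream tasks, example fractions 0.25 and 0.75
        for (int done = 0; done <= 8; done++) {
            System.out.println(done + "/8 sources done -> schedule "
                + targetScheduled(done, 8, 10, 0.25, 0.75) + " of 10 tasks");
        }
    }
}
```

The function returns a cumulative target. A plugin built this way would schedule only the difference between this target and the number of tasks it has already scheduled.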

Auto-Parallelism

Find the auto-parallelism logic:

grep -n "reconfigureVertex\|BipartiteSourceTasks\|desiredTaskInputSize\|targetParallelism" \
  $(find . -path "*/src/main/java/*" -name "ShuffleVertexManager*.java") | head -30

Answer:

  1. What configuration key enables auto-parallelism?
  2. What information does ShuffleVertexManager use to compute the optimal parallelism?
  3. When is context.reconfigureVertex() called?
  4. What is the minimum parallelism ShuffleVertexManager will ever set (the floor)?
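After answering, you can compare your findings with a dependency-free model of one plausible algorithm. Every rule in it is an assumption to verify against ShuffleVertexManager, and the class and method names are hypothetical:

- Desired parallelism is ceil(expected total bytes / desired bytes per task), clamped to a floor.
- Parallelism is only ever reduced.
- The reduction merges contiguous partitions into equal-size ranges, with a shorter last range.

```java
// Hypothetical model of auto-parallelism (assumptions to verify against
// ShuffleVertexManager): compute a desired parallelism from the expected
// total shuffle input, then only reduce parallelism by merging contiguous
// partitions into ranges of equal size (the last range may be shorter).
class AutoParallelismModel {
    static int newParallelism(long expectedTotalBytes, int currentParallelism,
                              long desiredTaskInputBytes, int minParallelism) {
        // ceil(expectedTotalBytes / desiredTaskInputBytes), never below the floor (or 1)
        long desired = (expectedTotalBytes + desiredTaskInputBytes - 1) / desiredTaskInputBytes;
        desired = Math.max(desired, Math.max(1, minParallelism));
        if (desired >= currentParallelism) {
            return currentParallelism;            // never increase parallelism
        }
        int range = currentParallelism / (int) desired; // partitions per new task
        if (range <= 1) {
            return currentParallelism;            // nothing can be merged
        }
        // One task per full range, plus one for a leftover partial range
        return currentParallelism / range + (currentParallelism % range > 0 ? 1 : 0);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 10 partitions, about 400 MB expected input, 100 MB desired per task
        System.out.println(newParallelism(400 * mb, 10, 100 * mb, 1)); // 5
    }
}
```

Merging whole partition ranges, rather than re-partitioning, means already-written upstream output stays usable. That is why a model like this can only lower parallelism after the upstream tasks have run.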

VertexManagerEvent handling

When auto-parallelism is enabled, each upstream task sends a VertexManagerEvent to the downstream VertexManagerPlugin containing statistics about its output (byte count, record count, partition sizes).

grep -n "VertexManagerEvent\|onVertexManagerEventReceived\|vmEvent" \
  $(find . -path "*/src/main/java/*" -name "ShuffleVertexManager*.java") | head -30

Answer:

  1. What protobuf message is decoded from the event payload?
  2. What statistic is accumulated across all events?
  3. How does ShuffleVertexManager use the accumulated statistics to decide on new parallelism?
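The bookkeeping behind questions 2 and 3 can be modelled without protobuf. The hypothetical accumulator below takes an already-decoded output size as a plain long and sums one report per source task. A second report from the same task is ignored; whether the real code ignores or replaces such duplicates is something to check. The accumulator also extrapolates to all source tasks by assuming unreported tasks produce the average reported size.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical accumulator for per-source-task output statistics, such as the
// output size carried in a decoded VertexManagerEvent payload. Keyed by source
// task so that a second report from the same task (e.g. a re-run) is ignored.
class OutputStatsAccumulator {
    private final Map<String, Long> bytesByTask = new HashMap<>();
    private long totalBytes = 0;

    /** Returns false if this source task had already reported (report ignored). */
    boolean onStats(String srcVertex, int taskIndex, long outputBytes) {
        if (bytesByTask.putIfAbsent(srcVertex + ":" + taskIndex, outputBytes) != null) {
            return false;
        }
        totalBytes += outputBytes;
        return true;
    }

    long totalBytes() { return totalBytes; }

    int reportedTasks() { return bytesByTask.size(); }

    /** Extrapolates to all source tasks, assuming unreported tasks match the average. */
    long expectedTotalBytes(int totalSourceTasks) {
        if (reportedTasks() == 0) {
            return 0;
        }
        return totalBytes * totalSourceTasks / reportedTasks(); // may overflow for huge inputs
    }

    public static void main(String[] args) {
        OutputStatsAccumulator acc = new OutputStatsAccumulator();
        acc.onStats("map", 0, 100);
        acc.onStats("map", 1, 50);
        acc.onStats("map", 0, 999); // duplicate report from task 0: ignored
        // prints "150 bytes from 2 tasks, ~300 expected from 4"
        System.out.println(acc.totalBytes() + " bytes from " + acc.reportedTasks()
            + " tasks, ~" + acc.expectedTotalBytes(4) + " expected from 4");
    }
}
```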

Write a Minimal Custom VertexManager

Create a CountingVertexManager that:

  1. Schedules 50% of tasks immediately when onVertexStarted() is called
  2. Schedules the remaining tasks when all source tasks have completed
  3. Logs the number of scheduled tasks at each scheduling call

This is the core pattern of slow-start, stripped to its minimum.

Implementation skeleton

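package org.apache.tez.dag.library.vertexmanager;

import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.TaskIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CountingVertexManager extends VertexManagerPlugin {

    private static final Logger LOG =
        LoggerFactory.getLogger(CountingVertexManager.class);

    private int totalTasksToSchedule = 0;
    private int totalSourceTasks = 0;
    private boolean started = false;
    private boolean secondBatchScheduled = false;
    // Distinct "vertexName:taskIndex" keys, so a re-run source task is counted once
    private final Set<String> completedSourceTasks = new HashSet<>();

    // The base class has no no-arg constructor: the context arrives here
    public CountingVertexManager(VertexManagerPluginContext context) {
        super(context);
    }

    @Override
    public void initialize() {
        // Nothing to do: parallelism of this vertex and its sources may not be
        // final until the vertex starts, so it is read in onVertexStarted()
    }

    @Override
    public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
        totalTasksToSchedule = getContext().getVertexNumTasks(getContext().getVertexName());
        // Count source tasks across all input vertices
        for (String inputVertex : getContext().getInputVertexEdgeProperties().keySet()) {
            totalSourceTasks += getContext().getVertexNumTasks(inputVertex);
        }
        started = true;
        // Source tasks that finished before this vertex started are reported
        // here, not through onSourceTaskCompleted()
        if (completions != null) {
            for (TaskAttemptIdentifier attempt : completions) {
                recordCompletion(attempt);
            }
        }
        // Schedule the first 50%, then check whether all sources are already done
        scheduleRange(0, totalTasksToSchedule / 2, "first");
        maybeScheduleSecondBatch();
    }

    @Override
    public void onSourceTaskCompleted(TaskAttemptIdentifier completedSrcTaskAttempt) {
        recordCompletion(completedSrcTaskAttempt);
        maybeScheduleSecondBatch();
    }

    @Override
    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
        // No-op: we don't need statistics for this simple implementation
    }

    @Override
    public void onRootVertexInitialized(String inputName,
            InputDescriptor inputDescriptor, List<Event> events) {
        // No-op: this manager is meant for vertices fed by other vertices, not root inputs
    }

    // Package-private accessor for unit tests
    boolean isSecondBatchScheduled() {
        return secondBatchScheduled;
    }

    private void recordCompletion(TaskAttemptIdentifier attempt) {
        TaskIdentifier task = attempt.getTaskIdentifier();
        completedSourceTasks.add(
            task.getVertexIdentifier().getName() + ":" + task.getIdentifier());
    }

    private void maybeScheduleSecondBatch() {
        if (started && !secondBatchScheduled
                && completedSourceTasks.size() >= totalSourceTasks) {
            // Schedule the remaining 50%
            scheduleRange(totalTasksToSchedule / 2, totalTasksToSchedule, "second");
            secondBatchScheduled = true;
        }
    }

    private void scheduleRange(int from, int to, String batchName) {
        List<ScheduleTaskRequest> toSchedule = new ArrayList<>();
        for (int i = from; i < to; i++) {
            toSchedule.add(ScheduleTaskRequest.create(i, null)); // null = no location hint
        }
        LOG.info("CountingVertexManager: scheduling {} batch of {} tasks",
            batchName, toSchedule.size());
        if (!toSchedule.isEmpty()) {
            getContext().scheduleTasks(toSchedule);
        }
    }
}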

Implementation tasks

  1. Identify the correct API method: getContext().scheduleTasks() vs getContext().scheduleVertexTasks() — check which one exists in your version of the API.

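  2. Write a unit test against a mocked VertexManagerPluginContext (for example a Mockito mock; look at how TestShuffleVertexManager builds its context):

    • Configure the mock so that getVertexName() returns this vertex's name, getVertexNumTasks() reports parallelism 10 for this vertex and 4 for its single source vertex, and getInputVertexEdgeProperties() lists that source
    • Call initialize(), then onVertexStarted(), and verify that 5 tasks (indices 0-4) are scheduled
    • Call onSourceTaskCompleted() for 4 distinct source tasks and verify that the remaining 5 tasks (indices 5-9) are scheduled on the 4th call
    • Verify that secondBatchScheduled is true afterwards (for example through a package-private accessor)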
  3. Register the CountingVertexManager in a DAG:

    Vertex reduceVertex = Vertex.create("reducer",
        ProcessorDescriptor.create(MyReducer.class.getName()), 10);
    reduceVertex.setVertexManagerPlugin(
        VertexManagerPluginDescriptor.create(CountingVertexManager.class.getName()));
    

Finding the VertexManager Test Utilities

# Find mock context for testing
find tez-dag/src/test tez-runtime-library/src/test \( -name "*Mock*Vertex*" -o -name "*VertexManager*Test*" -o -name "Test*VertexManager*" \)

# Find TestShuffleVertexManager
find . -name "TestShuffleVertexManager.java" | grep test

Read TestShuffleVertexManager.java to understand how VertexManager tests are structured. The test creates a mock context, calls lifecycle methods in order, and asserts which tasks were scheduled.
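You can practise the same test structure without a Tez classpath. In this sketch, SchedulingContext, RecordingContext and TwoBatchPolicy are hypothetical stand-ins for VertexManagerPluginContext, a mock context, and the CountingVertexManager policy. Only the shape of the test is meant to carry over to a real test against VertexManagerPluginContext: a fake context, lifecycle calls in order, and assertions on the scheduled indices.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for VertexManagerPluginContext (only what the policy needs).
interface SchedulingContext {
    int numTasks(String vertexName);
    void schedule(List<Integer> taskIndices);
}

// Plays the role of the mock: fixed parallelism, records every schedule() call.
class RecordingContext implements SchedulingContext {
    private final Map<String, Integer> parallelism;
    final List<List<Integer>> calls = new ArrayList<>();

    RecordingContext(Map<String, Integer> parallelism) { this.parallelism = parallelism; }

    public int numTasks(String vertexName) { return parallelism.get(vertexName); }

    public void schedule(List<Integer> taskIndices) { calls.add(new ArrayList<>(taskIndices)); }
}

// Mirrors the CountingVertexManager policy: half the tasks at start, the rest
// once every distinct source task has completed.
class TwoBatchPolicy {
    private final SchedulingContext ctx;
    private final String vertex;
    private final List<String> sources;
    private final Set<String> completed = new HashSet<>();
    private int totalTasks;
    private int totalSrcTasks;
    private boolean started;
    private boolean secondBatchScheduled;

    TwoBatchPolicy(SchedulingContext ctx, String vertex, List<String> sources) {
        this.ctx = ctx;
        this.vertex = vertex;
        this.sources = sources;
    }

    void onVertexStarted() {
        totalTasks = ctx.numTasks(vertex);
        for (String src : sources) {
            totalSrcTasks += ctx.numTasks(src);
        }
        started = true;
        scheduleRange(0, totalTasks / 2);
        maybeScheduleSecondBatch();
    }

    void onSourceTaskCompleted(String srcVertex, int taskIndex) {
        completed.add(srcVertex + ":" + taskIndex); // a re-run of the same task counts once
        maybeScheduleSecondBatch();
    }

    boolean isSecondBatchScheduled() { return secondBatchScheduled; }

    private void maybeScheduleSecondBatch() {
        if (started && !secondBatchScheduled && completed.size() >= totalSrcTasks) {
            scheduleRange(totalTasks / 2, totalTasks);
            secondBatchScheduled = true;
        }
    }

    private void scheduleRange(int from, int to) {
        List<Integer> batch = new ArrayList<>();
        for (int i = from; i < to; i++) {
            batch.add(i);
        }
        if (!batch.isEmpty()) {
            ctx.schedule(batch);
        }
    }

    public static void main(String[] args) {
        RecordingContext ctx = new RecordingContext(Map.of("reducer", 10, "mapper", 4));
        TwoBatchPolicy vm = new TwoBatchPolicy(ctx, "reducer", List.of("mapper"));
        vm.onVertexStarted();
        for (int task = 0; task < 4; task++) {
            vm.onSourceTaskCompleted("mapper", task);
        }
        System.out.println(ctx.calls); // [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
    }
}
```

Recording each call's indices, instead of counting calls, lets the test check both batch sizes and the exact task indices in one assertion per batch.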


Expected Output

  1. Answers to all questions in the ImmediateStartVertexManager and ShuffleVertexManager sections, with file:line references
  2. A working CountingVertexManager implementation that compiles
  3. A unit test that passes for the two scheduling scenarios

Stretch Goals

  1. Read CartesianProductVertexManager, one of the most complex VertexManagers:

    find . -path "*/src/main/java/*" -name "CartesianProductVertexManager*.java"
    

    What computation does it coordinate? When is it used?

  2. Find a ShuffleVertexManager-related issue in the Apache Tez JIRA (search for "ShuffleVertexManager"). Read the issue description and the patch. What invariant was violated?

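  3. Implement a NoOpVertexManager that never schedules any tasks, for exercising DAG failure paths. Use it in a test DAG. Find out which timeout or kill path, if any, ends the stalled vertex, then verify the final vertex and DAG status (for example, that the vertex ends in FAILED).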