How to use on-device Forward Pipeline Execution Mode for optimization#

Task overview#

This topic shows you how to use Pipeline Execution Mode in NxD Inference to optimize performance for large models composed of multiple submodels. This technique keeps intermediate tensors from submodels on the device, reducing data transfer overhead and minimizing model latency.

In this guide, you’ll learn to:

  • Configure pipeline execution flags for optimal performance

  • Set up multi-stage model wrappers that communicate efficiently

  • Manage intermediate tensor placement between pipeline stages

  • Implement a simple vision-text pipeline as a practical example

Sample Architecture#

This guide uses a vision-text multimodal model to demonstrate pipeline execution. The architecture consists of:

Vision Model: Processes image inputs through convolutional layers and outputs vision embeddings

Text Model: Takes vision embeddings and text inputs, then produces final classification results

This two-stage pipeline shows how intermediate vision embeddings can remain on the device, avoiding costly CPU transfers between model stages. The same principles apply to other multi-stage architectures like transformer decoder chains, diffusion model denoisers, or encoder-decoder pairs.
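Conceptually, the pipeline is just two chained forward calls. The plain-PyTorch sketch below (with vision_model, text_model, image_batch, and text_batch as placeholder names) shows the data flow before any Neuron-specific wrapping is applied:

# Stage 1: images -> vision embeddings
vision_embeddings = vision_model(image_batch)

# Stage 2: vision embeddings + text -> final classification output
final_output = text_model(vision_embeddings, text_batch)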

Prerequisites#

  • NeuronX Distributed Inference (NxDI): You must have NxDI installed and configured. See NxD Inference Setup Guide.

  • Multi-stage model: Your model should have intermediate tensors in a pipeline structure, such as Llama4-style models, Pixtral, or diffusion-based models.

The following diagram shows how intermediate tensors flow through a multi-stage pipeline:

Input Data
    |
    v
┌─────────────┐
│   Stage 1   │  <- Vision Model (Conv2D + Pooling)
│ (SubModel)  │
└─────────────┘
    |
    v
Intermediate    <- Kept on device with pipeline_execution=True
Tensors            and return_ranked_to_cpu=False
    |
    v
┌─────────────┐
│   Stage 2   │  <- Text Model (Embedding + Fusion)
│ (SubModel)  │
└─────────────┘
    |
    v
Final Output    <- Returned to CPU with return_ranked_to_cpu=True

Without pipeline execution, intermediate tensors transfer between CPU and device at each stage, creating overhead. With pipeline execution enabled, intermediate tensors remain on the device, reducing latency.

Note

Padding requirements: When passing ranked outputs (the list of lists) between ModelWrapper instances, you must manually pad them to keep input dimensions consistent and maintain tensor compatibility across pipeline stages.
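The exact padding you need depends on the shapes your next stage expects. As a minimal, hypothetical sketch (assuming the ranked output is a list with one entry per rank, each entry a list of tensors, and that the next stage expects a fixed size along the last dimension), you could pad with torch.nn.functional.pad:

import torch.nn.functional as F

# Hypothetical helper: pads each tensor in a ranked output (list of lists of
# tensors) along its last dimension up to target_len. Adjust the dimension
# and padding value to match what your next pipeline stage expects.
def pad_ranked_output(ranked_output, target_len):
    padded = []
    for rank_tensors in ranked_output:
        padded.append([
            F.pad(t, (0, target_len - t.shape[-1])) if t.shape[-1] < target_len else t
            for t in rank_tensors
        ])
    return padded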

Instructions#

1: Import required modules and define model classes

Start by importing the necessary modules and defining your model architectures:

import torch
from torch import nn
from neuronx_distributed_inference.models.encoder_base import NeuronEncoderBase
from neuronx_distributed_inference.models.model_wrapper import ModelWrapper
from neuronx_distributed_inference.models.application_base import NeuronApplicationBase
from neuronx_distributed_inference.models.config import InferenceConfig, NeuronConfig

# Vision Model Definition
class VisionModel(NeuronEncoderBase):
    def __init__(self, config: InferenceConfig):
        super().__init__(config)
        self.conv = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(64, config.vision_embedding_size)

    def forward(self, x):
        x = self.conv(x)
        x = self.pool(x)
        x = torch.flatten(x, 1)
        return self.fc(x)

# Text Model Definition
class TextModel(NeuronEncoderBase):
    def __init__(self, config: InferenceConfig):
        super().__init__(config)
        self.embedding = nn.Linear(config.text_input_size, config.text_embedding_size)
        self.fusion = nn.Linear(
            config.vision_embedding_size + config.text_embedding_size,
            config.output_size
        )

    def forward(self, vision_features, text_input):
        text_features = self.embedding(text_input)
        combined = torch.cat([vision_features, text_features], dim=1)
        return self.fusion(combined)

2: Configure ModelWrappers with pipeline execution flags

Set up your ModelWrapper classes with appropriate pipeline execution parameters:

# Vision Model Wrapper - keeps output on device
class VisionModelWrapper(ModelWrapper):
    def __init__(self, config: InferenceConfig):
        super().__init__(
            config=config,
            model_cls=VisionModel,
            pipeline_execution=True,
            return_ranked_to_cpu=False,  # Keep output ranked for efficient pipeline
            tag="vision_model"
        )

    def input_generator(self):
        # Generate sample input for compilation
        x = torch.randn(
            self.neuron_config.batch_size,
            3,
            224,
            224
        )
        return [(x,)]

# Text Model Wrapper - returns final output to CPU
class TextModelWrapper(ModelWrapper):
    def __init__(self, config: InferenceConfig):
        super().__init__(
            config=config,
            model_cls=TextModel,
            pipeline_execution=True,
            return_ranked_to_cpu=True,  # Return final output to CPU
            tag="text_model"
        )

    def input_generator(self):
        # Generate sample inputs for compilation
        vision_features = torch.randn(
            self.neuron_config.batch_size,
            self.config.vision_embedding_size
        )
        text_input = torch.randn(
            self.neuron_config.batch_size,
            self.config.text_input_size
        )
        return [(vision_features, text_input)]

3: Create application classes

Build application classes that use your configured ModelWrappers:

# Application Classes
class VisionModelApp(NeuronApplicationBase):
    def __init__(self, model_path: str, config: InferenceConfig):
        super().__init__(model_path=model_path, config=config)
        self.model = VisionModelWrapper(config)
        self.models.append(self.model)

    def forward(self, x):
        return self.models[0].forward(x)

class TextModelApp(NeuronApplicationBase):
    def __init__(self, model_path: str, config: InferenceConfig):
        super().__init__(model_path=model_path, config=config)
        self.model = TextModelWrapper(config)
        self.models.append(self.model)

    def forward(self, vision_features, text_input):
        return self.models[0].forward(vision_features, text_input)

4: Run the complete pipeline example

Execute your pipeline with the configured models:

def main():
    # Configure models
    config = InferenceConfig(
        NeuronConfig(batch_size=32, torch_dtype=torch.float32, tp_degree=2),
        vision_embedding_size=512,
        text_input_size=256,
        text_embedding_size=512,
        output_size=1024
    )

    # Create applications
    vision_app = VisionModelApp("path/to/vision/model", config)
    text_app = TextModelApp("path/to/text/model", config)

    # Compile models
    vision_app.compile("path/to/compiled/vision")
    text_app.compile("path/to/compiled/text")

    # Load models
    vision_app.load("path/to/compiled/vision")
    text_app.load("path/to/compiled/text")

    # Example inference
    image_input = torch.randn(32, 3, 224, 224)
    text_input = torch.randn(32, 256)

    # Forward pass through vision model
    # Returns ranked output (list of lists) since return_ranked_to_cpu=False
    vision_features = vision_app.forward(image_input)

    # Forward pass through text model
    # Returns CPU tensor since return_ranked_to_cpu=True
    final_output = text_app.forward(vision_features, text_input)

    print(f"Final output shape: {final_output.shape}")  # [32, 1024]

Confirm your work#

To confirm you have successfully configured pipeline execution mode, check that your model outputs have the expected tensor placement:

# Check intermediate output placement
print(f"Vision features type: {type(vision_features)}")  # Should be list of lists
print(f"Final output shape: {final_output.shape}")       # Should be [32, 1024]
print(f"Final output device: {final_output.device}")     # Should be CPU

Common issues#

Tensor dimension mismatch between pipeline stages

  • Possible solution: Ensure you manually pad the list of lists when passing outputs between ModelWrapper instances to maintain consistent input dimensions.

Performance not improving with pipeline execution

  • Possible solution: Verify that your model has intermediate tensors in a pipeline structure. Pipeline execution works best with models like Llama4-style, Pixtral, or diffusion-based models.

Memory issues with large models

  • Possible solution: Adjust your batch size and tensor parallelism degree (tp_degree) in the NeuronConfig to better fit your available memory.
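For example, a smaller batch size combined with a higher tensor parallelism degree might look like the following. This reuses the same InferenceConfig/NeuronConfig pattern from step 4; the values are illustrative, and valid tp_degree settings depend on your instance type:

config = InferenceConfig(
    NeuronConfig(batch_size=16, torch_dtype=torch.float32, tp_degree=8),  # smaller batch, more TP shards
    vision_embedding_size=512,
    text_input_size=256,
    text_embedding_size=512,
    output_size=1024
)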