{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# T5 inference with Tensor Parallelism" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is an extension to the [t5 inference tutorial](https://awsdocs-neuron.readthedocs-hosted.com/en/latest/src/examples/pytorch/torch-neuronx/t5-inference-tutorial.html). Here we will use NeuronxDistributed to improve the inference performance using tensor parallelism.\n", "\n", "This tutorial has the following main sections:\n", "\n", "1. Install dependencies\n", "1. Plug in `NeuronxDistributed` layers into T5\n", "1. Compile the T5 model\n", "1. Run distributed inference with beam search \n", "\n", "This Jupyter notebook should be run on a Inf2 instance (`inf2.24xlarge`) or Trn1 isntance (`trn1.32xlarge`)\n", "\n", "> The tutorial works for t5 and flan-t5 models. In this notebook we will run distributed inference with flan-t5-xl." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Install dependencies\n", "\n", "The code in this tutorial is written for Jupyter Notebooks. To use Jupyter Notebook on the Neuron instance, you\n", "can use this [guide](https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/setup/notebook/setup-jupyter-notebook-steps-troubleshooting.html).\n", "\n", "Run the notebook by cloning aws-neuron-sdk\n", "```\n", "git clone https://github.com/aws-neuron/aws-neuron-sdk.git\n", "cd aws-neuron-sdk/src/examples/pytorch/neuronx_distributed/t5-inference/\n", "```\n", "\n", "Once done execute `t5-inference-tutorial.ipynb`\n", "\n", "It is recommended to go through the [t5 inference tutorial](https://awsdocs-neuron.readthedocs-hosted.com/en/latest/src/examples/pytorch/torch-neuronx/t5-inference-tutorial.html) before you start this tutorial. \n", "In addition to the dependencies in the [t5 inference tutorial](https://awsdocs-neuron.readthedocs-hosted.com/en/latest/src/examples/pytorch/torch-neuronx/t5-inference-tutorial.html), we need to install neuronx-distributed. \n", "\n", "This tutorial requires the following pip packages:\n", "\n", "- `torch-neuronx`\n", "- `neuronx-cc`\n", "- `transformers`\n", "- `optimum-neuron`\n", "- `neuronx-distributed`\n", "\n", "Most of these packages will be installed when configuring your environment using the Trn1/Inf2 [ setup guide ](https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/setup/neuron-setup/pytorch/neuronx/ubuntu/torch-neuronx-ubuntu20.html#setup-torch-neuronx-ubuntu20). The additional dependencies must be installed here:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "! pip install --upgrade transformers==4.33.1 optimum-neuron==0.0.12 neuronx_distributed" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Pull the latest version of the compiler\n", "! pip install --upgrade neuronx-cc>=2.11 --no-deps" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Lets update numpy to a newer version \n", "! pip install --upgrade numpy>=1.22.2 --no-deps" ] }, { "cell_type": "markdown", "metadata": {}, "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Plug in NeuronxDistributed layers into T5\n", "\n", "We extend the huggingface's T5 model to use the `NeuronxDistributed` parallel layers. 
To do so, we simply swap the linear layers in the `T5LayerSelfAttention`, `T5LayerCrossAttention`, and `T5LayerFF` definitions with `ColumnParallelLinear` and `RowParallelLinear`. We also need to swap the `Embedding` layer with `ParallelEmbedding`.\n", "\n", "Let us take the example of T5Attention. The [attention block](https://github.com/huggingface/transformers/blob/main/src/transformers/models/t5/modeling_t5.py#L363-L366) has q, k, v, and o linear layers.\n", "The multi-head attention block uses q, k, and v to compute the attention scores, and the attention scores are then passed through o to compute the attention block output.\n", "So let us swap the q, k, and v layers with `ColumnParallelLinear` and o with `RowParallelLinear`. Having a `RowParallelLinear` follow a `ColumnParallelLinear` is a performance optimization: with `gather_output=False`, the attention output computed from q, k, and v stays sharded across the Neuron devices (each device holds a subset of the attention heads), and with `input_is_parallel=True` the row-parallel o layer consumes that sharded output directly, so no collective communication is needed between the two layers.\n", "The embedding layer is simply swapped with `ParallelEmbedding`.\n", "\n", "```\n", "class ParallelAttention(T5Attention):\n", "    def __init__(self, config: T5Config, has_relative_attention_bias=False):\n", "        super().__init__(config, has_relative_attention_bias)\n", "        # Per attention head and per partition values\n", "        world_size = parallel_state.get_tensor_model_parallel_size()\n", "        self.num_attention_heads_per_partition = divide(self.n_heads, world_size)\n", "        self.hidden_size_per_partition = self.num_attention_heads_per_partition * self.key_value_proj_dim\n", "\n", "        # Mesh TensorFlow initialization to avoid scaling before softmax\n", "        self.q = ColumnParallelLinear(self.d_model,\n", "                                      self.inner_dim,\n", "                                      bias=False,\n", "                                      gather_output=False)\n", "        self.k = ColumnParallelLinear(self.d_model,\n", "                                      self.inner_dim,\n", "                                      bias=False,\n", "                                      gather_output=False)\n", "        self.v = ColumnParallelLinear(self.d_model,\n", "                                      self.inner_dim,\n", "                                      bias=False,\n", "                                      gather_output=False)\n", "        self.o = RowParallelLinear(self.inner_dim,\n", "                                   self.d_model,\n", "                                   bias=False,\n", "                                   input_is_parallel=True)\n", "\n", "        if self.has_relative_attention_bias:\n", "            self.relative_attention_bias = ParallelEmbedding(self.relative_attention_num_buckets, self.n_heads)\n", "        self.n_heads = self.num_attention_heads_per_partition\n", "...\n", "```\n", "\n", "You can find all the modified T5 layers defined in [t5_model_layers.py](https://github.com/aws-neuron/aws-neuron-sdk/tree/master/src/examples/pytorch/neuronx_distributed/t5-inference/t5_model_layers.py).\n",
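"\n", "To make the pattern concrete, here is a minimal sketch of the matching feed-forward swap. This is illustrative only: it assumes the non-gated `T5DenseActDense` variant, and the class name `ParallelDenseActDense` is made up for this sketch. The actual definitions used by this tutorial live in t5_model_layers.py.\n", "\n", "```\n", "# Illustrative sketch, not the exact code from t5_model_layers.py\n", "class ParallelDenseActDense(T5DenseActDense):\n", "    def __init__(self, config: T5Config):\n", "        super().__init__(config)\n", "        # wi is split column-wise: each Neuron device holds a slice of d_ff\n", "        self.wi = ColumnParallelLinear(config.d_model,\n", "                                       config.d_ff,\n", "                                       bias=False,\n", "                                       gather_output=False)\n", "        # wo is split row-wise and consumes the sharded activations directly\n", "        self.wo = RowParallelLinear(config.d_ff,\n", "                                    config.d_model,\n", "                                    bias=False,\n", "                                    input_is_parallel=True)\n", "```\n", "\n", "The inherited `forward` can be reused as-is, because the parallel layers keep the `nn.Linear` call interface.\n",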
\n", "\n", "```\n", "def load_pretrained_with_parallel_attn(model_name):\n", " \n", " model = T5ForConditionalGeneration.from_pretrained(model_name, torch_dtype=\"auto\")\n", "\n", " # Parallel implementation of Attention modules.\n", " from t5_model_layers import ParallelSelfAttention, ParallelFF, ParallelCrossAttention\n", "\n", " for index, block in enumerate(model.decoder.block):\n", " if index == 0:\n", " block.layer[0] = ParallelSelfAttention(model.config,\n", " has_relative_attention_bias=True)\n", " else:\n", " block.layer[0] = ParallelSelfAttention(model.config)\n", " block.layer[1] = ParallelCrossAttention(model.config)\n", " block.layer[2] = ParallelFF(model.config)\n", " # Load the weights into the parallel layers \n", " neuronx_distributed.parallel_layers.load(model_name + \".pt\", model, sharded=False)\n", "\n", " return model\n", "\n", "```\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Compile the parallel T5 model\n", "\n", "Let us set some model parameters." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_name = \"google/flan-t5-xl\" \n", "max_length = 128\n", "num_beams = 4\n", "tp_degree = 8 # tensor parallelism degree" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Download and save the model that we want to trace. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import torch\n", "from transformers import T5ForConditionalGeneration\n", "\n", "model = T5ForConditionalGeneration.from_pretrained(model_name, torch_dtype=\"auto\")\n", "torch.save({\"model\":model.state_dict()}, model_name.split(\"/\")[-1] + \".pt\")\n", "model.config.use_cache = True" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To run HuggingFace T5 models on Neuron, we need to make a couple of changes. Let us reuse the code from the [t5 inference tutorial](https://awsdocs-neuron.readthedocs-hosted.com/en/latest/src/examples/pytorch/torch-neuronx/t5-inference-tutorial.html) which makes T5 compatible with Neuron. For your convenience, the code copied into [wrapper.py](https://github.com/aws-neuron/aws-neuron-sdk/tree/master/src/examples/pytorch/neuronx_distributed/t5-inference/wrapper.py) and [t5_models.py](https://github.com/aws-neuron/aws-neuron-sdk/tree/master/src/examples/pytorch/neuronx_distributed/t5-inference/t5_models.py). This notebook will import these files. \n", "\n", "The only change made to this code is that we use `neuronx_distributed.trace` instead of `torch_neuronx.trace`. \n", "\n", "Let us trace the encoder and decoder. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import t5_models \n", "import neuronx_distributed\n", "import time \n", "\n", "# This can take up to 20 minutes\n", "encoder_compile_start_time = time.time()\n", "traced_encoder = t5_models.parallel_trace_encoder(model_name, max_length, num_beams, tp_degree)\n", "print(\"Encoder compilation time {}\".format(time.time() - encoder_compile_start_time))\n", "\n", "neuronx_distributed.trace.parallel_model_save(traced_encoder, \"TracedParallelEncoder.pt\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# This can take up to 15 minutes\n", "decoder_compile_start_time = time.time()\n", "traced_decoder = t5_models.parallel_trace_decoder(model, model_name, num_beams, max_length, tp_degree)\n", "print(\"Decoder compilation time {}\".format(time.time() - decoder_compile_start_time))\n", "\n", "neuronx_distributed.trace.parallel_model_save(traced_decoder, \"TracedParallelDecoder.pt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Inference with the traced parallel T5 model\n", "\n", "With the traced model, let us try using beam search for inference." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Results:\n", "1 Lassen Sie uns gutes Essen essen.\n", "2 Lassen Sie uns gut essen.\n", "3 Lassen Sie uns gutes Essen zu essen.\n", "4 Lassen Sie uns gutes Essen zu sich nehmen.\n" ] } ], "source": [ "import neuronx_distributed\n", "from wrapper import T5Wrapper\n", "from transformers import T5Tokenizer\n", "\n", "\n", "num_return_sequences = 4\n", "\n", "traced_encoder = neuronx_distributed.trace.parallel_model_load(\"TracedParallelEncoder.pt\")\n", "traced_decoder = neuronx_distributed.trace.parallel_model_load(\"TracedParallelDecoder.pt\")\n", "\n", "tokenizer = T5Tokenizer.from_pretrained(model_name)\n", "model = T5Wrapper.from_pretrained(model_name)\n", "\n", "model.encoder = traced_encoder\n", "model.decoder = traced_decoder\n", "setattr(model.encoder, 'main_input_name', 'input_ids') # Attribute required by beam search\n", "\n", "output = model.parallel_infer(tokenizer=tokenizer,\n", " prompt=\"translate English to German: Lets eat good food.\",\n", " max_length=max_length,\n", " num_beams=num_beams,\n", " num_return_sequences=num_return_sequences,\n", " device=\"xla\")\n", "\n", "results = [tokenizer.decode(t, skip_special_tokens=True) for t in output]\n", "\n", "print('Results:')\n", "for i, summary in enumerate(results):\n", " print(i + 1, summary)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Benchmarking\n", "\n", "Let us benchmark the per token decoder latency" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Let us install NeuronPerf. We will use it to measure the performance.\n", "! 
pip install neuronperf --extra-index-url=https://pip.repos.neuron.amazonaws.com" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import neuronperf as npf\n", "\n", "d_model = model.config.d_model\n", "model_dir = \"TracedParallelDecoder.pt\"\n", "decoder_run_count = 128\n", "\n", "def load_fn(model_path, **kwargs):\n", "    return neuronx_distributed.trace.parallel_model_load(model_path)\n", "\n", "# NeuronPerf can't see tp_degree at the moment, so just expose all cores\n", "def env_setup_fn(*_):\n", "    # pop instead of del so this doesn't fail when the variable is unset\n", "    os.environ.pop(\"NEURON_RT_VISIBLE_CORES\", None)\n", "\n", "def benchmark():\n", "\n", "    # Create some sample inputs for the decoder\n", "    decoder_input_ids = torch.ones((num_beams, 1), dtype=torch.int64)\n", "    decoder_attention_mask = torch.ones((num_beams, max_length), dtype=torch.int32)\n", "    encoder_attention_mask = torch.ones((num_beams, max_length), dtype=torch.int64)\n", "    encoder_hidden_states = torch.ones((num_beams, max_length, d_model), dtype=torch.float32)\n", "    beam_idx = torch.arange(0, num_beams, dtype=torch.int64)\n", "    beam_scores = torch.zeros((num_beams,), dtype=torch.float)\n", "\n", "    inputs = (decoder_input_ids,\n", "              decoder_attention_mask,\n", "              encoder_hidden_states,\n", "              encoder_attention_mask,\n", "              beam_idx,\n", "              beam_scores)\n", "\n", "    reports = npf.benchmark(\n", "        load_fn,\n", "        model_dir,\n", "        [inputs],\n", "        batch_sizes=1,\n", "        n_models=1,\n", "        max_infers=decoder_run_count,\n", "        workers_per_model=1,  # no bottleneck on model inputs, so 1 is fine\n", "        env_setup_fn=env_setup_fn,\n", "        multiprocess=False,\n", "    )\n", "\n", "    report = reports[0]\n", "\n", "    # let's update throughput to be tokens / second and add a new record\n", "    latency_in_s = report[\"latency_ms_avg\"] / 1000\n", "    tokens_per_s = decoder_run_count / latency_in_s\n", "    report[\"throughput_avg\"] = tokens_per_s\n", "\n", "    # display and save results\n", "    npf.print_reports(reports, cols=[\"throughput_avg\", \"latency_ms_p50\", \"latency_ms_p99\"])\n", "    print(f\"Results saved to: {npf.write_json(reports[0])}\")\n", "\n", "benchmark()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Now let's benchmark inference as a whole, including sampling." ] },
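{ "cell_type": "markdown", "metadata": {}, "source": [ "The cell below derives per-token metrics from the whole-inference latency. As a worked example with made-up numbers: if a full generate call produces 128 tokens with an average latency of 6400 ms, then `throughput_avg` = 128 / 6.4 s = 20 tokens/second, and `latency_per_token_ms_p50` = 6400 ms / 128 = 50 ms." ] },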
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import torch\n", "import neuronx_distributed\n", "import neuronperf as npf\n", "\n", "from transformers import T5Tokenizer\n", "from wrapper import T5Wrapper\n", "\n", "tokenizer = T5Tokenizer.from_pretrained(model_name)\n", "\n", "generated_token_count = 0\n", "\n", "class Wrapper(torch.nn.Module):\n", " def __init__(self, \n", " traced_encoder,\n", " traced_decoder):\n", " super().__init__()\n", " self.model = T5Wrapper.from_pretrained(model_name)\n", " self.model.encoder = traced_encoder\n", " self.model.decoder = traced_decoder\n", " setattr(self.model.encoder, 'main_input_name', 'input_ids') # Attribute required by beam search\n", "\n", " def forward(self, *inputs):\n", " input_ids = inputs[0]['input_ids']\n", " attention_mask = inputs[0]['attention_mask']\n", " return self.model.parallel_infer(input_ids=input_ids,\n", " attention_mask=attention_mask,\n", " max_length=max_length,\n", " num_beams=num_beams,\n", " num_return_sequences=num_return_sequences)\n", "\n", "def load_fn(filename, **kwargs):\n", " traced_encoder = neuronx_distributed.trace.parallel_model_load(filename + \"TracedParallelEncoder.pt\")\n", " traced_decoder = neuronx_distributed.trace.parallel_model_load(filename + \"TracedParallelDecoder.pt\")\n", " return Wrapper(traced_encoder, traced_decoder)\n", "\n", "# NeuronPerf can't see tp_degree at the moment, so just expose all cores\n", "def env_setup_fn(*_):\n", " del os.environ[\"NEURON_RT_VISIBLE_CORES\"]\n", "\n", "def preprocess_fn(inputs):\n", " \n", " encoding = []\n", " for text in inputs:\n", " batch_encoding = tokenizer(text, \n", " max_length=max_length, \n", " truncation=True, \n", " padding='max_length',\n", " return_tensors=\"pt\")\n", " input_ids = batch_encoding['input_ids']\n", " attention_mask = batch_encoding['attention_mask']\n", " encoding.append({\"input_ids\": input_ids,\n", " \"attention_mask\": attention_mask})\n", " return encoding\n", "\n", "def postprocess_fn(outputs):\n", " output = [tokenizer.decode(seq) for seq in outputs]\n", " global generated_token_count \n", " generated_token_count = len(outputs[0])\n", " return output\n", "\n", "def benchmark():\n", " inputs = [\"summarize: The Inflation Reduction Act lowers prescription drug costs, health care costs, and energy costs. It's the most aggressive action on tackling the climate crisis in American history, which will lift up American workers and create good-paying, union jobs across the country. It'll lower the deficit and ask the ultra-wealthy and corporations to pay their fair share. 
And no one making under $400,000 per year will pay a penny more in taxes.\"]\n", "    reports = npf.benchmark(\n", "        load_fn,\n", "        \"\",  # Model dir\n", "        [inputs],\n", "        batch_sizes=1,\n", "        n_models=1,\n", "        max_infers=5,\n", "        max_duration=0,  # sampling can take a while, so let's not timeout\n", "        workers_per_model=1,\n", "        env_setup_fn=env_setup_fn,\n", "        preprocess_fn=preprocess_fn,\n", "        postprocess_fn=postprocess_fn,\n", "        multiprocess=False,\n", "    )\n", "\n", "    report = reports[0]\n", "\n", "    report[\"throughput_avg\"] = round(generated_token_count / (report[\"latency_ms_avg\"] / 1000), 2)\n", "    report[\"latency_per_token_ms_p50\"] = round(report[\"latency_ms_p50\"] / generated_token_count, 2)\n", "    report[\"latency_per_token_ms_p99\"] = round(report[\"latency_ms_p99\"] / generated_token_count, 2)\n", "\n", "    # display and save results\n", "    npf.print_reports(reports, cols=[\"throughput_avg\", \"latency_per_token_ms_p50\", \"latency_per_token_ms_p99\"])\n", "    print(f\"Results saved to: {npf.write_json(report)}\")\n", "\n", "benchmark()" ] } ], "metadata": { "kernelspec": { "display_name": "aws_neuron_venv_pytorch", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" }, "orig_nbformat": 4 }, "nbformat": 4, "nbformat_minor": 2 }