This document is relevant for: Inf1

BERT TorchServe Tutorial

Overview

This tutorial demonstrates the use of TorchServe with Neuron, the SDK for Amazon Inf1 instances. By the end of this tutorial, you will understand how TorchServe can be used to serve a model backed by EC2 Inf1 instances. We will use a pretrained BERT-Base model to determine if one sentence is a paraphrase of another.

Run the tutorial

First run the HuggingFace Pretrained BERT tutorial [html] [notebook].

You should now have a compiled bert_neuron_b6.pt file, which is required going forward.

Open a shell on the instance you prepared earlier and create a new directory named torchserve. Copy your compiled model from the previous tutorial into this new directory.

$ cd torchserve
$ ls
bert_neuron_b6.pt

Prepare a new Python virtual environment with the necessary Neuron and TorchServe components. Use a virtual environment to keep (most of) the various tutorial components isolated from the rest of the system in a controlled way.

$ python3 -m venv env
$ . env/bin/activate
$ pip install -U pip
$ pip install torch-neuron 'neuron-cc[tensorflow]' --extra-index-url=https://pip.repos.neuron.amazonaws.com
$ pip install transformers==4.12.5 torchserve==0.5.0 torch-model-archiver==0.5.0

Install the system requirements for TorchServe.

Ubuntu

$ sudo apt install openjdk-11-jdk

AL2

$ sudo yum install java-11-amazon-corretto-headless
$ sudo alternatives --config java
$ sudo alternatives --config javac

On either distribution, check the installed Java version:

$ java -version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.18.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.18.04, mixed mode, sharing)
$ javac -version
javac 11.0.11

Verify that TorchServe is now available.

$ torchserve --version
TorchServe Version is 0.5.0

Set up TorchServe

During this tutorial you will need to download various files onto your instance. The simplest way to accomplish this is to paste the download links provided above each file into a wget command. (We don’t provide the links directly because they are subject to change.) For example, right-click and copy the download link for config.json shown below.

config.json
{
    "model_name": "bert-base-cased-finetuned-mrpc",
    "max_length": 128,
    "batch_size": 6
}

Now execute the following in your shell:

$ wget <paste link here>
$ ls
bert_neuron_b6.pt  config.json

Download the custom handler script, handler_bert.py, which will respond to inference requests.

import os
import json
import sys
import logging

import torch, torch_neuron
from transformers import AutoTokenizer
from abc import ABC
from ts.torch_handler.base_handler import BaseHandler

# one core per worker
os.environ['NEURON_RT_NUM_CORES'] = '1'

logger = logging.getLogger(__name__)

class BertEmbeddingHandler(BaseHandler, ABC):
    """
    Handler class for Bert Embedding computations.
    """
    def __init__(self):
        super(BertEmbeddingHandler, self).__init__()
        self.initialized = False

    def initialize(self, ctx):
        self.manifest = ctx.manifest
        properties = ctx.system_properties
        self.device = 'cpu'
        model_dir = properties.get('model_dir')
        serialized_file = self.manifest['model']['serializedFile']
        model_pt_path = os.path.join(model_dir, serialized_file)

        # load the settings from config.json (bundled via --extra-files)
        with open('config.json') as fp:
            config = json.load(fp)
        self.max_length = config['max_length']
        self.batch_size = config['batch_size']
        self.classes = ['not paraphrase', 'paraphrase']

        self.model = torch.jit.load(model_pt_path)
        logger.debug(f'Model loaded from {model_dir}')
        self.model.to(self.device)
        self.model.eval()

        self.tokenizer = AutoTokenizer.from_pretrained(config['model_name'])
        self.initialized = True

    def preprocess(self, input_data):
        """
        Tokenization pre-processing
        """

        input_ids = []
        attention_masks = []
        token_type_ids = []
        for row in input_data:
            seq_0 = row['seq_0'].decode('utf-8')
            seq_1 = row['seq_1'].decode('utf-8')
            logger.debug(f'Received text: "{seq_0}", "{seq_1}"')

            inputs = self.tokenizer.encode_plus(
                    seq_0,
                    seq_1,
                    max_length=self.max_length,
                    padding='max_length',
                    truncation=True,
                    return_tensors='pt'
                    )

            input_ids.append(inputs['input_ids'])
            attention_masks.append(inputs['attention_mask'])
            token_type_ids.append(inputs['token_type_ids'])

        # stack the per-request tensors along the batch dimension
        batch = (torch.cat(input_ids, 0),
                torch.cat(attention_masks, 0),
                torch.cat(token_type_ids, 0))

        return batch

    def inference(self, inputs):
        """
        Predict the class of a text using a trained transformer model.
        """

        # sanity check dimensions
        assert(len(inputs) == 3)
        num_inferences = len(inputs[0])
        assert(num_inferences <= self.batch_size)

        # insert padding if we received a partial batch
        # (appends all-zero rows at the end of the batch dimension)
        padding = self.batch_size - num_inferences
        if padding > 0:
            pad = torch.nn.ConstantPad1d((0, 0, 0, padding), value=0)
            inputs = [pad(x) for x in inputs]

        outputs = self.model(*inputs)[0]
        predictions = []
        # only report results for real requests, not for padding rows
        for i in range(num_inferences):
            prediction = self.classes[outputs[i].argmax().item()]
            predictions.append([prediction])
            logger.debug("Model predicted: '%s'", prediction)
        return predictions

    def postprocess(self, inference_output):
        return inference_output

Next, we need to associate the handler script with the compiled model using torch-model-archiver. Run the following commands in your terminal:

$ mkdir model_store
$ MAX_LENGTH=$(jq '.max_length' config.json)
$ BATCH_SIZE=$(jq '.batch_size' config.json)
$ MODEL_NAME=bert-max_length$MAX_LENGTH-batch_size$BATCH_SIZE
$ torch-model-archiver --model-name "$MODEL_NAME" --version 1.0 --serialized-file ./bert_neuron_b6.pt --handler "./handler_bert.py" --extra-files "./config.json" --export-path model_store

Note

If you modify your model or a dependency, you will need to rerun the archiver command with the -f flag appended to update the archive.

The result of the above will be a .mar file inside the model_store directory.

$ ls model_store
bert-max_length128-batch_size6.mar

This file is essentially an archive associated with a fixed version of your model along with its dependencies (e.g. the handler code).
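
To see what the archiver bundled, the archive can be opened with Python's standard zipfile module. This is a minimal sketch that assumes the archive was written in torch-model-archiver's default format, which is zip-based; the helper name list_mar_contents is hypothetical and not part of any TorchServe tooling.

```python
import zipfile


def list_mar_contents(mar_path):
    """Return the sorted member names stored in a .mar archive.

    Assumption: the archive uses the default zip-based .mar format.
    """
    with zipfile.ZipFile(mar_path) as archive:
        return sorted(archive.namelist())
```

Calling list_mar_contents('model_store/bert-max_length128-batch_size6.mar') should list the serialized model, the handler script, and config.json, alongside the metadata the archiver writes.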

Note

The version specified in the torch-model-archiver command can be appended to REST API requests to access a specific version of your model. For example, if your model was hosted locally on port 8080 and named “bert”, the latest version of your model would be available at http://localhost:8080/predictions/bert, while version 1.0 would be accessible at http://localhost:8080/predictions/bert/1.0. We will see how to perform inference using this API later in this tutorial.
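
The URL scheme described in this note can be sketched as a small helper. The function name prediction_url and its default host and port are assumptions chosen for illustration; only the /predictions/<name>[/<version>] path layout comes from the note above.

```python
def prediction_url(model_name, version=None, host="localhost", port=8080):
    """Build an inference URL, optionally pinned to a specific model version."""
    url = f"http://{host}:{port}/predictions/{model_name}"
    if version is not None:
        # Appending the version selects that exact version of the model.
        url = f"{url}/{version}"
    return url
```

With these defaults, prediction_url("bert") gives http://localhost:8080/predictions/bert and prediction_url("bert", "1.0") gives http://localhost:8080/predictions/bert/1.0.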

Create a custom config file named torchserve.config to set some parameters. This file configures the server at launch when we run torchserve --start.

# bind inference API to all network interfaces (plain HTTP, SSL not enabled)
inference_address=http://0.0.0.0:8080
default_workers_per_model=1

Note

This will cause TorchServe to bind on all interfaces. For security in real-world applications, you’ll probably want to use port 8443 and enable SSL.

Run TorchServe

It’s time to start the server. Typically we’d want to launch this in a separate console, but for this demo we’ll just redirect output to a file.

$ torchserve --start --ncs --model-store model_store --ts-config torchserve.config >torchserve.log 2>&1

Verify that the server seems to have started okay.

$ curl http://127.0.0.1:8080/ping
{
  "status": "Healthy"
}

Note

If you get an error when trying to ping the server, you may have tried before the server was fully launched. Check torchserve.log for details.
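
Rather than retrying the ping by hand, a script can poll the endpoint until the server reports healthy. The following is a minimal sketch using only the standard library; the function names, the retry count, and the delay are assumptions, and the health check relies only on the "status": "Healthy" response shown above.

```python
import json
import time
import urllib.request


def ping(url="http://127.0.0.1:8080/ping", timeout=2.0):
    """Return True if the ping endpoint answers with status "Healthy"."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return json.load(response).get("status") == "Healthy"
    except (OSError, ValueError):
        # Connection refused, timeout, or a non-JSON body: not ready yet.
        return False


def wait_until_healthy(probe=ping, attempts=30, delay=1.0, sleep=time.sleep):
    """Call probe() up to `attempts` times, pausing `delay` seconds between tries."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            sleep(delay)
    return False
```

Calling wait_until_healthy() right after launching torchserve blocks until the server answers or the attempts run out. The probe and sleep parameters exist so the loop can be exercised without a running server.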

Use the Management API to instruct TorchServe to load our model.

$ MAX_BATCH_DELAY=5000 # ms timeout before a partial batch is processed
$ INITIAL_WORKERS=4 # number of models that will be loaded at launch
$ curl -X POST "http://localhost:8081/models?url=$MODEL_NAME.mar&batch_size=$BATCH_SIZE&initial_workers=$INITIAL_WORKERS&max_batch_delay=$MAX_BATCH_DELAY"
{
  "status": "Model \"bert-max_length128-batch_size6\" Version: 1.0 registered with 4 initial workers"
}

Note

Any additional attempts to configure the model after the initial curl request will cause the server to return a 409 error. To apply any changes, stop the server, start it again, and register the model with the new settings.

The MAX_BATCH_DELAY is a timeout value that determines how long to wait before processing a partial batch. This is why the handler code needs to check the batch dimension and potentially add padding. TorchServe will instantiate the number of model handlers indicated by INITIAL_WORKERS, so this value controls how many models we will load onto Inferentia in parallel. This tutorial was performed on an inf1.xlarge instance (one Inferentia chip), so there are four NeuronCores available. If you want to control worker scaling more dynamically, see the docs.
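
The registration request can also be assembled in Python. This sketch only builds the URL with the same four query parameters as the curl command above; the helper name registration_url and the default management address are assumptions for illustration.

```python
from urllib.parse import urlencode


def registration_url(model_name, batch_size, initial_workers, max_batch_delay,
                     management="http://localhost:8081"):
    """Build the Management API URL that registers a model archive."""
    query = urlencode({
        "url": f"{model_name}.mar",          # archive inside the model store
        "batch_size": batch_size,            # maximum requests grouped per batch
        "initial_workers": initial_workers,  # handlers created at registration
        "max_batch_delay": max_batch_delay,  # ms to wait before a partial batch
    })
    return f"{management}/models?{query}"
```

Sending an HTTP POST to the returned URL, for example with requests.post, is equivalent to the curl invocation shown earlier.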

Warning

If you attempt to load more models than NeuronCores available, one of two things will occur. Either the extra models will fit in device memory but performance will suffer, or you will encounter an error on your initial inference. You shouldn’t set INITIAL_WORKERS above the number of NeuronCores. However, you may want to use fewer cores if you are using the NeuronCore Pipeline feature.
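
A quick pre-flight check can catch an oversized worker count before the model is registered. This is a sketch under stated assumptions: the function name check_initial_workers is hypothetical, the chip count must be supplied by you for your instance type, and cores_per_worker stands in for a model that spans several cores (as with NeuronCore Pipeline).

```python
NEURON_CORES_PER_CHIP = 4  # one Inferentia chip provides four NeuronCores


def check_initial_workers(initial_workers, chips=1, cores_per_worker=1):
    """Return the number of spare NeuronCores, or raise if workers do not fit."""
    available = chips * NEURON_CORES_PER_CHIP
    needed = initial_workers * cores_per_worker
    if needed > available:
        raise ValueError(
            f"{initial_workers} workers need {needed} NeuronCores, "
            f"but only {available} are available"
        )
    return available - needed
```

For the inf1.xlarge used here, check_initial_workers(4) leaves no spare cores, while a request for five workers raises an error.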

It looks like everything is running successfully at this point, so it’s time for an inference.

Create the infer_bert.py file below on your instance.

import json
import concurrent.futures
import requests

with open('config.json') as fp:
    config = json.load(fp)
max_length = config['max_length']
batch_size = config['batch_size']
name = f'bert-max_length{max_length}-batch_size{batch_size}'

# dispatch requests in parallel
url = f'http://localhost:8080/predictions/{name}'
paraphrase = {'seq_0': "HuggingFace's headquarters are situated in Manhattan",
        'seq_1': "The company HuggingFace is based in New York City"}
not_paraphrase = {'seq_0': paraphrase['seq_0'], 'seq_1': 'This is total nonsense.'}

with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
    def worker_thread(worker_index):
        # we'll send half the requests as not_paraphrase examples for sanity
        data = paraphrase if worker_index < batch_size//2 else not_paraphrase
        response = requests.post(url, data=data)
        print(worker_index, response.json())

    for worker_index in range(batch_size):
        executor.submit(worker_thread, worker_index)

This script sends batch_size requests to our model in parallel. In this example, we are using a model that estimates the probability that one sentence is a paraphrase of another. The script sends positive examples in the first half of the batch and negative examples in the second half.

Execute the script in your terminal.

$ python infer_bert.py
1 ['paraphrase']
3 ['not paraphrase']
4 ['not paraphrase']
0 ['paraphrase']
5 ['not paraphrase']
2 ['paraphrase']

We can see that the first three threads (0, 1, 2) all report paraphrase, as expected. If we instead modify the script to send an incomplete batch and then wait for the timeout to expire, the excess padding results will be discarded.
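
The pad-then-discard behaviour can be illustrated without Inferentia hardware. The sketch below uses plain Python lists as a stand-in for the tensor padding in the handler; the function names are hypothetical, and the real handler performs the equivalent step with torch.nn.ConstantPad1d.

```python
def pad_batch(rows, batch_size, seq_length, pad_value=0):
    """Append all-zero rows until the batch has exactly batch_size rows."""
    if len(rows) > batch_size:
        raise ValueError("received more rows than the compiled batch size")
    padding = batch_size - len(rows)
    return rows + [[pad_value] * seq_length for _ in range(padding)]


def keep_real_results(outputs, num_real):
    """Drop the outputs that correspond to padding rows."""
    return outputs[:num_real]


# Two real requests arrive before the timeout; the model expects four rows.
real_rows = [[101, 7, 102], [101, 9, 102]]
padded = pad_batch(real_rows, batch_size=4, seq_length=3)
```

The compiled model always sees a full batch, and only the first len(real_rows) outputs are returned to callers.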

Benchmark TorchServe

We’ve seen how to perform a single batched inference, but how many inferences can we process per second? A separate upcoming tutorial will document performance tuning to maximize throughput. In the meantime, we can still perform a simple naïve stress test. The code below, saved as benchmark_bert.py, will spawn 64 worker threads, with each thread repeatedly sending inference requests that the server groups into batches. A separate thread will periodically print throughput and latency measurements.

import os
import argparse
import time
import numpy as np
import requests
import sys
from concurrent import futures

import torch

parser = argparse.ArgumentParser()
parser.add_argument('--url', help='Torchserve model URL', type=str, default=f'http://127.0.0.1:8080/predictions/bert-max_length128-batch_size6')
parser.add_argument('--num_thread', type=int, default=64, help='Number of threads invoking the model URL')
parser.add_argument('--batch_size', type=int, default=6)
parser.add_argument('--sequence_length', type=int, default=128)
parser.add_argument('--latency_window_size', type=int, default=1000)
parser.add_argument('--throughput_time', type=int, default=300)
parser.add_argument('--throughput_interval', type=int, default=10)
args = parser.parse_args()

data = { 'seq_0': 'A completely made up sentence.',
    'seq_1': 'Well, I suppose they are all made up.' }

live = True
num_infer = 0
latency_list = []
def one_thread(pred, feed_data):
    global latency_list
    global num_infer
    global live
    session = requests.Session()
    while True:
        start = time.time()
        result = session.post(pred, data=feed_data)
        latency = time.time() - start
        latency_list.append(latency)
        num_infer += 1
        if not live:
            break

def current_performance():
    last_num_infer = num_infer
    for _ in range(args.throughput_time // args.throughput_interval):
        current_num_infer = num_infer
        throughput = (current_num_infer - last_num_infer) / args.throughput_interval
        p50 = 0.0
        p90 = 0.0
        if latency_list:
            p50 = np.percentile(latency_list[-args.latency_window_size:], 50)
            p90 = np.percentile(latency_list[-args.latency_window_size:], 90)
        print('pid {}: current throughput {}, latency p50={:.3f} p90={:.3f}'.format(os.getpid(), throughput, p50, p90))
        sys.stdout.flush()
        last_num_infer = current_num_infer
        time.sleep(args.throughput_interval)
    global live
    live = False

with futures.ThreadPoolExecutor(max_workers=args.num_thread+1) as executor:
    executor.submit(current_performance)
    for _ in range(args.num_thread):
        executor.submit(one_thread, args.url, data)

Run the benchmarking script.

$ python benchmark_bert.py
pid 26980: current throughput 0.0, latency p50=0.000 p90=0.000
pid 26980: current throughput 584.1, latency p50=0.099 p90=0.181
pid 26980: current throughput 594.2, latency p50=0.100 p90=0.180
pid 26980: current throughput 598.8, latency p50=0.095 p90=0.185
pid 26980: current throughput 607.9, latency p50=0.098 p90=0.182
pid 26980: current throughput 608.6, latency p50=0.096 p90=0.181
pid 26980: current throughput 611.3, latency p50=0.096 p90=0.185
pid 26980: current throughput 610.2, latency p50=0.096 p90=0.185
...
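
Each printed line combines two measurements: throughput is the number of requests completed during the last interval divided by the interval length, and the latency percentiles are taken over the most recent window of samples. The sketch below reproduces that arithmetic with the standard library only. The helper names are hypothetical, and the percentile function uses linear interpolation, which matches the default behaviour of np.percentile.

```python
def percentile(values, q):
    """Percentile with linear interpolation between the two closest ranks."""
    if not values:
        return 0.0
    ordered = sorted(values)
    rank = (len(ordered) - 1) * q / 100.0
    lower = int(rank)
    upper = min(lower + 1, len(ordered) - 1)
    fraction = rank - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction


def summarize(latencies, completed_now, completed_before, interval, window=1000):
    """Return (requests per second, p50 latency, p90 latency) for one report."""
    recent = latencies[-window:]  # only the newest `window` samples count
    throughput = (completed_now - completed_before) / interval
    return throughput, percentile(recent, 50), percentile(recent, 90)
```

For instance, 6000 requests completed in a 10 second interval corresponds to a reported throughput of 600.0 requests per second.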

Congratulations! By now you should have successfully served a batched model over TorchServe.
