Running OpenPose on Inferentia#

Note: this tutorial runs on tensorflow-neuron 1.x only#

Introduction:#

In this tutorial we will compile and deploy Openpose model for Inferentia. This jupyter notebook should run on an inf1.6xlarge instance for compilation and inference. The inference part of this tutorial requires inf1.6xlarge and not the compilation itself. For simplicity we will run this tutorial on a single instance but in real life scenario the compilation can be done on a compute c5.4xlarge instance and the deployment on the inf1 instance family.

In this tutorial we provide two main sections: 1. Compile the OpenPose model on inf1x6large. 2. Infer the same compiled model on inf1x6large.

Verify that this Jupyter notebook is running the Python kernel environment that was set up according to the Tensorflow Installation Guide. You can select the Kernel from the “Kernel -> Change Kernel” option on the top of this Jupyter notebook page.

Acknowledgement:#

Many thanks to https://github.com/ildoonet for providing pretrained model as well as the image preprocessing/pose estimating infrastructure.

Download tensorflow pose net frozen graph.#

[ ]:
!wget -c --tries=2 $( wget -q -O - http://www.mediafire.com/file/qlzzr20mpocnpa3/graph_opt.pb | grep -o 'http*://download[^"]*' | tail -n 1 ) -O graph_opt.pb


Compile#

Compile the pose net frozen graph into AWS Neuron compatible form. Network input image resolution is adjustable with argument –net_resolution (e. g., –net_resolution=656x368). The compiled model can accept arbitrary batch size input at runtime.

[ ]:
"""
Usage: python convert_graph_opt.py /path/to/graph_opt.pb /path/to/graph_opt_neuron.pb
"""
#import argparse
import numpy as np
import tensorflow as tf
from tensorflow.core.framework.tensor_shape_pb2 import TensorShapeProto
import tensorflow.neuron as tfn


def compile():
    #parser = argparse.ArgumentParser()
    #parser.add_argument('input_pb_path', help='Input serialized GraphDef protobuf')
    #parser.add_argument('output_pb_path', help='Ouput serialized GraphDef protobuf')
    #parser.add_argument('--net_resolution', default='656x368', help='Network resolution in WxH format, e. g., --net_resolution=656x368')
    #parser.add_argument('--debug_verify', action='store_true')
    #args = parser.parse_args()

    input_pb_path = './graph_opt.pb'
    net_resolution = '656x368'
    output_pb_path = './graph_opt_neuron_' + net_resolution + '.pb'

    debug_verify = 'store_true'
    dim_w, dim_h = net_resolution.split('x')
    dim_w = int(dim_w)
    dim_h = int(dim_h)
    graph_def = tf.GraphDef()
    with open(input_pb_path, 'rb') as f:
        graph_def.ParseFromString(f.read())

    if debug_verify:
        np.random.seed(0)
        feed_dict = {'image:0': np.random.rand(1, dim_h, dim_w, 3)}
        output_name = 'Openpose/concat_stage7:0'
        with tf.Session(graph=tf.Graph()) as sess:
            tf.import_graph_def(graph_def, name='')
            result_reference = sess.run(output_name, feed_dict)

    preprocessing_ops = {'preprocess_divide', 'preprocess_divide/y', 'preprocess_subtract', 'preprocess_subtract/y'}
    graph_def = nhwc_to_nchw(graph_def, preprocessing_ops)
    graph_def = inline_float32_to_float16(graph_def, preprocessing_ops)
    with tf.Session(graph=tf.Graph()) as sess:
        tf.import_graph_def(graph_def, name='')
        no_fuse_ops = preprocessing_ops.union({'Openpose/concat_stage7'})
        infer_graph = tfn.graph_util.inference_graph_from_session(
            sess, shape_feed_dict={'image:0': [1, dim_h, dim_w, 3]}, output_tensors=['Openpose/concat_stage7:0'],
            no_fuse_ops=no_fuse_ops, dynamic_batch_size=True,
        )
    with open(output_pb_path, 'wb') as f:
        f.write(infer_graph.as_graph_def().SerializeToString())

    if debug_verify:
        with tf.Session(graph=infer_graph) as sess:
            result_compiled = sess.run(output_name, feed_dict)
        np.testing.assert_allclose(result_compiled, result_reference, rtol=1e-2, atol=1e-3)


def inline_float32_to_float16(graph_def, preprocessing_ops):
    float32_enum = tf.float32.as_datatype_enum
    float16_enum = tf.float16.as_datatype_enum
    graph = tf.Graph()
    with graph.as_default():
        tf.import_graph_def(graph_def, name='')
    graph_def = graph.as_graph_def()
    for node in graph_def.node:
        if node.name in preprocessing_ops or node.op == 'Placeholder':
            cast_input_node_name = node.name
            continue
        if node.op == 'Const':
            if node.attr['dtype'].type == float32_enum:
                node.attr['dtype'].type = float16_enum
                tensor_def = node.attr['value'].tensor
                tensor_def.dtype = float16_enum
                if tensor_def.tensor_content:
                    const_np = np.frombuffer(tensor_def.tensor_content, dtype=np.float32).astype(np.float16)
                    tensor_def.tensor_content = const_np.tobytes()
                elif len(tensor_def.float_val):
                    const_np = np.array(tensor_def.float_val).astype(np.float16).view(np.uint16)
                    tensor_def.float_val[:] = []
                    tensor_def.half_val[:] = list(const_np)
                else:
                    raise NotImplementedError
        elif 'T' in node.attr and node.attr['T'].type == float32_enum:
            node.attr['T'].type = float16_enum
    for node in graph_def.node:
        if node.name == cast_input_node_name:
            node.name = '{}_PreCastFloat32ToFlot16'.format(node.name)
            input_node = node
            break
    cast_input_node = _gen_cast_node_def(cast_input_node_name, tf.float16, input_node)

    output_node = graph_def.node[-1]
    cast_output_node_name = output_node.name
    output_node.name = '{}_PreCastFloat16ToFlot32'.format(output_node.name)
    cast_output_node = _gen_cast_node_def(cast_output_node_name, tf.float32, output_node)

    preprocessing_ops.add(input_node.name)
    new_graph_def = tf.GraphDef()
    new_graph_def.node.extend(graph_def.node)
    new_graph_def.node.append(cast_input_node)
    new_graph_def.node.append(cast_output_node)
    graph = tf.Graph()
    with graph.as_default():
        tf.import_graph_def(new_graph_def, name='')
    return graph.as_graph_def()


def nhwc_to_nchw(graph_def, preprocessing_ops):
    graph = tf.Graph()
    with graph.as_default():
        tf.import_graph_def(graph_def, name='')
    graph_def = graph.as_graph_def()
    node_name_to_node = {node.name: node for node in graph_def.node}
    for node in graph_def.node:
        if node.name in preprocessing_ops or node.op == 'Placeholder':
            transpose_input_node_name = node.name
            continue
        if node.op == 'Conv2D':
            node.attr['data_format'].s = b'NCHW'
            strides = node.attr['strides'].list.i
            strides[:] = [strides[0], strides[3], strides[1], strides[2]]
        elif node.op == 'BiasAdd':
            if node.name != 'probs/BiasAdd':
                node.attr['data_format'].s = b'NCHW'
        elif node.op == 'MaxPool':
            node.attr['data_format'].s = b'NCHW'
            ksize = node.attr['ksize'].list.i
            ksize[:] = [ksize[0], ksize[3], ksize[1], ksize[2]]
            strides = node.attr['strides'].list.i
            strides[:] = [strides[0], strides[3], strides[1], strides[2]]
        elif node.op in {'Concat', 'ConcatV2'}:
            node_axes = node_name_to_node[node.input[-1]]
            node_axes.attr['value'].tensor.int_val[:] = [1]
    for node in graph_def.node:
        if node.name == transpose_input_node_name:
            node.name = '{}_PreTransposeNHWC2NCHW'.format(node.name)
            input_node = node
            break
    transpose_input_node, transpose_input_perm_node = _gen_transpose_def(transpose_input_node_name, [0, 3, 1, 2], input_node)

    output_node = graph_def.node[-1]
    transpose_output_node_name = output_node.name
    output_node.name = '{}_PreTransposeNCHW2NHWC'.format(output_node.name)
    transpose_output_node, transpose_output_perm_node = _gen_transpose_def(transpose_output_node_name, [0, 2, 3, 1], output_node)

    preprocessing_ops.add(input_node.name)
    preprocessing_ops.add(transpose_input_perm_node.name)
    new_graph_def = tf.GraphDef()
    new_graph_def.node.extend(graph_def.node)
    new_graph_def.node.append(transpose_input_perm_node)
    new_graph_def.node.append(transpose_input_node)
    new_graph_def.node.append(transpose_output_perm_node)
    new_graph_def.node.append(transpose_output_node)
    graph = tf.Graph()
    with graph.as_default():
        tf.import_graph_def(new_graph_def, name='')
    return graph.as_graph_def()


def _gen_cast_node_def(name, target_dtype, input_node):
    cast_node = tf.NodeDef(name=name, op='Cast')
    cast_node.input.append(input_node.name)
    cast_node.attr['DstT'].type = target_dtype.as_datatype_enum
    cast_node.attr['SrcT'].type = input_node.attr['T'].type
    cast_node.attr['Truncate'].b = False
    return cast_node


def _gen_transpose_def(name, perm, input_node):
    perm_node = tf.NodeDef(name='{}/perm'.format(name), op='Const')
    perm_node.attr['dtype'].type = tf.int32.as_datatype_enum
    tensor_def = perm_node.attr['value'].tensor
    tensor_def.dtype = tf.int32.as_datatype_enum
    tensor_def.tensor_shape.dim.append(TensorShapeProto.Dim(size=4))
    tensor_def.tensor_content = np.array(perm, dtype=np.int32).tobytes()
    transpose_node = tf.NodeDef(name=name, op='Transpose')
    transpose_node.input.append(input_node.name)
    transpose_node.input.append(perm_node.name)
    transpose_node.attr['T'].type = input_node.attr['T'].type
    transpose_node.attr['Tperm'].type = tf.int32.as_datatype_enum
    return transpose_node, perm_node

[ ]:
compile()

# Sample output will look like below:
# WARNING:tensorflow:From <ipython-input-3-27d3844cd753>:47: inference_graph_from_session (from tensorflow_neuron.python.graph_util) is deprecated and will be removed in a future version.
# Instructions for updating:
# Please refer to AWS documentation on Neuron integrated TensorFlow 2.0.
# INFO:tensorflow:Froze 0 variables.
# INFO:tensorflow:Converted 0 variables to const ops.
# INFO:tensorflow:fusing subgraph {subgraph neuron_op_ed41d2deb8c54255 with input tensors ["<tf.Tensor 'preprocess_subtract0/_0:0' shape=(1, 3, 368, 656) dtype=float16>"], output tensors ["<tf.Tensor 'Openpose/concat_stage7_PreCastFloat16ToFlot32:0' shape=(1, 46, 82, 57) dtype=float16>"]} with neuron-cc
# INFO:tensorflow:Number of operations in TensorFlow session: 474
# INFO:tensorflow:Number of operations after tf.neuron optimizations: 474
# INFO:tensorflow:Number of operations placed on Neuron runtime: 465

Deploy#

Using same instance to deploy the model. In case of different deployment instance, launch a deployment inf1 instance and copy the AWS Neuron optimized tensorflow frozen graph graph_opt_neuron_656x368.pb to the deployment inf1 instance. The smallest instance type inf1.xlarge is sufficient for this demo.

Your graph_opt_neuron_656x368.pb can now be plugged into https://github.com/ildoonet seemlessly if you have tensorflow-neuron installed. When it is used at runtime, please ensure that the image resolution is the same as compile-time image resolution, i. e., 656x368.

Measure performance on the compiled frozen graph using dummy inputs.

[ ]:
"""
Copyright (C) 2020, Amazon.com. All Rights Reserved
"""
import os
import atexit
import time
import math
import json
from collections import OrderedDict, Counter
from contextlib import contextmanager, ContextDecorator
from functools import wraps
from tensorflow.python.client import session
from tensorflow.python.platform import tf_logging as logging


class measure_performance(ContextDecorator):
    """Convenient tool for performance measurements.
    Can be apply on tensorflow session.run, tf-serving unary gRPC calls, or a given custom function.
    Usage:
    To generate performance report for the entire Python or gRPC-client process, insert
    the following function call before running inferences:
    `tfn.measure_performance()`
    Then latency/throughput report will be generated when the process terminates.
    Alternatively, it is possible to use `tfn.measure_performance` programmatically
    as a context manager. Performance measurement will be done for all inferences
    happening under this context. Report will be displayed as INFO level log when exiting
    the context. It is also possible to obtain a JSON format report in Python.
    For example:
    ```
    with tfn.measure_performance() as perf:
        ... (run some inferences) ...
    report_json = perf.report()
    report_full_json = perf.report(verbosity=1)
    ```
    """

    def __init__(self, func=None, window_size=1):
        self.perf_tracker = PerformanceTracker(window_size)
        atexit.register(self.perf_tracker.report)
        self._original_run = session.Session.run
        self._original_grpc_call = None
        if callable(func):
            self.perf_tracker.register_func(self._track_performance(func))
        else:
            session.Session.run = self._track_performance(session.Session.run)
            try:
                import grpc
                from tensorflow_serving.apis import prediction_service_pb2_grpc
                dummy_stub = prediction_service_pb2_grpc.PredictionServiceStub(grpc.insecure_channel(''))
                self._grpc_callable_type = type(dummy_stub.Predict)
                self._original_grpc_call = self._grpc_callable_type.__call__
            except ImportError:
                pass
            if callable(self._original_grpc_call):
                self._grpc_callable_type.__call__ = self._track_performance(
                    grpc._channel._UnaryUnaryMultiCallable.__call__
                )

    def __enter__(self):
        return self.perf_tracker

    def __exit__(self, *exc):
        atexit.unregister(self.perf_tracker.report)
        self.perf_tracker.report()
        session.Session.run = self._original_run
        if self._original_grpc_call is not None:
            self._grpc_callable_type.__call__ = self._original_grpc_call
        return False

    def _track_performance(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            end = time.time()
            self.perf_tracker.add_timestamps(start, end)
            return result
        return wrapper


class PerformanceTracker(ContextDecorator):

    description = (
        "Latency unit: second. Throughput unit: number of batched inferences per second. "
        "Reported throughput is a lower bound of the actual throughput as inferences "
        "spanning across window boundaries are not counted towards any of the windows. "
        "'Quiet' periods (i. e., window buckets where the inference function is not called) "
        "are not counted towards the reported average throughput."
    )

    def __init__(self, window_size):
        self.window_size = window_size
        self.timestamps_list = []
        self._func = None

    def __call__(self, *args, **kwargs):
        return self._func(*args, **kwargs)

    def register_func(self, func):
        self._func = func

    def add_timestamps(self, start, end):
        self.timestamps_list.append([start, end])

    def report(self, verbosity=0):
        if self.timestamps_list:
            latency_list = [end - start for start, end in self.timestamps_list]
            latency_json = {
                'p50': percentile(latency_list, 50),
                'p90': percentile(latency_list, 90),
                'p99': percentile(latency_list, 99),
                'p100': percentile(latency_list, 100),
            }
            bucketed_timestamps = [self._get_bucket(start, end) for start, end in self.timestamps_list]
            counted_buckets = Counter(item for item in bucketed_timestamps if item is not None)
            bucket_throughputs = [(key, value / self.window_size) for key, value in sorted(counted_buckets.items())]
            busy_throughputs = list(OrderedDict((key, value) for key, value in bucket_throughputs).values())
            throughput_json = {
                'peak': max(busy_throughputs),
                'median': percentile(busy_throughputs, 50),
                'average': sum(busy_throughputs) / len(busy_throughputs),
            }
            if verbosity > 0:
                throughput_json['trend'] = busy_throughputs
            report_json = {
                'pid': os.getpid(),
                'throughput': throughput_json,
                'latency': latency_json,
                'description': PerformanceTracker.description,
            }
            with _logging_show_info():
                logging.info('performance report:\n{}'.format(json.dumps(report_json, indent=4)))
            return report_json

    def _get_bucket(self, start, end):
        bucketed_start = math.floor(start / self.window_size) * self.window_size
        bucketed_end = math.ceil(end / self.window_size) * self.window_size
        if bucketed_end - bucketed_start == self.window_size:
            return bucketed_start
        else:
            return None


def percentile(number_list, percent):
    pos_float = len(number_list) * percent / 100
    max_pos = len(number_list) - 1
    pos_floor = min(math.floor(pos_float), max_pos)
    pos_ceil = min(math.ceil(pos_float), max_pos)
    number_list = sorted(number_list)
    return number_list[pos_ceil] if pos_float - pos_floor > 0.5 else number_list[pos_floor]


@contextmanager
def _logging_show_info():
    try:
        verbosity = logging.get_verbosity()
        logging.set_verbosity(logging.INFO)
        yield
    finally:
        logging.set_verbosity(verbosity)
[ ]:
"""
Below are the inputs for compiled frozen graph

pb_path is a /path/graph_opt_neuron_656x368.pb
num_thread = 8 ( Number of threads that work on each tensorflow session )
batch_size =1 ( batch_size )
net_resolution ,default=656x368
num_inferences = 200
"""
import os
from concurrent import futures
import numpy as np
import tensorflow as tf
import tensorflow.neuron as tfn

def run_with_dummy(sess, dummy_feed_dict, num_inferences):
    for _ in range(num_inferences):
        sess.run('Openpose/concat_stage7:0', dummy_feed_dict)

def main():
    NUM_NEURON_CORES = 16
    pb_path = './graph_opt_neuron_656x368.pb'
    num_thread = 8
    batch_size = 1
    net_resolution = '656x368'
    num_inferences = 200
    dim_w, dim_h = net_resolution.split('x')
    dim_w = int(dim_w)
    dim_h = int(dim_h)
    graph_def = tf.GraphDef()
    with open(pb_path, 'rb') as f:
        graph_def.ParseFromString(f.read())

    graph_def = tfn.graph_util.tag_multicore(graph_def, NUM_NEURON_CORES)

    with tfn.measure_performance() as perf:
        with tf.Session(graph=tf.Graph()) as sess:
            tf.import_graph_def(graph_def, name='')
            input_name = 'image:0'
            input_shape = sess.graph.get_tensor_by_name(input_name).shape.as_list()
            input_shape[0] = batch_size
            input_shape[1] = dim_h
            input_shape[2] = dim_w
            dummy_feed_dict = {input_name: np.zeros(input_shape).astype(np.float32)}
            with futures.ThreadPoolExecutor(max_workers=num_thread) as executor:
                fut_list = [executor.submit(run_with_dummy, sess, dummy_feed_dict, num_inferences) for _ in range(num_thread)]
                res_list = [fut.result() for fut in fut_list]

main()

# Sample output will look like below:
# INFO:tensorflow:performance report:
# {
#    "pid": 17713,
#    "throughput": {
#        "peak": 66.0,
#        "median": 64.0,
#        "average": 61.56521739130435
#    },
#    "latency": {
#        "p50": 0.1106414794921875,
#        "p90": 0.11212301254272461,
#        "p99": 0.11337876319885254,
#        "p100": 7.08282732963562
#    },
#    "description": "Latency unit: second. Throughput unit: number of batched inferences per second. Reported throughput is a lower bound of the actual throughput as inferences spanning across window boundaries are not counted towards any of the windows. 'Quiet' periods (i. e., window buckets where the inference function is not called) are not counted towards the reported average throughput."
# }