概要

Amazon SageMaker のエンドポイントに訓練済みモデルをデプロイして推論を行ってみた。
尚、今回は PyTorch を利用したが、 他にも tensorflow や chainer など他のフレームワークも利用できる。

Amazon SageMaker で PyTorch を使用
https://docs.aws.amazon.com/ja_jp/sagemaker/latest/dg/pytorch.html

目次

モデルデータを作成しS3バケットに上げる

モデルは PyTorchで重回帰分析 で作成したものをそのまま利用する。

アップロードするフォルダの構成

以下の構成のフォルダを作成する。

sample_torch_model.tgz
 └ sample_torch_model.pth ... エクスポートした訓練済みモデル
 └ sample_torch_model_scalar.json ... 今回はエンドポイント側で入力データの標準化をしたいので必要な情報をJSON化して一緒にアップしておく(後述)

モデルの作成 及び エクスポート

モデルは PyTorchで重回帰分析 で作成したものをそのまま使用。

あとは以下の通り、エクスポートするだけ。

import json
import os

model_name = "sample_torch_model"
if not os.path.exists(model_name):
    os.mkdir(model_name)

# 訓練済みモデルを保存
model_path = f"{model_name}/{model_name}.pth"
#model_state = model.state_dict()
#model_state["my_scaler_params"] = scaler.get_params()
#model_state["my_scaler_mean"] = scaler.mean_
#model_state["my_scaler_var"] = scaler.var_
#model_state["my_scaler_scale"] = scaler.scale_
#torch.save(model_state, model_path)
torch.save(model.state_dict(), model_path)

#
# 標準化に必要な値をJSONに保存
#
scaler_dict = {}
scaler_dict["my_scaler_params"] = scaler.get_params()
scaler_dict["my_scaler_mean"] = scaler.mean_.tolist()
scaler_dict["my_scaler_var"] = scaler.var_.tolist()
scaler_dict["my_scaler_scale"] = scaler.scale_.tolist()
with open(f"{model_name}/{model_name}_scalar.json", "w") as f:
    f.write(json.dumps(scaler_dict))

tar.gz にする

階層を作りたくなかったので、いったん対象フォルダに移動して同じフォルダのものをアーカイブした。

cd sample_torch_model
tar czfv ../sample_torch_model.tar.gz .
cd ../

S3にアップロード

バケット作成

aws s3 mb s3://バケット名

s3にアップロード

aws s3api put-object --bucket 作成したバケット名 --key sample_torch_model.tar.gz --body ./sample_torch_model.tar.gz

ノートブックインスタンスの作成

Amazon SageMaker コンソールから ノートブックインスタンス に移動後、[ノートブックインスタンスの作成] を押下して作成する。

今回は以下の通り作成した。
※以下に記載のないものは、全て未選択 または デフォルト値で作成した。

設定名設定内容補足
ノートブックインスタンス名sample-notebook1
ノートブックインスタンスのタイプml.t2.medium
Elastic Inference今回は使用しない推論をガンガン行う場合は使用する(GPUが使える)
ライフサイクル設定なし
ボリュームサイズ5GB
IAM ロール[新しいロールの作成] から 作成「任意の S3 バケット」へのアクセスを許可。
ルートアクセス有効化

しばらくするとステータスが [Pending] から [InService] に変わるので、[Jupyter を開く] から jupyter notebook を開く。

[補足]
恐らく IAM ロール ( AmazonSageMakerFullAccess ) が設定されているIAMユーザを使用していれば、ローカルの jupyter でも問題ないと思われる。
※今回は試していない。

デプロイ

エントリポイントとなるファイルの作成

まずエントリポイントとなるファイルをノートブックインスタンス上に作成する。
解説は後述する事としてまずはコード。

entry_point.py

import argparse
import logging
import sagemaker_containers
import requests

import torch
import torch.nn as nn
import numpy as np
from six import BytesIO
from sklearn.preprocessing import StandardScaler

import os
import io
import json
import glob
import time
import re

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

JSON_CONTENT_TYPE = 'application/json'
XNPY_CONTENT_TYPE = 'application/x-npy'
CSV_CONTENT_TYPE  = 'text/csv'

INPUT_SIZE = 2
OUTPUT_SIZE = 1

class LinearRegression(nn.Module):
    """モデル定義"""
    def __init__(self, input_size, output_size):
        super(LinearRegression, self).__init__()
        self.linear = nn.Linear(input_size, output_size)
    def forward(self, x): 
        out = self.linear(x)
        return out

def model_fn(model_dir):
    """モデルのロード."""
    logger.info('START model_fn')
    model = LinearRegression(INPUT_SIZE, OUTPUT_SIZE)
    # モデルのパラメータ設定
    with open(os.path.join(model_dir, 'sample_torch_model.pth'), 'rb') as f:
        model.load_state_dict(torch.load(f))
    # 独自パラメータを設定
    with open(os.path.join(model_dir, 'sample_torch_model_scalar.json')) as f:
        my_state = json.load(f)
        for k,v in my_state.items():
            model.__dict__[k] = v
    logger.info('END   model_fn')
    return model

def input_fn(request_body, content_type=JSON_CONTENT_TYPE):
    """入力データの形式変換."""
    logger.info('START input_fn')
    logger.info(f'content_type: {content_type}')
    logger.info(f'request_body: {request_body}')
    logger.info(f'type: {type(request_body)}')
    if content_type == XNPY_CONTENT_TYPE:
        stream = BytesIO(request_body)
        input_data = np.load(stream)
    elif content_type == CSV_CONTENT_TYPE:
        request_body = request_body.encode("utf-8") if isinstance(request_body, str) else request_body
        input_data = np.loadtxt(BytesIO(request_body), delimiter=",")
    elif content_type == JSON_CONTENT_TYPE:
        input_data = np.array(json.loads(request_body))
    else:
        # TODO: content_typeに応じてデータ型変換
        logger.error(f"content_type invalid: {content_type}")
        input_data = {"errors": [f"content_type invalid: {content_type}"]}
    logger.info('END   input_fn')
    return input_data

def predict_fn(input_data, model):
    """推論."""
    logger.info('START predict_fn')

    if isinstance(input_data, dict) and 'errors' in input_data:
        logger.info('SKIP  predict_fn')
        logger.info('END   predict_fn')
        return input_data
        
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()

    # 説明変数の標準化
    scaler = StandardScaler()
    scaler.set_params(**model.my_scaler_params)
    scaler.mean_ = model.my_scaler_mean
    scaler.var_ = model.my_scaler_var
    scaler.scale_ = model.my_scaler_scale
    scaled_input_data = scaler.transform(input_data)
    converted_input_data = torch.Tensor(scaled_input_data)

    # 推論
    with torch.no_grad():
        logger.info('END   predict_fn')
        return model(converted_input_data.to(device))

def output_fn(prediction, accept=JSON_CONTENT_TYPE):
    """出力データの形式変換."""
    logger.info('START output_fn')
    logger.info(f"accept: {accept}")

    if isinstance(prediction, dict) and 'errors' in prediction:
        logger.info('SKIP  output_fn')
        response = json.dumps(prediction)
        content_type = JSON_CONTENT_TYPE
    elif accept == XNPY_CONTENT_TYPE:
        buffer = BytesIO()
        np.save(buffer, prediction)
        response = buffer.getvalue()
        content_type = XNPY_CONTENT_TYPE
    elif accept == JSON_CONTENT_TYPE:
        response = json.dumps({"results": [prediction.data[i].item() for i in range(len(prediction.data))]})
        content_type = JSON_CONTENT_TYPE
    else:
        # TODO: コンテンツタイプに応じて変換
        response = json.dumps({"results": [prediction.data[i].item() for i in range(len(prediction.data))]})
        content_type = JSON_CONTENT_TYPE

    logger.info('END   output_fn')
    return response, content_type


if __name__ == '__main__':
    # 訓練してからデプロイする場合はここで行う
    logger.info("process main!")
    pass

エントリポイントの解説

参考
https://sagemaker.readthedocs.io/en/stable/using_pytorch.html#load-a-model
https://sagemaker.readthedocs.io/en/stable/using_pytorch.html#serve-a-pytorch-model

実装する必要がある関数

上記でエントリポイントに指定したファイルには、以下の関数を含める必要がある。

関数名説明
model_fnモデルのロードを行う関数
input_fn入力データの形式変換を行う関数
predict_fn推論を行う関数
output_fn結果を呼び出し元に返却する時に形式変換を行う関数

モデルのロード ( model_fn )

model_fn ではモデルのロードを行う。
やる事は、引数で渡ってくるモデルディレクトリ配下のモデルを利用する形式に合わせてロードするだけ。
モデルの他に別のデータ (今回でいうと標準化用のJSON) を含めてアップロードした場合は、ここで利用する事ができる。

入力データの形式変換 ( input_fn )

input_fn では入力データの形式変換を行う。

引数のコンテンツタイプから入力データの形式を判別して、推論用のデータ型に変換する作業を行う事になる。
尚、今回試した3つの形式( numpy.ndarray、csv、json ) の場合、引数のデータは全て bytearray で渡ってくる為、
bytearray を numpy.ndarray に変換する作業を行った。

今回は PyTorch を使用しているので、ここで torch.Tensor まで変換しておいてもよかったのだが、
標準化用の独自パラメータをモデルに引っ付けており、ここでは標準化まで行う事ができなかったので、いったん numpy.ndarray までの変換とした。
※引数にモデルがない為。

推論の実行 ( predict_fn )

input_fn で形式変換したデータを入力として実際の推論を行う。
エンドポイントのインスタンスタイプによっては GPU が利用できない為、GPU/CPU どちらでも推論ができるように実装しておく。

今回は、都合上ここで標準化 及び torch.Tensor への変換まで行った。

出力データの形式変換 ( output_fn )

呼び出し元で指定された accept の値に応じて、出力形式の変換を行う。
今回は、サンプルとして numpy.ndarray、csv、json の3つだけ実装してみた。

推論時の処理の流れ

推論時の処理は以下の通り実行される。

input_object = input_fn(request_body, request_content_type)

prediction = predict_fn(input_object, model)

output = output_fn(prediction, response_content_type)

その他

ここらへんを見る限り、他の依存ライブラリ等がある場合にはモデルデータに含めてアップロードする事ができる模様。
https://sagemaker.readthedocs.io/en/stable/using_pytorch.html#optional-arguments

エンドポイントの作成、デプロイ

ノートブックインスタンス上から以下を実行する。

# エンドポイントの作成、デプロイ
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# モデルの作成
pytorch_model = PyTorchModel(model_data="s3://バケット名/sample_torch_model.tar.gz",
                             role=role,
                             framework_version='1.3.1',
                             entry_point="entry_point.py")
# デプロイパラメータ
deploy_params = {
    'instance_type'          : 'ml.t2.medium'  # お試し用 (https://aws.amazon.com/jp/sagemaker/pricing/instance-types/ )
    ,'initial_instance_count' : 1              # お試し用
    #,'endpoint_name'          : 'sample-torch-model4'  # エンドポイント名を指定してのデプロイが何故かできない
}

# デプロイ
predictor = pytorch_model.deploy(**deploy_params)

https://sagemaker.readthedocs.io/en/stable/sagemaker.pytorch.html#sagemaker.pytorch.model.PyTorchModel

デプロイしたエンドポイントを使って推論してみる

import pandas as pd

# 入力データ ([部屋の広さ, 築年数])
input_data = [[60.0, 10.0], [50.0, 10.0], [40.0, 10.0]]

# 推論
predict_data = np.array(input_data)
results = predictor.predict(predict_data)

# 結果表示
result_df = pd.DataFrame(results, columns=["家賃(万円)"])
result_df["広さ()"] = predict_data[:,0]
result_df["築年数"] = predict_data[:,1]
result_df

結果

家賃(万円) 広さ() 築年数
0 8.117216 60.0 10.0
1 7.191902 50.0 10.0
2 6.266588 40.0 10.0

Lambdaなどからエンドポイントを利用する

#
# sage maker以外からエンドポイントを利用して推論
#
import boto3
import json

# 入力データ ([部屋の広さ, 築年数])
input_data = [[60.0, 10.0], [50.0, 10.0], [40.0, 10.0]]

# エンドポイント名
endpoint_name = "pytorch-inference-2020-02-28-12-35-37-541"

# JSONを送信する場合
request_body = json.dumps(input_data)
content_type = "application/json"
accept_type  = "application/json"

# CSVを送信する場合
#request_body = '\n'.join([','.join([str(x) for x in rec]) for rec in input_data])
#content_type = "text/csv"
#accept_type  = "application/json"

# 推論
client = boto3.client('sagemaker-runtime')
response = client.invoke_endpoint(
    EndpointName=endpoint_name,
    Body=request_body,
    ContentType=content_type,
    Accept=accept_type
)

# 結果表示
print("### response (Body以外)###")
print(json.dumps({k:v for k,v in response.items() if k != 'Body'}, indent=4))
print("### response (Body) ###")
response_dict = json.loads(response['Body'].read().decode("utf-8"))
print(json.dumps(response_dict, indent=4))

結果

### response (Body以外)###
{
    "ResponseMetadata": {
        "RequestId": "f5cca038......",
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
            "x-amzn-requestid": "f5cca038......",
            "x-amzn-invoked-production-variant": "AllTraffic",
            "date": "Sat, 29 Feb 2020 XX:XX:XX GMT",
            "content-type": "application/json",
            "content-length": "69"
        },
        "RetryAttempts": 0
    },
    "ContentType": "application/json",
    "InvokedProductionVariant": "AllTraffic"
}
### response (Body) ###
{
    "results": [
        8.117216110229492,
        7.191902160644531,
        6.26658821105957
    ]
}

後片付け

ノートブックインスタンスから以下を実行する事でエンドポイントの削除が可能。

sagemaker.Session().delete_endpoint(predictor.endpoint)

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2020-03-02 (月) 02:02:14 (32d)