概要 †Amazon SageMaker のエンドポイントに訓練済みモデルをデプロイして推論を行ってみた。 Amazon SageMaker で PyTorch を使用 目次 †
モデルデータを作成しS3バケットに上げる †モデルは PyTorchで重回帰分析 で作成したものをそのまま利用する。 アップロードするフォルダの構成 †以下の構成のフォルダを作成する。
sample_torch_model.tar.gz
# Archive contents:
#   sample_torch_model.pth ........... the exported trained model
#   sample_torch_model_scalar.json ... the input data is standardized on the
#                                      endpoint side, so the values needed for
#                                      that are serialized to JSON and uploaded
#                                      together with the model (described later)
#
# == Creating and exporting the model ==
# The model built in "Multiple regression analysis with PyTorch" is used as is.
# All that is left is to export it as shown below.
#
# NOTE(review): `model` and `scaler` are not defined in this snippet;
# presumably they are the trained nn.Module and the fitted StandardScaler from
# the training session -- confirm before running.

import json
import os

import torch

model_name = "sample_torch_model"

# exist_ok avoids the check-then-create race of os.path.exists() + os.mkdir().
os.makedirs(model_name, exist_ok=True)

# Save the trained model weights.
model_path = f"{model_name}/{model_name}.pth"
torch.save(model.state_dict(), model_path)

# Save the values needed for standardization as JSON.  They are kept out of
# the state_dict on purpose and shipped as a separate file; ndarrays are
# converted with tolist() because json cannot serialize them directly.
scaler_dict = {
    "my_scaler_params": scaler.get_params(),
    "my_scaler_mean": scaler.mean_.tolist(),
    "my_scaler_var": scaler.var_.tolist(),
    "my_scaler_scale": scaler.scale_.tolist(),
}
with open(f"{model_name}/{model_name}_scalar.json", "w") as f:
    json.dump(scaler_dict, f)

# == Packing into a tar.gz ==
# To avoid an extra directory level inside the archive, change into the target
# folder first and archive its contents from there:
#
#   cd sample_torch_model
#   tar czfv ../sample_torch_model.tar.gz .
#   cd ../
#
# == Uploading to S3 ==
# Create the bucket:
#
#   aws s3 mb s3://<bucket-name>
#
# Upload to S3:
#
#   aws s3api put-object --bucket <created-bucket-name> --key sample_torch_model.tar.gz --body ./sample_torch_model.tar.gz
#
# == Creating the notebook instance ==
# From the Amazon SageMaker console, go to "Notebook instances" and press
# [Create notebook instance].  This time it was created as follows.
# After a while the status changes from [Pending] to [InService]; open the
# Jupyter notebook via [Open Jupyter].  [Supplement]
#
# == Deploy ==
# == Creating the entry-point file ==
# First, create the entry-point file on the notebook instance.
#
# entry_point.py

import argparse
import logging
import sagemaker_containers
import requests
import torch
import torch.nn as nn
import numpy as np
from six import BytesIO
from sklearn.preprocessing import StandardScaler
import os
import io
import json
import glob
import time
import re

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

JSON_CONTENT_TYPE = 'application/json'
XNPY_CONTENT_TYPE = 'application/x-npy'
CSV_CONTENT_TYPE = 'text/csv'

INPUT_SIZE = 2
OUTPUT_SIZE = 1


class LinearRegression(nn.Module):
    """Model definition: a single ``nn.Linear`` layer."""

    def __init__(self, input_size, output_size):
        super(LinearRegression, self).__init__()
        self.linear = nn.Linear(input_size, output_size)

    def forward(self, x):
        """Apply the linear layer to ``x`` and return the result."""
        return self.linear(x)


def _select_device():
    """Return the CUDA device when one is available, otherwise the CPU."""
    return torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def model_fn(model_dir):
    """Load the model.

    Args:
        model_dir: Directory containing ``sample_torch_model.pth`` (the saved
            state_dict) and ``sample_torch_model_scalar.json`` (the scaler
            values).

    Returns:
        A ``LinearRegression`` with the saved weights loaded and every
        top-level key of the JSON file attached as an attribute
        (``predict_fn`` reads the ``my_scaler_*`` ones).
    """
    logger.info('START model_fn')
    model = LinearRegression(INPUT_SIZE, OUTPUT_SIZE)

    # Load the model parameters.  map_location lets a checkpoint that was
    # written on a GPU machine be loaded on a CPU-only host as well.
    with open(os.path.join(model_dir, 'sample_torch_model.pth'), 'rb') as f:
        model.load_state_dict(torch.load(f, map_location=_select_device()))

    # Attach the custom (scaler) parameters to the model object.
    with open(os.path.join(model_dir, 'sample_torch_model_scalar.json')) as f:
        scaler_state = json.load(f)
    for key, value in scaler_state.items():
        setattr(model, key, value)

    logger.info('END model_fn')
    return model


def input_fn(request_body, content_type=JSON_CONTENT_TYPE):
    """Convert the request payload into a NumPy array.

    Args:
        request_body: Raw payload (npy bytes, CSV text/bytes or JSON).
        content_type: One of ``application/x-npy``, ``text/csv`` or
            ``application/json``.

    Returns:
        An array with at least two dimensions (a single sample becomes one
        row), or ``{"errors": [...]}`` when the content type is not supported.
        The error dict is passed through by ``predict_fn`` and ``output_fn``.
    """
    logger.info('START input_fn')
    logger.info('content_type: %s', content_type)
    logger.info('request_body: %s', request_body)
    logger.info('type: %s', type(request_body))

    if content_type == XNPY_CONTENT_TYPE:
        input_data = np.load(BytesIO(request_body))
    elif content_type == CSV_CONTENT_TYPE:
        if isinstance(request_body, str):
            request_body = request_body.encode("utf-8")
        input_data = np.loadtxt(BytesIO(request_body), delimiter=",")
    elif content_type == JSON_CONTENT_TYPE:
        input_data = np.array(json.loads(request_body))
    else:
        # TODO: convert the data according to other content types.
        logger.error('content_type invalid: %s', content_type)
        logger.info('END input_fn')
        return {"errors": [f"content_type invalid: {content_type}"]}

    # A request carrying a single sample (one CSV row, a flat JSON list)
    # parses to a 1-D array, which StandardScaler.transform rejects.
    input_data = np.atleast_2d(input_data)

    logger.info('END input_fn')
    return input_data


def predict_fn(input_data, model):
    """Run inference.

    Args:
        input_data: Array returned by ``input_fn``, or its error dict.
        model: Model returned by ``model_fn``.

    Returns:
        The model output tensor, or the error dict unchanged.
    """
    logger.info('START predict_fn')

    if isinstance(input_data, dict) and 'errors' in input_data:
        logger.info('SKIP predict_fn')
        logger.info('END predict_fn')
        return input_data

    device = _select_device()
    model.to(device)
    model.eval()

    # Standardize the explanatory variables with a scaler rebuilt from the
    # values loaded in model_fn.  JSON yields plain lists, so they are turned
    # back into ndarrays.
    scaler = StandardScaler()
    scaler.set_params(**model.my_scaler_params)
    scaler.mean_ = np.asarray(model.my_scaler_mean)
    scaler.var_ = np.asarray(model.my_scaler_var)
    scaler.scale_ = np.asarray(model.my_scaler_scale)
    scaled_input_data = scaler.transform(input_data)
    converted_input_data = torch.Tensor(scaled_input_data)

    # Inference.
    with torch.no_grad():
        prediction = model(converted_input_data.to(device))

    # Logged only after the forward pass has actually finished.
    logger.info('END predict_fn')
    return prediction


def output_fn(prediction, accept=JSON_CONTENT_TYPE):
    """Convert the prediction into the response payload.

    Args:
        prediction: Tensor returned by ``predict_fn``, or the error dict.
        accept: Requested response content type.

    Returns:
        A ``(response, content_type)`` tuple.  Error dicts and any ``accept``
        other than ``application/x-npy`` are answered as JSON.
    """
    logger.info('START output_fn')
    logger.info('accept: %s', accept)

    if isinstance(prediction, dict) and 'errors' in prediction:
        logger.info('SKIP output_fn')
        response = json.dumps(prediction)
        content_type = JSON_CONTENT_TYPE
    elif accept == XNPY_CONTENT_TYPE:
        buffer = BytesIO()
        # Move to host memory first: a CUDA tensor cannot be handed to NumPy.
        np.save(buffer, prediction.detach().cpu().numpy())
        response = buffer.getvalue()
        content_type = XNPY_CONTENT_TYPE
    else:
        # JSON, which is also the fallback for every other accept value.
        # TODO: convert according to other content types.
        results = [row.item() for row in prediction.detach().cpu()]
        response = json.dumps({"results": results})
        content_type = JSON_CONTENT_TYPE

    logger.info('END output_fn')
    return response, content_type


if __name__ == '__main__':
    # When training is done right before deploying, do it here.
    logger.info("process main!")

# == Explanation of the entry point ==
# (Reference)
実装する必要がある関数 †上記でエントリポイントに指定したファイルには、以下の関数を含める必要がある。
# == Loading the model ( model_fn ) ==
# model_fn loads the model.
#
# == Converting the input data format ( input_fn ) ==
# input_fn converts the format of the input data.  It determines the format
# of the input from the content type argument and converts the data into the
# type used for inference.  Since PyTorch is used here, the data could also
# have been converted all the way to torch.Tensor at this point, but ...
# (the sentence is cut off in the original text)
#
# == Running inference ( predict_fn ) ==
# Performs the actual inference on the data converted by input_fn.  For
# convenience, standardization and the conversion to torch.Tensor are also
# done here this time.
#
# == Converting the output data format ( output_fn ) ==
# Converts the output format according to the accept value specified by the
# caller.
#
# == Processing flow at inference time ==
# At inference time the processing runs as follows:
#
#   input_object = input_fn(request_body, request_content_type)
#   prediction = predict_fn(input_object, model)
#   output = output_fn(prediction, response_content_type)
#
# == Miscellaneous ==
# Judging from the linked material, other dependent libraries can apparently
# be included in the model data and uploaded with it.
#
# == Creating and deploying the endpoint ==
# Run the following from the notebook instance.

import numpy as np
import pandas as pd
import sagemaker
from sagemaker.pytorch import PyTorchModel

# Create and deploy the endpoint.
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# Create the model.  Replace the placeholder with the real bucket name.
pytorch_model = PyTorchModel(
    model_data="s3://バケット名/sample_torch_model.tar.gz",
    role=role,
    framework_version='1.3.1',
    entry_point="entry_point.py",
)

# Deployment parameters.
deploy_params = {
    # For trial use (https://aws.amazon.com/jp/sagemaker/pricing/instance-types/ )
    'instance_type': 'ml.t2.medium',
    # For trial use
    'initial_instance_count': 1,
    # Deploying with an explicit endpoint name did not work for some reason:
    # 'endpoint_name': 'sample-torch-model4',
}

# Deploy.
predictor = pytorch_model.deploy(**deploy_params)

# == Running inference against the deployed endpoint ==

# Input data ([room size, building age in years]).
input_data = [[60.0, 10.0], [50.0, 10.0], [40.0, 10.0]]

# Inference.
predict_data = np.array(input_data)
results = predictor.predict(predict_data)

# Show the result: rent (10,000 yen), size (square metres), building age.
result_df = pd.DataFrame(results, columns=["家賃(万円)"])
result_df["広さ(㎡)"] = predict_data[:, 0]
result_df["築年数"] = predict_data[:, 1]
result_df  # bare expression: displayed as the notebook cell output

# Result
# == Using the endpoint from Lambda and the like ==
# Run inference against the endpoint from outside SageMaker.

import json

JSON_CONTENT_TYPE = "application/json"
CSV_CONTENT_TYPE = "text/csv"


def build_request_body(records, content_type=JSON_CONTENT_TYPE):
    """Serialize ``records`` (a list of rows) for the given content type.

    Args:
        records: Sequence of rows, each row a sequence of values.
        content_type: ``application/json`` or ``text/csv``.

    Returns:
        The request body as a string: a JSON document, or one
        comma-separated line per row joined with newlines.

    Raises:
        ValueError: If ``content_type`` is neither of the two above.
    """
    if content_type == JSON_CONTENT_TYPE:
        return json.dumps(records)
    if content_type == CSV_CONTENT_TYPE:
        return "\n".join(",".join(str(x) for x in rec) for rec in records)
    raise ValueError(f"unsupported content type: {content_type}")


def main():
    """Invoke the endpoint with sample data and print the response."""
    # Imported here so the helper above stays importable without boto3.
    import boto3

    # Input data ([room size, building age in years]).
    input_data = [[60.0, 10.0], [50.0, 10.0], [40.0, 10.0]]

    # Endpoint name.
    endpoint_name = "pytorch-inference-2020-02-28-12-35-37-541"

    # Sends JSON; set content_type to CSV_CONTENT_TYPE to send CSV instead.
    content_type = JSON_CONTENT_TYPE
    accept_type = JSON_CONTENT_TYPE
    request_body = build_request_body(input_data, content_type)

    # Inference.
    client = boto3.client('sagemaker-runtime')
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        Body=request_body,
        ContentType=content_type,
        Accept=accept_type,
    )

    # Show the result.  'Body' is a stream, so it is printed separately.
    print("### response (Body以外)###")
    print(json.dumps({k: v for k, v in response.items() if k != 'Body'}, indent=4))

    print("### response (Body) ###")
    response_dict = json.loads(response['Body'].read().decode("utf-8"))
    print(json.dumps(response_dict, indent=4))


if __name__ == "__main__":
    main()

# Result
#
#   ### response (Body以外)###
#   {
#       "ResponseMetadata": {
#           "RequestId": "f5cca038......",
#           "HTTPStatusCode": 200,
#           "HTTPHeaders": {
#               "x-amzn-requestid": "f5cca038......",
#               "x-amzn-invoked-production-variant": "AllTraffic",
#               "date": "Sat, 29 Feb 2020 XX:XX:XX GMT",
#               "content-type": "application/json",
#               "content-length": "69"
#           },
#           "RetryAttempts": 0
#       },
#       "ContentType": "application/json",
#       "InvokedProductionVariant": "AllTraffic"
#   }
#   ### response (Body) ###
#   {
#       "results": [
#           8.117216110229492,
#           7.191902160644531,
#           6.26658821105957
#       ]
#   }
#
# == Cleaning up ==
# The endpoint can be deleted by running the following from the notebook
# instance (where `sagemaker` and `predictor` are available):
#
#   sagemaker.Session().delete_endpoint(predictor.endpoint)