目次 †
概要 †オンプレミスで稼働しているサーバのログをS3に収集してCSVに変換する所までの仕組みを構築する。 処理の流れ †
イメージ †![]() AWS側の処理作成 †ファイル/フォルダ構成 †/collectlog2aws Lambda本体(メインハンドラ) †src/index.py from collectlog2aws import genurl, log2csv def generate_url(event, context): """ 署名付きURLを発行する. """ return genurl.handler(event, context) def analyze_log(event, context): """ PUTされたログを解析する. """ return log2csv.handler(event, context) 共通処理 †src/collectlog2aws/__init__.py import boto3
client = boto3.client('ssm')
# 署名付きURLの生成を許可するホスト
ALLOW_HOSTS_TEXT = client.get_parameter(Name='LogPutAllowHosts', WithDecryption=False)["Parameter"]["Value"]
ALLOW_HOSTS = [x.strip() for x in ALLOW_HOSTS_TEXT.split(",")]
# 署名付きURLで許可する操作など
ALLOW_CLIENT_METHOD = "put_object"
ALLOW_HTTP_METHOD = "PUT"
# バケット名
SERVER_LOG_BUCKET = client.get_parameter(Name='ServerlogBucketName' , WithDecryption=False)["Parameter"]["Value"]
ANALYZED_LOG_BUCKET = client.get_parameter(Name='AnalyzedlogBucketName', WithDecryption=False)["Parameter"]["Value"]
def make_object_key(target_server, target_date, kind):
"""
オブジェクトキーを生成する.
"""
# 大量アクセスが想定される場合は4文字程度のプレフィックスを付与しておく
#object_key_prefix = base64.b64encode(bytes(target_server + target_date[0:6], "utf-8")).decode()[0:4]
#return f'{object_key_prefix}/{target_server}/{target_date}.{kind}'
return f'{target_server}/{target_date}.{kind}'
def make_bad_response(status=400, message="Bad Request"):
"""
エラー時のレスポンスデータを作成する.
"""
response_body = message
response_type = "plain/text"
if isinstance(response_body, (list, dict)):
response_body = json.dumps(response_body)
response_type = "application/json"
return {
"statusCode": status,
"headers": {"Content-Type": response_type},
"body": response_body
}
署名付きURLの生成のLambda作成 †src/collectlog2aws/genurl.py """
サーバの生ログをアップロードする為の署名付きURLを生成する.
"""
import boto3
import json
import base64
import re
from collectlog2aws import *
def handler(event, context):
"""
メイン処理.
"""
# リクエスト情報のチェック
if not check_request(event):
return make_bad_response()
# サーバ名、ログ日付の取得
req_body = event['body'] if 'body' in event else {}
if 'queryStringParameters' in event and event['queryStringParameters']:
req_body = event['queryStringParameters']
target_server = req_body["server"] if 'server' in req_body else None
target_date = req_body["date"] if 'date' in req_body else None
# 許可するバケット名、オブジェクトキー、操作など
bucket = SERVER_LOG_BUCKET
object_key = make_object_key(target_server, target_date, "log")
# 署名付きURLを生成する
url = boto3.client('s3').generate_presigned_url(
ClientMethod = ALLOW_CLIENT_METHOD,
Params = {'Bucket' : bucket, 'Key' : object_key},
ExpiresIn = 300,
HttpMethod = ALLOW_HTTP_METHOD
)
# TODO: URLの暗号化など
encrypted_url = url
# レスポンスの組み立て
return {
"statusCode": 200,
"headers": {"Content-Type": "plain/text"},
"body": encrypted_url
}
def check_request(event):
"""
リクエスト情報のチェック.
"""
req_body = event['body'] if 'body' in event else {}
if 'queryStringParameters' in event and event['queryStringParameters']:
req_body = event['queryStringParameters']
print(json.dumps(event))
# リクエスト元のIPアドレスチェック等
if event["requestContext"]["identity"]["sourceIp"] not in ALLOW_HOSTS:
print("sourceIp error!")
return False
# サーバ、日付のチェック
target_server = req_body["server"] if 'server' in req_body else None
target_date = req_body["date"] if 'date' in req_body else None
if target_server is None or target_date is None:
# サーバ、日付が未指定の時はエラー
print("server empty!")
return False
else:
if len(re.sub("[^0-9]", "", target_date)) != 8:
# 日付形式チェック
print("date error!")
return False
#else:
# TODO: 日付の妥当性チェックなど
# return False
return True
CSVに変換するLambda作成 †src/collectlog2aws/log2csv.py ""
S3バケットにPUTされたログを解析する.
(1) logをcsvファイルに変換する.
(2) 対象日付のサーバ負荷グラフを作成する.
"""
import boto3
import json
import base64
import re
from collectlog2aws import *
s3 = boto3.resource('s3')
def handler(event, context):
"""
メイン処理.
"""
print(json.dumps(event))
# リクエスト情報のチェック
if not check_request(event):
return make_bad_response()
# バケット、オブジェクトキーの取得
log_bucket = event["Records"][0]["s3"]["bucket"]["name"]
log_key = event["Records"][0]["s3"]["object"]["key"]
# ログファイルをS3から取得
s3obj = s3.Object(log_bucket, log_key)
log_text = s3obj.get()['Body'].read().decode("utf-8")
# ログの取得 及び CSV変換
csv_text = to_csv_text(log_text)
# CSV変換後の結果出力
csv_bucket = ANALYZED_LOG_BUCKET
csv_key = re.sub("log$", "csv", log_key)
s3obj = s3.Object(csv_bucket, csv_key)
res = s3obj.put(Body = csv_text.encode("utf-8"))
## TODO: サーバ負荷グラフの作成
# TODO
response_body = {
"event": event
}
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(response_body)
}
def check_request(event):
"""
リクエスト情報のチェック.
"""
records = event.get("Records", [])
if not isinstance(records, list) or len(records) == 0:
return False
host_ip = records[0].get("requestParameters", {}).get("sourceIPAddress", None)
bucket = records[0].get("s3", {}).get("bucket", {}).get("name", None)
key = records[0].get("s3", {}).get("object", {}).get("key", None)
# リクエスト元のIPアドレスチェック等
if host_ip is None or host_ip not in ALLOW_HOSTS:
print("sourceIp error!")
return False
# バケット名のチェック
if bucket is None or host_ip not in ALLOW_HOSTS:
return False
# オブジェクトキーのチェック
if key is None or host_ip not in ALLOW_HOSTS:
return False
return True
def to_csv_text(log_text):
"""
ログ(テキスト)をCSV形式(テキスト)に変換する.
"""
# この辺はログフォーマットに合わせて適宜調整.
log_text = re.sub("[ ]+", ",", re.sub(",", " ", re.sub(" min", "min", log_text)))
lines = log_text.split("\n")
rows = filter(lambda x: len(x) > 29, [[y.strip() for y in x.split(",")] for x in lines])
csv_header = ",".join(["datetime", "past-times", "users", "load-average01", "load-average05", "load-average15",
"procs-r", "procs-b", "mem-swpd", "mem-free", "mem-buff", "mem-cache", "swap-in", "swap-out",
"buff-in", "buff-out", "system-in", "system-cs", "cpu-us", "cpu-sy", "cpu-id", "cpu-wa", "cpu-st"])
csv_body = "\n".join([",".join([x[0] + " " + x[1], x[3] + x[4] + " " + x[5], x[6], x[10], x[11], x[12]
,x[13], x[14], x[15], x[16], x[17], x[18], x[19], x[20]
,x[21], x[22], x[23], x[24], x[25], x[26], x[27], x[28], x[29]
]) for x in rows
])
return csv_header + "\n" + csv_body + "\n"
CloudFormationテンプレートの作成 †S3バケット 及び Lambdaを作成/デプロイする為の CloudFormationテンプレートを作成する template.yml †AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: "Stack for generate pre signed url and analyze log when put to s3."
Parameters:
# ログのPUTを許可するホスト(ローカルからのテスト用に一応パラメータオーバーライドできるようにしておく)
ParamLogPutAllowHosts:
Type: "String"
Default: "xxx.xxx.xxx.xxx, xxx.xxx.xxx.xxx, xxx.xxx.xxx.xxx, xxx.xxx.xxx.xxx"
Resources:
# ログのPUTを許可するホスト(パラメータストアに格納)
LogPutAllowHosts:
Type: "AWS::SSM::Parameter"
Properties:
Name: "LogPutAllowHosts"
Type: "String"
Value: !Ref ParamLogPutAllowHosts
# サーバの生ログ格納用バケット名(パラメータストアに格納)
ServerlogBucketName:
Type: "AWS::SSM::Parameter"
Properties:
Name: "ServerlogBucketName"
Type: "String"
Value: !Sub 'serverlogs-${AWS::AccountId}'
# 変換後のCSV格納用バケット名(パラメータストアに格納)
AnalyzedlogBucketName:
Type: "AWS::SSM::Parameter"
Properties:
Name: "AnalyzedlogBucketName"
Type: "String"
Value: !Sub 'analyzedlogs-${AWS::AccountId}'
# ログファイルアップロード用の署名付きURL生成するLambda
GenerateSignedUrlFunc:
Type: "AWS::Serverless::Function"
Properties:
FunctionName: GenerateSignedUrl
Description: "サーバログ用のs3バケットの署名付きURLを生成する"
Handler: index.generate_url
Runtime: python3.6
CodeUri: ./src
MemorySize: 128
Timeout: 60
# 直接パラメータストアから取得する為、コメントアウト
#Environment:
# Variables:
# ALLOW_HOSTS: !GetAtt LogPutAllowHosts.Value
# ANALYZED_LOG_BUCKET: !GetAtt AnalyzedlogBucketName.Value
# SERVER_LOG_BUCKET: !GetAtt ServerlogBucketName.Value
Events:
GenUrlApi:
Type: Api
Properties:
Path: /
Method: get
Role: !GetAtt GenerateSignedUrlFuncRole.Arn
# ログファイルアップロード用の署名付きURLを生成するLambda用のロール
GenerateSignedUrlFuncRole:
Type: "AWS::IAM::Role"
Properties:
RoleName: GenerateSignedUrlFuncRole
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service: "lambda.amazonaws.com"
Action: "sts:AssumeRole"
Policies:
- PolicyName: "GenerateSignedUrlPolicy"
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
- ssm:GetParameters # パラメータストアから値を取得する為の権限
- ssm:GetParameter # パラメータストアから値を取得する為の権限
- s3:PutObject # 生ログ格納用のs3バケットにログをputする為の権限を付与しておく
Resource: "*" # TODO: 対象のバケットを絞る(循環参照への対応が必要)
# アップロードされた生ログを解析するLambda
AnalyzeLogFunc:
Type: "AWS::Serverless::Function"
Properties:
FunctionName: AnalyzeLog
Description: "アップロードされたログをフォーマット 及び 解析する"
Runtime: python3.6
Handler: index.analyze_log
CodeUri: ./src
MemorySize: 128
Timeout: 60
# 直接パラメータストアから取得する為、コメントアウト
#Environment:
# Variables:
# ALLOW_HOSTS: !GetAtt LogPutAllowHosts.Value
# ANALYZED_LOG_BUCKET: !GetAtt AnalyzedlogBucketName.Value
# SERVER_LOG_BUCKET: !GetAtt ServerlogBucketName.Value
Role: !GetAtt AnalyzeLogFuncRole.Arn
# アップロードされた生ログを解析するLambda用のロール
AnalyzeLogFuncRole:
Type: "AWS::IAM::Role"
Properties:
RoleName: AnalyzeLogFuncRole
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service: "lambda.amazonaws.com"
Action: "sts:AssumeRole"
Policies:
- PolicyName: "AnalyzeLogFuncPolicy"
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
- ssm:GetParameters # パラメータストアから値を取得する為の権限
- ssm:GetParameter # パラメータストアから値を取得する為の権限
- s3:GetObject # 生ログ格納用のs3バケットからログをgetする為の権限を付与しておく
- s3:PutObject # 解析済ログ格納用のs3バケットにログをputする為の権限を付与しておく
Resource: "*" # TODO: 対象のバケットを絞る(循環参照への対応が必要)
# サーバログ解析用Lambdaへのアクセス許可
AnalyzeLogFuncPermission:
Type: "AWS::Lambda::Permission"
Properties:
Action: "lambda:InvokeFunction"
FunctionName: !GetAtt AnalyzeLogFunc.Arn
Principal: "s3.amazonaws.com"
SourceArn: !Join
- ""
- - "arn:aws:s3:::"
- !GetAtt ServerlogBucketName.Value
# サーバ生ログ格納用バケット
ServerlogBucket:
Type: AWS::S3::Bucket
#DeletionPolicy: Retain # スタック削除時にバケットを削除しない
DependsOn:
- AnalyzeLogFuncPermission
Properties:
BucketName: !GetAtt ServerlogBucketName.Value
# 生ログがputされた時に自動的にログ解析用Lambdaを実行する
NotificationConfiguration:
LambdaConfigurations:
- Event: "s3:ObjectCreated:Put"
Function: !GetAtt AnalyzeLogFunc.Arn
Filter:
S3Key:
Rules:
- Name: suffix
Value: log
# 解析済みログ格納用バケット
AnalyzedlogBucket:
Type: AWS::S3::Bucket
#DeletionPolicy: Retain # スタック削除時にバケットを削除しない
Properties:
BucketName: !GetAtt AnalyzedlogBucketName.Value
Outputs:
# 署名付きURL生成用のAPIエンドポイントをエクスポートしておく
GenerateSignedUrl:
Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/"
Export:
Name: GenerateSignedUrl
awsbuild.sh †※ CloudFormation実行用のシェル
#!/bin/bash
CURRENT_DIR=`dirname $0`
MODE=$1
STAGE=dev
REGION=
FORCE=
while [ "$1" != "" ]; do
if [ "$1" == "--region" ]; then
shift
REGION=$1
fi
if [ "$1" == "--env" ]; then
shift
STAGE=$1
fi
if [ "$1" == "--force" ]; then
FORCE=y
fi
shift
done
# テンプレートファイル
TEMPLATE=template.yml
# スタック名 ( gitプロジェクトの場合はgitリポジトリ名、それ以外の場合はフォルダ名をスタック名とする )
cd $CURRENT_DIR
STACK_NAME=`basename \`pwd\``
if [ -e ".git" ]; then
STACK_NAME=`cat .git/config | grep url | head -1 | awk -F"/" '{print $NF}'`
fi
STACK_NAME=`echo $STACK_NAME | sed 's/_/-/g' | sed 's/Repo$/Stack/'`
STACK_NAME=${STACK_NAME}-${STAGE}
# アカウントIDの取得
ACCOUNT_ID=`aws sts get-caller-identity | grep Account | awk '{print $2}' | sed -e "s/[^0-9]//g"`
# リージョン指定
if [ "${REGION}" == "" ]; then
REGION=`aws configure list | grep "region" | awk '{print $2}'`
fi
REGION_PARAM="--region ${REGION}"
# スタック作成時のイベント確認
if [ "${MODE}" == "events" ]; then
echo "Display events of Stack: ${STACK_NAME}"
echo `date "+%Y-%m-%d %H:%M:%S"`" - START"
aws cloudformation describe-stack-events --region $REGION --stack-name $STACK_NAME
echo `date "+%Y-%m-%d %H:%M:%S"`" - END"
exit 0
fi
# 削除
if [ "${MODE}" == "delete" ]; then
if [ "$FORCE" != "y" ] && [ "$STAGE" == "prod" ]; then
echo ""
read -p "Delete Stack in production environment? (y/n): " yn
if [ "$yn" != "y" ]; then
echo "\nDelete Stack Canceled."
echo ""
exit 0
fi
fi
echo "Delete Stack: ${STACK_NAME}"
echo `date "+%Y-%m-%d %H:%M:%S"`" - START"
aws cloudformation delete-stack --region $REGION --stack-name $STACK_NAME
aws cloudformation wait stack-delete-complete --region $REGION --stack-name $STACK_NAME
echo `date "+%Y-%m-%d %H:%M:%S"`" - END"
exit 0
fi
# 登録/更新
if [ "${MODE}" == "deploy" ]; then
if [ "$FORCE" != "y" ] && [ "$STAGE" == "prod" ]; then
echo ""
read -p "Create/Update Stack in production environment? (y/n): " yn
if [ "$yn" != "y" ]; then
echo "Create/Update Stack Canceled."
echo ""
exit 0
fi
fi
echo "Create/Update Stack: ${STACK_NAME}"
echo `date "+%Y-%m-%d %H:%M:%S"`" - START"
# S3バケットがない場合は作る(バケット名は世界で唯一である必要がある為、末尾にアカウントID等を付与しておく)
#BUCKET_NAME=stack-${STACK_NAME}-${ACCOUNT_ID}
BUCKET_NAME=cf-templates-${REGION}-${ACCOUNT_ID}
BUCKET_COUNT=`aws s3api list-buckets --region $REGION | grep -e "\"${BUCKET_NAME}\"" | wc -l | awk '{print $1}'`
if [ "${BUCKET_COUNT}" == "0" ]; then
echo Create s3 bucket: ${BUCKET_NAME}
if [ "${REGION}" == "us-east-1" ]; then
# es-east-1 の場合は LocationConstraint の指定なしで作成
aws s3api create-bucket --region $REGION --bucket $BUCKET_NAME
else
aws s3api create-bucket --region $REGION --create-bucket-configuration "{\"LocationConstraint\": \"${REGION}\"}" --bucket $BUCKET_NAME
fi
fi
# 検証&パッケージング&デプロイ(成功時は作成したAPIのURIを表示する)
#aws cloudformation validate-template --template-body file://${TEMPLATE} \
aws cloudformation package --region $REGION --template-file $TEMPLATE --s3-bucket $BUCKET_NAME --output-template-file packaged-template.yml \
&& aws cloudformation deploy --region $REGION --template-file packaged-template.yml --stack-name $STACK_NAME --parameter-overrides StageName="$STAGE" --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
&& echo "" \
&& echo "### Exported Value ###" \
&& aws cloudformation describe-stacks --region $REGION --stack-name $STACK_NAME \
| awk 'BEGIN{key=""}{ if ($1 == "\"OutputKey\":") key=$2; if ($1 == "\"OutputValue\":") print key" : "$2 }' \
| sed 's/[",]//g' \
&& echo "######################/" \
&& echo ""
echo `date "+%Y-%m-%d %H:%M:%S"`" - END"
exit 0
fi
echo "Usage)"
echo ""
echo " ${0} (deploy|delete|events) [--region regionName] [--env envName] [--force]"
echo ""
echo "Example)"
echo ""
echo " # create or update stack named '${STACK_NAME}'"
echo " ${0} deploy --env prod"
echo " ${0} deploy --env prod --region us-east-1"
echo ""
echo " # delete stack named '${STACK_NAME}'"
echo " ${0} delete --env prod"
echo " ${0} delete --env prod --region us-east-1"
echo ""
echo " # display events details of create or update or delete stack of '${STACK_NAME}'"
echo " ${0} events --env prod"
echo " ${0} events --env prod --region us-east-1"
echo ""
echo "Details)"
echo ""
echo " StackName ... The stack name will be the git repository name or folder name"
echo ""
※ 関連 AWS CloudFormationメモ デプロイ †./awsbuild.sh deploy Uploading to XXXXXXXXXXXXXXXXXXXXXX 123456 / 123456.0 (100.00%) Successfully packaged artifacts and wrote output template to file packaged-template.yml. Execute the following command to deploy the packaged template aws cloudformation deploy --template-file /path_to/packaged-template.yml --stack-name オンプレミスのサーバ側のバッチ処理 †log2aws.sh †#!/bin/bash
gen_url=上記のデプロイ時に表示された GenerateSignedUrlの値
server=`hostname`
log_dir=ログディレクトリのPATH
date=`date --date '+1day ago' +%Y-%m-%d` # 前日のログが対象
# 署名付きURL取得
signed_url=`curl -s -L ${gen_url}?server=${server}\&date=${date}`
if [[ "$signed_url" =~ ^http(s|)://.+$ ]]; then
# アップロード
curl -s -L -D - -X PUT --upload-file ${log_dir}/${date}.log $signed_url >/dev/null 2>&1
else
echo get signed url error!.
echo $signed_url
fi
動作確認 †オンプレミスのサーバ側からバッチ実行 †./log2aws.sh AWS CLI が使用できる環境から、S3バケットに変換後のCSVが出力されているか確認 †aws s3 ls s3://バケット名 --recursive 補足 †収集するログ †当記事の本筋とは離れるが、収集するログの形式について、以下に記載する。 10分間隔で以下のコマンドで出力されたサーバのロードアベレージ等の情報 echo `date +%Y-%m-%d` `uptime` `vmstat | tail -1`>>$log ログイメージ 2019-08-01 00:00:01 up 45 days, 20:04, 1 user, load average: 25.06, 22.63, 21.70 124 10 493744 379918752 0 8428556 0 0 1852 156 0 0 26 12 60 2 0 2019-08-01 00:10:01 up 45 days, 20:14, 0 users, load average: 21.12, 22.57, 22.50 68 2 493744 379083296 0 8007348 0 0 1852 156 0 0 26 12 60 2 0 2019-08-01 00:20:01 up 45 days, 20:24, 0 users, load average: 22.43, 22.59, 22.38 68 7 493744 378277888 0 8262264 0 0 1852 156 0 0 26 12 60 2 0 : 2019-08-01 23:40:01 up 46 days, 19:44, 0 users, load average: 19.00, 19.48, 19.66 54 2 365536 376942464 0 9174176 0 0 1823 156 0 0 26 12 60 2 0 2019-08-01 23:50:01 up 46 days, 19:54, 0 users, load average: 20.41, 19.96, 19.70 25 0 365536 377214816 0 8972716 0 0 1822 156 0 0 26 12 60 2 0 CSV変換後のイメージ 2019-08-01 00:00:01,45days 20:04,1,25.06,22.63,21.70,124,10,493744,379918752,0,8428556,0,0,1852,156,0,0,26,12,60,2,0 2019-08-01 00:10:01,45days 20:14,0,21.12,22.57,22.50,68,2,493744,379083296,0,8007348,0,0,1852,156,0,0,26,12,60,2,0 2019-08-01 00:20:01,45days 20:24,0,22.43,22.59,22.38,68,7,493744,378277888,0,8262264,0,0,1852,156,0,0,26,12,60,2,0 : 2019-08-01 23:40:01,46days 19:44,0,19.00,19.48,19.66,54,2,365536,376942464,0,9174176,0,0,1823,156,0,0,26,12,60,2,0 2019-08-01 23:50:01,46days 19:54,0,20.41,19.96,19.70,25,0,365536,377214816,0,8972716,0,0,1822,156,0,0,26,12,60,2,0 |