目次 †
概要 †オンプレミスで稼働しているサーバのログをS3に収集してCSVに変換する所までの仕組みを構築する。 処理の流れ †
イメージ †AWS側の処理作成 †ファイル/フォルダ構成 †/collectlog2aws Lambda本体(メインハンドラ) †src/index.py from collectlog2aws import genurl, log2csv def generate_url(event, context): """ 署名付きURLを発行する. """ return genurl.handler(event, context) def analyze_log(event, context): """ PUTされたログを解析する. """ return log2csv.handler(event, context) 共通処理 †src/collectlog2aws/__init__.py import boto3 client = boto3.client('ssm') # 署名付きURLの生成を許可するホスト ALLOW_HOSTS_TEXT = client.get_parameter(Name='LogPutAllowHosts', WithDecryption=False)["Parameter"]["Value"] ALLOW_HOSTS = [x.strip() for x in ALLOW_HOSTS_TEXT.split(",")] # 署名付きURLで許可する操作など ALLOW_CLIENT_METHOD = "put_object" ALLOW_HTTP_METHOD = "PUT" # バケット名 SERVER_LOG_BUCKET = client.get_parameter(Name='ServerlogBucketName' , WithDecryption=False)["Parameter"]["Value"] ANALYZED_LOG_BUCKET = client.get_parameter(Name='AnalyzedlogBucketName', WithDecryption=False)["Parameter"]["Value"] def make_object_key(target_server, target_date, kind): """ オブジェクトキーを生成する. """ # 大量アクセスが想定される場合は4文字程度のプレフィックスを付与しておく #object_key_prefix = base64.b64encode(bytes(target_server + target_date[0:6], "utf-8")).decode()[0:4] #return f'{object_key_prefix}/{target_server}/{target_date}.{kind}' return f'{target_server}/{target_date}.{kind}' def make_bad_response(status=400, message="Bad Request"): """ エラー時のレスポンスデータを作成する. """ response_body = message response_type = "plain/text" if isinstance(response_body, (list, dict)): response_body = json.dumps(response_body) response_type = "application/json" return { "statusCode": status, "headers": {"Content-Type": response_type}, "body": response_body } 署名付きURLの生成のLambda作成 †src/collectlog2aws/genurl.py """ サーバの生ログをアップロードする為の署名付きURLを生成する. """ import boto3 import json import base64 import re from collectlog2aws import * def handler(event, context): """ メイン処理. """ # リクエスト情報のチェック if not check_request(event): return make_bad_response() # サーバ名、ログ日付の取得 req_body = event['body'] if 'body' in event else {} if 'queryStringParameters' in event and event['queryStringParameters']: req_body = event['queryStringParameters'] target_server = req_body["server"] if 'server' in req_body else None target_date = req_body["date"] if 'date' in req_body else None # 許可するバケット名、オブジェクトキー、操作など bucket = SERVER_LOG_BUCKET object_key = make_object_key(target_server, target_date, "log") # 署名付きURLを生成する url = boto3.client('s3').generate_presigned_url( ClientMethod = ALLOW_CLIENT_METHOD, Params = {'Bucket' : bucket, 'Key' : object_key}, ExpiresIn = 300, HttpMethod = ALLOW_HTTP_METHOD ) # TODO: URLの暗号化など encrypted_url = url # レスポンスの組み立て return { "statusCode": 200, "headers": {"Content-Type": "plain/text"}, "body": encrypted_url } def check_request(event): """ リクエスト情報のチェック. """ req_body = event['body'] if 'body' in event else {} if 'queryStringParameters' in event and event['queryStringParameters']: req_body = event['queryStringParameters'] print(json.dumps(event)) # リクエスト元のIPアドレスチェック等 if event["requestContext"]["identity"]["sourceIp"] not in ALLOW_HOSTS: print("sourceIp error!") return False # サーバ、日付のチェック target_server = req_body["server"] if 'server' in req_body else None target_date = req_body["date"] if 'date' in req_body else None if target_server is None or target_date is None: # サーバ、日付が未指定の時はエラー print("server empty!") return False else: if len(re.sub("[^0-9]", "", target_date)) != 8: # 日付形式チェック print("date error!") return False #else: # TODO: 日付の妥当性チェックなど # return False return True CSVに変換するLambda作成 †src/collectlog2aws/log2csv.py "" S3バケットにPUTされたログを解析する. (1) logをcsvファイルに変換する. (2) 対象日付のサーバ負荷グラフを作成する. """ import boto3 import json import base64 import re from collectlog2aws import * s3 = boto3.resource('s3') def handler(event, context): """ メイン処理. """ print(json.dumps(event)) # リクエスト情報のチェック if not check_request(event): return make_bad_response() # バケット、オブジェクトキーの取得 log_bucket = event["Records"][0]["s3"]["bucket"]["name"] log_key = event["Records"][0]["s3"]["object"]["key"] # ログファイルをS3から取得 s3obj = s3.Object(log_bucket, log_key) log_text = s3obj.get()['Body'].read().decode("utf-8") # ログの取得 及び CSV変換 csv_text = to_csv_text(log_text) # CSV変換後の結果出力 csv_bucket = ANALYZED_LOG_BUCKET csv_key = re.sub("log$", "csv", log_key) s3obj = s3.Object(csv_bucket, csv_key) res = s3obj.put(Body = csv_text.encode("utf-8")) ## TODO: サーバ負荷グラフの作成 # TODO response_body = { "event": event } return { "statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps(response_body) } def check_request(event): """ リクエスト情報のチェック. """ records = event.get("Records", []) if not isinstance(records, list) or len(records) == 0: return False host_ip = records[0].get("requestParameters", {}).get("sourceIPAddress", None) bucket = records[0].get("s3", {}).get("bucket", {}).get("name", None) key = records[0].get("s3", {}).get("object", {}).get("key", None) # リクエスト元のIPアドレスチェック等 if host_ip is None or host_ip not in ALLOW_HOSTS: print("sourceIp error!") return False # バケット名のチェック if bucket is None or host_ip not in ALLOW_HOSTS: return False # オブジェクトキーのチェック if key is None or host_ip not in ALLOW_HOSTS: return False return True def to_csv_text(log_text): """ ログ(テキスト)をCSV形式(テキスト)に変換する. """ # この辺はログフォーマットに合わせて適宜調整. log_text = re.sub("[ ]+", ",", re.sub(",", " ", re.sub(" min", "min", log_text))) lines = log_text.split("\n") rows = filter(lambda x: len(x) > 29, [[y.strip() for y in x.split(",")] for x in lines]) csv_header = ",".join(["datetime", "past-times", "users", "load-average01", "load-average05", "load-average15", "procs-r", "procs-b", "mem-swpd", "mem-free", "mem-buff", "mem-cache", "swap-in", "swap-out", "buff-in", "buff-out", "system-in", "system-cs", "cpu-us", "cpu-sy", "cpu-id", "cpu-wa", "cpu-st"]) csv_body = "\n".join([",".join([x[0] + " " + x[1], x[3] + x[4] + " " + x[5], x[6], x[10], x[11], x[12] ,x[13], x[14], x[15], x[16], x[17], x[18], x[19], x[20] ,x[21], x[22], x[23], x[24], x[25], x[26], x[27], x[28], x[29] ]) for x in rows ]) return csv_header + "\n" + csv_body + "\n" CloudFormationテンプレートの作成 †S3バケット 及び Lambdaを作成/デプロイする為の CloudFormationテンプレートを作成する template.yml †AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 Description: "Stack for generate pre signed url and analyze log when put to s3." Parameters: # ログのPUTを許可するホスト(ローカルからのテスト用に一応パラメータオーバーライドできるようにしておく) ParamLogPutAllowHosts: Type: "String" Default: "xxx.xxx.xxx.xxx, xxx.xxx.xxx.xxx, xxx.xxx.xxx.xxx, xxx.xxx.xxx.xxx" Resources: # ログのPUTを許可するホスト(パラメータストアに格納) LogPutAllowHosts: Type: "AWS::SSM::Parameter" Properties: Name: "LogPutAllowHosts" Type: "String" Value: !Ref ParamLogPutAllowHosts # サーバの生ログ格納用バケット名(パラメータストアに格納) ServerlogBucketName: Type: "AWS::SSM::Parameter" Properties: Name: "ServerlogBucketName" Type: "String" Value: !Sub 'serverlogs-${AWS::AccountId}' # 変換後のCSV格納用バケット名(パラメータストアに格納) AnalyzedlogBucketName: Type: "AWS::SSM::Parameter" Properties: Name: "AnalyzedlogBucketName" Type: "String" Value: !Sub 'analyzedlogs-${AWS::AccountId}' # ログファイルアップロード用の署名付きURL生成するLambda GenerateSignedUrlFunc: Type: "AWS::Serverless::Function" Properties: FunctionName: GenerateSignedUrl Description: "サーバログ用のs3バケットの署名付きURLを生成する" Handler: index.generate_url Runtime: python3.6 CodeUri: ./src MemorySize: 128 Timeout: 60 # 直接パラメータストアから取得する為、コメントアウト #Environment: # Variables: # ALLOW_HOSTS: !GetAtt LogPutAllowHosts.Value # ANALYZED_LOG_BUCKET: !GetAtt AnalyzedlogBucketName.Value # SERVER_LOG_BUCKET: !GetAtt ServerlogBucketName.Value Events: GenUrlApi: Type: Api Properties: Path: / Method: get Role: !GetAtt GenerateSignedUrlFuncRole.Arn # ログファイルアップロード用の署名付きURLを生成するLambda用のロール GenerateSignedUrlFuncRole: Type: "AWS::IAM::Role" Properties: RoleName: GenerateSignedUrlFuncRole AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: "lambda.amazonaws.com" Action: "sts:AssumeRole" Policies: - PolicyName: "GenerateSignedUrlPolicy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents - ssm:GetParameters # パラメータストアから値を取得する為の権限 - ssm:GetParameter # パラメータストアから値を取得する為の権限 - s3:PutObject # 生ログ格納用のs3バケットにログをputする為の権限を付与しておく Resource: "*" # TODO: 対象のバケットを絞る(循環参照への対応が必要) # アップロードされた生ログを解析するLambda AnalyzeLogFunc: Type: "AWS::Serverless::Function" Properties: FunctionName: AnalyzeLog Description: "アップロードされたログをフォーマット 及び 解析する" Runtime: python3.6 Handler: index.analyze_log CodeUri: ./src MemorySize: 128 Timeout: 60 # 直接パラメータストアから取得する為、コメントアウト #Environment: # Variables: # ALLOW_HOSTS: !GetAtt LogPutAllowHosts.Value # ANALYZED_LOG_BUCKET: !GetAtt AnalyzedlogBucketName.Value # SERVER_LOG_BUCKET: !GetAtt ServerlogBucketName.Value Role: !GetAtt AnalyzeLogFuncRole.Arn # アップロードされた生ログを解析するLambda用のロール AnalyzeLogFuncRole: Type: "AWS::IAM::Role" Properties: RoleName: AnalyzeLogFuncRole AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: "lambda.amazonaws.com" Action: "sts:AssumeRole" Policies: - PolicyName: "AnalyzeLogFuncPolicy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents - ssm:GetParameters # パラメータストアから値を取得する為の権限 - ssm:GetParameter # パラメータストアから値を取得する為の権限 - s3:GetObject # 生ログ格納用のs3バケットからログをgetする為の権限を付与しておく - s3:PutObject # 解析済ログ格納用のs3バケットにログをputする為の権限を付与しておく Resource: "*" # TODO: 対象のバケットを絞る(循環参照への対応が必要) # サーバログ解析用Lambdaへのアクセス許可 AnalyzeLogFuncPermission: Type: "AWS::Lambda::Permission" Properties: Action: "lambda:InvokeFunction" FunctionName: !GetAtt AnalyzeLogFunc.Arn Principal: "s3.amazonaws.com" SourceArn: !Join - "" - - "arn:aws:s3:::" - !GetAtt ServerlogBucketName.Value # サーバ生ログ格納用バケット ServerlogBucket: Type: AWS::S3::Bucket #DeletionPolicy: Retain # スタック削除時にバケットを削除しない DependsOn: - AnalyzeLogFuncPermission Properties: BucketName: !GetAtt ServerlogBucketName.Value # 生ログがputされた時に自動的にログ解析用Lambdaを実行する NotificationConfiguration: LambdaConfigurations: - Event: "s3:ObjectCreated:Put" Function: !GetAtt AnalyzeLogFunc.Arn Filter: S3Key: Rules: - Name: suffix Value: log # 解析済みログ格納用バケット AnalyzedlogBucket: Type: AWS::S3::Bucket #DeletionPolicy: Retain # スタック削除時にバケットを削除しない Properties: BucketName: !GetAtt AnalyzedlogBucketName.Value Outputs: # 署名付きURL生成用のAPIエンドポイントをエクスポートしておく GenerateSignedUrl: Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/" Export: Name: GenerateSignedUrl awsbuild.sh †※ CloudFormation実行用のシェル
#!/bin/bash CURRENT_DIR=`dirname $0` MODE=$1 STAGE=dev REGION= FORCE= while [ "$1" != "" ]; do if [ "$1" == "--region" ]; then shift REGION=$1 fi if [ "$1" == "--env" ]; then shift STAGE=$1 fi if [ "$1" == "--force" ]; then FORCE=y fi shift done # テンプレートファイル TEMPLATE=template.yml # スタック名 ( gitプロジェクトの場合はgitリポジトリ名、それ以外の場合はフォルダ名をスタック名とする ) cd $CURRENT_DIR STACK_NAME=`basename \`pwd\`` if [ -e ".git" ]; then STACK_NAME=`cat .git/config | grep url | head -1 | awk -F"/" '{print $NF}'` fi STACK_NAME=`echo $STACK_NAME | sed 's/_/-/g' | sed 's/Repo$/Stack/'` STACK_NAME=${STACK_NAME}-${STAGE} # アカウントIDの取得 ACCOUNT_ID=`aws sts get-caller-identity | grep Account | awk '{print $2}' | sed -e "s/[^0-9]//g"` # リージョン指定 if [ "${REGION}" == "" ]; then REGION=`aws configure list | grep "region" | awk '{print $2}'` fi REGION_PARAM="--region ${REGION}" # スタック作成時のイベント確認 if [ "${MODE}" == "events" ]; then echo "Display events of Stack: ${STACK_NAME}" echo `date "+%Y-%m-%d %H:%M:%S"`" - START" aws cloudformation describe-stack-events --region $REGION --stack-name $STACK_NAME echo `date "+%Y-%m-%d %H:%M:%S"`" - END" exit 0 fi # 削除 if [ "${MODE}" == "delete" ]; then if [ "$FORCE" != "y" ] && [ "$STAGE" == "prod" ]; then echo "" read -p "Delete Stack in production environment? (y/n): " yn if [ "$yn" != "y" ]; then echo "\nDelete Stack Canceled." echo "" exit 0 fi fi echo "Delete Stack: ${STACK_NAME}" echo `date "+%Y-%m-%d %H:%M:%S"`" - START" aws cloudformation delete-stack --region $REGION --stack-name $STACK_NAME aws cloudformation wait stack-delete-complete --region $REGION --stack-name $STACK_NAME echo `date "+%Y-%m-%d %H:%M:%S"`" - END" exit 0 fi # 登録/更新 if [ "${MODE}" == "deploy" ]; then if [ "$FORCE" != "y" ] && [ "$STAGE" == "prod" ]; then echo "" read -p "Create/Update Stack in production environment? (y/n): " yn if [ "$yn" != "y" ]; then echo "Create/Update Stack Canceled." echo "" exit 0 fi fi echo "Create/Update Stack: ${STACK_NAME}" echo `date "+%Y-%m-%d %H:%M:%S"`" - START" # S3バケットがない場合は作る(バケット名は世界で唯一である必要がある為、末尾にアカウントID等を付与しておく) #BUCKET_NAME=stack-${STACK_NAME}-${ACCOUNT_ID} BUCKET_NAME=cf-templates-${REGION}-${ACCOUNT_ID} BUCKET_COUNT=`aws s3api list-buckets --region $REGION | grep -e "\"${BUCKET_NAME}\"" | wc -l | awk '{print $1}'` if [ "${BUCKET_COUNT}" == "0" ]; then echo Create s3 bucket: ${BUCKET_NAME} if [ "${REGION}" == "us-east-1" ]; then # es-east-1 の場合は LocationConstraint の指定なしで作成 aws s3api create-bucket --region $REGION --bucket $BUCKET_NAME else aws s3api create-bucket --region $REGION --create-bucket-configuration "{\"LocationConstraint\": \"${REGION}\"}" --bucket $BUCKET_NAME fi fi # 検証&パッケージング&デプロイ(成功時は作成したAPIのURIを表示する) #aws cloudformation validate-template --template-body file://${TEMPLATE} \ aws cloudformation package --region $REGION --template-file $TEMPLATE --s3-bucket $BUCKET_NAME --output-template-file packaged-template.yml \ && aws cloudformation deploy --region $REGION --template-file packaged-template.yml --stack-name $STACK_NAME --parameter-overrides StageName="$STAGE" --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \ && echo "" \ && echo "### Exported Value ###" \ && aws cloudformation describe-stacks --region $REGION --stack-name $STACK_NAME \ | awk 'BEGIN{key=""}{ if ($1 == "\"OutputKey\":") key=$2; if ($1 == "\"OutputValue\":") print key" : "$2 }' \ | sed 's/[",]//g' \ && echo "######################/" \ && echo "" echo `date "+%Y-%m-%d %H:%M:%S"`" - END" exit 0 fi echo "Usage)" echo "" echo " ${0} (deploy|delete|events) [--region regionName] [--env envName] [--force]" echo "" echo "Example)" echo "" echo " # create or update stack named '${STACK_NAME}'" echo " ${0} deploy --env prod" echo " ${0} deploy --env prod --region us-east-1" echo "" echo " # delete stack named '${STACK_NAME}'" echo " ${0} delete --env prod" echo " ${0} delete --env prod --region us-east-1" echo "" echo " # display events details of create or update or delete stack of '${STACK_NAME}'" echo " ${0} events --env prod" echo " ${0} events --env prod --region us-east-1" echo "" echo "Details)" echo "" echo " StackName ... The stack name will be the git repository name or folder name" echo "" ※ 関連 AWS CloudFormationメモ デプロイ †./awsbuild.sh deploy Uploading to XXXXXXXXXXXXXXXXXXXXXX 123456 / 123456.0 (100.00%) Successfully packaged artifacts and wrote output template to file packaged-template.yml. Execute the following command to deploy the packaged template aws cloudformation deploy --template-file /path_to/packaged-template.yml --stack-name オンプレミスのサーバ側のバッチ処理 †log2aws.sh †#!/bin/bash gen_url=上記のデプロイ時に表示された GenerateSignedUrlの値 server=`hostname` log_dir=ログディレクトリのPATH date=`date --date '+1day ago' +%Y-%m-%d` # 前日のログが対象 # 署名付きURL取得 signed_url=`curl -s -L ${gen_url}?server=${server}\&date=${date}` if [[ "$signed_url" =~ ^http(s|)://.+$ ]]; then # アップロード curl -s -L -D - -X PUT --upload-file ${log_dir}/${date}.log $signed_url >/dev/null 2>&1 else echo get signed url error!. echo $signed_url fi 動作確認 †オンプレミスのサーバ側からバッチ実行 †./log2aws.sh AWS CLI が使用できる環境から、S3バケットに変換後のCSVが出力されているか確認 †aws s3 ls s3://バケット名 --recursive 補足 †収集するログ †当記事の本筋とは離れるが、収集するログの形式について、以下に記載する。 10分間隔で以下のコマンドで出力されたサーバのロードアベレージ等の情報 echo `date +%Y-%m-%d` `uptime` `vmstat | tail -1`>>$log ログイメージ 2019-08-01 00:00:01 up 45 days, 20:04, 1 user, load average: 25.06, 22.63, 21.70 124 10 493744 379918752 0 8428556 0 0 1852 156 0 0 26 12 60 2 0 2019-08-01 00:10:01 up 45 days, 20:14, 0 users, load average: 21.12, 22.57, 22.50 68 2 493744 379083296 0 8007348 0 0 1852 156 0 0 26 12 60 2 0 2019-08-01 00:20:01 up 45 days, 20:24, 0 users, load average: 22.43, 22.59, 22.38 68 7 493744 378277888 0 8262264 0 0 1852 156 0 0 26 12 60 2 0 : 2019-08-01 23:40:01 up 46 days, 19:44, 0 users, load average: 19.00, 19.48, 19.66 54 2 365536 376942464 0 9174176 0 0 1823 156 0 0 26 12 60 2 0 2019-08-01 23:50:01 up 46 days, 19:54, 0 users, load average: 20.41, 19.96, 19.70 25 0 365536 377214816 0 8972716 0 0 1822 156 0 0 26 12 60 2 0 CSV変換後のイメージ 2019-08-01 00:00:01,45days 20:04,1,25.06,22.63,21.70,124,10,493744,379918752,0,8428556,0,0,1852,156,0,0,26,12,60,2,0 2019-08-01 00:10:01,45days 20:14,0,21.12,22.57,22.50,68,2,493744,379083296,0,8007348,0,0,1852,156,0,0,26,12,60,2,0 2019-08-01 00:20:01,45days 20:24,0,22.43,22.59,22.38,68,7,493744,378277888,0,8262264,0,0,1852,156,0,0,26,12,60,2,0 : 2019-08-01 23:40:01,46days 19:44,0,19.00,19.48,19.66,54,2,365536,376942464,0,9174176,0,0,1823,156,0,0,26,12,60,2,0 2019-08-01 23:50:01,46days 19:54,0,20.41,19.96,19.70,25,0,365536,377214816,0,8972716,0,0,1822,156,0,0,26,12,60,2,0 |