#author("2019-08-20T21:08:31+00:00","","") #mynavi(AWSメモ) #setlinebreak(on) * 目次 [#l73ce603] #contents - 関連 -- [[AWSメモ]] -- [[AWS CloudFormationメモ]] - 参考 -- cloudformationテンプレートリファレンス(S3::Bucket) https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-s3-bucket.html -- boto3で署名付きURLを生成するサンプル https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-presigned-urls.html https://docs.aws.amazon.com/ja_jp/code-samples/latest/catalog/python-s3-generate_presigned_url.py.html -- CloudFormation で Lambda 関数をデプロイするときに循環依存を避けるにはどうすればよいですか? https://aws.amazon.com/jp/premiumsupport/knowledge-center/unable-validate-circular-dependency-cloudformation/ -- Amazon S3 から HTTP 403: Access Denied エラーをトラブルシューティングする方法 https://aws.amazon.com/jp/premiumsupport/knowledge-center/s3-troubleshoot-403/ * 概要 [#i870a94c] #html(<div style="padding-left:10px;">) オンプレミスで稼働しているサーバのログをS3に収集してCSVに変換する所までの仕組みを構築する。 収集自体には色々な方法があるが、当記事では署名付きURLを利用してアップロードする方法について記載する。 ※ログデータの解析自体は、本記事の主題ではない為、別記事にて記載する。 ※当記事では CloudFront は使用しない。 ** 処理の流れ [#n5fc8eb4] - 各サーバの日次バッチで署名付きURLを取得する - 取得した署名付きURLに対して対象のログをアップロードする - S3のPUT通知からLambdaを起動してログデータをCSVに変換する - 変換したログを別のS3バケットにアップロードする ** イメージ [#ja87db22] #ref(flow.png,nolink) #html(</div>) // 概要 * AWS側の処理作成 [#ib79308c] #html(<div style="padding-left:10px;">) ** ファイル/フォルダ構成 [#fb34f269] #html(<div style="padding-left:10px; display:inline-block; border: 1px solid #999; border-radius: 5px;">) #html(<div style="display:inline-block;">) /collectlog2aws /awsbuild.sh /template.yml /src /index.py /collectlog2aws /__init__.py /genurl.py /log2csv.py #html(</div>) #html(<div style="display:inline-block; padding: 0 30px; vertical-align: top;">) ・・・ cloudformationデプロイ用シェル ・・・ cloudformationテンプレート ・・・ Lambdaソース格納用 ・・・ Lambda本体(メインハンドラ) ・・・ 一応パッケージ化しておく ・・・ パッケージの共通設定/共通処理など ・・・ 署名付きURL生成処理 ・・・ CSV変換処理 #html(</div>) #html(</div>) // ファイル/フォルダ構成 ** Lambda本体(メインハンドラ) [#h4da5efa] #html(<div style="padding-left:10px;">) src/index.py #mycode2(){{ from collectlog2aws import genurl, log2csv def generate_url(event, context): """ 署名付きURLを発行する. """ return genurl.handler(event, context) def analyze_log(event, context): """ PUTされたログを解析する. """ return log2csv.handler(event, context) }} #html(</div>) // Lambda本体(メインハンドラ) ** 共通処理 [#w7f373e6] #html(<div style="padding-left:10px;">) src/collectlog2aws/__init__.py #mycode2(){{ import boto3 client = boto3.client('ssm') # 署名付きURLの生成を許可するホスト ALLOW_HOSTS_TEXT = client.get_parameter(Name='LogPutAllowHosts', WithDecryption=False)["Parameter"]["Value"] ALLOW_HOSTS = [x.strip() for x in ALLOW_HOSTS_TEXT.split(",")] # 署名付きURLで許可する操作など ALLOW_CLIENT_METHOD = "put_object" ALLOW_HTTP_METHOD = "PUT" # バケット名 SERVER_LOG_BUCKET = client.get_parameter(Name='ServerlogBucketName' , WithDecryption=False)["Parameter"]["Value"] ANALYZED_LOG_BUCKET = client.get_parameter(Name='AnalyzedlogBucketName', WithDecryption=False)["Parameter"]["Value"] def make_object_key(target_server, target_date, kind): """ オブジェクトキーを生成する. """ # 大量アクセスが想定される場合は4文字程度のプレフィックスを付与しておく #object_key_prefix = base64.b64encode(bytes(target_server + target_date[0:6], "utf-8")).decode()[0:4] #return f'{object_key_prefix}/{target_server}/{target_date}.{kind}' return f'{target_server}/{target_date}.{kind}' def make_bad_response(status=400, message="Bad Request"): """ エラー時のレスポンスデータを作成する. """ response_body = message response_type = "plain/text" if isinstance(response_body, (list, dict)): response_body = json.dumps(response_body) response_type = "application/json" return { "statusCode": status, "headers": {"Content-Type": response_type}, "body": response_body } }} #html(</div>) // 共通処理 ** 署名付きURLの生成のLambda作成 [#g95392ad] #html(<div style="padding-left:10px;">) src/collectlog2aws/genurl.py #mycode2(){{ """ サーバの生ログをアップロードする為の署名付きURLを生成する. """ import boto3 import json import base64 import re from collectlog2aws import * def handler(event, context): """ メイン処理. """ # リクエスト情報のチェック if not check_request(event): return make_bad_response() # サーバ名、ログ日付の取得 req_body = event['body'] if 'body' in event else {} if 'queryStringParameters' in event and event['queryStringParameters']: req_body = event['queryStringParameters'] target_server = req_body["server"] if 'server' in req_body else None target_date = req_body["date"] if 'date' in req_body else None # 許可するバケット名、オブジェクトキー、操作など bucket = SERVER_LOG_BUCKET object_key = make_object_key(target_server, target_date, "log") # 署名付きURLを生成する url = boto3.client('s3').generate_presigned_url( ClientMethod = ALLOW_CLIENT_METHOD, Params = {'Bucket' : bucket, 'Key' : object_key}, ExpiresIn = 300, HttpMethod = ALLOW_HTTP_METHOD ) # TODO: URLの暗号化など encrypted_url = url # レスポンスの組み立て return { "statusCode": 200, "headers": {"Content-Type": "plain/text"}, "body": encrypted_url } def check_request(event): """ リクエスト情報のチェック. """ req_body = event['body'] if 'body' in event else {} if 'queryStringParameters' in event and event['queryStringParameters']: req_body = event['queryStringParameters'] print(json.dumps(event)) # リクエスト元のIPアドレスチェック等 if event["requestContext"]["identity"]["sourceIp"] not in ALLOW_HOSTS: print("sourceIp error!") return False # サーバ、日付のチェック target_server = req_body["server"] if 'server' in req_body else None target_date = req_body["date"] if 'date' in req_body else None if target_server is None or target_date is None: # サーバ、日付が未指定の時はエラー print("server empty!") return False else: if len(re.sub("[^0-9]", "", target_date)) != 8: # 日付形式チェック print("date error!") return False #else: # TODO: 日付の妥当性チェックなど # return False return True }} #html(</div>) // 署名付きURLの生成のLambda ** CSVに変換するLambda作成 [#g9c4bf3e] #html(<div style="padding-left:10px;">) src/collectlog2aws/log2csv.py #mycode2(){{ "" S3バケットにPUTされたログを解析する. (1) logをcsvファイルに変換する. (2) 対象日付のサーバ負荷グラフを作成する. """ import boto3 import json import base64 import re from collectlog2aws import * s3 = boto3.resource('s3') def handler(event, context): """ メイン処理. """ print(json.dumps(event)) # リクエスト情報のチェック if not check_request(event): return make_bad_response() # バケット、オブジェクトキーの取得 log_bucket = event["Records"][0]["s3"]["bucket"]["name"] log_key = event["Records"][0]["s3"]["object"]["key"] # ログファイルをS3から取得 s3obj = s3.Object(log_bucket, log_key) log_text = s3obj.get()['Body'].read().decode("utf-8") # ログの取得 及び CSV変換 csv_text = to_csv_text(log_text) # CSV変換後の結果出力 csv_bucket = ANALYZED_LOG_BUCKET csv_key = re.sub("log$", "csv", log_key) s3obj = s3.Object(csv_bucket, csv_key) res = s3obj.put(Body = csv_text.encode("utf-8")) ## TODO: サーバ負荷グラフの作成 # TODO response_body = { "event": event } return { "statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps(response_body) } def check_request(event): """ リクエスト情報のチェック. """ records = event.get("Records", []) if not isinstance(records, list) or len(records) == 0: return False host_ip = records[0].get("requestParameters", {}).get("sourceIPAddress", None) bucket = records[0].get("s3", {}).get("bucket", {}).get("name", None) key = records[0].get("s3", {}).get("object", {}).get("key", None) # リクエスト元のIPアドレスチェック等 if host_ip is None or host_ip not in ALLOW_HOSTS: print("sourceIp error!") return False # バケット名のチェック if bucket is None or host_ip not in ALLOW_HOSTS: return False # オブジェクトキーのチェック if key is None or host_ip not in ALLOW_HOSTS: return False return True def to_csv_text(log_text): """ ログ(テキスト)をCSV形式(テキスト)に変換する. """ # この辺はログフォーマットに合わせて適宜調整. log_text = re.sub("[ ]+", ",", re.sub(",", " ", re.sub(" min", "min", log_text))) lines = log_text.split("\n") rows = filter(lambda x: len(x) > 29, [[y.strip() for y in x.split(",")] for x in lines]) csv_header = ",".join(["datetime", "past-times", "users", "load-average01", "load-average05", "load-average15", "procs-r", "procs-b", "mem-swpd", "mem-free", "mem-buff", "mem-cache", "swap-in", "swap-out", "buff-in", "buff-out", "system-in", "system-cs", "cpu-us", "cpu-sy", "cpu-id", "cpu-wa", "cpu-st"]) csv_body = "\n".join([",".join([x[0] + " " + x[1], x[3] + x[4] + " " + x[5], x[6], x[10], x[11], x[12] ,x[13], x[14], x[15], x[16], x[17], x[18], x[19], x[20] ,x[21], x[22], x[23], x[24], x[25], x[26], x[27], x[28], x[29] ]) for x in rows ]) return csv_header + "\n" + csv_body + "\n" }} #html(</div>) // CSVに変換するLambda ** CloudFormationテンプレートの作成 [#u3baacec] #html(<div style="padding-left:10px;">) S3バケット 及び Lambdaを作成/デプロイする為の CloudFormationテンプレートを作成する ※生ログ格納用S3バケットは、ログがPUTされたらCSV変換Lambdaが起動するように設定しておく ※署名付きURL生成用の処理はオンプレミス側のサーバから要求があった時に起動できるようにAPI Gateway 経由で起動する *** template.yml [#gf618b7d] #html(<div style="padding-left:10px;">) #mycode2(){{ AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 Description: "Stack for generate pre signed url and analyze log when put to s3." Parameters: # ログのPUTを許可するホスト(ローカルからのテスト用に一応パラメータオーバーライドできるようにしておく) ParamLogPutAllowHosts: Type: "String" Default: "xxx.xxx.xxx.xxx, xxx.xxx.xxx.xxx, xxx.xxx.xxx.xxx, xxx.xxx.xxx.xxx" Resources: # ログのPUTを許可するホスト(パラメータストアに格納) LogPutAllowHosts: Type: "AWS::SSM::Parameter" Properties: Name: "LogPutAllowHosts" Type: "String" Value: !Ref ParamLogPutAllowHosts # サーバの生ログ格納用バケット名(パラメータストアに格納) ServerlogBucketName: Type: "AWS::SSM::Parameter" Properties: Name: "ServerlogBucketName" Type: "String" Value: !Sub 'serverlogs-${AWS::AccountId}' # 変換後のCSV格納用バケット名(パラメータストアに格納) AnalyzedlogBucketName: Type: "AWS::SSM::Parameter" Properties: Name: "AnalyzedlogBucketName" Type: "String" Value: !Sub 'analyzedlogs-${AWS::AccountId}' # ログファイルアップロード用の署名付きURL生成するLambda GenerateSignedUrlFunc: Type: "AWS::Serverless::Function" Properties: FunctionName: GenerateSignedUrl Description: "サーバログ用のs3バケットの署名付きURLを生成する" Handler: index.generate_url Runtime: python3.6 CodeUri: ./src MemorySize: 128 Timeout: 60 # 直接パラメータストアから取得する為、コメントアウト #Environment: # Variables: # ALLOW_HOSTS: !GetAtt LogPutAllowHosts.Value # ANALYZED_LOG_BUCKET: !GetAtt AnalyzedlogBucketName.Value # SERVER_LOG_BUCKET: !GetAtt ServerlogBucketName.Value Events: GenUrlApi: Type: Api Properties: Path: / Method: get Role: !GetAtt GenerateSignedUrlFuncRole.Arn # ログファイルアップロード用の署名付きURLを生成するLambda用のロール GenerateSignedUrlFuncRole: Type: "AWS::IAM::Role" Properties: RoleName: GenerateSignedUrlFuncRole AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: "lambda.amazonaws.com" Action: "sts:AssumeRole" Policies: - PolicyName: "GenerateSignedUrlPolicy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents - ssm:GetParameters # パラメータストアから値を取得する為の権限 - ssm:GetParameter # パラメータストアから値を取得する為の権限 - s3:PutObject # 生ログ格納用のs3バケットにログをputする為の権限を付与しておく Resource: "*" # TODO: 対象のバケットを絞る(循環参照への対応が必要) # アップロードされた生ログを解析するLambda AnalyzeLogFunc: Type: "AWS::Serverless::Function" Properties: FunctionName: AnalyzeLog Description: "アップロードされたログをフォーマット 及び 解析する" Runtime: python3.6 Handler: index.analyze_log CodeUri: ./src MemorySize: 128 Timeout: 60 # 直接パラメータストアから取得する為、コメントアウト #Environment: # Variables: # ALLOW_HOSTS: !GetAtt LogPutAllowHosts.Value # ANALYZED_LOG_BUCKET: !GetAtt AnalyzedlogBucketName.Value # SERVER_LOG_BUCKET: !GetAtt ServerlogBucketName.Value Role: !GetAtt AnalyzeLogFuncRole.Arn # アップロードされた生ログを解析するLambda用のロール AnalyzeLogFuncRole: Type: "AWS::IAM::Role" Properties: RoleName: AnalyzeLogFuncRole AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: "lambda.amazonaws.com" Action: "sts:AssumeRole" Policies: - PolicyName: "AnalyzeLogFuncPolicy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents - ssm:GetParameters # パラメータストアから値を取得する為の権限 - ssm:GetParameter # パラメータストアから値を取得する為の権限 - s3:GetObject # 生ログ格納用のs3バケットからログをgetする為の権限を付与しておく - s3:PutObject # 解析済ログ格納用のs3バケットにログをputする為の権限を付与しておく Resource: "*" # TODO: 対象のバケットを絞る(循環参照への対応が必要) # サーバログ解析用Lambdaへのアクセス許可 AnalyzeLogFuncPermission: Type: "AWS::Lambda::Permission" Properties: Action: "lambda:InvokeFunction" FunctionName: !GetAtt AnalyzeLogFunc.Arn Principal: "s3.amazonaws.com" SourceArn: !Join - "" - - "arn:aws:s3:::" - !GetAtt ServerlogBucketName.Value # サーバ生ログ格納用バケット ServerlogBucket: Type: AWS::S3::Bucket #DeletionPolicy: Retain # スタック削除時にバケットを削除しない DependsOn: - AnalyzeLogFuncPermission Properties: BucketName: !GetAtt ServerlogBucketName.Value # 生ログがputされた時に自動的にログ解析用Lambdaを実行する NotificationConfiguration: LambdaConfigurations: - Event: "s3:ObjectCreated:Put" Function: !GetAtt AnalyzeLogFunc.Arn Filter: S3Key: Rules: - Name: suffix Value: log # 解析済みログ格納用バケット AnalyzedlogBucket: Type: AWS::S3::Bucket #DeletionPolicy: Retain # スタック削除時にバケットを削除しない Properties: BucketName: !GetAtt AnalyzedlogBucketName.Value Outputs: # 署名付きURL生成用のAPIエンドポイントをエクスポートしておく GenerateSignedUrl: Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/" Export: Name: GenerateSignedUrl }} #html(</div>) *** awsbuild.sh [#mc96d937] #include(CloudFormation実行用のシェル,notitle,notitle) #html(</div>) // CloudFormationテンプレートの作成 ** デプロイ [#x24c0c20] #html(<div style="padding-left:10px;">) #myterm2(){{ ./awsbuild.sh deploy Uploading to XXXXXXXXXXXXXXXXXXXXXX 123456 / 123456.0 (100.00%) Successfully packaged artifacts and wrote output template to file packaged-template.yml. Execute the following command to deploy the packaged template aws cloudformation deploy --template-file /path_to/packaged-template.yml --stack-name <YOUR STACK NAME> Waiting for changeset to be created.. Waiting for stack create/update to complete Successfully created/updated stack - monitoring ### Exported Value ### GenerateSignedUrl : https://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/Prod/ ######################/ }} #html(</div>) // デプロイ #html(</div>) // AWS側の処理作成 * オンプレミスのサーバ側のバッチ処理 [#we900a8a] #html(<div style="padding-left:10px;">) ** log2aws.sh [#u29a07b3] #mycode2(){{ #!/bin/bash gen_url=上記のデプロイ時に表示された GenerateSignedUrlの値 server=`hostname` log_dir=ログディレクトリのPATH date=`date --date '+1day ago' +%Y-%m-%d` # 前日のログが対象 # 署名付きURL取得 signed_url=`curl -s -L ${gen_url}?server=${server}\&date=${date}` if [[ "$signed_url" =~ ^http(s|)://.+$ ]]; then # アップロード curl -s -L -D - -X PUT --upload-file ${log_dir}/${date}.log $signed_url >/dev/null 2>&1 else echo get signed url error!. echo $signed_url fi }} #html(</div>) // オンプレミスのサーバ側のバッチ処理 * 動作確認 [#q0b4ad4c] #html(<div style="padding-left:10px;">) ** オンプレミスのサーバ側からバッチ実行 [#wf8fc2e1] #myterm2(){{ ./log2aws.sh }} ** AWS CLI が使用できる環境から、S3バケットに変換後のCSVが出力されているか確認 [#ydfe3b53] #myterm2(){{ aws s3 ls s3://バケット名 --recursive }} #html(</div>) * 補足 [#he42f2e0] #html(<div style="padding-left:10px;">) ** 収集するログ [#jc65c9e4] #html(<div style="padding-left:10px;">) 当記事の本筋とは離れるが、収集するログの形式について、以下に記載する。 10分間隔で以下のコマンドで出力されたサーバのロードアベレージ等の情報 #myterm2(){{ echo `date +%Y-%m-%d` `uptime` `vmstat | tail -1`>>$log }} ログイメージ #mycode2(){{ 2019-08-01 00:00:01 up 45 days, 20:04, 1 user, load average: 25.06, 22.63, 21.70 124 10 493744 379918752 0 8428556 0 0 1852 156 0 0 26 12 60 2 0 2019-08-01 00:10:01 up 45 days, 20:14, 0 users, load average: 21.12, 22.57, 22.50 68 2 493744 379083296 0 8007348 0 0 1852 156 0 0 26 12 60 2 0 2019-08-01 00:20:01 up 45 days, 20:24, 0 users, load average: 22.43, 22.59, 22.38 68 7 493744 378277888 0 8262264 0 0 1852 156 0 0 26 12 60 2 0 : 2019-08-01 23:40:01 up 46 days, 19:44, 0 users, load average: 19.00, 19.48, 19.66 54 2 365536 376942464 0 9174176 0 0 1823 156 0 0 26 12 60 2 0 2019-08-01 23:50:01 up 46 days, 19:54, 0 users, load average: 20.41, 19.96, 19.70 25 0 365536 377214816 0 8972716 0 0 1822 156 0 0 26 12 60 2 0 }} CSV変換後のイメージ #mycode2(){{ 2019-08-01 00:00:01,45days 20:04,1,25.06,22.63,21.70,124,10,493744,379918752,0,8428556,0,0,1852,156,0,0,26,12,60,2,0 2019-08-01 00:10:01,45days 20:14,0,21.12,22.57,22.50,68,2,493744,379083296,0,8007348,0,0,1852,156,0,0,26,12,60,2,0 2019-08-01 00:20:01,45days 20:24,0,22.43,22.59,22.38,68,7,493744,378277888,0,8262264,0,0,1852,156,0,0,26,12,60,2,0 : 2019-08-01 23:40:01,46days 19:44,0,19.00,19.48,19.66,54,2,365536,376942464,0,9174176,0,0,1823,156,0,0,26,12,60,2,0 2019-08-01 23:50:01,46days 19:54,0,20.41,19.96,19.70,25,0,365536,377214816,0,8972716,0,0,1822,156,0,0,26,12,60,2,0 }} #html(</div>) // 収集するログ #html(</div>) // 補足