#author("2019-12-18T13:26:29+00:00","","") [[Python]] > * pythonでWebサーバを書いてみる [#ff21fcce] #setlinebreak(on); #contents -- 関連 --- [[pythonでmarkdownを表示するシンプルなサーバを書く]] --- [[Python覚え書き]] --- [[Pythonのインストール]] --- [[Python のパッケージング]] --- [[PythonでAWS DynamoDBのCRUDを書いてみる]] --- [[Pythonのstrptimeが遅い]] --- [[Pythonのチューニング]] --- [[Pythonのパフォーマンス確認]] --- [[FizzBuzzで頭の体操]] --- [[pytest入門]] --- [[OpenSSLで電子署名の生成と署名検証]] --- [[pythonでWebサーバを書いてみる]] -- 参考 --- https://docs.python.jp/3/library/socketserver.html --- https://docs.python.jp/3/library/http.server.html --- https://docs.python.jp/3/library/urllib.error.html#urllib.error.HTTPError ** socketserver を使って簡単なサーバを書いて見る [#iee0e631] #mycode2(){{ # coding: utf-8 from http.server import BaseHTTPRequestHandler import json import socketserver import threading import urllib.request from urllib.error import HTTPError class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): pass class MyHTTPRequestHandler(BaseHTTPRequestHandler): def do_POST(self): self.do_PROCESS() def do_PUT(self): self.do_PROCESS() def do_GET(self): self.do_PROCESS() def do_DELETE(self): self.do_PROCESS() def do_PROCESS(self): print(f'method: {self.command}') print(f'path: {self.path}') # TODO: # 正常ステータス status = "200" body = '{"result": true}' # エラーステータス #status = "403" #body = '{"message": "error!"}' if status[0] != '2': self.send_response(int(status), "Error") else: self.send_response(int(status)) self.send_header('Content-Type', 'application/json') self.send_header('Content-Length', len(body.encode('utf-8'))) self.end_headers() self.wfile.write(body.encode('utf-8')) if __name__ == "__main__": #HOST, PORT = "localhost", 8000 HOST, PORT = "localhost", 0 # ポート=0の時は開いてるポートを使用する socketserver.TCPServer.allow_reuse_address = True # ポートの再利用を許可する(コレがないとしばらくの間は同じポートを利用できない) server = ThreadedTCPServer((HOST, PORT), MyHTTPRequestHandler) # 永続起動 #server.serve_forever() # 非同期で起動して動作確認(テスト)後に落とす with server: ip, port = server.server_address server_thread = threading.Thread(target=server.serve_forever) server_thread.daemon = True server_thread.start() print("Web server running in thread:", server_thread.name) # 動作確認 try: url = f"http://localhost:{port}/post_ok" data = {'var1': 'abcedfg'} headers = {'Content-Type': 'application/json'} req = urllib.request.Request(url, json.dumps(data).encode(), headers, method='POST') with urllib.request.urlopen(req) as res: print(f'status: {res.status}') print(f'headers : {res.getheaders()}') print(f'body: {res.read()}') except HTTPError as e: body = e.read() headers = e.getheaders() print(f'status: {e.code}') print(f'headers : {headers}') print(f'body: {body}') except Exception as ex: print('error!!') print(ex) server.shutdown() server.socket.close() print("Web server stop") }}