- 追加された行はこの色です。
- 削除された行はこの色です。
#author("2019-12-18T13:26:29+00:00","","")
[[Python]] >
* pythonでWebサーバを書いてみる [#ff21fcce]
#setlinebreak(on);
#contents
-- 関連
--- [[pythonでmarkdownを表示するシンプルなサーバを書く]]
--- [[pythonでmarkdownを表示するサーバを書く]]
--- [[Python覚え書き]]
--- [[Pythonのインストール]]
--- [[Python のパッケージング]]
--- [[PythonでAWS DynamoDBのCRUDを書いてみる]]
--- [[Pythonのstrptimeが遅い]]
--- [[Pythonのチューニング]]
--- [[Pythonのパフォーマンス確認]]
--- [[FizzBuzzで頭の体操]]
--- [[pytest入門]]
--- [[OpenSSLで電子署名の生成と署名検証]]
--- [[pythonでWebサーバを書いてみる]]
-- 参考
--- https://docs.python.jp/3/library/socketserver.html
--- https://docs.python.jp/3/library/http.server.html
--- https://docs.python.jp/3/library/urllib.error.html#urllib.error.HTTPError
** socketserver を使って簡単なサーバを書いて見る [#iee0e631]
#mycode2(){{
# coding: utf-8
from http.server import BaseHTTPRequestHandler
import json
import socketserver
import threading
import urllib.request
from urllib.error import HTTPError
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
class MyHTTPRequestHandler(BaseHTTPRequestHandler):
def do_POST(self):
self.do_PROCESS()
def do_PUT(self):
self.do_PROCESS()
def do_GET(self):
self.do_PROCESS()
def do_DELETE(self):
self.do_PROCESS()
def do_PROCESS(self):
print(f'method: {self.command}')
print(f'path: {self.path}')
# TODO:
# 正常ステータス
status = "200"
body = '{"result": true}'
# エラーステータス
#status = "403"
#body = '{"message": "error!"}'
if status[0] != '2':
self.send_response(int(status), "Error")
else:
self.send_response(int(status))
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', len(body.encode('utf-8')))
self.end_headers()
self.wfile.write(body.encode('utf-8'))
if __name__ == "__main__":
#HOST, PORT = "localhost", 8000
HOST, PORT = "localhost", 0 # ポート=0の時は開いてるポートを使用する
socketserver.TCPServer.allow_reuse_address = True # ポートの再利用を許可する(コレがないとしばらくの間は同じポートを利用できない)
server = ThreadedTCPServer((HOST, PORT), MyHTTPRequestHandler)
# 永続起動
#server.serve_forever()
# 非同期で起動して動作確認(テスト)後に落とす
with server:
ip, port = server.server_address
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
print("Web server running in thread:", server_thread.name)
# 動作確認
try:
url = f"http://localhost:{port}/post_ok"
data = {'var1': 'abcedfg'}
headers = {'Content-Type': 'application/json'}
req = urllib.request.Request(url, json.dumps(data).encode(), headers, method='POST')
with urllib.request.urlopen(req) as res:
print(f'status: {res.status}')
print(f'headers : {res.getheaders()}')
print(f'body: {res.read()}')
except HTTPError as e:
body = e.read()
headers = e.getheaders()
print(f'status: {e.code}')
print(f'headers : {headers}')
print(f'body: {body}')
except Exception as ex:
print('error!!')
print(ex)
server.shutdown()
server.socket.close()
print("Web server stop")
}}