|
2025-05-29
2025-05-31
目次 †概要 †Python スクリプトを書く感覚で、対話的な Web アプリケーションを簡単に作成できる(※)フレームワーク。 [補足] インストール †pip install streamlit 起動 †以下で、デフォルトで組み込まれているサンプルが起動出来る。 streamlit hello 任意のファイルを指定して起動。 streamlit run sample.py 任意のインターネット上の任意のファイルを指定して起動する事も可能 streamlit run https://raw.githubusercontent.com/streamlit/demo-uber-nyc-pickups/master/streamlit_app.py streamlit run の主なオプション †
config.toml でも指定可能。 基本的な使用方法 †UI部品使用して画面を構成する。 import streamlit as st
# テキスト入力BOXの表示
input_text = st.number_text('Input text')
# 入力されたテキストの表示
st.write('input_text: ', input_text)
UIパーツの例 †https://docs.streamlit.io/develop/api-reference/widgets
[所感] ナビゲーション †st.navigation を使用してページ切り替え用のUI等を構築可能。 例) import streamlit as st
def root():
st.title("TOPページ")
def page1():
st.title("ページ1")
def page2():
st.title("ページ2")
#------------------------------------------
# サイドバー(ナビゲーションメニュー)
#------------------------------------------
pg = st.navigation([
st.Page(root, title="TOP page"),
st.Page(page1, title="First page"),
st.Page(page2, title="Second page"),
st.Page("sample_ui.py", title="sample UI"),
])
pg.run()
他レイアウトなど †レイアウト用のUIが用意されている。
[所感] 例) ダイアログの使用 †import streamlit as st
@st.dialog("ダイアログタイトル")
def mydialog(arg):
text = st.text_input("何か入力して下さい")
if st.button("Submit"):
st.session_state.mydialog = {"arg": arg, "text": text}
st.rerun()
if "mydialog" not in st.session_state:
if st.button("ダイアログを表示する"):
mydialog("ダイアログ引数")
else:
f"\"{st.session_state.mydialog['text']}\" が入力されました"
入力データの取得 †以下のいずれかの方法で入力データを取得する事が出来る。 関数戻り値から取得する text1 = st.text_input("text input", key="text1")
st.write(f"text1 に {text1} が入力されました")
st.session_state から取得する st.text_input("text input", key="text1")
text1 = st.session_state["text1"] or ""
st.write(f"text1 に {text1} が入力されました")
テレメトリ収集のOFF †デフォルトのままだとテレメトリ収集用に webhooks.fivetran.com にリクエストが飛びまくる。 ![]() OFFにするには設定ファイルを以下のように修正する。 [browser] gatherUsageStats = true もしくは、streamlit を引数付きで起動する。 streamlit run --browser.gatherUsageStats false app.py 参考
認証 †認証機能用の関数が用意されている。 例) app.py import streamlit as st
if not st.user.is_logged_in:
if st.button("Log in"):
st.login()
st.stop()
def top():
st.write("TOPページ")
def page1():
st.title("ページ1")
def page2():
st.title("ページ2")
pg = st.navigation([
st.Page(top, title="First page"),
st.Page(page1, title="First page"),
st.Page(page2, title="Second page"),
st.Page(lambda: st.logout(), title="ログアウト"),
])
pg.run()
Microsoft Entra(Azure AD)を使用する場合 †secrets.toml [auth]
redirect_uri = "http://localhost:8501/oauth2callback"
cookie_secret = "cookie暗号化用のシークレット"
client_id = "クライアントID"
client_secret = "シークレット"
server_metadata_url = "https://login.microsoftonline.com/{テナントID}/v2.0/.well-known/openid-configuration"
複数のプロバイダを利用する場合 †secrets.toml [auth]
redirect_uri = "http://localhost:8501/oauth2callback"
cookie_secret = "cookie暗号化用のシークレット"
[auth.microsoft]
client_id = "クライアントID"
client_secret = "シークレット"
server_metadata_url = "https://login.microsoftonline.com/{テナントID}/v2.0/.well-known/openid-configuration"
# テスト用のローカルプロバイダ(後述)
[auth.myauth]
client_id = "client1"
client_secret = "your-jwt-secret"
server_metadata_url = "http://localhost:8000/.well-known/openid-configuration"
app.py if not st.user.is_logged_in:
if st.button("Microsoft Entraでログイン"):
st.login("microsoft")
if st.button("テスト用プロバイダでログイン"):
st.login("myauth")
st.stop()
:
起動 streamlit run --secrets.files secrets.toml --browser.gatherUsageStats false app.py テスト用のOpenID Connect認証サーバ †あくまでも動作確認用の為、いろいろと簡略化。 #!/bin/bash
if [ ! .venv ]; then
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
else
source .venv/bin/activate
fi
uvicorn main:app --reload
deactivate
annotated-types==0.7.0 anyio==4.9.0 Authlib==1.6.0 certifi==2025.4.26 cffi==1.17.1 click==8.2.1 cryptography==45.0.3 fastapi==0.115.12 h11==0.16.0 httpcore==1.0.9 httpx==0.28.1 idna==3.10 itsdangerous==2.2.0 pycparser==2.22 pydantic==2.11.5 pydantic_core==2.33.2 PyJWT==2.10.1 python-dotenv==1.1.0 python-multipart==0.0.20 sniffio==1.3.1 SQLAlchemy==2.0.41 starlette==0.46.2 typing-inspection==0.4.1 typing_extensions==4.13.2 uvicorn==0.34.2 import jwt
import os
import time
from fastapi import FastAPI, Form, Header, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from starlette.responses import PlainTextResponse
from starlette.middleware.sessions import SessionMiddleware
from dotenv import load_dotenv
from authlib.jose import JsonWebKey
from uuid import uuid4
load_dotenv()
# RSA鍵とJWKセットの生成
# TODO: 複数のJWKのサポート
# TODO: キャッシュ 及び 一定期間ごとの更新
JWK_KEY = JsonWebKey.generate_key("RSA", 2048, is_private=True)
PRIVATE_KEY_PEM = JWK_KEY.as_pem(is_private=True)
JWKS = {"keys": [JWK_KEY.as_dict(is_private=False)]}
# テスト用のユーザー・クライアント・認可コード管理
# TODO: DB化
USERS = {
"user1": {"sub": "user1", "name": "ユーザー1",
"email": "user1@example.com", "password": "test123"}
}
CLIENTS = {
"client1": {
"client_id": "client1",
"client_secret": "your-jwt-secret",
"redirect_uris": ["http://localhost:8501/oauth2callback"],
}
}
AUTHORIZATION_CODES = {}
app = FastAPI()
# セッション用のシークレットキー
app.add_middleware(SessionMiddleware, secret_key=os.getenv("SESSION_SECRET"))
# OpenID Connect Discoveryエンドポイント
@app.get("/.well-known/openid-configuration")
async def openid_configuration(request: Request):
issuer = str(request.base_url).rstrip("/")
return {
"issuer": issuer,
"authorization_endpoint": f"{issuer}/authorize",
"token_endpoint": f"{issuer}/token",
"userinfo_endpoint": f"{issuer}/userinfo",
"jwks_uri": f"{issuer}/jwks",
"response_types_supported": ["code", "id_token", "token id_token"],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["RS256"],
"scopes_supported": ["openid", "profile", "email"],
"token_endpoint_auth_methods_supported": ["client_secret_basic"],
"claims_supported": ["sub", "name", "email"],
}
@app.get("/authorize")
async def authorize(
response_type: str,
client_id: str,
redirect_uri: str,
scope: str,
state: str = "",
nonce: str = "",
):
# クライアント検証
client = CLIENTS.get(client_id)
if not client or redirect_uri not in client["redirect_uris"]:
return JSONResponse({"error": "invalid_client"}, status_code=400)
# OAuth認証画面(ログイン+同意)
html = f"""
<form method="post" action="/authorize">
<input type="hidden" name="response_type" value="{response_type}">
<input type="hidden" name="client_id" value="{client_id}">
<input type="hidden" name="redirect_uri" value="{redirect_uri}">
<input type="hidden" name="scope" value="{scope}">
<input type="hidden" name="state" value="{state}">
<input type="hidden" name="nonce" value="{nonce}">
<h2>OAuth認証リクエスト</h2>
<p>クライアント: <b>{client_id}</b></p>
<p>リクエストされたスコープ: <b>{scope}</b></p>
<label>ユーザー名: <input name="username"></label><br>
<label>パスワード: <input name="password" type="password"></label><br>
<button type="submit" name="action" value="approve">許可</button>
<button type="submit" name="action" value="deny">拒否</button>
</form>
"""
return HTMLResponse(content=html)
@app.post("/authorize")
async def authorize_post(
response_type: str = Form(...),
client_id: str = Form(...),
redirect_uri: str = Form(...),
scope: str = Form(...),
state: str = Form(""),
nonce: str = Form(""),
username: str = Form(...),
password: str = Form(...),
action: str = Form(...),
):
# ユーザー認証
user = USERS.get(username)
if not user or user["password"] != password:
return HTMLResponse(content="認証失敗", status_code=401)
# 同意確認
if action != "approve":
location = f"{redirect_uri}?error=access_denied&state={state}"
return RedirectResponse(url=location, status_code=302)
# 認可コード発行
code = str(uuid4())
AUTHORIZATION_CODES[code] = {
"client_id": client_id,
"redirect_uri": redirect_uri,
"username": username,
"scope": scope,
"nonce": nonce,
}
# リダイレクト
location = f"{redirect_uri}?code={code}&state={state}"
return RedirectResponse(url=location, status_code=302)
@app.post("/token")
async def token(request: Request, authorization: str = Header(None)):
print("Received token request")
# リクエストデータ取得 (フォームまたはJSON)
content_type = request.headers.get("content-type", "")
if content_type.startswith("application/json"):
data = await request.json()
else:
data = await request.form()
print(f"Request data: {data}")
print(f"Authorization header: {authorization}")
grant_type = data.get("grant_type")
code = data.get("code")
redirect_uri = data.get("redirect_uri")
client_id = data.get("client_id")
client_secret = data.get("client_secret")
# Basic認証優先
if authorization and authorization.startswith("Basic "):
import base64
decoded = base64.b64decode(authorization.split(" ")[1]).decode()
client_id, client_secret = decoded.split(":", 1)
print(f"grant_type: {grant_type}, code: {code}, redirect_uri: {redirect_uri}, client_id: {client_id}, client_secret: {client_secret}")
# リクエスト検証
if grant_type != "authorization_code" or not code or not redirect_uri or not client_id or not client_secret:
print("Invalid request parameters")
return JSONResponse({"error": "invalid_request"}, status_code=400)
# クライアント検証
client = CLIENTS.get(client_id)
if not client or client["client_secret"] != client_secret or redirect_uri not in client["redirect_uris"]:
print("Invalid client credentials")
print(f"client: {client}")
print("client_secret: {}".format(client["client_secret"] if client else "No client found"))
print(f"client_secret: {client_secret}")
print(f"redirect_uri: {redirect_uri}")
print(f"Expected redirect_uris: {client['redirect_uris'] if client else 'No client found'}")
return JSONResponse({"error": "invalid_client"}, status_code=400)
# 認可コード検証
code_data = AUTHORIZATION_CODES.pop(code, None)
if not code_data or code_data["client_id"] != client_id or code_data["redirect_uri"] != redirect_uri:
print("Invalid or expired authorization code")
return JSONResponse({"error": "invalid_grant"}, status_code=400)
# ユーザー取得
username = code_data["username"]
user = USERS.get(username)
if not user:
print("Invalid user")
return JSONResponse({"error": "invalid_user"}, status_code=400)
# トークン発行
now = int(time.time())
id_token_payload = {
"iss": str(request.base_url).rstrip("/"),
"sub": user["sub"],
"aud": client_id,
"exp": now + 600,
"iat": now,
"nonce": code_data["nonce"],
"name": user["name"],
"email": user["email"],
}
id_token = jwt.encode(id_token_payload, PRIVATE_KEY_PEM, algorithm="RS256")
access_token = str(uuid4())
if "ACCESS_TOKENS" not in globals():
global ACCESS_TOKENS
ACCESS_TOKENS = {}
ACCESS_TOKENS[access_token] = username
print(f"access_token: {access_token}")
print(f"id_token: {id_token}")
return {
"access_token": access_token,
"token_type": "Bearer",
"expires_in": 600,
"id_token": id_token,
}
@app.get("/userinfo")
async def userinfo(authorization: str = Header(None)):
if not authorization or not authorization.startswith("Bearer "):
return JSONResponse({"error": "invalid_request"}, status_code=401)
access_token = authorization.split(" ")[1]
username = ACCESS_TOKENS.get(access_token) if "ACCESS_TOKENS" in globals() else None
user = USERS.get(username) if username else None
if not user:
return JSONResponse({"error": "invalid_token"}, status_code=401)
return {
"sub": user["sub"],
"name": user["name"],
"email": user["email"],
}
@app.get("/jwks")
async def jwks():
# TODO: 複数のJWKのサポート
# TODO: キャッシュ 及び 一定期間ごとの更新
return JWKS
@app.get("/")
async def root():
return PlainTextResponse("テスト用の OpenID Connect 認証サーバ")
# セッション用のシークレットキー SESSION_SECRET=your-session-secret [所感] スタイルの設定 †st.set_page_config でもある程度の設定は可能だが、CSSを上書きした方が細かなデザイン変更を行う事が出来る。 各要素に data-testid 属性が付与される為、これを利用してスタイル調整が可能。 以下、streamlit 1.45.1 時点。
例) def set_style():
st.html("""
<style>
[data-testid="stApp"] {
/* ヘッダ */
[data-testid="stHeader"] {
}
[data-testid="stAppViewContainer"] {
[data-testid="stSidebarCollapsedControl"] {
left: 5px;
}
/* サイドバー */
[data-testid=stSidebar] {
/* 最小幅を少し小さく */
min-width: 200px;
/* 幅を固定する場合 */
/* width: 200px !important; */
background-color: #efe;
color: #333;
[data-testid=stSidebarContent] {
[data-testid="stSidebarHeader"]{
padding: 10px;
}
[data-testid="stSidebarNav"] {
a[data-testid="stSidebarNavLink"] {
padding: 0 10px;
margin: 0.125rem 5px;
span {
color: #333;
}
}
}
/* ユーザ定義のサイドバー */
[data-testid="stSidebarUserContent"] {
}
}
}
[data-testid="stMain"] {
/* メインコンテンツの余白を少し小さく */
[data-testid="stMainBlockContainer"] {
padding: 2rem 10px 2rem;
}
}
}
}
</style>
""")
その他サンプル †TODO: もう少し使用例を記載する(グラフ系のサンプル、チャット系の機能など)
|