Python スクリプトを書く感覚で、対話的な Web アプリケーションを簡単に作成できる(※)フレームワーク。
特にデータサイエンスや機械学習の成果を可視化・共有する目的で使用される。
※1 ... Python スクリプト内に st.xxx() を書くだけでUIが作れる。
※2 ... 機械学習モデルのデモ画面、データ分析レポートのWeb化、チームへのツール提供用の簡易ダッシュボード等。
[補足]
これまでの streamlit に対する自身の認識は、jupyter でノートブックを共有するノリをアプリとして行う事が出来る。ぐらいの認識であったが、
今回、調べてみると結構いろいろな事が出来る事が分かった。(やろうと思えばガッツリWebアプリケーションも作れる)
pip install streamlit
以下で、デフォルトで組み込まれているサンプルが起動出来る。
streamlit hello
任意のファイルを指定して起動。
streamlit run sample.py
任意のインターネット上の任意のファイルを指定して起動する事も可能
streamlit run https://raw.githubusercontent.com/streamlit/demo-uber-nyc-pickups/master/streamlit_app.py
オプション | 説明 |
--server.port 8501 | ポート |
--server.address x.x.x.x | ホストアドレス(0.0.0.0 にするとローカルPC以外からアクセス可) |
--server.headless true | UIを開かずバックグラウンド実行する場合は true |
--server.enableCORS false | CORS(クロスオリジン)制限を無効化 |
--client.toolbarMode "minimal" | 右上のツールバー表示モード("auto" or "developer" or "viewer" or "minimal") |
--secrets.files path_to/secrets.toml | secrets.toml のPATHを指定する |
--browser.gatherUsageStats false | テレメトリ収集をOFFにする( 後述 テレメトリ収集をOFFにする 参照 ) |
config.toml でも指定可能。
※ https://docs.streamlit.io/develop/api-reference/configuration/config.toml
UI部品使用して画面を構成する。
import streamlit as st # テキスト入力BOXの表示 input_text = st.number_text('Input text') # 入力されたテキストの表示 st.write('input_text: ', input_text)
https://docs.streamlit.io/develop/api-reference/widgets
Streamlit 関数 | 機能 | 補足 |
st.title | 見出し(H1) | |
st.header | 見出し(H2) | |
st.write | テキストや数値の出力 | 引数にはテキストや数値だけでなくデータフレームやグラフを指定する事も可能 |
st.markdown | テキストや数値の出力(マークダウン指定) | |
st.button | ボタン | |
st.text_input | テキスト入力 | |
st.slider | スライダー | |
st.selectbox | プルダウン | |
st.checkbox | チェックボックス | |
st.radio | ラジオボタン | |
st.table | テーブル表示 | 縦/横幅などの指定はできない |
st.dataframe | テーブル表示 | column_config を使用する事により細かなスタイル指定等が可能 |
st.data_editor | 編集可能なテーブルの表示 | |
st.area_chart | エリアチャート | |
st.bar_chart | 線グラフ | |
st.line_chart | 棒グラフ | |
st.scatter_chart | 散布図 | |
st.plotly_chart | plotlyグラフ | |
st.pyplot | matplotlibグラフ |
[所感]
将来的に他フレームワークへの乗せ替えを視野にいれる場合、st.bar_chart や st.line_chart 等の streamlit 専用のグラフ描画ではなく plotly 等を使用した方が移行コストは低く済みそう。
st.navigation を使用してページ切り替え用のUI等を構築可能。
※ https://docs.streamlit.io/develop/api-reference/navigation
例)
import streamlit as st def root(): st.title("TOPページ") def page1(): st.title("ページ1") def page2(): st.title("ページ2") #------------------------------------------ # サイドバー(ナビゲーションメニュー) #------------------------------------------ pg = st.navigation([ st.Page(root, title="TOP page"), st.Page(page1, title="First page"), st.Page(page2, title="Second page"), st.Page("sample_ui.py", title="sample UI"), ]) pg.run()
レイアウト用のUIが用意されている。
※https://docs.streamlit.io/develop/api-reference/layout
Streamlit 関数 | 機能 | 補足 |
st.columns | ブロックを横に並べる | 内部的には flexbox を使用して展開される |
st.container | ある纏まった単位でブロック要素として定義するのに使用する | |
st.dialog | ダイアログ表示用 | |
link | リンク表示用 | |
st.empty | 単一の要素のみを保持できるコンテナを定義出来る | time.sleep と共に使用すれば簡単なアニメーションの作成が可能 |
st.expander | 展開/折りたたみ用のUI | |
st.form | FORM要素を定義する事が出来る | |
st.popover | ポップアップ用のUIを作成出来る | |
st.sidebar | サイドバーを構築できる。 | st.navigation ではページ切り替え用のボタン以外のUIは配置出来ないが、sidebar では好きなUI部品を配置可能 |
st.tabs | タブ表示用 |
[所感]
st.columns 等、いくつかの部品は微妙。
将来的に別のフレームワーク等への乗せ替える予定がある場合は、このあたりのレイアウト部品を使用してガッツリ作り込んでしまうと移行は辛そう。
import streamlit as st @st.dialog("ダイアログタイトル") def mydialog(arg): text = st.text_input("何か入力して下さい") if st.button("Submit"): st.session_state.mydialog = {"arg": arg, "text": text} st.rerun() if "mydialog" not in st.session_state: if st.button("ダイアログを表示する"): mydialog("ダイアログ引数") else: f"\"{st.session_state.mydialog['text']}\" が入力されました"
以下のいずれかの方法で入力データを取得する事が出来る。
関数戻り値から取得する
text1 = st.text_input("text input", key="text1") st.write(f"text1 に {text1} が入力されました")
st.session_state から取得する
st.text_input("text input", key="text1") text1 = st.session_state["text1"] or "" st.write(f"text1 に {text1} が入力されました")
デフォルトのままだとテレメトリ収集用に https://webhooks.fivetran.com にリクエストが飛びまくる。
OFFにするには設定ファイルを以下のように修正する。
[browser] gatherUsageStats = true
もしくは、streamlit を引数付きで起動する。
streamlit run --browser.gatherUsageStats false app.py
参考
認証機能用の関数が用意されている。
OpenID Connect プロバイダを利用する場合、簡単に認証機能を組み込む事が出来る。
※ https://docs.streamlit.io/develop/api-reference/user
例)
app.py
import streamlit as st if not st.user.is_logged_in: if st.button("Log in"): st.login() st.stop() def top(): st.write("TOPページ") def page1(): st.title("ページ1") def page2(): st.title("ページ2") pg = st.navigation([ st.Page(top, title="First page"), st.Page(page1, title="First page"), st.Page(page2, title="Second page"), st.Page(lambda: st.logout(), title="ログアウト"), ]) pg.run()
secrets.toml
[auth] redirect_uri = "http://localhost:8501/oauth2callback" cookie_secret = "cookie暗号化用のシークレット" client_id = "クライアントID" client_secret = "シークレット" server_metadata_url = "https://login.microsoftonline.com/{テナントID}/v2.0/.well-known/openid-configuration"
secrets.toml
[auth] redirect_uri = "http://localhost:8501/oauth2callback" cookie_secret = "cookie暗号化用のシークレット" [auth.microsoft] client_id = "クライアントID" client_secret = "シークレット" server_metadata_url = "https://login.microsoftonline.com/{テナントID}/v2.0/.well-known/openid-configuration" # テスト用のローカルプロバイダ(後述) [auth.myauth] client_id = "client1" client_secret = "your-jwt-secret" server_metadata_url = "http://localhost:8000/.well-known/openid-configuration"
app.py
if not st.user.is_logged_in: if st.button("Microsoft Entraでログイン"): st.login("microsoft") if st.button("テスト用プロバイダでログイン"): st.login("myauth") st.stop() :
起動
streamlit run --secrets.files secrets.toml --browser.gatherUsageStats false app.py
あくまでも動作確認用の為、いろいろと簡略化。
#!/bin/bash if [ ! .venv ]; then python3 -m venv .venv source .venv/bin/activate pip install -r requirements.txt else source .venv/bin/activate fi uvicorn main:app --reload deactivate
annotated-types==0.7.0 anyio==4.9.0 Authlib==1.6.0 certifi==2025.4.26 cffi==1.17.1 click==8.2.1 cryptography==45.0.3 fastapi==0.115.12 h11==0.16.0 httpcore==1.0.9 httpx==0.28.1 idna==3.10 itsdangerous==2.2.0 pycparser==2.22 pydantic==2.11.5 pydantic_core==2.33.2 PyJWT==2.10.1 python-dotenv==1.1.0 python-multipart==0.0.20 sniffio==1.3.1 SQLAlchemy==2.0.41 starlette==0.46.2 typing-inspection==0.4.1 typing_extensions==4.13.2 uvicorn==0.34.2
import jwt import os import time from fastapi import FastAPI, Form, Header, Request from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from starlette.responses import PlainTextResponse from starlette.middleware.sessions import SessionMiddleware from dotenv import load_dotenv from authlib.jose import JsonWebKey from uuid import uuid4 load_dotenv() # RSA鍵とJWKセットの生成 # TODO: 複数のJWKのサポート # TODO: キャッシュ 及び 一定期間ごとの更新 JWK_KEY = JsonWebKey.generate_key("RSA", 2048, is_private=True) PRIVATE_KEY_PEM = JWK_KEY.as_pem(is_private=True) JWKS = {"keys": [JWK_KEY.as_dict(is_private=False)]} # テスト用のユーザー・クライアント・認可コード管理 # TODO: DB化 USERS = { "user1": {"sub": "user1", "name": "ユーザー1", "email": "user1@example.com", "password": "test123"} } CLIENTS = { "client1": { "client_id": "client1", "client_secret": "your-jwt-secret", "redirect_uris": ["http://localhost:8501/oauth2callback"], } } AUTHORIZATION_CODES = {} app = FastAPI() # セッション用のシークレットキー app.add_middleware(SessionMiddleware, secret_key=os.getenv("SESSION_SECRET")) # OpenID Connect Discoveryエンドポイント @app.get("/.well-known/openid-configuration") async def openid_configuration(request: Request): issuer = str(request.base_url).rstrip("/") return { "issuer": issuer, "authorization_endpoint": f"{issuer}/authorize", "token_endpoint": f"{issuer}/token", "userinfo_endpoint": f"{issuer}/userinfo", "jwks_uri": f"{issuer}/jwks", "response_types_supported": ["code", "id_token", "token id_token"], "subject_types_supported": ["public"], "id_token_signing_alg_values_supported": ["RS256"], "scopes_supported": ["openid", "profile", "email"], "token_endpoint_auth_methods_supported": ["client_secret_basic"], "claims_supported": ["sub", "name", "email"], } @app.get("/authorize") async def authorize( response_type: str, client_id: str, redirect_uri: str, scope: str, state: str = "", nonce: str = "", ): # クライアント検証 client = CLIENTS.get(client_id) if not client or redirect_uri not in client["redirect_uris"]: return JSONResponse({"error": "invalid_client"}, status_code=400) # OAuth認証画面(ログイン+同意) html = f""" <form method="post" action="/authorize"> <input type="hidden" name="response_type" value="{response_type}"> <input type="hidden" name="client_id" value="{client_id}"> <input type="hidden" name="redirect_uri" value="{redirect_uri}"> <input type="hidden" name="scope" value="{scope}"> <input type="hidden" name="state" value="{state}"> <input type="hidden" name="nonce" value="{nonce}"> <h2>OAuth認証リクエスト</h2> <p>クライアント: <b>{client_id}</b></p> <p>リクエストされたスコープ: <b>{scope}</b></p> <label>ユーザー名: <input name="username"></label><br> <label>パスワード: <input name="password" type="password"></label><br> <button type="submit" name="action" value="approve">許可</button> <button type="submit" name="action" value="deny">拒否</button> </form> """ return HTMLResponse(content=html) @app.post("/authorize") async def authorize_post( response_type: str = Form(...), client_id: str = Form(...), redirect_uri: str = Form(...), scope: str = Form(...), state: str = Form(""), nonce: str = Form(""), username: str = Form(...), password: str = Form(...), action: str = Form(...), ): # ユーザー認証 user = USERS.get(username) if not user or user["password"] != password: return HTMLResponse(content="認証失敗", status_code=401) # 同意確認 if action != "approve": location = f"{redirect_uri}?error=access_denied&state={state}" return RedirectResponse(url=location, status_code=302) # 認可コード発行 code = str(uuid4()) AUTHORIZATION_CODES[code] = { "client_id": client_id, "redirect_uri": redirect_uri, "username": username, "scope": scope, "nonce": nonce, } # リダイレクト location = f"{redirect_uri}?code={code}&state={state}" return RedirectResponse(url=location, status_code=302) @app.post("/token") async def token(request: Request, authorization: str = Header(None)): print("Received token request") # リクエストデータ取得 (フォームまたはJSON) content_type = request.headers.get("content-type", "") if content_type.startswith("application/json"): data = await request.json() else: data = await request.form() print(f"Request data: {data}") print(f"Authorization header: {authorization}") grant_type = data.get("grant_type") code = data.get("code") redirect_uri = data.get("redirect_uri") client_id = data.get("client_id") client_secret = data.get("client_secret") # Basic認証優先 if authorization and authorization.startswith("Basic "): import base64 decoded = base64.b64decode(authorization.split(" ")[1]).decode() client_id, client_secret = decoded.split(":", 1) print(f"grant_type: {grant_type}, code: {code}, redirect_uri: {redirect_uri}, client_id: {client_id}, client_secret: {client_secret}") # リクエスト検証 if grant_type != "authorization_code" or not code or not redirect_uri or not client_id or not client_secret: print("Invalid request parameters") return JSONResponse({"error": "invalid_request"}, status_code=400) # クライアント検証 client = CLIENTS.get(client_id) if not client or client["client_secret"] != client_secret or redirect_uri not in client["redirect_uris"]: print("Invalid client credentials") print(f"client: {client}") print("client_secret: {}".format(client["client_secret"] if client else "No client found")) print(f"client_secret: {client_secret}") print(f"redirect_uri: {redirect_uri}") print(f"Expected redirect_uris: {client['redirect_uris'] if client else 'No client found'}") return JSONResponse({"error": "invalid_client"}, status_code=400) # 認可コード検証 code_data = AUTHORIZATION_CODES.pop(code, None) if not code_data or code_data["client_id"] != client_id or code_data["redirect_uri"] != redirect_uri: print("Invalid or expired authorization code") return JSONResponse({"error": "invalid_grant"}, status_code=400) # ユーザー取得 username = code_data["username"] user = USERS.get(username) if not user: print("Invalid user") return JSONResponse({"error": "invalid_user"}, status_code=400) # トークン発行 now = int(time.time()) id_token_payload = { "iss": str(request.base_url).rstrip("/"), "sub": user["sub"], "aud": client_id, "exp": now + 600, "iat": now, "nonce": code_data["nonce"], "name": user["name"], "email": user["email"], } id_token = jwt.encode(id_token_payload, PRIVATE_KEY_PEM, algorithm="RS256") access_token = str(uuid4()) if "ACCESS_TOKENS" not in globals(): global ACCESS_TOKENS ACCESS_TOKENS = {} ACCESS_TOKENS[access_token] = username print(f"access_token: {access_token}") print(f"id_token: {id_token}") return { "access_token": access_token, "token_type": "Bearer", "expires_in": 600, "id_token": id_token, } @app.get("/userinfo") async def userinfo(authorization: str = Header(None)): if not authorization or not authorization.startswith("Bearer "): return JSONResponse({"error": "invalid_request"}, status_code=401) access_token = authorization.split(" ")[1] username = ACCESS_TOKENS.get(access_token) if "ACCESS_TOKENS" in globals() else None user = USERS.get(username) if username else None if not user: return JSONResponse({"error": "invalid_token"}, status_code=401) return { "sub": user["sub"], "name": user["name"], "email": user["email"], } @app.get("/jwks") async def jwks(): # TODO: 複数のJWKのサポート # TODO: キャッシュ 及び 一定期間ごとの更新 return JWKS @app.get("/") async def root(): return PlainTextResponse("テスト用の OpenID Connect 認証サーバ")
# セッション用のシークレットキー SESSION_SECRET=your-session-secret
[所感]
・動作確認してみた範囲では、IDトークンは暗号化されずに(base64エンコードだけされて)cookieに格納されている。このまま本番使用して良いものか・・
secrets.toml に設定した cookie_secret は署名作成にしか使用されない。(cookieの値自体の暗号化に使用されるわけではない)
※ cookie_secret は署名部のhmacハッシュ化時の鍵として使用されるだけ。
↓ 内部で使用されている tornado の cookie作成処理はこの辺。
https://github.com/tornadoweb/tornado/blob/v6.5.1/tornado/web.py#L3547
※他に何か設定があるのかもしれないが・・
・アプリ または 組織内で独自の認証を行う場合は Streamlit-Authenticator 等のライブラリを使用するか、自前で組んだ方が(※)恐らく楽。
※ページリロード時にセッション切れても良いなら st.session_state の利用だけで実装可能だが、
もう少しまともにしたいならcookie暗号化しつつ streamlit-cookies-controller 等を使えば、そんなに工数はかからなそう。(たぶん)
st.set_page_config でもある程度の設定は可能だが、CSSを上書きした方が細かなデザイン変更を行う事が出来る。
各要素に data-testid 属性が付与される為、これを利用してスタイル調整が可能。
以下、streamlit 1.45.1 時点。
主な data-testid 属性 | 説明 |
stAppViewContainer | ページ全体を覆うブロック要素 |
stHeader | ヘッダ部 |
stSidebar | サイドバー部 |
stMain | メインコンテンツ部 |
stMainBlockContainer | メインコンテンツ部の内部ブロック要素 |
例)
def set_style(): st.html(""" <style> [data-testid="stApp"] { /* ヘッダ */ [data-testid="stHeader"] { } [data-testid="stAppViewContainer"] { [data-testid="stSidebarCollapsedControl"] { left: 5px; } /* サイドバー */ [data-testid=stSidebar] { /* 最小幅を少し小さく */ min-width: 200px; /* 幅を固定する場合 */ /* width: 200px !important; */ background-color: #efe; color: #333; [data-testid=stSidebarContent] { [data-testid="stSidebarHeader"]{ padding: 10px; } [data-testid="stSidebarNav"] { a[data-testid="stSidebarNavLink"] { padding: 0 10px; margin: 0.125rem 5px; span { color: #333; } } } /* ユーザ定義のサイドバー */ [data-testid="stSidebarUserContent"] { } } } [data-testid="stMain"] { /* メインコンテンツの余白を少し小さく */ [data-testid="stMainBlockContainer"] { padding: 2rem 10px 2rem; } } } } </style> """)