From 0106e22c84277bcdc4a704206c286bfbab88228b Mon Sep 17 00:00:00 2001
From: Masaki Yatsu
Date: Thu, 18 Sep 2025 15:01:25 +0900
Subject: [PATCH] fix(airflow): fix JWT decode and verify

---
Review notes (ignored by git am):
- The OIDC discovery request now has a timeout and an HTTP status check.
- delete-user / reset-oauth-users print the success message only when the
  underlying command succeeded; both remain best-effort (exit status 0).
- Leading indentation of context lines was reconstructed; verify it against
  the target tree before applying.

 airflow/airflow-values.gomplate.yaml |   6 +-
 airflow/justfile                     |  53 +++++++++-
 airflow/webserver_config.gomplate.py | 141 +++++++++------
 3 files changed, 136 insertions(+), 64 deletions(-)

diff --git a/airflow/airflow-values.gomplate.yaml b/airflow/airflow-values.gomplate.yaml
index 097476a..c7a7534 100644
--- a/airflow/airflow-values.gomplate.yaml
+++ b/airflow/airflow-values.gomplate.yaml
@@ -67,7 +67,7 @@ workers:
         - /bin/bash
         - -c
         - |
-          pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}"
+          pip install --target /opt/airflow/site-packages {{ .Env.AIRFLOW_EXTRA_PACKAGES }}
       volumeMounts:
         - name: extra-packages
           mountPath: /opt/airflow/site-packages
@@ -89,7 +89,7 @@ scheduler:
         - /bin/bash
         - -c
         - |
-          pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}"
+          pip install --target /opt/airflow/site-packages {{ .Env.AIRFLOW_EXTRA_PACKAGES }}
       volumeMounts:
         - name: extra-packages
           mountPath: /opt/airflow/site-packages
@@ -111,7 +111,7 @@ dagProcessor:
         - /bin/bash
         - -c
         - |
-          pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}"
+          pip install --target /opt/airflow/site-packages {{ .Env.AIRFLOW_EXTRA_PACKAGES }}
       volumeMounts:
         - name: extra-packages
           mountPath: /opt/airflow/site-packages
diff --git a/airflow/justfile b/airflow/justfile
index 38d5c83..09b1871 100644
--- a/airflow/justfile
+++ b/airflow/justfile
@@ -9,7 +9,9 @@ export AIRFLOW_DAGS_STORAGE_TYPE := env("AIRFLOW_DAGS_STORAGE_TYPE", "")
 export AIRFLOW_NFS_IP := env("AIRFLOW_NFS_IP", "")
 export AIRFLOW_NFS_PATH := env("AIRFLOW_NFS_PATH", "")
 export AIRFLOW_DAGS_STORAGE_SIZE := env("AIRFLOW_DAGS_STORAGE_SIZE", "10Gi")
-export AIRFLOW_EXTRA_PACKAGES := env("AIRFLOW_EXTRA_PACKAGES", "dlt[duckdb,filesystem,postgres,s3]>=1.12.1")
+export AIRFLOW_EXTRA_PACKAGES := env("AIRFLOW_EXTRA_PACKAGES", "'PyJWT>=2.10' cryptography 'requests>=2.32' 'dlt[duckdb,filesystem,postgres,s3]'")
+
+# ↑ PyJWT, cryptography, and requests are needed for Keycloak OAuth
 
 [private]
 default:
@@ -600,6 +602,55 @@ logs-test-import dag_file:
     kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
         python /opt/airflow/dags/{{ dag_file }}
 
+# Delete user from Airflow database (to force role resync)
+delete-user username='':
+    #!/bin/bash
+    set -euo pipefail
+    USERNAME="{{ username }}"
+
+    # Interactive input if not provided
+    while [ -z "${USERNAME}" ]; do
+        USERNAME=$(gum input --prompt="Username to delete from Airflow: " --width=100)
+    done
+
+    echo "Deleting user '${USERNAME}' from Airflow database..."
+    if gum confirm "This will delete the user from Airflow database. The user will be recreated with current Keycloak roles on next login. Continue?"; then
+        # Get scheduler pod (which has airflow CLI access)
+        SCHEDULER_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=scheduler -o jsonpath='{.items[0].metadata.name}')
+
+        # Delete user using airflow CLI; report success only if the CLI succeeded
+        if kubectl exec -n ${AIRFLOW_NAMESPACE} ${SCHEDULER_POD} -- \
+            airflow users delete --username "${USERNAME}"; then
+            echo "✅ User '${USERNAME}' deleted from Airflow. They will be recreated with current Keycloak roles on next login."
+        else
+            echo "User '${USERNAME}' not found in Airflow database"
+        fi
+    else
+        echo "User deletion cancelled"
+    fi
+
+# Force role sync for all users (delete all OAuth users)
+reset-oauth-users:
+    #!/bin/bash
+    set -euo pipefail
+    echo "This will delete ALL OAuth users from Airflow database."
+    echo "Users will be recreated with current Keycloak roles on next login."
+    if gum confirm "Are you sure you want to proceed?"; then
+        # Get scheduler pod (which has airflow CLI access)
+        SCHEDULER_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=scheduler -o jsonpath='{.items[0].metadata.name}')
+
+        # Delete OAuth users (exclude admin users created manually); best-effort, but never claim success on failure
+        echo "Deleting OAuth users from Airflow database..."
+        if kubectl exec -n ${AIRFLOW_NAMESPACE} ${SCHEDULER_POD} -- \
+            airflow db shell -s "DELETE FROM ab_user WHERE email IS NOT NULL AND username != 'admin';"; then
+            echo "✅ All OAuth users deleted. They will be recreated with current Keycloak roles on next login."
+        else
+            echo "Failed to delete OAuth users from Airflow database" >&2
+        fi
+    else
+        echo "Reset cancelled"
+    fi
+
 # Clean up database and secrets
 cleanup:
     #!/bin/bash
diff --git a/airflow/webserver_config.gomplate.py b/airflow/webserver_config.gomplate.py
index 0e16fe3..c9dbd6f 100644
--- a/airflow/webserver_config.gomplate.py
+++ b/airflow/webserver_config.gomplate.py
@@ -1,8 +1,14 @@
 import os
 import logging
+import json
+import base64
+import requests
+from typing import Dict, Any, Optional
+from urllib.parse import urljoin
 from flask_appbuilder.security.manager import AUTH_OAUTH
 from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
 
+
 log = logging.getLogger(__name__)
 
 AUTH_TYPE = AUTH_OAUTH
@@ -56,8 +62,7 @@ class KeycloakSecurityManager(FabAirflowSecurityManagerOverride):
         """Extract user info and roles from Keycloak token"""
         if provider == "keycloak":
             import jwt
-            import base64
-            import json
+            import requests
 
             # Get access token
             token = response.get("access_token")
@@ -66,69 +71,85 @@ class KeycloakSecurityManager(FabAirflowSecurityManagerOverride):
                 return None
 
             try:
-                # Decode token without verification for debugging
-                # In production, you should verify the signature
-                parts = token.split('.')
-                if len(parts) == 3:
-                    # Decode payload
-                    payload_b64 = parts[1]
-                    # Add padding if needed
-                    payload_b64 += '=' * (4 - len(payload_b64) % 4)
-                    payload = json.loads(base64.b64decode(payload_b64))
+                # Discover the JWKS endpoint; bound the request so a stalled IdP cannot hang the login
+                discovery_url = f"{OIDC_ISSUER}/.well-known/openid-configuration"
+                oidc_response = requests.get(discovery_url, timeout=10)
+                oidc_response.raise_for_status()
+                jwks_uri = oidc_response.json()["jwks_uri"]
 
-                    log.info(f"Decoded token payload: {payload}")
+                # Use PyJWT to decode and verify the token
+                from jwt import PyJWKClient
+                jwks_client = PyJWKClient(jwks_uri)
+                signing_key = jwks_client.get_signing_key_from_jwt(token)
 
-                    # Extract user information
-                    userinfo = {
-                        "username": payload.get("preferred_username"),
-                        "email": payload.get("email"),
-                        "first_name": payload.get("given_name"),
-                        "last_name": payload.get("family_name"),
+                payload = jwt.decode(
+                    token,
+                    signing_key.key,
+                    algorithms=["RS256"],
+                    audience=["airflow", "account"],  # Keycloak uses both
+                    issuer=OIDC_ISSUER,
+                    options={"verify_signature": True, "verify_aud": False}  # Relax audience check
+                )
+
+                log.info("JWT signature verified successfully")
+                log.debug(f"Decoded token payload keys: {list(payload.keys())}")
+                log.debug(f"Token has preferred_username: {bool(payload.get('preferred_username'))}")
+                log.debug(f"Token has email: {bool(payload.get('email'))}")
+
+                # Extract user information
+                userinfo = {
+                    "username": payload.get("preferred_username"),
+                    "email": payload.get("email"),
+                    "first_name": payload.get("given_name"),
+                    "last_name": payload.get("family_name"),
+                }
+
+                log.debug(f"Extracted userinfo keys: {list(userinfo.keys())}")
+
+                # Extract roles from different possible locations
+                roles = []
+
+                # Check realm access roles
+                realm_access = payload.get("realm_access", {})
+                realm_roles = realm_access.get("roles", [])
+
+                # Check resource access (client roles)
+                resource_access = payload.get("resource_access", {})
+                client_access = resource_access.get("airflow", {})
+                client_roles = client_access.get("roles", [])
+
+                # Check airflow_roles claim directly
+                direct_roles = payload.get("airflow_roles", [])
+
+                log.info(f"Realm roles: {realm_roles}")
+                log.info(f"Client roles: {client_roles}")
+                log.info(f"Direct airflow roles: {direct_roles}")
+
+                # Prefer client roles, then direct roles, then realm roles
+                if client_roles:
+                    roles = client_roles
+                    log.info(f"Using client roles: {roles}")
+                elif direct_roles:
+                    roles = direct_roles
+                    log.info(f"Using direct airflow roles: {roles}")
+                elif realm_roles:
+                    # Map common realm roles to Airflow roles
+                    role_mapping = {
+                        'admin': 'Admin',
+                        'user': 'User',
+                        'viewer': 'Viewer'
                     }
+                    roles = [role_mapping.get(role.lower(), 'Viewer') for role in realm_roles]
+                    log.info(f"Using mapped realm roles: {roles}")
+                else:
+                    roles = ['Viewer']
+                    log.info("No roles found, defaulting to Viewer")
 
-                    # Extract roles from different possible locations
-                    roles = []
+                userinfo["role_keys"] = roles
+                log.info(f"User authentication successful for: {userinfo.get('username', 'unknown')}")
+                log.debug(f"Final userinfo keys: {list(userinfo.keys())}")
 
-                    # Check realm access roles
-                    realm_access = payload.get("realm_access", {})
-                    realm_roles = realm_access.get("roles", [])
-
-                    # Check resource access (client roles)
-                    resource_access = payload.get("resource_access", {})
-                    client_access = resource_access.get("airflow", {})
-                    client_roles = client_access.get("roles", [])
-
-                    # Check airflow_roles claim directly
-                    direct_roles = payload.get("airflow_roles", [])
-
-                    log.info(f"Realm roles: {realm_roles}")
-                    log.info(f"Client roles: {client_roles}")
-                    log.info(f"Direct airflow roles: {direct_roles}")
-
-                    # Prefer client roles, then direct roles, then realm roles
-                    if client_roles:
-                        roles = client_roles
-                        log.info(f"Using client roles: {roles}")
-                    elif direct_roles:
-                        roles = direct_roles
-                        log.info(f"Using direct airflow roles: {roles}")
-                    elif realm_roles:
-                        # Map common realm roles to Airflow roles
-                        role_mapping = {
-                            'admin': 'Admin',
-                            'user': 'User',
-                            'viewer': 'Viewer'
-                        }
-                        roles = [role_mapping.get(role.lower(), 'Viewer') for role in realm_roles]
-                        log.info(f"Using mapped realm roles: {roles}")
-                    else:
-                        roles = ['Viewer']
-                        log.info("No roles found, defaulting to Viewer")
-
-                    userinfo["role_keys"] = roles
-                    log.info(f"Final userinfo: {userinfo}")
-
-                    return userinfo
+                return userinfo
 
             except Exception as e:
                 log.error(f"Error decoding JWT token: {e}")