fix(airflow): fix JWT decode and verify

This commit is contained in:
Masaki Yatsu
2025-09-18 15:01:25 +09:00
parent dc30a37a42
commit 0106e22c84
3 changed files with 131 additions and 64 deletions

View File

@@ -67,7 +67,7 @@ workers:
- /bin/bash - /bin/bash
- -c - -c
- | - |
pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}" pip install --target /opt/airflow/site-packages {{ .Env.AIRFLOW_EXTRA_PACKAGES }}
volumeMounts: volumeMounts:
- name: extra-packages - name: extra-packages
mountPath: /opt/airflow/site-packages mountPath: /opt/airflow/site-packages
@@ -89,7 +89,7 @@ scheduler:
- /bin/bash - /bin/bash
- -c - -c
- | - |
pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}" pip install --target /opt/airflow/site-packages {{ .Env.AIRFLOW_EXTRA_PACKAGES }}
volumeMounts: volumeMounts:
- name: extra-packages - name: extra-packages
mountPath: /opt/airflow/site-packages mountPath: /opt/airflow/site-packages
@@ -111,7 +111,7 @@ dagProcessor:
- /bin/bash - /bin/bash
- -c - -c
- | - |
pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}" pip install --target /opt/airflow/site-packages {{ .Env.AIRFLOW_EXTRA_PACKAGES }}
volumeMounts: volumeMounts:
- name: extra-packages - name: extra-packages
mountPath: /opt/airflow/site-packages mountPath: /opt/airflow/site-packages

View File

@@ -9,7 +9,9 @@ export AIRFLOW_DAGS_STORAGE_TYPE := env("AIRFLOW_DAGS_STORAGE_TYPE", "")
export AIRFLOW_NFS_IP := env("AIRFLOW_NFS_IP", "") export AIRFLOW_NFS_IP := env("AIRFLOW_NFS_IP", "")
export AIRFLOW_NFS_PATH := env("AIRFLOW_NFS_PATH", "") export AIRFLOW_NFS_PATH := env("AIRFLOW_NFS_PATH", "")
export AIRFLOW_DAGS_STORAGE_SIZE := env("AIRFLOW_DAGS_STORAGE_SIZE", "10Gi") export AIRFLOW_DAGS_STORAGE_SIZE := env("AIRFLOW_DAGS_STORAGE_SIZE", "10Gi")
export AIRFLOW_EXTRA_PACKAGES := env("AIRFLOW_EXTRA_PACKAGES", "dlt[duckdb,filesystem,postgres,s3]>=1.12.1") export AIRFLOW_EXTRA_PACKAGES := env("AIRFLOW_EXTRA_PACKAGES", "'PyJWT>=2.10' cryptography 'requests>=2.32' 'dlt[duckdb,filesystem,postgres,s3]'")
# ↑ PyJWT, cryptography, and requests are needed for Keycloak OAuth
[private] [private]
default: default:
@@ -600,6 +602,51 @@ logs-test-import dag_file:
kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \ kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
python /opt/airflow/dags/{{ dag_file }} python /opt/airflow/dags/{{ dag_file }}
# Delete user from Airflow database (to force role resync)
delete-user username='':
#!/bin/bash
set -euo pipefail
USERNAME="{{ username }}"
# Interactive input if not provided
while [ -z "${USERNAME}" ]; do
USERNAME=$(gum input --prompt="Username to delete from Airflow: " --width=100)
done
echo "Deleting user '${USERNAME}' from Airflow database..."
if gum confirm "This will delete the user from Airflow database. The user will be recreated with current Keycloak roles on next login. Continue?"; then
# Get scheduler pod (which has airflow CLI access)
SCHEDULER_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=scheduler -o jsonpath='{.items[0].metadata.name}')
# Delete user using airflow CLI
kubectl exec -n ${AIRFLOW_NAMESPACE} ${SCHEDULER_POD} -- \
airflow users delete --username "${USERNAME}" || echo "User '${USERNAME}' not found in Airflow database"
echo "✅ User '${USERNAME}' deleted from Airflow. They will be recreated with current Keycloak roles on next login."
else
echo "User deletion cancelled"
fi
# Force role sync for all users (delete all OAuth users)
reset-oauth-users:
#!/bin/bash
set -euo pipefail
echo "This will delete ALL OAuth users from Airflow database."
echo "Users will be recreated with current Keycloak roles on next login."
if gum confirm "Are you sure you want to proceed?"; then
# Get scheduler pod (which has airflow CLI access)
SCHEDULER_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=scheduler -o jsonpath='{.items[0].metadata.name}')
# List and delete OAuth users (exclude admin users created manually)
echo "Deleting OAuth users from Airflow database..."
kubectl exec -n ${AIRFLOW_NAMESPACE} ${SCHEDULER_POD} -- \
airflow db shell -s "DELETE FROM ab_user WHERE email IS NOT NULL AND username != 'admin';" || true
echo "✅ All OAuth users deleted. They will be recreated with current Keycloak roles on next login."
else
echo "Reset cancelled"
fi
# Clean up database and secrets # Clean up database and secrets
cleanup: cleanup:
#!/bin/bash #!/bin/bash

View File

@@ -1,8 +1,14 @@
import os import os
import logging import logging
import json
import base64
import requests
from typing import Dict, Any, Optional
from urllib.parse import urljoin
from flask_appbuilder.security.manager import AUTH_OAUTH from flask_appbuilder.security.manager import AUTH_OAUTH
from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
AUTH_TYPE = AUTH_OAUTH AUTH_TYPE = AUTH_OAUTH
@@ -56,8 +62,7 @@ class KeycloakSecurityManager(FabAirflowSecurityManagerOverride):
"""Extract user info and roles from Keycloak token""" """Extract user info and roles from Keycloak token"""
if provider == "keycloak": if provider == "keycloak":
import jwt import jwt
import base64 import requests
import json
# Get access token # Get access token
token = response.get("access_token") token = response.get("access_token")
@@ -66,17 +71,29 @@ class KeycloakSecurityManager(FabAirflowSecurityManagerOverride):
return None return None
try: try:
# Decode token without verification for debugging # Get JWKS URL from OpenID configuration
# In production, you should verify the signature jwks_url = f"{OIDC_ISSUER}/.well-known/openid-configuration"
parts = token.split('.') oidc_config = requests.get(jwks_url).json()
if len(parts) == 3: jwks_uri = oidc_config["jwks_uri"]
# Decode payload
payload_b64 = parts[1]
# Add padding if needed
payload_b64 += '=' * (4 - len(payload_b64) % 4)
payload = json.loads(base64.b64decode(payload_b64))
log.info(f"Decoded token payload: {payload}") # Use PyJWT to decode and verify the token
from jwt import PyJWKClient
jwks_client = PyJWKClient(jwks_uri)
signing_key = jwks_client.get_signing_key_from_jwt(token)
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
audience=["airflow", "account"], # Keycloak uses both
issuer=OIDC_ISSUER,
options={"verify_signature": True, "verify_aud": False} # Relax audience check
)
log.info(f"JWT signature verified successfully")
log.debug(f"Decoded token payload keys: {list(payload.keys())}")
log.debug(f"Token has preferred_username: {bool(payload.get('preferred_username'))}")
log.debug(f"Token has email: {bool(payload.get('email'))}")
# Extract user information # Extract user information
userinfo = { userinfo = {
@@ -86,6 +103,8 @@ class KeycloakSecurityManager(FabAirflowSecurityManagerOverride):
"last_name": payload.get("family_name"), "last_name": payload.get("family_name"),
} }
log.debug(f"Extracted userinfo keys: {list(userinfo.keys())}")
# Extract roles from different possible locations # Extract roles from different possible locations
roles = [] roles = []
@@ -126,7 +145,8 @@ class KeycloakSecurityManager(FabAirflowSecurityManagerOverride):
log.info("No roles found, defaulting to Viewer") log.info("No roles found, defaulting to Viewer")
userinfo["role_keys"] = roles userinfo["role_keys"] = roles
log.info(f"Final userinfo: {userinfo}") log.info(f"User authentication successful for: {userinfo.get('username', 'unknown')}")
log.debug(f"Final userinfo keys: {list(userinfo.keys())}")
return userinfo return userinfo