feat(airflow): add Airflow

This commit is contained in:
Masaki Yatsu
2025-09-10 16:35:19 +09:00
parent 9780753a81
commit e265063857
7 changed files with 494 additions and 0 deletions

4
airflow/.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
# Files rendered by gomplate from their templates when the justfile recipes
# run. airflow-values.yaml embeds the generated webserver secret key, so these
# outputs must not be committed.
airflow-values.yaml
airflow-database-external-secret.yaml
airflow-oauth-external-secret.yaml
webserver_config.py

View File

@@ -0,0 +1,27 @@
# ExternalSecret that copies the Airflow database credentials stored under
# the remote key airflow/database into the airflow-database-secret Secret.
# This file is a gomplate template: the backtick-wrapped expressions are
# emitted verbatim so that External Secrets, not gomplate, evaluates them.
apiVersion: external-secrets.io/v1
kind: ExternalSecret
metadata:
  name: airflow-database-external-secret
  # Quoted so the rendered value always stays a YAML string.
  namespace: "{{ .Env.AIRFLOW_NAMESPACE }}"
spec:
  refreshInterval: 1h
  secretStoreRef:
    name: vault-secret-store
    kind: ClusterSecretStore
  target:
    name: airflow-database-secret
    creationPolicy: Owner
    template:
      type: Opaque
      data:
        username: "{{ `{{ .username }}` }}"
        password: "{{ `{{ .password }}` }}"
  data:
    - secretKey: username
      remoteRef:
        key: airflow/database
        property: username
    - secretKey: password
      remoteRef:
        key: airflow/database
        property: password

View File

@@ -0,0 +1,27 @@
# ExternalSecret that copies the Airflow OAuth client credentials stored under
# the remote key airflow/oauth into the airflow-oauth-secret Secret.
# This file is a gomplate template: the backtick-wrapped expressions are
# emitted verbatim so that External Secrets, not gomplate, evaluates them.
apiVersion: external-secrets.io/v1
kind: ExternalSecret
metadata:
  name: airflow-oauth-external-secret
  # Quoted so the rendered value always stays a YAML string.
  namespace: "{{ .Env.AIRFLOW_NAMESPACE }}"
spec:
  refreshInterval: 1h
  secretStoreRef:
    name: vault-secret-store
    kind: ClusterSecretStore
  target:
    name: airflow-oauth-secret
    creationPolicy: Owner
    template:
      type: Opaque
      data:
        client_id: "{{ `{{ .client_id }}` }}"
        client_secret: "{{ `{{ .client_secret }}` }}"
  data:
    - secretKey: client_id
      remoteRef:
        key: airflow/oauth
        property: client_id
    - secretKey: client_secret
      remoteRef:
        key: airflow/oauth
        property: client_secret

View File

@@ -0,0 +1,60 @@
# Helm values for the apache-airflow/airflow chart. This is a gomplate
# template rendered from environment variables by the justfile install recipe.
useStandardNaming: true
# The key is a generated random string. Emitting it as a JSON string (valid
# YAML) keeps it a string even if it looks like a number or boolean, or
# contains quotes or other YAML indicator characters.
webserverSecretKey: {{ .Env.AIRFLOW_WEBSERVER_SECRET_KEY | toJSON }}
executor: CeleryExecutor

apiServer:
  replicas: 1
  # ConfigMap created by the install recipe from webserver_config.py.
  apiServerConfigConfigMapName: airflow-api-server-config
  env:
    # OAuth client credentials come from the airflow-oauth-secret Secret.
    - name: AIRFLOW_OAUTH_CLIENT_ID
      valueFrom:
        secretKeyRef:
          name: airflow-oauth-secret
          key: client_id
    - name: AIRFLOW_OAUTH_CLIENT_SECRET
      valueFrom:
        secretKeyRef:
          name: airflow-oauth-secret
          key: client_secret
    - name: KEYCLOAK_HOST
      value: "{{ .Env.KEYCLOAK_HOST }}"
    - name: KEYCLOAK_REALM
      value: "{{ .Env.KEYCLOAK_REALM }}"

webserver:
  enabled: true
  replicas: 1

createUserJob:
  useHelmHooks: false
  applyCustomEnv: false

migrateDatabaseJob:
  useHelmHooks: false
  applyCustomEnv: false

images:
  migrationsWaitTimeout: 180

flower:
  enabled: false

# The bundled PostgreSQL is disabled; the metadata database connection is read
# from the Secret created by the setup-database recipe.
postgresql:
  enabled: false

data:
  metadataSecretName: airflow-metadata-connection

ingress:
  apiServer:
    enabled: true
    annotations:
      kubernetes.io/ingress.class: traefik
      traefik.ingress.kubernetes.io/router.entrypoints: websecure
    ingressClassName: traefik
    hosts:
      # Quoted so the rendered host always stays a YAML string.
      - name: "{{ .Env.AIRFLOW_HOST }}"
        tls:
          enabled: true

323
airflow/justfile Normal file
View File

@@ -0,0 +1,323 @@
# Let just look in parent justfiles for recipes that are not defined here.
set fallback := true

# Exported settings; each can be overridden from the environment and falls
# back to the default given as the second argument of env().
export AIRFLOW_NAMESPACE := env("AIRFLOW_NAMESPACE", "airflow")
export AIRFLOW_CHART_VERSION := env("AIRFLOW_CHART_VERSION", "1.18.0")
export EXTERNAL_SECRETS_NAMESPACE := env("EXTERNAL_SECRETS_NAMESPACE", "external-secrets")
export KEYCLOAK_REALM := env("KEYCLOAK_REALM", "buunstack")
# Default recipe: print the recipe list in source order, including submodules.
[private]
default:
    @just --list --unsorted --list-submodules
# Registers the chart repository under the name apache-airflow (the name the
# install recipe references) and then refreshes the local repository index.
# Add Helm repository
add-helm-repo:
    helm repo add apache-airflow https://airflow.apache.org
    helm repo update
# Unregisters the apache-airflow chart repository added by add-helm-repo.
# Remove Helm repository
remove-helm-repo:
    helm repo remove apache-airflow
# Creates the namespace only when it does not exist yet.
# The redirect is written in POSIX form: linewise recipes run under just's
# default sh, where the bash-only "&>" would background the command and the
# create branch would never run.
# Create Airflow namespace
create-namespace:
    @kubectl get namespace "${AIRFLOW_NAMESPACE}" >/dev/null 2>&1 || \
        kubectl create namespace "${AIRFLOW_NAMESPACE}"
# Tries a normal delete first; if the namespace is still reported as
# Terminating afterwards, clears its finalizers and force-deletes it.
# Delete Airflow namespace
delete-namespace:
    #!/bin/bash
    set -euo pipefail
    ns="${AIRFLOW_NAMESPACE}"

    # Graceful attempt; a failure or timeout here is tolerated.
    kubectl delete namespace ${ns} --ignore-not-found --timeout=30s || true

    # Done unless the namespace is still listed in Terminating state.
    if ! kubectl get namespace ${ns} 2>/dev/null | grep -q Terminating; then
        exit 0
    fi

    echo "Namespace stuck in Terminating, forcing deletion..."
    # Drop the finalizers, then force the delete; both steps are best-effort.
    kubectl patch namespace ${ns} -p '{"metadata":{"finalizers":[]}}' --type=merge || true
    kubectl delete namespace ${ns} --force --grace-period=0 || true
# Ensures the 'airflow' database and role exist, settles on a password for the
# role, writes the metadata connection Secret and, when the external-secrets
# Helm release is installed, stores the credentials in Vault and applies the
# database ExternalSecret.
# Setup database for Airflow
setup-database:
    #!/bin/bash
    set -euo pipefail

    # Succeeds when the external-secrets Helm release is installed.
    external_secrets_installed() {
        helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null
    }

    # Generate a fresh password and apply it to the existing 'airflow' role.
    rotate_password() {
        db_password=$(just utils::random-password)
        just postgres::change-password airflow "$db_password"
    }

    echo "Setting up Airflow database..."
    if ! just postgres::db-exists airflow &>/dev/null; then
        echo "Creating new database 'airflow'..."
        just postgres::create-db airflow
    else
        echo "Database 'airflow' already exists."
    fi

    # Settle on the password the 'airflow' role ends up with.
    if ! just postgres::user-exists airflow &>/dev/null; then
        echo "Creating new user 'airflow'..."
        db_password=$(just utils::random-password)
        just postgres::create-user airflow "$db_password"
    else
        echo "User 'airflow' already exists."
        if ! external_secrets_installed; then
            # Direct Secret approach: nothing to read back, so rotate.
            echo "Generating new password for existing user..."
            rotate_password
        elif db_password=$(just vault::get airflow/database password 2>/dev/null); then
            echo "Using existing password from Vault."
        else
            echo "Generating new password and updating Vault..."
            rotate_password
        fi
    fi

    echo "Ensuring database permissions..."
    just postgres::grant airflow airflow

    # The Helm values reference this Secret as the metadata connection.
    metadata_secret=airflow-metadata-connection
    db_url="postgresql+psycopg2://airflow:${db_password}@postgres-cluster-rw.postgres:5432/airflow"
    kubectl delete secret ${metadata_secret} -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    kubectl create secret generic ${metadata_secret} -n ${AIRFLOW_NAMESPACE} \
        --from-literal=connection="$db_url"

    if external_secrets_installed; then
        echo "External Secrets available. Storing credentials in Vault and creating ExternalSecret..."
        just vault::put airflow/database username=airflow password="$db_password"
        gomplate -f airflow-database-external-secret.gomplate.yaml -o airflow-database-external-secret.yaml
        kubectl apply -f airflow-database-external-secret.yaml
        echo "Waiting for database secret to be ready..."
        kubectl wait --for=condition=Ready externalsecret/airflow-database-external-secret \
            -n ${AIRFLOW_NAMESPACE} --timeout=60s
    fi

    echo "Database setup completed."
# The ExternalSecret is removed first so its controller cannot recreate the
# target Secret in between. That step is allowed to fail (leading "-") because
# the externalsecret resource type does not exist on clusters without
# External Secrets, and --ignore-not-found does not cover that case.
# Delete database secret
delete-database-secret:
    -@kubectl delete externalsecret airflow-database-external-secret -n "${AIRFLOW_NAMESPACE}" --ignore-not-found
    @kubectl delete secret airflow-database-secret -n "${AIRFLOW_NAMESPACE}" --ignore-not-found
    @kubectl delete secret airflow-metadata-connection -n "${AIRFLOW_NAMESPACE}" --ignore-not-found
# Recreates the 'airflow' client in Keycloak with a new random secret, then
# publishes the credentials either through Vault plus an ExternalSecret (when
# the external-secrets Helm release is installed) or as a plain Secret.
# Requires AIRFLOW_HOST.
# Create OAuth client in Keycloak for Airflow authentication
create-oauth-client:
    #!/bin/bash
    set -euo pipefail
    [ -n "${AIRFLOW_HOST:-}" ] || {
        echo "Error: AIRFLOW_HOST environment variable is required"
        exit 1
    }

    client_id=airflow
    redirect_uri="https://${AIRFLOW_HOST}/auth/oauth-authorized/keycloak"

    echo "Creating Airflow OAuth client in Keycloak..."
    # Start from a clean slate: drop any previous client before creating it.
    echo "Removing existing client if present..."
    just keycloak::delete-client ${KEYCLOAK_REALM} ${client_id} || true

    client_secret=$(just utils::random-password)
    just keycloak::create-client ${KEYCLOAK_REALM} ${client_id} "${redirect_uri}" "$client_secret"

    if ! helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
        echo "External Secrets not available. Creating Kubernetes Secret directly..."
        kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        kubectl create secret generic airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} \
            --from-literal=client_id=${client_id} \
            --from-literal=client_secret="$client_secret"
        echo "OAuth secret created directly in Kubernetes"
    else
        echo "External Secrets available. Storing credentials in Vault and creating ExternalSecret..."
        just vault::put airflow/oauth client_id=${client_id} client_secret="$client_secret"

        # Remove the previous ExternalSecret and Secret so the new credentials
        # are picked up immediately instead of at the next refresh.
        kubectl delete externalsecret airflow-oauth-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found

        gomplate -f airflow-oauth-external-secret.gomplate.yaml -o airflow-oauth-external-secret.yaml
        kubectl apply -f airflow-oauth-external-secret.yaml

        echo "Waiting for OAuth secret to be ready..."
        kubectl wait --for=condition=Ready externalsecret/airflow-oauth-external-secret \
            -n ${AIRFLOW_NAMESPACE} --timeout=60s
    fi

    echo "OAuth client created successfully"
# Creates the four Airflow client roles on the 'airflow' Keycloak client.
# Failures of individual role creations are ignored.
# Create Keycloak roles for Airflow
create-keycloak-roles:
    #!/bin/bash
    set -euo pipefail
    echo "Creating Keycloak roles for Airflow..."

    # Client roles for Airflow; creation is best-effort per role.
    for role_name in airflow_admin airflow_op airflow_user airflow_viewer; do
        echo "Creating role: $role_name"
        just keycloak::create-client-role ${KEYCLOAK_REALM} airflow "$role_name" || true
    done

    # Summary of what each role is meant to grant.
    printf '%s\n' \
        "Keycloak roles created successfully" \
        "Role mappings:" \
        " - airflow_admin -> Airflow Admin (full access)" \
        " - airflow_op -> Airflow Operator (can trigger DAGs)" \
        " - airflow_user -> Airflow User (read/write access)" \
        " - airflow_viewer -> Airflow Viewer (read-only access)"
# Parameters: username and role; either may be omitted and is then asked for
# interactively with gum. The role must be exactly one of the four Airflow
# client roles, otherwise the recipe exits with status 1.
# Assign Airflow role to user
assign-role username='' role='':
    #!/bin/bash
    set -euo pipefail
    USERNAME="{{ username }}"
    ROLE="{{ role }}"
    VALID_ROLES=("airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")

    # Interactive input if not provided
    while [ -z "${USERNAME}" ]; do
        USERNAME=$(gum input --prompt="Username: " --width=100)
    done
    if [ -z "${ROLE}" ]; then
        ROLE=$(gum choose --header="Select Airflow role:" "${VALID_ROLES[@]}")
    fi

    # Validate with an exact match. A substring or regex test against the
    # space-joined list would also accept multi-word input such as
    # "airflow_admin airflow_op".
    case "${ROLE}" in
        airflow_admin)
            ACCESS="Admin access (full system administration)"
            ;;
        airflow_op)
            ACCESS="Operator access (can trigger and manage DAGs)"
            ;;
        airflow_user)
            ACCESS="User access (read/write access to DAGs and tasks)"
            ;;
        airflow_viewer)
            ACCESS="Viewer access (read-only access)"
            ;;
        *)
            echo "Error: Invalid role '${ROLE}'. Valid roles: ${VALID_ROLES[*]}"
            exit 1
            ;;
    esac

    echo "Assigning role '${ROLE}' to user '${USERNAME}'..."
    just keycloak::add-user-to-client-role ${KEYCLOAK_REALM} "${USERNAME}" airflow "${ROLE}"

    # Display role permissions
    echo "${USERNAME} now has ${ACCESS}"
# Parameters: username and role; either may be omitted and is then asked for
# interactively with gum. Removal stays best-effort (the recipe still exits
# 0), but success is only reported when the Keycloak call actually succeeded.
# Remove Airflow role from user
remove-role username='' role='':
    #!/bin/bash
    set -euo pipefail
    USERNAME="{{ username }}"
    ROLE="{{ role }}"

    # Interactive input if not provided
    while [ -z "${USERNAME}" ]; do
        USERNAME=$(gum input --prompt="Username: " --width=100)
    done
    if [ -z "${ROLE}" ]; then
        ROLE=$(gum choose --header="Select Airflow role to remove:" \
            "airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
    fi

    echo "Removing role '${ROLE}' from user '${USERNAME}'..."
    if just keycloak::remove-user-from-client-role ${KEYCLOAK_REALM} "${USERNAME}" airflow "${ROLE}"; then
        echo "✅ Role '${ROLE}' removed from user '${USERNAME}'"
    else
        # Do not claim success when the removal command failed.
        echo "Warning: could not remove role '${ROLE}' from user '${USERNAME}'" >&2
    fi
# The ExternalSecret is removed first so its controller cannot recreate the
# target Secret in between. That step is allowed to fail (leading "-") because
# the externalsecret resource type does not exist on clusters without
# External Secrets, and --ignore-not-found does not cover that case.
# Delete OAuth secret
delete-oauth-secret:
    -@kubectl delete externalsecret airflow-oauth-external-secret -n "${AIRFLOW_NAMESPACE}" --ignore-not-found
    @kubectl delete secret airflow-oauth-secret -n "${AIRFLOW_NAMESPACE}" --ignore-not-found
# Provisions the namespace, database, Keycloak client and roles, renders the
# API server config and Helm values with gomplate, then installs or upgrades
# the chart. Requires KEYCLOAK_HOST; asks for AIRFLOW_HOST when it is not set.
# Install Airflow
install:
    #!/bin/bash
    set -euo pipefail

    # Fail fast: under "set -u" a missing KEYCLOAK_HOST would otherwise abort
    # with "unbound variable" only after the database, OAuth client and roles
    # have already been provisioned.
    if [ -z "${KEYCLOAK_HOST:-}" ]; then
        echo "Error: KEYCLOAK_HOST environment variable is required" >&2
        exit 1
    fi
    # Exported so both gomplate renders below can read it.
    export KEYCLOAK_HOST

    # Exported so the sub-recipes and gomplate see the chosen host.
    export AIRFLOW_HOST=${AIRFLOW_HOST:-}
    while [ -z "${AIRFLOW_HOST}" ]; do
        AIRFLOW_HOST=$(
            gum input --prompt="Airflow host (FQDN): " --width=100 \
                --placeholder="e.g., airflow.example.com"
        )
    done

    echo "Installing Airflow..."
    just create-namespace
    just setup-database
    just create-oauth-client
    just create-keycloak-roles
    just add-helm-repo

    # Create API server config ConfigMap
    gomplate -f webserver_config.py.gomplate -o webserver_config.py
    kubectl delete configmap airflow-api-server-config -n "${AIRFLOW_NAMESPACE}" --ignore-not-found
    kubectl create configmap airflow-api-server-config -n "${AIRFLOW_NAMESPACE}" \
        --from-file=webserver_config.py=webserver_config.py

    # A new webserver secret key is generated on every run of this recipe.
    AIRFLOW_WEBSERVER_SECRET_KEY=$(just utils::random-password) \
        gomplate -f airflow-values.gomplate.yaml -o airflow-values.yaml

    helm upgrade --install airflow apache-airflow/airflow \
        --version "${AIRFLOW_CHART_VERSION}" -n "${AIRFLOW_NAMESPACE}" --wait \
        -f airflow-values.yaml

    echo "Airflow installation completed"
    echo "Access Airflow at: https://${AIRFLOW_HOST}"
# Removes the Helm release, force-deletes leftover pods, PVCs and other
# resources, deletes the secrets and the namespace, and finally the Keycloak
# client. The 'airflow' database is dropped too unless delete-db is set to
# something other than 'true'.
# Uninstall Airflow
uninstall delete-db='true':
    #!/bin/bash
    set -euo pipefail

    # purge <resource> <message>: when any object of that kind is left in the
    # namespace, print the message and force-delete all of them (best-effort).
    purge() {
        local leftover
        leftover=$(kubectl get "$1" -n ${AIRFLOW_NAMESPACE} -o name 2>/dev/null || true)
        if [ -n "$leftover" ]; then
            echo "$2"
            kubectl delete "$1" --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true
        fi
    }

    echo "Uninstalling Airflow..."
    helm uninstall airflow -n ${AIRFLOW_NAMESPACE} --ignore-not-found

    # Force delete stuck resources
    echo "Checking for stuck resources..."
    purge pods "Force deleting stuck pods..."
    purge pvc "Deleting PersistentVolumeClaims..."
    # Anything else still left in the namespace.
    kubectl delete all --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true

    just delete-database-secret
    just delete-oauth-secret
    just delete-namespace

    if [ "{{ delete-db }}" = "true" ]; then
        just postgres::delete-db airflow
    fi

    # Clean up Keycloak client
    just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true

    echo "Airflow uninstalled"
# After an interactive confirmation, deletes the 'airflow' database, both
# Vault entries and the Keycloak client. Every step is best-effort.
# Clean up database and secrets
cleanup:
    #!/bin/bash
    set -euo pipefail
    echo "This will delete the Airflow database and all secrets."

    # Bail out early when the user declines.
    if ! gum confirm "Are you sure you want to proceed?"; then
        echo "Cleanup cancelled"
        exit 0
    fi

    echo "Cleaning up Airflow resources..."
    just postgres::delete-db airflow || true
    for vault_path in airflow/database airflow/oauth; do
        just vault::delete "$vault_path" || true
    done
    just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true
    echo "Cleanup completed"

View File

@@ -0,0 +1,52 @@
import os
import logging
from flask_appbuilder.security.manager import AUTH_OAUTH
from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
log = logging.getLogger(__name__)

# Flask-AppBuilder authentication settings: OAuth login with user
# registration enabled, role sync at login, and "Viewer" as the registration
# role.
AUTH_TYPE = AUTH_OAUTH
AUTH_USER_REGISTRATION = True
AUTH_ROLES_SYNC_AT_LOGIN = True
AUTH_USER_REGISTRATION_ROLE = "Viewer"

# Keycloak OIDC configuration. The two template expressions below are filled
# in from the environment when gomplate renders this file.
KEYCLOAK_HOST = "{{ .Env.KEYCLOAK_HOST }}"
KEYCLOAK_REALM = "{{ .Env.KEYCLOAK_REALM }}"
OIDC_ISSUER = "https://" + KEYCLOAK_HOST + "/realms/" + KEYCLOAK_REALM

# Base URL shared by the OpenID Connect protocol endpoints of the realm.
_oidc_protocol_url = OIDC_ISSUER + "/protocol/openid-connect"

# Remote application settings for the Keycloak provider. The client
# credentials are read from the process environment and default to "".
_keycloak_remote_app = {
    "client_id": os.environ.get("AIRFLOW_OAUTH_CLIENT_ID", ""),
    "client_secret": os.environ.get("AIRFLOW_OAUTH_CLIENT_SECRET", ""),
    "server_metadata_url": OIDC_ISSUER + "/.well-known/openid-configuration",
    "api_base_url": _oidc_protocol_url,
    "access_token_url": _oidc_protocol_url + "/token",
    "authorize_url": _oidc_protocol_url + "/auth",
    "request_token_url": None,
    "client_kwargs": {"scope": "openid profile email"},
}

# OAuth Providers configuration
OAUTH_PROVIDERS = [
    {
        "name": "keycloak",
        "icon": "fa-key",
        "token_key": "access_token",
        "remote_app": _keycloak_remote_app,
    },
]

# Role mappings from Keycloak to Airflow
AUTH_ROLES_MAPPING = {
    "airflow_admin": ["Admin"],
    "airflow_op": ["Op"],
    "airflow_user": ["User"],
    "airflow_viewer": ["Viewer"],
}
# Security Manager Override
class KeycloakSecurityManager(FabAirflowSecurityManagerOverride):
    """Security manager registered for the Keycloak OAuth integration.

    It currently adds no behaviour on top of
    ``FabAirflowSecurityManagerOverride``; the constructor is inherited
    unchanged. The class is the extension point for Keycloak-specific
    overrides.

    NOTE(review): AUTH_ROLES_MAPPING presumably only takes effect when the
    user info produced by the base class carries matching role keys; confirm
    against the Keycloak client mappers.
    """


SECURITY_MANAGER_CLASS = KeycloakSecurityManager