From e265063857f9d86960475f23ce502b54a1ac8ae3 Mon Sep 17 00:00:00 2001
From: Masaki Yatsu
Date: Wed, 10 Sep 2025 16:35:19 +0900
Subject: [PATCH] feat(airflow): add Airflow

Add an `airflow` just module that installs Apache Airflow from the
apache-airflow Helm chart. The recipes create the namespace, set up the
`airflow` PostgreSQL database/user and the metadata connection secret,
create a Keycloak OAuth client and client roles, render the gomplate
templates (Helm values, ExternalSecrets, webserver_config.py) and run
`helm upgrade --install`. Credentials are stored in Vault and synced via
ExternalSecret when External Secrets is installed; otherwise plain
Kubernetes Secrets are created.

---
 airflow/.gitignore                            |   4 +
 ...low-database-external-secret.gomplate.yaml |  27 ++
 ...irflow-oauth-external-secret.gomplate.yaml |  27 ++
 airflow/airflow-values.gomplate.yaml          |  60 ++++
 airflow/justfile                              | 323 ++++++++++++++++++
 airflow/webserver_config.py.gomplate          |  52 +++
 justfile                                      |   1 +
 7 files changed, 494 insertions(+)
 create mode 100644 airflow/.gitignore
 create mode 100644 airflow/airflow-database-external-secret.gomplate.yaml
 create mode 100644 airflow/airflow-oauth-external-secret.gomplate.yaml
 create mode 100644 airflow/airflow-values.gomplate.yaml
 create mode 100644 airflow/justfile
 create mode 100644 airflow/webserver_config.py.gomplate

diff --git a/airflow/.gitignore b/airflow/.gitignore
new file mode 100644
index 0000000..4a5daf4
--- /dev/null
+++ b/airflow/.gitignore
@@ -0,0 +1,4 @@
+airflow-values.yaml
+airflow-database-external-secret.yaml
+airflow-oauth-external-secret.yaml
+webserver_config.py
diff --git a/airflow/airflow-database-external-secret.gomplate.yaml b/airflow/airflow-database-external-secret.gomplate.yaml
new file mode 100644
index 0000000..8d17965
--- /dev/null
+++ b/airflow/airflow-database-external-secret.gomplate.yaml
@@ -0,0 +1,27 @@
+apiVersion: external-secrets.io/v1
+kind: ExternalSecret
+metadata:
+  name: airflow-database-external-secret
+  namespace: {{ .Env.AIRFLOW_NAMESPACE }}
+spec:
+  refreshInterval: 1h
+  secretStoreRef:
+    name: vault-secret-store
+    kind: ClusterSecretStore
+  target:
+    name: airflow-database-secret
+    creationPolicy: Owner
+    template:
+      type: Opaque
+      data:
+        username: "{{ `{{ .username }}` }}"
+        password: "{{ `{{ .password }}` }}"
+  data:
+    - secretKey: username
+      remoteRef:
+        key: airflow/database
+        property: username
+    - secretKey: password
+      remoteRef:
+        key: airflow/database
+        property: password
diff --git a/airflow/airflow-oauth-external-secret.gomplate.yaml b/airflow/airflow-oauth-external-secret.gomplate.yaml
new file mode 100644
index 0000000..ceabb9c
--- /dev/null
+++ b/airflow/airflow-oauth-external-secret.gomplate.yaml
@@ -0,0 +1,27 @@
+apiVersion: external-secrets.io/v1
+kind: ExternalSecret
+metadata:
+  name: airflow-oauth-external-secret
+  namespace: {{ .Env.AIRFLOW_NAMESPACE }}
+spec:
+  refreshInterval: 1h
+  secretStoreRef:
+    name: vault-secret-store
+    kind: ClusterSecretStore
+  target:
+    name: airflow-oauth-secret
+    creationPolicy: Owner
+    template:
+      type: Opaque
+      data:
+        client_id: "{{ `{{ .client_id }}` }}"
+        client_secret: "{{ `{{ .client_secret }}` }}"
+  data:
+    - secretKey: client_id
+      remoteRef:
+        key: airflow/oauth
+        property: client_id
+    - secretKey: client_secret
+      remoteRef:
+        key: airflow/oauth
+        property: client_secret
diff --git a/airflow/airflow-values.gomplate.yaml b/airflow/airflow-values.gomplate.yaml
new file mode 100644
index 0000000..c11a1ea
--- /dev/null
+++ b/airflow/airflow-values.gomplate.yaml
@@ -0,0 +1,60 @@
+useStandardNaming: true
+
+webserverSecretKey: "{{ .Env.AIRFLOW_WEBSERVER_SECRET_KEY }}"
+
+executor: CeleryExecutor
+
+apiServer:
+  replicas: 1
+  apiServerConfigConfigMapName: airflow-api-server-config
+  env:
+    - name: AIRFLOW_OAUTH_CLIENT_ID
+      valueFrom:
+        secretKeyRef:
+          name: airflow-oauth-secret
+          key: client_id
+    - name: AIRFLOW_OAUTH_CLIENT_SECRET
+      valueFrom:
+        secretKeyRef:
+          name: airflow-oauth-secret
+          key: client_secret
+    - name: KEYCLOAK_HOST
+      value: "{{ .Env.KEYCLOAK_HOST }}"
+    - name: KEYCLOAK_REALM
+      value: "{{ .Env.KEYCLOAK_REALM }}"
+
+webserver:
+  enabled: true
+  replicas: 1
+
+createUserJob:
+  useHelmHooks: false
+  applyCustomEnv: false
+
+migrateDatabaseJob:
+  useHelmHooks: false
+  applyCustomEnv: false
+
+images:
+  migrationsWaitTimeout: 180
+
+flower:
+  enabled: false
+
+postgresql:
+  enabled: false
+
+data:
+  metadataSecretName: airflow-metadata-connection
+
+ingress:
+  apiServer:
+    enabled: true
+    annotations:
+      kubernetes.io/ingress.class: traefik
+      traefik.ingress.kubernetes.io/router.entrypoints: websecure
+    ingressClassName: traefik
+    hosts:
+      - name: "{{ .Env.AIRFLOW_HOST }}"
+        tls:
+          enabled: true
diff --git a/airflow/justfile b/airflow/justfile
new file mode 100644
index 0000000..607fcdd
--- /dev/null
+++ b/airflow/justfile
@@ -0,0 +1,323 @@
+set fallback := true
+
+export AIRFLOW_NAMESPACE := env("AIRFLOW_NAMESPACE", "airflow")
+export AIRFLOW_CHART_VERSION := env("AIRFLOW_CHART_VERSION", "1.18.0")
+export EXTERNAL_SECRETS_NAMESPACE := env("EXTERNAL_SECRETS_NAMESPACE", "external-secrets")
+export KEYCLOAK_REALM := env("KEYCLOAK_REALM", "buunstack")
+
+[private]
+default:
+    @just --list --unsorted --list-submodules
+
+# Add Helm repository
+add-helm-repo:
+    helm repo add apache-airflow https://airflow.apache.org
+    helm repo update
+
+# Remove Helm repository
+remove-helm-repo:
+    helm repo remove apache-airflow
+
+# Create Airflow namespace
+create-namespace:
+    @kubectl get namespace ${AIRFLOW_NAMESPACE} >/dev/null 2>&1 || \
+    kubectl create namespace ${AIRFLOW_NAMESPACE}
+
+# Delete Airflow namespace
+delete-namespace:
+    #!/bin/bash
+    set -euo pipefail
+    # First try normal deletion
+    kubectl delete namespace ${AIRFLOW_NAMESPACE} --ignore-not-found --timeout=30s || true
+
+    # If namespace still exists in Terminating state, force remove
+    if kubectl get namespace ${AIRFLOW_NAMESPACE} 2>/dev/null | grep -q Terminating; then
+        echo "Namespace stuck in Terminating, forcing deletion..."
+        # Remove finalizers
+        kubectl patch namespace ${AIRFLOW_NAMESPACE} -p '{"metadata":{"finalizers":[]}}' --type=merge || true
+        # Force delete the namespace
+        kubectl delete namespace ${AIRFLOW_NAMESPACE} --force --grace-period=0 || true
+    fi
+
+# Setup database for Airflow
+setup-database:
+    #!/bin/bash
+    set -euo pipefail
+    echo "Setting up Airflow database..."
+
+    if just postgres::db-exists airflow &>/dev/null; then
+        echo "Database 'airflow' already exists."
+    else
+        echo "Creating new database 'airflow'..."
+        just postgres::create-db airflow
+    fi
+
+    # Generate password for user creation/update
+    if just postgres::user-exists airflow &>/dev/null; then
+        echo "User 'airflow' already exists."
+        # Check if we can get existing password from Vault/Secret
+        if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
+            # Try to get existing password from Vault
+            if DB_PASSWORD=$(just vault::get airflow/database password 2>/dev/null); then
+                echo "Using existing password from Vault."
+            else
+                echo "Generating new password and updating Vault..."
+                DB_PASSWORD=$(just utils::random-password)
+                just postgres::change-password airflow "$DB_PASSWORD"
+            fi
+        else
+            # For direct Secret approach, generate new password
+            echo "Generating new password for existing user..."
+            DB_PASSWORD=$(just utils::random-password)
+            just postgres::change-password airflow "$DB_PASSWORD"
+        fi
+    else
+        echo "Creating new user 'airflow'..."
+        DB_PASSWORD=$(just utils::random-password)
+        just postgres::create-user airflow "$DB_PASSWORD"
+    fi
+
+    echo "Ensuring database permissions..."
+    just postgres::grant airflow airflow
+
+    # Create Airflow metadata connection secret (required by Helm chart)
+    CONNECTION_STRING="postgresql+psycopg2://airflow:${DB_PASSWORD}@postgres-cluster-rw.postgres:5432/airflow"
+    kubectl delete secret airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+    kubectl create secret generic airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} \
+        --from-literal=connection="$CONNECTION_STRING"
+
+    if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
+        echo "External Secrets available. Storing credentials in Vault and creating ExternalSecret..."
+        just vault::put airflow/database username=airflow password="$DB_PASSWORD"
+        gomplate -f airflow-database-external-secret.gomplate.yaml -o airflow-database-external-secret.yaml
+        kubectl apply -f airflow-database-external-secret.yaml
+        echo "Waiting for database secret to be ready..."
+        kubectl wait --for=condition=Ready externalsecret/airflow-database-external-secret \
+            -n ${AIRFLOW_NAMESPACE} --timeout=60s
+    fi
+    echo "Database setup completed."
+
+# Delete database secret
+delete-database-secret:
+    @kubectl delete secret airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+    @kubectl delete secret airflow-database-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+    @kubectl delete externalsecret airflow-database-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+
+# Create OAuth client in Keycloak for Airflow authentication
+create-oauth-client:
+    #!/bin/bash
+    set -euo pipefail
+    if [ -z "${AIRFLOW_HOST:-}" ]; then
+        echo "Error: AIRFLOW_HOST environment variable is required"
+        exit 1
+    fi
+    echo "Creating Airflow OAuth client in Keycloak..."
+    # Delete existing client to ensure fresh creation
+    echo "Removing existing client if present..."
+    just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true
+
+    CLIENT_SECRET=$(just utils::random-password)
+    just keycloak::create-client \
+        ${KEYCLOAK_REALM} \
+        airflow \
+        "https://${AIRFLOW_HOST}/auth/oauth-authorized/keycloak" \
+        "$CLIENT_SECRET"
+
+    if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
+        echo "External Secrets available. Storing credentials in Vault and creating ExternalSecret..."
+        just vault::put airflow/oauth \
+            client_id=airflow \
+            client_secret="$CLIENT_SECRET"
+        # Delete existing ExternalSecret to force recreation and refresh
+        kubectl delete externalsecret airflow-oauth-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+        kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+        gomplate -f airflow-oauth-external-secret.gomplate.yaml -o airflow-oauth-external-secret.yaml
+        kubectl apply -f airflow-oauth-external-secret.yaml
+        echo "Waiting for OAuth secret to be ready..."
+        kubectl wait --for=condition=Ready externalsecret/airflow-oauth-external-secret \
+            -n ${AIRFLOW_NAMESPACE} --timeout=60s
+    else
+        echo "External Secrets not available. Creating Kubernetes Secret directly..."
+        kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+        kubectl create secret generic airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} \
+            --from-literal=client_id=airflow \
+            --from-literal=client_secret="$CLIENT_SECRET"
+        echo "OAuth secret created directly in Kubernetes"
+    fi
+    echo "OAuth client created successfully"
+
+# Create Keycloak roles for Airflow
+create-keycloak-roles:
+    #!/bin/bash
+    set -euo pipefail
+    echo "Creating Keycloak roles for Airflow..."
+
+    # Create client roles for Airflow
+    ROLES=("airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
+
+    for role in "${ROLES[@]}"; do
+        echo "Creating role: $role"
+        just keycloak::create-client-role ${KEYCLOAK_REALM} airflow "$role" || true
+    done
+
+    echo "Keycloak roles created successfully"
+    echo "Role mappings:"
+    echo "  - airflow_admin -> Airflow Admin (full access)"
+    echo "  - airflow_op -> Airflow Operator (can trigger DAGs)"
+    echo "  - airflow_user -> Airflow User (read/write access)"
+    echo "  - airflow_viewer -> Airflow Viewer (read-only access)"
+
+# Assign Airflow role to user
+assign-role username='' role='':
+    #!/bin/bash
+    set -euo pipefail
+    USERNAME="{{ username }}"
+    ROLE="{{ role }}"
+
+    # Interactive input if not provided
+    while [ -z "${USERNAME}" ]; do
+        USERNAME=$(gum input --prompt="Username: " --width=100)
+    done
+
+    if [ -z "${ROLE}" ]; then
+        ROLE=$(gum choose --header="Select Airflow role:" \
+            "airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
+    fi
+
+    # Validate role
+    VALID_ROLES=("airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
+    if [[ ! " ${VALID_ROLES[@]} " =~ " ${ROLE} " ]]; then
+        echo "Error: Invalid role '${ROLE}'. Valid roles: ${VALID_ROLES[*]}"
+        exit 1
+    fi
+
+    echo "Assigning role '${ROLE}' to user '${USERNAME}'..."
+    just keycloak::add-user-to-client-role ${KEYCLOAK_REALM} "${USERNAME}" airflow "${ROLE}"
+
+    # Display role permissions
+    case "${ROLE}" in
+        "airflow_admin")
+            echo "✅ ${USERNAME} now has Admin access (full system administration)"
+            ;;
+        "airflow_op")
+            echo "✅ ${USERNAME} now has Operator access (can trigger and manage DAGs)"
+            ;;
+        "airflow_user")
+            echo "✅ ${USERNAME} now has User access (read/write access to DAGs and tasks)"
+            ;;
+        "airflow_viewer")
+            echo "✅ ${USERNAME} now has Viewer access (read-only access)"
+            ;;
+    esac
+
+# Remove Airflow role from user
+remove-role username='' role='':
+    #!/bin/bash
+    set -euo pipefail
+    USERNAME="{{ username }}"
+    ROLE="{{ role }}"
+
+    # Interactive input if not provided
+    while [ -z "${USERNAME}" ]; do
+        USERNAME=$(gum input --prompt="Username: " --width=100)
+    done
+
+    if [ -z "${ROLE}" ]; then
+        ROLE=$(gum choose --header="Select Airflow role to remove:" \
+            "airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
+    fi
+
+    echo "Removing role '${ROLE}' from user '${USERNAME}'..."
+    just keycloak::remove-user-from-client-role ${KEYCLOAK_REALM} "${USERNAME}" airflow "${ROLE}" || true
+    echo "✅ Role '${ROLE}' removed from user '${USERNAME}'"
+
+# Delete OAuth secret
+delete-oauth-secret:
+    @kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+    @kubectl delete externalsecret airflow-oauth-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+
+# Install Airflow
+install:
+    #!/bin/bash
+    set -euo pipefail
+    export AIRFLOW_HOST=${AIRFLOW_HOST:-}
+    while [ -z "${AIRFLOW_HOST}" ]; do
+        AIRFLOW_HOST=$(
+            gum input --prompt="Airflow host (FQDN): " --width=100 \
+                --placeholder="e.g., airflow.example.com"
+        )
+    done
+    echo "Installing Airflow..."
+    just create-namespace
+    just setup-database
+    just create-oauth-client
+    just create-keycloak-roles
+    just add-helm-repo
+
+    # Create API server config ConfigMap
+    KEYCLOAK_HOST=${KEYCLOAK_HOST} KEYCLOAK_REALM=${KEYCLOAK_REALM} \
+        gomplate -f webserver_config.py.gomplate -o webserver_config.py
+    kubectl delete configmap airflow-api-server-config -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+    kubectl create configmap airflow-api-server-config -n ${AIRFLOW_NAMESPACE} \
+        --from-file=webserver_config.py=webserver_config.py
+
+    AIRFLOW_WEBSERVER_SECRET_KEY=$(just utils::random-password) \
+        gomplate -f airflow-values.gomplate.yaml -o airflow-values.yaml
+    helm upgrade --install airflow apache-airflow/airflow \
+        --version ${AIRFLOW_CHART_VERSION} -n ${AIRFLOW_NAMESPACE} --wait \
+        -f airflow-values.yaml
+    echo "Airflow installation completed"
+    echo "Access Airflow at: https://${AIRFLOW_HOST}"
+
+# Uninstall Airflow
+uninstall delete-db='true':
+    #!/bin/bash
+    set -euo pipefail
+    echo "Uninstalling Airflow..."
+    helm uninstall airflow -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+
+    # Force delete stuck resources
+    echo "Checking for stuck resources..."
+
+    # Delete stuck pods (especially Redis StatefulSet)
+    STUCK_PODS=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -o name 2>/dev/null || true)
+    if [ -n "$STUCK_PODS" ]; then
+        echo "Force deleting stuck pods..."
+        kubectl delete pods --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true
+    fi
+
+    # Delete PVCs
+    PVCS=$(kubectl get pvc -n ${AIRFLOW_NAMESPACE} -o name 2>/dev/null || true)
+    if [ -n "$PVCS" ]; then
+        echo "Deleting PersistentVolumeClaims..."
+        kubectl delete pvc --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true
+    fi
+
+    # Delete any remaining resources
+    kubectl delete all --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true
+
+    just delete-database-secret
+    just delete-oauth-secret
+    just delete-namespace
+    if [ "{{ delete-db }}" = "true" ]; then
+        just postgres::delete-db airflow
+    fi
+    # Clean up Keycloak client
+    just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true
+    echo "Airflow uninstalled"
+
+# Clean up database and secrets
+cleanup:
+    #!/bin/bash
+    set -euo pipefail
+    echo "This will delete the Airflow database and all secrets."
+    if gum confirm "Are you sure you want to proceed?"; then
+        echo "Cleaning up Airflow resources..."
+        just postgres::delete-db airflow || true
+        just vault::delete airflow/database || true
+        just vault::delete airflow/oauth || true
+        just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true
+        echo "Cleanup completed"
+    else
+        echo "Cleanup cancelled"
+    fi
diff --git a/airflow/webserver_config.py.gomplate b/airflow/webserver_config.py.gomplate
new file mode 100644
index 0000000..1ae8a54
--- /dev/null
+++ b/airflow/webserver_config.py.gomplate
@@ -0,0 +1,52 @@
+import os
+import logging
+from flask_appbuilder.security.manager import AUTH_OAUTH
+from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
+
+log = logging.getLogger(__name__)
+
+AUTH_TYPE = AUTH_OAUTH
+AUTH_USER_REGISTRATION = True
+AUTH_ROLES_SYNC_AT_LOGIN = True
+AUTH_USER_REGISTRATION_ROLE = "Viewer"
+
+# Keycloak OIDC configuration
+KEYCLOAK_HOST = "{{ .Env.KEYCLOAK_HOST }}"
+KEYCLOAK_REALM = "{{ .Env.KEYCLOAK_REALM }}"
+OIDC_ISSUER = f"https://{KEYCLOAK_HOST}/realms/{KEYCLOAK_REALM}"
+
+# OAuth Providers configuration
+OAUTH_PROVIDERS = [{
+    'name': 'keycloak',
+    'icon': 'fa-key',
+    'token_key': 'access_token',
+    'remote_app': {
+        'client_id': os.environ.get('AIRFLOW_OAUTH_CLIENT_ID', ''),
+        'client_secret': os.environ.get('AIRFLOW_OAUTH_CLIENT_SECRET', ''),
+        'server_metadata_url': f'{OIDC_ISSUER}/.well-known/openid-configuration',
+        'api_base_url': f'{OIDC_ISSUER}/protocol/openid-connect',
+        'access_token_url': f'{OIDC_ISSUER}/protocol/openid-connect/token',
+        'authorize_url': f'{OIDC_ISSUER}/protocol/openid-connect/auth',
+        'request_token_url': None,
+        'client_kwargs': {
+            'scope': 'openid profile email'
+        }
+    }
+}]
+
+# Role mappings from Keycloak to Airflow
+AUTH_ROLES_MAPPING = {
+    "airflow_admin": ["Admin"],
+    "airflow_op": ["Op"],
+    "airflow_user": ["User"],
+    "airflow_viewer": ["Viewer"]
+}
+
+# Security Manager Override
+class KeycloakSecurityManager(FabAirflowSecurityManagerOverride):
+    """Custom Security Manager for Keycloak integration"""
+
+    def __init__(self, appbuilder):
+        super().__init__(appbuilder)
+
+SECURITY_MANAGER_CLASS = KeycloakSecurityManager
diff --git a/justfile b/justfile
index a0ae84c..d836a24 100644
--- a/justfile
+++ b/justfile
@@ -6,6 +6,7 @@ export PATH := "./node_modules/.bin:" + env_var('PATH')
 default:
     @just --list --unsorted --list-submodules
 
+mod airflow
 mod clickhouse
 mod datahub
 mod env