set fallback := true export AIRFLOW_NAMESPACE := env("AIRFLOW_NAMESPACE", "datastack") export AIRFLOW_CHART_VERSION := env("AIRFLOW_CHART_VERSION", "1.18.0") export EXTERNAL_SECRETS_NAMESPACE := env("EXTERNAL_SECRETS_NAMESPACE", "external-secrets") export KEYCLOAK_REALM := env("KEYCLOAK_REALM", "buunstack") export AIRFLOW_DAGS_PERSISTENCE_ENABLED := env("AIRFLOW_DAGS_PERSISTENCE_ENABLED", "") export AIRFLOW_DAGS_STORAGE_TYPE := env("AIRFLOW_DAGS_STORAGE_TYPE", "") export AIRFLOW_NFS_IP := env("AIRFLOW_NFS_IP", "") export AIRFLOW_NFS_PATH := env("AIRFLOW_NFS_PATH", "") export AIRFLOW_DAGS_STORAGE_SIZE := env("AIRFLOW_DAGS_STORAGE_SIZE", "10Gi") export AIRFLOW_EXTRA_PACKAGES := env("AIRFLOW_EXTRA_PACKAGES", "'PyJWT>=2.10' cryptography 'requests>=2.32' 'dlt[duckdb,filesystem,postgres,s3]' pyarrow pyiceberg s3fs simple-salesforce") export MONITORING_ENABLED := env("MONITORING_ENABLED", "") export PROMETHEUS_NAMESPACE := env("PROMETHEUS_NAMESPACE", "monitoring") # ↑ PyJWT, cryptography, and requests are needed for Keycloak OAuth [private] default: @just --list --unsorted --list-submodules # Add Helm repository add-helm-repo: helm repo add apache-airflow https://airflow.apache.org helm repo update # Remove Helm repository remove-helm-repo: helm repo remove apache-airflow # Create namespace create-namespace: @kubectl get namespace ${AIRFLOW_NAMESPACE} &>/dev/null || \ kubectl create namespace ${AIRFLOW_NAMESPACE} # Setup database for Airflow setup-database: #!/bin/bash set -euo pipefail echo "Setting up Airflow database..." if just postgres::db-exists airflow &>/dev/null; then echo "Database 'airflow' already exists." else echo "Creating new database 'airflow'..." just postgres::create-db airflow fi # Generate password for user creation/update if just postgres::user-exists airflow &>/dev/null; then echo "User 'airflow' already exists." # Check if we can get existing password from Vault/Secret if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then # Try to get existing password from Vault if DB_PASSWORD=$(just vault::get airflow/database password 2>/dev/null); then echo "Using existing password from Vault." else echo "Generating new password and updating Vault..." DB_PASSWORD=$(just utils::random-password) just postgres::change-password airflow "$DB_PASSWORD" fi else # For direct Secret approach, generate new password echo "Generating new password for existing user..." DB_PASSWORD=$(just utils::random-password) just postgres::change-password airflow "$DB_PASSWORD" fi else echo "Creating new user 'airflow'..." DB_PASSWORD=$(just utils::random-password) just postgres::create-user airflow "$DB_PASSWORD" fi echo "Ensuring database permissions..." just postgres::grant airflow airflow # Create Airflow metadata connection secret (required by Helm chart) CONNECTION_STRING="postgresql+psycopg2://airflow:${DB_PASSWORD}@postgres-cluster-rw.postgres:5432/airflow" kubectl delete secret airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} --ignore-not-found kubectl create secret generic airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} \ --from-literal=connection="$CONNECTION_STRING" if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then echo "External Secrets available. Storing credentials in Vault and creating ExternalSecret..." just vault::put airflow/database username=airflow password="$DB_PASSWORD" gomplate -f airflow-database-external-secret.gomplate.yaml -o airflow-database-external-secret.yaml kubectl apply -f airflow-database-external-secret.yaml echo "Waiting for database secret to be ready..." kubectl wait --for=condition=Ready externalsecret/airflow-database-external-secret \ -n ${AIRFLOW_NAMESPACE} --timeout=60s fi echo "Database setup completed." # Delete database secret delete-database-secret: @kubectl delete secret airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} --ignore-not-found @kubectl delete secret airflow-database-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found @kubectl delete externalsecret airflow-database-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found # Create environment variables secret example (customize as needed) create-env-secrets-example: #!/bin/bash set -euo pipefail echo "Creating Airflow environment secrets example..." echo "This is an example - customize the environment variables as needed" if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then echo "External Secrets available. Creating ExternalSecret using template..." echo "Edit airflow-env-external-secret.gomplate.yaml to customize environment variables" kubectl delete externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found gomplate -f airflow-env-external-secret.gomplate.yaml -o airflow-env-external-secret.yaml kubectl apply -f airflow-env-external-secret.yaml echo "Waiting for environment secret to be ready..." kubectl wait --for=condition=Ready externalsecret/airflow-env-external-secret \ -n ${AIRFLOW_NAMESPACE} --timeout=60s else echo "External Secrets not available. Creating Kubernetes Secret directly..." # Example credentials - customize as needed MINIO_ACCESS_KEY="minio" MINIO_SECRET_KEY="minio123" kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found kubectl create secret generic airflow-env-secret -n ${AIRFLOW_NAMESPACE} \ --from-literal=POSTGRES_USER="$POSTGRES_USER" \ --from-literal=POSTGRES_PASSWORD="$POSTGRES_PASSWORD" # Add more environment variables here: # --from-literal=AWS_ACCESS_KEY_ID="your_value" \ # --from-literal=AWS_SECRET_ACCESS_KEY="your_value" echo "Environment secret created directly in Kubernetes" fi echo "Example environment secrets created successfully" echo "Customize the environment variables in this recipe as needed for your project" # Delete environment secrets delete-env-secrets: @kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found @kubectl delete externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found # Create OAuth client in Keycloak for Airflow authentication create-oauth-client: #!/bin/bash set -euo pipefail if [ -z "${AIRFLOW_HOST:-}" ]; then echo "Error: AIRFLOW_HOST environment variable is required" exit 1 fi echo "Creating Airflow OAuth client in Keycloak..." # Check if client already exists if just keycloak::client-exists ${KEYCLOAK_REALM} airflow &>/dev/null; then echo "Client 'airflow' already exists, skipping creation..." echo "Existing client will preserve roles and mappers" # Get existing client secret for Vault/Secret synchronization CLIENT_SECRET=$(just keycloak::get-client-secret ${KEYCLOAK_REALM} airflow | grep "Client 'airflow' secret:" | cut -d' ' -f4) echo "Retrieved existing client secret for synchronization" else echo "Creating new client..." CLIENT_SECRET=$(just utils::random-password) just keycloak::create-client \ realm=${KEYCLOAK_REALM} \ client_id=airflow \ redirect_url="https://${AIRFLOW_HOST}/auth/oauth-authorized/keycloak" \ client_secret="$CLIENT_SECRET" fi if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then echo "External Secrets available. Storing credentials in Vault and creating ExternalSecret..." just vault::put airflow/oauth \ client_id=airflow \ client_secret="$CLIENT_SECRET" # Delete existing ExternalSecret to force recreation and refresh kubectl delete externalsecret airflow-oauth-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found gomplate -f airflow-oauth-external-secret.gomplate.yaml -o airflow-oauth-external-secret.yaml kubectl apply -f airflow-oauth-external-secret.yaml echo "Waiting for OAuth secret to be ready..." kubectl wait --for=condition=Ready externalsecret/airflow-oauth-external-secret \ -n ${AIRFLOW_NAMESPACE} --timeout=60s else echo "External Secrets not available. Creating Kubernetes Secret directly..." kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found kubectl create secret generic airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} \ --from-literal=client_id=airflow \ --from-literal=client_secret="$CLIENT_SECRET" echo "OAuth secret created directly in Kubernetes" fi echo "OAuth client created successfully" # Create Keycloak roles for Airflow create-keycloak-roles: #!/bin/bash set -euo pipefail echo "Creating Keycloak roles for Airflow..." # Create client roles for Airflow ROLES=("airflow_admin" "airflow_op" "airflow_user" "airflow_viewer") for role in "${ROLES[@]}"; do echo "Creating role: $role" just keycloak::create-client-role ${KEYCLOAK_REALM} airflow "$role" || true done # Create client roles mapper to include roles in JWT tokens echo "Creating client roles mapper..." just keycloak::add-client-roles-mapper airflow "airflow_roles" "Airflow Roles Mapper" # Add client roles mapper to profile scope for userinfo endpoint echo "Adding client roles mapper to profile scope..." just keycloak::add-client-roles-to-profile-scope ${KEYCLOAK_REALM} airflow "airflow_roles" echo "Keycloak roles created successfully" echo "Role mappings:" echo " - airflow_admin -> Airflow Admin (full access)" echo " - airflow_op -> Airflow Operator (can trigger DAGs)" echo " - airflow_user -> Airflow User (read/write access)" echo " - airflow_viewer -> Airflow Viewer (read-only access)" # Assign Airflow role to user assign-role username='' role='': #!/bin/bash set -euo pipefail USERNAME="{{ username }}" ROLE="{{ role }}" # Interactive input if not provided while [ -z "${USERNAME}" ]; do USERNAME=$(gum input --prompt="Username: " --width=100) done if [ -z "${ROLE}" ]; then ROLE=$(gum choose --header="Select Airflow role:" \ "airflow_admin" "airflow_op" "airflow_user" "airflow_viewer") fi # Validate role VALID_ROLES=("airflow_admin" "airflow_op" "airflow_user" "airflow_viewer") if [[ ! " ${VALID_ROLES[@]} " =~ " ${ROLE} " ]]; then echo "Error: Invalid role '${ROLE}'. Valid roles: ${VALID_ROLES[*]}" exit 1 fi echo "Assigning role '${ROLE}' to user '${USERNAME}'..." just keycloak::add-user-to-client-role ${KEYCLOAK_REALM} "${USERNAME}" airflow "${ROLE}" # Display role permissions case "${ROLE}" in "airflow_admin") echo "✅ ${USERNAME} now has Admin access (full system administration)" ;; "airflow_op") echo "✅ ${USERNAME} now has Operator access (can trigger and manage DAGs)" ;; "airflow_user") echo "✅ ${USERNAME} now has User access (read/write access to DAGs and tasks)" ;; "airflow_viewer") echo "✅ ${USERNAME} now has Viewer access (read-only access)" ;; esac # List user's Airflow roles list-user-roles username='': #!/bin/bash set -euo pipefail USERNAME="{{ username }}" # Interactive input if not provided while [ -z "${USERNAME}" ]; do USERNAME=$(gum input --prompt="Username: " --width=100) done echo "Checking Airflow roles for user '${USERNAME}'..." just keycloak::list-user-client-roles ${KEYCLOAK_REALM} "${USERNAME}" airflow # Remove Airflow role from user remove-role username='' role='': #!/bin/bash set -euo pipefail USERNAME="{{ username }}" ROLE="{{ role }}" # Interactive input if not provided while [ -z "${USERNAME}" ]; do USERNAME=$(gum input --prompt="Username: " --width=100) done if [ -z "${ROLE}" ]; then ROLE=$(gum choose --header="Select Airflow role to remove:" \ "airflow_admin" "airflow_op" "airflow_user" "airflow_viewer") fi echo "Removing role '${ROLE}' from user '${USERNAME}'..." just keycloak::remove-user-from-client-role ${KEYCLOAK_REALM} "${USERNAME}" airflow "${ROLE}" || true echo "✅ Role '${ROLE}' removed from user '${USERNAME}'" # Delete OAuth secret delete-oauth-secret: @kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found @kubectl delete externalsecret airflow-oauth-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found # Install Airflow install: #!/bin/bash set -euo pipefail export AIRFLOW_HOST=${AIRFLOW_HOST:-} while [ -z "${AIRFLOW_HOST}" ]; do AIRFLOW_HOST=$( gum input --prompt="Airflow host (FQDN): " --width=100 \ --placeholder="e.g., airflow.example.com" ) done if [ -z "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" ]; then if gum confirm "Enable DAG persistence with PVC?"; then AIRFLOW_DAGS_PERSISTENCE_ENABLED="true" else AIRFLOW_DAGS_PERSISTENCE_ENABLED="false" fi fi # Force default storage type (NFS disabled due to permission issues) if [ "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" = "true" ]; then AIRFLOW_DAGS_STORAGE_TYPE="default" fi echo "Installing Airflow..." just create-namespace kubectl label namespace ${AIRFLOW_NAMESPACE} \ pod-security.kubernetes.io/enforce=restricted --overwrite just setup-database just create-oauth-client just create-keycloak-roles just add-helm-repo if [ "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" = "true" ]; then just setup-dags-storage "default" fi KEYCLOAK_HOST=${KEYCLOAK_HOST} KEYCLOAK_REALM=${KEYCLOAK_REALM} \ gomplate -f webserver_config.gomplate.py -o webserver_config.py kubectl delete configmap airflow-api-server-config -n ${AIRFLOW_NAMESPACE} --ignore-not-found kubectl create configmap airflow-api-server-config -n ${AIRFLOW_NAMESPACE} \ --from-file=webserver_config.py=webserver_config.py export AIRFLOW_ENV_SECRETS_EXIST="false" if kubectl get secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} &>/dev/null; then echo "Environment secrets found - will include in deployment" export AIRFLOW_ENV_SECRETS_EXIST="true" else echo "No environment secrets found - use 'just airflow::create-env-secrets-example' to create them if needed" export AIRFLOW_ENV_SECRETS_EXIST="false" fi # Check if Prometheus monitoring should be enabled if helm status kube-prometheus-stack -n ${PROMETHEUS_NAMESPACE} &>/dev/null; then if [ -z "${MONITORING_ENABLED}" ]; then if gum confirm "Enable Prometheus monitoring?"; then MONITORING_ENABLED="true" else MONITORING_ENABLED="false" fi fi fi # Enable monitoring label on namespace if monitoring is enabled if [ "${MONITORING_ENABLED}" = "true" ]; then kubectl label namespace ${AIRFLOW_NAMESPACE} buun.channel/enable-monitoring=true --overwrite fi AIRFLOW_WEBSERVER_SECRET_KEY=$(just utils::random-password) \ gomplate -f airflow-values.gomplate.yaml -o airflow-values.yaml helm upgrade --install airflow apache-airflow/airflow \ --version ${AIRFLOW_CHART_VERSION} -n ${AIRFLOW_NAMESPACE} --wait \ -f airflow-values.yaml echo "Synchronizing Airflow permissions..." kubectl exec deployment/airflow-scheduler -n ${AIRFLOW_NAMESPACE} -- airflow sync-perm echo "Airflow installation completed" echo "Access Airflow at: https://${AIRFLOW_HOST}" # Uninstall Airflow uninstall delete-db='true': #!/bin/bash set -euo pipefail echo "Uninstalling Airflow..." helm uninstall airflow -n ${AIRFLOW_NAMESPACE} --ignore-not-found # Force delete stuck resources echo "Checking for stuck resources..." # Delete stuck pods (especially Redis StatefulSet) - only Airflow pods STUCK_PODS=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} \ -l 'release=airflow' -o name 2>/dev/null || true) if [ -n "$STUCK_PODS" ]; then echo "Force deleting stuck Airflow pods..." kubectl delete pods -n ${AIRFLOW_NAMESPACE} -l 'release=airflow' \ --force --grace-period=0 2>/dev/null || true fi # Delete Airflow-specific PVCs only AIRFLOW_PVCS=$(kubectl get pvc -n ${AIRFLOW_NAMESPACE} \ -l 'release=airflow' -o name 2>/dev/null || true) if [ -n "$AIRFLOW_PVCS" ]; then echo "Deleting Airflow PersistentVolumeClaims..." kubectl delete pvc -n ${AIRFLOW_NAMESPACE} -l 'release=airflow' \ --force --grace-period=0 2>/dev/null || true fi # Delete DAG storage PVC if it exists kubectl delete pvc airflow-dags-pvc -n ${AIRFLOW_NAMESPACE} --ignore-not-found # Delete Airflow-specific resources (with label selector to avoid deleting JupyterHub) echo "Deleting Airflow-specific resources..." kubectl delete all -n ${AIRFLOW_NAMESPACE} -l 'release=airflow' \ --force --grace-period=0 2>/dev/null || true just delete-database-secret just delete-oauth-secret if [ "{{ delete-db }}" = "true" ]; then just postgres::delete-db airflow fi # Clean up Keycloak client just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true # Create API user for JupyterHub integration create-api-user username='' role='': #!/bin/bash set -euo pipefail USERNAME="{{ username }}" ROLE="{{ role }}" while [ -z "${USERNAME}" ]; do USERNAME=$(gum input --prompt="API Username: " --width=100 --placeholder="e.g., john.doe") done # Interactive role selection if not provided if [ -z "${ROLE}" ]; then echo "" echo "Airflow Roles:" echo " Admin - Full administrative access (all permissions)" echo " Op - Operator permissions (can trigger DAGs, manage runs)" echo " User - Standard user permissions (recommended for most users)" echo " Viewer - Read-only access (can view DAGs and runs)" echo " Public - Minimal public permissions" echo "" ROLE=$(gum choose --header="Select user role:" \ "User" "Admin" "Op" "Viewer" "Public") fi echo "Creating Airflow API user: ${USERNAME} (role: ${ROLE})" API_PASSWORD=$(just utils::random-password) kubectl exec deployment/airflow-api-server -n ${AIRFLOW_NAMESPACE} -- \ airflow users create \ --username "${USERNAME}" \ --firstname "API" \ --lastname "User" \ --role "${ROLE}" \ --email "${USERNAME}@api.local" \ --password "${API_PASSWORD}" if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then echo "External Secrets available. Storing credentials in Vault..." just vault::put "jupyter/users/${USERNAME}/airflow-api" \ username="${USERNAME}" \ password="${API_PASSWORD}" echo "API credentials stored in Vault" echo " Path: jupyter/users/${USERNAME}/airflow-api" echo " Ready for JupyterHub notebook access with SecretStore" else echo "External Secrets not available. Creating Kubernetes Secret directly..." kubectl delete secret "airflow-user-${USERNAME}" -n jupyterhub --ignore-not-found kubectl create secret generic "airflow-user-${USERNAME}" -n jupyterhub \ --from-literal=username="${USERNAME}" \ --from-literal=password="${API_PASSWORD}" echo "API credentials stored in Kubernetes Secret" echo " Secret: airflow-user-${USERNAME} (namespace: jupyterhub)" fi echo "✅ API user created successfully: ${USERNAME}" echo " User has '${ROLE}' role permissions" # List API users list-api-users: #!/bin/bash set -euo pipefail echo "Airflow API Users:" kubectl exec deployment/airflow-api-server -n ${AIRFLOW_NAMESPACE} -- \ airflow users list --output table # Delete API user delete-api-user username='': #!/bin/bash set -euo pipefail USERNAME="{{ username }}" while [ -z "${USERNAME}" ]; do USERNAME=$(gum input --prompt="Username to delete: " --width=100) done if gum confirm "Delete API user '${USERNAME}' and all associated credentials?"; then echo "Deleting Airflow API user: ${USERNAME}" kubectl exec deployment/airflow-api-server -n ${AIRFLOW_NAMESPACE} -- \ airflow users delete --username "${USERNAME}" || true if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then just vault::delete "jupyter/users/${USERNAME}/airflow-api" || true echo "✅ Vault credentials cleaned up" fi kubectl delete secret "airflow-user-${USERNAME}" -n jupyterhub --ignore-not-found echo "✅ Kubernetes secret cleaned up" echo " API user deleted: ${USERNAME}" else echo "Deletion cancelled" fi # Setup DAG storage (PVC) setup-dags-storage storage-type='': #!/bin/bash set -euo pipefail echo "Setting up DAG storage (default)..." echo "Creating PersistentVolumeClaim with default StorageClass..." kubectl apply -n ${AIRFLOW_NAMESPACE} -f dags-pvc.yaml echo "✅ Default storage configured" echo " PVC: airflow-dags-pvc" echo " Uses cluster default StorageClass (k3s local-path, etc.)" echo "" echo "DAG storage is ready for use" echo "Mount path in pods: /opt/airflow/dags" echo "" if [ "${AIRFLOW_NAMESPACE}" = "jupyter" ]; then echo "📝 JupyterHub Integration:" echo " Since Airflow is in the 'jupyter' namespace, JupyterHub can mount this PVC" echo " Enable 'Airflow DAG storage mounting' when installing JupyterHub" echo " DAGs will be available at: /opt/airflow-dags in notebooks" fi # Delete DAG storage delete-dags-storage: #!/bin/bash set -euo pipefail echo "Deleting DAG storage resources..." kubectl delete pvc airflow-dags-pvc -n ${AIRFLOW_NAMESPACE} --ignore-not-found echo "✅ DAG storage deleted" # View DAG import error logs logs-dag-errors dag_file='': #!/bin/bash set -euo pipefail DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}') if [ -z "${DAG_PROCESSOR_POD}" ]; then echo "❌ DAG processor pod not found" exit 1 fi LOG_DATE=$(date +%Y-%m-%d) LOG_DIR="/opt/airflow/logs/dag_processor/${LOG_DATE}/dags-folder" if [ -n "{{ dag_file }}" ]; then # Show specific DAG file errors LOG_FILE="${LOG_DIR}/{{ dag_file }}.log" echo "📋 Showing errors for DAG file: {{ dag_file }}" echo "📂 Log file: ${LOG_FILE}" echo "" kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \ cat "${LOG_FILE}" 2>/dev/null | jq -r 'select(.level == "error") | .timestamp + " " + .event + ": " + .error_detail[0].exc_value' || \ echo "❌ No error log found for {{ dag_file }} or file doesn't exist" else # List all DAG files with errors echo "📋 Available DAG error logs:" echo "📂 Log directory: ${LOG_DIR}" echo "" kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \ ls -la "${LOG_DIR}" 2>/dev/null || echo "❌ No DAG logs found for today" echo "" echo "💡 Usage: just airflow::logs-dag-errors " echo " Example: just airflow::logs-dag-errors csv_to_postgres_dag.py" fi # View DAG processor real-time logs logs-dag-processor: #!/bin/bash set -euo pipefail DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}') if [ -z "${DAG_PROCESSOR_POD}" ]; then echo "❌ DAG processor pod not found" exit 1 fi echo "📋 Following DAG processor logs..." echo "🔍 Pod: ${DAG_PROCESSOR_POD}" echo "" kubectl logs -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -f # List all DAG import errors logs-import-errors: #!/bin/bash set -euo pipefail DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}') if [ -z "${DAG_PROCESSOR_POD}" ]; then echo "❌ DAG processor pod not found" exit 1 fi echo "📋 Checking DAG import errors..." echo "" kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \ airflow dags list-import-errors || echo "❌ Failed to list import errors" # View DAG files in directory logs-dag-files: #!/bin/bash set -euo pipefail DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}') if [ -z "${DAG_PROCESSOR_POD}" ]; then echo "❌ DAG processor pod not found" exit 1 fi echo "📋 DAG files in /opt/airflow/dags/:" echo "" kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \ ls -la /opt/airflow/dags/ # Test DAG file import manually logs-test-import dag_file: #!/bin/bash set -euo pipefail DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}') if [ -z "${DAG_PROCESSOR_POD}" ]; then echo "❌ DAG processor pod not found" exit 1 fi echo "🧪 Testing import of DAG file: {{ dag_file }}" echo "" kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \ python /opt/airflow/dags/{{ dag_file }} # Delete user from Airflow database (to force role resync) delete-user username='': #!/bin/bash set -euo pipefail USERNAME="{{ username }}" # Interactive input if not provided while [ -z "${USERNAME}" ]; do USERNAME=$(gum input --prompt="Username to delete from Airflow: " --width=100) done echo "Deleting user '${USERNAME}' from Airflow database..." if gum confirm "This will delete the user from Airflow database. The user will be recreated with current Keycloak roles on next login. Continue?"; then # Get scheduler pod (which has airflow CLI access) SCHEDULER_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=scheduler -o jsonpath='{.items[0].metadata.name}') # Delete user using airflow CLI kubectl exec -n ${AIRFLOW_NAMESPACE} ${SCHEDULER_POD} -- \ airflow users delete --username "${USERNAME}" || echo "User '${USERNAME}' not found in Airflow database" echo "✅ User '${USERNAME}' deleted from Airflow. They will be recreated with current Keycloak roles on next login." else echo "User deletion cancelled" fi # Force role sync for all users (delete all OAuth users) reset-oauth-users: #!/bin/bash set -euo pipefail echo "This will delete ALL OAuth users from Airflow database." echo "Users will be recreated with current Keycloak roles on next login." if gum confirm "Are you sure you want to proceed?"; then # Get scheduler pod (which has airflow CLI access) SCHEDULER_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=scheduler -o jsonpath='{.items[0].metadata.name}') # List and delete OAuth users (exclude admin users created manually) echo "Deleting OAuth users from Airflow database..." kubectl exec -n ${AIRFLOW_NAMESPACE} ${SCHEDULER_POD} -- \ airflow db shell -s "DELETE FROM ab_user WHERE email IS NOT NULL AND username != 'admin';" || true echo "✅ All OAuth users deleted. They will be recreated with current Keycloak roles on next login." else echo "Reset cancelled" fi # Clean up database and secrets cleanup: #!/bin/bash set -euo pipefail echo "This will delete the Airflow database and all secrets." if gum confirm "Are you sure you want to proceed?"; then echo "Cleaning up Airflow resources..." just postgres::delete-db airflow || true just vault::delete airflow/database || true just vault::delete airflow/oauth || true just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true echo "Cleanup completed" else echo "Cleanup cancelled" fi