set fallback := true
export AIRFLOW_NAMESPACE := env("AIRFLOW_NAMESPACE", "datastack")
export AIRFLOW_CHART_VERSION := env("AIRFLOW_CHART_VERSION", "1.18.0")
export EXTERNAL_SECRETS_NAMESPACE := env("EXTERNAL_SECRETS_NAMESPACE", "external-secrets")
export KEYCLOAK_REALM := env("KEYCLOAK_REALM", "buunstack")
export AIRFLOW_DAGS_PERSISTENCE_ENABLED := env("AIRFLOW_DAGS_PERSISTENCE_ENABLED", "")
export AIRFLOW_DAGS_STORAGE_TYPE := env("AIRFLOW_DAGS_STORAGE_TYPE", "")
export AIRFLOW_NFS_IP := env("AIRFLOW_NFS_IP", "")
export AIRFLOW_NFS_PATH := env("AIRFLOW_NFS_PATH", "")
export AIRFLOW_DAGS_STORAGE_SIZE := env("AIRFLOW_DAGS_STORAGE_SIZE", "10Gi")
export AIRFLOW_EXTRA_PACKAGES := env("AIRFLOW_EXTRA_PACKAGES", "'PyJWT>=2.10' cryptography 'requests>=2.32' 'dlt[duckdb,filesystem,postgres,s3]'")
# ↑ PyJWT, cryptography, and requests are needed for Keycloak OAuth
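# Override per invocation if needed, e.g.:
#   AIRFLOW_EXTRA_PACKAGES="'pandas>=2.2'" just airflow::install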
[private]
default:
@just --list --unsorted --list-submodules
# Add Helm repository
add-helm-repo:
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# Remove Helm repository
remove-helm-repo:
helm repo remove apache-airflow
# Create namespace (shared with JupyterHub when using jupyter namespace)
create-namespace:
@kubectl get namespace ${AIRFLOW_NAMESPACE} &>/dev/null || \
kubectl create namespace ${AIRFLOW_NAMESPACE}
# Set up the Airflow database
setup-database:
#!/bin/bash
set -euo pipefail
echo "Setting up Airflow database..."
if just postgres::db-exists airflow &>/dev/null; then
echo "Database 'airflow' already exists."
else
echo "Creating new database 'airflow'..."
just postgres::create-db airflow
fi
# Generate password for user creation/update
if just postgres::user-exists airflow &>/dev/null; then
echo "User 'airflow' already exists."
# Check if we can get existing password from Vault/Secret
if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
# Try to get existing password from Vault
if DB_PASSWORD=$(just vault::get airflow/database password 2>/dev/null); then
echo "Using existing password from Vault."
else
echo "Generating new password and updating Vault..."
DB_PASSWORD=$(just utils::random-password)
just postgres::change-password airflow "$DB_PASSWORD"
fi
else
# For direct Secret approach, generate new password
echo "Generating new password for existing user..."
DB_PASSWORD=$(just utils::random-password)
just postgres::change-password airflow "$DB_PASSWORD"
fi
else
echo "Creating new user 'airflow'..."
DB_PASSWORD=$(just utils::random-password)
just postgres::create-user airflow "$DB_PASSWORD"
fi
echo "Ensuring database permissions..."
just postgres::grant airflow airflow
# Create Airflow metadata connection secret (required by Helm chart)
CONNECTION_STRING="postgresql+psycopg2://airflow:${DB_PASSWORD}@postgres-cluster-rw.postgres:5432/airflow"
kubectl delete secret airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} --ignore-not-found
kubectl create secret generic airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} \
--from-literal=connection="$CONNECTION_STRING"
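    # The Helm chart is expected to read this secret via its data.metadataSecretName value
    # (wiring assumed to live in airflow-values.gomplate.yaml)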
if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
echo "External Secrets available. Storing credentials in Vault and creating ExternalSecret..."
just vault::put airflow/database username=airflow password="$DB_PASSWORD"
gomplate -f airflow-database-external-secret.gomplate.yaml -o airflow-database-external-secret.yaml
kubectl apply -f airflow-database-external-secret.yaml
echo "Waiting for database secret to be ready..."
kubectl wait --for=condition=Ready externalsecret/airflow-database-external-secret \
-n ${AIRFLOW_NAMESPACE} --timeout=60s
fi
echo "Database setup completed."
# Delete database secret
delete-database-secret:
@kubectl delete secret airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} --ignore-not-found
@kubectl delete secret airflow-database-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
@kubectl delete externalsecret airflow-database-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
# Create an example environment-variables secret (customize as needed)
create-env-secrets-example:
#!/bin/bash
set -euo pipefail
echo "Creating Airflow environment secrets example..."
echo "This is an example - customize the environment variables as needed"
if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
echo "External Secrets available. Creating ExternalSecret using template..."
echo "Edit airflow-env-external-secret.gomplate.yaml to customize environment variables"
kubectl delete externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
gomplate -f airflow-env-external-secret.gomplate.yaml -o airflow-env-external-secret.yaml
kubectl apply -f airflow-env-external-secret.yaml
echo "Waiting for environment secret to be ready..."
kubectl wait --for=condition=Ready externalsecret/airflow-env-external-secret \
-n ${AIRFLOW_NAMESPACE} --timeout=60s
else
echo "External Secrets not available. Creating Kubernetes Secret directly..."
# Example credentials - customize as needed
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"
kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        kubectl create secret generic airflow-env-secret -n ${AIRFLOW_NAMESPACE} \
            --from-literal=MINIO_ACCESS_KEY="$MINIO_ACCESS_KEY" \
            --from-literal=MINIO_SECRET_KEY="$MINIO_SECRET_KEY"
# Add more environment variables here:
# --from-literal=AWS_ACCESS_KEY_ID="your_value" \
# --from-literal=AWS_SECRET_ACCESS_KEY="your_value"
echo "Environment secret created directly in Kubernetes"
fi
echo "Example environment secrets created successfully"
echo "Customize the environment variables in this recipe as needed for your project"
# Delete environment secrets
delete-env-secrets:
@kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
@kubectl delete externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
# Create OAuth client in Keycloak for Airflow authentication
create-oauth-client:
#!/bin/bash
set -euo pipefail
if [ -z "${AIRFLOW_HOST:-}" ]; then
echo "Error: AIRFLOW_HOST environment variable is required"
exit 1
fi
echo "Creating Airflow OAuth client in Keycloak..."
# Check if client already exists
if just keycloak::client-exists ${KEYCLOAK_REALM} airflow &>/dev/null; then
echo "Client 'airflow' already exists, skipping creation..."
echo "Existing client will preserve roles and mappers"
# Get existing client secret for Vault/Secret synchronization
CLIENT_SECRET=$(just keycloak::get-client-secret ${KEYCLOAK_REALM} airflow | grep "Client 'airflow' secret:" | cut -d' ' -f4)
echo "Retrieved existing client secret for synchronization"
else
echo "Creating new client..."
CLIENT_SECRET=$(just utils::random-password)
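        # Redirect path follows Flask-AppBuilder's OAuth callback route: /auth/oauth-authorized/<provider>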
just keycloak::create-client \
realm=${KEYCLOAK_REALM} \
client_id=airflow \
redirect_url="https://${AIRFLOW_HOST}/auth/oauth-authorized/keycloak" \
client_secret="$CLIENT_SECRET"
fi
if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
echo "External Secrets available. Storing credentials in Vault and creating ExternalSecret..."
just vault::put airflow/oauth \
client_id=airflow \
client_secret="$CLIENT_SECRET"
# Delete existing ExternalSecret to force recreation and refresh
kubectl delete externalsecret airflow-oauth-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
gomplate -f airflow-oauth-external-secret.gomplate.yaml -o airflow-oauth-external-secret.yaml
kubectl apply -f airflow-oauth-external-secret.yaml
echo "Waiting for OAuth secret to be ready..."
kubectl wait --for=condition=Ready externalsecret/airflow-oauth-external-secret \
-n ${AIRFLOW_NAMESPACE} --timeout=60s
else
echo "External Secrets not available. Creating Kubernetes Secret directly..."
kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
kubectl create secret generic airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} \
--from-literal=client_id=airflow \
--from-literal=client_secret="$CLIENT_SECRET"
echo "OAuth secret created directly in Kubernetes"
fi
echo "OAuth client created successfully"
# Create Keycloak roles for Airflow
create-keycloak-roles:
#!/bin/bash
set -euo pipefail
echo "Creating Keycloak roles for Airflow..."
# Create client roles for Airflow
ROLES=("airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
for role in "${ROLES[@]}"; do
echo "Creating role: $role"
just keycloak::create-client-role ${KEYCLOAK_REALM} airflow "$role" || true
done
# Create client roles mapper to include roles in JWT tokens
echo "Creating client roles mapper..."
just keycloak::add-client-roles-mapper airflow "airflow_roles" "Airflow Roles Mapper"
# Add client roles mapper to profile scope for userinfo endpoint
echo "Adding client roles mapper to profile scope..."
just keycloak::add-client-roles-to-profile-scope ${KEYCLOAK_REALM} airflow "airflow_roles"
echo "Keycloak roles created successfully"
echo "Role mappings:"
echo " - airflow_admin -> Airflow Admin (full access)"
echo " - airflow_op -> Airflow Operator (can trigger DAGs)"
echo " - airflow_user -> Airflow User (read/write access)"
echo " - airflow_viewer -> Airflow Viewer (read-only access)"
# Assign Airflow role to user
assign-role username='' role='':
#!/bin/bash
set -euo pipefail
USERNAME="{{ username }}"
ROLE="{{ role }}"
# Interactive input if not provided
while [ -z "${USERNAME}" ]; do
USERNAME=$(gum input --prompt="Username: " --width=100)
done
if [ -z "${ROLE}" ]; then
ROLE=$(gum choose --header="Select Airflow role:" \
"airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
fi
# Validate role
VALID_ROLES=("airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
    if [[ ! " ${VALID_ROLES[*]} " =~ " ${ROLE} " ]]; then
echo "Error: Invalid role '${ROLE}'. Valid roles: ${VALID_ROLES[*]}"
exit 1
fi
echo "Assigning role '${ROLE}' to user '${USERNAME}'..."
just keycloak::add-user-to-client-role ${KEYCLOAK_REALM} "${USERNAME}" airflow "${ROLE}"
# Display role permissions
case "${ROLE}" in
"airflow_admin")
echo "${USERNAME} now has Admin access (full system administration)"
;;
"airflow_op")
echo "${USERNAME} now has Operator access (can trigger and manage DAGs)"
;;
"airflow_user")
echo "${USERNAME} now has User access (read/write access to DAGs and tasks)"
;;
"airflow_viewer")
echo "${USERNAME} now has Viewer access (read-only access)"
;;
esac
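# Usage example: just airflow::assign-role alice airflow_viewer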
# List user's Airflow roles
list-user-roles username='':
#!/bin/bash
set -euo pipefail
USERNAME="{{ username }}"
# Interactive input if not provided
while [ -z "${USERNAME}" ]; do
USERNAME=$(gum input --prompt="Username: " --width=100)
done
echo "Checking Airflow roles for user '${USERNAME}'..."
just keycloak::list-user-client-roles ${KEYCLOAK_REALM} "${USERNAME}" airflow
# Remove Airflow role from user
remove-role username='' role='':
#!/bin/bash
set -euo pipefail
USERNAME="{{ username }}"
ROLE="{{ role }}"
# Interactive input if not provided
while [ -z "${USERNAME}" ]; do
USERNAME=$(gum input --prompt="Username: " --width=100)
done
if [ -z "${ROLE}" ]; then
ROLE=$(gum choose --header="Select Airflow role to remove:" \
"airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
fi
echo "Removing role '${ROLE}' from user '${USERNAME}'..."
just keycloak::remove-user-from-client-role ${KEYCLOAK_REALM} "${USERNAME}" airflow "${ROLE}" || true
echo "✅ Role '${ROLE}' removed from user '${USERNAME}'"
# Delete OAuth secret
delete-oauth-secret:
@kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
@kubectl delete externalsecret airflow-oauth-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
# Install Airflow
install:
#!/bin/bash
set -euo pipefail
export AIRFLOW_HOST=${AIRFLOW_HOST:-}
while [ -z "${AIRFLOW_HOST}" ]; do
AIRFLOW_HOST=$(
gum input --prompt="Airflow host (FQDN): " --width=100 \
--placeholder="e.g., airflow.example.com"
)
done
if [ -z "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" ]; then
if gum confirm "Enable DAG persistence with PVC?"; then
AIRFLOW_DAGS_PERSISTENCE_ENABLED="true"
else
AIRFLOW_DAGS_PERSISTENCE_ENABLED="false"
fi
fi
# Force default storage type (NFS disabled due to permission issues)
if [ "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" = "true" ]; then
AIRFLOW_DAGS_STORAGE_TYPE="default"
fi
echo "Installing Airflow..."
just create-namespace
just setup-database
just create-oauth-client
just create-keycloak-roles
just add-helm-repo
if [ "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" = "true" ]; then
just setup-dags-storage "default"
fi
KEYCLOAK_HOST=${KEYCLOAK_HOST} KEYCLOAK_REALM=${KEYCLOAK_REALM} \
gomplate -f webserver_config.gomplate.py -o webserver_config.py
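    # Ship the rendered FAB config to the api-server via a ConfigMap
    # (airflow-values.gomplate.yaml is assumed to mount it as webserver_config.py)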
kubectl delete configmap airflow-api-server-config -n ${AIRFLOW_NAMESPACE} --ignore-not-found
kubectl create configmap airflow-api-server-config -n ${AIRFLOW_NAMESPACE} \
--from-file=webserver_config.py=webserver_config.py
export AIRFLOW_ENV_SECRETS_EXIST="false"
if kubectl get secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} &>/dev/null; then
echo "Environment secrets found - will include in deployment"
export AIRFLOW_ENV_SECRETS_EXIST="true"
else
echo "No environment secrets found - use 'just airflow::create-env-secrets-example' to create them if needed"
export AIRFLOW_ENV_SECRETS_EXIST="false"
fi
AIRFLOW_WEBSERVER_SECRET_KEY=$(just utils::random-password) \
gomplate -f airflow-values.gomplate.yaml -o airflow-values.yaml
helm upgrade --install airflow apache-airflow/airflow \
--version ${AIRFLOW_CHART_VERSION} -n ${AIRFLOW_NAMESPACE} --wait \
-f airflow-values.yaml
echo "Airflow installation completed"
echo "Access Airflow at: https://${AIRFLOW_HOST}"
if [ "${AIRFLOW_NAMESPACE}" = "jupyter" ] && [ "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" = "true" ]; then
echo ""
echo "📝 JupyterHub Integration Notes:"
echo " • If JupyterHub is already installed with DAG mounting enabled:"
echo " Restart user pods to access DAGs: kubectl delete pods -n jupyter -l app.kubernetes.io/component=singleuser-server"
echo " • If JupyterHub will be installed later:"
echo " Enable 'Airflow DAG storage mounting' during JupyterHub installation"
fi
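# Non-interactive install example:
#   AIRFLOW_HOST=airflow.example.com AIRFLOW_DAGS_PERSISTENCE_ENABLED=true just airflow::install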
# Uninstall Airflow
uninstall delete-db='true':
#!/bin/bash
set -euo pipefail
echo "Uninstalling Airflow..."
helm uninstall airflow -n ${AIRFLOW_NAMESPACE} --ignore-not-found
# Force delete stuck resources
echo "Checking for stuck resources..."
# Delete stuck pods (especially Redis StatefulSet)
STUCK_PODS=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -o name 2>/dev/null || true)
if [ -n "$STUCK_PODS" ]; then
echo "Force deleting stuck pods..."
kubectl delete pods --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true
fi
# Delete PVCs
PVCS=$(kubectl get pvc -n ${AIRFLOW_NAMESPACE} -o name 2>/dev/null || true)
if [ -n "$PVCS" ]; then
echo "Deleting PersistentVolumeClaims..."
kubectl delete pvc --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true
fi
# Delete any remaining resources
kubectl delete all --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true
just delete-database-secret
just delete-oauth-secret
if [ "{{ delete-db }}" = "true" ]; then
just postgres::delete-db airflow
fi
# Clean up Keycloak client
just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true
echo "Airflow uninstalled"
# Create API user for JupyterHub integration
create-api-user username='' role='':
#!/bin/bash
set -euo pipefail
USERNAME="{{ username }}"
ROLE="{{ role }}"
while [ -z "${USERNAME}" ]; do
USERNAME=$(gum input --prompt="API Username: " --width=100 --placeholder="e.g., john.doe")
done
# Interactive role selection if not provided
if [ -z "${ROLE}" ]; then
echo ""
echo "Airflow Roles:"
echo " Admin - Full administrative access (all permissions)"
echo " Op - Operator permissions (can trigger DAGs, manage runs)"
echo " User - Standard user permissions (recommended for most users)"
echo " Viewer - Read-only access (can view DAGs and runs)"
echo " Public - Minimal public permissions"
echo ""
ROLE=$(gum choose --header="Select user role:" \
"User" "Admin" "Op" "Viewer" "Public")
fi
echo "Creating Airflow API user: ${USERNAME} (role: ${ROLE})"
API_PASSWORD=$(just utils::random-password)
kubectl exec deployment/airflow-api-server -n ${AIRFLOW_NAMESPACE} -- \
airflow users create \
--username "${USERNAME}" \
--firstname "API" \
--lastname "User" \
--role "${ROLE}" \
--email "${USERNAME}@api.local" \
--password "${API_PASSWORD}"
if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
echo "External Secrets available. Storing credentials in Vault..."
just vault::put "jupyter/users/${USERNAME}/airflow-api" \
username="${USERNAME}" \
password="${API_PASSWORD}"
echo "API credentials stored in Vault"
echo " Path: jupyter/users/${USERNAME}/airflow-api"
echo " Ready for JupyterHub notebook access with SecretStore"
else
echo "External Secrets not available. Creating Kubernetes Secret directly..."
kubectl delete secret "airflow-user-${USERNAME}" -n jupyterhub --ignore-not-found
kubectl create secret generic "airflow-user-${USERNAME}" -n jupyterhub \
--from-literal=username="${USERNAME}" \
--from-literal=password="${API_PASSWORD}"
echo "API credentials stored in Kubernetes Secret"
echo " Secret: airflow-user-${USERNAME} (namespace: jupyterhub)"
fi
echo "✅ API user created successfully: ${USERNAME}"
echo " User has '${ROLE}' role permissions"
# List API users
list-api-users:
#!/bin/bash
set -euo pipefail
echo "Airflow API Users:"
kubectl exec deployment/airflow-api-server -n ${AIRFLOW_NAMESPACE} -- \
airflow users list --output table
# Delete API user
delete-api-user username='':
#!/bin/bash
set -euo pipefail
USERNAME="{{ username }}"
while [ -z "${USERNAME}" ]; do
USERNAME=$(gum input --prompt="Username to delete: " --width=100)
done
if gum confirm "Delete API user '${USERNAME}' and all associated credentials?"; then
echo "Deleting Airflow API user: ${USERNAME}"
kubectl exec deployment/airflow-api-server -n ${AIRFLOW_NAMESPACE} -- \
airflow users delete --username "${USERNAME}" || true
if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
just vault::delete "jupyter/users/${USERNAME}/airflow-api" || true
echo "✅ Vault credentials cleaned up"
fi
kubectl delete secret "airflow-user-${USERNAME}" -n jupyterhub --ignore-not-found
echo "✅ Kubernetes secret cleaned up"
echo " API user deleted: ${USERNAME}"
else
echo "Deletion cancelled"
fi
# Set up DAG storage (PVC)
setup-dags-storage storage-type='':
#!/bin/bash
set -euo pipefail
echo "Setting up DAG storage (default)..."
echo "Creating PersistentVolumeClaim with default StorageClass..."
kubectl apply -n ${AIRFLOW_NAMESPACE} -f dags-pvc.yaml
echo "✅ Default storage configured"
echo " PVC: airflow-dags-pvc"
echo " Uses cluster default StorageClass (k3s local-path, etc.)"
echo ""
echo "DAG storage is ready for use"
echo "Mount path in pods: /opt/airflow/dags"
echo ""
if [ "${AIRFLOW_NAMESPACE}" = "jupyter" ]; then
echo "📝 JupyterHub Integration:"
echo " Since Airflow is in the 'jupyter' namespace, JupyterHub can mount this PVC"
echo " Enable 'Airflow DAG storage mounting' when installing JupyterHub"
echo " DAGs will be available at: /opt/airflow-dags in notebooks"
fi
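# Sketch of the referenced dags-pvc.yaml (assumed shape; the actual manifest ships with the repo):
#   apiVersion: v1
#   kind: PersistentVolumeClaim
#   metadata:
#     name: airflow-dags-pvc
#   spec:
#     accessModes: [ReadWriteOnce]   # single-node local-path; ReadWriteMany on multi-node clusters
#     resources:
#       requests:
#         storage: 10Gi              # matches the AIRFLOW_DAGS_STORAGE_SIZE default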
# Delete DAG storage
delete-dags-storage:
#!/bin/bash
set -euo pipefail
echo "Deleting DAG storage resources..."
kubectl delete pvc airflow-dags-pvc -n ${AIRFLOW_NAMESPACE} --ignore-not-found
echo "✅ DAG storage deleted"
# View DAG import error logs
logs-dag-errors dag_file='':
#!/bin/bash
set -euo pipefail
    # '|| true' keeps set -e from aborting before the not-found check below (jsonpath errors on an empty list)
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
if [ -z "${DAG_PROCESSOR_POD}" ]; then
echo "❌ DAG processor pod not found"
exit 1
fi
LOG_DATE=$(date +%Y-%m-%d)
LOG_DIR="/opt/airflow/logs/dag_processor/${LOG_DATE}/dags-folder"
if [ -n "{{ dag_file }}" ]; then
# Show specific DAG file errors
LOG_FILE="${LOG_DIR}/{{ dag_file }}.log"
echo "📋 Showing errors for DAG file: {{ dag_file }}"
echo "📂 Log file: ${LOG_FILE}"
echo ""
kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
cat "${LOG_FILE}" 2>/dev/null | jq -r 'select(.level == "error") | .timestamp + " " + .event + ": " + .error_detail[0].exc_value' || \
echo "❌ No error log found for {{ dag_file }} or file doesn't exist"
else
# List all DAG files with errors
echo "📋 Available DAG error logs:"
echo "📂 Log directory: ${LOG_DIR}"
echo ""
kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
ls -la "${LOG_DIR}" 2>/dev/null || echo "❌ No DAG logs found for today"
echo ""
echo "💡 Usage: just airflow::logs-dag-errors <dag_file_name>"
echo " Example: just airflow::logs-dag-errors csv_to_postgres_dag.py"
fi
# View DAG processor real-time logs
logs-dag-processor:
#!/bin/bash
set -euo pipefail
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
if [ -z "${DAG_PROCESSOR_POD}" ]; then
echo "❌ DAG processor pod not found"
exit 1
fi
echo "📋 Following DAG processor logs..."
echo "🔍 Pod: ${DAG_PROCESSOR_POD}"
echo ""
kubectl logs -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -f
# List all DAG import errors
logs-import-errors:
#!/bin/bash
set -euo pipefail
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
if [ -z "${DAG_PROCESSOR_POD}" ]; then
echo "❌ DAG processor pod not found"
exit 1
fi
echo "📋 Checking DAG import errors..."
echo ""
kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
airflow dags list-import-errors || echo "❌ Failed to list import errors"
# View DAG files in directory
logs-dag-files:
#!/bin/bash
set -euo pipefail
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
if [ -z "${DAG_PROCESSOR_POD}" ]; then
echo "❌ DAG processor pod not found"
exit 1
fi
echo "📋 DAG files in /opt/airflow/dags/:"
echo ""
kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
ls -la /opt/airflow/dags/
# Test DAG file import manually
logs-test-import dag_file:
#!/bin/bash
set -euo pipefail
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
if [ -z "${DAG_PROCESSOR_POD}" ]; then
echo "❌ DAG processor pod not found"
exit 1
fi
echo "🧪 Testing import of DAG file: {{ dag_file }}"
echo ""
kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
python /opt/airflow/dags/{{ dag_file }}
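# Example: just airflow::logs-test-import csv_to_postgres_dag.py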
# Delete user from Airflow database (to force role resync)
delete-user username='':
#!/bin/bash
set -euo pipefail
USERNAME="{{ username }}"
# Interactive input if not provided
while [ -z "${USERNAME}" ]; do
USERNAME=$(gum input --prompt="Username to delete from Airflow: " --width=100)
done
echo "Deleting user '${USERNAME}' from Airflow database..."
if gum confirm "This will delete the user from Airflow database. The user will be recreated with current Keycloak roles on next login. Continue?"; then
# Get scheduler pod (which has airflow CLI access)
SCHEDULER_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=scheduler -o jsonpath='{.items[0].metadata.name}')
# Delete user using airflow CLI
        kubectl exec -n ${AIRFLOW_NAMESPACE} ${SCHEDULER_POD} -- \
            airflow users delete --username "${USERNAME}" || echo "User '${USERNAME}' not found in the Airflow database"
echo "✅ User '${USERNAME}' deleted from Airflow. They will be recreated with current Keycloak roles on next login."
else
echo "User deletion cancelled"
fi
# Force role sync for all users (delete all OAuth users)
reset-oauth-users:
#!/bin/bash
set -euo pipefail
echo "This will delete ALL OAuth users from Airflow database."
echo "Users will be recreated with current Keycloak roles on next login."
if gum confirm "Are you sure you want to proceed?"; then
# Get scheduler pod (which has airflow CLI access)
SCHEDULER_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=scheduler -o jsonpath='{.items[0].metadata.name}')
        # Delete OAuth users, excluding the manually created 'admin' account.
        # `airflow db shell` is interactive, so issue the SQL via SQLAlchemy instead
        # (assumes the FAB ab_user table, as in the original statement).
        echo "Deleting OAuth users from the Airflow database..."
        kubectl exec -n ${AIRFLOW_NAMESPACE} ${SCHEDULER_POD} -- python -c \
            "from airflow.settings import Session; from sqlalchemy import text; s = Session(); s.execute(text(\"DELETE FROM ab_user WHERE email IS NOT NULL AND username != 'admin'\")); s.commit()" || true
echo "✅ All OAuth users deleted. They will be recreated with current Keycloak roles on next login."
else
echo "Reset cancelled"
fi
# Clean up database and secrets
cleanup:
#!/bin/bash
set -euo pipefail
echo "This will delete the Airflow database and all secrets."
if gum confirm "Are you sure you want to proceed?"; then
echo "Cleaning up Airflow resources..."
just postgres::delete-db airflow || true
just vault::delete airflow/database || true
just vault::delete airflow/oauth || true
just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true
echo "Cleanup completed"
else
echo "Cleanup cancelled"
fi