# buun-stack/airflow/justfile

set fallback := true
export AIRFLOW_NAMESPACE := env("AIRFLOW_NAMESPACE", "datastack")
export AIRFLOW_CHART_VERSION := env("AIRFLOW_CHART_VERSION", "1.18.0")
export EXTERNAL_SECRETS_NAMESPACE := env("EXTERNAL_SECRETS_NAMESPACE", "external-secrets")
export KEYCLOAK_REALM := env("KEYCLOAK_REALM", "buunstack")
export AIRFLOW_DAGS_PERSISTENCE_ENABLED := env("AIRFLOW_DAGS_PERSISTENCE_ENABLED", "")
export AIRFLOW_DAGS_STORAGE_TYPE := env("AIRFLOW_DAGS_STORAGE_TYPE", "")
export AIRFLOW_NFS_IP := env("AIRFLOW_NFS_IP", "")
export AIRFLOW_NFS_PATH := env("AIRFLOW_NFS_PATH", "")
export AIRFLOW_DAGS_STORAGE_SIZE := env("AIRFLOW_DAGS_STORAGE_SIZE", "10Gi")
export AIRFLOW_EXTRA_PACKAGES := env("AIRFLOW_EXTRA_PACKAGES", "dlt[duckdb,filesystem,postgres,s3]>=1.12.1")
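
# The settings above can be overridden per invocation, e.g. (values illustrative):
#   AIRFLOW_NAMESPACE=jupyter AIRFLOW_EXTRA_PACKAGES="dlt[postgres]>=1.12.1" just airflow::install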

[private]
default:
    @just --list --unsorted --list-submodules

# Add Helm repository
add-helm-repo:
    helm repo add apache-airflow https://airflow.apache.org
    helm repo update

# Remove Helm repository
remove-helm-repo:
    helm repo remove apache-airflow

# Create namespace (shared with JupyterHub when using the jupyter namespace)
create-namespace:
    @kubectl get namespace ${AIRFLOW_NAMESPACE} &>/dev/null || \
        kubectl create namespace ${AIRFLOW_NAMESPACE}

# Setup database for Airflow
setup-database:
    #!/bin/bash
    set -euo pipefail
    echo "Setting up Airflow database..."
    if just postgres::db-exists airflow &>/dev/null; then
        echo "Database 'airflow' already exists."
    else
        echo "Creating new database 'airflow'..."
        just postgres::create-db airflow
    fi
    # Generate password for user creation/update
    if just postgres::user-exists airflow &>/dev/null; then
        echo "User 'airflow' already exists."
        # Check if we can get the existing password from Vault/Secret
        if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
            # Try to get the existing password from Vault
            if DB_PASSWORD=$(just vault::get airflow/database password 2>/dev/null); then
                echo "Using existing password from Vault."
            else
                echo "Generating new password and updating Vault..."
                DB_PASSWORD=$(just utils::random-password)
                just postgres::change-password airflow "$DB_PASSWORD"
            fi
        else
            # For the direct Secret approach, generate a new password
            echo "Generating new password for existing user..."
            DB_PASSWORD=$(just utils::random-password)
            just postgres::change-password airflow "$DB_PASSWORD"
        fi
    else
        echo "Creating new user 'airflow'..."
        DB_PASSWORD=$(just utils::random-password)
        just postgres::create-user airflow "$DB_PASSWORD"
    fi
    echo "Ensuring database permissions..."
    just postgres::grant airflow airflow
    # Create the Airflow metadata connection secret (required by the Helm chart)
    CONNECTION_STRING="postgresql+psycopg2://airflow:${DB_PASSWORD}@postgres-cluster-rw.postgres:5432/airflow"
    kubectl delete secret airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    kubectl create secret generic airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} \
        --from-literal=connection="$CONNECTION_STRING"
    if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
        echo "External Secrets available. Storing credentials in Vault and creating ExternalSecret..."
        just vault::put airflow/database username=airflow password="$DB_PASSWORD"
        gomplate -f airflow-database-external-secret.gomplate.yaml -o airflow-database-external-secret.yaml
        kubectl apply -f airflow-database-external-secret.yaml
        echo "Waiting for database secret to be ready..."
        kubectl wait --for=condition=Ready externalsecret/airflow-database-external-secret \
            -n ${AIRFLOW_NAMESPACE} --timeout=60s
    fi
    echo "Database setup completed."

# Delete database secrets
delete-database-secret:
    @kubectl delete secret airflow-metadata-connection -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    @kubectl delete secret airflow-database-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    @kubectl delete externalsecret airflow-database-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found

# Create an example environment variables secret (customize as needed)
create-env-secrets-example:
    #!/bin/bash
    set -euo pipefail
    echo "Creating Airflow environment secrets example..."
    echo "This is an example - customize the environment variables as needed"
    if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
        echo "External Secrets available. Creating ExternalSecret using template..."
        echo "Edit airflow-env-external-secret.gomplate.yaml to customize environment variables"
        kubectl delete externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        gomplate -f airflow-env-external-secret.gomplate.yaml -o airflow-env-external-secret.yaml
        kubectl apply -f airflow-env-external-secret.yaml
        echo "Waiting for environment secret to be ready..."
        kubectl wait --for=condition=Ready externalsecret/airflow-env-external-secret \
            -n ${AIRFLOW_NAMESPACE} --timeout=60s
    else
        echo "External Secrets not available. Creating Kubernetes Secret directly..."
        # Example credentials - customize as needed
        MINIO_ACCESS_KEY="minio"
        MINIO_SECRET_KEY="minio123"
        kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        kubectl create secret generic airflow-env-secret -n ${AIRFLOW_NAMESPACE} \
            --from-literal=MINIO_ACCESS_KEY="$MINIO_ACCESS_KEY" \
            --from-literal=MINIO_SECRET_KEY="$MINIO_SECRET_KEY"
        # Add more environment variables here:
        # --from-literal=AWS_ACCESS_KEY_ID="your_value" \
        # --from-literal=AWS_SECRET_ACCESS_KEY="your_value"
        echo "Environment secret created directly in Kubernetes"
    fi
    echo "Example environment secrets created successfully"
    echo "Customize the environment variables in this recipe as needed for your project"

# Delete environment secrets
delete-env-secrets:
    @kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    @kubectl delete externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found

# Create OAuth client in Keycloak for Airflow authentication
create-oauth-client:
    #!/bin/bash
    set -euo pipefail
    if [ -z "${AIRFLOW_HOST:-}" ]; then
        echo "Error: AIRFLOW_HOST environment variable is required"
        exit 1
    fi
    echo "Creating Airflow OAuth client in Keycloak..."
    # Check if the client already exists
    if just keycloak::client-exists ${KEYCLOAK_REALM} airflow &>/dev/null; then
        echo "Client 'airflow' already exists, skipping creation..."
        echo "Existing client will preserve roles and mappers"
        # Get the existing client secret for Vault/Secret synchronization
        CLIENT_SECRET=$(just keycloak::get-client-secret ${KEYCLOAK_REALM} airflow | grep "Client 'airflow' secret:" | cut -d' ' -f4)
        echo "Retrieved existing client secret for synchronization"
    else
        echo "Creating new client..."
        CLIENT_SECRET=$(just utils::random-password)
        just keycloak::create-client \
            ${KEYCLOAK_REALM} \
            airflow \
            "https://${AIRFLOW_HOST}/auth/oauth-authorized/keycloak" \
            "$CLIENT_SECRET"
    fi
    if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
        echo "External Secrets available. Storing credentials in Vault and creating ExternalSecret..."
        just vault::put airflow/oauth \
            client_id=airflow \
            client_secret="$CLIENT_SECRET"
        # Delete the existing ExternalSecret to force recreation and refresh
        kubectl delete externalsecret airflow-oauth-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        gomplate -f airflow-oauth-external-secret.gomplate.yaml -o airflow-oauth-external-secret.yaml
        kubectl apply -f airflow-oauth-external-secret.yaml
        echo "Waiting for OAuth secret to be ready..."
        kubectl wait --for=condition=Ready externalsecret/airflow-oauth-external-secret \
            -n ${AIRFLOW_NAMESPACE} --timeout=60s
    else
        echo "External Secrets not available. Creating Kubernetes Secret directly..."
        kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        kubectl create secret generic airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} \
            --from-literal=client_id=airflow \
            --from-literal=client_secret="$CLIENT_SECRET"
        echo "OAuth secret created directly in Kubernetes"
    fi
    echo "OAuth client created successfully"

# Create Keycloak roles for Airflow
create-keycloak-roles:
    #!/bin/bash
    set -euo pipefail
    echo "Creating Keycloak roles for Airflow..."
    # Create client roles for Airflow
    ROLES=("airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
    for role in "${ROLES[@]}"; do
        echo "Creating role: $role"
        just keycloak::create-client-role ${KEYCLOAK_REALM} airflow "$role" || true
    done
    # Create client roles mapper to include roles in JWT tokens
    echo "Creating client roles mapper..."
    just keycloak::add-client-roles-mapper airflow "airflow_roles" "Airflow Roles Mapper"
    # Add client roles mapper to profile scope for userinfo endpoint
    echo "Adding client roles mapper to profile scope..."
    just keycloak::add-client-roles-to-profile-scope ${KEYCLOAK_REALM} airflow "airflow_roles"
    echo "Keycloak roles created successfully"
    echo "Role mappings:"
    echo "  - airflow_admin  -> Airflow Admin (full access)"
    echo "  - airflow_op     -> Airflow Operator (can trigger DAGs)"
    echo "  - airflow_user   -> Airflow User (read/write access)"
    echo "  - airflow_viewer -> Airflow Viewer (read-only access)"

# Assign Airflow role to user
assign-role username='' role='':
    #!/bin/bash
    set -euo pipefail
    USERNAME="{{ username }}"
    ROLE="{{ role }}"
    # Interactive input if not provided
    while [ -z "${USERNAME}" ]; do
        USERNAME=$(gum input --prompt="Username: " --width=100)
    done
    if [ -z "${ROLE}" ]; then
        ROLE=$(gum choose --header="Select Airflow role:" \
            "airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
    fi
    # Validate role
    VALID_ROLES=("airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
    if [[ ! " ${VALID_ROLES[*]} " =~ " ${ROLE} " ]]; then
        echo "Error: Invalid role '${ROLE}'. Valid roles: ${VALID_ROLES[*]}"
        exit 1
    fi
    echo "Assigning role '${ROLE}' to user '${USERNAME}'..."
    just keycloak::add-user-to-client-role ${KEYCLOAK_REALM} "${USERNAME}" airflow "${ROLE}"
    # Display role permissions
    case "${ROLE}" in
        "airflow_admin")
            echo "${USERNAME} now has Admin access (full system administration)"
            ;;
        "airflow_op")
            echo "${USERNAME} now has Operator access (can trigger and manage DAGs)"
            ;;
        "airflow_user")
            echo "${USERNAME} now has User access (read/write access to DAGs and tasks)"
            ;;
        "airflow_viewer")
            echo "${USERNAME} now has Viewer access (read-only access)"
            ;;
    esac
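
# Example (hypothetical username):
#   just airflow::assign-role alice airflow_viewer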

# List user's Airflow roles
list-user-roles username='':
    #!/bin/bash
    set -euo pipefail
    USERNAME="{{ username }}"
    # Interactive input if not provided
    while [ -z "${USERNAME}" ]; do
        USERNAME=$(gum input --prompt="Username: " --width=100)
    done
    echo "Checking Airflow roles for user '${USERNAME}'..."
    just keycloak::list-user-client-roles ${KEYCLOAK_REALM} "${USERNAME}" airflow

# Remove Airflow role from user
remove-role username='' role='':
    #!/bin/bash
    set -euo pipefail
    USERNAME="{{ username }}"
    ROLE="{{ role }}"
    # Interactive input if not provided
    while [ -z "${USERNAME}" ]; do
        USERNAME=$(gum input --prompt="Username: " --width=100)
    done
    if [ -z "${ROLE}" ]; then
        ROLE=$(gum choose --header="Select Airflow role to remove:" \
            "airflow_admin" "airflow_op" "airflow_user" "airflow_viewer")
    fi
    echo "Removing role '${ROLE}' from user '${USERNAME}'..."
    just keycloak::remove-user-from-client-role ${KEYCLOAK_REALM} "${USERNAME}" airflow "${ROLE}" || true
    echo "✅ Role '${ROLE}' removed from user '${USERNAME}'"

# Delete OAuth secret
delete-oauth-secret:
    @kubectl delete secret airflow-oauth-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    @kubectl delete externalsecret airflow-oauth-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found

# Install Airflow
install:
    #!/bin/bash
    set -euo pipefail
    export AIRFLOW_HOST=${AIRFLOW_HOST:-}
    while [ -z "${AIRFLOW_HOST}" ]; do
        AIRFLOW_HOST=$(
            gum input --prompt="Airflow host (FQDN): " --width=100 \
                --placeholder="e.g., airflow.example.com"
        )
    done
    if [ -z "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" ]; then
        if gum confirm "Enable DAG persistence with PVC?"; then
            AIRFLOW_DAGS_PERSISTENCE_ENABLED="true"
        else
            AIRFLOW_DAGS_PERSISTENCE_ENABLED="false"
        fi
    fi
    # Force the default storage type (NFS disabled due to permission issues)
    if [ "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" = "true" ]; then
        AIRFLOW_DAGS_STORAGE_TYPE="default"
    fi
    echo "Installing Airflow..."
    just create-namespace
    just setup-database
    just create-oauth-client
    just create-keycloak-roles
    just add-helm-repo
    if [ "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" = "true" ]; then
        just setup-dags-storage "default"
    fi
    KEYCLOAK_HOST=${KEYCLOAK_HOST} KEYCLOAK_REALM=${KEYCLOAK_REALM} \
        gomplate -f webserver_config.gomplate.py -o webserver_config.py
    kubectl delete configmap airflow-api-server-config -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    kubectl create configmap airflow-api-server-config -n ${AIRFLOW_NAMESPACE} \
        --from-file=webserver_config.py=webserver_config.py
    export AIRFLOW_ENV_SECRETS_EXIST="false"
    if kubectl get secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} &>/dev/null; then
        echo "Environment secrets found - will include in deployment"
        export AIRFLOW_ENV_SECRETS_EXIST="true"
    else
        echo "No environment secrets found - use 'just airflow::create-env-secrets-example' to create them if needed"
        export AIRFLOW_ENV_SECRETS_EXIST="false"
    fi
    AIRFLOW_WEBSERVER_SECRET_KEY=$(just utils::random-password) \
        gomplate -f airflow-values.gomplate.yaml -o airflow-values.yaml
    helm upgrade --install airflow apache-airflow/airflow \
        --version ${AIRFLOW_CHART_VERSION} -n ${AIRFLOW_NAMESPACE} --wait \
        -f airflow-values.yaml
    echo "Airflow installation completed"
    echo "Access Airflow at: https://${AIRFLOW_HOST}"
    if [ "${AIRFLOW_NAMESPACE}" = "jupyter" ] && [ "${AIRFLOW_DAGS_PERSISTENCE_ENABLED}" = "true" ]; then
        echo ""
        echo "📝 JupyterHub Integration Notes:"
        echo "   • If JupyterHub is already installed with DAG mounting enabled:"
        echo "     Restart user pods to access DAGs: kubectl delete pods -n jupyter -l app.kubernetes.io/component=singleuser-server"
        echo "   • If JupyterHub will be installed later:"
        echo "     Enable 'Airflow DAG storage mounting' during JupyterHub installation"
    fi
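
# Typical non-interactive install (hostnames illustrative; assumes KEYCLOAK_HOST
# is already exported by the surrounding stack):
#   AIRFLOW_HOST=airflow.example.com AIRFLOW_DAGS_PERSISTENCE_ENABLED=true \
#     just airflow::install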

# Uninstall Airflow
uninstall delete-db='true':
    #!/bin/bash
    set -euo pipefail
    echo "Uninstalling Airflow..."
    helm uninstall airflow -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    # Force delete stuck resources
    echo "Checking for stuck resources..."
    # Delete stuck pods (especially the Redis StatefulSet)
    STUCK_PODS=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -o name 2>/dev/null || true)
    if [ -n "$STUCK_PODS" ]; then
        echo "Force deleting stuck pods..."
        kubectl delete pods --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true
    fi
    # Delete PVCs
    PVCS=$(kubectl get pvc -n ${AIRFLOW_NAMESPACE} -o name 2>/dev/null || true)
    if [ -n "$PVCS" ]; then
        echo "Deleting PersistentVolumeClaims..."
        kubectl delete pvc --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true
    fi
    # Delete any remaining resources
    kubectl delete all --all -n ${AIRFLOW_NAMESPACE} --force --grace-period=0 2>/dev/null || true
    just delete-database-secret
    just delete-oauth-secret
    if [ "{{ delete-db }}" = "true" ]; then
        just postgres::delete-db airflow
    fi
    # Clean up the Keycloak client
    just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true
    echo "Airflow uninstalled"

# Create API user for JupyterHub integration
create-api-user username='' role='':
    #!/bin/bash
    set -euo pipefail
    USERNAME="{{ username }}"
    ROLE="{{ role }}"
    while [ -z "${USERNAME}" ]; do
        USERNAME=$(gum input --prompt="API Username: " --width=100 --placeholder="e.g., john.doe")
    done
    # Interactive role selection if not provided
    if [ -z "${ROLE}" ]; then
        echo ""
        echo "Airflow Roles:"
        echo "  Admin  - Full administrative access (all permissions)"
        echo "  Op     - Operator permissions (can trigger DAGs, manage runs)"
        echo "  User   - Standard user permissions (recommended for most users)"
        echo "  Viewer - Read-only access (can view DAGs and runs)"
        echo "  Public - Minimal public permissions"
        echo ""
        ROLE=$(gum choose --header="Select user role:" \
            "User" "Admin" "Op" "Viewer" "Public")
    fi
    echo "Creating Airflow API user: ${USERNAME} (role: ${ROLE})"
    API_PASSWORD=$(just utils::random-password)
    kubectl exec deployment/airflow-api-server -n ${AIRFLOW_NAMESPACE} -- \
        airflow users create \
        --username "${USERNAME}" \
        --firstname "API" \
        --lastname "User" \
        --role "${ROLE}" \
        --email "${USERNAME}@api.local" \
        --password "${API_PASSWORD}"
    if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
        echo "External Secrets available. Storing credentials in Vault..."
        just vault::put "jupyter/users/${USERNAME}/airflow-api" \
            username="${USERNAME}" \
            password="${API_PASSWORD}"
        echo "API credentials stored in Vault"
        echo "  Path: jupyter/users/${USERNAME}/airflow-api"
        echo "  Ready for JupyterHub notebook access with SecretStore"
    else
        echo "External Secrets not available. Creating Kubernetes Secret directly..."
        kubectl delete secret "airflow-user-${USERNAME}" -n jupyterhub --ignore-not-found
        kubectl create secret generic "airflow-user-${USERNAME}" -n jupyterhub \
            --from-literal=username="${USERNAME}" \
            --from-literal=password="${API_PASSWORD}"
        echo "API credentials stored in Kubernetes Secret"
        echo "  Secret: airflow-user-${USERNAME} (namespace: jupyterhub)"
    fi
    echo "✅ API user created successfully: ${USERNAME}"
    echo "   User has '${ROLE}' role permissions"

# List API users
list-api-users:
    #!/bin/bash
    set -euo pipefail
    echo "Airflow API Users:"
    kubectl exec deployment/airflow-api-server -n ${AIRFLOW_NAMESPACE} -- \
        airflow users list --output table

# Delete API user
delete-api-user username='':
    #!/bin/bash
    set -euo pipefail
    USERNAME="{{ username }}"
    while [ -z "${USERNAME}" ]; do
        USERNAME=$(gum input --prompt="Username to delete: " --width=100)
    done
    if gum confirm "Delete API user '${USERNAME}' and all associated credentials?"; then
        echo "Deleting Airflow API user: ${USERNAME}"
        kubectl exec deployment/airflow-api-server -n ${AIRFLOW_NAMESPACE} -- \
            airflow users delete --username "${USERNAME}" || true
        if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
            just vault::delete "jupyter/users/${USERNAME}/airflow-api" || true
            echo "✅ Vault credentials cleaned up"
        fi
        kubectl delete secret "airflow-user-${USERNAME}" -n jupyterhub --ignore-not-found
        echo "✅ Kubernetes secret cleaned up"
        echo "   API user deleted: ${USERNAME}"
    else
        echo "Deletion cancelled"
    fi

# Setup DAG storage (PVC)
setup-dags-storage storage-type='':
    #!/bin/bash
    set -euo pipefail
    echo "Setting up DAG storage (default)..."
    echo "Creating PersistentVolumeClaim with default StorageClass..."
    kubectl apply -n ${AIRFLOW_NAMESPACE} -f dags-pvc.yaml
    echo "✅ Default storage configured"
    echo "   PVC: airflow-dags-pvc"
    echo "   Uses cluster default StorageClass (k3s local-path, etc.)"
    echo ""
    echo "DAG storage is ready for use"
    echo "Mount path in pods: /opt/airflow/dags"
    echo ""
    if [ "${AIRFLOW_NAMESPACE}" = "jupyter" ]; then
        echo "📝 JupyterHub Integration:"
        echo "   Since Airflow is in the 'jupyter' namespace, JupyterHub can mount this PVC"
        echo "   Enable 'Airflow DAG storage mounting' when installing JupyterHub"
        echo "   DAGs will be available at: /opt/airflow-dags in notebooks"
    fi
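
# The dags-pvc.yaml applied above (shipped alongside this justfile) is expected
# to resemble this sketch; the access mode is an assumption, and the size
# mirrors the AIRFLOW_DAGS_STORAGE_SIZE default:
#   apiVersion: v1
#   kind: PersistentVolumeClaim
#   metadata:
#     name: airflow-dags-pvc
#   spec:
#     accessModes: ["ReadWriteMany"]   # assumed; RWO may suffice on single-node k3s
#     resources:
#       requests:
#         storage: 10Gi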

# Delete DAG storage
delete-dags-storage:
    #!/bin/bash
    set -euo pipefail
    echo "Deleting DAG storage resources..."
    kubectl delete pvc airflow-dags-pvc -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    echo "✅ DAG storage deleted"

# View DAG import error logs
logs-dag-errors dag_file='':
    #!/bin/bash
    set -euo pipefail
    # `|| true` keeps the not-found check below reachable under `set -e`
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor \
        -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
    if [ -z "${DAG_PROCESSOR_POD}" ]; then
        echo "❌ DAG processor pod not found"
        exit 1
    fi
    LOG_DATE=$(date +%Y-%m-%d)
    LOG_DIR="/opt/airflow/logs/dag_processor/${LOG_DATE}/dags-folder"
    if [ -n "{{ dag_file }}" ]; then
        # Show errors for a specific DAG file
        LOG_FILE="${LOG_DIR}/{{ dag_file }}.log"
        echo "📋 Showing errors for DAG file: {{ dag_file }}"
        echo "📂 Log file: ${LOG_FILE}"
        echo ""
        kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
            cat "${LOG_FILE}" 2>/dev/null | jq -r 'select(.level == "error") | .timestamp + " " + .event + ": " + .error_detail[0].exc_value' || \
            echo "❌ No error log found for {{ dag_file }} or file doesn't exist"
    else
        # List all DAG files with errors
        echo "📋 Available DAG error logs:"
        echo "📂 Log directory: ${LOG_DIR}"
        echo ""
        kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
            ls -la "${LOG_DIR}" 2>/dev/null || echo "❌ No DAG logs found for today"
        echo ""
        echo "💡 Usage: just airflow::logs-dag-errors <dag_file_name>"
        echo "   Example: just airflow::logs-dag-errors csv_to_postgres_dag.py"
    fi

# View DAG processor real-time logs
logs-dag-processor:
    #!/bin/bash
    set -euo pipefail
    # `|| true` keeps the not-found check below reachable under `set -e`
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor \
        -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
    if [ -z "${DAG_PROCESSOR_POD}" ]; then
        echo "❌ DAG processor pod not found"
        exit 1
    fi
    echo "📋 Following DAG processor logs..."
    echo "🔍 Pod: ${DAG_PROCESSOR_POD}"
    echo ""
    kubectl logs -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -f

# List all DAG import errors
logs-import-errors:
    #!/bin/bash
    set -euo pipefail
    # `|| true` keeps the not-found check below reachable under `set -e`
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor \
        -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
    if [ -z "${DAG_PROCESSOR_POD}" ]; then
        echo "❌ DAG processor pod not found"
        exit 1
    fi
    echo "📋 Checking DAG import errors..."
    echo ""
    kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
        airflow dags list-import-errors || echo "❌ Failed to list import errors"

# View DAG files in directory
logs-dag-files:
    #!/bin/bash
    set -euo pipefail
    # `|| true` keeps the not-found check below reachable under `set -e`
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor \
        -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
    if [ -z "${DAG_PROCESSOR_POD}" ]; then
        echo "❌ DAG processor pod not found"
        exit 1
    fi
    echo "📋 DAG files in /opt/airflow/dags/:"
    echo ""
    kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
        ls -la /opt/airflow/dags/

# Test DAG file import manually
logs-test-import dag_file:
    #!/bin/bash
    set -euo pipefail
    # `|| true` keeps the not-found check below reachable under `set -e`
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor \
        -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
    if [ -z "${DAG_PROCESSOR_POD}" ]; then
        echo "❌ DAG processor pod not found"
        exit 1
    fi
    echo "🧪 Testing import of DAG file: {{ dag_file }}"
    echo ""
    kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
        python /opt/airflow/dags/{{ dag_file }}
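
# Example (DAG file name taken from the usage hint above):
#   just airflow::logs-test-import csv_to_postgres_dag.py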

# Clean up database and secrets
cleanup:
    #!/bin/bash
    set -euo pipefail
    echo "This will delete the Airflow database and all secrets."
    if gum confirm "Are you sure you want to proceed?"; then
        echo "Cleaning up Airflow resources..."
        just postgres::delete-db airflow || true
        just vault::delete airflow/database || true
        just vault::delete airflow/oauth || true
        just keycloak::delete-client ${KEYCLOAK_REALM} airflow || true
        echo "Cleanup completed"
    else
        echo "Cleanup cancelled"
    fi