Airflow Documentation
Overview
This document covers Airflow installation, deployment, and debugging in the buun-stack environment.
Installation
Prerequisites
- Kubernetes cluster with buun-stack components
- PostgreSQL database cluster
- MinIO object storage
- External Secrets Operator (optional, for Vault integration)
- JupyterHub (optional, for DAG deployment via web interface)
Installation Steps
1. Setup Environment Secrets (if needed):
- See the Environment Variables Setup section below for configuration options
- Create the ExternalSecret or Secret before installation if you want environment variables available immediately
2. Install Airflow:
# Interactive installation with configuration prompts
just airflow::install
3. Access the Airflow Web UI:
- Navigate to your Airflow instance (e.g., https://airflow.buun.dev)
- Log in with your Keycloak credentials
4. Assign User Roles (if needed):
# Add user role for DAG execution permissions
just airflow::assign-role <username> airflow_user
# Available roles:
# - airflow_admin: Full administrative access
# - airflow_op: Operator access (can trigger DAGs)
# - airflow_user: User access (read/write access to DAGs)
# - airflow_viewer: Viewer access (read-only)
Uninstalling
# Remove Airflow (keeps database by default)
just airflow::uninstall
# Remove Airflow and delete database
just airflow::uninstall true
DAG Deployment
1. Access JupyterHub
- Navigate to your JupyterHub instance (e.g., https://jupyter.buun.dev)
- Log in with your credentials
2. Navigate to Airflow DAGs Directory
In JupyterHub, the Airflow DAGs directory is mounted at:
/home/jovyan/airflow-dags/
3. Upload the DAG File
- Open JupyterHub file browser
- Navigate to /home/jovyan/airflow-dags/
- Upload or copy csv_to_postgres_dag.py to this directory
4. Verify Deployment
- Access the Airflow Web UI (e.g., https://airflow.buun.dev)
- Check that the DAG csv_to_postgres appears in the DAGs list
- If the DAG doesn't appear immediately, wait 1-2 minutes for Airflow to detect the new file
Environment Variables Required
The DAG expects the following environment variables to be set:
- POSTGRES_URL: PostgreSQL connection string (format: postgresql://user:password@host:port/database)
- AWS_ACCESS_KEY_ID: MinIO/S3 access key
- AWS_SECRET_ACCESS_KEY: MinIO/S3 secret key
- AWS_ENDPOINT_URL: MinIO endpoint URL
- Additional dlt-specific environment variables for advanced configuration
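These variables are read at task runtime, so a missing value usually shows up as a failed task rather than an import error. As a safeguard, a task can fail fast with a clear message before doing any work; the check below is an illustrative sketch, not part of the shipped DAG:

import os

# Variables the DAG relies on, as listed above
REQUIRED_VARS = ["POSTGRES_URL", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_ENDPOINT_URL"]

# Collect anything that is unset or empty and raise with a clear message
missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")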
Environment Variables Setup
Environment variables are provided to Airflow through Kubernetes Secrets. You have several options:
Option 1: Customize the Example Template
1. Create the example environment secrets template:
just airflow::create-env-secrets-example
2. Important: This creates a template with sample values. You must customize it:
- If using External Secrets: Edit airflow-env-external-secret.gomplate.yaml to reference your actual Vault paths
- If using Direct Secrets: Update the created airflow-env-secret with your actual credentials
Option 2: Create ExternalSecret Manually
Create an ExternalSecret that references your Vault credentials:
apiVersion: external-secrets.io/v1
kind: ExternalSecret
metadata:
  name: airflow-env-external-secret
  namespace: datastack
spec:
  refreshInterval: 1h
  secretStoreRef:
    name: vault-secret-store
    kind: ClusterSecretStore
  target:
    name: airflow-env-secret
  data:
    - secretKey: AWS_ACCESS_KEY_ID
      remoteRef:
        key: minio/credentials
        property: access_key
    - secretKey: AWS_SECRET_ACCESS_KEY
      remoteRef:
        key: minio/credentials
        property: secret_key
    # Add more variables as needed
Option 3: Create Kubernetes Secret Directly
kubectl create secret generic airflow-env-secret -n datastack \
--from-literal=AWS_ACCESS_KEY_ID="your-access-key" \
--from-literal=AWS_SECRET_ACCESS_KEY="your-secret-key" \
--from-literal=AWS_ENDPOINT_URL="http://minio.minio.svc.cluster.local:9000" \
--from-literal=POSTGRES_URL="postgresql://user:pass@postgres-cluster-rw.postgres:5432"
After creating the environment secrets, redeploy Airflow to pick up the new configuration.
Manual Execution
The DAG is configured for manual execution only (schedule_interval=None). To run:
- Go to Airflow Web UI
- Find the csv_to_postgres DAG
- Click "Trigger DAG" to start execution
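For reference, this is roughly how a manual-only DAG is declared. The snippet is an illustration rather than the project's actual DAG file; note that Airflow 3 names the parameter schedule, while older releases used schedule_interval:

import pendulum
from airflow.decorators import dag, task

# Illustrative manual-only DAG: schedule=None means it never runs automatically
@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=False)
def manual_only_example():
    @task
    def load():
        print("triggered manually from the UI or CLI")

    load()

manual_only_example()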
Example DAGs
CSV to PostgreSQL DAG
The csv_to_postgres_dag.py demonstrates a complete ETL pipeline that loads data from MinIO object storage into PostgreSQL using dlt (data load tool).
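The rough shape of such a pipeline with dlt looks like the following. This is a sketch under the assumptions that dlt[filesystem,postgres,s3] is installed, the environment variables listed earlier are set, and the bucket layout matches the listing below; function and resource names are illustrative, not the DAG's exact code:

import os
import dlt
from dlt.sources.filesystem import filesystem, read_csv

def load_movies_to_postgres():
    # List movies.csv in the movie-lens bucket and parse it as CSV rows
    files = filesystem(bucket_url="s3://movie-lens", file_glob="movies.csv")
    movies = (files | read_csv()).with_name("movies")
    movies.apply_hints(primary_key="movieId", write_disposition="replace")

    # Load into PostgreSQL using the connection string from the environment
    pipeline = dlt.pipeline(
        pipeline_name="csv_to_postgres_sketch",
        destination=dlt.destinations.postgres(os.environ["POSTGRES_URL"]),
        dataset_name="movielens",
    )
    print(pipeline.run(movies))

In the real DAG each table gets its own task, and the existence check described under Smart Processing decides whether the load runs at all.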
Dataset Information
MovieLens 20M Dataset
This DAG processes the MovieLens 20M dataset from GroupLens Research. The dataset contains:
- 27,278 movies with metadata
- 20 million ratings from 138,493 users
- 465,564 tags applied by users
- Additional genome data for content-based filtering
MinIO Storage Structure
The dataset files are stored in MinIO under the movie-lens bucket:
mc alias set buun https://minio.your-domain.com access-key secret-key
mc ls buun/movie-lens
[2025-09-14 12:13:09 JST] 309MiB STANDARD genome-scores.csv
[2025-09-14 12:12:37 JST] 18KiB STANDARD genome-tags.csv
[2025-09-14 12:12:38 JST] 557KiB STANDARD links.csv
[2025-09-14 12:12:38 JST] 1.3MiB STANDARD movies.csv
[2025-09-14 12:13:15 JST] 509MiB STANDARD ratings.csv
[2025-09-14 12:12:42 JST] 16MiB STANDARD tags.csv
The DAG currently processes:
- movies.csv (1.3MiB) - Movie metadata
- tags.csv (16MiB) - User-generated tags
- ratings.csv (509MiB) - User ratings (available but currently disabled in DAG)
DAG Features
Tables Processed
- movies: MovieLens movies data with primary key movieId
- ratings: User ratings with composite primary key [userId, movieId]
- tags: User tags with composite primary key [userId, movieId, timestamp]
- summary: Generates a metadata summary of all processed tables
Smart Processing
- Table Existence Check: Uses DuckDB PostgreSQL scanner to check if tables already exist
- Skip Logic: If a table already contains data, the task will skip processing to avoid reprocessing large files
- Write Disposition: Uses replace mode for initial loads
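A minimal sketch of what such an existence check can look like with DuckDB's postgres extension, assuming duckdb is installed in the worker and POSTGRES_URL is set; the helper name and schema are illustrative, not the DAG's actual code:

import os
import duckdb

def table_has_rows(table: str, schema: str = "public") -> bool:
    con = duckdb.connect()
    con.execute("INSTALL postgres")
    con.execute("LOAD postgres")
    # Attach the target database read-only using the same POSTGRES_URL the DAG uses
    con.execute(f"ATTACH '{os.environ['POSTGRES_URL']}' AS pg (TYPE postgres, READ_ONLY)")
    try:
        count = con.execute(f"SELECT count(*) FROM pg.{schema}.{table}").fetchone()[0]
        return count > 0
    except Exception:
        # Table does not exist yet (or is not reachable), so the load should run
        return False

# Example: skip the heavy CSV load when the table is already populated
if table_has_rows("ratings"):
    print("ratings already loaded, skipping")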
Dependencies
- dlt[duckdb,filesystem,postgres,s3]>=1.12.1
- duckdb (for table existence checking)
- Standard Airflow libraries
Debugging and Troubleshooting
Debug Commands
The Airflow justfile provides several debugging recipes:
DAG Import and Processing Logs
# Check DAG import errors from processor logs
just airflow::logs-dag-errors
# Check DAG import errors for a specific file
just airflow::logs-dag-errors csv_to_postgres_dag.py
# Test DAG file import manually
just airflow::logs-test-import csv_to_postgres_dag.py
# Monitor DAG processing in real-time
just airflow::logs-dag-processor
Worker and Task Logs
# View worker logs (where tasks execute)
just airflow::logs-worker
# View scheduler logs
just airflow::logs-scheduler
# View API server logs (Airflow 3.0)
just airflow::logs-api-server
# View all Airflow component logs
just airflow::logs-all
Specific Component Debugging
# Check specific pod logs
kubectl logs -n datastack <pod-name> -c <container-name>
# Examples:
kubectl logs -n datastack airflow-worker-0 -c worker --tail=100
kubectl logs -n datastack airflow-scheduler-xxx -c scheduler --tail=100
kubectl logs -n datastack airflow-dag-processor-xxx -c dag-processor --tail=100
Common Issues
DAG Not Appearing
Symptoms: DAG file uploaded but not visible in Airflow UI
Debugging Steps:
1. Check DAG processor logs:
just airflow::logs-dag-errors
2. Test DAG import manually:
just airflow::logs-test-import your-dag-file.py
3. Verify file location and permissions:
kubectl exec -n datastack airflow-dag-processor-xxx -c dag-processor -- ls -la /opt/airflow/dags/
Common Causes:
- Python syntax errors in DAG file
- Missing Python package imports
- Duplicate DAG IDs
- File permissions issues
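Most of these causes show up as import errors, which you can also surface directly from Python via Airflow's DagBag, similar in spirit to the test-import recipe above; the path assumes the default /opt/airflow/dags layout from the verification step:

kubectl exec -n datastack airflow-dag-processor-xxx -c dag-processor -- python3 -c "
from airflow.models.dagbag import DagBag
# Parse the DAGs folder without example DAGs and print any import errors
bag = DagBag('/opt/airflow/dags', include_examples=False)
print('DAGs found:', list(bag.dags))
print('Import errors:', bag.import_errors or 'none')
"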
Task Execution Failures
Symptoms: DAG appears but tasks fail during execution
Debugging Steps:
1. Check worker logs for the specific task:
just airflow::logs-worker | grep -A 10 -B 10 "task_id"
2. Check environment variables in the worker:
kubectl exec -n datastack airflow-worker-0 -c worker -- env | grep -E "(AWS|POSTGRES)"
3. Test connectivity from the worker:
# Test MinIO connectivity
kubectl exec -n datastack airflow-worker-0 -c worker -- ping minio.minio.svc.cluster.local
# Test PostgreSQL connectivity
kubectl exec -n datastack airflow-worker-0 -c worker -- nc -zv postgres-cluster-rw.postgres 5432
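If ping or nc is not available in the worker image, a plain Python socket check against the same hosts and ports works as well (hosts and ports taken from the examples above):

kubectl exec -n datastack airflow-worker-0 -c worker -- python3 -c "
import socket
# Open a TCP connection to each service and report success or the error
for host, port in [('minio.minio.svc.cluster.local', 9000), ('postgres-cluster-rw.postgres', 5432)]:
    try:
        socket.create_connection((host, port), timeout=5).close()
        print('OK', host, port)
    except OSError as exc:
        print('FAILED', host, port, exc)
"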
Environment Variables Issues
Symptoms: Tasks fail with authentication or connection errors
Debugging Steps:
1. Verify the secret exists and contains data:
kubectl describe secret airflow-env-secret -n datastack
2. Check whether the ExternalSecret is syncing (if using External Secrets):
kubectl get externalsecret airflow-env-external-secret -n datastack
kubectl describe externalsecret airflow-env-external-secret -n datastack
3. Verify environment variables are loaded in the pods:
kubectl exec -n datastack airflow-worker-0 -c worker -- printenv | grep -E "(AWS|POSTGRES|DLT)"
Authentication and Permissions
Symptoms: 403 Forbidden errors when triggering DAGs
Debugging Steps:
1. Check user roles in Airflow:
kubectl exec -n datastack airflow-scheduler-xxx -c scheduler -- airflow users list
2. Assign the proper role if needed:
just airflow::assign-role <username> airflow_user
3. Check Keycloak client roles:
- Ensure the user has the appropriate Keycloak client role
- Log in to Airflow again to sync roles
Package Installation Issues
Symptoms: Import errors for packages like dlt, duckdb
Debugging Steps:
1. Check that packages are installed correctly:
kubectl exec -n datastack airflow-worker-0 -c worker -- pip list | grep -E "(dlt|duckdb)"
2. Verify the init container logs:
kubectl logs -n datastack airflow-worker-0 -c install-packages
3. Check the PYTHONPATH configuration (printenv avoids the variable being expanded on your local shell):
kubectl exec -n datastack airflow-worker-0 -c worker -- printenv PYTHONPATH
Connection Testing
MinIO Connectivity
# Test MinIO access from worker
kubectl exec -n datastack airflow-worker-0 -c worker -- python3 -c "
import boto3
import os
client = boto3.client('s3',
    endpoint_url=os.getenv('AWS_ENDPOINT_URL'),
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY')
)
print('Buckets:', [b['Name'] for b in client.list_buckets()['Buckets']])
"
Log Analysis Tips
- Filter logs by timestamp:
kubectl logs -n datastack airflow-worker-0 -c worker --since=10m
- Search for specific errors:
just airflow::logs-worker | grep -i "error\|exception\|failed"
- Monitor logs in real time:
kubectl logs -n datastack airflow-worker-0 -c worker -f
- Check resource usage:
kubectl top pods -n datastack | grep airflow