feat(airflow): additional python packages and env secrets

Masaki Yatsu
2025-09-16 12:24:17 +09:00
parent 55e00c01a0
commit cbe8c15e22
3 changed files with 264 additions and 17 deletions


@@ -0,0 +1,46 @@
apiVersion: external-secrets.io/v1
kind: ExternalSecret
metadata:
  name: airflow-env-external-secret
  namespace: {{ .Env.AIRFLOW_NAMESPACE }}
spec:
  refreshInterval: 1h
  secretStoreRef:
    name: vault-secret-store
    kind: ClusterSecretStore
  target:
    name: airflow-env-secret
    creationPolicy: Owner
    template:
      data:
        # Fixed values - customize as needed
        AWS_ENDPOINT_URL: "http://minio.minio.svc.cluster.local:9000"
        DESTINATION__POSTGRES__DATA_WRITER__INSERT_VALUES_MAX_ROWS: "10000"
        # Template values from Vault - reference via {{ .postgres_user }}
        POSTGRES_USER: "{{ .postgres_user }}"
        POSTGRES_PASSWORD: "{{ .postgres_password }}"
        # Add more fixed values here:
        # SOME_CONFIG_VALUE: "fixed-value"
        #
        # Add more Vault references here:
        # AWS_ACCESS_KEY_ID: "{{ .aws_access_key_id }}"
        # AWS_SECRET_ACCESS_KEY: "{{ .aws_secret_access_key }}"
  data:
    # PostgreSQL configuration - fetch from Vault
    - secretKey: postgres_user
      remoteRef:
        key: postgres/admin
        property: username
    - secretKey: postgres_password
      remoteRef:
        key: postgres/admin
        property: password
    # Add more Vault references here:
    # - secretKey: aws_access_key_id
    #   remoteRef:
    #     key: minio
    #     property: access_key_id
    # - secretKey: aws_secret_access_key
    #   remoteRef:
    #     key: minio
    #     property: secret_access_key
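Once applied, External Secrets should materialize the plain airflow-env-secret Secret from this template. A minimal sanity check, assuming kubectl access to the Airflow namespace (values in the Secret remain base64-encoded):

# Confirm the ExternalSecret synced and inspect the rendered Secret
kubectl get externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE}
kubectl get secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} -o yaml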


@@ -2,8 +2,21 @@ useStandardNaming: true
webserverSecretKey: {{ .Env.AIRFLOW_WEBSERVER_SECRET_KEY }}
{{- if eq (.Env.AIRFLOW_ENV_SECRETS_EXIST | default "false") "true" }}
# Extra envFrom for all Airflow containers
extraEnvFrom: |
  - secretRef:
      name: airflow-env-secret
{{- end }}
executor: CeleryExecutor
# Custom Airflow configuration
config:
  scheduler:
    # Scan for new DAG files every 60 seconds instead of 300
    dag_dir_list_interval: 60
apiServer:
  replicas: 1
  apiServerConfigConfigMapName: airflow-api-server-config
@@ -38,6 +51,73 @@ migrateDatabaseJob:
images:
  migrationsWaitTimeout: 180
# Install additional packages using init containers
workers:
  extraInitContainers:
    - name: install-packages
      image: apache/airflow:3.0.2
      command:
        - /bin/bash
        - -c
        - |
          pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}"
      volumeMounts:
        - name: extra-packages
          mountPath: /opt/airflow/site-packages
  extraVolumes:
    - name: extra-packages
      emptyDir: {}
  extraVolumeMounts:
    - name: extra-packages
      mountPath: /opt/airflow/site-packages
  env:
    - name: PYTHONPATH
      value: "/opt/airflow/site-packages:$PYTHONPATH"
scheduler:
  extraInitContainers:
    - name: install-packages
      image: apache/airflow:3.0.2
      command:
        - /bin/bash
        - -c
        - |
          pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}"
      volumeMounts:
        - name: extra-packages
          mountPath: /opt/airflow/site-packages
  extraVolumes:
    - name: extra-packages
      emptyDir: {}
  extraVolumeMounts:
    - name: extra-packages
      mountPath: /opt/airflow/site-packages
  env:
    - name: PYTHONPATH
      value: "/opt/airflow/site-packages:$PYTHONPATH"
dagProcessor:
  extraInitContainers:
    - name: install-packages
      image: apache/airflow:3.0.2
      command:
        - /bin/bash
        - -c
        - |
          pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}"
      volumeMounts:
        - name: extra-packages
          mountPath: /opt/airflow/site-packages
  extraVolumes:
    - name: extra-packages
      emptyDir: {}
  extraVolumeMounts:
    - name: extra-packages
      mountPath: /opt/airflow/site-packages
  env:
    - name: PYTHONPATH
      value: "/opt/airflow/site-packages:$PYTHONPATH"
flower:
  enabled: false
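The same init container, shared emptyDir, and PYTHONPATH wiring is repeated for workers, the scheduler, and the DAG processor, so every component that parses or runs DAGs sees the extra packages. A quick way to verify the install took effect, assuming the default dlt package above and the chart's usual component=worker label and worker container name:

# Check that the extra packages are importable inside a running worker pod
WORKER_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=worker -o jsonpath='{.items[0].metadata.name}')
kubectl exec -n ${AIRFLOW_NAMESPACE} ${WORKER_POD} -c worker -- python -c "import dlt; print(dlt.__version__)"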


@@ -9,6 +9,7 @@ export AIRFLOW_DAGS_STORAGE_TYPE := env("AIRFLOW_DAGS_STORAGE_TYPE", "")
export AIRFLOW_NFS_IP := env("AIRFLOW_NFS_IP", "")
export AIRFLOW_NFS_PATH := env("AIRFLOW_NFS_PATH", "")
export AIRFLOW_DAGS_STORAGE_SIZE := env("AIRFLOW_DAGS_STORAGE_SIZE", "10Gi")
export AIRFLOW_EXTRA_PACKAGES := env("AIRFLOW_EXTRA_PACKAGES", "dlt[duckdb,filesystem,postgres,s3]>=1.12.1")
[private]
default:
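Because env() falls back to the calling environment, the extra package set can be changed per invocation without editing the justfile; for example (a sketch, the package spec below is illustrative):

# Override the default dlt extras for a single deployment run
AIRFLOW_EXTRA_PACKAGES="dlt[postgres,s3]>=1.12.1" just airflow::install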
@@ -28,22 +29,6 @@ create-namespace:
    @kubectl get namespace ${AIRFLOW_NAMESPACE} &>/dev/null || \
        kubectl create namespace ${AIRFLOW_NAMESPACE}
# Delete Airflow namespace
delete-namespace:
    #!/bin/bash
    set -euo pipefail
    # First try normal deletion
    kubectl delete namespace ${AIRFLOW_NAMESPACE} --ignore-not-found --timeout=30s || true
    # If namespace still exists in Terminating state, force remove
    if kubectl get namespace ${AIRFLOW_NAMESPACE} 2>/dev/null | grep -q Terminating; then
        echo "Namespace stuck in Terminating, forcing deletion..."
        # Remove finalizers
        kubectl patch namespace ${AIRFLOW_NAMESPACE} -p '{"metadata":{"finalizers":[]}}' --type=merge || true
        # Force delete the namespace
        kubectl delete namespace ${AIRFLOW_NAMESPACE} --force --grace-period=0 || true
    fi
# Setup database for Airflow
setup-database:
    #!/bin/bash
@@ -108,6 +93,44 @@ delete-database-secret:
    @kubectl delete secret airflow-database-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    @kubectl delete externalsecret airflow-database-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
# Create environment variables secret example (customize as needed)
create-env-secrets-example:
    #!/bin/bash
    set -euo pipefail
    echo "Creating Airflow environment secrets example..."
    echo "This is an example - customize the environment variables as needed"
    if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
        echo "External Secrets available. Creating ExternalSecret using template..."
        echo "Edit airflow-env-external-secret.gomplate.yaml to customize environment variables"
        kubectl delete externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        gomplate -f airflow-env-external-secret.gomplate.yaml -o airflow-env-external-secret.yaml
        kubectl apply -f airflow-env-external-secret.yaml
        echo "Waiting for environment secret to be ready..."
        kubectl wait --for=condition=Ready externalsecret/airflow-env-external-secret \
            -n ${AIRFLOW_NAMESPACE} --timeout=60s
    else
        echo "External Secrets not available. Creating Kubernetes Secret directly..."
        # Example credentials - customize as needed
        MINIO_ACCESS_KEY="minio"
        MINIO_SECRET_KEY="minio123"
        kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
        kubectl create secret generic airflow-env-secret -n ${AIRFLOW_NAMESPACE} \
            --from-literal=POSTGRES_USER="$POSTGRES_USER" \
            --from-literal=POSTGRES_PASSWORD="$POSTGRES_PASSWORD"
            # Add more environment variables here:
            # --from-literal=AWS_ACCESS_KEY_ID="your_value" \
            # --from-literal=AWS_SECRET_ACCESS_KEY="your_value"
        echo "Environment secret created directly in Kubernetes"
    fi
    echo "Example environment secrets created successfully"
    echo "Customize the environment variables in this recipe as needed for your project"
# Delete environment secrets
delete-env-secrets:
    @kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    @kubectl delete externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
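Typical flow when wiring project-specific environment variables into the deployment (usage sketch; install only picks the secret up because of the existence check it performs further below):

# Create (or recreate) the example env secret, then redeploy so the chart references it
just airflow::create-env-secrets-example
just airflow::install
# Remove it again when no longer needed
just airflow::delete-env-secrets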
# Create OAuth client in Keycloak for Airflow authentication
create-oauth-client:
    #!/bin/bash
@@ -280,6 +303,15 @@ install:
    kubectl create configmap airflow-api-server-config -n ${AIRFLOW_NAMESPACE} \
        --from-file=webserver_config.py=webserver_config.py
    export AIRFLOW_ENV_SECRETS_EXIST="false"
    if kubectl get secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} &>/dev/null; then
        echo "Environment secrets found - will include in deployment"
        export AIRFLOW_ENV_SECRETS_EXIST="true"
    else
        echo "No environment secrets found - use 'just airflow::create-env-secrets-example' to create them if needed"
        export AIRFLOW_ENV_SECRETS_EXIST="false"
    fi
    AIRFLOW_WEBSERVER_SECRET_KEY=$(just utils::random-password) \
        gomplate -f airflow-values.gomplate.yaml -o airflow-values.yaml
    helm upgrade --install airflow apache-airflow/airflow \
@@ -325,7 +357,6 @@ uninstall delete-db='true':
    just delete-database-secret
    just delete-oauth-secret
    just delete-namespace
    if [ "{{ delete-db }}" = "true" ]; then
        just postgres::delete-db airflow
    fi
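Since delete-db defaults to 'true', pass 'false' to keep the Airflow database around while everything else is removed (usage sketch):

just airflow::uninstall false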
@@ -450,6 +481,96 @@ delete-dags-storage:
    kubectl delete pvc airflow-dags-pvc -n ${AIRFLOW_NAMESPACE} --ignore-not-found
    echo "✅ DAG storage deleted"
# View DAG import error logs
logs-dag-errors dag_file='':
    #!/bin/bash
    set -euo pipefail
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}')
    if [ -z "${DAG_PROCESSOR_POD}" ]; then
        echo "❌ DAG processor pod not found"
        exit 1
    fi
    LOG_DATE=$(date +%Y-%m-%d)
    LOG_DIR="/opt/airflow/logs/dag_processor/${LOG_DATE}/dags-folder"
    if [ -n "{{dag_file}}" ]; then
        # Show specific DAG file errors
        LOG_FILE="${LOG_DIR}/{{dag_file}}.log"
        echo "📋 Showing errors for DAG file: {{dag_file}}"
        echo "📂 Log file: ${LOG_FILE}"
        echo ""
        kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
            cat "${LOG_FILE}" 2>/dev/null | jq -r 'select(.level == "error") | .timestamp + " " + .event + ": " + .error_detail[0].exc_value' || \
            echo "❌ No error log found for {{dag_file}} or file doesn't exist"
    else
        # List all DAG files with errors
        echo "📋 Available DAG error logs:"
        echo "📂 Log directory: ${LOG_DIR}"
        echo ""
        kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
            ls -la "${LOG_DIR}" 2>/dev/null || echo "❌ No DAG logs found for today"
        echo ""
        echo "💡 Usage: just airflow::logs-dag-errors <dag_file_name>"
        echo "   Example: just airflow::logs-dag-errors csv_to_postgres_dag.py"
    fi
# View DAG processor real-time logs
logs-dag-processor:
    #!/bin/bash
    set -euo pipefail
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}')
    if [ -z "${DAG_PROCESSOR_POD}" ]; then
        echo "❌ DAG processor pod not found"
        exit 1
    fi
    echo "📋 Following DAG processor logs..."
    echo "🔍 Pod: ${DAG_PROCESSOR_POD}"
    echo ""
    kubectl logs -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -f
# List all DAG import errors
logs-import-errors:
    #!/bin/bash
    set -euo pipefail
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}')
    if [ -z "${DAG_PROCESSOR_POD}" ]; then
        echo "❌ DAG processor pod not found"
        exit 1
    fi
    echo "📋 Checking DAG import errors..."
    echo ""
    kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
        airflow dags list-import-errors || echo "❌ Failed to list import errors"
# View DAG files in directory
logs-dag-files:
    #!/bin/bash
    set -euo pipefail
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}')
    if [ -z "${DAG_PROCESSOR_POD}" ]; then
        echo "❌ DAG processor pod not found"
        exit 1
    fi
    echo "📋 DAG files in /opt/airflow/dags/:"
    echo ""
    kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
        ls -la /opt/airflow/dags/
# Test DAG file import manually
logs-test-import dag_file:
    #!/bin/bash
    set -euo pipefail
    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}')
    if [ -z "${DAG_PROCESSOR_POD}" ]; then
        echo "❌ DAG processor pod not found"
        exit 1
    fi
    echo "🧪 Testing import of DAG file: {{dag_file}}"
    echo ""
    kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
        python /opt/airflow/dags/{{dag_file}}
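For a one-off import check of a single DAG file, using the same example file name referenced above (usage sketch):

just airflow::logs-test-import csv_to_postgres_dag.py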
# Clean up database and secrets
cleanup:
    #!/bin/bash