diff --git a/airflow/airflow-env-external-secret.gomplate.yaml b/airflow/airflow-env-external-secret.gomplate.yaml
new file mode 100644
index 0000000..89cd864
--- /dev/null
+++ b/airflow/airflow-env-external-secret.gomplate.yaml
@@ -0,0 +1,46 @@
+apiVersion: external-secrets.io/v1
+kind: ExternalSecret
+metadata:
+  name: airflow-env-external-secret
+  namespace: {{ .Env.AIRFLOW_NAMESPACE }}
+spec:
+  refreshInterval: 1h
+  secretStoreRef:
+    name: vault-secret-store
+    kind: ClusterSecretStore
+  target:
+    name: airflow-env-secret
+    creationPolicy: Owner
+    template:
+      data:
+        # Fixed values - customize as needed
+        AWS_ENDPOINT_URL: "http://minio.minio.svc.cluster.local:9000"
+        DESTINATION__POSTGRES__DATA_WRITER__INSERT_VALUES_MAX_ROWS: "10000"
+        # Template values from Vault - reference via {{ .postgres_user }}
+        POSTGRES_USER: "{{ .postgres_user }}"
+        POSTGRES_PASSWORD: "{{ .postgres_password }}"
+        # Add more fixed values here:
+        # SOME_CONFIG_VALUE: "fixed-value"
+        #
+        # Add more Vault references here:
+        # AWS_ACCESS_KEY_ID: "{{ .aws_access_key_id }}"
+        # AWS_SECRET_ACCESS_KEY: "{{ .aws_secret_access_key }}"
+  data:
+    # PostgreSQL configuration - fetch from Vault
+    - secretKey: postgres_user
+      remoteRef:
+        key: postgres/admin
+        property: username
+    - secretKey: postgres_password
+      remoteRef:
+        key: postgres/admin
+        property: password
+    # Add more Vault references here:
+    # - secretKey: aws_access_key_id
+    #   remoteRef:
+    #     key: minio
+    #     property: access_key_id
+    # - secretKey: aws_secret_access_key
+    #   remoteRef:
+    #     key: minio
+    #     property: secret_access_key
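Review note: gomplate and External Secrets both use `{{ ... }}` delimiters, so the `{{ .postgres_user }}` references above only survive rendering if gomplate passes unknown keys through; depending on your gomplate version and missing-key settings they may need escaping in the source template. A minimal check, assuming the file and resource names above and an exported `AIRFLOW_NAMESPACE`:

```bash
# Render to a scratch file and confirm the ESO placeholders are intact;
# if the grep finds nothing, gomplate consumed the references and they
# need escaping in the .gomplate.yaml source.
gomplate -f airflow-env-external-secret.gomplate.yaml -o /tmp/airflow-env-external-secret.yaml
grep -n '{{ .postgres_user }}' /tmp/airflow-env-external-secret.yaml

# After applying, confirm the ExternalSecret synced and spot-check one key.
kubectl get externalsecret airflow-env-external-secret -n "${AIRFLOW_NAMESPACE}"
kubectl get secret airflow-env-secret -n "${AIRFLOW_NAMESPACE}" \
    -o jsonpath='{.data.POSTGRES_USER}' | base64 -d
```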
diff --git a/airflow/airflow-values.gomplate.yaml b/airflow/airflow-values.gomplate.yaml
index 7b8d102..8609360 100644
--- a/airflow/airflow-values.gomplate.yaml
+++ b/airflow/airflow-values.gomplate.yaml
@@ -2,8 +2,21 @@ useStandardNaming: true
 
 webserverSecretKey: {{ .Env.AIRFLOW_WEBSERVER_SECRET_KEY }}
 
+{{- if eq (.Env.AIRFLOW_ENV_SECRETS_EXIST | default "false") "true" }}
+# Extra envFrom for all Airflow containers
+extraEnvFrom: |
+  - secretRef:
+      name: airflow-env-secret
+{{- end }}
+
 executor: CeleryExecutor
 
+# Custom Airflow configuration
+config:
+  scheduler:
+    # Scan for new DAG files every 60 seconds instead of 300
+    dag_dir_list_interval: 60
+
 apiServer:
   replicas: 1
   apiServerConfigConfigMapName: airflow-api-server-config
@@ -38,6 +51,73 @@ migrateDatabaseJob:
 
 images:
   migrationsWaitTimeout: 180
 
+# Install additional packages using init containers
+workers:
+  extraInitContainers:
+    - name: install-packages
+      image: apache/airflow:3.0.2
+      command:
+        - /bin/bash
+        - -c
+        - |
+          pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}"
+      volumeMounts:
+        - name: extra-packages
+          mountPath: /opt/airflow/site-packages
+  extraVolumes:
+    - name: extra-packages
+      emptyDir: {}
+  extraVolumeMounts:
+    - name: extra-packages
+      mountPath: /opt/airflow/site-packages
+  env:
+    - name: PYTHONPATH
+      value: "/opt/airflow/site-packages:$PYTHONPATH"
+
+scheduler:
+  extraInitContainers:
+    - name: install-packages
+      image: apache/airflow:3.0.2
+      command:
+        - /bin/bash
+        - -c
+        - |
+          pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}"
+      volumeMounts:
+        - name: extra-packages
+          mountPath: /opt/airflow/site-packages
+  extraVolumes:
+    - name: extra-packages
+      emptyDir: {}
+  extraVolumeMounts:
+    - name: extra-packages
+      mountPath: /opt/airflow/site-packages
+  env:
+    - name: PYTHONPATH
+      value: "/opt/airflow/site-packages:$PYTHONPATH"
+
+dagProcessor:
+  extraInitContainers:
+    - name: install-packages
+      image: apache/airflow:3.0.2
+      command:
+        - /bin/bash
+        - -c
+        - |
+          pip install --target /opt/airflow/site-packages "{{ .Env.AIRFLOW_EXTRA_PACKAGES }}"
+      volumeMounts:
+        - name: extra-packages
+          mountPath: /opt/airflow/site-packages
+  extraVolumes:
+    - name: extra-packages
+      emptyDir: {}
+  extraVolumeMounts:
+    - name: extra-packages
+      mountPath: /opt/airflow/site-packages
+  env:
+    - name: PYTHONPATH
+      value: "/opt/airflow/site-packages:$PYTHONPATH"
+
 flower:
   enabled: false
diff --git a/airflow/justfile b/airflow/justfile
index 366bc91..285d8e4 100644
--- a/airflow/justfile
+++ b/airflow/justfile
@@ -9,6 +9,7 @@ export AIRFLOW_DAGS_STORAGE_TYPE := env("AIRFLOW_DAGS_STORAGE_TYPE", "")
 export AIRFLOW_NFS_IP := env("AIRFLOW_NFS_IP", "")
 export AIRFLOW_NFS_PATH := env("AIRFLOW_NFS_PATH", "")
 export AIRFLOW_DAGS_STORAGE_SIZE := env("AIRFLOW_DAGS_STORAGE_SIZE", "10Gi")
+export AIRFLOW_EXTRA_PACKAGES := env("AIRFLOW_EXTRA_PACKAGES", "dlt[duckdb,filesystem,postgres,s3]>=1.12.1")
 
 [private]
 default:
@@ -28,22 +29,6 @@ create-namespace:
     @kubectl get namespace ${AIRFLOW_NAMESPACE} &>/dev/null || \
     kubectl create namespace ${AIRFLOW_NAMESPACE}
 
-# Delete Airflow namespace
-delete-namespace:
-    #!/bin/bash
-    set -euo pipefail
-    # First try normal deletion
-    kubectl delete namespace ${AIRFLOW_NAMESPACE} --ignore-not-found --timeout=30s || true
-
-    # If namespace still exists in Terminating state, force remove
-    if kubectl get namespace ${AIRFLOW_NAMESPACE} 2>/dev/null | grep -q Terminating; then
-        echo "Namespace stuck in Terminating, forcing deletion..."
-        # Remove finalizers
-        kubectl patch namespace ${AIRFLOW_NAMESPACE} -p '{"metadata":{"finalizers":[]}}' --type=merge || true
-        # Force delete the namespace
-        kubectl delete namespace ${AIRFLOW_NAMESPACE} --force --grace-period=0 || true
-    fi
-
 # Setup database for Airflow
 setup-database:
     #!/bin/bash
@@ -108,6 +93,44 @@ delete-database-secret:
     @kubectl delete secret airflow-database-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
     @kubectl delete externalsecret airflow-database-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
 
+# Create environment variables secret example (customize as needed)
+create-env-secrets-example:
+    #!/bin/bash
+    set -euo pipefail
+    echo "Creating Airflow environment secrets example..."
+    echo "This is an example - customize the environment variables as needed"
+    if helm status external-secrets -n ${EXTERNAL_SECRETS_NAMESPACE} &>/dev/null; then
+        echo "External Secrets available. Creating ExternalSecret using template..."
+        echo "Edit airflow-env-external-secret.gomplate.yaml to customize environment variables"
+        kubectl delete externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+        kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+        gomplate -f airflow-env-external-secret.gomplate.yaml -o airflow-env-external-secret.yaml
+        kubectl apply -f airflow-env-external-secret.yaml
+        echo "Waiting for environment secret to be ready..."
+        kubectl wait --for=condition=Ready externalsecret/airflow-env-external-secret \
+            -n ${AIRFLOW_NAMESPACE} --timeout=60s
+    else
+        echo "External Secrets not available. Creating Kubernetes Secret directly..."
+        # Example credentials - customize as needed
+        POSTGRES_USER="airflow"
+        POSTGRES_PASSWORD="airflow123"
+        kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+        kubectl create secret generic airflow-env-secret -n ${AIRFLOW_NAMESPACE} \
+            --from-literal=POSTGRES_USER="$POSTGRES_USER" \
+            --from-literal=POSTGRES_PASSWORD="$POSTGRES_PASSWORD"
+        # Add more environment variables here:
+        #   --from-literal=AWS_ACCESS_KEY_ID="your_value" \
+        #   --from-literal=AWS_SECRET_ACCESS_KEY="your_value"
+        echo "Environment secret created directly in Kubernetes"
+    fi
+    echo "Example environment secrets created successfully"
+    echo "Customize the environment variables in this recipe as needed for your project"
+
+# Delete environment secrets
+delete-env-secrets:
+    @kubectl delete secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+    @kubectl delete externalsecret airflow-env-external-secret -n ${AIRFLOW_NAMESPACE} --ignore-not-found
+
 # Create OAuth client in Keycloak for Airflow authentication
 create-oauth-client:
     #!/bin/bash
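Since the rendered values only include `extraEnvFrom` when the secret exists at render time (gated by `AIRFLOW_ENV_SECRETS_EXIST`, see the `install` hunk below), the secret has to be created before installing. A usage sketch, assuming the recipe names in this justfile and an exported `AIRFLOW_NAMESPACE`:

```bash
# Create the example secret first, then (re)install so the rendered
# values include the extraEnvFrom block.
just airflow::create-env-secrets-example
just airflow::install

# Spot-check that the variables reached a component container; the
# deployment and container names assume useStandardNaming with the
# release named "airflow" - adjust for your setup.
kubectl exec -n "${AIRFLOW_NAMESPACE}" deploy/airflow-scheduler -c scheduler -- \
    env | grep -E '^(POSTGRES_USER|AWS_ENDPOINT_URL)='
```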
@@ -280,6 +303,15 @@ install:
     kubectl create configmap airflow-api-server-config -n ${AIRFLOW_NAMESPACE} \
         --from-file=webserver_config.py=webserver_config.py
 
+    export AIRFLOW_ENV_SECRETS_EXIST="false"
+    if kubectl get secret airflow-env-secret -n ${AIRFLOW_NAMESPACE} &>/dev/null; then
+        echo "Environment secrets found - will include in deployment"
+        export AIRFLOW_ENV_SECRETS_EXIST="true"
+    else
+        echo "No environment secrets found - use 'just airflow::create-env-secrets-example' to create them if needed"
+        export AIRFLOW_ENV_SECRETS_EXIST="false"
+    fi
+
     AIRFLOW_WEBSERVER_SECRET_KEY=$(just utils::random-password) \
     gomplate -f airflow-values.gomplate.yaml -o airflow-values.yaml
     helm upgrade --install airflow apache-airflow/airflow \
@@ -325,7 +357,6 @@ uninstall delete-db='true':
     just delete-database-secret
     just delete-oauth-secret
-    just delete-namespace
     if [ "{{ delete-db }}" = "true" ]; then
         just postgres::delete-db airflow
     fi
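Review note on the init-container pattern added to `airflow-values.gomplate.yaml`: Kubernetes only expands `$(VAR)`-style references in `env` values, so the `$PYTHONPATH` tail is passed to the container verbatim; that appears harmless as long as the base image leaves `PYTHONPATH` unset, but it is worth verifying along with the package install itself. A hedged check, assuming the default `AIRFLOW_EXTRA_PACKAGES` (dlt) and the chart's standard scheduler naming:

```bash
# Confirm the package installed by the init container is importable at runtime.
kubectl exec -n "${AIRFLOW_NAMESPACE}" deploy/airflow-scheduler -c scheduler -- \
    python -c "import dlt; print('dlt', dlt.__version__)"

# Inspect what PYTHONPATH actually contains inside the container; the
# literal "$PYTHONPATH" suffix shows up here because $(VAR) expansion
# was not used in the values file.
kubectl exec -n "${AIRFLOW_NAMESPACE}" deploy/airflow-scheduler -c scheduler -- \
    printenv PYTHONPATH
```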
@@ -450,6 +481,96 @@ delete-dags-storage:
     kubectl delete pvc airflow-dags-pvc -n ${AIRFLOW_NAMESPACE} --ignore-not-found
     echo "✅ DAG storage deleted"
 
+# View DAG import error logs
+logs-dag-errors dag_file='':
+    #!/bin/bash
+    set -euo pipefail
+    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}')
+    if [ -z "${DAG_PROCESSOR_POD}" ]; then
+        echo "❌ DAG processor pod not found"
+        exit 1
+    fi
+
+    LOG_DATE=$(date +%Y-%m-%d)
+    LOG_DIR="/opt/airflow/logs/dag_processor/${LOG_DATE}/dags-folder"
+
+    if [ -n "{{dag_file}}" ]; then
+        # Show specific DAG file errors
+        LOG_FILE="${LOG_DIR}/{{dag_file}}.log"
+        echo "📋 Showing errors for DAG file: {{dag_file}}"
+        echo "📂 Log file: ${LOG_FILE}"
+        echo ""
+        kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
+            cat "${LOG_FILE}" 2>/dev/null | jq -r 'select(.level == "error") | .timestamp + " " + .event + ": " + .error_detail[0].exc_value' || \
+            echo "❌ No error log found for {{dag_file}} or file doesn't exist"
+    else
+        # List all DAG files with errors
+        echo "📋 Available DAG error logs:"
+        echo "📂 Log directory: ${LOG_DIR}"
+        echo ""
+        kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
+            ls -la "${LOG_DIR}" 2>/dev/null || echo "❌ No DAG logs found for today"
+        echo ""
+        echo "💡 Usage: just airflow::logs-dag-errors <dag_file>"
+        echo "   Example: just airflow::logs-dag-errors csv_to_postgres_dag.py"
+    fi
+
+# View DAG processor real-time logs
+logs-dag-processor:
+    #!/bin/bash
+    set -euo pipefail
+    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}')
+    if [ -z "${DAG_PROCESSOR_POD}" ]; then
+        echo "❌ DAG processor pod not found"
+        exit 1
+    fi
+    echo "📋 Following DAG processor logs..."
+    echo "🔍 Pod: ${DAG_PROCESSOR_POD}"
+    echo ""
+    kubectl logs -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -f
+
+# List all DAG import errors
+logs-import-errors:
+    #!/bin/bash
+    set -euo pipefail
+    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}')
+    if [ -z "${DAG_PROCESSOR_POD}" ]; then
+        echo "❌ DAG processor pod not found"
+        exit 1
+    fi
+    echo "📋 Checking DAG import errors..."
+    echo ""
+    kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
+        airflow dags list-import-errors || echo "❌ Failed to list import errors"
+
+# View DAG files in directory
+logs-dag-files:
+    #!/bin/bash
+    set -euo pipefail
+    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}')
+    if [ -z "${DAG_PROCESSOR_POD}" ]; then
+        echo "❌ DAG processor pod not found"
+        exit 1
+    fi
+    echo "📋 DAG files in /opt/airflow/dags/:"
+    echo ""
+    kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
+        ls -la /opt/airflow/dags/
+
+# Test DAG file import manually
+logs-test-import dag_file:
+    #!/bin/bash
+    set -euo pipefail
+    DAG_PROCESSOR_POD=$(kubectl get pods -n ${AIRFLOW_NAMESPACE} -l component=dag-processor -o jsonpath='{.items[0].metadata.name}')
+    if [ -z "${DAG_PROCESSOR_POD}" ]; then
+        echo "❌ DAG processor pod not found"
+        exit 1
+    fi
+    echo "🧪 Testing import of DAG file: {{dag_file}}"
+    echo ""
+    kubectl exec -n ${AIRFLOW_NAMESPACE} ${DAG_PROCESSOR_POD} -c dag-processor -- \
+        python /opt/airflow/dags/{{dag_file}}
+
 # Clean up database and secrets
 cleanup:
     #!/bin/bash
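The `logs-dag-errors` recipe above inspects one log file at a time and assumes the DAG processor emits structured JSON logs. A small companion sketch that scans all of today's DAG-processor log files for errors in one pass, under the same pod-label and log-layout assumptions as the recipes above:

```bash
# Scan every per-DAG log file for today's errors in a single pass.
POD=$(kubectl get pods -n "${AIRFLOW_NAMESPACE}" -l component=dag-processor \
    -o jsonpath='{.items[0].metadata.name}')
LOG_DIR="/opt/airflow/logs/dag_processor/$(date +%Y-%m-%d)/dags-folder"
kubectl exec -n "${AIRFLOW_NAMESPACE}" "${POD}" -c dag-processor -- \
    sh -c "cat ${LOG_DIR}/*.log 2>/dev/null" | \
    jq -r 'select(.level == "error") | .timestamp + " " + .event'
```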