examples(airflow): csv_to_postgres example

Masaki Yatsu
2025-09-16 12:24:42 +09:00
parent cbe8c15e22
commit 467eaf2abc
3 changed files with 558 additions and 0 deletions

airflow/examples/README.md

@@ -0,0 +1,190 @@
# CSV to PostgreSQL Airflow DAG Deployment
## Overview
This document describes how to deploy `csv_to_postgres_dag.py` to Airflow through the JupyterHub interface. The DAG processes MovieLens dataset files stored in MinIO and loads them into PostgreSQL.
## Dataset Information
### MovieLens 20M Dataset
This DAG processes the [MovieLens 20M dataset](https://grouplens.org/datasets/movielens/20m/) from GroupLens Research. The dataset contains:
- **27,278 movies** with metadata
- **20 million ratings** from 138,493 users
- **465,564 tags** applied by users
- Additional genome data for content-based filtering
### MinIO Storage Structure
The dataset files are stored in MinIO under the `movie-lens` bucket:
```bash
mc alias set buun https://minio.your-domain.com access-key secret-key
mc ls buun/movie-lens
[2025-09-14 12:13:09 JST] 309MiB STANDARD genome-scores.csv
[2025-09-14 12:12:37 JST] 18KiB STANDARD genome-tags.csv
[2025-09-14 12:12:38 JST] 557KiB STANDARD links.csv
[2025-09-14 12:12:38 JST] 1.3MiB STANDARD movies.csv
[2025-09-14 12:13:15 JST] 509MiB STANDARD ratings.csv
[2025-09-14 12:12:42 JST] 16MiB STANDARD tags.csv
```
The DAG currently processes:
- **movies.csv** (1.3MiB) - Movie metadata
- **tags.csv** (16MiB) - User-generated tags
- **ratings.csv** (509MiB) - User ratings (the largest file; the DAG skips the import if the `ratings` table already contains data)
## Deployment Steps
### 1. Access JupyterHub
- Navigate to your JupyterHub instance (e.g., `https://jupyter.buun.dev`)
- Login with your credentials
### 2. Navigate to Airflow DAGs Directory
In JupyterHub, the Airflow DAGs directory is mounted at:
```
/home/jovyan/airflow-dags/
```
### 3. Upload the DAG File
1. Open JupyterHub file browser
2. Navigate to `/home/jovyan/airflow-dags/`
3. Upload or copy `csv_to_postgres_dag.py` to this directory (for example from a JupyterHub terminal, as shown below)
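If you prefer the terminal over the file browser, a copy along these lines works; the source path is only an example for wherever you saved the file:
```bash
# Copy the DAG into the mounted DAGs directory and confirm it landed there.
cp ~/csv_to_postgres_dag.py /home/jovyan/airflow-dags/
ls -l /home/jovyan/airflow-dags/
```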
### 4. Verify Deployment
1. Access Airflow Web UI (e.g., `https://airflow.buun.dev`)
2. Check that the DAG `csv_to_postgres` appears in the DAGs list
3. If the DAG doesn't appear immediately, wait 1-2 minutes for Airflow to detect the new file
## DAG Features
### Tables Processed
- **movies**: MovieLens movies data with primary key `movieId`
- **ratings**: User ratings with composite primary key `[userId, movieId]`
- **tags**: User tags with composite primary key `[userId, movieId, timestamp]`
- **summary**: Generates metadata summary of all processed tables
### Smart Processing
- **Table Existence Check**: Uses DuckDB PostgreSQL scanner to check if tables already exist
- **Skip Logic**: If a table already contains data, the task will skip processing to avoid reprocessing large files
- **Write Disposition**: Uses `replace` mode for initial loads
### Environment Variables Required
The DAG expects the following environment variables to be set in the Airflow workers (a quick check is shown after the list):
- `POSTGRES_URL`: PostgreSQL connection string without a database name (format: `postgresql://user:password@host:port`); the DAG appends the `movielens_af` database itself
- `AWS_ACCESS_KEY_ID`: MinIO/S3 access key
- `AWS_SECRET_ACCESS_KEY`: MinIO/S3 secret key
- `AWS_ENDPOINT_URL`: MinIO endpoint URL
- Additional dlt-specific environment variables for advanced configuration
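One way to confirm the variables actually reach the running pods is to grep the environment of a worker; the Deployment name below is an assumption, adjust it to your release:
```bash
# Print the relevant variables from a running Airflow worker pod.
kubectl -n datastack exec deploy/airflow-worker -- \
  env | grep -E 'POSTGRES_URL|AWS_ACCESS_KEY_ID|AWS_SECRET_ACCESS_KEY|AWS_ENDPOINT_URL'
```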
### Environment Variables Setup
Environment variables are provided to Airflow through Kubernetes Secrets. You have several options:
#### Option 1: Customize the Example Template
1. Create the example environment secrets template:
```bash
just airflow::create-env-secrets-example
```
2. **Important**: This creates a template with sample values. You must customize it:
- If using **External Secrets**: Edit `airflow-env-external-secret.gomplate.yaml` to reference your actual Vault paths
- If using **Direct Secrets**: Update the created `airflow-env-secret` with your actual credentials
#### Option 2: Create ExternalSecret Manually
Create an ExternalSecret that references your Vault credentials:
```yaml
apiVersion: external-secrets.io/v1
kind: ExternalSecret
metadata:
  name: airflow-env-external-secret
  namespace: datastack
spec:
  refreshInterval: 1h
  secretStoreRef:
    name: vault-secret-store
    kind: ClusterSecretStore
  target:
    name: airflow-env-secret
  data:
    - secretKey: AWS_ACCESS_KEY_ID
      remoteRef:
        key: minio/credentials
        property: access_key
    - secretKey: AWS_SECRET_ACCESS_KEY
      remoteRef:
        key: minio/credentials
        property: secret_key
    # Add more variables as needed
```
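Once the manifest is customized, it can be applied and checked like this (the manifest filename is just an example):
```bash
# Apply the ExternalSecret and confirm it synced into airflow-env-secret.
kubectl apply -f airflow-env-external-secret.yaml
kubectl -n datastack get externalsecret airflow-env-external-secret
kubectl -n datastack get secret airflow-env-secret
```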
#### Option 3: Create Kubernetes Secret Directly
```bash
kubectl create secret generic airflow-env-secret -n datastack \
--from-literal=AWS_ACCESS_KEY_ID="your-access-key" \
--from-literal=AWS_SECRET_ACCESS_KEY="your-secret-key" \
--from-literal=AWS_ENDPOINT_URL="http://minio.minio.svc.cluster.local:9000" \
--from-literal=POSTGRES_URL="postgresql://user:pass@postgres-cluster-rw.postgres:5432"
```
After creating the environment secrets, redeploy Airflow to pick up the new configuration.
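One way to do that, assuming the Airflow components run as Deployments in the `datastack` namespace, is a rolling restart; note that the first command restarts every Deployment in the namespace, so narrow it to the scheduler/worker resources of your release if you prefer:
```bash
# Roll the Airflow pods so they pick up the updated secret values.
kubectl -n datastack rollout restart deployment
kubectl -n datastack get pods -w
```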
### Manual Execution
The DAG is configured for manual execution only (`schedule=None`). To run it from the Airflow UI (a CLI alternative is shown after these steps):
1. Go to Airflow Web UI
2. Find the `csv_to_postgres` DAG
3. Click "Trigger DAG" to start execution
## Dependencies
- `dlt[duckdb,filesystem,postgres,s3]>=1.12.1`
- `duckdb` (for table existence checking)
- Standard Airflow libraries (see the install example below)
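The extra packages can be installed into the Airflow image or workers with pip, for example:
```bash
# duckdb is listed explicitly because the DAG uses it directly for the
# table-existence check.
pip install "dlt[duckdb,filesystem,postgres,s3]>=1.12.1" duckdb
```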
## Troubleshooting
### DAG Not Appearing
- Check file permissions in `/home/jovyan/airflow-dags/`
- Verify the Python syntax is correct
- Check Airflow logs for import errors (see the commands below)
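Two quick checks, assuming the JupyterHub mount described above and a scheduler Deployment named `airflow-scheduler` (adjust to your release):
```bash
# From a JupyterHub terminal: confirm the DAG file at least parses.
python -m py_compile /home/jovyan/airflow-dags/csv_to_postgres_dag.py

# From the scheduler pod: list any DAG import errors Airflow has recorded.
kubectl -n datastack exec deploy/airflow-scheduler -- airflow dags list-import-errors
```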
### Environment Variables
- Ensure the `airflow-env-secret` Kubernetes Secret exists in the datastack namespace
- Verify secret contains all required environment variables:
```bash
kubectl describe secret airflow-env-secret -n datastack
```
- If using External Secrets, check that the ExternalSecret is syncing properly:
```bash
kubectl get externalsecret airflow-env-external-secret -n datastack
```
### Connection Issues
- Verify MinIO and PostgreSQL connectivity from the Airflow workers (example checks below)
- Check that the `movielens_af` database exists in PostgreSQL
- Ensure MinIO bucket `movie-lens` is accessible with proper credentials
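Example checks, reusing the connection details shown earlier in this document (substitute your real credentials and endpoints):
```bash
# List schemas in the movielens_af database; the dlt dataset schema should
# appear here after a successful run.
psql "postgresql://user:password@postgres-cluster-rw.postgres:5432/movielens_af" -c '\dn'

# Confirm the MovieLens files are visible through the MinIO alias set up earlier.
mc ls buun/movie-lens
```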


@@ -0,0 +1,336 @@
import os
from datetime import datetime, timedelta
from typing import Any, Dict

import dlt
import duckdb
from dlt.common.schema.typing import TWriteDispositionConfig
from dlt.sources.filesystem import readers

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator


class DltResource:
    """DLT resource for data pipeline operations in Airflow."""

    def __init__(self):
        self.pipeline_name = "minio_to_postgres"
        self.destination = "postgres"
        self.dataset_name = "movielens_af"

    def setup_environment(self):
        """Set up environment variables for dlt."""
        # PostgreSQL configuration: dlt reads the destination credentials from
        # DESTINATION__POSTGRES__CREDENTIALS; the database name is appended here,
        # so POSTGRES_URL must not already contain one.
        postgres_url = os.getenv("POSTGRES_URL", "")
        os.environ["DESTINATION__POSTGRES__CREDENTIALS"] = (
            f"{postgres_url}/{self.dataset_name}"
        )
        # Enable detailed logging for dlt
        os.environ["DLT_LOG_LEVEL"] = "INFO"

    def create_pipeline(self, table_name: str = ""):
        """Create dlt pipeline with optional table-specific name."""
        self.setup_environment()
        if table_name:
            pipeline_name = f"{self.pipeline_name}_{table_name}"
        else:
            pipeline_name = self.pipeline_name
        return dlt.pipeline(
            pipeline_name=pipeline_name,
            destination=self.destination,
            dataset_name=self.dataset_name,
        )

    def read_csv_from_s3(self, bucket: str, file_glob: str, chunk_size: int = 10000):
        """Read CSV file from S3/MinIO using dlt filesystem readers."""
        # MinIO credentials come from the AWS_* environment variables (see README).
        self.setup_environment()
        print(f"Reading CSV from s3://{bucket}/{file_glob}")
        csv_reader = readers(
            bucket_url=f"s3://{bucket}",
            file_glob=file_glob,
        ).read_csv_duckdb(
            chunk_size=chunk_size,
            header=True,
        )
        return csv_reader

    def table_exists_and_has_data(self, table_name: str) -> bool:
        """Check if table exists and has data using DuckDB PostgreSQL scanner."""
        conn: duckdb.DuckDBPyConnection | None = None
        try:
            # Parse POSTGRES_URL (postgresql://user:password@host:port) into its parts.
            postgres_url = os.getenv("POSTGRES_URL", "")
            url_parts = postgres_url.replace("postgresql://", "").split("/")
            auth_host = url_parts[0]
            if "@" in auth_host:
                auth, host_port = auth_host.split("@")
                if ":" in auth:
                    user, password = auth.split(":", 1)
                else:
                    user, password = auth, ""
            else:
                host_port = auth_host
                user, password = "", ""
            if ":" in host_port:
                host, port = host_port.rsplit(":", 1)
            else:
                host, port = host_port, "5432"

            conn = duckdb.connect()
            conn.execute("INSTALL postgres_scanner")
            conn.execute("LOAD postgres_scanner")
            attach_cmd = f"""
                ATTACH 'host={host} port={port} dbname={self.dataset_name} user={user} password={password}' AS postgres_db (TYPE postgres)
            """
            conn.execute(attach_cmd)
            query = f"""
                SELECT COUNT(*) as row_count
                FROM postgres_db.{self.dataset_name}.{table_name}
                LIMIT 1
            """
            result = conn.execute(query).fetchone()
            row_count = result[0] if result else 0
            print(f"Table {table_name} has {row_count} rows")
            return row_count > 0
        except Exception as e:
            print(f"Table {table_name} does not exist or is empty: {e}")
            return False
        finally:
            try:
                if conn:
                    conn.close()
            except Exception:
                pass

    def run_pipeline(
        self,
        resource_data,
        table_name: str,
        write_disposition: TWriteDispositionConfig = "replace",
        primary_key: str = "",
    ) -> Dict[str, Any]:
        """Run dlt pipeline with given resource data."""
        pipeline = self.create_pipeline(table_name=table_name)
        print(f"Running pipeline '{pipeline.pipeline_name}' for table {table_name}")
        pipeline.config.progress = "log"
        load_info = pipeline.run(
            resource_data, table_name=table_name, write_disposition=write_disposition
        )
        print(f"Pipeline completed for {table_name}")
        if load_info.load_packages:
            package = load_info.load_packages[0]
            completed_jobs = package.jobs.get("completed_jobs", [])
            total_rows = sum(
                getattr(job, "rows_count", 0)
                for job in completed_jobs
                if hasattr(job, "rows_count")
            )
            return {
                "load_id": load_info.loads_ids[0] if load_info.loads_ids else None,
                "table_name": table_name,
                "completed_jobs": len(completed_jobs),
                "pipeline_name": self.pipeline_name,
                "destination": self.destination,
                "dataset_name": self.dataset_name,
                "write_disposition": write_disposition,
                "total_rows": total_rows,
            }
        return {
            "table_name": table_name,
            "pipeline_name": self.pipeline_name,
            "destination": self.destination,
            "dataset_name": self.dataset_name,
        }


# Task functions
def process_movies_table(**context):
    """Load movies CSV from MinIO to PostgreSQL using dlt."""
    dlt_resource = DltResource()
    print("Starting movies pipeline...")
    table_exists = dlt_resource.table_exists_and_has_data("movies")
    if table_exists:
        print("Movies table already exists with data, skipping import")
        return {"status": "skipped", "reason": "table already exists with data"}
    print("Reading movies.csv from MinIO...")
    movies_data = dlt_resource.read_csv_from_s3(
        bucket="movie-lens", file_glob="movies.csv"
    )
    movies_data.apply_hints(primary_key="movieId")
    result = dlt_resource.run_pipeline(
        movies_data,
        table_name="movies",
        write_disposition="replace",
    )
    print(f"Movies pipeline completed: {result}")
    return result


def process_ratings_table(**context):
    """Load ratings CSV from MinIO to PostgreSQL using dlt."""
    dlt_resource = DltResource()
    print("Starting ratings pipeline...")
    table_exists = dlt_resource.table_exists_and_has_data("ratings")
    if table_exists:
        print("Ratings table already exists with data, skipping import")
        return {"status": "skipped", "reason": "table already exists with data"}
    print("Reading ratings.csv from MinIO...")
    ratings_data = dlt_resource.read_csv_from_s3(
        bucket="movie-lens", file_glob="ratings.csv"
    )
    ratings_data.apply_hints(primary_key=["userId", "movieId"])
    result = dlt_resource.run_pipeline(
        ratings_data, table_name="ratings", write_disposition="replace"
    )
    print(f"Ratings pipeline completed: {result}")
    return result


def process_tags_table(**context):
    """Load tags CSV from MinIO to PostgreSQL using dlt."""
    dlt_resource = DltResource()
    print("Starting tags pipeline...")
    table_exists = dlt_resource.table_exists_and_has_data("tags")
    if table_exists:
        print("Tags table already exists with data, skipping import")
        return {"status": "skipped", "reason": "table already exists with data"}
    print("Reading tags.csv from MinIO...")
    tags_data = dlt_resource.read_csv_from_s3(bucket="movie-lens", file_glob="tags.csv")
    tags_data.apply_hints(primary_key=["userId", "movieId", "timestamp"])
    result = dlt_resource.run_pipeline(
        tags_data, table_name="tags", write_disposition="replace"
    )
    print(f"Tags pipeline completed: {result}")
    return result


def generate_summary(**context):
    """Generate summary of all loaded MovieLens data."""
    dlt_resource = DltResource()
    print("Generating summary of MovieLens dataset...")
    pipeline_names = ["movies", "ratings", "tags"]
    schema_info = {}
    tables_found = []
    for table_name in pipeline_names:
        try:
            pipeline = dlt_resource.create_pipeline(table_name=table_name)
            if pipeline.default_schema_name in pipeline.schemas:
                schema = pipeline.schemas[pipeline.default_schema_name]
                print(f"Found schema for pipeline '{pipeline.pipeline_name}'")
                schema_info[table_name] = {
                    "pipeline": pipeline.pipeline_name,
                    "schema_version": schema.version,
                }
                tables_found.extend(
                    [t for t in schema.tables.keys() if t == table_name]
                )
        except Exception as e:
            print(f"Could not get schema for {table_name}: {e}")
    print(
        f"Summary: Found {len(tables_found)} tables from {len(schema_info)} pipelines"
    )
    return {
        "base_pipeline_name": dlt_resource.pipeline_name,
        "dataset_name": dlt_resource.dataset_name,
        "destination": dlt_resource.destination,
        "pipelines_checked": list(schema_info.keys()),
        "tables_found": tables_found,
        "movielens_tables_count": len(tables_found),
        "schema_info": schema_info,
    }


# Default arguments for the DAG
default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Create the DAG
dag = DAG(
    "csv_to_postgres",
    default_args=default_args,
    description="Load MovieLens CSV data from MinIO to PostgreSQL using dlt",
    schedule=None,  # Manual trigger only
    catchup=False,
    tags=["etl", "movielens", "dlt"],
)

# Create tasks
movies_task = PythonOperator(
    task_id="process_movies",
    python_callable=process_movies_table,
    dag=dag,
)

ratings_task = PythonOperator(
    task_id="process_ratings",
    python_callable=process_ratings_table,
    dag=dag,
)

tags_task = PythonOperator(
    task_id="process_tags",
    python_callable=process_tags_table,
    dag=dag,
)

summary_task = PythonOperator(
    task_id="generate_summary",
    python_callable=generate_summary,
    dag=dag,
)

# Set task dependencies
[movies_task, ratings_task, tags_task] >> summary_task


@@ -0,0 +1,32 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator


def test_task(**context):
    print("Test task executed successfully!")
    return "success"


default_args = {
    "owner": "test",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "test_dag",
    default_args=default_args,
    description="Simple test DAG",
    schedule=None,
    catchup=False,
    tags=["test"],
)

# Named test_operator so it does not shadow the test_task callable above.
test_operator = PythonOperator(
    task_id="test_task",
    python_callable=test_task,
    dag=dag,
)