From 467eaf2abc01fbd670e5f1ebde370d419bcf7a23 Mon Sep 17 00:00:00 2001
From: Masaki Yatsu
Date: Tue, 16 Sep 2025 12:24:42 +0900
Subject: [PATCH] examples(airflow): csv_to_postgres example

---
 airflow/examples/README.md              | 190 ++++++++++++++
 airflow/examples/csv_to_postgres_dag.py | 336 ++++++++++++++++++++++++
 airflow/examples/test_dag.py            |  32 +++
 3 files changed, 558 insertions(+)
 create mode 100644 airflow/examples/README.md
 create mode 100644 airflow/examples/csv_to_postgres_dag.py
 create mode 100644 airflow/examples/test_dag.py

diff --git a/airflow/examples/README.md b/airflow/examples/README.md
new file mode 100644
index 0000000..8f80a79
--- /dev/null
+++ b/airflow/examples/README.md
@@ -0,0 +1,190 @@
+# CSV to PostgreSQL Airflow DAG Deployment
+
+## Overview
+
+This document describes how to deploy `csv_to_postgres_dag.py` to Airflow through the JupyterHub interface. The DAG processes MovieLens dataset files stored in MinIO and loads them into PostgreSQL.
+
+## Dataset Information
+
+### MovieLens 20M Dataset
+
+This DAG processes the [MovieLens 20M dataset](https://grouplens.org/datasets/movielens/20m/) from GroupLens Research. The dataset contains:
+
+- **27,278 movies** with metadata
+- **20 million ratings** from 138,493 users
+- **465,564 tags** applied by users
+- Additional genome data for content-based filtering
+
+### MinIO Storage Structure
+
+The dataset files are stored in MinIO under the `movie-lens` bucket:
+
+```bash
+mc alias set buun https://minio.your-domain.com access-key secret-key
+mc ls buun/movie-lens
+
+[2025-09-14 12:13:09 JST] 309MiB STANDARD genome-scores.csv
+[2025-09-14 12:12:37 JST] 18KiB STANDARD genome-tags.csv
+[2025-09-14 12:12:38 JST] 557KiB STANDARD links.csv
+[2025-09-14 12:12:38 JST] 1.3MiB STANDARD movies.csv
+[2025-09-14 12:13:15 JST] 509MiB STANDARD ratings.csv
+[2025-09-14 12:12:42 JST] 16MiB STANDARD tags.csv
+```
+
+The DAG currently processes:
+
+- **movies.csv** (1.3MiB) - Movie metadata
+- **tags.csv** (16MiB) - User-generated tags
+- **ratings.csv** (509MiB) - User ratings
+
+## Deployment Steps
+
+### 1. Access JupyterHub
+
+- Navigate to your JupyterHub instance (e.g., `https://jupyter.buun.dev`)
+- Log in with your credentials
+
+### 2. Navigate to Airflow DAGs Directory
+
+In JupyterHub, the Airflow DAGs directory is mounted at:
+
+```
+/home/jovyan/airflow-dags/
+```
+
+### 3. Upload the DAG File
+
+1. Open the JupyterHub file browser
+2. Navigate to `/home/jovyan/airflow-dags/`
+3. Upload or copy `csv_to_postgres_dag.py` to this directory
+
+### 4. Verify Deployment
+
+1. Access the Airflow Web UI (e.g., `https://airflow.buun.dev`)
+2. Check that the DAG `csv_to_postgres` appears in the DAGs list
+3. If the DAG doesn't appear immediately, wait 1-2 minutes for Airflow to detect the new file
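+
+To verify from a terminal instead, the Airflow CLI can confirm that the scheduler has parsed the file. This is a minimal sketch: the `datastack` namespace matches the rest of this setup, but the `airflow-scheduler` deployment name is an assumption, so adjust both to your installation:
+
+```bash
+# List parsed DAGs and look for csv_to_postgres (deployment name is an assumption)
+kubectl exec -n datastack deploy/airflow-scheduler -- airflow dags list | grep csv_to_postgres
+
+# Show import errors that would keep the DAG from loading
+kubectl exec -n datastack deploy/airflow-scheduler -- airflow dags list-import-errors
+```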
+
+## DAG Features
+
+### Tables Processed
+
+- **movies**: MovieLens movies data with primary key `movieId`
+- **ratings**: User ratings with composite primary key `[userId, movieId]`
+- **tags**: User tags with composite primary key `[userId, movieId, timestamp]`
+- **summary**: Final task that generates a metadata summary of the processed tables (no table is written)
+
+### Smart Processing
+
+- **Table Existence Check**: Uses the DuckDB PostgreSQL scanner to check whether the target tables already exist
+- **Skip Logic**: If a table already contains data, the task skips processing to avoid re-importing large files
+- **Write Disposition**: Uses `replace` mode for initial loads
+
+### Environment Variables Required
+
+The DAG expects the following environment variables to be set:
+
+- `POSTGRES_URL`: PostgreSQL connection string without a database name (format: `postgresql://user:password@host:port`; the DAG appends the `movielens_af` database itself)
+- `AWS_ACCESS_KEY_ID`: MinIO/S3 access key
+- `AWS_SECRET_ACCESS_KEY`: MinIO/S3 secret key
+- `AWS_ENDPOINT_URL`: MinIO endpoint URL
+- Additional dlt-specific environment variables for advanced configuration
+
+### Environment Variables Setup
+
+Environment variables are provided to Airflow through Kubernetes Secrets. There are several options:
+
+#### Option 1: Customize the Example Template
+
+1. Create the example environment secrets template:
+
+   ```bash
+   just airflow::create-env-secrets-example
+   ```
+
+2. **Important**: This creates a template with sample values. You must customize it:
+   - If using **External Secrets**: Edit `airflow-env-external-secret.gomplate.yaml` to reference your actual Vault paths
+   - If using **Direct Secrets**: Update the created `airflow-env-secret` with your actual credentials
+
+#### Option 2: Create ExternalSecret Manually
+
+Create an ExternalSecret that references your Vault credentials:
+
+```yaml
+apiVersion: external-secrets.io/v1
+kind: ExternalSecret
+metadata:
+  name: airflow-env-external-secret
+  namespace: datastack
+spec:
+  refreshInterval: 1h
+  secretStoreRef:
+    name: vault-secret-store
+    kind: ClusterSecretStore
+  target:
+    name: airflow-env-secret
+  data:
+    - secretKey: AWS_ACCESS_KEY_ID
+      remoteRef:
+        key: minio/credentials
+        property: access_key
+    - secretKey: AWS_SECRET_ACCESS_KEY
+      remoteRef:
+        key: minio/credentials
+        property: secret_key
+    # Add more variables as needed
+```
+
+#### Option 3: Create Kubernetes Secret Directly
+
+```bash
+kubectl create secret generic airflow-env-secret -n datastack \
+  --from-literal=AWS_ACCESS_KEY_ID="your-access-key" \
+  --from-literal=AWS_SECRET_ACCESS_KEY="your-secret-key" \
+  --from-literal=AWS_ENDPOINT_URL="http://minio.minio.svc.cluster.local:9000" \
+  --from-literal=POSTGRES_URL="postgresql://user:pass@postgres-cluster-rw.postgres:5432"
+```
+
+After creating the environment secrets, redeploy Airflow so it picks up the new configuration.
+
+### Manual Execution
+
+The DAG is configured for manual execution only (`schedule=None`). To run it:
+
+1. Go to the Airflow Web UI
+2. Find the `csv_to_postgres` DAG
+3. 
Click "Trigger DAG" to start execution + +## Dependencies + +- dlt[duckdb,filesystem,postgres,s3]>=1.12.1 +- duckdb (for table existence checking) +- Standard Airflow libraries + +## Troubleshooting + +### DAG Not Appearing + +- Check file permissions in `/home/jovyan/airflow-dags/` +- Verify the Python syntax is correct +- Check Airflow logs for import errors + +### Environment Variables + +- Ensure the `airflow-env-secret` Kubernetes Secret exists in the datastack namespace +- Verify secret contains all required environment variables: + + ```bash + kubectl describe secret airflow-env-secret -n datastack + ``` + +- If using External Secrets, check that the ExternalSecret is syncing properly: + + ```bash + kubectl get externalsecret airflow-env-external-secret -n datastack + ``` + +### Connection Issues + +- Verify MinIO and PostgreSQL connectivity from Airflow workers +- Check that the `movielens_af` database exists in PostgreSQL +- Ensure MinIO bucket `movie-lens` is accessible with proper credentials diff --git a/airflow/examples/csv_to_postgres_dag.py b/airflow/examples/csv_to_postgres_dag.py new file mode 100644 index 0000000..e81d7e9 --- /dev/null +++ b/airflow/examples/csv_to_postgres_dag.py @@ -0,0 +1,336 @@ +import os +from datetime import datetime, timedelta +from typing import Any, Dict + +import dlt +import duckdb +from dlt.common.schema.typing import TWriteDispositionConfig +from dlt.sources.filesystem import readers + +from airflow import DAG +from airflow.providers.standard.operators.python import PythonOperator + + +class DltResource: + """DLT resource for data pipeline operations in Airflow.""" + + def __init__(self): + self.pipeline_name = "minio_to_postgres" + self.destination = "postgres" + self.dataset_name = "movielens_af" + + def setup_environment(self): + """Setup environment variables for dlt.""" + # PostgreSQL configuration + postgres_url = os.getenv("POSTGRES_URL", "") + os.environ["DESTINATION__POSTGRES__CREDENTIALS"] = ( + f"{postgres_url}/{self.dataset_name}" + ) + + # Enable detailed logging for dlt + os.environ["DLT_LOG_LEVEL"] = "INFO" + + def create_pipeline(self, table_name: str): + """Create dlt pipeline with optional table-specific name.""" + self.setup_environment() + + if table_name: + pipeline_name = f"{self.pipeline_name}_{table_name}" + else: + pipeline_name = self.pipeline_name + + return dlt.pipeline( + pipeline_name=pipeline_name, + destination=self.destination, + dataset_name=self.dataset_name, + ) + + def read_csv_from_s3(self, bucket: str, file_glob: str, chunk_size: int = 10000): + """Read CSV file from S3/MinIO using dlt filesystem readers.""" + self.setup_environment() + + print(f"Reading CSV from s3://{bucket}/{file_glob}") + + csv_reader = readers( + bucket_url=f"s3://{bucket}", + file_glob=file_glob, + ).read_csv_duckdb( + chunk_size=chunk_size, + header=True, + ) + + return csv_reader + + def table_exists_and_has_data(self, table_name: str) -> bool: + """Check if table exists and has data using DuckDB PostgreSQL scanner.""" + conn: duckdb.DuckDBPyConnection | None = None + + try: + postgres_url = os.getenv("POSTGRES_URL", "") + url_parts = postgres_url.replace("postgresql://", "").split("/") + auth_host = url_parts[0] + + if "@" in auth_host: + auth, host_port = auth_host.split("@") + if ":" in auth: + user, password = auth.split(":", 1) + else: + user, password = auth, "" + else: + host_port = auth_host + user, password = "", "" + + if ":" in host_port: + host, port = host_port.rsplit(":", 1) + else: + host, port = host_port, "5432" + 
+ conn = duckdb.connect() + conn.execute("INSTALL postgres_scanner") + conn.execute("LOAD postgres_scanner") + + attach_cmd = f""" + ATTACH 'host={host} port={port} dbname={self.dataset_name} user={user} password={password}' AS postgres_db (TYPE postgres) + """ + conn.execute(attach_cmd) + + query = f""" + SELECT COUNT(*) as row_count + FROM postgres_db.{self.dataset_name}.{table_name} + LIMIT 1 + """ + + result = conn.execute(query).fetchone() + row_count = result[0] if result else 0 + + print(f"Table {table_name} has {row_count} rows") + return row_count > 0 + + except Exception as e: + print(f"Table {table_name} does not exist or is empty: {e}") + return False + finally: + try: + if conn: + conn.close() + except Exception: + pass + + def run_pipeline( + self, + resource_data, + table_name: str, + write_disposition: TWriteDispositionConfig = "replace", + primary_key: str = "", + ) -> Dict[str, Any]: + """Run dlt pipeline with given resource data.""" + pipeline = self.create_pipeline(table_name=table_name) + + print(f"Running pipeline '{pipeline.pipeline_name}' for table {table_name}") + + pipeline.config.progress = "log" + + load_info = pipeline.run( + resource_data, table_name=table_name, write_disposition=write_disposition + ) + + print(f"Pipeline completed for {table_name}") + + if load_info.load_packages: + package = load_info.load_packages[0] + completed_jobs = package.jobs.get("completed_jobs", []) + + total_rows = sum( + getattr(job, "rows_count", 0) + for job in completed_jobs + if hasattr(job, "rows_count") + ) + + return { + "load_id": load_info.loads_ids[0] if load_info.loads_ids else None, + "table_name": table_name, + "completed_jobs": len(completed_jobs), + "pipeline_name": self.pipeline_name, + "destination": self.destination, + "dataset_name": self.dataset_name, + "write_disposition": write_disposition, + "total_rows": total_rows, + } + + return { + "table_name": table_name, + "pipeline_name": self.pipeline_name, + "destination": self.destination, + "dataset_name": self.dataset_name, + } + + +# Task functions +def process_movies_table(**context): + """Load movies CSV from MinIO to PostgreSQL using dlt.""" + dlt_resource = DltResource() + + print("Starting movies pipeline...") + + table_exists = dlt_resource.table_exists_and_has_data("movies") + if table_exists: + print("Movies table already exists with data, skipping import") + return {"status": "skipped", "reason": "table already exists with data"} + + print("Reading movies.csv from MinIO...") + movies_data = dlt_resource.read_csv_from_s3( + bucket="movie-lens", file_glob="movies.csv" + ) + + movies_data.apply_hints(primary_key="movieId") + + result = dlt_resource.run_pipeline( + movies_data, + table_name="movies", + write_disposition="replace", + ) + + print(f"Movies pipeline completed: {result}") + return result + + +def process_ratings_table(**context): + """Load ratings CSV from MinIO to PostgreSQL using dlt.""" + dlt_resource = DltResource() + + print("Starting ratings pipeline...") + + table_exists = dlt_resource.table_exists_and_has_data("ratings") + if table_exists: + print("Ratings table already exists with data, skipping import") + return {"status": "skipped", "reason": "table already exists with data"} + + print("Reading ratings.csv from MinIO...") + ratings_data = dlt_resource.read_csv_from_s3( + bucket="movie-lens", file_glob="ratings.csv" + ) + + ratings_data.apply_hints(primary_key=["userId", "movieId"]) + + result = dlt_resource.run_pipeline( + ratings_data, table_name="ratings", 
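+        # write_disposition="replace" rewrites the ratings table on each full
+        # load; the table_exists_and_has_data() check above is what prevents
+        # re-importing this ~509MiB file on reruns.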
write_disposition="replace" + ) + + print(f"Ratings pipeline completed: {result}") + return result + + +def process_tags_table(**context): + """Load tags CSV from MinIO to PostgreSQL using dlt.""" + dlt_resource = DltResource() + + print("Starting tags pipeline...") + + table_exists = dlt_resource.table_exists_and_has_data("tags") + if table_exists: + print("Tags table already exists with data, skipping import") + return {"status": "skipped", "reason": "table already exists with data"} + + print("Reading tags.csv from MinIO...") + tags_data = dlt_resource.read_csv_from_s3(bucket="movie-lens", file_glob="tags.csv") + + tags_data.apply_hints(primary_key=["userId", "movieId", "timestamp"]) + + result = dlt_resource.run_pipeline( + tags_data, table_name="tags", write_disposition="replace" + ) + + print(f"Tags pipeline completed: {result}") + return result + + +def generate_summary(**context): + """Generate summary of all loaded MovieLens data.""" + dlt_resource = DltResource() + + print("Generating summary of MovieLens dataset...") + + pipeline_names = ["movies", "ratings", "tags"] + schema_info = {} + tables_found = [] + + for table_name in pipeline_names: + try: + pipeline = dlt_resource.create_pipeline(table_name=table_name) + + if pipeline.default_schema_name in pipeline.schemas: + schema = pipeline.schemas[pipeline.default_schema_name] + print(f"Found schema for pipeline '{pipeline.pipeline_name}'") + schema_info[table_name] = { + "pipeline": pipeline.pipeline_name, + "schema_version": schema.version, + } + tables_found.extend( + [t for t in schema.tables.keys() if t == table_name] + ) + except Exception as e: + print(f"Could not get schema for {table_name}: {e}") + + print( + f"Summary: Found {len(tables_found)} tables from {len(schema_info)} pipelines" + ) + + return { + "base_pipeline_name": dlt_resource.pipeline_name, + "dataset_name": dlt_resource.dataset_name, + "destination": dlt_resource.destination, + "pipelines_checked": list(schema_info.keys()), + "tables_found": tables_found, + "movielens_tables_count": len(tables_found), + "schema_info": schema_info, + } + + +# Default arguments for the DAG +default_args = { + "owner": "data-team", + "depends_on_past": False, + "start_date": datetime(2024, 1, 1), + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), +} + +# Create the DAG +dag = DAG( + "csv_to_postgres", + default_args=default_args, + description="Load MovieLens CSV data from MinIO to PostgreSQL using dlt", + schedule=None, # Manual trigger only + catchup=False, + tags=["etl", "movielens", "dlt"], +) + +# Create tasks +movies_task = PythonOperator( + task_id="process_movies", + python_callable=process_movies_table, + dag=dag, +) + +ratings_task = PythonOperator( + task_id="process_ratings", + python_callable=process_ratings_table, + dag=dag, +) + +tags_task = PythonOperator( + task_id="process_tags", + python_callable=process_tags_table, + dag=dag, +) + +summary_task = PythonOperator( + task_id="generate_summary", + python_callable=generate_summary, + dag=dag, +) + +# Set task dependencies +[movies_task, ratings_task, tags_task] >> summary_task diff --git a/airflow/examples/test_dag.py b/airflow/examples/test_dag.py new file mode 100644 index 0000000..e851d10 --- /dev/null +++ b/airflow/examples/test_dag.py @@ -0,0 +1,32 @@ +from datetime import datetime, timedelta +from airflow import DAG +from airflow.providers.standard.operators.python import PythonOperator + +def test_task(**context): + print("Test task executed 
successfully!") + return "success" + +default_args = { + "owner": "test", + "depends_on_past": False, + "start_date": datetime(2024, 1, 1), + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), +} + +dag = DAG( + "test_dag", + default_args=default_args, + description="Simple test DAG", + schedule=None, + catchup=False, + tags=["test"], +) + +test_task = PythonOperator( + task_id="test_task", + python_callable=test_task, + dag=dag, +) \ No newline at end of file