buun-stack/airflow/examples/csv_to_postgres_dag.py

import os
from datetime import datetime, timedelta
from typing import Any, Dict
import dlt
import duckdb
from dlt.common.schema.typing import TWriteDispositionConfig
from dlt.sources.filesystem import readers
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
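# Assumed (not pinned here) to be installed in the Airflow environment:
# dlt with the postgres, filesystem, and duckdb extras, duckdb itself, and the
# apache-airflow-providers-standard package that provides PythonOperator.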
class DltResource:
    """DLT resource for data pipeline operations in Airflow."""

    def __init__(self):
        self.pipeline_name = "minio_to_postgres"
        self.destination = "postgres"
        self.dataset_name = "movielens_af"

    def setup_environment(self):
        """Setup environment variables for dlt."""
        # PostgreSQL configuration
        postgres_url = os.getenv("POSTGRES_URL", "")
        os.environ["DESTINATION__POSTGRES__CREDENTIALS"] = (
            f"{postgres_url}/{self.dataset_name}"
        )
        # Enable detailed logging for dlt
        os.environ["DLT_LOG_LEVEL"] = "INFO"
    def create_pipeline(self, table_name: str):
        """Create dlt pipeline with optional table-specific name."""
        self.setup_environment()
        if table_name:
            pipeline_name = f"{self.pipeline_name}_{table_name}"
        else:
            pipeline_name = self.pipeline_name
        return dlt.pipeline(
            pipeline_name=pipeline_name,
            destination=self.destination,
            dataset_name=self.dataset_name,
        )
    def read_csv_from_s3(self, bucket: str, file_glob: str, chunk_size: int = 10000):
        """Read CSV file from S3/MinIO using dlt filesystem readers."""
        self.setup_environment()
        print(f"Reading CSV from s3://{bucket}/{file_glob}")
        csv_reader = readers(
            bucket_url=f"s3://{bucket}",
            file_glob=file_glob,
        ).read_csv_duckdb(
            chunk_size=chunk_size,
            header=True,
        )
        return csv_reader
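    # S3/MinIO credentials for the filesystem source are assumed to be supplied
    # outside this file (e.g. via dlt's environment-variable config such as
    # SOURCES__FILESYSTEM__CREDENTIALS__* or a secrets backend);
    # read_csv_from_s3 above does not set them itself.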
    def table_exists_and_has_data(self, table_name: str) -> bool:
        """Check if table exists and has data using DuckDB PostgreSQL scanner."""
        conn: duckdb.DuckDBPyConnection | None = None
        try:
            postgres_url = os.getenv("POSTGRES_URL", "")
            url_parts = postgres_url.replace("postgresql://", "").split("/")
            auth_host = url_parts[0]
            if "@" in auth_host:
                auth, host_port = auth_host.split("@")
                if ":" in auth:
                    user, password = auth.split(":", 1)
                else:
                    user, password = auth, ""
            else:
                host_port = auth_host
                user, password = "", ""
            if ":" in host_port:
                host, port = host_port.rsplit(":", 1)
            else:
                host, port = host_port, "5432"
            conn = duckdb.connect()
            conn.execute("INSTALL postgres_scanner")
            conn.execute("LOAD postgres_scanner")
            attach_cmd = f"""
                ATTACH 'host={host} port={port} dbname={self.dataset_name} user={user} password={password}' AS postgres_db (TYPE postgres)
            """
            conn.execute(attach_cmd)
            query = f"""
                SELECT COUNT(*) as row_count
                FROM postgres_db.{self.dataset_name}.{table_name}
                LIMIT 1
            """
            result = conn.execute(query).fetchone()
            row_count = result[0] if result else 0
            print(f"Table {table_name} has {row_count} rows")
            return row_count > 0
        except Exception as e:
            print(f"Table {table_name} does not exist or is empty: {e}")
            return False
        finally:
            try:
                if conn:
                    conn.close()
            except Exception:
                pass
    def run_pipeline(
        self,
        resource_data,
        table_name: str,
        write_disposition: TWriteDispositionConfig = "replace",
        primary_key: str = "",
    ) -> Dict[str, Any]:
        """Run dlt pipeline with given resource data."""
        pipeline = self.create_pipeline(table_name=table_name)
        print(f"Running pipeline '{pipeline.pipeline_name}' for table {table_name}")
        pipeline.config.progress = "log"
        load_info = pipeline.run(
            resource_data, table_name=table_name, write_disposition=write_disposition
        )
        print(f"Pipeline completed for {table_name}")
        if load_info.load_packages:
            package = load_info.load_packages[0]
            completed_jobs = package.jobs.get("completed_jobs", [])
            total_rows = sum(
                getattr(job, "rows_count", 0)
                for job in completed_jobs
                if hasattr(job, "rows_count")
            )
            return {
                "load_id": load_info.loads_ids[0] if load_info.loads_ids else None,
                "table_name": table_name,
                "completed_jobs": len(completed_jobs),
                "pipeline_name": self.pipeline_name,
                "destination": self.destination,
                "dataset_name": self.dataset_name,
                "write_disposition": write_disposition,
                "total_rows": total_rows,
            }
        return {
            "table_name": table_name,
            "pipeline_name": self.pipeline_name,
            "destination": self.destination,
            "dataset_name": self.dataset_name,
        }
# Task functions
def process_movies_table(**context):
    """Load movies CSV from MinIO to PostgreSQL using dlt."""
    dlt_resource = DltResource()
    print("Starting movies pipeline...")
    table_exists = dlt_resource.table_exists_and_has_data("movies")
    if table_exists:
        print("Movies table already exists with data, skipping import")
        return {"status": "skipped", "reason": "table already exists with data"}
    print("Reading movies.csv from MinIO...")
    movies_data = dlt_resource.read_csv_from_s3(
        bucket="movie-lens", file_glob="movies.csv"
    )
    movies_data.apply_hints(primary_key="movieId")
    result = dlt_resource.run_pipeline(
        movies_data,
        table_name="movies",
        write_disposition="replace",
    )
    print(f"Movies pipeline completed: {result}")
    return result
def process_ratings_table(**context):
    """Load ratings CSV from MinIO to PostgreSQL using dlt."""
    dlt_resource = DltResource()
    print("Starting ratings pipeline...")
    table_exists = dlt_resource.table_exists_and_has_data("ratings")
    if table_exists:
        print("Ratings table already exists with data, skipping import")
        return {"status": "skipped", "reason": "table already exists with data"}
    print("Reading ratings.csv from MinIO...")
    ratings_data = dlt_resource.read_csv_from_s3(
        bucket="movie-lens", file_glob="ratings.csv"
    )
    ratings_data.apply_hints(primary_key=["userId", "movieId"])
    result = dlt_resource.run_pipeline(
        ratings_data, table_name="ratings", write_disposition="replace"
    )
    print(f"Ratings pipeline completed: {result}")
    return result
def process_tags_table(**context):
    """Load tags CSV from MinIO to PostgreSQL using dlt."""
    dlt_resource = DltResource()
    print("Starting tags pipeline...")
    table_exists = dlt_resource.table_exists_and_has_data("tags")
    if table_exists:
        print("Tags table already exists with data, skipping import")
        return {"status": "skipped", "reason": "table already exists with data"}
    print("Reading tags.csv from MinIO...")
    tags_data = dlt_resource.read_csv_from_s3(bucket="movie-lens", file_glob="tags.csv")
    tags_data.apply_hints(primary_key=["userId", "movieId", "timestamp"])
    result = dlt_resource.run_pipeline(
        tags_data, table_name="tags", write_disposition="replace"
    )
    print(f"Tags pipeline completed: {result}")
    return result
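# Note: generate_summary() below reads dlt's locally stored pipeline state and
# schemas (dlt restores these from its local working directory), not the
# PostgreSQL catalog, so its output depends on prior runs on the same worker.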
def generate_summary(**context):
    """Generate summary of all loaded MovieLens data."""
    dlt_resource = DltResource()
    print("Generating summary of MovieLens dataset...")
    pipeline_names = ["movies", "ratings", "tags"]
    schema_info = {}
    tables_found = []
    for table_name in pipeline_names:
        try:
            pipeline = dlt_resource.create_pipeline(table_name=table_name)
            if pipeline.default_schema_name in pipeline.schemas:
                schema = pipeline.schemas[pipeline.default_schema_name]
                print(f"Found schema for pipeline '{pipeline.pipeline_name}'")
                schema_info[table_name] = {
                    "pipeline": pipeline.pipeline_name,
                    "schema_version": schema.version,
                }
                tables_found.extend(
                    [t for t in schema.tables.keys() if t == table_name]
                )
        except Exception as e:
            print(f"Could not get schema for {table_name}: {e}")
    print(
        f"Summary: Found {len(tables_found)} tables from {len(schema_info)} pipelines"
    )
    return {
        "base_pipeline_name": dlt_resource.pipeline_name,
        "dataset_name": dlt_resource.dataset_name,
        "destination": dlt_resource.destination,
        "pipelines_checked": list(schema_info.keys()),
        "tables_found": tables_found,
        "movielens_tables_count": len(tables_found),
        "schema_info": schema_info,
    }
# Default arguments for the DAG
default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
# Create the DAG
dag = DAG(
    "csv_to_postgres",
    default_args=default_args,
    description="Load MovieLens CSV data from MinIO to PostgreSQL using dlt",
    schedule=None,  # Manual trigger only
    catchup=False,
    tags=["etl", "movielens", "dlt"],
)
# Create tasks
movies_task = PythonOperator(
    task_id="process_movies",
    python_callable=process_movies_table,
    dag=dag,
)

ratings_task = PythonOperator(
    task_id="process_ratings",
    python_callable=process_ratings_table,
    dag=dag,
)

tags_task = PythonOperator(
    task_id="process_tags",
    python_callable=process_tags_table,
    dag=dag,
)

summary_task = PythonOperator(
    task_id="generate_summary",
    python_callable=generate_summary,
    dag=dag,
)
# Set task dependencies
[movies_task, ratings_task, tags_task] >> summary_task
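# Example manual trigger (assuming the file is deployed to the Airflow DAGs folder
# and POSTGRES_URL plus MinIO credentials are set in the worker environment):
#   airflow dags trigger csv_to_postgres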