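"""Airflow DAG that loads the MovieLens CSV files (movies, ratings, tags) from a
MinIO/S3 bucket into PostgreSQL with dlt, then summarizes the loaded schemas."""
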
import os
from datetime import datetime, timedelta
from typing import Any, Dict

import dlt
import duckdb
from dlt.common.schema.typing import TWriteDispositionConfig
from dlt.sources.filesystem import readers

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator


class DltResource:
    """Helper for running dlt pipelines from Airflow tasks (MinIO to PostgreSQL)."""

    def __init__(self):
        self.pipeline_name = "minio_to_postgres"
        self.destination = "postgres"
        self.dataset_name = "movielens_af"

    def setup_environment(self):
        """Set up environment variables for dlt."""
        # PostgreSQL configuration
        postgres_url = os.getenv("POSTGRES_URL", "")
        os.environ["DESTINATION__POSTGRES__CREDENTIALS"] = (
            f"{postgres_url}/{self.dataset_name}"
        )
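        # NOTE: the dataset name is appended as the database name, so a PostgreSQL
        # database called "movielens_af" is expected to exist; dlt also creates a
        # schema with this name inside it for the loaded tables.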

        # Enable detailed logging for dlt
        os.environ["DLT_LOG_LEVEL"] = "INFO"

    def create_pipeline(self, table_name: str = ""):
        """Create dlt pipeline with optional table-specific name."""
        self.setup_environment()
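
        # A separate pipeline name per table keeps each pipeline's local working
        # directory (state, schemas, load packages) isolated, so the three Airflow
        # load tasks can run concurrently without sharing pipeline state.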
        if table_name:
            pipeline_name = f"{self.pipeline_name}_{table_name}"
        else:
            pipeline_name = self.pipeline_name

        return dlt.pipeline(
            pipeline_name=pipeline_name,
            destination=self.destination,
            dataset_name=self.dataset_name,
            progress="log",
        )

    def read_csv_from_s3(self, bucket: str, file_glob: str, chunk_size: int = 10000):
        """Read CSV file from S3/MinIO using dlt filesystem readers."""
        self.setup_environment()

        print(f"Reading CSV from s3://{bucket}/{file_glob}")

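        # read_csv_duckdb parses the matched CSV files with DuckDB and yields the
        # data in chunks of `chunk_size` rows; keyword arguments such as `header`
        # are forwarded to DuckDB's CSV reader.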
        csv_reader = readers(
            bucket_url=f"s3://{bucket}",
            file_glob=file_glob,
        ).read_csv_duckdb(
            chunk_size=chunk_size,
            header=True,
        )

        return csv_reader

    def table_exists_and_has_data(self, table_name: str) -> bool:
        """Check if table exists and has data using DuckDB PostgreSQL scanner."""
        conn: duckdb.DuckDBPyConnection | None = None

        try:
            postgres_url = os.getenv("POSTGRES_URL", "")
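            # The parsing below assumes POSTGRES_URL looks like
            # postgresql://user:password@host:port; any trailing /database part is
            # ignored, because the database name is taken from self.dataset_name.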
            url_parts = postgres_url.replace("postgresql://", "").split("/")
            auth_host = url_parts[0]

            if "@" in auth_host:
                auth, host_port = auth_host.split("@")
                if ":" in auth:
                    user, password = auth.split(":", 1)
                else:
                    user, password = auth, ""
            else:
                host_port = auth_host
                user, password = "", ""

            if ":" in host_port:
                host, port = host_port.rsplit(":", 1)
            else:
                host, port = host_port, "5432"

            conn = duckdb.connect()
            conn.execute("INSTALL postgres_scanner")
            conn.execute("LOAD postgres_scanner")

            attach_cmd = f"""
                ATTACH 'host={host} port={port} dbname={self.dataset_name} user={user} password={password}' AS postgres_db (TYPE postgres)
            """
            conn.execute(attach_cmd)

            query = f"""
                SELECT COUNT(*) as row_count
                FROM postgres_db.{self.dataset_name}.{table_name}
                LIMIT 1
            """

            result = conn.execute(query).fetchone()
            row_count = result[0] if result else 0

            print(f"Table {table_name} has {row_count} rows")
            return row_count > 0

        except Exception as e:
            print(f"Table {table_name} does not exist or is empty: {e}")
            return False
        finally:
            try:
                if conn:
                    conn.close()
            except Exception:
                pass

    def run_pipeline(
        self,
        resource_data,
        table_name: str,
        write_disposition: TWriteDispositionConfig = "replace",
        primary_key: str = "",
    ) -> Dict[str, Any]:
        """Run dlt pipeline with given resource data."""
        pipeline = self.create_pipeline(table_name=table_name)

        print(f"Running pipeline '{pipeline.pipeline_name}' for table {table_name}")

        load_info = pipeline.run(
            resource_data,
            table_name=table_name,
            write_disposition=write_disposition,
            primary_key=primary_key or None,
        )

        print(f"Pipeline completed for {table_name}")

        if load_info.load_packages:
            package = load_info.load_packages[0]
            completed_jobs = package.jobs.get("completed_jobs", [])
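
            # Not every completed load job exposes a row count, so read it
            # defensively and treat missing values as zero.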
            total_rows = sum(
                getattr(job, "rows_count", 0)
                for job in completed_jobs
                if hasattr(job, "rows_count")
            )

            return {
                "load_id": load_info.loads_ids[0] if load_info.loads_ids else None,
                "table_name": table_name,
                "completed_jobs": len(completed_jobs),
                "pipeline_name": self.pipeline_name,
                "destination": self.destination,
                "dataset_name": self.dataset_name,
                "write_disposition": write_disposition,
                "total_rows": total_rows,
            }

        return {
            "table_name": table_name,
            "pipeline_name": self.pipeline_name,
            "destination": self.destination,
            "dataset_name": self.dataset_name,
        }


# Task functions
def process_movies_table(**context):
    """Load movies CSV from MinIO to PostgreSQL using dlt."""
    dlt_resource = DltResource()

    print("Starting movies pipeline...")

    table_exists = dlt_resource.table_exists_and_has_data("movies")
    if table_exists:
        print("Movies table already exists with data, skipping import")
        return {"status": "skipped", "reason": "table already exists with data"}

    print("Reading movies.csv from MinIO...")
    movies_data = dlt_resource.read_csv_from_s3(
        bucket="movie-lens", file_glob="movies.csv"
    )

    movies_data.apply_hints(primary_key="movieId")
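    # Under write_disposition="replace" the primary-key hint above is recorded as
    # schema metadata only; it would drive row deduplication if the disposition
    # were "merge".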

    result = dlt_resource.run_pipeline(
        movies_data,
        table_name="movies",
        write_disposition="replace",
    )

    print(f"Movies pipeline completed: {result}")
    return result


def process_ratings_table(**context):
    """Load ratings CSV from MinIO to PostgreSQL using dlt."""
    dlt_resource = DltResource()

    print("Starting ratings pipeline...")

    table_exists = dlt_resource.table_exists_and_has_data("ratings")
    if table_exists:
        print("Ratings table already exists with data, skipping import")
        return {"status": "skipped", "reason": "table already exists with data"}

    print("Reading ratings.csv from MinIO...")
    ratings_data = dlt_resource.read_csv_from_s3(
        bucket="movie-lens", file_glob="ratings.csv"
    )

    ratings_data.apply_hints(primary_key=["userId", "movieId"])

    result = dlt_resource.run_pipeline(
        ratings_data, table_name="ratings", write_disposition="replace"
    )

    print(f"Ratings pipeline completed: {result}")
    return result


def process_tags_table(**context):
    """Load tags CSV from MinIO to PostgreSQL using dlt."""
    dlt_resource = DltResource()

    print("Starting tags pipeline...")

    table_exists = dlt_resource.table_exists_and_has_data("tags")
    if table_exists:
        print("Tags table already exists with data, skipping import")
        return {"status": "skipped", "reason": "table already exists with data"}

    print("Reading tags.csv from MinIO...")
    tags_data = dlt_resource.read_csv_from_s3(bucket="movie-lens", file_glob="tags.csv")

    tags_data.apply_hints(primary_key=["userId", "movieId", "timestamp"])

    result = dlt_resource.run_pipeline(
        tags_data, table_name="tags", write_disposition="replace"
    )

    print(f"Tags pipeline completed: {result}")
    return result


def generate_summary(**context):
    """Generate summary of all loaded MovieLens data."""
    dlt_resource = DltResource()

    print("Generating summary of MovieLens dataset...")

    pipeline_names = ["movies", "ratings", "tags"]
    schema_info = {}
    tables_found = []

    for table_name in pipeline_names:
        try:
            pipeline = dlt_resource.create_pipeline(table_name=table_name)
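            # pipeline.schemas is read from the pipeline's local working directory,
            # so this summary only reflects pipelines that ran on the same
            # worker/filesystem as this task.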

            if pipeline.default_schema_name in pipeline.schemas:
                schema = pipeline.schemas[pipeline.default_schema_name]
                print(f"Found schema for pipeline '{pipeline.pipeline_name}'")
                schema_info[table_name] = {
                    "pipeline": pipeline.pipeline_name,
                    "schema_version": schema.version,
                }
                tables_found.extend(
                    [t for t in schema.tables.keys() if t == table_name]
                )
        except Exception as e:
            print(f"Could not get schema for {table_name}: {e}")

    print(
        f"Summary: Found {len(tables_found)} tables from {len(schema_info)} pipelines"
    )

    return {
        "base_pipeline_name": dlt_resource.pipeline_name,
        "dataset_name": dlt_resource.dataset_name,
        "destination": dlt_resource.destination,
        "pipelines_checked": list(schema_info.keys()),
        "tables_found": tables_found,
        "movielens_tables_count": len(tables_found),
        "schema_info": schema_info,
    }


# Default arguments for the DAG
default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Create the DAG
dag = DAG(
    "csv_to_postgres",
    default_args=default_args,
    description="Load MovieLens CSV data from MinIO to PostgreSQL using dlt",
    schedule=None,  # Manual trigger only
    catchup=False,
    tags=["etl", "movielens", "dlt"],
)

# Create tasks
movies_task = PythonOperator(
    task_id="process_movies",
    python_callable=process_movies_table,
    dag=dag,
)

ratings_task = PythonOperator(
    task_id="process_ratings",
    python_callable=process_ratings_table,
    dag=dag,
)

tags_task = PythonOperator(
    task_id="process_tags",
    python_callable=process_tags_table,
    dag=dag,
)

summary_task = PythonOperator(
    task_id="generate_summary",
    python_callable=generate_summary,
    dag=dag,
)

# Set task dependencies
[movies_task, ratings_task, tags_task] >> summary_task
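# The three load tasks are independent of one another and can run in parallel;
# generate_summary runs only after all of them have succeeded.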
|