fix(dagster): csv_to_postgres table existence check

This commit is contained in:
Masaki Yatsu
2025-09-16 10:19:09 +09:00
parent 9d0f81ab6e
commit 55e00c01a0
2 changed files with 108 additions and 5 deletions

View File

@@ -11,14 +11,28 @@ def movies_pipeline(
context.log.info("Starting movies pipeline...")
# Check if table already exists and has data
table_exists = dlt.table_exists_and_has_data("movies")
if table_exists:
context.log.info("Movies table already exists with data, skipping import")
return MaterializeResult(
metadata={
"status": MetadataValue.text("skipped"),
"reason": MetadataValue.text("table already exists with data"),
}
)
# Read movies CSV using dlt filesystem readers
context.log.info("Reading movies.csv from MinIO...")
movies_data = dlt.read_csv_from_s3(bucket="movie-lens", file_glob="movies.csv")
# Run dlt pipeline
context.log.info("Loading data to PostgreSQL...")
# Set primary key for movies table
movies_data.apply_hints(primary_key="movieId")
result = dlt.run_pipeline(
movies_data, table_name="movies", write_disposition="replace"
movies_data,
table_name="movies",
write_disposition="replace",
)
context.log.info(f"Movies pipeline completed: {result}")
@@ -44,10 +58,22 @@ def ratings_pipeline(
) -> MaterializeResult:
"""Load ratings CSV from MinIO to PostgreSQL using dlt."""
# Check if table already exists and has data
if dlt.table_exists_and_has_data("ratings"):
context.log.info("Ratings table already exists with data, skipping import")
return MaterializeResult(
metadata={
"status": MetadataValue.text("skipped"),
"reason": MetadataValue.text("table already exists with data"),
}
)
# Read ratings CSV using dlt filesystem readers
ratings_data = dlt.read_csv_from_s3(bucket="movie-lens", file_glob="ratings.csv")
# Run dlt pipeline
# Set composite primary key for ratings table
ratings_data.apply_hints(primary_key=["userId", "movieId"])
result = dlt.run_pipeline(
ratings_data, table_name="ratings", write_disposition="replace"
)
@@ -75,10 +101,22 @@ def tags_pipeline(
) -> MaterializeResult:
"""Load tags CSV from MinIO to PostgreSQL using dlt."""
# Check if table already exists and has data
if dlt.table_exists_and_has_data("tags"):
context.log.info("Tags table already exists with data, skipping import")
return MaterializeResult(
metadata={
"status": MetadataValue.text("skipped"),
"reason": MetadataValue.text("table already exists with data"),
}
)
# Read tags CSV using dlt filesystem readers
tags_data = dlt.read_csv_from_s3(bucket="movie-lens", file_glob="tags.csv")
# Run dlt pipeline
# Set composite primary key for tags table
tags_data.apply_hints(primary_key=["userId", "movieId", "timestamp"])
result = dlt.run_pipeline(tags_data, table_name="tags", write_disposition="replace")
context.log.info(f"Tags pipeline completed: {result}")

View File

@@ -2,6 +2,7 @@ import os
from typing import Any, Dict
import dlt
import duckdb
from dagster import ConfigurableResource, get_dagster_logger
from dlt.common.schema.typing import TWriteDispositionConfig
from dlt.sources.filesystem import readers
@@ -62,11 +63,75 @@ class DltResource(ConfigurableResource):
return csv_reader
def table_exists_and_has_data(self, table_name: str) -> bool:
"""Check if table exists and has data using DuckDB PostgreSQL scanner."""
logger = get_dagster_logger()
conn: duckdb.DuckDBPyConnection | None = None
try:
# Get PostgreSQL connection details
postgres_url = os.getenv("POSTGRES_URL", "")
# Parse PostgreSQL URL to extract components
# Format: postgresql://user:password@host:port/database
url_parts = postgres_url.replace("postgresql://", "").split("/")
auth_host = url_parts[0]
if "@" in auth_host:
auth, host_port = auth_host.split("@")
if ":" in auth:
user, password = auth.split(":", 1)
else:
user, password = auth, ""
else:
host_port = auth_host
user, password = "", ""
if ":" in host_port:
host, port = host_port.rsplit(":", 1)
else:
host, port = host_port, "5432"
# Create DuckDB connection and install/load postgres scanner
conn = duckdb.connect()
conn.execute("INSTALL postgres_scanner")
conn.execute("LOAD postgres_scanner")
# Attach PostgreSQL database
attach_cmd = f"""
ATTACH 'host={host} port={port} dbname={self.dataset_name} user={user} password={password}' AS postgres_db (TYPE postgres)
"""
conn.execute(attach_cmd)
# Check if table exists and has data
query = f"""
SELECT COUNT(*) as row_count
FROM postgres_db.{self.dataset_name}.{table_name}
LIMIT 1
"""
result = conn.execute(query).fetchone()
row_count = result[0] if result else 0
logger.info(f"Table {table_name} has {row_count} rows")
return row_count > 0
except Exception as e:
logger.info(f"Table {table_name} does not exist or is empty: {e}")
return False
finally:
try:
if conn:
conn.close()
except Exception:
pass
def run_pipeline(
self,
resource_data,
table_name: str,
write_disposition: TWriteDispositionConfig = "replace",
primary_key: str = "",
) -> Dict[str, Any]:
"""Run dlt pipeline with given resource data."""
logger = get_dagster_logger()