From 55e00c01a0dfbf85264d7ae6989c60b630b4b328 Mon Sep 17 00:00:00 2001
From: Masaki Yatsu
Date: Tue, 16 Sep 2025 10:19:09 +0900
Subject: [PATCH] fix(dagster): csv_to_postgres table existence check

---
 .../csv_to_postgres/csv_to_postgres/assets.py | 48 ++++++++++++--
 .../csv_to_postgres/resources.py              | 65 +++++++++++++++++++
 2 files changed, 108 insertions(+), 5 deletions(-)

diff --git a/dagster/examples/csv_to_postgres/csv_to_postgres/assets.py b/dagster/examples/csv_to_postgres/csv_to_postgres/assets.py
index 8580e78..cef69f9 100644
--- a/dagster/examples/csv_to_postgres/csv_to_postgres/assets.py
+++ b/dagster/examples/csv_to_postgres/csv_to_postgres/assets.py
@@ -11,14 +11,28 @@ def movies_pipeline(
 
     context.log.info("Starting movies pipeline...")
 
+    # Check if table already exists and has data
+    table_exists = dlt.table_exists_and_has_data("movies")
+    if table_exists:
+        context.log.info("Movies table already exists with data, skipping import")
+        return MaterializeResult(
+            metadata={
+                "status": MetadataValue.text("skipped"),
+                "reason": MetadataValue.text("table already exists with data"),
+            }
+        )
+
     # Read movies CSV using dlt filesystem readers
     context.log.info("Reading movies.csv from MinIO...")
     movies_data = dlt.read_csv_from_s3(bucket="movie-lens", file_glob="movies.csv")
 
-    # Run dlt pipeline
-    context.log.info("Loading data to PostgreSQL...")
+    # Set primary key for movies table
+    movies_data.apply_hints(primary_key="movieId")
+
     result = dlt.run_pipeline(
-        movies_data, table_name="movies", write_disposition="replace"
+        movies_data,
+        table_name="movies",
+        write_disposition="replace",
     )
 
     context.log.info(f"Movies pipeline completed: {result}")
@@ -44,10 +58,22 @@ def ratings_pipeline(
 ) -> MaterializeResult:
     """Load ratings CSV from MinIO to PostgreSQL using dlt."""
 
+    # Check if table already exists and has data
+    if dlt.table_exists_and_has_data("ratings"):
+        context.log.info("Ratings table already exists with data, skipping import")
+        return MaterializeResult(
+            metadata={
+                "status": MetadataValue.text("skipped"),
+                "reason": MetadataValue.text("table already exists with data"),
+            }
+        )
+
     # Read ratings CSV using dlt filesystem readers
     ratings_data = dlt.read_csv_from_s3(bucket="movie-lens", file_glob="ratings.csv")
 
-    # Run dlt pipeline
+    # Set composite primary key for ratings table
+    ratings_data.apply_hints(primary_key=["userId", "movieId"])
+
     result = dlt.run_pipeline(
         ratings_data, table_name="ratings", write_disposition="replace"
     )
@@ -75,10 +101,22 @@ def tags_pipeline(
 ) -> MaterializeResult:
     """Load tags CSV from MinIO to PostgreSQL using dlt."""
 
+    # Check if table already exists and has data
+    if dlt.table_exists_and_has_data("tags"):
+        context.log.info("Tags table already exists with data, skipping import")
+        return MaterializeResult(
+            metadata={
+                "status": MetadataValue.text("skipped"),
+                "reason": MetadataValue.text("table already exists with data"),
+            }
+        )
+
     # Read tags CSV using dlt filesystem readers
     tags_data = dlt.read_csv_from_s3(bucket="movie-lens", file_glob="tags.csv")
 
-    # Run dlt pipeline
+    # Set composite primary key for tags table
+    tags_data.apply_hints(primary_key=["userId", "movieId", "timestamp"])
+
     result = dlt.run_pipeline(tags_data, table_name="tags", write_disposition="replace")
 
     context.log.info(f"Tags pipeline completed: {result}")
diff --git a/dagster/examples/csv_to_postgres/csv_to_postgres/resources.py b/dagster/examples/csv_to_postgres/csv_to_postgres/resources.py
index 9f73777..65cd381 100644
--- a/dagster/examples/csv_to_postgres/csv_to_postgres/resources.py
+++ b/dagster/examples/csv_to_postgres/csv_to_postgres/resources.py
@@ -2,6 +2,7 @@ import os
 from typing import Any, Dict
 
 import dlt
+import duckdb
 from dagster import ConfigurableResource, get_dagster_logger
 from dlt.common.schema.typing import TWriteDispositionConfig
 from dlt.sources.filesystem import readers
@@ -62,11 +63,75 @@ class DltResource(ConfigurableResource):
 
         return csv_reader
 
+    def table_exists_and_has_data(self, table_name: str) -> bool:
+        """Check if table exists and has data using DuckDB PostgreSQL scanner."""
+        logger = get_dagster_logger()
+        conn: duckdb.DuckDBPyConnection | None = None
+
+        try:
+            # Get PostgreSQL connection details
+            postgres_url = os.getenv("POSTGRES_URL", "")
+
+            # Parse PostgreSQL URL to extract components
+            # Format: postgresql://user:password@host:port/database
+            url_parts = postgres_url.replace("postgresql://", "").split("/")
+            auth_host = url_parts[0]
+
+            if "@" in auth_host:
+                auth, host_port = auth_host.split("@")
+                if ":" in auth:
+                    user, password = auth.split(":", 1)
+                else:
+                    user, password = auth, ""
+            else:
+                host_port = auth_host
+                user, password = "", ""
+
+            if ":" in host_port:
+                host, port = host_port.rsplit(":", 1)
+            else:
+                host, port = host_port, "5432"
+
+            # Create DuckDB connection and install/load postgres scanner
+            conn = duckdb.connect()
+            conn.execute("INSTALL postgres_scanner")
+            conn.execute("LOAD postgres_scanner")
+
+            # Attach PostgreSQL database
+            attach_cmd = f"""
+            ATTACH 'host={host} port={port} dbname={self.dataset_name} user={user} password={password}' AS postgres_db (TYPE postgres)
+            """
+            conn.execute(attach_cmd)
+
+            # Check if table exists and has data
+            query = f"""
+            SELECT COUNT(*) as row_count
+            FROM postgres_db.{self.dataset_name}.{table_name}
+            LIMIT 1
+            """
+
+            result = conn.execute(query).fetchone()
+            row_count = result[0] if result else 0
+
+            logger.info(f"Table {table_name} has {row_count} rows")
+            return row_count > 0
+
+        except Exception as e:
+            logger.info(f"Table {table_name} does not exist or is empty: {e}")
+            return False
+        finally:
+            try:
+                if conn:
+                    conn.close()
+            except Exception:
+                pass
+
     def run_pipeline(
         self,
         resource_data,
         table_name: str,
         write_disposition: TWriteDispositionConfig = "replace",
+        primary_key: str = "",
     ) -> Dict[str, Any]:
         """Run dlt pipeline with given resource data."""
         logger = get_dagster_logger()
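
Reviewer note, outside the patch itself: a minimal sketch of how the new table_exists_and_has_data() check could be exercised locally. It assumes POSTGRES_URL is already exported, that DltResource's other config fields have defaults, and that the schema name "movie_lens" is a placeholder; anything not shown in the diff above is hypothetical.

# Hedged sketch, not part of the patch: call the new helper directly.
# Assumes POSTGRES_URL is set in the environment and that DltResource's other
# config fields have defaults; "movie_lens" is a placeholder schema name.
from csv_to_postgres.resources import DltResource

dlt_resource = DltResource(dataset_name="movie_lens")  # placeholder value

# Prints True only when the table exists in that schema and has at least one
# row; connection or missing-table errors are caught inside and return False.
print(dlt_resource.table_exists_and_has_data("movies"))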