From 686cea20224c78a96aa290617f84a49d0b8377aa Mon Sep 17 00:00:00 2001
From: Masaki Yatsu
Date: Tue, 16 Sep 2025 09:31:04 +0900
Subject: [PATCH] fix(dagster): fix summary asset error in csv_to_postgres
 example

The movielens_summary asset created a brand-new pipeline with a random
UUID suffix, so it never found the schemas written by the load assets
and the summary failed. Name each table's pipeline deterministically
("<base>_<table>") instead, and let the summary asset re-attach to
those pipelines to collect schema info.
---
 .../csv_to_postgres/csv_to_postgres/assets.py | 45 ++++++++++++++-----
 .../csv_to_postgres/resources.py              | 22 +++++----
 2 files changed, 46 insertions(+), 21 deletions(-)

diff --git a/dagster/examples/csv_to_postgres/csv_to_postgres/assets.py b/dagster/examples/csv_to_postgres/csv_to_postgres/assets.py
index 5f6983c..8580e78 100644
--- a/dagster/examples/csv_to_postgres/csv_to_postgres/assets.py
+++ b/dagster/examples/csv_to_postgres/csv_to_postgres/assets.py
@@ -104,25 +104,46 @@ def movielens_summary(
 ) -> MaterializeResult:
     """Generate summary of all loaded MovieLens data."""
 
-    # Get pipeline to access dlt info
-    pipeline = dlt.create_pipeline()
+    context.log.info("Generating summary of MovieLens dataset...")
 
-    # Get schema info
-    schema = pipeline.default_schema
-    tables = list(schema.tables.keys())
+    # Collect schema info from the table-specific pipelines used by the load assets
+    table_names = ["movies", "ratings", "tags"]
+    schema_info = {}
+    tables_found = []
 
-    context.log.info(f"MovieLens dataset loaded with tables: {tables}")
+    for table_name in table_names:
+        try:
+            # Re-create the pipeline under the same name the load asset used
+            pipeline = dlt.create_pipeline(table_name=table_name)
 
-    # Calculate basic metrics
-    table_count = len([t for t in tables if t in ["movies", "ratings", "tags"]])
+            # Read the schema if this pipeline has already run
+            if pipeline.default_schema_name in pipeline.schemas:
+                schema = pipeline.schemas[pipeline.default_schema_name]
+                context.log.info(
+                    f"Found schema for pipeline '{pipeline.pipeline_name}'"
+                )
+                schema_info[table_name] = {
+                    "pipeline": pipeline.pipeline_name,
+                    "schema_version": schema.version,
+                }
+                tables_found.extend(
+                    [t for t in schema.tables.keys() if t == table_name]
+                )
+        except Exception as e:
+            context.log.debug(f"Could not get schema for {table_name}: {e}")
+
+    context.log.info(
+        f"Summary: Found {len(tables_found)} tables from {len(schema_info)} pipelines"
+    )
 
     return MaterializeResult(
         metadata={
-            "pipeline_name": MetadataValue.text(dlt.pipeline_name),
+            "base_pipeline_name": MetadataValue.text(dlt.pipeline_name),
             "dataset_name": MetadataValue.text(dlt.dataset_name),
             "destination": MetadataValue.text(dlt.destination),
-            "schema_version": MetadataValue.int(schema.version if schema else 0),
-            "tables": MetadataValue.json(tables),
-            "movielens_tables": MetadataValue.int(table_count),
+            "pipelines_checked": MetadataValue.json(list(schema_info.keys())),
+            "tables_found": MetadataValue.json(tables_found),
+            "movielens_tables_count": MetadataValue.int(len(tables_found)),
+            "schema_info": MetadataValue.json(schema_info),
         }
     )
diff --git a/dagster/examples/csv_to_postgres/csv_to_postgres/resources.py b/dagster/examples/csv_to_postgres/csv_to_postgres/resources.py
index f4d1eae..9f73777 100644
--- a/dagster/examples/csv_to_postgres/csv_to_postgres/resources.py
+++ b/dagster/examples/csv_to_postgres/csv_to_postgres/resources.py
@@ -28,17 +28,18 @@ class DltResource(ConfigurableResource):
         # Enable detailed logging for dlt
         os.environ["DLT_LOG_LEVEL"] = "INFO"
 
-    def create_pipeline(self):
-        """Create dlt pipeline."""
-        import uuid
-
+    def create_pipeline(self, table_name: str = ""):
+        """Create a dlt pipeline, optionally with a table-specific name."""
         self.setup_environment()
 
-        # Use a unique pipeline name to avoid conflicts
-        unique_pipeline_name = f"{self.pipeline_name}_{uuid.uuid4().hex[:8]}"
+        # Use a table-specific pipeline name if provided, otherwise the base name
+        if table_name:
+            pipeline_name = f"{self.pipeline_name}_{table_name}"
+        else:
+            pipeline_name = self.pipeline_name
 
         return dlt.pipeline(
-            pipeline_name=unique_pipeline_name,
+            pipeline_name=pipeline_name,
             destination=self.destination,
             dataset_name=self.dataset_name,
         )
@@ -70,9 +71,12 @@ class DltResource(ConfigurableResource):
         """Run dlt pipeline with given resource data."""
         logger = get_dagster_logger()
 
-        pipeline = self.create_pipeline()
+        # Create the pipeline under its table-specific name
+        pipeline = self.create_pipeline(table_name=table_name)
 
-        logger.info(f"Running pipeline for table {table_name}")
+        logger.info(
+            f"Running pipeline '{pipeline.pipeline_name}' for table {table_name}"
+        )
 
         # Configure pipeline for progress tracking
         pipeline.config.progress = "log"  # Enables progress logging
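
Note for reviewers (commentary, not part of the patch): the fix relies
on pipeline names now being deterministic, so a later asset can
re-attach to the working-directory state a load asset left behind. A
minimal sketch of that behaviour, assuming a base pipeline name of
"movielens" and a Postgres destination (both illustrative, not taken
from the example's config):

    import dlt

    # Same name on every run, so dlt reuses the local pipeline state
    # and the schemas written by an earlier load are visible here.
    pipeline = dlt.pipeline(
        pipeline_name="movielens_movies",  # i.e. f"{base}_{table}"
        destination="postgres",
        dataset_name="movielens",
    )

    # Empty on a fresh working directory; populated once a load has run.
    name = pipeline.default_schema_name
    if name and name in pipeline.schemas:
        schema = pipeline.schemas[name]
        print(schema.version, list(schema.tables.keys()))
    else:
        print("no schema yet - run the load assets first")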