fix(dagster): fix example summary error
This commit is contained in:
@@ -104,25 +104,46 @@ def movielens_summary(
|
|||||||
) -> MaterializeResult:
|
) -> MaterializeResult:
|
||||||
"""Generate summary of all loaded MovieLens data."""
|
"""Generate summary of all loaded MovieLens data."""
|
||||||
|
|
||||||
# Get pipeline to access dlt info
|
context.log.info("Generating summary of MovieLens dataset...")
|
||||||
pipeline = dlt.create_pipeline()
|
|
||||||
|
|
||||||
# Get schema info
|
# Try to get schema from one of the existing pipelines
|
||||||
schema = pipeline.default_schema
|
pipeline_names = ["movies", "ratings", "tags"]
|
||||||
tables = list(schema.tables.keys())
|
schema_info = {}
|
||||||
|
tables_found = []
|
||||||
|
|
||||||
context.log.info(f"MovieLens dataset loaded with tables: {tables}")
|
for table_name in pipeline_names:
|
||||||
|
try:
|
||||||
|
# Create pipeline with the same name used in previous assets
|
||||||
|
pipeline = dlt.create_pipeline(table_name=table_name)
|
||||||
|
|
||||||
# Calculate basic metrics
|
# Try to get schema if it exists
|
||||||
table_count = len([t for t in tables if t in ["movies", "ratings", "tags"]])
|
if pipeline.default_schema_name in pipeline.schemas:
|
||||||
|
schema = pipeline.schemas[pipeline.default_schema_name]
|
||||||
|
context.log.info(
|
||||||
|
f"Found schema for pipeline '{pipeline.pipeline_name}'"
|
||||||
|
)
|
||||||
|
schema_info[table_name] = {
|
||||||
|
"pipeline": pipeline.pipeline_name,
|
||||||
|
"schema_version": schema.version,
|
||||||
|
}
|
||||||
|
tables_found.extend(
|
||||||
|
[t for t in schema.tables.keys() if t == table_name]
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
context.log.debug(f"Could not get schema for {table_name}: {e}")
|
||||||
|
|
||||||
|
context.log.info(
|
||||||
|
f"Summary: Found {len(tables_found)} tables from {len(schema_info)} pipelines"
|
||||||
|
)
|
||||||
|
|
||||||
return MaterializeResult(
|
return MaterializeResult(
|
||||||
metadata={
|
metadata={
|
||||||
"pipeline_name": MetadataValue.text(dlt.pipeline_name),
|
"base_pipeline_name": MetadataValue.text(dlt.pipeline_name),
|
||||||
"dataset_name": MetadataValue.text(dlt.dataset_name),
|
"dataset_name": MetadataValue.text(dlt.dataset_name),
|
||||||
"destination": MetadataValue.text(dlt.destination),
|
"destination": MetadataValue.text(dlt.destination),
|
||||||
"schema_version": MetadataValue.int(schema.version if schema else 0),
|
"pipelines_checked": MetadataValue.json(list(schema_info.keys())),
|
||||||
"tables": MetadataValue.json(tables),
|
"tables_found": MetadataValue.json(tables_found),
|
||||||
"movielens_tables": MetadataValue.int(table_count),
|
"movielens_tables_count": MetadataValue.int(len(tables_found)),
|
||||||
|
"schema_info": MetadataValue.json(schema_info),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -28,17 +28,18 @@ class DltResource(ConfigurableResource):
|
|||||||
# Enable detailed logging for dlt
|
# Enable detailed logging for dlt
|
||||||
os.environ["DLT_LOG_LEVEL"] = "INFO"
|
os.environ["DLT_LOG_LEVEL"] = "INFO"
|
||||||
|
|
||||||
def create_pipeline(self):
|
def create_pipeline(self, table_name: str):
|
||||||
"""Create dlt pipeline."""
|
"""Create dlt pipeline with optional table-specific name."""
|
||||||
import uuid
|
|
||||||
|
|
||||||
self.setup_environment()
|
self.setup_environment()
|
||||||
|
|
||||||
# Use a unique pipeline name to avoid conflicts
|
# Use table-specific pipeline name if provided, otherwise use base name
|
||||||
unique_pipeline_name = f"{self.pipeline_name}_{uuid.uuid4().hex[:8]}"
|
if table_name:
|
||||||
|
pipeline_name = f"{self.pipeline_name}_{table_name}"
|
||||||
|
else:
|
||||||
|
pipeline_name = self.pipeline_name
|
||||||
|
|
||||||
return dlt.pipeline(
|
return dlt.pipeline(
|
||||||
pipeline_name=unique_pipeline_name,
|
pipeline_name=pipeline_name,
|
||||||
destination=self.destination,
|
destination=self.destination,
|
||||||
dataset_name=self.dataset_name,
|
dataset_name=self.dataset_name,
|
||||||
)
|
)
|
||||||
@@ -70,9 +71,12 @@ class DltResource(ConfigurableResource):
|
|||||||
"""Run dlt pipeline with given resource data."""
|
"""Run dlt pipeline with given resource data."""
|
||||||
logger = get_dagster_logger()
|
logger = get_dagster_logger()
|
||||||
|
|
||||||
pipeline = self.create_pipeline()
|
# Create pipeline with table-specific name
|
||||||
|
pipeline = self.create_pipeline(table_name=table_name)
|
||||||
|
|
||||||
logger.info(f"Running pipeline for table {table_name}")
|
logger.info(
|
||||||
|
f"Running pipeline '{pipeline.pipeline_name}' for table {table_name}"
|
||||||
|
)
|
||||||
|
|
||||||
# Configure pipeline for progress tracking
|
# Configure pipeline for progress tracking
|
||||||
pipeline.config.progress = "log" # Enables progress logging
|
pipeline.config.progress = "log" # Enables progress logging
|
||||||
|
|||||||
Reference in New Issue
Block a user