examples(dagster): add csv_to_postgres project

Masaki Yatsu
2025-09-16 00:36:56 +09:00
parent 6da1fac457
commit 42d74ff961
12 changed files with 3767 additions and 1 deletion

@@ -0,0 +1,19 @@
from dagster import Definitions

from .assets import movies_pipeline, ratings_pipeline, tags_pipeline, movielens_summary
from .resources import DltResource

defs = Definitions(
    assets=[
        movies_pipeline,
        ratings_pipeline,
        tags_pipeline,
        movielens_summary,
    ],
    resources={
        "dlt": DltResource(
            minio_access_key="minio",
            minio_secret_key="minio123",
        ),
    },
)
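The MinIO credentials above are hard-coded for a local setup. As a hedged variant (not part of this commit), the same resource could pull them from the environment with dagster.EnvVar; the MINIO_ACCESS_KEY and MINIO_SECRET_KEY variable names are assumptions:

# Sketch only: same Definitions, but credentials resolved from environment
# variables via dagster.EnvVar (variable names are assumed, not from the commit).
from dagster import Definitions, EnvVar

from .assets import movies_pipeline, ratings_pipeline, tags_pipeline, movielens_summary
from .resources import DltResource

defs = Definitions(
    assets=[movies_pipeline, ratings_pipeline, tags_pipeline, movielens_summary],
    resources={
        "dlt": DltResource(
            minio_access_key=EnvVar("MINIO_ACCESS_KEY"),
            minio_secret_key=EnvVar("MINIO_SECRET_KEY"),
        ),
    },
)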

@@ -0,0 +1,128 @@
from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset

from .resources import DltResource


@asset(group_name="movies")
def movies_pipeline(
    context: AssetExecutionContext, dlt: DltResource
) -> MaterializeResult:
    """Load movies CSV from MinIO to PostgreSQL using dlt."""
    context.log.info("Starting movies pipeline...")

    # Read movies CSV using dlt filesystem readers
    context.log.info("Reading movies.csv from MinIO...")
    movies_data = dlt.read_csv_from_s3(bucket="movie-lens", file_glob="movies.csv")

    # Run dlt pipeline
    context.log.info("Loading data to PostgreSQL...")
    result = dlt.run_pipeline(
        movies_data, table_name="movies", write_disposition="replace"
    )
    context.log.info(f"Movies pipeline completed: {result}")

    return MaterializeResult(
        metadata={
            "load_id": MetadataValue.text(str(result.get("load_id", ""))),
            "table_name": MetadataValue.text(result["table_name"]),
            "pipeline_name": MetadataValue.text(result["pipeline_name"]),
            "destination": MetadataValue.text(result["destination"]),
            "dataset_name": MetadataValue.text(result["dataset_name"]),
            "write_disposition": MetadataValue.text(
                result.get("write_disposition", "")
            ),
            "completed_jobs": MetadataValue.int(result.get("completed_jobs", 0)),
        }
    )


@asset(group_name="ratings")
def ratings_pipeline(
    context: AssetExecutionContext, dlt: DltResource
) -> MaterializeResult:
    """Load ratings CSV from MinIO to PostgreSQL using dlt."""
    # Read ratings CSV using dlt filesystem readers
    ratings_data = dlt.read_csv_from_s3(bucket="movie-lens", file_glob="ratings.csv")

    # Run dlt pipeline
    result = dlt.run_pipeline(
        ratings_data, table_name="ratings", write_disposition="replace"
    )
    context.log.info(f"Ratings pipeline completed: {result}")

    return MaterializeResult(
        metadata={
            "load_id": MetadataValue.text(str(result.get("load_id", ""))),
            "table_name": MetadataValue.text(result["table_name"]),
            "pipeline_name": MetadataValue.text(result["pipeline_name"]),
            "destination": MetadataValue.text(result["destination"]),
            "dataset_name": MetadataValue.text(result["dataset_name"]),
            "write_disposition": MetadataValue.text(
                result.get("write_disposition", "")
            ),
            "completed_jobs": MetadataValue.int(result.get("completed_jobs", 0)),
        }
    )


@asset(group_name="tags")
def tags_pipeline(
    context: AssetExecutionContext, dlt: DltResource
) -> MaterializeResult:
    """Load tags CSV from MinIO to PostgreSQL using dlt."""
    # Read tags CSV using dlt filesystem readers
    tags_data = dlt.read_csv_from_s3(bucket="movie-lens", file_glob="tags.csv")

    # Run dlt pipeline
    result = dlt.run_pipeline(tags_data, table_name="tags", write_disposition="replace")
    context.log.info(f"Tags pipeline completed: {result}")

    return MaterializeResult(
        metadata={
            "load_id": MetadataValue.text(str(result.get("load_id", ""))),
            "table_name": MetadataValue.text(result["table_name"]),
            "pipeline_name": MetadataValue.text(result["pipeline_name"]),
            "destination": MetadataValue.text(result["destination"]),
            "dataset_name": MetadataValue.text(result["dataset_name"]),
            "write_disposition": MetadataValue.text(
                result.get("write_disposition", "")
            ),
            "completed_jobs": MetadataValue.int(result.get("completed_jobs", 0)),
        }
    )


@asset(group_name="summary", deps=[movies_pipeline, ratings_pipeline, tags_pipeline])
def movielens_summary(
    context: AssetExecutionContext, dlt: DltResource
) -> MaterializeResult:
    """Generate a summary of all loaded MovieLens data."""
    # Get pipeline to access dlt info
    pipeline = dlt.create_pipeline()

    # Get schema info; a freshly created pipeline may not have a schema yet,
    # and pipeline.default_schema raises in that case, so guard the access.
    schema = pipeline.default_schema if pipeline.default_schema_name else None
    tables = list(schema.tables.keys()) if schema else []
    context.log.info(f"MovieLens dataset loaded with tables: {tables}")

    # Calculate basic metrics
    table_count = len([t for t in tables if t in ["movies", "ratings", "tags"]])

    return MaterializeResult(
        metadata={
            "pipeline_name": MetadataValue.text(dlt.pipeline_name),
            "dataset_name": MetadataValue.text(dlt.dataset_name),
            "destination": MetadataValue.text(dlt.destination),
            "schema_version": MetadataValue.int(schema.version if schema else 0),
            "tables": MetadataValue.json(tables),
            "movielens_tables": MetadataValue.int(table_count),
        }
    )
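For a quick local check, these assets can be materialized in-process with Dagster's materialize helper. A minimal sketch, not part of this commit, assuming the package is importable as csv_to_postgres and that MinIO and PostgreSQL are reachable with the credentials configured above:

# Sketch only: in-process materialization of the three loaders plus the summary,
# reusing the same resource configuration as definitions.py.
from dagster import materialize

from csv_to_postgres.assets import (
    movies_pipeline,
    ratings_pipeline,
    tags_pipeline,
    movielens_summary,
)
from csv_to_postgres.resources import DltResource

if __name__ == "__main__":
    result = materialize(
        [movies_pipeline, ratings_pipeline, tags_pipeline, movielens_summary],
        resources={
            "dlt": DltResource(minio_access_key="minio", minio_secret_key="minio123"),
        },
    )
    assert result.success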

@@ -0,0 +1,13 @@
from dagster import Definitions, load_assets_from_modules

from csv_to_postgres import assets  # noqa: TID252
from csv_to_postgres.resources import DltResource

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    resources={
        "dlt": DltResource(),
    },
)

@@ -0,0 +1,114 @@
import os
from typing import Any, Dict

import dlt
from dagster import ConfigurableResource, get_dagster_logger
from dlt.common.schema.typing import TWriteDispositionConfig
from dlt.sources.filesystem import readers


class DltResource(ConfigurableResource):
    """dlt resource for data pipeline operations."""

    pipeline_name: str = "minio_to_postgres"
    destination: str = "postgres"
    dataset_name: str = "movielens"
    # MinIO credentials; declared here so Definitions can pass them in
    # (e.g. DltResource(minio_access_key="minio", minio_secret_key="minio123")).
    minio_access_key: str = "minio"
    minio_secret_key: str = "minio123"

    def setup_environment(self):
        """Set up environment variables for dlt."""
        # MinIO/S3 configuration; AWS_ENDPOINT_URL is expected to be provided
        # by the runtime environment (e.g. docker-compose) so reads hit MinIO.
        os.environ["AWS_ACCESS_KEY_ID"] = self.minio_access_key
        os.environ["AWS_SECRET_ACCESS_KEY"] = self.minio_secret_key

        # PostgreSQL configuration
        postgres_url = os.getenv("POSTGRES_URL", "")
        os.environ["DESTINATION__POSTGRES__CREDENTIALS"] = f"{postgres_url}/movielens"

        # Enable detailed logging for dlt
        os.environ["DLT_LOG_LEVEL"] = "INFO"

    def create_pipeline(self):
        """Create a dlt pipeline."""
        import uuid

        self.setup_environment()

        # Use a unique pipeline name to avoid conflicts
        unique_pipeline_name = f"{self.pipeline_name}_{uuid.uuid4().hex[:8]}"

        return dlt.pipeline(
            pipeline_name=unique_pipeline_name,
            destination=self.destination,
            dataset_name=self.dataset_name,
        )

    def read_csv_from_s3(self, bucket: str, file_glob: str, chunk_size: int = 10000):
        """Read a CSV file from S3/MinIO using dlt filesystem readers."""
        self.setup_environment()
        logger = get_dagster_logger()
        logger.info(f"Reading CSV from s3://{bucket}/{file_glob}")

        # Use dlt filesystem readers
        csv_reader = readers(
            bucket_url=f"s3://{bucket}",
            file_glob=file_glob,
        ).read_csv_duckdb(
            chunk_size=chunk_size,
            header=True,
        )
        return csv_reader

    def run_pipeline(
        self,
        resource_data,
        table_name: str,
        write_disposition: TWriteDispositionConfig = "replace",
    ) -> Dict[str, Any]:
        """Run a dlt pipeline with the given resource data."""
        logger = get_dagster_logger()
        pipeline = self.create_pipeline()
        logger.info(f"Running pipeline for table {table_name}")

        # Configure pipeline for progress tracking
        pipeline.config.progress = "log"  # Enables progress logging

        # Run the pipeline
        load_info = pipeline.run(
            resource_data, table_name=table_name, write_disposition=write_disposition
        )
        logger.info(f"Pipeline completed for {table_name}")

        # Extract metadata from load_info
        if load_info.load_packages:
            package = load_info.load_packages[0]
            completed_jobs = package.jobs.get("completed_jobs", [])
            total_rows = sum(
                getattr(job, "rows_count", 0)
                for job in completed_jobs
                if hasattr(job, "rows_count")
            )
            return {
                "load_id": load_info.loads_ids[0] if load_info.loads_ids else None,
                "table_name": table_name,
                "completed_jobs": len(completed_jobs),
                "pipeline_name": self.pipeline_name,
                "destination": self.destination,
                "dataset_name": self.dataset_name,
                "write_disposition": write_disposition,
                "total_rows": total_rows,
            }

        return {
            "table_name": table_name,
            "pipeline_name": self.pipeline_name,
            "destination": self.destination,
            "dataset_name": self.dataset_name,
        }
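The resource expects POSTGRES_URL (and the MinIO endpoint via AWS_ENDPOINT_URL) to be present in the environment. A minimal standalone sketch of the read/load round trip outside Dagster; the endpoint and connection URL below are placeholders for a local docker-compose setup, not values from the commit:

# Sketch only: exercising DltResource directly. The AWS_ENDPOINT_URL and
# POSTGRES_URL values are assumed placeholders for a local environment.
import os

from csv_to_postgres.resources import DltResource

os.environ.setdefault("AWS_ENDPOINT_URL", "http://localhost:9000")  # assumed MinIO endpoint
os.environ.setdefault("POSTGRES_URL", "postgresql://postgres:postgres@localhost:5432")  # assumed

dlt_resource = DltResource(minio_access_key="minio", minio_secret_key="minio123")
movies = dlt_resource.read_csv_from_s3(bucket="movie-lens", file_glob="movies.csv")
print(dlt_resource.run_pipeline(movies, table_name="movies", write_disposition="replace"))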