feat(dagster): add Dagster

This commit is contained in:
Masaki Yatsu
2025-09-15 19:25:31 +09:00
parent c725124a7a
commit dbcbaedf6f
21 changed files with 4018 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
from pathlib import Path
from dagster import definitions, load_from_defs_folder
@definitions
def defs():
    """Return what ``load_from_defs_folder`` loads for this file's directory.

    The directory containing this module is passed as ``path_within_project``.
    """
    module_dir = Path(__file__).parent
    return load_from_defs_folder(path_within_project=module_dir)

View File

@@ -0,0 +1,96 @@
import dagster as dg
from dagster_duckdb import DuckDBResource
# Asset: (re)creates the DuckDB table "customers" from the jaffle-shop
# raw_customers.csv seed, which DuckDB reads from the URL via read_csv_auto.
# Any existing table of the same name is replaced.
# NOTE: kept as a comment; a docstring would become the asset description
# in Dagster.
@dg.asset
def customers(duckdb: DuckDBResource):
    source_csv = "https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv"
    target = "customers"

    # Build the statement up front so the connection is held only to run it.
    load_sql = f"""
            create or replace table {target} as (
                select * from read_csv_auto('{source_csv}')
            )
            """

    with duckdb.get_connection() as connection:
        connection.execute(load_sql)
# Asset: (re)creates the DuckDB table "orders" from the jaffle-shop
# raw_orders.csv seed, which DuckDB reads from the URL via read_csv_auto.
# Any existing table of the same name is replaced.
# NOTE: kept as a comment; a docstring would become the asset description
# in Dagster.
@dg.asset
def orders(duckdb: DuckDBResource):
    source_csv = "https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_orders.csv"
    target = "orders"

    # Build the statement up front so the connection is held only to run it.
    load_sql = f"""
            create or replace table {target} as (
                select * from read_csv_auto('{source_csv}')
            )
            """

    with duckdb.get_connection() as connection:
        connection.execute(load_sql)
# Asset: (re)creates the DuckDB table "payments" from the jaffle-shop
# raw_payments.csv seed, which DuckDB reads from the URL via read_csv_auto.
# Any existing table of the same name is replaced.
# NOTE: kept as a comment; a docstring would become the asset description
# in Dagster.
@dg.asset
def payments(duckdb: DuckDBResource):
    source_csv = "https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_payments.csv"
    target = "payments"

    # Build the statement up front so the connection is held only to run it.
    load_sql = f"""
            create or replace table {target} as (
                select * from read_csv_auto('{source_csv}')
            )
            """

    with duckdb.get_connection() as connection:
        connection.execute(load_sql)
# Asset: (re)creates the DuckDB table "orders_aggregation" with one row per
# customer (id, first name, last name) carrying the distinct order count, the
# distinct payment count, and the summed payment amount (0 when there are no
# payments). Declared to depend on the customers, orders and payments assets,
# whose tables the query reads.
# NOTE: kept as a comment; a docstring would become the asset description
# in Dagster.
@dg.asset(
    deps=["customers", "orders", "payments"],
)
def orders_aggregation(duckdb: DuckDBResource):
    target = "orders_aggregation"

    # Left joins keep customers without orders and orders without payments.
    aggregate_sql = f"""
            create or replace table {target} as (
                select
                    c.id as customer_id,
                    c.first_name,
                    c.last_name,
                    count(distinct o.id) as total_orders,
                    count(distinct p.id) as total_payments,
                    coalesce(sum(p.amount), 0) as total_amount_spent
                from customers c
                left join orders o
                    on c.id = o.user_id
                left join payments p
                    on o.id = p.order_id
                group by 1, 2, 3
            );
            """

    with duckdb.get_connection() as connection:
        connection.execute(aggregate_sql)
# Asset check for "orders_aggregation": counts the rows in the table and
# passes only when the count query yields a row and that count is non-zero.
# The result carries a "message" metadata entry saying whether it passed.
@dg.asset_check(asset="orders_aggregation")
def orders_aggregation_check(duckdb: DuckDBResource) -> dg.AssetCheckResult:
    checked_table = "orders_aggregation"
    count_sql = f"select count(*) from {checked_table}"

    with duckdb.get_connection() as connection:
        first_row = connection.execute(count_sql).fetchone()

    # A missing result row and an empty table are both treated as failure.
    has_rows = first_row is not None and first_row[0] != 0
    if has_rows:
        message = "Order aggregation check passed"
    else:
        message = "Order aggregation check failed"

    return dg.AssetCheckResult(passed=has_rows, metadata={"message": message})

View File

@@ -0,0 +1,13 @@
import dagster as dg
from dagster_duckdb import DuckDBResource
# Shared DuckDB resource, configured with the database path
# /tmp/jaffle_platform.duckdb.
# NOTE(review): /tmp is usually not persistent storage; confirm this location
# is intended outside local experimentation.
database_resource = DuckDBResource(database="/tmp/jaffle_platform.duckdb")
@dg.definitions
def resources():
    """Return ``Definitions`` registering ``database_resource`` as "duckdb"."""
    resource_map = {"duckdb": database_resource}
    return dg.Definitions(resources=resource_map)

View File

@@ -0,0 +1,17 @@
from typing import Union
import dagster as dg
# @dg.schedule(cron_schedule="@daily", target="*")
# def schedules(context: dg.ScheduleEvaluationContext) -> Union[dg.RunRequest, dg.SkipReason]:
# return dg.SkipReason("Skipping. Change this to return a RunRequest to launch a run.")
# Schedule on the cron expression "* * * * *" (every minute) with target "*".
# As written it always returns a SkipReason and never a RunRequest; `context`
# is accepted but not used.
@dg.schedule(cron_schedule="* * * * *", target="*")
def tutorial_schedule(
    context: dg.ScheduleEvaluationContext,
) -> Union[dg.RunRequest, dg.SkipReason]:
    skip_message = "Skipping. Change this to return a RunRequest to launch a run."
    return dg.SkipReason(skip_message)