A Coding Guide to Build and Validate End-to-End Partitioned Data Pipelines in Dagster with Machine Learning Integration
This tutorial focuses on implementing an advanced data pipeline using Dagster. We will set up a custom CSV-based IOManager to persist assets, define partitioned daily data generation, and process synthetic sales data through cleaning, feature engineering, and model training. Throughout the process, we will add a data-quality asset check to validate nulls, ranges, and categorical values, ensuring that metadata and outputs are stored in a structured way. The emphasis is on hands-on implementation, demonstrating how to integrate raw data ingestion, transformations, quality checks, and machine learning into a single reproducible workflow.
Target Audience Analysis
The target audience for this guide primarily includes data engineers, data scientists, and machine learning practitioners who are interested in building robust data pipelines. They are likely to work in sectors such as finance, e-commerce, and technology, where data-driven decision-making is crucial.
- Pain Points: Difficulty in managing complex data workflows, ensuring data quality, and integrating machine learning models into existing pipelines.
- Goals: To create efficient, maintainable, and scalable data pipelines that can handle large volumes of data while ensuring data integrity.
- Interests: Latest tools and frameworks for data engineering, best practices for machine learning integration, and case studies showcasing successful implementations.
- Communication Preferences: Technical documentation, hands-on tutorials, and community forums for sharing experiences and solutions.
Setting Up the Environment
We begin by installing the required libraries (Dagster, Pandas, and scikit-learn) so the full toolset is available. The following snippet installs the packages from within Python:
import sys, subprocess, json, os
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "dagster", "pandas", "scikit-learn"])
Next, we import essential modules, set up NumPy and Pandas for data handling, and define a base directory along with a start date to organize our pipeline outputs.
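Those imports and constants are not shown explicitly in the later snippets, so here is a minimal sketch consistent with the names used throughout the pipeline; the exact BASE directory and START date values are placeholders to adjust for your environment:

from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from dagster import (
    asset, asset_check, AssetCheckResult, Output, IOManager, io_manager,
    DailyPartitionsDefinition, Definitions, materialize,
)

BASE = Path("dagster_csv_store")  # placeholder base directory for CSV/JSON outputs
BASE.mkdir(parents=True, exist_ok=True)
START = "2025-01-01"              # placeholder start date for the daily partitions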
Creating a Custom IOManager
We define a custom CSVIOManager that persists DataFrame outputs as CSV files, writes non-DataFrame outputs (such as metric dictionaries) as JSON, and reloads the CSVs when downstream assets request them:
class CSVIOManager(IOManager):
    def __init__(self, base: Path): self.base = base
    def _path(self, key, ext): return self.base / f"{'_'.join(key.path)}.{ext}"
    def handle_output(self, context, obj):
        if isinstance(obj, pd.DataFrame):
            p = self._path(context.asset_key, "csv"); obj.to_csv(p, index=False)
            context.log.info(f"Saved {context.asset_key} -> {p}")
        else:
            p = self._path(context.asset_key, "json"); p.write_text(json.dumps(obj, indent=2))
            context.log.info(f"Saved {context.asset_key} -> {p}")
    def load_input(self, context):
        k = context.upstream_output.asset_key; p = self._path(k, "csv")
        df = pd.read_csv(p); context.log.info(f"Loaded {k} <- {p} ({len(df)} rows)"); return df
Defining Daily Partitions
We register the CSVIOManager with Dagster as an io_manager resource and set up a daily partitioning scheme, which lets the pipeline process each date independently:
@io_manager
def csv_io_manager(_): return CSVIOManager(BASE)

daily = DailyPartitionsDefinition(start_date=START)
Creating Core Assets
We create three core assets for the pipeline. The first, raw_sales, is defined as follows:
@asset(partitions_def=daily, description="Synthetic raw sales with noise & occasional nulls.")
def raw_sales(context) -> Output[pd.DataFrame]:
    rng = np.random.default_rng(42)
    n = 200; day = context.partition_key
    x = rng.normal(100, 20, n); promo = rng.integers(0, 2, n); noise = rng.normal(0, 10, n)
    sales = 2.5 * x + 30 * promo + noise + 50
    x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
    df = pd.DataFrame({"date": day, "units": x, "promo": promo, "sales": sales})
    meta = {"rows": n, "null_units": int(df["units"].isna().sum()), "head": df.head().to_markdown()}
    return Output(df, metadata=meta)
The raw_sales asset generates synthetic daily sales data with noise and occasional missing values. The clean_sales asset removes nulls and clips outliers, while the features asset performs feature engineering by adding interaction and standardized variables.
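The clean_sales and features assets are not reproduced in full here, so below is a minimal sketch consistent with the columns the downstream steps rely on (z_units, z_units_sq, z_units_promo); the percentile-based clipping bounds and metadata fields are illustrative assumptions rather than the exact original implementation:

@asset(partitions_def=daily, description="Drop null units and clip outliers.")
def clean_sales(raw_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = raw_sales.dropna(subset=["units"]).copy()
    lo, hi = df["units"].quantile([0.01, 0.99])  # assumed clipping bounds
    df["units"] = df["units"].clip(lo, hi)
    return Output(df, metadata={"rows": len(df), "units_min": float(lo), "units_max": float(hi)})

@asset(partitions_def=daily, description="Standardized units plus squared and promo-interaction terms.")
def features(clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = clean_sales.copy()
    mu, sd = df["units"].mean(), df["units"].std()
    df["z_units"] = (df["units"] - mu) / sd            # standardized units
    df["z_units_sq"] = df["z_units"] ** 2              # squared term
    df["z_units_promo"] = df["z_units"] * df["promo"]  # interaction with promo
    return Output(df, metadata={"rows": len(df)})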
Implementing Data Quality Checks
We enforce data integrity by verifying that the cleaned data contains no nulls, that promo takes only the values 0 and 1, and that units stays within its clipped bounds:
@asset_check(asset=clean_sales, description="No nulls; promo in {0,1}; units within clipped bounds.")
def clean_sales_quality(clean_sales: pd.DataFrame) -> AssetCheckResult:
    nulls = int(clean_sales.isna().sum().sum())
    promo_ok = bool(set(clean_sales["promo"].unique()).issubset({0, 1}))
    units_ok = bool(clean_sales["units"].between(clean_sales["units"].min(), clean_sales["units"].max()).all())
    passed = bool((nulls == 0) and promo_ok and units_ok)
    return AssetCheckResult(
        passed=passed,
        metadata={"nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok},
    )
Training a Linear Regression Model
Finally, we train a simple linear regression model on the engineered features and output key metrics:
@asset(description="Train a tiny linear regressor; emit R^2 and coefficients.")
def tiny_model_metrics(context, features: pd.DataFrame) -> dict:
    X = features[["z_units", "z_units_sq", "z_units_promo", "promo"]].values
    y = features["sales"].values
    model = LinearRegression().fit(X, y)
    return {"r2_train": float(model.score(X, y)),
            **{n: float(c) for n, c in zip(["z_units", "z_units_sq", "z_units_promo", "promo"], model.coef_)}}
Materializing the Pipeline
We register our assets, the data-quality check, and the IO manager in Definitions, then materialize the entire DAG for a selected partition key:
defs = Definitions(
    assets=[raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
    resources={"io_manager": csv_io_manager}
)

if __name__ == "__main__":
    run_day = os.environ.get("RUN_DATE") or START
    print("Materializing everything for:", run_day)
    result = materialize(
        [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
        partition_key=run_day,
        resources={"io_manager": csv_io_manager},
    )
    print("Run success:", result.success)
In conclusion, we materialize all assets and checks in a single Dagster run, confirm data quality, and train a regression model whose metrics are stored for inspection. This tutorial demonstrates how we can combine partitioning, asset definitions, and checks to build a technically robust and reproducible workflow.