A Coding Guide to Build and Validate End-to-End Partitioned Data Pipelines in Dagster with Machine Learning Integration

This tutorial focuses on implementing an advanced data pipeline using Dagster. We will set up a custom CSV-based IOManager to persist assets, define partitioned daily data generation, and process synthetic sales data through cleaning, feature engineering, and model training. Throughout the process, we will add a data-quality asset check to validate nulls, ranges, and categorical values, ensuring that metadata and outputs are stored in a structured way. The emphasis is on hands-on implementation, demonstrating how to integrate raw data ingestion, transformations, quality checks, and machine learning into a single reproducible workflow.

Setting Up the Environment

We begin by installing the required libraries (Dagster, Pandas, and scikit-learn) so the full toolset is available. The following snippet installs them quietly into the current Python environment:

import sys, subprocess, json, os
# Install the required packages quietly into the active Python environment
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "dagster", "pandas", "scikit-learn"])

Next, we import the remaining modules, set up NumPy and Pandas for data handling, and define a base directory along with a start date to organize our pipeline outputs, as sketched below.
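For reference, here is a minimal sketch of the imports and constants the rest of the pipeline assumes. The exact BASE directory name and START date are assumptions; adjust them to your own setup.

from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

from dagster import (
    IOManager, io_manager, asset, asset_check, AssetCheckResult,
    DailyPartitionsDefinition, Definitions, Output, materialize,
)

BASE = Path("dagster_csv_store")  # assumed output directory for CSV/JSON artifacts
BASE.mkdir(parents=True, exist_ok=True)
START = "2025-01-01"              # assumed first daily partition date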

Creating a Custom IOManager

We define a custom CSVIOManager that saves DataFrame outputs as CSV files and other outputs (such as model metrics) as JSON, and reloads CSVs whenever a downstream asset needs them:

class CSVIOManager(IOManager):
    def __init__(self, base: Path):
        self.base = base

    def _path(self, key, ext):
        # One file per asset key, e.g. raw_sales.csv or tiny_model_metrics.json
        return self.base / f"{'_'.join(key.path)}.{ext}"

    def handle_output(self, context, obj):
        # DataFrames are persisted as CSV; anything else (e.g. metric dicts) as JSON
        if isinstance(obj, pd.DataFrame):
            p = self._path(context.asset_key, "csv")
            obj.to_csv(p, index=False)
        else:
            p = self._path(context.asset_key, "json")
            p.write_text(json.dumps(obj, indent=2))
        context.log.info(f"Saved {context.asset_key} -> {p}")

    def load_input(self, context):
        # Downstream assets always consume the upstream asset's CSV file
        k = context.upstream_output.asset_key
        p = self._path(k, "csv")
        df = pd.read_csv(p)
        context.log.info(f"Loaded {k} <- {p} ({len(df)} rows)")
        return df

Defining Daily Partitions

We register the CSVIOManager with Dagster and set up a daily partitioning scheme:

@io_manager
def csv_io_manager(_): return CSVIOManager(BASE)

daily = DailyPartitionsDefinition(start_date=START)
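Each partition key produced by this definition is simply a date string, which raw_sales later reads via context.partition_key. As a quick sanity check (assuming START lies in the past), we can list the first few keys:

# List the first few daily partition keys, e.g. ["2025-01-01", "2025-01-02", "2025-01-03"]
print(daily.get_partition_keys()[:3])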

Creating Core Assets

We create three core assets for the pipeline: raw_sales, clean_sales, and features. The first, which generates the raw data, is defined as follows:

@asset(partitions_def=daily, description="Synthetic raw sales with noise & occasional nulls.")
def raw_sales(context) -> Output[pd.DataFrame]:
    rng = np.random.default_rng(42)
    n = 200
    day = context.partition_key
    x = rng.normal(100, 20, n)        # units sold
    promo = rng.integers(0, 2, n)     # promotion flag (0/1)
    noise = rng.normal(0, 10, n)
    sales = 2.5 * x + 30 * promo + noise + 50
    # Inject a few missing unit values (~2% of rows) to exercise the cleaning step
    x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
    df = pd.DataFrame({"date": day, "units": x, "promo": promo, "sales": sales})
    meta = {"rows": n, "null_units": int(df["units"].isna().sum()), "head": df.head().to_markdown()}
    return Output(df, metadata=meta)

The raw_sales asset generates synthetic daily sales data with noise and occasional missing values. The clean_sales asset removes nulls and clips outliers, while the features asset performs feature engineering by adding interaction and standardized variables; both are sketched below.
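The full definitions of clean_sales and features are not reproduced in this walkthrough, so the following is a minimal sketch of what they might look like. The clipping quantiles (1st/99th percentile) and the exact standardization are assumptions; only the derived column names (z_units, z_units_sq, z_units_promo) are taken from the model code used later.

@asset(partitions_def=daily, description="Drop null units and clip outliers.")
def clean_sales(raw_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = raw_sales.dropna(subset=["units"]).copy()
    # Assumed clipping bounds: 1st/99th percentile of the observed units
    lo, hi = df["units"].quantile([0.01, 0.99])
    df["units"] = df["units"].clip(lo, hi)
    return Output(df, metadata={"rows": len(df), "units_lo": float(lo), "units_hi": float(hi)})

@asset(partitions_def=daily, description="Standardized and interaction features for modeling.")
def features(clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = clean_sales.copy()
    # Z-score the units, then derive squared and promo-interaction terms
    df["z_units"] = (df["units"] - df["units"].mean()) / df["units"].std()
    df["z_units_sq"] = df["z_units"] ** 2
    df["z_units_promo"] = df["z_units"] * df["promo"]
    return Output(df, metadata={"rows": len(df), "cols": ", ".join(df.columns)})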

Implementing Data Quality Checks

We enforce data integrity by verifying that the cleaned data contains no nulls, that promo takes only the values 0 and 1, and that the units stay within the clipped bounds:

@asset_check(asset=clean_sales, description="No nulls; promo in {0,1}; units within clipped bounds.")
def clean_sales_quality(clean_sales: pd.DataFrame) -> AssetCheckResult:
   nulls = int(clean_sales.isna().sum().sum())
   promo_ok = bool(set(clean_sales["promo"].unique()).issubset({0, 1}))
   units_ok = bool(clean_sales["units"].between(clean_sales["units"].min(), clean_sales["units"].max()).all())
   passed = bool((nulls == 0) and promo_ok and units_ok)
   return AssetCheckResult(
       passed=passed,
       metadata={"nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok},
   )

Training a Linear Regression Model

Finally, we train a simple linear regression model on the engineered features and output key metrics:

@asset(description="Train a tiny linear regressor; emit R^2 and coefficients.")
def tiny_model_metrics(context, features: pd.DataFrame) -> dict:
   X = features[["z_units", "z_units_sq", "z_units_promo", "promo"]].values
   y = features["sales"].values
   model = LinearRegression().fit(X, y)
   return {"r2_train": float(model.score(X, y)),
           **{n: float(c) for n, c in zip(["z_units","z_units_sq","z_units_promo","promo"], model.coef_)}} 

Materializing the Pipeline

We register our assets and the IO manager in Definitions, then materialize the entire DAG for a selected partition key:

defs = Definitions(
   assets=[raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
   resources={"io_manager": csv_io_manager}
)

if __name__ == "__main__":
   run_day = os.environ.get("RUN_DATE") or START
   print("Materializing everything for:", run_day)
   result = materialize(
       [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
       partition_key=run_day,
       resources={"io_manager": csv_io_manager},
   )
   print("Run success:", result.success)

In conclusion, we materialize all assets and checks in a single Dagster run, confirm data quality, and train a regression model whose metrics are stored for inspection. This tutorial demonstrates how we can combine partitioning, asset definitions, and checks to build a technically robust and reproducible workflow.
