←back to Blog

How to Build an End-to-End Data Engineering and Machine Learning Pipeline with Apache Spark and PySpark

«`html

How to Build an End-to-End Data Engineering and Machine Learning Pipeline with Apache Spark and PySpark

This tutorial explores harnessing Apache Spark’s techniques using PySpark directly in Google Colab. We progress through the essential steps required to build a complete data engineering and machine learning pipeline, beginning with setting up a local Spark session and moving through transformations, SQL queries, joins, and window functions.

Target Audience Analysis

The target audience for this tutorial mainly consists of:

  • Data Engineers and Data Scientists: Individuals responsible for managing and analyzing large datasets.
  • Business Analysts: Professionals looking to leverage machine learning for predictive analytics.
  • Students and Learners: Individuals seeking to gain hands-on experience with Apache Spark and machine learning principles.

Pain Points:

  • Difficulty in integrating different stages of data processing and machine learning workflows.
  • Challenges in scaling solutions for large datasets effectively.
  • Need for clear and concise examples to facilitate learning software tools like PySpark.

Goals:

  • To build a functional and scalable data pipeline.
  • To understand the capabilities of Apache Spark for data transformations and machine learning.
  • To gain practical experience through hands-on examples.

Interests:

  • Latest advancements in data engineering and machine learning.
  • Effective use of cloud-based platforms like Google Colab for data analytics.
  • Methods for optimizing performance in data processing.

Communication Preferences:

  • Clear, technical language with a focus on practical applications and coding examples.
  • Visual aids such as flowcharts or diagrams to illustrate processes.
  • Code snippets that are easy to follow with step-by-step explanations.

Setting Up PySpark in Google Colab

We begin by setting up PySpark and initializing the Spark session:

!pip install -q pyspark==3.5.1
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, FloatType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = (SparkSession.builder.appName("ColabSparkAdvancedTutorial")
        .master("local[*]")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate())
print("Spark version:", spark.version)

Next, we create a structured DataFrame with user information:

data = [
   (1, "Alice", "IN", "2025-10-01", 56000.0, "premium"),
   (2, "Bob", "US", "2025-10-03", 43000.0, "standard"),
   (3, "Carlos", "IN", "2025-09-27", 72000.0, "premium"),
   (4, "Diana", "UK", "2025-09-30", 39000.0, "standard"),
   (5, "Esha", "IN", "2025-10-02", 85000.0, "premium"),
   (6, "Farid", "AE", "2025-10-02", 31000.0, "basic"),
   (7, "Gita", "IN", "2025-09-29", 46000.0, "standard"),
   (8, "Hassan", "PK", "2025-10-01", 52000.0, "premium"),
]
schema = StructType([
   StructField("id", IntegerType(), False),
   StructField("name", StringType(), True),
   StructField("country", StringType(), True),
   StructField("signup_date", StringType(), True),
   StructField("income", FloatType(), True),
   StructField("plan", StringType(), True),
])
df = spark.createDataFrame(data, schema)
df.show()

Transforming Data

We perform various transformations to enhance our DataFrame:

df2 = (df.withColumn("signup_ts", F.to_timestamp("signup_date"))
        .withColumn("year", F.year("signup_ts"))
        .withColumn("month", F.month("signup_ts"))
        .withColumn("is_india", (F.col("country") == "IN").cast("int")))
df2.show()

We then create a temporary SQL view and execute aggregation queries:

df2.createOrReplaceTempView("users")
spark.sql("""
SELECT country, COUNT(*) AS cnt, AVG(income) AS avg_income
FROM users
GROUP BY country
ORDER BY cnt DESC
""").show()

Leveraging Window Functions

Next, we apply window functions to rank users by income:

w = Window.partitionBy("country").orderBy(F.col("income").desc())
df_ranked = df2.withColumn("income_rank_in_country", F.rank().over(w))
df_ranked.show()

We also introduce a user-defined function (UDF) to assign subscription plan priorities:

def plan_priority(plan):
   if plan == "premium": return 3
   if plan == "standard": return 2
   if plan == "basic": return 1
   return 0
plan_priority_udf = F.udf(plan_priority, IntegerType())
df_udf = df_ranked.withColumn("plan_priority", plan_priority_udf(F.col("plan")))
df_udf.show()

Joining Datasets

We enrich our user dataset by joining it with country-level metadata:

country_data = [
   ("IN", "Asia", 1.42), ("US", "North America", 0.33),
   ("UK", "Europe", 0.07), ("AE", "Asia", 0.01), ("PK", "Asia", 0.24),
]
country_schema = StructType([
   StructField("country", StringType(), True),
   StructField("region", StringType(), True),
   StructField("population_bn", FloatType(), True),
])
country_df = spark.createDataFrame(country_data, country_schema)

joined = df_udf.alias("u").join(country_df.alias("c"), on="country", how="left")
joined.show()

We compute analytical summaries by region and subscription plan:

region_stats = (joined.groupBy("region", "plan")
               .agg(F.count("*").alias("users"),
                    F.round(F.avg("income"), 2).alias("avg_income"))
               .orderBy("region", "plan"))
region_stats.show()

Machine Learning with PySpark

We prepare our data for machine learning by indexing categorical columns and training a logistic regression model:

ml_df = joined.withColumn("label", (F.col("plan") == "premium").cast("int")).na.drop()
country_indexer = StringIndexer(inputCol="country", outputCol="country_idx", handleInvalid="keep")
country_fitted = country_indexer.fit(ml_df)
ml_df2 = country_fitted.transform(ml_df)

assembler = VectorAssembler(inputCols=["income", "country_idx", "plan_priority"], outputCol="features")
ml_final = assembler.transform(ml_df2)
train_df, test_df = ml_final.randomSplit([0.7, 0.3], seed=42)

lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20)
lr_model = lr.fit(train_df)
preds = lr_model.transform(test_df)
preds.select("name", "country", "income", "plan", "label", "prediction", "probability").show(truncate=False)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc = evaluator.evaluate(preds)
print("Classification accuracy:", acc)

Saving and Reloading Data

We conclude by writing the processed data to Parquet format and reading it back into Spark for verification:

output_path = "/content/spark_users_parquet"
joined.write.mode("overwrite").parquet(output_path)
parquet_df = spark.read.parquet(output_path)
print("Parquet reloaded:")
parquet_df.show()

recent = spark.sql("""
SELECT name, country, income, signup_ts
FROM users
WHERE signup_ts >= '2025-10-01'
ORDER BY signup_ts DESC
""")
recent.show()

recent.explain()
spark.stop()

Conclusion

We gain a practical understanding of how PySpark unifies data engineering and machine learning tasks within a single scalable framework. By experimenting with these concepts, we enhance our ability to prototype and deploy Spark-based data solutions effectively.

Check out the FULL CODES here. Feel free to check out our GitHub Page for Tutorials, Codes, and Notebooks. You can also follow us on Twitter. Don’t forget to join our 100k+ ML SubReddit and subscribe to our Newsletter. Plus, you can now join us on Telegram as well.

«`