A Code Implementation of a Real‑Time In‑Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, Pydantic
In this notebook, we demonstrate how to build a fully in-memory “sensor alert” pipeline in Google Colab using , a high-performance, Python-native stream processing framework, and its integration with RabbitMQ. By leveraging t’s RabbitBroker and TestRabbitBroker, we simulate a message broker without needing external infrastructure. We orchestrate four distinct stages: ingestion & validation, normalization, monitoring & alert generation, and archiving, each defined as Pydantic models (RawSensorData, NormalizedData, AlertData) to ensure data quality and type safety. Under the hood, Python’s asyncio powers asynchronous message flow, while nest_asyncio enables nested event loops in Colab. We also employ the standard logging module for traceable pipeline execution and pandas for final result inspection, making it easy to visualize archived alerts in a DataFrame.
Copy Code Copied Use a different Browser
!pip install -q faststream[rabbit] nest_asyncio
We install FastStream with its RabbitMQ integration, providing the core stream-processing framework and broker connectors, as well as the nest_asyncio package, which enables nested asyncio event loops in environments like Colab. All this is achieved while keeping the output minimal with the -q flag.
Copy Code Copied Use a different Browser
import nest_asyncio, asyncio, logging
nest_()
We import the nest_asyncio, asyncio, and logging modules, then apply nest_() to patch Python’s event loop so that you can run nested asynchronous tasks inside environments like Colab or Jupyter notebooks without errors. The logging import readies you to instrument your pipeline with detailed runtime logs.
Copy Code Copied Use a different Browser
Config(level=logging.INFO, format=»%(asctime)s %(levelname)s %(message)s»)
logger = Logger(«sensor_pipeline»)
We configure Python’s built‑in logging to emit INFO‑level (and above) messages prefixed with a timestamp and severity, then create a dedicated logger named “sensor_pipeline” for emitting structured logs within your streaming pipeline.
Copy Code Copied Use a different Browser
from faststream import FastStream
from t import RabbitBroker, TestRabbitBroker
from pydantic import BaseModel, Field, validator
import pandas as pd
from typing import List
We bring in FastStream’s core FastStream class alongside its RabbitMQ connectors (RabbitBroker for real brokers and TestRabbitBroker for in‑memory testing), Pydantic’s BaseModel, Field, and validator for declarative data validation, pandas for tabular result inspection, and Python’s List type for annotating our in‑memory archives.
Copy Code Copied Use a different Browser
broker = RabbitBroker(«amqp://guest:guest@localhost:5672/»)
app = FastStream(broker)
We instantiate a RabbitBroker pointed at a (local) RabbitMQ server using the AMQP URL, then create a FastStream application bound to that broker, setting up the messaging backbone for your pipeline stages.
Copy Code Copied Use a different Browser
class RawSensorData(BaseModel):
sensor_id: str = Field(…, examples=[«sensor_1»])
reading_celsius: float = Field(…, ge=-50, le=150, examples=[23.5])
@validator(«sensor_id»)
def must_start_with_sensor(cls, v):
if not swith(«sensor_»):
raise ValueError(«sensor_id must start with ‘sensor_’»)
return v
class NormalizedData(BaseModel):
sensor_id: str
reading_kelvin: float
class AlertData(BaseModel):
sensor_id: str
reading_kelvin: float
alert: bool
These Pydantic models define the schema for each stage: RawSensorData enforces input validity (e.g., reading range and a sensor_ prefix), NormalizedData converts Celsius to Kelvin, and AlertData encapsulates the final alert payload (including a boolean flag), ensuring a type-safe data flow throughout the pipeline.
Copy Code Copied Use a different Browser
archive: List[AlertData] = []
@riber(«sensor_input»)
@sher(«normalized_input»)
async def ingest_and_validate(raw: RawSensorData) -> dict:
(f»Ingested raw data: ()»)
return ()
@riber(«normalized_input»)
@sher(«sensor_alert»)
async def normalize(data: dict) -> dict:
norm = NormalizedData(
sensor_id=data[«sensor_id»],
reading_kelvin=data[«reading_celsius»] + 273.15
)
(f»Normalized to Kelvin: ()»)
return ()
ALERT_THRESHOLD_K = 323.15
@riber(«sensor_alert»)
@sher(«archive_topic»)
async def monitor(data: dict) -> dict:
alert_flag = data[«reading_kelvin»] > ALERT_THRESHOLD_K
alert = AlertData(
sensor_id=data[«sensor_id»],
reading_kelvin=data[«reading_kelvin»],
alert=alert_flag
)
(f»Monitor result: ()»)
return ()
@riber(«archive_topic»)
async def archive_data(payload: dict):
rec = AlertData(**payload)
d(rec)
(f»Archived: ()»)
An in-memory archive list collects all finalized alerts, while four asynchronous functions, wired via @riber/@sher, form the pipeline stages. These functions ingest and validate raw sensor inputs, convert Celsius to Kelvin, check against an alert threshold, and finally archive each AlertData record, emitting logs at every step for full traceability.
Copy Code Copied Use a different Browser
async def main():
readings = [
«sensor_id»: «sensor_1», «reading_celsius»: 45.2,
«sensor_id»: «sensor_2», «reading_celsius»: 75.1,
«sensor_id»: «sensor_3», «reading_celsius»: 50.0,
]
async with TestRabbitBroker(broker) as tb:
for r in readings:
await sh(r, «sensor_input»)
await (0.1)
df = pd.DataFrame([() for a in archive])
print(«\nFinal Archived Alerts:»)
display(df)
(main())
Finally, the main coroutine publishes a set of sample sensor readings into the in-memory TestRabbitBroker, pauses briefly to allow each pipeline stage to run, and then collates the resulting AlertData records from the archive into a pandas DataFrame for easy display and verification of the end-to-end alert flow. At the end, (main()) kicks off the entire async demo in Colab.
In conclusion, this tutorial demonstrates how FastStream, combined with RabbitMQ abstractions and in-memory testing via TestRabbitBroker, can accelerate the development of real-time data pipelines without the overhead of deploying external brokers. With Pydantic handling schema validation, asyncio managing concurrency, and pandas enabling quick data analysis, this pattern provides a robust foundation for sensor monitoring, ETL tasks, or event‑driven workflows. You can seamlessly transition from this in‑memory demo to production by swapping in a live broker URL (RabbitMQ, Kafka, NATS, or Redis) and running faststream run under uvicorn or your preferred ASGI server, unlocking scalable, maintainable stream processing in any Python environment.
Here is the . Also, don’t forget to follow us on and join our and . Don’t Forget to join our .
[
The post appeared first on .