Building Efficient Data Pipelines with Polars: A Step-by-Step Guide
Data pipelines are at the heart of modern data engineering. They allow you to automate data ingestion, cleaning, transformation, and feature generation—making your datasets ready for analysis or machine learning tasks.
In this post, we’ll explore how to build scalable, modular, and efficient data pipelines using Polars. Along the way, you’ll learn best practices like using lazy evaluation, chaining operations with .pipe(), and structuring your project for clarity and maintainability.
We’ll be working with a YouTube trending videos dataset (public domain, available on Kaggle) as our example.
Setup
Before we start, ensure you have Polars installed and up to date. We will use uv for Python dependency management; you can install uv by following the official documentation.
uv init
uv add polars
What is a Data Pipeline?
A data pipeline is a sequence of automated steps that:
- Pull data from one or more sources
- Transform and clean the data
- Save the processed dataset for downstream tasks
With Polars, building pipelines becomes elegant and efficient thanks to method chaining and lazy evaluation.
Step 0: Utility Function
import json
from typing import Dict

import polars as pl


def read_category_mappings(path: str) -> Dict[int, str]:
    """Load and parse YouTube category mappings from JSON."""
    with open(path, "r") as f:
        categories = json.load(f)
    return {int(c["id"]): c["snippet"]["title"] for c in categories["items"]}
Explanation:
This function reads a JSON file containing YouTube category information.
The JSON structure typically looks like this:
{
  "items": [
    {"id": "1", "snippet": {"title": "Film & Animation"}},
    {"id": "2", "snippet": {"title": "Autos & Vehicles"}}
  ]
}
It returns a dictionary mapping category IDs (as integers) to human-readable names:
{1: "Film & Animation", 2: "Autos & Vehicles"}
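To see the parsing logic in isolation, here is a variant that works on a JSON string instead of a file (read_category_mappings_from_str is a hypothetical helper for illustration, not part of the pipeline):

```python
import json

def read_category_mappings_from_str(raw: str) -> dict[int, str]:
    # Same parsing logic as read_category_mappings, minus the file I/O.
    categories = json.loads(raw)
    return {int(c["id"]): c["snippet"]["title"] for c in categories["items"]}

sample = '{"items": [{"id": "1", "snippet": {"title": "Film & Animation"}}]}'
mapping = read_category_mappings_from_str(sample)
```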
Step 1: Load Data
def load_data(videos_path: str, categories_path: str):
    id_to_category = read_category_mappings(categories_path)
    df = pl.read_csv(videos_path)
    return df, id_to_category
Explanation:
Reads the CSV file with video metadata (GBvideos.csv) into a Polars DataFrame.
Calls read_category_mappings() to get the mapping dictionary.
Returns both the DataFrame and the category mapping.
Essentially, this is the entry point for raw data into the pipeline.
Step 2: Parse Dates
def parse_dates(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns([
        pl.col("trending_date").str.to_datetime("%y.%d.%m").alias("trending_date_dt"),
        pl.col("publish_time").str.to_datetime("%Y-%m-%dT%H:%M:%S%.fZ").alias("publish_time_dt"),
    ])
Explanation:
Converts string representations of dates into Polars datetime objects.
trending_date is in YY.DD.MM format (strange YouTube format).
publish_time is in ISO 8601 format with fractional seconds (YYYY-MM-DDTHH:MM:SS.sssZ), hence the %.f in the format string.
Why: This enables date arithmetic like calculating how many days it took for a video to trend.
Step 3: Add Category Names
def add_categories(df: pl.DataFrame, id_to_category: Dict[int, str]) -> pl.DataFrame:
    return df.with_columns([
        pl.col("category_id").replace_strict(id_to_category, default="Unknown").alias("category")
    ])
Explanation:
Replaces numeric category_id in the video DataFrame with human-readable category names using the dictionary from Step 1.
If a category ID is missing in the mapping, it defaults to “Unknown”.
This is helpful for readability and analysis (e.g., grouping by category).
Step 4: Compute Engagement Metrics
def compute_engagement_metrics(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns([
        (pl.col("likes") / pl.col("dislikes").replace(0, None)).alias("likes_to_dislikes"),
        (pl.col("likes") / pl.col("views")).alias("likes_to_views"),
        (pl.col("comment_count") / pl.col("views")).alias("comments_to_views"),
    ])
Explanation:
Calculates ratios to measure engagement:
likes_to_dislikes: Positive reaction vs negative reaction.
likes_to_views: Likes normalized by views.
comments_to_views: Comments normalized by views.
Handles divide-by-zero safely by replacing zero dislikes with None.
Step 5: Compute Temporal Metrics
def compute_temporal_metrics(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns([
        (pl.col("trending_date_dt") - pl.col("publish_time_dt"))
        .dt.total_days()
        .alias("days_to_trending"),
        pl.col("trending_date_dt").dt.weekday().alias("trending_weekday"),
    ])
Explanation:
Computes how long it took for a video to trend (days_to_trending).
Extracts the weekday of trending (trending_weekday) to see patterns like whether videos trend more on certain days.
Uses the datetime columns created in Step 2.
Step 6: Aggregate Per Video
def aggregate_videos(df: pl.DataFrame) -> pl.DataFrame:
    return (
        df.group_by("video_id")
        .agg([
            (pl.col("trending_date_dt").max() - pl.col("trending_date_dt").min())
            .dt.total_days()
            .alias("days_in_trending"),
            pl.col("views").first().alias("initial_views"),
            pl.col("likes").first().alias("initial_likes"),
            pl.col("category").first().alias("category"),
            pl.col("days_to_trending").min().alias("min_days_to_trend"),
            pl.col("likes_to_dislikes").mean().alias("avg_likes_ratio"),
            pl.len().alias("trending_appearances"),
        ])
        .sort("days_in_trending", descending=True)
    )
Explanation:
Groups data by video_id because a video can appear on trending multiple times.
Aggregates key metrics per video:
days_in_trending: Total days a video stayed trending.
initial_views & initial_likes: Values when the video first appeared.
category: First category (same for all rows of a video).
min_days_to_trend: Minimum days from publish to trending.
avg_likes_ratio: Average likes/dislikes across appearances.
trending_appearances: How many times it appeared on trending.
Sorts by days_in_trending to see the most trending videos first.
Main Function
def main():
    # Paths
    videos_path = "GBvideos.csv"
    categories_path = "GB_category_id.json"

    # Run pipeline
    df, id_to_category = load_data(videos_path, categories_path)
    df = parse_dates(df)
    df = add_categories(df, id_to_category)
    df = compute_engagement_metrics(df)
    df = compute_temporal_metrics(df)
    target = aggregate_videos(df)

    # Outputs
    print(f"Processed {len(target)} unique trending videos")
    print(f"Dataset shape: {target.shape}")
    print("\nTop 5 videos by days in trending:")
    print(target.head())


if __name__ == "__main__":
    main()
Explanation:
Orchestrates the entire pipeline:
Loads raw data.
Parses dates for calculations.
Maps category names.
Computes engagement metrics.
Computes temporal metrics.
Aggregates results per video.
Prints summary statistics and top results.
The if __name__ == "__main__": main() guard ensures the pipeline runs only when the script is executed directly, not when it is imported as a module.
You can find the complete source code on GitHub.
