Building a Data Cleaning Pipeline
Most data cleaning code looks like a wall of function calls: load data, fix a column, drop rows, convert types, fix another column. It works, but it’s hard to read, harder to test, and awkward to reuse when the upstream data changes. A pipeline addresses all three.
A pipeline chains operations into readable, testable, reusable steps. Pandas method chaining makes this natural, and pipe() handles anything that chaining can’t.
Why a Pipeline
The old way — a script full of intermediate variables:
df = pd.read_csv("sales.csv")
df["date"] = pd.to_datetime(df["date"])
df = df.dropna(subset=["customer_id"])
df["revenue"] = df["revenue"].str.replace("$", "", regex=False).astype(float)  # regex=False: treat "$" literally
df = df[df["revenue"] > 0]
The pipeline way:
df = (
    pd.read_csv("sales.csv")
    .pipe(parse_dates, "date")
    .pipe(dropna_on, "customer_id")
    .pipe(clean_currency, "revenue")
    .pipe(filter_positive, "revenue")
)
Same result. Better readability. And you can test each step independently.
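The four helpers in that chain are not defined above. A minimal sketch of what they might look like follows, assuming each takes a DataFrame plus a column name and returns a new DataFrame. The bodies and the sample data are illustrative assumptions, and this `clean_currency` uses a plain `(df, col)` signature, unlike the factory version introduced later.

```python
import pandas as pd

def parse_dates(df, col):
    # Unparseable values become NaT instead of raising.
    return df.assign(**{col: pd.to_datetime(df[col], errors="coerce")})

def dropna_on(df, col):
    # Drop rows where the given column is missing.
    return df.dropna(subset=[col])

def clean_currency(df, col):
    # Strip "$" and thousands separators, then convert to numbers.
    cleaned = (
        df[col]
        .astype(str)
        .str.replace("$", "", regex=False)
        .str.replace(",", "", regex=False)
        .str.strip()
    )
    return df.assign(**{col: pd.to_numeric(cleaned, errors="coerce")})

def filter_positive(df, col):
    # Keep rows whose value is strictly greater than zero.
    return df[df[col] > 0]

# Made-up sample data standing in for sales.csv.
raw = pd.DataFrame({
    "date": ["2024-01-05", "not a date", "2024-01-07"],
    "customer_id": [101, None, 103],
    "revenue": ["$1,200.50", "$30.00", "$-5.00"],
})

clean = (
    raw
    .pipe(parse_dates, "date")
    .pipe(dropna_on, "customer_id")
    .pipe(clean_currency, "revenue")
    .pipe(filter_positive, "revenue")
)
```

Because every helper returns a new DataFrame rather than editing its input, `raw` is still intact after the chain runs, which is what makes the steps safe to test one at a time.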
Method Chaining
The simplest pipeline uses method chaining — calling one method after another on the same object:
df = (
    pd.read_csv("sales.csv")
    .assign(
        date=lambda d: pd.to_datetime(d["date"]),
        # regex=False so "$" is treated as a literal character, not a regex anchor
        revenue=lambda d: pd.to_numeric(
            d["revenue"].str.replace("$", "", regex=False), errors="coerce"
        ),
    )
    .rename(columns={"cust_id": "customer_id"})
    .sort_values("date")
    .reset_index(drop=True)
)
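assign() adds or transforms columns without breaking the chain. Each lambda d: ... receives the DataFrame as it stands at that point in the chain and returns the new values for one column; assign() itself returns a new DataFrame and leaves the original untouched. The whole expression reads top-to-bottom in the order operations happen.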
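To see the copy semantics in isolation, here is a small sketch that runs a chain of this shape on an in-memory frame. The sample values are made up for illustration.

```python
import pandas as pd

# Made-up data; the revenue column arrives as strings with a "$" prefix.
raw = pd.DataFrame({
    "date": ["2024-03-01", "2024-01-15"],
    "revenue": ["$10.50", "$7.25"],
})

tidy = (
    raw
    .assign(
        date=lambda d: pd.to_datetime(d["date"]),
        revenue=lambda d: pd.to_numeric(
            d["revenue"].str.replace("$", "", regex=False), errors="coerce"
        ),
    )
    .sort_values("date")
    .reset_index(drop=True)
)

print(tidy.dtypes)        # date is now a datetime column, revenue is numeric
print(raw["revenue"][0])  # the input frame still holds the original string
```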
The pipe() Method
pipe() lets you call any function as part of a chain, even when the function takes arguments that don’t fit neatly into method syntax:
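def parse_dates(df, col):
    """Parse a column as datetimes; unparseable values become NaT."""
    df = df.copy()  # avoid mutating the caller's DataFrame
    df[col] = pd.to_datetime(df[col], errors="coerce")
    return df

def drop_empty_strings(df, col):
    """Mark empty or whitespace-only strings in a column as missing (pd.NA)."""
    df = df.copy()
    df[col] = df[col].replace(r"^\s*$", pd.NA, regex=True)
    return df

df = (
    pd.read_csv("sales.csv")
    .pipe(parse_dates, "date")
    .pipe(drop_empty_strings, "notes")
    # filter_outliers is not defined in this section; any function with this signature works
    .pipe(filter_outliers, "revenue", z_threshold=3)
)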
pipe(fn, arg1, arg2) calls fn(df, arg1, arg2). It forwards the DataFrame as the first argument and passes any further positional or keyword arguments (such as z_threshold=3) through unchanged, so any function can slot into the chain.
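The chain above calls `filter_outliers` without defining it. One possible version is sketched below, assuming a z-score rule based on the sample standard deviation; the original may have intended a different outlier rule, and the sample data is invented.

```python
import pandas as pd

def filter_outliers(df, col, z_threshold=3.0):
    """Keep rows whose value in `col` is within z_threshold standard deviations of the mean."""
    values = df[col]
    std = values.std()
    if pd.isna(std) or std == 0:
        return df  # too little spread to compute z-scores
    z = (values - values.mean()) / std
    # Rows with a missing value compare as False here and are dropped as well.
    return df[z.abs() <= z_threshold]

# Nineteen ordinary values and one extreme one.
sales = pd.DataFrame({"revenue": [10.0] * 19 + [1000.0]})

# pipe() forwards "revenue" positionally and z_threshold as a keyword argument.
trimmed = sales.pipe(filter_outliers, "revenue", z_threshold=3)
print(len(sales), len(trimmed))
```

A z-score filter is only one option; on small or heavily skewed columns an interquartile-range rule may behave better.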
Building Reusable Transformers
Wrap transformations in functions that return a function — a factory pattern that gives you reusable, configurable pipeline steps:
def clean_currency(col):
    """Return a transformer that converts a currency column to float."""
    def transform(df):
        df = df.copy()  # don't modify the caller's DataFrame
        df[col] = (
            df[col]
            .str.replace("$", "", regex=False)
            .str.replace(",", "", regex=False)
            .str.strip()
            .pipe(pd.to_numeric, errors="coerce")
        )
        return df
    return transform

def rename_columns(mapping):
    """Return a transformer that renames columns using a dict."""
    def transform(df):
        return df.rename(columns=mapping)
    return transform
Each factory returns a transformer you pass directly to pipe().
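As a rough illustration of the reuse this buys, the sketch below configures two transformers once and applies them to two separate frames. The factories are restated so the snippet runs on its own; the variable names and sample data are assumptions made for the example.

```python
import pandas as pd

def clean_currency(col):
    """Return a transformer that converts a currency column to float."""
    def transform(df):
        cleaned = (
            df[col]
            .str.replace("$", "", regex=False)
            .str.replace(",", "", regex=False)
            .str.strip()
        )
        return df.assign(**{col: pd.to_numeric(cleaned, errors="coerce")})
    return transform

def rename_columns(mapping):
    """Return a transformer that renames columns using a dict."""
    def transform(df):
        return df.rename(columns=mapping)
    return transform

# Configure each step once...
clean_revenue = clean_currency("revenue")
standardize_names = rename_columns({"cust_id": "customer_id"})

# ...then reuse it on any frame with the same layout (made-up data).
january = pd.DataFrame({"cust_id": [1, 2], "revenue": ["$1,000.00", " $25.50 "]})
february = pd.DataFrame({"cust_id": [3], "revenue": ["n/a"]})

jan_clean = january.pipe(standardize_names).pipe(clean_revenue)
feb_clean = february.pipe(standardize_names).pipe(clean_revenue)
```

The configured transformer takes only a DataFrame, so it needs no extra arguments at the call site.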
A Complete Pipeline Example
Here’s a real pipeline that loads, cleans, and validates a sales dataset:
import pandas as pd

class ColumnTransformer:
    """Apply fn to a single column and return a new DataFrame."""
    def __init__(self, col, fn):
        self.col = col
        self.fn = fn

    def __call__(self, df):
        df = df.copy()  # leave the input DataFrame untouched
        df[self.col] = self.fn(df[self.col])
        return df

def clean_currency(col):
    def fn(series):
        return (
            series
            .astype(str)
            .str.replace(r"[\$,]", "", regex=True)  # strip "$" and thousands separators
            .str.strip()
            .pipe(pd.to_numeric, errors="coerce")
        )
    return ColumnTransformer(col, fn)

def to_datetime(col, format="%Y-%m-%d"):
    def fn(series):
        return pd.to_datetime(series, format=format, errors="coerce")
    return ColumnTransformer(col, fn)

def filter_negative(df, col):
    # Despite the name, this keeps only strictly positive rows (zeros are dropped too).
    return df[df[col] > 0].copy()

def ensure_types(df, schema):
    df = df.copy()
    for col, dtype in schema.items():
        if col in df.columns:
            try:
                df[col] = df[col].astype(dtype)
            except (ValueError, TypeError):
                # The cast failed (e.g. NaN in a column destined for int64); keep the column as-is.
                pass
    return df

schema = {"customer_id": "int64", "quantity": "int64", "revenue": "float64"}

df = (
    pd.read_csv("sales.csv")
    .pipe(to_datetime("date"))
    .pipe(clean_currency("revenue"))
    .pipe(clean_currency("quantity"))
    .pipe(filter_negative, "revenue")
    .pipe(ensure_types, schema)
    .reset_index(drop=True)
)
The pipeline is self-documenting: you can read the chain top-to-bottom and see exactly what transforms are applied, in what order. Adding a new step means appending one .pipe() call.
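When the list of steps itself needs to be reused or reordered, one option is to hold the steps in a plain list and fold over it. The sketch below assumes every step is a callable that takes and returns a DataFrame; `run_pipeline`, `strip_whitespace`, `drop_missing`, `SALES_STEPS`, and the `region` column are hypothetical names introduced for illustration.

```python
from functools import reduce

import pandas as pd

def run_pipeline(df, steps):
    """Apply each step (DataFrame in, DataFrame out) in order."""
    return reduce(lambda acc, step: acc.pipe(step), steps, df)

def strip_whitespace(col):
    """Return a step that trims surrounding whitespace in a string column."""
    def step(df):
        return df.assign(**{col: df[col].str.strip()})
    return step

def drop_missing(col):
    """Return a step that drops rows where the column is missing."""
    def step(df):
        return df.dropna(subset=[col])
    return step

# The pipeline definition is now data: append, remove, or reorder entries.
SALES_STEPS = [strip_whitespace("region"), drop_missing("customer_id")]

raw = pd.DataFrame({
    "customer_id": [1.0, None, 3.0],
    "region": [" east", "west ", " north "],
})
clean = run_pipeline(raw, SALES_STEPS).reset_index(drop=True)
```

The explicit `.pipe()` chain is usually easier to read; the list form mainly helps when different datasets share most, but not all, of their steps.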
Testing Individual Steps
The big win with pipelines is testing. Each step is a function that takes a DataFrame and returns a DataFrame:
import pandas as pd
import pytest

# Assumes clean_currency and filter_negative from the pipeline above are importable here.

def test_clean_currency():
    df = pd.DataFrame({"price": ["$1.23", "$4.56", "invalid"]})
    result = df.pipe(clean_currency("price"))
    assert result["price"].iloc[0] == pytest.approx(1.23)
    assert result["price"].iloc[1] == pytest.approx(4.56)
    assert pd.isna(result["price"].iloc[2])  # unparseable input becomes NaN

def test_filter_negative():
    df = pd.DataFrame({"revenue": [10, -5, 20]})
    result = df.pipe(filter_negative, "revenue")
    assert len(result) == 2
    assert (result["revenue"] > 0).all()
Test each transformer in isolation. When a pipeline breaks, the failing test points at the step responsible, so you can fix that step directly instead of re-running the whole chain.
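For steps that touch more than one column, comparing whole frames can be more thorough than checking individual cells. The sketch below uses `pandas.testing.assert_frame_equal` and also checks that the step leaves its input alone; `filter_negative` is restated so the snippet is self-contained, and the extra `region` column is an assumption for the example.

```python
import pandas as pd
from pandas.testing import assert_frame_equal

def filter_negative(df, col):
    # Same behaviour as the pipeline step: keep strictly positive rows.
    return df[df[col] > 0].copy()

def test_filter_negative_whole_frame():
    df = pd.DataFrame({"revenue": [10, -5, 20], "region": ["a", "b", "c"]})
    before = df.copy()

    result = df.pipe(filter_negative, "revenue").reset_index(drop=True)

    expected = pd.DataFrame({"revenue": [10, 20], "region": ["a", "c"]})
    assert_frame_equal(result, expected)  # values, dtypes, and column order all match
    assert_frame_equal(df, before)        # the input frame was not modified
```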
Handling Missing Data in Pipelines
Missing values are common. Handle them in the pipeline with explicit steps:
def fill_missing(df, col, fill_value):
    df = df.copy()  # avoid mutating the caller's DataFrame
    df[col] = df[col].fillna(fill_value)
    return df

def interpolate_missing(df, col, method="linear"):
    df = df.copy()
    df[col] = df[col].interpolate(method=method)
    return df

df = (
    pd.read_csv("sales.csv")
    .pipe(clean_currency("revenue"))                    # factory version defined earlier
    .pipe(lambda d: d.dropna(subset=["customer_id"]))   # drop rows missing key fields
    .pipe(fill_missing, "discount", 0)                  # fill numeric with 0
    .pipe(interpolate_missing, "quantity")              # fill with linear interpolation
)
Drop rows that are missing critical identifiers. Fill numeric columns with defaults. Interpolate time-series columns to maintain continuity.
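The difference between filling with a default and interpolating is easiest to see on a tiny frame. The numbers below are made up for illustration; the interior gap in `quantity` is filled with the midpoint of its neighbours, while the gaps in `discount` become 0.

```python
import numpy as np
import pandas as pd

readings = pd.DataFrame({
    "discount": [0.1, np.nan, 0.2, np.nan],
    "quantity": [10.0, np.nan, 30.0, 40.0],
})

filled = readings.assign(
    # Missing discount is treated as "no discount".
    discount=lambda d: d["discount"].fillna(0),
    # The missing quantity sits between 10 and 30, so linear interpolation gives 20.
    quantity=lambda d: d["quantity"].interpolate(method="linear"),
)
print(filled)
```

Interpolation only makes sense when row order carries meaning (for example, rows sorted by date), so sort before interpolating.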
Logging and Debugging
When a pipeline fails, you need to know where. Add logging between steps:
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_step(name):
    def fn(df):
        logger.info("[%s] rows=%d, cols=%s", name, len(df), list(df.columns))
        return df  # pass the DataFrame through unchanged
    return fn

df = (
    pd.read_csv("sales.csv")
    .pipe(log_step("load"))
    .pipe(to_datetime("date"))
    .pipe(log_step("dates parsed"))
    .pipe(clean_currency("revenue"))
    .pipe(log_step("currency cleaned"))
    .pipe(filter_negative, "revenue")
    .pipe(log_step("filtered"))
)
log_step() logs the step name, row count, and column names so you can pinpoint where the data changes unexpectedly.
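An alternative that avoids interleaving extra `.pipe()` calls is to wrap the steps themselves. The decorator below is a sketch under that assumption: `logged` and `keep_positive` are hypothetical names, and the wrapper simply reports the row count before and after each step.

```python
import functools
import logging

import pandas as pd

logger = logging.getLogger("pipeline")

def logged(step):
    """Wrap a step so it reports how many rows went in and came out."""
    @functools.wraps(step)  # keep the original function name for log messages
    def wrapper(df, *args, **kwargs):
        result = step(df, *args, **kwargs)
        logger.info("%s: %d -> %d rows", step.__name__, len(df), len(result))
        return result
    return wrapper

@logged
def keep_positive(df, col):
    return df[df[col] > 0]

logging.basicConfig(level=logging.INFO)
sales = pd.DataFrame({"revenue": [10, -5, 20]})
kept = sales.pipe(keep_positive, "revenue")
```

A sudden drop in the row count between two steps is often the quickest signal that a filter or a dropna is more aggressive than intended.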
Persisting the Output
At the end of the pipeline, write to the format your downstream system expects:
# CSV for tabular workflows
df.to_csv("sales_clean.csv", index=False)
# Parquet for analytics (fast reads, schema preserved)
df.to_parquet("sales_clean.parquet")
# JSON Lines (one record per line) for APIs
df.to_json("sales_clean.json", orient="records", lines=True)
Parquet is usually the right choice for analytics pipelines: it’s compressed, supports schema evolution, and pandas reads it much faster than CSV for large files. Note that to_parquet() needs a Parquet engine such as pyarrow or fastparquet installed.
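The "schema preserved" point is easiest to appreciate by looking at what CSV loses. The sketch below round-trips a small, made-up frame through an in-memory CSV buffer: the datetime column comes back as text unless the reader is told to parse it again.

```python
import io

import pandas as pd

clean = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-05", "2024-01-06"]),
    "revenue": [1200.5, 30.0],
})

# Write to an in-memory buffer instead of a file on disk.
buffer = io.StringIO()
clean.to_csv(buffer, index=False)

buffer.seek(0)
naive = pd.read_csv(buffer)  # the date column is read back as strings

buffer.seek(0)
restored = pd.read_csv(buffer, parse_dates=["date"])  # dtype restored explicitly

print(naive.dtypes)
print(restored.dtypes)
```

With Parquet the column types travel with the file, so the reader does not need to repeat this kind of type information.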
See Also
- /tutorials/scientific-python/pandas-data-cleaning/ — the foundational techniques this pipeline builds on
- /tutorials/scientific-python/ds-pandas-intro/ — DataFrame operations the pipeline uses
- /tutorials/scientific-python/pandas-getting-started/ — getting started with pandas for data work