
Building a Data Cleaning Pipeline

Most data cleaning code looks like a wall of function calls: load data, fix a column, drop rows, convert types, fix another column. It works, but it’s hard to read, harder to test, and impossible to reuse when the upstream data changes. A pipeline fixes all three.

A pipeline chains operations into readable, testable, reusable steps. Pandas method chaining makes this natural, and pipe() handles anything that chaining can’t.

Why a Pipeline

The old way, a script that mutates and reassigns df one line at a time:

df = pd.read_csv("sales.csv")
df["date"] = pd.to_datetime(df["date"])
df = df.dropna(subset=["customer_id"])
df["revenue"] = df["revenue"].str.replace("$", "", regex=False).astype(float)  # regex=False: "$" is a regex anchor
df = df[df["revenue"] > 0]

The pipeline way:

df = (
    pd.read_csv("sales.csv")
    .pipe(parse_dates, "date")
    .pipe(dropna_on, "customer_id")
    .pipe(clean_currency, "revenue")
    .pipe(filter_positive, "revenue")
)

Same result. Better readability. And you can test each step independently.
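The chain above relies on four helpers that are not defined at this point in the guide. A minimal sketch of what they might look like, assuming each one takes a DataFrame plus a column name and returns a new DataFrame. The function bodies and the inline sample data are illustrative assumptions, not the original author's code:

```python
import io
import pandas as pd

# Hypothetical helper bodies; only the names come from the chain above.
def parse_dates(df, col):
    # Unparseable values become NaT instead of raising
    return df.assign(**{col: pd.to_datetime(df[col], errors="coerce")})

def dropna_on(df, col):
    return df.dropna(subset=[col])

def clean_currency(df, col):
    cleaned = df[col].astype(str).str.replace("$", "", regex=False)
    return df.assign(**{col: pd.to_numeric(cleaned, errors="coerce")})

def filter_positive(df, col):
    return df[df[col] > 0]

# Made-up sample standing in for sales.csv
raw = io.StringIO(
    "date,customer_id,revenue\n"
    "2024-01-05,101,$20.50\n"
    "2024-01-06,,$15.00\n"
    "2024-01-07,103,$-3.00\n"
)

df = (
    pd.read_csv(raw)
    .pipe(parse_dates, "date")
    .pipe(dropna_on, "customer_id")
    .pipe(clean_currency, "revenue")
    .pipe(filter_positive, "revenue")
)
print(df)
```

With this sample, the second row is dropped for its missing customer_id and the third for its negative revenue, so only the first row survives.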

Method Chaining

The simplest pipeline uses method chaining, where each method is called on the result of the previous one:

df = (
    pd.read_csv("sales.csv")
    .assign(
        date=lambda d: pd.to_datetime(d["date"]),
        revenue=lambda d: pd.to_numeric(d["revenue"].str.replace("$", "", regex=False), errors="coerce")
    )
    .rename(columns={"cust_id": "customer_id"})
    .sort_values("date")
    .reset_index(drop=True)
)

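assign() adds or transforms columns without breaking the chain. Each lambda d: ... receives the DataFrame as it stands at that point in the chain and returns the new values for one column; assign() itself returns a new DataFrame and leaves the original untouched. The whole expression reads top-to-bottom in the order operations happen.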
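A small sketch of that behavior, using a made-up two-row frame in place of sales.csv. It checks that the frame passed into assign() keeps its raw strings while the result holds the converted columns:

```python
import pandas as pd

# Illustrative data; column names follow the example above
raw = pd.DataFrame({"date": ["2024-01-05", "2024-01-06"], "revenue": ["$10", "$20"]})

cleaned = raw.assign(
    date=lambda d: pd.to_datetime(d["date"]),
    revenue=lambda d: pd.to_numeric(
        d["revenue"].str.replace("$", "", regex=False), errors="coerce"
    ),
)

print(raw["revenue"].tolist())      # still the raw strings
print(cleaned["revenue"].tolist())  # numeric values
```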

The pipe() Method

pipe() lets you call any function as part of a chain, even when the function takes arguments that don’t fit neatly into method syntax:

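def parse_dates(df, col):
    df = df.copy()  # work on a copy so the caller's DataFrame is not mutated
    df[col] = pd.to_datetime(df[col], errors="coerce")
    return df

def drop_empty_strings(df, col):
    # Despite the name, this only marks blank or whitespace-only strings as
    # missing; chain a dropna(subset=[col]) afterwards to remove those rows.
    df = df.copy()
    df[col] = df[col].replace(r"^\s*$", pd.NA, regex=True)
    return df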

df = (
    pd.read_csv("sales.csv")
    .pipe(parse_dates, "date")
    .pipe(drop_empty_strings, "notes")
    .pipe(filter_outliers, "revenue", z_threshold=3)
)

pipe(fn, arg1, arg2) calls fn(df, arg1, arg2). It forwards the DataFrame as the first argument, so any function can slot into the chain.
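The filter_outliers step in the chain above is not defined in this guide. One possible sketch, assuming it is meant to keep rows whose value lies within z_threshold sample standard deviations of the column mean (the body and the test data are assumptions for illustration):

```python
import pandas as pd

def filter_outliers(df, col, z_threshold=3):
    """Hypothetical implementation: keep rows within z_threshold std devs of the mean."""
    std = df[col].std()
    if pd.isna(std) or std == 0:
        # No spread to measure against, so nothing counts as an outlier
        return df.copy()
    z = (df[col] - df[col].mean()) / std
    # Rows where the value is NaN compare as False and are dropped as well
    return df[z.abs() <= z_threshold].copy()

# Twenty ordinary values and one extreme one
sample = pd.DataFrame({"revenue": [10.0] * 20 + [1000.0]})

# pipe() forwards both the positional and the keyword argument
result = sample.pipe(filter_outliers, "revenue", z_threshold=3)
print(len(sample), len(result))
```

A z-score filter is only one way to define an outlier. With very small samples a single extreme value inflates the standard deviation enough that it may never exceed the threshold, so a quantile-based rule might suit some datasets better.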

Building Reusable Transformers

Wrap transformations in functions that return a function — a factory pattern that gives you reusable, configurable pipeline steps:

def clean_currency(col):
    """Return a transformer that converts a currency column to float."""
    def transform(df):
        df = df.copy()  # work on a copy so the caller's DataFrame is not mutated
        df[col] = (
            df[col]
            .str.replace("$", "", regex=False)
            .str.replace(",", "", regex=False)
            .str.strip()
            .pipe(pd.to_numeric, errors="coerce")
        )
        return df
    return transform

def rename_columns(mapping):
    """Return a transformer that renames columns using a dict."""
    def transform(df):
        return df.rename(columns=mapping)
    return transform

Each factory returns a transformer you pass directly to pipe().
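Because every transformer has the same shape (DataFrame in, DataFrame out), a list of them can also be applied in a loop. The sketch below redefines the two factories so it runs on its own, and adds a hypothetical run_pipeline helper that is not part of the original guide:

```python
import pandas as pd

def clean_currency(col):
    def transform(df):
        cleaned = (
            df[col]
            .str.replace("$", "", regex=False)
            .str.replace(",", "", regex=False)
            .str.strip()
        )
        # assign() returns a new frame, so the input is left untouched
        return df.assign(**{col: pd.to_numeric(cleaned, errors="coerce")})
    return transform

def rename_columns(mapping):
    def transform(df):
        return df.rename(columns=mapping)
    return transform

def run_pipeline(df, steps):
    # Hypothetical helper: apply each transformer in order
    for step in steps:
        df = df.pipe(step)
    return df

steps = [
    rename_columns({"cust_id": "customer_id"}),
    clean_currency("revenue"),
]

raw = pd.DataFrame({"cust_id": [1, 2], "revenue": ["$1,200.50", " $30 "]})
result = run_pipeline(raw, steps)
print(result)
```

Keeping the steps in a list makes it straightforward to reuse the same sequence on several files, or to swap one step out in a test.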

A Complete Pipeline Example

Here’s a real pipeline that loads, cleans, and validates a sales dataset:

import pandas as pd

class ColumnTransformer:
    def __init__(self, col, fn):
        self.col = col
        self.fn = fn

    def __call__(self, df):
        df = df.copy()  # work on a copy so the caller's DataFrame is not mutated
        df[self.col] = self.fn(df[self.col])
        return df

def clean_currency(col):
    def fn(series):
        return (
            series
            .astype(str)
            .str.replace(r"[\$,]", "", regex=True)
            .str.strip()
            .pipe(pd.to_numeric, errors="coerce")
        )
    return ColumnTransformer(col, fn)

def to_datetime(col, format="%Y-%m-%d"):
    def fn(series):
        return pd.to_datetime(series, format=format, errors="coerce")
    return ColumnTransformer(col, fn)

def filter_negative(df, col):
    # Keeps strictly positive values: zero, negative, and NaN rows are all dropped
    return df[df[col] > 0].copy()

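def ensure_types(df, schema):
    df = df.copy()  # work on a copy so the caller's DataFrame is not mutated
    for col, dtype in schema.items():
        if col in df.columns:
            try:
                df[col] = df[col].astype(dtype)
            except (ValueError, TypeError) as exc:
                # Leave the column as it was, but report the failure instead of hiding it
                print(f"could not cast {col!r} to {dtype}: {exc}")
    return df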

schema = {"customer_id": "int64", "quantity": "int64", "revenue": "float64"}

df = (
    pd.read_csv("sales.csv")
    .pipe(to_datetime("date"))
    .pipe(clean_currency("revenue"))
    .pipe(clean_currency("quantity"))
    .pipe(filter_negative, "revenue")
    .pipe(ensure_types, schema)
    .reset_index(drop=True)
)

The pipeline is self-documenting: you can read the chain top-to-bottom and see exactly what transforms are applied, in what order. Adding a new step means appending one .pipe() call.
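One detail worth checking in a pipeline like this: a column cleaned with errors="coerce" can contain NaN, and a float column with NaN cannot be cast to plain int64. The sketch below shows that failure and one possible alternative, pandas' nullable "Int64" dtype. Using "Int64" in the schema is a suggestion here, not something the original pipeline does:

```python
import pandas as pd

# "oops" is coerced to NaN, so the result is float64
quantity = pd.to_numeric(pd.Series(["3", "oops", "5"]), errors="coerce")

try:
    quantity.astype("int64")
    cast_ok = True
except (ValueError, TypeError):
    # NaN has no int64 representation, so the cast raises
    cast_ok = False

# The nullable integer dtype keeps the missing value as <NA>
nullable = quantity.astype("Int64")

print(cast_ok)
print(nullable)
```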

Testing Individual Steps

The big win with pipelines is testing. Each step is a function that takes a DataFrame and returns a DataFrame:

import pandas as pd
import pytest

def test_clean_currency():
    df = pd.DataFrame({"price": ["$1.23", "$4.56", "invalid"]})
    result = df.pipe(clean_currency("price"))
    # approx() avoids brittle exact comparisons on parsed floats
    assert result["price"].iloc[0] == pytest.approx(1.23)
    assert result["price"].iloc[1] == pytest.approx(4.56)
    assert pd.isna(result["price"].iloc[2])

def test_filter_negative():
    df = pd.DataFrame({"revenue": [10, -5, 20]})
    result = df.pipe(lambda d: filter_negative(d, "revenue"))
    assert len(result) == 2
    assert all(result["revenue"] > 0)

Test each transformer in isolation. When a pipeline breaks, the failing test points to the step responsible, so you can fix that step directly.
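Two further checks that may be worth adding: comparing whole frames with pandas.testing.assert_frame_equal, and confirming that a step leaves its input unchanged. A sketch, with filter_negative repeated from the pipeline example so the block runs on its own (the test names are illustrative):

```python
import pandas as pd
from pandas.testing import assert_frame_equal

def filter_negative(df, col):
    # Same body as the step in the pipeline example
    return df[df[col] > 0].copy()

def test_filter_negative_keeps_positive_rows():
    df = pd.DataFrame({"revenue": [10, -5, 20]})
    result = filter_negative(df, "revenue").reset_index(drop=True)
    expected = pd.DataFrame({"revenue": [10, 20]})
    # Compares values, dtypes, column names, and index in one call
    assert_frame_equal(result, expected)

def test_filter_negative_does_not_mutate_input():
    df = pd.DataFrame({"revenue": [10, -5, 20]})
    before = df.copy()
    filter_negative(df, "revenue")
    assert_frame_equal(df, before)
```

The reset_index(drop=True) call matters in the first test: filtering keeps the original row labels (0 and 2), and assert_frame_equal treats a differing index as a failure.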

Handling Missing Data in Pipelines

Missing values are common. Handle them in the pipeline with explicit steps:

def fill_missing(df, col, fill_value):
    df = df.copy()  # work on a copy so the caller's DataFrame is not mutated
    df[col] = df[col].fillna(fill_value)
    return df

def interpolate_missing(df, col, method="linear"):
    df = df.copy()
    df[col] = df[col].interpolate(method=method)
    return df

df = (
    pd.read_csv("sales.csv")
    .pipe(clean_currency("revenue"))
    .pipe(lambda d: d.dropna(subset=["customer_id"]))  # drop rows missing key fields
    .pipe(fill_missing, "discount", 0)                  # fill numeric with 0
    .pipe(interpolate_missing, "quantity")               # fill with linear interpolation
)

Drop rows that are missing critical identifiers. Fill numeric columns with defaults. Interpolate time-series columns to maintain continuity.
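The order of these steps affects the result. A small sketch on made-up data, with non-mutating variants of the two helpers, shows interpolation running after the drop, so the gap is filled from the rows that survive:

```python
import pandas as pd

def fill_missing(df, col, fill_value):
    return df.assign(**{col: df[col].fillna(fill_value)})

def interpolate_missing(df, col, method="linear"):
    return df.assign(**{col: df[col].interpolate(method=method)})

# Illustrative data: one missing value per column
raw = pd.DataFrame({
    "customer_id": [101.0, None, 103.0, 104.0],
    "discount": [0.1, 0.2, None, 0.0],
    "quantity": [2.0, 9.0, None, 6.0],
})

result = (
    raw
    .dropna(subset=["customer_id"])         # removes the second row
    .pipe(fill_missing, "discount", 0)      # NaN discount becomes 0
    .pipe(interpolate_missing, "quantity")  # gap filled between 2.0 and 6.0
    .reset_index(drop=True)
)
print(result)
```

Here the missing quantity becomes 4.0, the midpoint of its surviving neighbors 2.0 and 6.0. Had interpolation run before the drop, the neighbors would have been 9.0 and 6.0 instead.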

Logging and Debugging

When a pipeline fails, you need to know where. Add logging between steps:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_step(name):
    def fn(df):
        # Pass the DataFrame through unchanged; only record its shape
        logger.info("[%s] rows=%d, cols=%s", name, len(df), list(df.columns))
        return df
    return fn

df = (
    pd.read_csv("sales.csv")
    .pipe(log_step("load"))
    .pipe(to_datetime("date"))
    .pipe(log_step("dates parsed"))
    .pipe(clean_currency("revenue"))
    .pipe(log_step("currency cleaned"))
    .pipe(filter_negative, "revenue")
    .pipe(log_step("filtered"))
)

log_step() logs the step name, row count, and column names so you can pinpoint where the data changes unexpectedly.
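An alternative to inserting log_step() calls by hand is to wrap each step in a decorator that reports the row count before and after. The logged decorator and the "pipeline" logger name below are hypothetical, shown only as a sketch of the idea:

```python
import functools
import logging

import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")  # logger name chosen for illustration

def logged(fn):
    """Wrap a step so it logs how many rows went in and how many came out."""
    @functools.wraps(fn)
    def wrapper(df, *args, **kwargs):
        result = fn(df, *args, **kwargs)
        logger.info("%s: rows %d -> %d", fn.__name__, len(df), len(result))
        return result
    return wrapper

@logged
def filter_negative(df, col):
    return df[df[col] > 0].copy()

df = pd.DataFrame({"revenue": [10, -5, 20]}).pipe(filter_negative, "revenue")
```

This keeps the chain itself free of logging calls, at the cost of having to decorate every step you want reported.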

Persisting the Output

At the end of the pipeline, write to the format your downstream system expects:

# CSV for tabular workflows
df.to_csv("sales_clean.csv", index=False)

# Parquet for analytics (fast reads, dtypes preserved; needs pyarrow or fastparquet installed)
df.to_parquet("sales_clean.parquet")

# JSON Lines (one record per line) for APIs and streaming consumers
df.to_json("sales_clean.json", orient="records", lines=True)

Parquet is usually the right choice for analytics pipelines: it is compressed and columnar, it stores each column's type alongside the data, and pandas typically reads it much faster than CSV for large files.
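To see why stored column types matter, the sketch below round-trips a small made-up frame through CSV in memory. The datetime column comes back as plain text unless the reader is told to parse it again. A Parquet round trip is not shown because it needs an optional engine such as pyarrow:

```python
import io

import pandas as pd

df = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-05", "2024-01-06"]),
    "revenue": [20.5, 15.0],
})

# Write to an in-memory buffer instead of a file
buffer = io.StringIO()
df.to_csv(buffer, index=False)

buffer.seek(0)
restored = pd.read_csv(buffer)  # dates come back as text

buffer.seek(0)
restored_parsed = pd.read_csv(buffer, parse_dates=["date"])  # dtype restored by hand

print(df.dtypes)
print(restored.dtypes)
print(restored_parsed.dtypes)
```

With CSV, every reader has to repeat that type information. A format that stores the schema with the data avoids the extra step.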
