

Photo by Author | Ideogram
Data is messy. When you pull information from APIs, real-world datasets, and other sources, you will inevitably run into duplicates, missing values, and invalid entries. Instead of writing the same cleaning code over and over, a well-designed pipeline saves time and ensures consistency across your data science projects.
In this article, we will build a reusable data cleaning and validation pipeline that handles common data quality issues while providing detailed feedback about what was done. By the end, you will have a tool that can clean datasets and validate them against business rules in just a few lines of code.
🔗 Link to the code on GitHub
Why Data Cleaning Pipelines?
Think of data pipelines like assembly lines in manufacturing: each step performs a specific function, and the output of one step becomes the input to the next. This approach makes your code more modular, testable, and reusable across projects.


A simple data cleaning pipeline
Image by Author | diagrams.net (draw.io)
Our pipeline will handle three basic responsibilities:
- Cleaning: Remove duplicates and handle missing values (use this as a starting point; you can add as many cleaning steps as needed)
- Validation: Ensure the data meets business rules and constraints
- Reporting: Track what changes were made during processing
Setting Up the Development Environment
Make sure you are using a recent version of Python. If working locally, create a virtual environment and install the required packages:
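For a local setup you would typically run pip install pandas numpy pydantic (these package names are inferred from the code used later in this article). The snippets below also assume the following imports:

import pandas as pd
import numpy as np
from typing import Optional, Dict, Any, List, Tuple
from pydantic import BaseModel, field_validator, ValidationError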
If you prefer, you can also use Google Colab or a similar notebook environment.
Defining the Validation Schema
Before we can validate data, we need to define what "valid" means. We will use Pydantic, a library that uses Python type hints to validate data types.
class DataValidator(BaseModel):
    name: str
    age: Optional[int] = None
    email: Optional[str] = None
    salary: Optional[float] = None

    @field_validator('age')
    @classmethod
    def validate_age(cls, v):
        # Reject ages outside a plausible range
        if v is not None and (v < 0 or v > 100):
            raise ValueError('Age must be between 0 and 100')
        return v

    @field_validator('email')
    @classmethod
    def validate_email(cls, v):
        # Minimal sanity check: an email must contain '@'
        if v and '@' not in v:
            raise ValueError('Invalid email format')
        return v
This schema defines a model of the expected data using Pydantic's syntax. To use the @field_validator decorator, you also need the @classmethod decorator. The validation logic ensures that ages fall within reasonable bounds and that emails contain the '@' symbol.
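To get a feel for how the schema behaves, here is a quick check with made-up values (not part of the pipeline itself):

# A valid record passes through and is returned as a plain dict
print(DataValidator(name='Ada', age=36, email='ada@example.com').model_dump())

# An out-of-range age raises a ValidationError with a descriptive message
try:
    DataValidator(name='Bob', age=-5)
except ValidationError as e:
    print(e)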
Building the Pipeline Class
Our main pipeline class encapsulates all the cleaning and validation logic:
class DataPipeline:
    def __init__(self):
        # Counters for reporting what the pipeline changed
        self.cleaning_stats = {'duplicates_removed': 0, 'nulls_handled': 0, 'validation_errors': 0}
The constructor initializes a statistics dictionary to track the changes made during processing. This gives you visibility into data quality and helps you monitor the cleaning steps over time.
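For example, immediately after instantiation every counter starts at zero:

pipeline = DataPipeline()
print(pipeline.cleaning_stats)
# {'duplicates_removed': 0, 'nulls_handled': 0, 'validation_errors': 0}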
Writing the Data Cleaning Logic
Let's add a clean_data method to handle common data quality issues such as missing values and duplicate records:
def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
    initial_rows = len(df)

    # Remove duplicates
    df = df.drop_duplicates()
    self.cleaning_stats['duplicates_removed'] = initial_rows - len(df)

    # Handle missing values: median for numeric columns, placeholder for text
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    df[numeric_columns] = df[numeric_columns].fillna(df[numeric_columns].median())

    string_columns = df.select_dtypes(include=['object']).columns
    df[string_columns] = df[string_columns].fillna('Unknown')

    return df
This approach is deliberate about handling different data types. Missing numeric values are filled with the column median (more robust to outliers than the mean), while text columns get a placeholder value. Duplicates are removed first so that repeated rows do not skew the median calculation.
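A tiny, made-up example of that behavior (assuming the class contains the __init__ and clean_data methods shown above):

# The duplicate second row is dropped, the missing score is filled with
# the median (1.0), and the missing name becomes 'Unknown'
demo = pd.DataFrame({'name': ['A', 'A', None], 'score': [1.0, 1.0, None]})
print(DataPipeline().clean_data(demo))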
Adding Validation with Error Tracking
The validation step checks each row individually, collecting both the valid records and detailed error information.
def validate_data(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, List[Dict[str, Any]]]:
    valid_rows = []
    errors = []

    for idx, row in df.iterrows():
        try:
            validated_row = DataValidator(**row.to_dict())
            valid_rows.append(validated_row.model_dump())
        except ValidationError as e:
            errors.append({'row': idx, 'errors': str(e)})

    self.cleaning_stats['validation_errors'] = len(errors)
    return pd.DataFrame(valid_rows), errors
This row-by-row approach ensures that a single bad record does not crash the entire pipeline. Valid rows continue through the process while errors are collected for review. This matters in production environments, where you need to process what you can while flagging what you cannot.
Orchestrating the Pipeline
The process method ties everything together:
def process(self, df: pd.DataFrame) -> Dict[str, Any]:
    # Clean first, then validate the cleaned data
    cleaned_df = self.clean_data(df.copy())
    validated_df, validation_errors = self.validate_data(cleaned_df)

    return {
        'cleaned_data': validated_df,
        'validation_errors': validation_errors,
        'stats': self.cleaning_stats
    }
The return value is a comprehensive report that includes the cleaned data, any validation errors, and the processing statistics.
Putting It All Together
Here is how you would use the pipeline in practice:
# Create sample messy data
sample_data = pd.DataFrame({
    'name': ['Tara Jamison', 'Jane Smith', 'Lucy Lee', None, 'Clara Clark', 'Jane Smith'],
    'age': [25, -5, 25, 35, 150, -5],
    'email': ['taraj@email.com', 'invalid-email', 'lucy@email.com', 'jane@email.com', 'clara@email.com', 'invalid-email'],
    'salary': [50000, 60000, 50000, None, 75000, 60000]
})
pipeline = DataPipeline()
result = pipeline.process(sample_data)
The pipeline automatically removes the duplicate record, fills the missing name with 'Unknown', fills the missing salary with the median value, and flags validation errors for the rows with a negative age, an out-of-range age, and a malformed email.
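To inspect the report, print each piece of the result. With this sample data you should see something like the values shown in the comments (the exact error text depends on your Pydantic version, and nulls_handled stays at 0 because the snippets above never update it):

print(result['stats'])
# {'duplicates_removed': 1, 'nulls_handled': 0, 'validation_errors': 2}

print(result['cleaned_data'])  # the three rows that passed validation

for err in result['validation_errors']:
    print(f"Row {err['row']}: {err['errors']}")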
🔗 You can find the full script on GitHub.
Extending the Pipeline
This pipeline works as a foundation you can build on. Consider these extensions for your specific needs:
- Custom cleaning rules: Add domain-specific cleaning methods, such as for phone numbers or addresses (a minimal sketch follows this list).
- Configurable validation: Make the Pydantic schema configurable so the same pipeline can handle different data types.
- Advanced error handling: Implement automatic correction for common errors and more graceful handling of transient failures.
- Performance optimization: For large datasets, consider vectorized operations or parallel processing.
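As a sketch of the first idea, here is a hypothetical clean_phone_numbers method; the phone column and the digits-only rule are illustrative assumptions, not part of the pipeline above:

def clean_phone_numbers(self, df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical domain-specific rule: keep digits only,
    # e.g. '(555) 123-4567' -> '5551234567'
    if 'phone' in df.columns:
        df['phone'] = df['phone'].astype(str).str.replace(r'\D', '', regex=True)
    return df

A method like this could be called from clean_data or chained as an extra step inside process.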
Wrapping Up
Data pipelines are not just about cleaning individual datasets. They are about building reliable, maintainable systems.
This pipeline approach ensures consistency across your projects and makes it easy to adjust business rules as requirements change. Start with this basic pipeline, then customize it for your specific needs.
The key is having a reliable, reusable system that handles the routine work so you can focus on extracting insights from clean data. Happy data cleaning!
Bala Priya C is a developer and technical writer from India. She likes working at the intersection of math, programming, data science, and content creation. Her areas of interest and expertise include DevOps, data science, and natural language processing. She enjoys reading, writing, coding, and coffee! Currently, she is working on learning and sharing her knowledge with the developer community by authoring tutorials, how-to guides, opinion pieces, and more. She also creates engaging resource overviews and coding tutorials.