10 Useful Python One-Liners for Data Engineering

by SkillAiNest

Photo by Editor | ChatGPT

Introduction

Data engineering involves working with large datasets, building ETL pipelines, and maintaining data quality. Data engineers process streaming data, monitor system performance, handle schema changes, and ensure data consistency across distributed systems.

One-liners can help simplify complex tasks by condensing them into single, readable statements. This article focuses on practical one-liners that solve common data engineering problems.

The one-liners presented here address real tasks such as processing event data with varying structures, analyzing system logs for performance issues, handling API responses with different schemas, and implementing data quality checks. Let's get started.

🔗 Link to the code on GitHub

Sample Data

Let's generate some sample data to run our one-liners against:

import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta

# Create streaming event data
np.random.seed(42)
events = []
for i in range(1000):
    properties = {
        'device_type': np.random.choice(['mobile', 'desktop', 'tablet']),
        'page_path': np.random.choice(['/home', '/products', '/checkout']),
        'session_length': np.random.randint(60, 3600)
    }
    if np.random.random() > 0.7:
        properties['purchase_value'] = round(np.random.uniform(20, 300), 2)

    event = {
        'event_id': f'evt_{i}',
        'timestamp': (datetime.now() - timedelta(hours=np.random.randint(0, 72))).isoformat(),
        'user_id': f'user_{np.random.randint(100, 999)}',
        'event_type': np.random.choice(['view', 'click', 'purchase']),
        'metadata': json.dumps(properties)
    }
    events.append(event)

# Create database performance logs
db_logs = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=5000, freq='1min'),
    'operation': np.random.choice(['SELECT', 'INSERT', 'UPDATE'], 5000, p=[0.7, 0.2, 0.1]),
    'duration_ms': np.random.lognormal(mean=4, sigma=1, size=5000),
    'table_name': np.random.choice(['users', 'orders', 'products'], 5000),
    'rows_processed': np.random.poisson(lam=25, size=5000),
    'connection_id': np.random.randint(1, 20, 5000)
})

# Create API log data
api_logs = []
for i in range(800):
    log_entry = {
        'timestamp': datetime.now() - timedelta(minutes=np.random.randint(0, 1440)),
        'endpoint': np.random.choice(['/api/users', '/api/orders', '/api/metrics']),
        'status_code': np.random.choice([200, 400, 500], p=[0.8, 0.15, 0.05]),
        'response_time': np.random.exponential(150)
    }
    if log_entry['status_code'] == 200:
        log_entry['payload_size'] = np.random.randint(100, 5000)
    api_logs.append(log_entry)

1. Extracting JSON Fields into DataFrame Columns

Convert the JSON metadata fields in the event log into DataFrame columns.

events_df = pd.DataFrame([{**event, **json.loads(event['metadata'])} for event in events]).drop('metadata', axis=1)

This one-liner uses a list comprehension with dictionary unpacking to merge each event's base fields with its parsed JSON metadata. drop() removes the original metadata column, since its fields now live in separate columns.

Output:

[Output image: JSON fields extracted into columns]

This produces a DataFrame with 1000 rows and 8 columns, where JSON fields like device_type and purchase_value become individual columns that can be queried and aggregated directly.
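As a quick sanity check, the same merge-and-flatten pattern can be run on two hand-made events (toy records invented for illustration, not the generated dataset above):

```python
import json
import pandas as pd

# Two toy events; the second carries an extra metadata field
toy_events = [
    {'event_id': 'evt_0', 'metadata': json.dumps({'device_type': 'mobile'})},
    {'event_id': 'evt_1', 'metadata': json.dumps({'device_type': 'desktop', 'purchase_value': 49.99})},
]

# Merge base fields with parsed metadata, then drop the raw JSON column
flat = pd.DataFrame(
    [{**e, **json.loads(e['metadata'])} for e in toy_events]
).drop('metadata', axis=1)

print(flat.columns.tolist())  # ['event_id', 'device_type', 'purchase_value']
```

Fields missing from an event (here, purchase_value in the first record) simply become NaN in the flattened frame.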

2. Flagging Outliers by Operation Type

Find database operations that take unusually long compared to similar operations.

outliers = db_logs.groupby('operation').apply(lambda x: x[x['duration_ms'] > x['duration_ms'].quantile(0.95)]).reset_index(drop=True)

This groups the logs by database operation type, then filters each group to the records whose duration exceeds that group's 95th percentile.

Truncated output:

[Output image: outlier rows per operation]

This returns about 250 outlier operations (5% of the 5000 total), where each flagged operation ran slower than 95% of similar operations.
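On recent pandas versions, the same per-group filter can be written with transform, which sidesteps the groupby.apply deprecation warning and keeps the original index; a minimal sketch on toy durations:

```python
import pandas as pd

# Toy frame: two operation types with known duration ranges
df = pd.DataFrame({
    'operation': ['SELECT'] * 100 + ['INSERT'] * 100,
    'duration_ms': list(range(100)) + list(range(100, 200)),
})

# Broadcast each group's 95th percentile back onto the rows, then filter
threshold = df.groupby('operation')['duration_ms'].transform(lambda s: s.quantile(0.95))
outliers = df[df['duration_ms'] > threshold]

print(len(outliers))  # 10 -- the top 5% of each group
```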

3. Computing Rolling Average Response Times for API Endpoints

Monitor performance trends over time for different API endpoints using sliding windows.

api_response_trends = pd.DataFrame(api_logs).set_index('timestamp').sort_index().groupby('endpoint')['response_time'].rolling('1h').mean().reset_index()

This converts the API logs to a DataFrame, sets timestamp as the index for time-based operations, and sorts chronologically to ensure a monotonic sequence. It then groups by endpoint and applies a 1-hour rolling window to the response times.

Within each sliding window, mean() computes the average response time. The rolling window moves through time, giving performance trends rather than isolated measurements.

Truncated output:

[Output image: rolling average response times]

This yields response time trends showing how each API endpoint's performance changes over time, with values in milliseconds. Higher values indicate slower performance.
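The time-based window is right-closed by default, so each point averages the observations from the preceding hour up to and including itself. A minimal sketch on three hand-picked timestamps:

```python
import pandas as pd

# Three response times at known offsets
idx = pd.to_datetime(['2024-01-01 00:00', '2024-01-01 00:30', '2024-01-01 01:30'])
s = pd.Series([100.0, 200.0, 300.0], index=idx)

# 1-hour rolling mean: the 01:30 window no longer includes the 00:30 point
print(s.rolling('1h').mean().tolist())  # [100.0, 150.0, 300.0]
```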

4. Detecting Schema Changes in Event Data

Identify when new fields appear in event metadata that were not present in earlier events.

schema_evolution = pd.DataFrame([{k: type(v).__name__ for k, v in json.loads(event['metadata']).items()} for event in events]).fillna('missing').nunique()

This parses the JSON metadata from each event and builds a dictionary mapping field names to their type names via type(v).__name__.

The resulting DataFrame has one row per event and one column per unique field found across all events. fillna('missing') handles events that lack a field, and nunique() counts how many distinct values (including 'missing') appear in each column.

Output:

device_type       1
page_path         1
session_length    1
purchase_value    2
dtype: int64
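Why nunique() flags drift: a field whose type varies, or that is absent in some events, produces more than one distinct value in its column. A toy illustration with made-up records:

```python
import pandas as pd

# Three toy metadata records: a type change and a late-appearing field
records = [
    {'session_length': 120},      # int
    {'session_length': '300'},    # str -- type drift
    {'purchase_value': 19.99},    # new field, missing above
]

types = pd.DataFrame(
    [{k: type(v).__name__ for k, v in r.items()} for r in records]
).fillna('missing')

print(types.nunique().to_dict())  # {'session_length': 3, 'purchase_value': 2}
```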

5. Aggregating Database Connection Performance at Multiple Levels

Build a summary grouped by operation type and connection to monitor resource usage.

connection_perf = db_logs.groupby(['operation', 'connection_id']).agg({'duration_ms': ['mean', 'count'], 'rows_processed': ['sum', 'mean']}).round(2)

This groups the logs by operation type and connection ID, creating a layered view of how different connections handle different workloads.

agg() applies multiple aggregation functions: mean and count on duration_ms reveal both average performance and query frequency, while sum and mean on rows_processed show throughput patterns. round(2) keeps the output readable.

Output:

[Output image: connection performance summary]

This produces a multi-indexed DataFrame showing how each connection performs for each operation type.
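Downstream code often prefers flat column names over the MultiIndex that a multi-function agg() produces; one common pattern (sketched here on a toy frame) is to join the two levels:

```python
import pandas as pd

# Toy log frame mirroring the columns used above
df = pd.DataFrame({
    'operation': ['SELECT', 'SELECT', 'INSERT'],
    'connection_id': [1, 1, 2],
    'duration_ms': [10.0, 20.0, 30.0],
    'rows_processed': [5, 15, 25],
})

perf = df.groupby(['operation', 'connection_id']).agg(
    {'duration_ms': ['mean', 'count'], 'rows_processed': ['sum', 'mean']}
)

# Flatten ('duration_ms', 'mean') -> 'duration_ms_mean', etc.
perf.columns = ['_'.join(col) for col in perf.columns]
print(perf.loc[('SELECT', 1), 'duration_ms_mean'])  # 15.0
```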

6. Building Hourly Event Type Distribution Patterns

Compute how event types are distributed across hours of the day to understand user behavior cycles.

hourly_patterns = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby(['hour', 'event_type']).size().unstack(fill_value=0).div(pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby('hour').size(), axis=0).round(3)

This extracts the hour from the timestamps using assign() and a lambda, then builds a cross-tabulation of hours versus event types with groupby and unstack.

The div() operation normalizes by the total events per hour, showing proportional distribution rather than raw counts.

Truncated output:

[Output image: hourly event type proportions]

This yields a matrix showing the proportion of each event type (view, click, purchase) for every hour of the day, revealing user behavior patterns and peak activity periods.
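The same normalized cross-tabulation can be produced more compactly with pd.crosstab and normalize='index', which divides each row by its total; a sketch on toy events:

```python
import pandas as pd

# Toy events with a known hourly split
df = pd.DataFrame({
    'hour': [0, 0, 0, 1, 1],
    'event_type': ['view', 'view', 'click', 'view', 'purchase'],
})

# normalize='index' turns counts into per-hour proportions
patterns = pd.crosstab(df['hour'], df['event_type'], normalize='index').round(3)
print(patterns.loc[0, 'view'])  # 0.667
```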

7. Summarizing API Error Rates by Status Code

Analyze error distribution patterns across all endpoints to monitor API health.

error_breakdown = pd.DataFrame(api_logs).groupby(['endpoint', 'status_code']).size().unstack(fill_value=0).div(pd.DataFrame(api_logs).groupby('endpoint').size(), axis=0).round(3)

This groups the API logs by both endpoint and status_code, then uses size() to count occurrences and unstack() to pivot the status codes into columns. The div() operation normalizes by total requests per endpoint to show proportions rather than raw counts, revealing which endpoints have the highest error rates and what kinds of errors they produce.

Output:

status_code     200    400    500
endpoint                         
/api/metrics  0.789  0.151  0.060
/api/orders   0.827  0.140  0.033
/api/users    0.772  0.167  0.061

The matrix shows, for each endpoint, the proportion of each status code (200, 400, 500), making it easy to spot troublesome endpoints and whether they fail with client errors (4xx) or server errors (5xx).
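A complementary single number per endpoint, the overall error rate regardless of status code, can be derived by flagging non-200 rows; a minimal sketch on toy logs:

```python
import pandas as pd

# Toy logs: /api/users fails twice out of four requests
df = pd.DataFrame({
    'endpoint': ['/api/users'] * 4 + ['/api/orders'] * 4,
    'status_code': [200, 200, 400, 500, 200, 200, 200, 400],
})

# The mean of a boolean flag is exactly the error proportion
error_rate = df.assign(is_error=df['status_code'] != 200).groupby('endpoint')['is_error'].mean()
print(error_rate.to_dict())  # {'/api/orders': 0.25, '/api/users': 0.5}
```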

8. Implementing Sliding Window Anomaly Detection

Detect unusual patterns by comparing current performance against recent history.

anomaly_flags = db_logs.sort_values('timestamp').assign(rolling_mean=lambda x: x['duration_ms'].rolling(window=100, min_periods=10).mean()).assign(is_anomaly=lambda x: x['duration_ms'] > 2 * x['rolling_mean'])

After sorting chronologically, this computes a rolling mean over the last 100 operations using rolling(), then flags operations whose duration exceeds twice the rolling average. min_periods=10 ensures the calculation starts only once enough data is available.

Truncated output:

[Output image: anomaly flags on database operations]

This adds an anomaly flag to each database operation, identifying operations that are unusually slow relative to recent performance rather than a static threshold.
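Note that the rolling mean here includes the current observation, so a single extreme value slightly inflates its own baseline; on a toy series the mechanics look like this:

```python
import pandas as pd

# Four normal durations followed by one spike
s = pd.Series([10.0, 10.0, 10.0, 10.0, 50.0])

# Rolling mean over up to 3 points (needs at least 2), then a 2x threshold
rolling_mean = s.rolling(window=3, min_periods=2).mean()
is_anomaly = s > 2 * rolling_mean

print(is_anomaly.tolist())  # [False, False, False, False, True]
```

Only the spike is flagged: 50 exceeds twice its window's mean of about 23.3, while the comparison against the initial NaN evaluates to False.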

9. Optimizing Data Types to Reduce Memory Usage

Reduce a DataFrame's memory footprint by downcasting numeric types to the smallest possible representations.

optimized_df = db_logs.assign(**{c: (pd.to_numeric(db_logs[c], downcast="integer") if pd.api.types.is_integer_dtype(db_logs[c]) else pd.to_numeric(db_logs[c], downcast="float")) for c in db_logs.select_dtypes(include=['int', 'float']).columns})

This selects only the numeric columns of db_logs and replaces each with a downcast version via pd.to_numeric(). For integer columns, it tries int8, int16, and int32 before falling back to int64. For float columns, it tries float32 before float64.

Doing so can substantially reduce memory usage for large datasets.
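You can verify the savings directly with memory_usage(); a sketch on a toy column whose values fit comfortably in 32 bits:

```python
import numpy as np
import pandas as pd

# int64 by default, though every value fits in int32
df = pd.DataFrame({'ids': np.arange(100_000, dtype='int64')})

# downcast='integer' tries int8, int16, int32 in order
small = df.assign(ids=pd.to_numeric(df['ids'], downcast='integer'))

print(small['ids'].dtype)  # int32
print(small.memory_usage(deep=True).sum() < df.memory_usage(deep=True).sum())  # True
```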

10. Computing Per-Hour Event Processing Metrics

Monitor streaming pipeline health by tracking event volume and user engagement patterns.

pipeline_metrics = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby('hour').agg({'event_id': 'count', 'user_id': 'nunique', 'event_type': lambda x: (x == 'purchase').mean()}).rename(columns={'event_id': 'total_events', 'user_id': 'unique_users', 'event_type': 'purchase_rate'}).round(3)

This extracts the hour from each event's timestamp and groups by hour, then computes three key metrics: total events with 'count', unique users with 'nunique', and the purchase conversion rate with a lambda that computes the proportion of purchase events. rename() gives the final output descriptive column names.

Output:

[Output image: per-hour pipeline metrics]

This shows event volume, user engagement, and conversion rate for each hour, making it easy to identify patterns throughout the day.
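The dict-based agg() plus rename() can also be expressed with named aggregation, which produces the final column names in one step; a sketch on four toy events:

```python
import pandas as pd

# Toy events: hour 0 has three events from two users, one purchase
df = pd.DataFrame({
    'hour': [0, 0, 0, 1],
    'event_id': ['e1', 'e2', 'e3', 'e4'],
    'user_id': ['u1', 'u1', 'u2', 'u3'],
    'event_type': ['view', 'purchase', 'view', 'purchase'],
})

metrics = df.groupby('hour').agg(
    total_events=('event_id', 'count'),
    unique_users=('user_id', 'nunique'),
    purchase_rate=('event_type', lambda x: (x == 'purchase').mean()),
)

print(metrics.loc[0, 'total_events'])  # 3
```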

Wrapping Up

These one-liners are useful for common data engineering tasks. They combine pandas operations, statistical analysis, and data transformation techniques to handle real-world scenarios effectively.

Each pattern can be adapted and extended for specific requirements while keeping its core logic, making these one-liners effective building blocks for production use.

Happy coding!

Bala Priya C is a developer and technical writer from India. She likes working at the intersection of math, programming, data science, and content creation. Her areas of interest and expertise include DevOps, data science, and natural language processing. She enjoys reading, writing, coding, and coffee! Currently, she's working on learning and sharing her knowledge with the developer community by authoring tutorials, how-to guides, and more. She also creates engaging resource overviews and coding tutorials.
