Data Transforms API¶
Functions for cleaning, filling, and resampling market data.
Overview¶
qldata provides two ways to use transforms:
- Fluent API - Chain transforms in queries
- Standalone functions - Apply to any DataFrame
import qldata as qd
# Fluent API (recommended)
df = qd.data("BTCUSDT", source="binance") \
.last(30) \
.resolution("1h") \
.clean() \
.fill_forward() \
.resample("4h") \
.get()
# Standalone functions
from qldata import remove_duplicates, fill_forward, resample
df = remove_duplicates(raw_df)
df = fill_forward(df)
df = resample(df, "4h")
Cleaning Functions¶
clean()¶
Apply adaptive data cleaning based on data type.
# Basic cleaning
.clean()
# With options
.clean(
remove_invalid_prices=True,
validate_ohlc=True,
remove_outliers=True
)
Cleaning Steps:
- Sort by timestamp index
- Remove duplicate timestamps
- Drop rows with NaN in core columns (adaptive)
- Optional: Remove invalid prices
- Optional: Validate OHLC relationships
- Optional: Remove statistical outliers
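The steps above can be sketched in plain pandas. This is a simplified illustration of the cleaning order, not qldata's actual implementation (the real `clean()` adapts its core-column checks to the data type):

```python
import pandas as pd

def basic_clean(df: pd.DataFrame) -> pd.DataFrame:
    """Simplified sketch of the cleaning steps listed above."""
    df = df.sort_index()                                   # 1. sort by timestamp index
    df = df[~df.index.duplicated(keep="last")]             # 2. drop duplicate timestamps
    df = df.dropna(subset=["close"])                       # 3. drop rows with NaN in core columns
    price_cols = ["open", "high", "low", "close"]
    df = df[(df[price_cols] > 0).all(axis=1)]              # 4. remove invalid (zero/negative) prices
    df = df[df["high"] >= df["low"]]                       # 5. validate OHLC relationships
    return df
```

Each step only drops or reorders rows, so the pipeline is idempotent: running it twice yields the same result.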
remove_duplicates(df)¶
Remove rows with duplicate timestamps.
from qldata import remove_duplicates
# Keep last occurrence (default)
df = remove_duplicates(raw_df)
# Keep first occurrence
df = remove_duplicates(raw_df, keep="first")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data | DataFrame | DataFrame with timestamp index | required |
keep | str | Which duplicate to keep ('first', 'last', or 'mean') | 'last' |
Returns:
| Type | Description |
|---|---|
DataFrame | DataFrame with duplicates removed |
Example
>>> data = pd.DataFrame({
...     'timestamp': ['2024-01-01', '2024-01-01', '2024-01-02'],
...     'close': [100, 101, 102]
... }).set_index('timestamp')
>>> clean = remove_duplicates(data, keep='last')
>>> len(clean)
2
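The `'mean'` option has no single-call pandas equivalent; a sketch of how averaging duplicates could work, assuming numeric columns are averaged per timestamp (illustrative, not qldata's code):

```python
import pandas as pd

def dedupe_mean(df: pd.DataFrame) -> pd.DataFrame:
    """Average values across rows that share a timestamp
    (illustrative sketch of keep='mean')."""
    return df.groupby(level=0).mean()
```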
remove_invalid_prices(df)¶
Remove rows with zero or negative prices.
from qldata import remove_invalid_prices
df = remove_invalid_prices(raw_df)
# Specify price columns
df = remove_invalid_prices(raw_df, columns=["open", "high", "low", "close"])
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data | DataFrame | DataFrame with price data | required |
price_columns | list[str] | None | Columns to check (auto-detects OHLC if None) | None |
Returns:
| Type | Description |
|---|---|
DataFrame | DataFrame with invalid prices removed |
Example
>>> df = pd.DataFrame({
...     'open': [100, 0, -5, 102],
...     'high': [101, 101, 104, 103],
...     'close': [100.5, 100, 103, 102.5]
... })
>>> clean = remove_invalid_prices(df)
>>> len(clean)
2
remove_outliers(df)¶
Remove statistical outliers using z-score or IQR detection.
from qldata import remove_outliers
# Default: IQR-based detection
df = remove_outliers(raw_df)
# Z-score detection with a custom threshold
df = remove_outliers(raw_df, method="zscore", sigma=2.5)
# Specific columns only
df = remove_outliers(raw_df, columns=["close", "volume"])
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data | DataFrame | DataFrame with data | required |
columns | list[str] | str | None | Columns to check (default: auto-detect all numeric columns) | None |
method | str | Detection method ('zscore' or 'iqr') | 'iqr' |
**kwargs | Any | Additional arguments for detection method | {} |
Returns:
| Type | Description |
|---|---|
DataFrame | DataFrame with outliers removed |
Example
>>> data = pd.DataFrame({
...     'close': [100, 101, 102, 200, 103, 104],
...     'volume': [1000, 1100, 1050, 1200, 1150, 1080]
... })
>>> clean = remove_outliers(data, columns='close', method='iqr')
>>> len(clean)
5
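For reference, the IQR rule behind the example above can be sketched in plain pandas. The fence multiplier `k=1.5` is the conventional Tukey value; qldata's exact fences may differ:

```python
import pandas as pd

def iqr_filter(df: pd.DataFrame, column: str, k: float = 1.5) -> pd.DataFrame:
    """Drop rows where `column` falls outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, q3 = df[column].quantile([0.25, 0.75])
    iqr = q3 - q1
    mask = df[column].between(q1 - k * iqr, q3 + k * iqr)
    return df[mask]
```

Unlike the z-score method, IQR fences are based on quantiles, so a single extreme value cannot inflate the threshold that is supposed to catch it.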
Fill Functions¶
fill_forward(df)¶
Forward fill missing values (use last known value).
from qldata import fill_forward
df = fill_forward(raw_df)
# Or in fluent API
df = qd.data(...).clean().fill_forward().get()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data | DataFrame | DataFrame with data | required |
columns | list[str] | None | Columns to fill (default: all columns) | None |
Returns:
| Type | Description |
|---|---|
DataFrame | DataFrame with gaps filled |
fill_backward(df)¶
Backward fill missing values (use next known value).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data | DataFrame | DataFrame with data | required |
columns | list[str] | None | Columns to fill (default: all columns) | None |
Returns:
| Type | Description |
|---|---|
DataFrame | DataFrame with gaps filled |
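Usage mirrors fill_forward; for intuition, the pandas analogue of a backward fill (a sketch, not qldata's internals):

```python
import pandas as pd

# Backward fill uses the *next* known value to fill a gap:
s = pd.Series([None, 2.0, None, 4.0])
filled = s.bfill()  # [2.0, 2.0, 4.0, 4.0]
```

Note that backward fill looks into the future, so it can leak ahead-of-time information into a backtest; prefer fill_forward for trading data unless you have a specific reason not to.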
fill_interpolate(df)¶
Fill missing values using interpolation.
from qldata import fill_interpolate
# Linear interpolation (default)
df = fill_interpolate(raw_df)
# Time-weighted interpolation
df = fill_interpolate(raw_df, method="time")
# Polynomial interpolation
df = fill_interpolate(raw_df, method="polynomial", order=2)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data | DataFrame | DataFrame with data | required |
columns | list[str] | None | Columns to fill (default: numeric columns) | None |
method | str | Interpolation method (default: 'linear') | 'linear' |
Returns:
| Type | Description |
|---|---|
DataFrame | DataFrame with gaps filled |
Interpolation Methods:
| Method | Description |
|---|---|
"linear" | Linear interpolation between points |
"time" | Time-weighted linear interpolation |
"index" | Index-based interpolation |
"polynomial" | Polynomial interpolation (requires order) |
"spline" | Spline interpolation (requires order) |
Resample Functions¶
resample(df, timeframe)¶
Resample bars to a different timeframe.
from qldata import resample
# Resample 1m to 1h
hourly = resample(minute_df, "1h")
# Resample 1h to 4h
four_hour = resample(hourly_df, "4h")
aggregate_bars ¶
Aggregate multiple bars into a single bar.
Used for resampling bars to higher timeframes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
bars | DataFrame | DataFrame with OHLCV bar data | required |
Returns:
| Type | Description |
|---|---|
dict[str, Any] | Dictionary with aggregated OHLCV values |
aggregate_ohlcv ¶
Aggregate ticks into OHLCV values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ticks | DataFrame | DataFrame with tick data (must have 'price' and 'volume' columns) | required |
Returns:
| Type | Description |
|---|---|
dict[str, Any] | Dictionary with OHLCV values |
Example
>>> ticks = pd.DataFrame({'price': [100, 101, 99, 102], 'volume': [1000, 2000, 1500, 1000]})
>>> ohlcv = aggregate_ohlcv(ticks)
>>> ohlcv['open'], ohlcv['high'], ohlcv['low'], ohlcv['close']
(100, 102, 99, 102)
aggregate_vwap ¶
Calculate volume-weighted average price.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ticks | DataFrame | DataFrame with tick data (must have 'price' and 'volume' columns) | required |
Returns:
| Type | Description |
|---|---|
float | VWAP value |
Example
>>> ticks = pd.DataFrame({'price': [100, 101, 99], 'volume': [1000, 2000, 1500]})
>>> vwap = aggregate_vwap(ticks)
>>> round(vwap, 2)
100.11
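VWAP is the volume-weighted mean of trade prices; in plain pandas terms, the same math as the example above:

```python
import pandas as pd

ticks = pd.DataFrame({"price": [100, 101, 99], "volume": [1000, 2000, 1500]})

# Sum of (price * volume) divided by total volume
vwap = (ticks["price"] * ticks["volume"]).sum() / ticks["volume"].sum()
# (100*1000 + 101*2000 + 99*1500) / 4500 = 100.11...
```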
resample_bars ¶
Resample bars to different timeframe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
bars | DataFrame | DataFrame with OHLCV bar data (indexed by timestamp) | required |
from_timeframe | Timeframe | Current timeframe | required |
to_timeframe | Timeframe | Target timeframe | required |
Returns:
| Type | Description |
|---|---|
DataFrame | DataFrame with resampled OHLCV data |
Raises:
| Type | Description |
|---|---|
ValueError | If trying to resample to lower timeframe |
Example
>>> bars_1m = pd.DataFrame({
...     'timestamp': pd.date_range('2024-01-01', periods=60, freq='1min'),
...     'open': [100 + i*0.1 for i in range(60)],
...     'high': [100 + i*0.1 + 0.5 for i in range(60)],
...     'low': [100 + i*0.1 - 0.3 for i in range(60)],
...     'close': [100 + i*0.1 + 0.2 for i in range(60)],
...     'volume': [1000] * 60
... }).set_index('timestamp')
>>> bars_1h = resample_bars(bars_1m, Timeframe.MIN_1, Timeframe.HOUR_1)
>>> len(bars_1h)
1
ticks_to_bars ¶
ticks_to_bars(
ticks: DataFrame,
timeframe: Timeframe,
symbol: str | None = None,
) -> DataFrame
Convert tick data to bars.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ticks | DataFrame | DataFrame with tick data (indexed by timestamp, with 'price' and 'volume') | required |
timeframe | Timeframe | Target bar timeframe | required |
symbol | str | None | Optional symbol (if not in DataFrame) | None |
Returns:
| Type | Description |
|---|---|
DataFrame | DataFrame with OHLCV bar data |
Example
>>> ticks = pd.DataFrame({
...     'timestamp': pd.date_range('2024-01-01', periods=1000, freq='1s'),
...     'price': [100 + i*0.01 for i in range(1000)],
...     'volume': [1000] * 1000
... }).set_index('timestamp')
>>> bars = ticks_to_bars(ticks, Timeframe.MIN_1)
>>> len(bars)
17
Upsampling Not Supported
You can only resample to larger timeframes (1m → 1h). Resampling to smaller timeframes is not supported.
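The aggregation behind bar resampling follows the standard OHLCV convention; a plain pandas sketch of downsampling 1-minute bars to hourly (illustrative, not qldata's implementation):

```python
import pandas as pd

# Standard OHLCV aggregation rules for downsampling
OHLCV_AGG = {"open": "first", "high": "max", "low": "min",
             "close": "last", "volume": "sum"}

bars_1m = pd.DataFrame(
    {
        "open":   [100.0 + i for i in range(120)],
        "high":   [101.0 + i for i in range(120)],
        "low":    [99.0 + i for i in range(120)],
        "close":  [100.5 + i for i in range(120)],
        "volume": [1000] * 120,
    },
    index=pd.date_range("2024-01-01", periods=120, freq="1min"),
)

bars_1h = bars_1m.resample("1h").agg(OHLCV_AGG)  # 120 minute bars -> 2 hourly bars
```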
resample_bars(df, from_timeframe, to_timeframe)¶
OHLCV-aware resampling with proper aggregation.
from qldata import resample_bars
# Properly aggregates OHLCV:
# - open: first value
# - high: max value
# - low: min value
# - close: last value
# - volume: sum
hourly = resample_bars(minute_df, Timeframe.MIN_1, Timeframe.HOUR_1)
ticks_to_bars(df, timeframe)¶
Convert tick/trade data to OHLCV bars.
from qldata import ticks_to_bars
# Convert ticks to 1-minute bars
bars = ticks_to_bars(tick_df, Timeframe.MIN_1)
# Convert to hourly
bars = ticks_to_bars(tick_df, Timeframe.HOUR_1)
Transform Pipeline¶
For complex transform workflows, use TransformPipeline:
from qldata import TransformPipeline, remove_duplicates, remove_outliers, fill_forward
# Create a reusable pipeline
pipeline = TransformPipeline() \
.add(remove_duplicates) \
.add(remove_outliers, sigma=3) \
.add(fill_forward)
# Apply to any DataFrame
clean_df = pipeline.apply(raw_df)
# Apply to multiple DataFrames
for symbol, df in data.items():
data[symbol] = pipeline.apply(df)
TransformPipeline ¶
Pipeline for composing multiple data transforms.
Allows chaining transforms and applying them in sequence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | Pipeline name for logging | 'pipeline' |
add ¶
add(
transform: Callable[[DataFrame], DataFrame],
name: str | None = None,
) -> TransformPipeline
Add a transform to the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
transform | Callable[[DataFrame], DataFrame] | Function that takes DataFrame and returns DataFrame | required |
name | str | None | Optional name for logging | None |
Returns:
| Type | Description |
|---|---|
TransformPipeline | Self for method chaining |
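Conceptually, the pipeline just applies its transforms in order; a minimal sketch (not qldata's implementation — the real class also tracks step names for logging):

```python
import pandas as pd
from typing import Callable

class MiniPipeline:
    """Bare-bones transform pipeline: add callables, apply them in order."""

    def __init__(self) -> None:
        self._steps: list[Callable[[pd.DataFrame], pd.DataFrame]] = []

    def add(self, transform: Callable[[pd.DataFrame], pd.DataFrame]) -> "MiniPipeline":
        self._steps.append(transform)
        return self  # return self to enable method chaining

    def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        for step in self._steps:
            df = step(df)
        return df
```

Returning `self` from `add` is what makes the fluent `.add(...).add(...)` chaining style work.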
Examples¶
Example 1: Basic Cleaning Pipeline¶
import qldata as qd
# Fetch and clean data
df = qd.data("BTCUSDT", source="binance") \
.last(30) \
.resolution("1h") \
.clean() \
.get()
print(f"Cleaned data: {len(df)} bars")
print(f"Missing values: {df.isna().sum().sum()}")
Example 2: Aggressive Cleaning¶
import qldata as qd
# Production-grade cleaning
df = qd.data("BTCUSDT", source="binance") \
.last(30) \
.resolution("1h") \
.clean(
remove_invalid_prices=True, # Remove zero/negative
validate_ohlc=True, # Check high >= low, etc.
remove_outliers=True # Remove > 3σ outliers
) \
.fill_forward() \
.get()
# Verify data quality
assert (df[["open", "high", "low", "close"]] > 0).all().all()
assert (df["high"] >= df["low"]).all()
assert df.isna().sum().sum() == 0
Example 3: Minute to Hourly Resampling¶
import qldata as qd
# Fetch 1-minute data
minute_df = qd.data("BTCUSDT", source="binance") \
.last(1, "days") \
.resolution("1m") \
.clean() \
.get()
print(f"1-minute bars: {len(minute_df)}")
# Resample to hourly
hourly_df = qd.data("BTCUSDT", source="binance") \
.last(1, "days") \
.resolution("1m") \
.clean() \
.resample("1h") \
.get()
print(f"Hourly bars: {len(hourly_df)}")
Example 4: Custom Transform Pipeline¶
from qldata import TransformPipeline, remove_duplicates, fill_forward
import pandas as pd
def custom_smooth(df: pd.DataFrame, window: int = 5) -> pd.DataFrame:
"""Custom smoothing transform."""
df = df.copy()
df["close_smooth"] = df["close"].rolling(window).mean()
return df
# Build pipeline with custom transform
pipeline = TransformPipeline() \
.add(remove_duplicates) \
.add(fill_forward) \
.add(custom_smooth, window=10)
# Apply to data
import qldata as qd
raw_df = qd.data("BTCUSDT", source="binance").last(30).resolution("1h").get()
smooth_df = pipeline.apply(raw_df)
print(f"Added column: close_smooth")
print(smooth_df[["close", "close_smooth"]].head())
Example 5: Multi-Symbol Cleaning¶
import qldata as qd
from qldata import TransformPipeline, remove_duplicates, remove_outliers, fill_forward
# Create reusable pipeline
pipeline = TransformPipeline() \
.add(remove_duplicates) \
.add(remove_outliers, sigma=3) \
.add(fill_forward)
# Fetch multiple symbols
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT"]
data = qd.data(symbols, source="binance") \
.last(30) \
.resolution("1h") \
.get(parallel=True)
# Clean each symbol
for symbol in data:
data[symbol] = pipeline.apply(data[symbol])
print(f"{symbol}: {len(data[symbol])} bars, no NaN: {data[symbol].isna().sum().sum() == 0}")
Best Practices¶
1. Order Matters¶
Apply transforms in the right order:
# ✓ Good: Clean → Fill → Resample
.clean().fill_forward().resample("1h")
# ✗ Bad: Resample before cleaning may spread bad data
.resample("1h").clean().fill_forward()
2. Always Clean Production Data¶
Raw exchange feeds routinely contain duplicate timestamps, gaps, and bad prints. Always apply .clean() (or an equivalent pipeline) before using data in production.
3. Validate After Transforms¶
# Sanity check after cleaning
assert df.isna().sum().sum() == 0, "Unexpected NaN values"
assert (df["close"] > 0).all(), "Invalid prices found"
4. Use Pipelines for Consistency¶
# Define once, use everywhere
PRODUCTION_PIPELINE = TransformPipeline() \
.add(remove_duplicates) \
.add(remove_outliers) \
.add(fill_forward)
# Consistent cleaning across all data
df1 = PRODUCTION_PIPELINE.apply(raw_df1)
df2 = PRODUCTION_PIPELINE.apply(raw_df2)
See Also¶
- Historical Data - Using transforms with queries
- Data Validation - Validation rules
- Cookbook - Real-world examples