Skip to content

Data Transforms API

Functions for cleaning, filling, and resampling market data.

Overview

qldata provides two ways to use transforms:

  1. Fluent API - Chain transforms in queries
  2. Standalone functions - Apply to any DataFrame
import qldata as qd

# Fluent API (recommended)
df = qd.data("BTCUSDT", source="binance") \
    .last(30) \
    .resolution("1h") \
    .clean() \
    .fill_forward() \
    .resample("4h") \
    .get()

# Standalone functions
from qldata import remove_duplicates, fill_forward, resample
df = remove_duplicates(raw_df)
df = fill_forward(df)
df = resample(df, "4h")

Cleaning Functions

clean()

Apply adaptive data cleaning based on data type.

# Basic cleaning
.clean()

# With options
.clean(
    remove_invalid_prices=True,
    validate_ohlc=True,
    remove_outliers=True
)

Cleaning Steps:

  1. Sort by timestamp index
  2. Remove duplicate timestamps
  3. Drop rows with NaN in core columns (adaptive)
  4. Optional: Remove invalid prices
  5. Optional: Validate OHLC relationships
  6. Optional: Remove statistical outliers

remove_duplicates(df)

Remove rows with duplicate timestamps.

from qldata import remove_duplicates

# Keep last occurrence (default)
df = remove_duplicates(raw_df)

# Keep first occurrence
df = remove_duplicates(raw_df, keep="first")

Remove duplicate timestamps.

Parameters:

Name Type Description Default
data DataFrame

DataFrame with timestamp index

required
keep str

Which duplicate to keep ('first', 'last', or 'mean')

'last'

Returns:

Type Description
DataFrame

DataFrame with duplicates removed

Example

>>> data = pd.DataFrame({
...     'timestamp': ['2024-01-01', '2024-01-01', '2024-01-02'],
...     'close': [100, 101, 102]
... }).set_index('timestamp')
>>> clean = remove_duplicates(data, keep='last')
>>> len(clean)
2


remove_invalid_prices(df)

Remove rows with zero or negative prices.

from qldata import remove_invalid_prices

df = remove_invalid_prices(raw_df)

# Specify price columns
df = remove_invalid_prices(raw_df, columns=["open", "high", "low", "close"])

Remove rows with zero or negative prices.

Parameters:

Name Type Description Default
data DataFrame

DataFrame with price data

required
price_columns list[str] | None

Columns to check (auto-detects OHLC if None)

None

Returns:

Type Description
DataFrame

DataFrame with invalid prices removed

Example

>>> df = pd.DataFrame({
...     'open': [100, 0, -5, 102],
...     'high': [101, 101, 104, 103],
...     'close': [100.5, 100, 103, 102.5]
... })
>>> clean = remove_invalid_prices(df)
>>> len(clean)
2


remove_outliers(df)

Remove statistical outliers using z-score.

from qldata import remove_outliers

# Default: IQR-based outlier detection
df = remove_outliers(raw_df)

# Z-score method with custom threshold (standard deviations)
df = remove_outliers(raw_df, method="zscore", sigma=2.5)

# Specific columns only
df = remove_outliers(raw_df, columns=["close", "volume"])

Remove outliers from data.

Parameters:

Name Type Description Default
data DataFrame

DataFrame with data

required
columns list[str] | str | None

Columns to check (default: auto-detect all numeric columns)

None
method str

Detection method ('zscore' or 'iqr')

'iqr'
**kwargs Any

Additional arguments for detection method

{}

Returns:

Type Description
DataFrame

DataFrame with outliers removed

Example

>>> data = pd.DataFrame({
...     'close': [100, 101, 102, 200, 103, 104],
...     'volume': [1000, 1100, 1050, 1200, 1150, 1080]
... })
>>> clean = remove_outliers(data, columns='close', method='iqr')
>>> len(clean)
5


Fill Functions

fill_forward(df)

Forward fill missing values (use last known value).

from qldata import fill_forward

df = fill_forward(raw_df)

# Or in fluent API
df = qd.data(...).clean().fill_forward().get()

Forward fill missing values.

Parameters:

Name Type Description Default
data DataFrame

DataFrame with data

required
columns list[str] | None

Columns to fill (default: all columns)

None

Returns:

Type Description
DataFrame

DataFrame with gaps filled


fill_backward(df)

Backward fill missing values (use next known value).

from qldata import fill_backward

df = fill_backward(raw_df)

Backward fill missing values.

Parameters:

Name Type Description Default
data DataFrame

DataFrame with data

required
columns list[str] | None

Columns to fill (default: all columns)

None

Returns:

Type Description
DataFrame

DataFrame with gaps filled


fill_interpolate(df)

Fill missing values using interpolation.

from qldata import fill_interpolate

# Linear interpolation (default)
df = fill_interpolate(raw_df)

# Time-weighted interpolation
df = fill_interpolate(raw_df, method="time")

# Polynomial interpolation
df = fill_interpolate(raw_df, method="polynomial", order=2)

Interpolate missing values.

Parameters:

Name Type Description Default
data DataFrame

DataFrame with data

required
columns list[str] | None

Columns to fill (default: numeric columns)

None
method str

Interpolation method (default: 'linear')

'linear'

Returns:

Type Description
DataFrame

DataFrame with gaps filled

Interpolation Methods:

Method Description
"linear" Linear interpolation between points
"time" Time-weighted linear interpolation
"index" Index-based interpolation
"polynomial" Polynomial interpolation (requires order)
"spline" Spline interpolation (requires order)

Resample Functions

resample(df, timeframe)

Resample bars to a different timeframe.

from qldata import resample

# Resample 1m to 1h
hourly = resample(minute_df, "1h")

# Resample 1h to 4h
four_hour = resample(hourly_df, "4h")

Resampling transforms.

aggregate_bars

aggregate_bars(bars: DataFrame) -> dict[str, Any]

Aggregate multiple bars into a single bar.

Used for resampling bars to higher timeframes.

Parameters:

Name Type Description Default
bars DataFrame

DataFrame with OHLCV bar data

required

Returns:

Type Description
dict[str, Any]

Dictionary with aggregated OHLCV values

aggregate_ohlcv

aggregate_ohlcv(ticks: DataFrame) -> dict[str, Any]

Aggregate ticks into OHLCV values.

Parameters:

Name Type Description Default
ticks DataFrame

DataFrame with tick data (must have 'price' and 'volume' columns)

required

Returns:

Type Description
dict[str, Any]

Dictionary with OHLCV values

Example

>>> ticks = pd.DataFrame({'price': [100, 101, 99, 102], 'volume': [1000, 2000, 1500, 1000]})
>>> ohlcv = aggregate_ohlcv(ticks)
>>> ohlcv['open'], ohlcv['high'], ohlcv['low'], ohlcv['close']
(100, 102, 99, 102)

aggregate_vwap

aggregate_vwap(ticks: DataFrame) -> float

Calculate volume-weighted average price.

Parameters:

Name Type Description Default
ticks DataFrame

DataFrame with tick data (must have 'price' and 'volume' columns)

required

Returns:

Type Description
float

VWAP value

Example

>>> ticks = pd.DataFrame({'price': [100, 101, 99], 'volume': [1000, 2000, 1500]})
>>> vwap = aggregate_vwap(ticks)
>>> round(vwap, 2)
100.11

resample_bars

resample_bars(
    bars: DataFrame,
    from_timeframe: Timeframe,
    to_timeframe: Timeframe,
) -> DataFrame

Resample bars to different timeframe.

Parameters:

Name Type Description Default
bars DataFrame

DataFrame with OHLCV bar data (indexed by timestamp)

required
from_timeframe Timeframe

Current timeframe

required
to_timeframe Timeframe

Target timeframe

required

Returns:

Type Description
DataFrame

DataFrame with resampled OHLCV data

Raises:

Type Description
ValueError

If trying to resample to lower timeframe

Example

>>> bars_1m = pd.DataFrame({
...     'timestamp': pd.date_range('2024-01-01', periods=60, freq='1min'),
...     'open': [100 + i*0.1 for i in range(60)],
...     'high': [100 + i*0.1 + 0.5 for i in range(60)],
...     'low': [100 + i*0.1 - 0.3 for i in range(60)],
...     'close': [100 + i*0.1 + 0.2 for i in range(60)],
...     'volume': [1000] * 60
... }).set_index('timestamp')
>>> bars_1h = resample_bars(bars_1m, Timeframe.MIN_1, Timeframe.HOUR_1)
>>> len(bars_1h)
1

ticks_to_bars

ticks_to_bars(
    ticks: DataFrame,
    timeframe: Timeframe,
    symbol: str | None = None,
) -> DataFrame

Convert tick data to bars.

Parameters:

Name Type Description Default
ticks DataFrame

DataFrame with tick data (indexed by timestamp, with 'price' and 'volume')

required
timeframe Timeframe

Target bar timeframe

required
symbol str | None

Optional symbol (if not in DataFrame)

None

Returns:

Type Description
DataFrame

DataFrame with OHLCV bar data

Example

>>> ticks = pd.DataFrame({
...     'timestamp': pd.date_range('2024-01-01', periods=1000, freq='1s'),
...     'price': [100 + i*0.01 for i in range(1000)],
...     'volume': [1000] * 1000
... }).set_index('timestamp')
>>> bars = ticks_to_bars(ticks, Timeframe.MIN_1)
>>> len(bars)
17

Upsampling Not Supported

You can only resample to larger timeframes (1m → 1h). Resampling to smaller timeframes is not supported.


resample_bars(df, from_timeframe, to_timeframe)

OHLCV-aware resampling with proper aggregation.

from qldata import resample_bars, Timeframe

# Properly aggregates OHLCV:
# - open: first value
# - high: max value
# - low: min value
# - close: last value
# - volume: sum
hourly = resample_bars(minute_df, Timeframe.MIN_1, Timeframe.HOUR_1)

Resample bars to different timeframe.

Parameters:

Name Type Description Default
bars DataFrame

DataFrame with OHLCV bar data (indexed by timestamp)

required
from_timeframe Timeframe

Current timeframe

required
to_timeframe Timeframe

Target timeframe

required

Returns:

Type Description
DataFrame

DataFrame with resampled OHLCV data

Raises:

Type Description
ValueError

If trying to resample to lower timeframe

Example

>>> bars_1m = pd.DataFrame({
...     'timestamp': pd.date_range('2024-01-01', periods=60, freq='1min'),
...     'open': [100 + i*0.1 for i in range(60)],
...     'high': [100 + i*0.1 + 0.5 for i in range(60)],
...     'low': [100 + i*0.1 - 0.3 for i in range(60)],
...     'close': [100 + i*0.1 + 0.2 for i in range(60)],
...     'volume': [1000] * 60
... }).set_index('timestamp')
>>> bars_1h = resample_bars(bars_1m, Timeframe.MIN_1, Timeframe.HOUR_1)
>>> len(bars_1h)
1


ticks_to_bars(df, timeframe)

Convert tick/trade data to OHLCV bars.

from qldata import ticks_to_bars

# Convert ticks to 1-minute bars
bars = ticks_to_bars(tick_df, "1m")

# Convert to hourly
bars = ticks_to_bars(tick_df, "1h")

Convert tick data to bars.

Parameters:

Name Type Description Default
ticks DataFrame

DataFrame with tick data (indexed by timestamp, with 'price' and 'volume')

required
timeframe Timeframe

Target bar timeframe

required
symbol str | None

Optional symbol (if not in DataFrame)

None

Returns:

Type Description
DataFrame

DataFrame with OHLCV bar data

Example

>>> ticks = pd.DataFrame({
...     'timestamp': pd.date_range('2024-01-01', periods=1000, freq='1s'),
...     'price': [100 + i*0.01 for i in range(1000)],
...     'volume': [1000] * 1000
... }).set_index('timestamp')
>>> bars = ticks_to_bars(ticks, Timeframe.MIN_1)
>>> len(bars)
17


Transform Pipeline

For complex transform workflows, use TransformPipeline:

from qldata import TransformPipeline, remove_duplicates, remove_outliers, fill_forward

# Create a reusable pipeline
pipeline = TransformPipeline() \
    .add(remove_duplicates) \
    .add(remove_outliers, sigma=3) \
    .add(fill_forward)

# Apply to any DataFrame
clean_df = pipeline.apply(raw_df)

# Apply to multiple DataFrames
for symbol, df in data.items():
    data[symbol] = pipeline.apply(df)

TransformPipeline

TransformPipeline(name: str = 'pipeline')

Pipeline for composing multiple data transforms.

Allows chaining transforms and applying them in sequence.

Parameters:

Name Type Description Default
name str

Pipeline name for logging

'pipeline'

add

add(
    transform: Callable[[DataFrame], DataFrame],
    name: str | None = None,
) -> TransformPipeline

Add a transform to the pipeline.

Parameters:

Name Type Description Default
transform Callable[[DataFrame], DataFrame]

Function that takes DataFrame and returns DataFrame

required
name str | None

Optional name for logging

None

Returns:

Type Description
TransformPipeline

Self for method chaining


Examples

Example 1: Basic Cleaning Pipeline

import qldata as qd

# Fetch and clean data
df = qd.data("BTCUSDT", source="binance") \
    .last(30) \
    .resolution("1h") \
    .clean() \
    .get()

print(f"Cleaned data: {len(df)} bars")
print(f"Missing values: {df.isna().sum().sum()}")

Example 2: Aggressive Cleaning

import qldata as qd

# Production-grade cleaning
df = qd.data("BTCUSDT", source="binance") \
    .last(30) \
    .resolution("1h") \
    .clean(
        remove_invalid_prices=True,  # Remove zero/negative
        validate_ohlc=True,          # Check high >= low, etc.
        remove_outliers=True         # Remove > 3σ outliers
    ) \
    .fill_forward() \
    .get()

# Verify data quality
assert (df[["open", "high", "low", "close"]] > 0).all().all()
assert (df["high"] >= df["low"]).all()
assert df.isna().sum().sum() == 0

Example 3: Minute to Hourly Resampling

import qldata as qd

# Fetch 1-minute data
minute_df = qd.data("BTCUSDT", source="binance") \
    .last(1, "days") \
    .resolution("1m") \
    .clean() \
    .get()

print(f"1-minute bars: {len(minute_df)}")

# Resample to hourly
hourly_df = qd.data("BTCUSDT", source="binance") \
    .last(1, "days") \
    .resolution("1m") \
    .clean() \
    .resample("1h") \
    .get()

print(f"Hourly bars: {len(hourly_df)}")

Example 4: Custom Transform Pipeline

from qldata import TransformPipeline, remove_duplicates, fill_forward
import pandas as pd

def custom_smooth(df: pd.DataFrame, window: int = 5) -> pd.DataFrame:
    """Custom smoothing transform."""
    df = df.copy()
    df["close_smooth"] = df["close"].rolling(window).mean()
    return df

# Build pipeline with custom transform
pipeline = TransformPipeline() \
    .add(remove_duplicates) \
    .add(fill_forward) \
    .add(custom_smooth, window=10)

# Apply to data
import qldata as qd
raw_df = qd.data("BTCUSDT", source="binance").last(30).resolution("1h").get()
smooth_df = pipeline.apply(raw_df)

print(f"Added column: close_smooth")
print(smooth_df[["close", "close_smooth"]].head())

Example 5: Multi-Symbol Cleaning

import qldata as qd
from qldata import TransformPipeline, remove_duplicates, remove_outliers, fill_forward

# Create reusable pipeline
pipeline = TransformPipeline() \
    .add(remove_duplicates) \
    .add(remove_outliers, sigma=3) \
    .add(fill_forward)

# Fetch multiple symbols
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT"]
data = qd.data(symbols, source="binance") \
    .last(30) \
    .resolution("1h") \
    .get(parallel=True)

# Clean each symbol
for symbol in data:
    data[symbol] = pipeline.apply(data[symbol])
    print(f"{symbol}: {len(data[symbol])} bars, no NaN: {data[symbol].isna().sum().sum() == 0}")

Best Practices

1. Order Matters

Apply transforms in the right order:

# ✓ Good: Clean → Fill → Resample
.clean().fill_forward().resample("1h")

# ✗ Bad: Resample before cleaning may spread bad data
.resample("1h").clean().fill_forward()

2. Always Clean Production Data

# ✓ Recommended
df = qd.data(...).clean().get()

# ✗ Risky in production
df = qd.data(...).get()

3. Validate After Transforms

# Sanity check after cleaning
assert df.isna().sum().sum() == 0, "Unexpected NaN values"
assert (df["close"] > 0).all(), "Invalid prices found"

4. Use Pipelines for Consistency

# Define once, use everywhere
PRODUCTION_PIPELINE = TransformPipeline() \
    .add(remove_duplicates) \
    .add(remove_outliers) \
    .add(fill_forward)

# Consistent cleaning across all data
df1 = PRODUCTION_PIPELINE.apply(raw_df1)
df2 = PRODUCTION_PIPELINE.apply(raw_df2)

See Also