
Cookbook

Real-world examples and recipes for common use cases.

Quick Recipes

Fetch and Save Historical Data

import qldata as qd

# Build the request: last 30 days of hourly spot candles, cleaned.
request = (
    qd.data("BTCUSDT", source="binance", category="spot")
    .last(30)
    .resolution("1h")
    .clean()
)
df = request.get()

# CSV: universally readable
df.to_csv("btc_hourly_30d.csv")

# Parquet: smaller on disk and faster to reload
df.to_parquet("btc_hourly_30d.parquet")

Multi-Symbol Download

import qldata as qd
from pathlib import Path

symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT", "XRPUSDT"]

# Fetch 90 days of hourly bars for every symbol concurrently.
builder = (
    qd.data(symbols, source="binance")
    .last(90)
    .resolution("1h")
    .clean()
)
data = builder.get(parallel=True, workers=4)

# Write one Parquet file per symbol under ./data
output_dir = Path("./data")
output_dir.mkdir(exist_ok=True)

for symbol, df in data.items():
    destination = output_dir / f"{symbol}_1h.parquet"
    df.to_parquet(destination)
    print(f"Saved {symbol}: {len(df)} bars")

Real-Time Price Tracker

import qldata as qd
import time
from collections import defaultdict

# Latest known price per symbol (0.0 until first update arrives).
prices = defaultdict(float)

def update_prices(df):
    """Store and print the most recent price for every row in the batch."""
    for _, record in df.iterrows():
        symbol = record["symbol"]
        prices[symbol] = record["price"]
        print(f"{symbol}: ${prices[symbol]:,.2f}")

# Tick-level stream; `start=True` begins receiving immediately.
stream = (
    qd.stream(["BTCUSDT", "ETHUSDT"], source="binance")
    .resolution("tick")
    .on_data(update_prices)
    .get(start=True)
)

# Keep the main thread alive until Ctrl-C, then shut down cleanly.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    stream.stop()

Prepare Backtesting Data

import qldata as qd
import numpy as np
import pandas as pd

def prepare_backtest_data(
    symbol: str,
    days: int = 365,
    resolution: str = "1h"
) -> pd.DataFrame:
    """Prepare clean data for backtesting.

    Fetches `days` of history at `resolution`, removes invalid prices and
    outliers, forward-fills gaps, then adds return and volatility columns.

    Args:
        symbol: Exchange symbol, e.g. "BTCUSDT".
        days: Lookback window in days.
        resolution: Bar resolution string, e.g. "1h".

    Returns:
        DataFrame with `returns`, `log_returns`, and `volatility` columns
        added and indicator warm-up NaN rows dropped.
    """
    # Fetch and clean
    df = qd.data(symbol, source="binance") \
        .last(days) \
        .resolution(resolution) \
        .clean(
            remove_invalid_prices=True,
            remove_outliers=True
        ) \
        .fill_forward() \
        .get()

    # Add common indicators.
    # NOTE: `numpy` is now imported at the top of the script — the original
    # example imported it only after this function was defined, which works
    # purely by execution order and breaks if the function is reused elsewhere.
    df["returns"] = df["close"].pct_change()
    df["log_returns"] = np.log(df["close"] / df["close"].shift(1))
    # 24-bar rolling std of simple returns (one day of bars at 1h resolution)
    df["volatility"] = df["returns"].rolling(24).std()

    # Drop NaN rows produced by pct_change/rolling warm-up
    df = df.dropna()

    return df

# Prepare data
btc = prepare_backtest_data("BTCUSDT", days=365, resolution="1h")
print(f"Ready for backtesting: {len(btc)} bars")

Funding Rate Monitor

import qldata as qd
import time

symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT"]

def check_funding():
    """Print the current funding rate and annualized APR for each symbol."""
    print("\n" + "="*50)
    print("Funding Rate Check")
    print("="*50)

    for symbol in symbols:
        try:
            rate = qd.current_funding_rate(
                symbol,
                source="binance",
                category="usdm"
            )
            # Funding settles every 8h -> 3 payments/day; express as % APR.
            annual = rate * 3 * 365 * 100

            # Flag rates above 0.1% per interval as unusually high.
            marker = "⚠️ " if abs(rate) > 0.001 else "   "
            print(f"{marker}{symbol}: {rate*100:.4f}% ({annual:.1f}% APR)")

        except Exception as e:
            print(f"   {symbol}: Error - {e}")

# Check every hour
while True:
    check_funding()
    time.sleep(3600)

Symbol Screener

import qldata as qd

# Get all actively-traded USDT-quoted spot pairs
all_symbols = qd.list_symbols(
    source="binance",
    category="spot",
    quote_asset="USDT",
    active_only=True
)

print(f"Screening {len(all_symbols)} symbols...")

# Screen for top movers (simplified)
results = []

for symbol in all_symbols[:50]:  # Limit for demo
    try:
        df = qd.data(symbol, source="binance") \
            .last(7) \
            .resolution("1d") \
            .clean() \
            .get()

        # Need at least two bars to compute a 7-day change
        if len(df) >= 2:
            change = (df["close"].iloc[-1] / df["close"].iloc[0] - 1) * 100
            volume = df["volume"].mean()
            results.append({
                "symbol": symbol,
                "change_7d": change,
                "avg_volume": volume
            })
    # Skip symbols that fail to fetch. A bare `except:` here would also
    # swallow KeyboardInterrupt/SystemExit and make the loop unstoppable.
    except Exception:
        continue

# Sort by change, best performers first
results.sort(key=lambda x: x["change_7d"], reverse=True)

print("\nTop 10 Gainers (7d):")
for r in results[:10]:
    print(f"  {r['symbol']}: +{r['change_7d']:.2f}%")

print("\nTop 10 Losers (7d):")
for r in results[-10:]:
    print(f"  {r['symbol']}: {r['change_7d']:.2f}%")

Production Data Pipeline

import qldata as qd
from qldata.monitoring import DataQualityMonitor, AlertManager
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")

# Setup: flag the feed as stale after 10s without a message.
monitor = DataQualityMonitor(stale_threshold_seconds=10)
alerts = AlertManager()

def send_alert(msg):
    # Central alert sink — currently just logs; wire real paging below.
    logger.warning(f"ALERT: {msg}")
    # Add PagerDuty/Slack integration here

# Register alert triggers with the manager.
alerts.on_stale_data(lambda: send_alert("Data is stale!"))
alerts.on_high_latency(lambda ms: send_alert(f"Latency: {ms}ms"))

def process_trade(df):
    """Process incoming trades."""
    # Empty batches carry no information — skip monitoring/processing.
    if df.empty:
        return

    # Record for monitoring (last index entry feeds latency/staleness stats
    # — presumably a timestamp; confirm against DataQualityMonitor's API).
    monitor.record_message(df.index[-1])

    # Check health on every batch and fire alerts when thresholds trip.
    metrics = monitor.get_metrics()
    if metrics["latency_p95"]:
        alerts.check_latency(metrics["latency_p95"])
    alerts.check_stale_data(monitor.is_stale())

    # Process data (your logic here)
    for _, row in df.iterrows():
        logger.debug(f"Trade: {row['symbol']} @ ${row['price']}")

# Start tick stream with data/error/reconnect callbacks attached.
stream = qd.stream(["BTCUSDT", "ETHUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(process_trade) \
    .on_error(lambda e: logger.error(f"Error: {e}")) \
    .on_reconnect(lambda n: logger.info(f"Reconnect #{n}")) \
    .get(start=True)

# Health loop: log status every 30s until Ctrl-C, then stop the stream.
try:
    while True:
        metrics = monitor.get_metrics()
        logger.info(
            f"Health: {monitor.get_health_status()} | "
            f"TPS: {metrics['throughput']:.1f} | "
            f"P95: {metrics['latency_p95']:.1f}ms"
        )
        time.sleep(30)
except KeyboardInterrupt:
    logger.info("Shutting down...")
    stream.stop()

Common Patterns

Error Handling

from qldata.errors import QldataError, ConnectionError, RateLimitError

# NOTE(review): this excerpt assumes `qd`, `time`, and `load_cached_data`
# are defined/imported elsewhere in the surrounding script.
try:
    df = qd.data("BTCUSDT", source="binance").last(30).get()
except RateLimitError:
    # Back off once, then retry the identical request.
    print("Rate limited - waiting...")
    time.sleep(60)
    df = qd.data("BTCUSDT", source="binance").last(30).get()
except ConnectionError as e:
    # Fall back to a local copy when the network is unavailable.
    print(f"Network error: {e}")
    df = load_cached_data()
except QldataError as e:
    # Library base error: catch-all for any other qldata failure.
    print(f"qldata error: {e}")

Caching Layer

import qldata as qd
from pathlib import Path
import pandas as pd
import time  # BUG FIX: required by the cache-age check; was never imported

CACHE_DIR = Path("./cache")
CACHE_DIR.mkdir(exist_ok=True)

def get_data_cached(symbol, days, resolution):
    """Fetch data with local cache.

    Returns the cached Parquet copy when it is under one hour old;
    otherwise fetches fresh data and refreshes the cache file.

    Args:
        symbol: Exchange symbol, e.g. "BTCUSDT".
        days: Lookback window in days.
        resolution: Bar resolution string, e.g. "1h".

    Returns:
        The cached or freshly fetched DataFrame.
    """
    cache_file = CACHE_DIR / f"{symbol}_{resolution}_{days}d.parquet"

    # Check cache age via the file's modification time
    if cache_file.exists():
        mtime = cache_file.stat().st_mtime
        age_hours = (time.time() - mtime) / 3600

        if age_hours < 1:  # Cache valid for 1 hour
            return pd.read_parquet(cache_file)

    # Fetch fresh data
    df = qd.data(symbol, source="binance") \
        .last(days) \
        .resolution(resolution) \
        .clean() \
        .get()

    # Save to cache
    df.to_parquet(cache_file)

    return df

Retry Logic

from tenacity import retry, stop_after_attempt, wait_exponential
import qldata as qd

# Retry transient fetch failures: at most 3 attempts, with exponential
# backoff bounded between 1 and 10 seconds between attempts.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def fetch_with_retry(symbol, days, resolution):
    """Fetch `days` of `resolution` bars for `symbol`, retrying on any error."""
    return qd.data(symbol, source="binance") \
        .last(days) \
        .resolution(resolution) \
        .get()

# Will retry up to 3 times with exponential backoff
df = fetch_with_retry("BTCUSDT", 30, "1h")

Next Steps