Cookbook¶
Real-world examples and recipes for common use cases.
Quick Recipes¶
Fetch and Save Historical Data¶
import qldata as qd
# Fetch last 30 days of hourly data
df = qd.data("BTCUSDT", source="binance", category="spot") \
.last(30) \
.resolution("1h") \
.clean() \
.get()
# Save to CSV
df.to_csv("btc_hourly_30d.csv")
# Save to Parquet (smaller, faster)
df.to_parquet("btc_hourly_30d.parquet")
Multi-Symbol Download¶
import qldata as qd
from pathlib import Path
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT", "XRPUSDT"]
# Parallel download
data = qd.data(symbols, source="binance") \
    .last(90) \
    .resolution("1h") \
    .clean() \
    .get(parallel=True, workers=4)
# Save each symbol
output_dir = Path("./data")
output_dir.mkdir(exist_ok=True)
for symbol, df in data.items():
    df.to_parquet(output_dir / f"{symbol}_1h.parquet")
    print(f"Saved {symbol}: {len(df)} bars")
Real-Time Price Tracker¶
import qldata as qd
import time
from collections import defaultdict
prices = defaultdict(float)
def update_prices(df):
    for _, row in df.iterrows():
        symbol = row["symbol"]
        prices[symbol] = row["price"]
        print(f"{symbol}: ${prices[symbol]:,.2f}")
stream = qd.stream(["BTCUSDT", "ETHUSDT"], source="binance") \
.resolution("tick") \
.on_data(update_prices) \
.get(start=True)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stream.stop()
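Printing every tick can flood the terminal on busy pairs. One option is to throttle inside the callback and only print on a meaningful move. A sketch with the same callback shape as above; the 0.1% threshold is arbitrary:
last_printed = {}
def update_prices_throttled(df):
    for _, row in df.iterrows():
        symbol, price = row["symbol"], row["price"]
        prices[symbol] = price
        # Only print when the price moved more than 0.1% since the last print
        prev = last_printed.get(symbol)
        if prev is None or abs(price / prev - 1) > 0.001:
            last_printed[symbol] = price
            print(f"{symbol}: ${price:,.2f}")
Pass it to .on_data(update_prices_throttled) in place of update_prices.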
Prepare Backtesting Data¶
import qldata as qd
import numpy as np
import pandas as pd
def prepare_backtest_data(
    symbol: str,
    days: int = 365,
    resolution: str = "1h"
) -> pd.DataFrame:
    """Prepare clean data for backtesting."""
    # Fetch and clean
    df = qd.data(symbol, source="binance") \
        .last(days) \
        .resolution(resolution) \
        .clean(
            remove_invalid_prices=True,
            remove_outliers=True
        ) \
        .fill_forward() \
        .get()
    # Add common indicators
    df["returns"] = df["close"].pct_change()
    df["log_returns"] = np.log(df["close"] / df["close"].shift(1))
    df["volatility"] = df["returns"].rolling(24).std()
    # Drop NaN rows produced by the indicator calculations
    df = df.dropna()
    return df
# Prepare data
btc = prepare_backtest_data("BTCUSDT", days=365, resolution="1h")
print(f"Ready for backtesting: {len(btc)} bars")
Funding Rate Monitor¶
import qldata as qd
import time
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT"]
def check_funding():
    print("\n" + "=" * 50)
    print("Funding Rate Check")
    print("=" * 50)
    for symbol in symbols:
        try:
            rate = qd.current_funding_rate(
                symbol,
                source="binance",
                category="usdm"
            )
            # Funding settles every 8h (3x per day); x100 converts to percent
            annual = rate * 3 * 365 * 100
            if abs(rate) > 0.001:  # Unusually high funding
                print(f"⚠️ {symbol}: {rate*100:.4f}% ({annual:.1f}% APR)")
            else:
                print(f"   {symbol}: {rate*100:.4f}% ({annual:.1f}% APR)")
        except Exception as e:
            print(f"   {symbol}: Error - {e}")
# Check every hour
while True:
    check_funding()
    time.sleep(3600)
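To keep a history for later analysis instead of only printing, append each observation to a CSV from inside check_funding. A standard-library sketch; log_funding and the file name are illustrative, not part of qldata:
import csv
from datetime import datetime, timezone
def log_funding(symbol: str, rate: float, path: str = "funding_log.csv"):
    """Append one timestamped funding observation."""
    with open(path, "a", newline="") as f:
        csv.writer(f).writerow([
            datetime.now(timezone.utc).isoformat(), symbol, rate
        ])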
Symbol Screener¶
import qldata as qd
# Get all USDT pairs
all_symbols = qd.list_symbols(
    source="binance",
    category="spot",
    quote_asset="USDT",
    active_only=True
)
print(f"Screening {len(all_symbols)} symbols...")
# Screen for top movers (simplified)
results = []
for symbol in all_symbols[:50]:  # Limit for demo
    try:
        df = qd.data(symbol, source="binance") \
            .last(7) \
            .resolution("1d") \
            .clean() \
            .get()
        if len(df) >= 2:
            change = (df["close"].iloc[-1] / df["close"].iloc[0] - 1) * 100
            volume = df["volume"].mean()
            results.append({
                "symbol": symbol,
                "change_7d": change,
                "avg_volume": volume
            })
    except Exception:
        pass  # Skip symbols that fail to download
# Sort by change
results.sort(key=lambda x: x["change_7d"], reverse=True)
print("\nTop 10 Gainers (7d):")
for r in results[:10]:
    print(f"  {r['symbol']}: +{r['change_7d']:.2f}%")
print("\nTop 10 Losers (7d):")
for r in results[-10:]:
    print(f"  {r['symbol']}: {r['change_7d']:.2f}%")
Production Data Pipeline¶
import qldata as qd
from qldata.monitoring import DataQualityMonitor, AlertManager
import logging
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")
# Setup
monitor = DataQualityMonitor(stale_threshold_seconds=10)
alerts = AlertManager()
def send_alert(msg):
    logger.warning(f"ALERT: {msg}")
    # Add PagerDuty/Slack integration here
alerts.on_stale_data(lambda: send_alert("Data is stale!"))
alerts.on_high_latency(lambda ms: send_alert(f"Latency: {ms}ms"))
def process_trade(df):
    """Process incoming trades."""
    if df.empty:
        return
    # Record for monitoring
    monitor.record_message(df.index[-1])
    # Check health
    metrics = monitor.get_metrics()
    if metrics["latency_p95"]:
        alerts.check_latency(metrics["latency_p95"])
    alerts.check_stale_data(monitor.is_stale())
    # Process data (your logic here)
    for _, row in df.iterrows():
        logger.debug(f"Trade: {row['symbol']} @ ${row['price']}")
# Start stream
stream = qd.stream(["BTCUSDT", "ETHUSDT"], source="binance") \
.resolution("tick") \
.on_data(process_trade) \
.on_error(lambda e: logger.error(f"Error: {e}")) \
.on_reconnect(lambda n: logger.info(f"Reconnect #{n}")) \
.get(start=True)
# Health loop
try:
    while True:
        metrics = monitor.get_metrics()
        logger.info(
            f"Health: {monitor.get_health_status()} | "
            f"TPS: {metrics['throughput']:.1f} | "
            f"P95: {metrics['latency_p95'] or 0:.1f}ms"  # latency may be None before the first message
        )
        time.sleep(30)
except KeyboardInterrupt:
    logger.info("Shutting down...")
    stream.stop()
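In production the process is usually stopped with SIGTERM rather than Ctrl+C. On POSIX systems a small handler can route SIGTERM through the same shutdown path. A standard-library sketch; register it before entering the health loop:
import signal
def handle_sigterm(signum, frame):
    # Reuse the KeyboardInterrupt shutdown path above
    raise KeyboardInterrupt
signal.signal(signal.SIGTERM, handle_sigterm)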
Common Patterns¶
Error Handling¶
import time
import qldata as qd
from qldata.errors import QldataError, ConnectionError, RateLimitError
try:
    df = qd.data("BTCUSDT", source="binance").last(30).get()
except RateLimitError:
    print("Rate limited - waiting...")
    time.sleep(60)
    df = qd.data("BTCUSDT", source="binance").last(30).get()
except ConnectionError as e:
    print(f"Network error: {e}")
    df = load_cached_data()  # Fall back to a local copy (see the sketch below)
except QldataError as e:
    print(f"qldata error: {e}")
Caching Layer¶
import qldata as qd
from pathlib import Path
import pandas as pd
import time
CACHE_DIR = Path("./cache")
CACHE_DIR.mkdir(exist_ok=True)
def get_data_cached(symbol, days, resolution):
    """Fetch data with local cache."""
    cache_file = CACHE_DIR / f"{symbol}_{resolution}_{days}d.parquet"
    # Check cache age
    if cache_file.exists():
        mtime = cache_file.stat().st_mtime
        age_hours = (time.time() - mtime) / 3600
        if age_hours < 1:  # Cache valid for 1 hour
            return pd.read_parquet(cache_file)
    # Fetch fresh data
    df = qd.data(symbol, source="binance") \
        .last(days) \
        .resolution(resolution) \
        .clean() \
        .get()
    # Save to cache
    df.to_parquet(cache_file)
    return df
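Usage is identical on a cache hit or a miss; only the latency differs:
df = get_data_cached("BTCUSDT", days=30, resolution="1h")  # network fetch, writes cache
df = get_data_cached("BTCUSDT", days=30, resolution="1h")  # served from ./cache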
Retry Logic¶
from tenacity import retry, stop_after_attempt, wait_exponential
import qldata as qd
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def fetch_with_retry(symbol, days, resolution):
    return qd.data(symbol, source="binance") \
        .last(days) \
        .resolution(resolution) \
        .get()
# Will retry up to 3 times with exponential backoff
df = fetch_with_retry("BTCUSDT", 30, "1h")
Next Steps¶
- Binance Examples - Binance-specific patterns
- Bybit Examples - Bybit-specific patterns
- API Reference - Full API documentation