Streaming API¶
The qd.stream() function provides real-time market data streaming with built-in resilience features.
Basic Usage¶
import qldata as qd
import time
def handle_data(df):
"""Process incoming tick data."""
if not df.empty:
latest = df.iloc[-1]
print(f"[{latest['symbol']}] Price: {latest['price']}")
# Create and start the stream
stream = qd.stream(["BTCUSDT", "ETHUSDT"], source="binance", category="spot") \
.resolution("tick") \
.on_data(handle_data) \
.get(start=True)
# Keep running (Ctrl+C to stop)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stream.stop()
Function Signature¶
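The sketch below is inferred from the usage examples on this page; only the parameters those examples exercise (symbols, source, category) are shown, and anything beyond them is an assumption rather than a guarantee.
# Inferred from the examples on this page -- illustrative, not authoritative
query = qd.stream(
    ["BTCUSDT", "ETHUSDT"],   # symbols: list[str]
    source="binance",         # data source adapter
    category="spot",          # market category (as in Basic Usage)
)
# Returns a query builder: chain .resolution(), .on_data(), ... then finish with .get()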
Query Builder Methods¶
.resolution(timeframe)¶
Set the stream data type.
Supported Resolutions:
| Resolution | Description | Data Type |
|---|---|---|
"tick" | Individual trades | Trade/tick data |
"1m" | 1-minute bars | OHLCV bars |
.on_data(callback)¶
Register a callback for incoming data.
def handle_data(df):
"""
Args:
df: pandas DataFrame with new data
"""
print(f"Received {len(df)} rows")
.on_data(handle_data)
Callback Signature: the callback receives a single argument, a pandas DataFrame containing the newly arrived rows.
For "tick" streams, the DataFrame contains:
| Column | Type | Description |
|---|---|---|
| price | float | Trade price |
| quantity | float | Trade quantity |
| symbol | str | Trading pair |
| side | str | "buy" or "sell" |
| trade_id | int | Unique trade ID |
For "1m" streams, the DataFrame contains:
| Column | Type | Description |
|---|---|---|
| open | float | Opening price |
| high | float | Highest price |
| low | float | Lowest price |
| close | float | Closing price |
| volume | float | Trading volume |
| symbol | str | Trading pair |
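A single callback can serve both stream types by branching on the columns present; a minimal sketch using only the columns documented above:
def handle_any(df):
    """Handle tick or 1m bar DataFrames based on the columns present."""
    if df.empty:
        return
    if "price" in df.columns:        # tick data
        notional = (df["price"] * df["quantity"]).sum()
        print(f"{len(df)} trades, notional {notional:,.2f}")
    elif "close" in df.columns:      # 1m bars
        last = df.iloc[-1]
        print(f"{last['symbol']} close: {last['close']}, volume: {last['volume']}")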
.on_error(callback)¶
Register a callback for errors.
def handle_error(error):
"""
Args:
error: Exception that occurred
"""
print(f"Stream error: {error}")
.on_error(handle_error)
.on_close(callback)¶
Register a callback for stream close.
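The callback takes no arguments (see on_close: Callable[[], None] in the StreamSession signature below):
def handle_close():
    """Called once when the stream is closed."""
    print("Stream closed")
.on_close(handle_close)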
.on_reconnect(callback)¶
Register a callback for reconnection events.
def handle_reconnect(attempt):
"""
Args:
attempt: Reconnection attempt number
"""
print(f"Reconnecting... (attempt {attempt})")
.on_reconnect(handle_reconnect)
.get(start=True)¶
Create the stream session.
# Auto-start the stream
stream = query.get(start=True)
# Create without starting (manual control)
stream = query.get(start=False)
stream.start() # Start manually later
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| start | bool | True | Auto-start the stream |
Returns: StreamSession object
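Combining this with the session methods used elsewhere on this page (start(), is_running(), stop()), a manual lifecycle looks like:
# Manual lifecycle: create, start, check health, stop
stream = query.get(start=False)
stream.start()
if stream.is_running():      # see Heartbeat Monitoring
    print("Stream is running")
stream.stop()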
StreamSession Class¶
StreamSession¶
StreamSession(
adapter: Any,
symbols: list[str],
stream_type: str,
interval: str | None,
on_data: Callable[[DataFrame], None] | None = None,
on_error: Callable[[Exception], None] | None = None,
on_close: Callable[[], None] | None = None,
*,
window_seconds: float | None = None,
max_rows: int | None = None,
resample_to: str | None = None,
clean_fn: (
Callable[[DataFrame], DataFrame] | None
) = None,
resilience: ResilienceConfig | bool = True,
source: str = "binance",
rate_limiter: RateLimitManager | None = None,
sequence_tracker: (
dict[str, SequenceTracker] | None
) = None,
journal: Any | None = None
)
Bases: AbstractContextManager['StreamSession']
Manage a live streaming session using adapter WebSocket methods.
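Because StreamSession derives from AbstractContextManager, it can also be used in a with block; a sketch, assuming the session is stopped when the block exits:
import time
import qldata as qd
# Assumes __exit__ stops the session on exit; verify against your installed version
with qd.stream(["BTCUSDT"], source="binance") \
        .resolution("tick") \
        .on_data(lambda df: print(f"Received {len(df)} rows")) \
        .get(start=True) as stream:
    time.sleep(60)   # stream for a minute, then leave the block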
Resilience Features¶
Auto-Reconnect¶
Streams automatically reconnect on connection loss:
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handler) \
.on_reconnect(lambda n: print(f"Reconnect attempt {n}")) \
.get(start=True)
# Stream will auto-reconnect with exponential backoff
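With the defaults shown under Advanced Configuration (reconnect_delay=1.0, reconnect_delay_max=60.0), exponential backoff roughly doubles the wait between attempts up to the cap. The exact schedule is an implementation detail; illustratively:
# Illustrative backoff schedule only -- not the library's exact algorithm
delay, delay_max = 1.0, 60.0
for attempt in range(1, 9):
    print(f"attempt {attempt}: wait {delay:.0f}s")
    delay = min(delay * 2, delay_max)   # 1, 2, 4, 8, 16, 32, 60, 60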
Rate Limit Management¶
Built-in rate limiting prevents API throttling:
# Even with many symbols, rate limits are respected
stream = qd.stream(
["BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT", "XRPUSDT"],
source="binance"
) \
.resolution("tick") \
.on_data(handler) \
.get(start=True)
Sequence Tracking¶
Detects and logs missed messages:
# Sequence gaps are automatically logged
# Enable verbose logging to see them:
import logging
logging.basicConfig(level=logging.DEBUG)
Heartbeat Monitoring¶
Connection health is monitored:
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handler) \
.get(start=True)
# Check if stream is healthy
if stream.is_running():
print("Stream is healthy")
Advanced Configuration¶
Resilience Options¶
from qldata.resilience import ResilienceConfig
config = ResilienceConfig(
auto_reconnect=True, # Enable auto-reconnect
max_reconnect_attempts=10, # Max attempts before giving up
reconnect_delay=1.0, # Initial delay (seconds)
reconnect_delay_max=60.0, # Max delay (seconds)
heartbeat_interval=30.0, # Heartbeat check interval
rate_limit_enabled=True # Enable rate limiting
)
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handler) \
.resilience(config) \
.get(start=True)
Disable Resilience¶
For testing or specific use cases:
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handler) \
.get(start=True, resilience=False)
Monitoring Integration¶
DataQualityMonitor¶
Monitor stream health and latency:
from qldata.monitoring import DataQualityMonitor
monitor = DataQualityMonitor(stale_threshold_seconds=10)
def handle_data(df):
if not df.empty:
# Record message for monitoring
monitor.record_message(df.index[-1])
# Process data
process_tick(df)
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handle_data) \
.get(start=True)
# Periodically check metrics
metrics = monitor.get_metrics()
print(f"Throughput: {metrics['throughput']:.1f}/s")
print(f"Latency P95: {metrics['latency_p95']:.1f}ms")
print(f"Is Stale: {metrics['is_stale']}")
AlertManager¶
Set up production alerts:
from qldata.monitoring import AlertManager
alerts = AlertManager()
# Configure callbacks
alerts.on_high_latency(lambda ms: send_pagerduty(f"High latency: {ms}ms"))
alerts.on_stale_data(lambda: send_slack("Data is stale!"))
alerts.on_connection_lost(lambda: send_alert("Connection lost"))
alerts.on_reconnected(lambda n: log_info(f"Reconnected after {n} attempts"))
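The latency and staleness callbacks are driven by the check_* calls shown in Example 4 below; a minimal wiring alongside a DataQualityMonitor:
# Inside your on_data handler (see Example 4 for the full pattern)
metrics = monitor.get_metrics()
if metrics["latency_p95"]:
    alerts.check_latency(metrics["latency_p95"])
alerts.check_stale_data(monitor.is_stale())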
Examples¶
Example 1: Simple Price Tracker¶
import qldata as qd
import time
def print_price(df):
if not df.empty:
for _, row in df.iterrows():
print(f"{row['symbol']}: ${row['price']:,.2f}")
stream = qd.stream(["BTCUSDT", "ETHUSDT"], source="binance") \
.resolution("tick") \
.on_data(print_price) \
.get(start=True)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stream.stop()
Example 2: Volume-Weighted Average Price (VWAP)¶
import qldata as qd
import time
# Track VWAP
vwap_data = {"total_value": 0, "total_volume": 0}
def update_vwap(df):
if df.empty:
return
for _, row in df.iterrows():
vwap_data["total_value"] += row["price"] * row["quantity"]
vwap_data["total_volume"] += row["quantity"]
if vwap_data["total_volume"] > 0:
vwap = vwap_data["total_value"] / vwap_data["total_volume"]
print(f"VWAP: ${vwap:,.2f}")
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(update_vwap) \
.get(start=True)
time.sleep(60) # Run for 60 seconds
stream.stop()
Example 3: Multi-Symbol Price Comparison¶
import qldata as qd
import time
from collections import defaultdict
# Track latest prices
prices = defaultdict(float)
def update_prices(df):
if df.empty:
return
for _, row in df.iterrows():
prices[row["symbol"]] = row["price"]
# Print comparison
if "BTCUSDT" in prices and "ETHUSDT" in prices:
ratio = prices["BTCUSDT"] / prices["ETHUSDT"]
print(f"BTC/ETH ratio: {ratio:.2f}")
stream = qd.stream(["BTCUSDT", "ETHUSDT"], source="binance") \
.resolution("tick") \
.on_data(update_prices) \
.get(start=True)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stream.stop()
Example 4: Production Stream with Monitoring¶
import qldata as qd
from qldata.monitoring import DataQualityMonitor, AlertManager
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Setup monitoring
monitor = DataQualityMonitor(stale_threshold_seconds=5)
alerts = AlertManager()
alerts.on_stale_data(lambda: logger.warning("Data is stale!"))
alerts.on_high_latency(lambda ms: logger.warning(f"High latency: {ms}ms"))
def handle_data(df):
if df.empty:
return
# Record for monitoring
monitor.record_message(df.index[-1])
# Check for issues
metrics = monitor.get_metrics()
if metrics["latency_p95"]:
alerts.check_latency(metrics["latency_p95"])
alerts.check_stale_data(monitor.is_stale())
# Process data
logger.info(f"Received {len(df)} ticks")
def handle_error(e):
logger.error(f"Stream error: {e}")
def handle_reconnect(attempt):
logger.info(f"Reconnecting (attempt {attempt})")
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handle_data) \
.on_error(handle_error) \
.on_reconnect(handle_reconnect) \
.get(start=True)
try:
while True:
# Log metrics every 30 seconds
time.sleep(30)
metrics = monitor.get_metrics()
logger.info(
f"Health: {monitor.get_health_status()} | "
f"Throughput: {metrics['throughput']:.1f}/s | "
f"P95: {metrics['latency_p95']:.1f}ms"
)
except KeyboardInterrupt:
logger.info("Shutting down...")
stream.stop()
See Also¶
- Resilience Features - Detailed resilience options
- Monitoring & Alerts - Production monitoring
- Historical Data - Batch data fetching