Streaming API

The qd.stream() function provides real-time market data streaming with built-in resilience features.

Basic Usage

import qldata as qd
import time

def handle_data(df):
    """Process incoming tick data."""
    if not df.empty:
        latest = df.iloc[-1]
        print(f"[{latest['symbol']}] Price: {latest['price']}")

# Create and start the stream
stream = qd.stream(["BTCUSDT", "ETHUSDT"], source="binance", category="spot") \
    .resolution("tick") \
    .on_data(handle_data) \
    .get(start=True)

# Keep running (Ctrl+C to stop)
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    stream.stop()

Function Signature

stream (module attribute)

stream = StreamingAPI()

qd.stream is exposed as a module-level StreamingAPI instance; calling it with a list of symbols returns a query builder that is configured with the methods below.
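
The builder can also be assembled step by step rather than in a single chained expression. A minimal sketch, assuming each builder method returns the query object (as the chained examples on this page imply):

import qldata as qd

# Build the query incrementally
query = qd.stream(["BTCUSDT"], source="binance", category="spot")
query = query.resolution("tick")
query = query.on_data(lambda df: print(f"Received {len(df)} rows"))

# Create the session without starting it, then control it manually
stream = query.get(start=False)
stream.start()
# ... stream runs until stopped ...
stream.stop()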

Query Builder Methods

.resolution(timeframe)

Set the stream resolution: raw ticks or aggregated bars.

# Raw tick/trade data
.resolution("tick")

# Aggregated bars (where supported)
.resolution("1m")

Supported Resolutions:

Resolution | Description       | Data Type
"tick"     | Individual trades | Trade/tick data
"1m"       | 1-minute bars     | OHLCV bars

.on_data(callback)

Register a callback for incoming data.

def handle_data(df):
    """
    Args:
        df: pandas DataFrame with new data
    """
    print(f"Received {len(df)} rows")

.on_data(handle_data)

Callback Signature:

def callback(df: pandas.DataFrame) -> None:
    ...

The DataFrame contains the following columns.

For tick streams (.resolution("tick")):

Column   | Type  | Description
price    | float | Trade price
quantity | float | Trade quantity
symbol   | str   | Trading pair
side     | str   | "buy" or "sell"
trade_id | int   | Unique trade ID

For bar streams (.resolution("1m")):

Column | Type  | Description
open   | float | Opening price
high   | float | Highest price
low    | float | Lowest price
close  | float | Closing price
volume | float | Trading volume
symbol | str   | Trading pair

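A minimal sketch of a bar-oriented callback, using only the bar columns listed above (it assumes the stream was created with .resolution("1m")):

def handle_bars(df):
    """Print each received 1-minute bar."""
    if df.empty:
        return
    for _, row in df.iterrows():
        print(
            f"{row['symbol']} "
            f"O:{row['open']} H:{row['high']} L:{row['low']} "
            f"C:{row['close']} V:{row['volume']}"
        )

# Attach it to a bar stream (where supported):
# qd.stream(["BTCUSDT"], source="binance").resolution("1m").on_data(handle_bars).get(start=True)
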
.on_error(callback)

Register a callback for errors.

def handle_error(error):
    """
    Args:
        error: Exception that occurred
    """
    print(f"Stream error: {error}")

.on_error(handle_error)

.on_close(callback)

Register a callback for stream close.

def handle_close():
    """Called when stream closes."""
    print("Stream closed")

.on_close(handle_close)

.on_reconnect(callback)

Register a callback for reconnection events.

def handle_reconnect(attempt):
    """
    Args:
        attempt: Reconnection attempt number
    """
    print(f"Reconnecting... (attempt {attempt})")

.on_reconnect(handle_reconnect)

.get(start=True)

Create the stream session.

# Auto-start the stream
stream = query.get(start=True)

# Create without starting (manual control)
stream = query.get(start=False)
stream.start()  # Start manually later

Parameters:

Parameter | Type | Default | Description
start     | bool | True    | Auto-start the stream

Returns: StreamSession object


StreamSession Class

StreamSession

StreamSession(
    adapter: Any,
    symbols: list[str],
    stream_type: str,
    interval: str | None,
    on_data: Callable[[DataFrame], None] | None = None,
    on_error: Callable[[Exception], None] | None = None,
    on_close: Callable[[], None] | None = None,
    *,
    window_seconds: float | None = None,
    max_rows: int | None = None,
    resample_to: str | None = None,
    clean_fn: Callable[[DataFrame], DataFrame] | None = None,
    resilience: ResilienceConfig | bool = True,
    source: str = "binance",
    rate_limiter: RateLimitManager | None = None,
    sequence_tracker: dict[str, SequenceTracker] | None = None,
    journal: Any | None = None,
)

Bases: AbstractContextManager['StreamSession']

Manage a live streaming session using adapter WebSocket methods.

start

start() -> StreamSession

Start streaming.

stop

stop() -> None

Stop streaming.
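
Because StreamSession derives from AbstractContextManager, a session can also be scoped with a with block. A minimal sketch, assuming that leaving the block closes the session:

import time
import qldata as qd

with qd.stream(["BTCUSDT"], source="binance") \
        .resolution("tick") \
        .on_data(lambda df: print(f"{len(df)} rows")) \
        .get(start=True) as stream:
    time.sleep(30)  # stream runs for 30 seconds
# exiting the block stops the stream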


Resilience Features

Auto-Reconnect

Streams automatically reconnect on connection loss:

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handler) \
    .on_reconnect(lambda n: print(f"Reconnect attempt {n}")) \
    .get(start=True)

# Stream will auto-reconnect with exponential backoff

Rate Limit Management

Built-in rate limiting keeps requests within the source's API limits, so the stream is not throttled:

# Even with many symbols, rate limits are respected
stream = qd.stream(
    ["BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT", "XRPUSDT"],
    source="binance"
) \
    .resolution("tick") \
    .on_data(handler) \
    .get(start=True)

Sequence Tracking

Detects and logs missed messages:

# Sequence gaps are automatically logged
# Enable verbose logging to see them:
import logging
logging.basicConfig(level=logging.DEBUG)

Heartbeat Monitoring

Connection health is monitored:

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handler) \
    .get(start=True)

# Check if stream is healthy
if stream.is_running():
    print("Stream is healthy")

Advanced Configuration

Resilience Options

from qldata.resilience import ResilienceConfig

config = ResilienceConfig(
    auto_reconnect=True,           # Enable auto-reconnect
    max_reconnect_attempts=10,     # Max attempts before giving up
    reconnect_delay=1.0,           # Initial delay (seconds)
    reconnect_delay_max=60.0,      # Max delay (seconds)
    heartbeat_interval=30.0,       # Heartbeat check interval
    rate_limit_enabled=True        # Enable rate limiting
)

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handler) \
    .resilience(config) \
    .get(start=True)
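
With these settings the delay between reconnection attempts grows from reconnect_delay toward reconnect_delay_max. The exact curve is an implementation detail; assuming the common doubling schedule, the waits would look roughly like this:

# Illustrative only: exponential backoff capped at the configured maximum
delay, delay_max = 1.0, 60.0
for attempt in range(1, 8):
    print(f"attempt {attempt}: wait {delay:.0f}s")
    delay = min(delay * 2, delay_max)
# attempt 1: 1s, 2: 2s, 3: 4s, 4: 8s, 5: 16s, 6: 32s, 7: 60s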

Disable Resilience

For testing or specific use cases:

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handler) \
    .get(start=True, resilience=False)

Monitoring Integration

DataQualityMonitor

Monitor stream health and latency:

from qldata.monitoring import DataQualityMonitor

monitor = DataQualityMonitor(stale_threshold_seconds=10)

def handle_data(df):
    if not df.empty:
        # Record message for monitoring
        monitor.record_message(df.index[-1])

        # Process data
        process_tick(df)

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handle_data) \
    .get(start=True)

# Periodically check metrics
metrics = monitor.get_metrics()
print(f"Throughput: {metrics['throughput']:.1f}/s")
print(f"Latency P95: {metrics['latency_p95']:.1f}ms")
print(f"Is Stale: {metrics['is_stale']}")

AlertManager

Set up production alerts:

from qldata.monitoring import AlertManager

alerts = AlertManager()

# Configure callbacks
alerts.on_high_latency(lambda ms: send_pagerduty(f"High latency: {ms}ms"))
alerts.on_stale_data(lambda: send_slack("Data is stale!"))
alerts.on_connection_lost(lambda: send_alert("Connection lost"))
alerts.on_reconnected(lambda n: log_info(f"Reconnected after {n} attempts"))

Examples

Example 1: Simple Price Tracker

import qldata as qd
import time

def print_price(df):
    if not df.empty:
        for _, row in df.iterrows():
            print(f"{row['symbol']}: ${row['price']:,.2f}")

stream = qd.stream(["BTCUSDT", "ETHUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(print_price) \
    .get(start=True)

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    stream.stop()

Example 2: Volume-Weighted Average Price (VWAP)

import qldata as qd
import time

# Track VWAP
vwap_data = {"total_value": 0, "total_volume": 0}

def update_vwap(df):
    if df.empty:
        return

    for _, row in df.iterrows():
        vwap_data["total_value"] += row["price"] * row["quantity"]
        vwap_data["total_volume"] += row["quantity"]

    if vwap_data["total_volume"] > 0:
        vwap = vwap_data["total_value"] / vwap_data["total_volume"]
        print(f"VWAP: ${vwap:,.2f}")

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(update_vwap) \
    .get(start=True)

time.sleep(60)  # Run for 60 seconds
stream.stop()

Example 3: Multi-Symbol Price Comparison

import qldata as qd
import time
from collections import defaultdict

# Track latest prices
prices = defaultdict(float)

def update_prices(df):
    if df.empty:
        return

    for _, row in df.iterrows():
        prices[row["symbol"]] = row["price"]

    # Print comparison
    if "BTCUSDT" in prices and "ETHUSDT" in prices:
        ratio = prices["BTCUSDT"] / prices["ETHUSDT"]
        print(f"BTC/ETH ratio: {ratio:.2f}")

stream = qd.stream(["BTCUSDT", "ETHUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(update_prices) \
    .get(start=True)

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    stream.stop()

Example 4: Production Stream with Monitoring

import qldata as qd
from qldata.monitoring import DataQualityMonitor, AlertManager
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Setup monitoring
monitor = DataQualityMonitor(stale_threshold_seconds=5)
alerts = AlertManager()

alerts.on_stale_data(lambda: logger.warning("Data is stale!"))
alerts.on_high_latency(lambda ms: logger.warning(f"High latency: {ms}ms"))

def handle_data(df):
    if df.empty:
        return

    # Record for monitoring
    monitor.record_message(df.index[-1])

    # Check for issues
    metrics = monitor.get_metrics()
    if metrics["latency_p95"]:
        alerts.check_latency(metrics["latency_p95"])
    alerts.check_stale_data(monitor.is_stale())

    # Process data
    logger.info(f"Received {len(df)} ticks")

def handle_error(e):
    logger.error(f"Stream error: {e}")

def handle_reconnect(attempt):
    logger.info(f"Reconnecting (attempt {attempt})")

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handle_data) \
    .on_error(handle_error) \
    .on_reconnect(handle_reconnect) \
    .get(start=True)

try:
    while True:
        # Log metrics every 30 seconds
        time.sleep(30)
        metrics = monitor.get_metrics()
        logger.info(
            f"Health: {monitor.get_health_status()} | "
            f"Throughput: {metrics['throughput']:.1f}/s | "
            f"P95: {metrics['latency_p95']:.1f}ms"
        )
except KeyboardInterrupt:
    logger.info("Shutting down...")
    stream.stop()

See Also