Resilience API

Production-grade resilience features for WebSocket connections and data streaming.

Overview

qldata provides several resilience components:

  • ReconnectionManager - Auto-reconnect with exponential backoff
  • HeartbeatMonitor - Connection health monitoring
  • MessageDeduplicator - Duplicate message detection
  • RateLimitManager - API rate limit handling
  • SequenceTracker - Missing message detection
  • TimeSyncManager - Clock synchronization

Import the components you need from qldata.resilience:

from qldata.resilience import (
    ReconnectionManager,
    HeartbeatMonitor,
    MessageDeduplicator,
)

ReconnectionManager

Manages automatic reconnection with exponential backoff.

ReconnectionManager

ReconnectionManager(
    max_retries: int = 10,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
)

Handles reconnection attempts with configurable backoff and max retries.

Parameters:

Name Type Description Default
max_retries int Maximum reconnection attempts (0 = infinite) 10
base_delay float Initial delay in seconds 1.0
max_delay float Maximum delay between retries 60.0
backoff_factor float Exponential backoff multiplier 2.0

reset

reset() -> None

Reset attempt counter.

Configuration

Parameter Type Default Description
max_retries int 10 Maximum reconnection attempts (0 = infinite)
base_delay float 1.0 Initial delay in seconds
max_delay float 60.0 Maximum delay between retries
backoff_factor float 2.0 Exponential backoff multiplier
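
The delay between attempts grows exponentially. Assuming the textbook backoff formula (the docstring does not spell it out, and the real implementation may add jitter), the delay before attempt n is min(base_delay * backoff_factor ** n, max_delay):

# Illustrative only: qldata's internal schedule may differ in detail.
base_delay, backoff_factor, max_delay = 1.0, 2.0, 60.0
delays = [min(base_delay * backoff_factor ** n, max_delay) for n in range(8)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]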

Usage

from qldata.resilience import ReconnectionManager
import time

manager = ReconnectionManager(
    max_retries=10,
    base_delay=1.0,
    max_delay=60.0,
    backoff_factor=2.0
)

while manager.should_reconnect():
    try:
        connect()
        manager.record_success()
        break
    except ConnectionError:
        manager.record_failure()
        delay = manager.get_delay()
        print(f"Reconnecting in {delay:.1f}s...")
        time.sleep(delay)

HeartbeatMonitor

Monitors connection health via periodic heartbeat checks.

HeartbeatMonitor

HeartbeatMonitor(
    timeout_seconds: float = 30.0,
    ping_interval: float = 10.0,
)

Monitors connection health via heartbeats.

Detects stale connections and triggers reconnection if needed.

Parameters:

Name Type Description Default
timeout_seconds float Seconds without heartbeat before timeout 30.0
ping_interval float Seconds between ping messages 10.0

record_heartbeat

record_heartbeat() -> None

Record a heartbeat (pong received).

is_alive

is_alive() -> bool

Check if connection is alive.

Returns:

Type Description
bool True if heartbeat within timeout, False otherwise

Configuration

Parameter Type Default Description
timeout_seconds float 30.0 Seconds without heartbeat before timeout
ping_interval float 10.0 Seconds between ping messages

Usage

from qldata.resilience import HeartbeatMonitor

monitor = HeartbeatMonitor(timeout_seconds=30.0, ping_interval=10.0)

def on_pong():
    monitor.record_heartbeat()

# Check health
if not monitor.is_alive():
    print(f"Connection dead. Last heartbeat: {monitor.get_last_heartbeat()}")
    reconnect()
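
A fuller health-check loop, sketched with asyncio; send_ping() and reconnect() are hypothetical helpers, not part of qldata:

import asyncio

async def watchdog(ws, monitor):
    # Ping at the configured interval; escalate when the monitor times out.
    while True:
        await send_ping(ws)         # hypothetical: send a WebSocket ping
        await asyncio.sleep(10.0)   # matches ping_interval
        if not monitor.is_alive():  # no pong recorded within timeout_seconds
            await reconnect()       # hypothetical recovery hook
            break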

MessageDeduplicator

Detects and filters duplicate messages.

MessageDeduplicator

MessageDeduplicator(buffer_size: int = 1000)

Deduplicates messages using sequence numbers.

Prevents processing duplicate messages during reconnections.

Parameters:

Name Type Description Default
buffer_size int Number of recent sequence numbers to track 1000

is_duplicate

is_duplicate(sequence: int) -> bool

Check if sequence number has been seen.

Parameters:

Name Type Description Default
sequence int Sequence number required

Returns:

Type Description
bool True if duplicate, False otherwise

record

record(sequence: int) -> bool

Record a sequence number.

Parameters:

Name Type Description Default
sequence int Sequence number required

Returns:

Type Description
bool True if new (not duplicate), False if duplicate

Configuration

Parameter Type Default Description
buffer_size int 1000 Number of recent sequence numbers to track

Usage

from qldata.resilience import MessageDeduplicator

dedup = MessageDeduplicator(buffer_size=1000)

def handle_message(msg):
    msg_id = msg["id"]

    if dedup.is_duplicate(msg_id):
        return  # Skip duplicate

    dedup.record(msg_id)
    process_message(msg)
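
Because record() returns True only for previously unseen sequence numbers, the check-and-record pair above can collapse into a single call:

def handle_message(msg):
    if not dedup.record(msg["id"]):
        return  # duplicate; already processed
    process_message(msg)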

RateLimitManager

Manages API rate limits to prevent throttling.

Usage

from qldata.resilience import RateLimitManager

# Configure limits (e.g., Binance: 1200 requests/minute)
limiter = RateLimitManager(
    requests_per_minute=1200,
    burst_limit=100
)

async def make_request():
    await limiter.acquire()  # Wait if rate limited
    response = await api.call()
    return response

Exchange Defaults

Exchange Requests/Minute WebSocket Connections
Binance 1200 5 per IP
Bybit 120 20 per IP
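
A sketch of fanning out batched requests behind the limiter; api.call() stands in for any coroutine-based client call, and the per-symbol argument is purely illustrative:

import asyncio

async def fetch_all(symbols):
    async def fetch(symbol):
        await limiter.acquire()        # waits until the rate limit allows it
        return await api.call(symbol)  # illustrative API call
    return await asyncio.gather(*(fetch(s) for s in symbols))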

SequenceTracker

Tracks message sequence numbers to detect gaps.

Usage

from qldata.resilience import SequenceTracker

tracker = SequenceTracker()

def handle_message(msg):
    seq = msg["sequence"]

    if tracker.check(seq):
        process_message(msg)
    else:
        gaps = tracker.get_gaps()
        logger.warning(f"Sequence gaps detected: {gaps}")
        request_replay(gaps)  # Request missing messages

TimeSyncManager

Synchronizes local clock with exchange server time.

Usage

from qldata.resilience import TimeSyncManager

sync = TimeSyncManager()

# Sync with exchange
await sync.sync_with_exchange("binance")

# Get offset
offset_ms = sync.get_offset()
print(f"Clock offset: {offset_ms}ms")

# Convert server time to local
local_time = sync.to_local_time(server_timestamp)
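
One common use is stamping outgoing requests with estimated server time. Assuming get_offset() returns server-minus-local milliseconds (a sign convention the docs above do not pin down):

import time

# Estimate the exchange's current time from the measured offset.
server_now_ms = int(time.time() * 1000) + sync.get_offset()
payload = {"timestamp": server_now_ms}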

Integrated Resilience

Default Configuration

Streaming uses resilience by default:

import qldata as qd

# Resilience enabled by default
stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handler) \
    .get(start=True)

# Includes:
# - Auto-reconnect with exponential backoff
# - Rate limit management
# - Sequence tracking
# - Heartbeat monitoring

Custom Configuration

from qldata.resilience import ResilienceConfig

config = ResilienceConfig(
    auto_reconnect=True,
    max_reconnect_attempts=10,
    reconnect_delay=1.0,
    reconnect_delay_max=60.0,
    heartbeat_interval=30.0,
    rate_limit_enabled=True,
    sequence_tracking=True,
    time_sync_enabled=True
)

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handler) \
    .resilience(config) \
    .get(start=True)

Disable Resilience

For testing or specific use cases:

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handler) \
    .get(start=True, resilience=False)

Connection State Management

Track connection state transitions:

from qldata.resilience import ConnectionState, ConnectionStateManager

state_mgr = ConnectionStateManager()

# Register callbacks
state_mgr.on_connected(lambda: print("Connected!"))
state_mgr.on_reconnecting(lambda: print("Reconnecting..."))
state_mgr.on_disconnected(lambda: print("Disconnected"))
state_mgr.on_failed(lambda: print("Connection failed"))

# Manual state transitions
state_mgr.transition(ConnectionState.CONNECTING)
state_mgr.transition(ConnectionState.CONNECTED)

# Check current state
print(f"State: {state_mgr.state.value}")
print(f"Time in state: {state_mgr.time_in_state:.1f}s")

Connection States

stateDiagram-v2
    [*] --> DISCONNECTED
    DISCONNECTED --> CONNECTING
    CONNECTING --> CONNECTED
    CONNECTING --> FAILED
    CONNECTED --> RECONNECTING
    RECONNECTING --> CONNECTED
    RECONNECTING --> FAILED
    FAILED --> CONNECTING
    CONNECTED --> DISCONNECTED

State Description
DISCONNECTED Not connected
CONNECTING Establishing connection
CONNECTED Successfully connected
RECONNECTING Attempting to reconnect
FAILED Connection permanently failed

Examples

Example 1: Custom Reconnection Logic

from qldata.resilience import ReconnectionManager
import asyncio
import websockets

async def connect_with_retry():
    manager = ReconnectionManager(max_retries=5)

    while manager.should_reconnect():
        try:
            ws = await websockets.connect("wss://...")
            manager.record_success()
            return ws
        except Exception as e:
            manager.record_failure()
            delay = manager.get_delay()
            print(f"Connection failed: {e}. Retry in {delay:.1f}s")
            await asyncio.sleep(delay)

    raise ConnectionError("Max reconnection attempts exceeded")

Example 2: Duplicate Detection

from qldata.resilience import MessageDeduplicator
import qldata as qd

dedup = MessageDeduplicator(buffer_size=5000)
processed = 0
duplicates = 0

def handle_data(df):
    global processed, duplicates

    for _, row in df.iterrows():
        trade_id = row["trade_id"]

        if dedup.is_duplicate(trade_id):
            duplicates += 1
            continue

        dedup.record(trade_id)
        processed += 1
        # Process unique trade...

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handle_data) \
    .get(start=True)

# Later: check stats
print(f"Processed: {processed}, Duplicates: {duplicates}")

Example 3: Sequence Gap Detection

from qldata.resilience import SequenceTracker
import logging

logger = logging.getLogger(__name__)
tracker = SequenceTracker()

def handle_message(msg):
    seq = msg.get("u")  # Update ID

    if not tracker.check(seq):
        gaps = tracker.get_gaps()
        logger.error(f"Sequence gap! Missing: {gaps}")

        # Request snapshot to recover
        request_order_book_snapshot()
        tracker.reset()
        return

    process_update(msg)

Example 4: Production Resilience Setup

import qldata as qd
from qldata.resilience import ConnectionStateManager, ResilienceConfig
from qldata.monitoring import DataQualityMonitor, AlertManager
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# State management
state = ConnectionStateManager()
state.on_connected(lambda: logger.info("Connected"))
state.on_reconnecting(lambda: logger.warning("Reconnecting..."))
state.on_failed(lambda: logger.error("Connection failed!"))

# Monitoring
monitor = DataQualityMonitor(stale_threshold_seconds=10)
alerts = AlertManager()
alerts.on_stale_data(lambda: logger.error("Data stale!"))

# Resilience config
config = ResilienceConfig(
    auto_reconnect=True,
    max_reconnect_attempts=20,
    reconnect_delay=1.0,
    reconnect_delay_max=120.0,
)

def handle_data(df):
    if df.empty:
        return
    monitor.record_message(df.index[-1])
    alerts.check_stale_data(monitor.is_stale())
    # Process data...

stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handle_data) \
    .resilience(config) \
    .get(start=True)

Best Practices

1. Always Use Resilience in Production

# ✓ Production - resilience enabled (default)
stream = qd.stream(...).get(start=True)

# ✗ Testing only - resilience disabled
stream = qd.stream(...).get(start=True, resilience=False)

2. Monitor Connection Health

# Periodically log health
while True:
    if state.state == ConnectionState.CONNECTED:
        logger.info(f"Connected for {state.time_in_state:.0f}s")
    time.sleep(60)

3. Handle Sequence Gaps

# Don't just log - take action
if not tracker.check(seq):
    # Option 1: Request snapshot
    request_snapshot()

    # Option 2: Skip to latest
    tracker.reset()

4. Configure Appropriate Timeouts

# High-frequency data
config = ResilienceConfig(
    heartbeat_interval=10.0,  # Check every 10s
    reconnect_delay=0.5,      # Fast reconnect
)

# Lower-frequency data
config = ResilienceConfig(
    heartbeat_interval=60.0,  # Check every 60s
    reconnect_delay=5.0,      # Slower reconnect
)

See Also