Resilience API¶
Production-grade resilience features for WebSocket connections and data streaming.
Overview¶
qldata provides several resilience components:
- ReconnectionManager - Auto-reconnect with exponential backoff
- HeartbeatMonitor - Connection health monitoring
- MessageDeduplicator - Duplicate message detection
- RateLimitManager - API rate limit handling
- SequenceTracker - Missing message detection
- TimeSyncManager - Clock synchronization
ReconnectionManager¶
Manages automatic reconnection with exponential backoff.
ReconnectionManager ¶
ReconnectionManager(
    max_attempts: int = 10,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
)
Handles reconnection attempts with configurable exponential backoff and a maximum retry count.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_attempts | int | Maximum reconnection attempts (0 = infinite) | 10 |
| initial_delay | float | Initial delay in seconds | 1.0 |
| max_delay | float | Maximum delay between retries (seconds) | 60.0 |
| backoff_factor | float | Exponential backoff multiplier | 2.0 |
Usage¶
import time
from qldata.resilience import ReconnectionManager
manager = ReconnectionManager(
max_attempts=10,
initial_delay=1.0,
max_delay=60.0,
backoff_factor=2.0
)
while manager.should_reconnect():
try:
connect()
manager.record_success()
break
except ConnectionError:
manager.record_failure()
delay = manager.get_delay()
print(f"Reconnecting in {delay:.1f}s...")
time.sleep(delay)
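With the defaults above, the wait roughly doubles after each failure until it reaches max_delay. A quick sketch of the expected schedule (the exact formula inside get_delay() is an assumption):
initial_delay, backoff_factor, max_delay = 1.0, 2.0, 60.0
delays = [min(max_delay, initial_delay * backoff_factor ** n) for n in range(8)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]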
HeartbeatMonitor¶
Monitors connection health via periodic heartbeat checks.
HeartbeatMonitor ¶
Monitors connection health via heartbeats.
Detects stale connections and triggers reconnection if needed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| interval | float | Interval between heartbeat checks (seconds) | 30.0 |
| timeout | float | Seconds without a heartbeat before the connection is considered dead | 10.0 |
Usage¶
from qldata.resilience import HeartbeatMonitor
monitor = HeartbeatMonitor(interval=30.0, timeout=10.0)
def on_pong():
monitor.record_heartbeat()
# Check health
if not monitor.is_alive():
print(f"Connection dead. Last heartbeat: {monitor.get_last_heartbeat()}")
reconnect()
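A periodic ping task usually drives the monitor. A minimal asyncio sketch, where ws.ping() is a placeholder for whatever ping your transport exposes:
import asyncio

PING_INTERVAL = 30.0  # match the monitor's interval above

async def heartbeat_loop(ws, monitor):
    while True:
        await ws.ping()  # placeholder transport ping; the pong triggers on_pong()
        await asyncio.sleep(PING_INTERVAL)
        if not monitor.is_alive():
            raise ConnectionError(f"No heartbeat since {monitor.get_last_heartbeat()}")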
MessageDeduplicator¶
Detects and filters duplicate messages.
MessageDeduplicator ¶
Deduplicates messages using sequence numbers.
Prevents processing duplicate messages during reconnections.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| window_size | int | Number of recent message IDs to track | 1000 |
| ttl_seconds | float | Time-to-live for tracked IDs (seconds) | 60.0 |
is_duplicate ¶
Check if a message ID has already been seen.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence | int | Sequence number or message ID | required |
Returns:
| Type | Description |
|---|---|
| bool | True if duplicate, False otherwise |
record ¶
Record a message ID as seen.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sequence | int | Sequence number or message ID | required |
Returns:
| Type | Description |
|---|---|
| bool | True if new (not duplicate), False if duplicate |
Usage¶
from qldata.resilience import MessageDeduplicator
dedup = MessageDeduplicator(window_size=1000, ttl_seconds=60)
def handle_message(msg):
msg_id = msg["id"]
if dedup.is_duplicate(msg_id):
return # Skip duplicate
dedup.record(msg_id)
process_message(msg)
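Because record() returns True only for IDs it has not seen (see the table above), the check-and-record pair can be collapsed into a single call:
def handle_message(msg):
    # record() returns False for duplicates, so one call both registers
    # the ID and gates processing
    if dedup.record(msg["id"]):
        process_message(msg)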
RateLimitManager¶
Manages API rate limits to prevent throttling.
Usage¶
from qldata.resilience import RateLimitManager
# Configure limits (e.g., Binance: 1200 requests/minute)
limiter = RateLimitManager(
requests_per_minute=1200,
burst_limit=100
)
async def make_request():
await limiter.acquire() # Wait if rate limited
response = await api.call()
return response
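acquire() also works for gating a burst of concurrent calls. A sketch, assuming the api.call() placeholder above accepts a symbol argument:
import asyncio

async def fetch_all(symbols):
    async def fetch(symbol):
        await limiter.acquire()        # each task waits its turn under the limit
        return await api.call(symbol)  # hypothetical per-symbol request
    return await asyncio.gather(*(fetch(s) for s in symbols))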
Exchange Defaults¶
| Exchange | Requests/Minute | WebSocket Connections |
|---|---|---|
| Binance | 1200 | 5 per IP |
| Bybit | 120 | 20 per IP |
SequenceTracker¶
Tracks message sequence numbers to detect gaps.
Usage¶
from qldata.resilience import SequenceTracker
tracker = SequenceTracker()
def handle_message(msg):
seq = msg["sequence"]
if tracker.check(seq):
process_message(msg)
else:
gaps = tracker.get_gaps()
logger.warning(f"Sequence gaps detected: {gaps}")
request_replay(gaps) # Request missing messages
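A quick illustration of how a gap surfaces (the exact shape get_gaps() returns, e.g. missing numbers vs. ranges, is an assumption):
tracker = SequenceTracker()
for seq in [1, 2, 3, 6, 7]:  # messages 4 and 5 never arrive
    if not tracker.check(seq):
        print(f"Gap detected at seq {seq}: {tracker.get_gaps()}")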
TimeSyncManager¶
Synchronizes local clock with exchange server time.
Usage¶
from qldata.resilience import TimeSyncManager
sync = TimeSyncManager()
# Sync with exchange
await sync.sync_with_exchange("binance")
# Get offset
offset_ms = sync.get_offset()
print(f"Clock offset: {offset_ms}ms")
# Convert server time to local
local_time = sync.to_local_time(server_timestamp)
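The offset is typically applied when stamping outgoing requests. A one-line sketch, assuming get_offset() returns server minus local time in milliseconds:
import time
server_now_ms = int(time.time() * 1000) + sync.get_offset()  # estimated server time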
Integrated Resilience¶
Default Configuration¶
Streaming uses resilience by default:
import qldata as qd
# Resilience enabled by default
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handler) \
.get(start=True)
# Includes:
# - Auto-reconnect with exponential backoff
# - Rate limit management
# - Sequence tracking
# - Heartbeat monitoring
Custom Configuration¶
from qldata.resilience import ResilienceConfig
config = ResilienceConfig(
auto_reconnect=True,
max_reconnect_attempts=10,
reconnect_delay=1.0,
reconnect_delay_max=60.0,
heartbeat_interval=30.0,
rate_limit_enabled=True,
sequence_tracking=True,
time_sync_enabled=True
)
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handler) \
.resilience(config) \
.get(start=True)
Disable Resilience¶
For testing or specific use cases:
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handler) \
.get(start=True, resilience=False)
Connection State Management¶
Track connection state transitions:
from qldata.resilience import ConnectionState, ConnectionStateManager
state_mgr = ConnectionStateManager()
# Register callbacks
state_mgr.on_connected(lambda: print("Connected!"))
state_mgr.on_reconnecting(lambda: print("Reconnecting..."))
state_mgr.on_disconnected(lambda: print("Disconnected"))
state_mgr.on_failed(lambda: print("Connection failed"))
# Manual state transitions
state_mgr.transition(ConnectionState.CONNECTING)
state_mgr.transition(ConnectionState.CONNECTED)
# Check current state
print(f"State: {state_mgr.state.value}")
print(f"Time in state: {state_mgr.time_in_state:.1f}s")
Connection States¶
stateDiagram-v2
[*] --> DISCONNECTED
DISCONNECTED --> CONNECTING
CONNECTING --> CONNECTED
CONNECTING --> FAILED
CONNECTED --> RECONNECTING
RECONNECTING --> CONNECTED
RECONNECTING --> FAILED
FAILED --> CONNECTING
CONNECTED --> DISCONNECTED
| State | Description |
|---|---|
| DISCONNECTED | Not connected |
| CONNECTING | Establishing connection |
| CONNECTED | Successfully connected |
| RECONNECTING | Attempting to reconnect |
| FAILED | Connection permanently failed |
Examples¶
Example 1: Custom Reconnection Logic¶
from qldata.resilience import ReconnectionManager
import asyncio
import websockets
async def connect_with_retry():
manager = ReconnectionManager(max_attempts=5)
while manager.should_reconnect():
try:
ws = await websockets.connect("wss://...")
manager.record_success()
return ws
except Exception as e:
manager.record_failure()
delay = manager.get_delay()
print(f"Connection failed: {e}. Retry in {delay:.1f}s")
await asyncio.sleep(delay)
raise ConnectionError("Max reconnection attempts exceeded")
Example 2: Duplicate Detection¶
from qldata.resilience import MessageDeduplicator
import qldata as qd
dedup = MessageDeduplicator(window_size=5000)
processed = 0
duplicates = 0
def handle_data(df):
global processed, duplicates
for _, row in df.iterrows():
trade_id = row["trade_id"]
if dedup.is_duplicate(trade_id):
duplicates += 1
continue
dedup.record(trade_id)
processed += 1
# Process unique trade...
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handle_data) \
.get(start=True)
# Later: check stats
print(f"Processed: {processed}, Duplicates: {duplicates}")
Example 3: Sequence Gap Detection¶
from qldata.resilience import SequenceTracker
import logging
logger = logging.getLogger(__name__)
tracker = SequenceTracker()
def handle_message(msg):
seq = msg.get("u") # Update ID
if not tracker.check(seq):
gaps = tracker.get_gaps()
logger.error(f"Sequence gap! Missing: {gaps}")
# Request snapshot to recover
request_order_book_snapshot()
tracker.reset()
return
process_update(msg)
Example 4: Production Resilience Setup¶
import qldata as qd
from qldata.resilience import ConnectionStateManager, ResilienceConfig
from qldata.monitoring import DataQualityMonitor, AlertManager
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# State management
state = ConnectionStateManager()
state.on_connected(lambda: logger.info("Connected"))
state.on_reconnecting(lambda: logger.warning("Reconnecting..."))
state.on_failed(lambda: logger.error("Connection failed!"))
# Monitoring
monitor = DataQualityMonitor(stale_threshold_seconds=10)
alerts = AlertManager()
alerts.on_stale_data(lambda: logger.error("Data stale!"))
# Resilience config
config = ResilienceConfig(
auto_reconnect=True,
max_reconnect_attempts=20,
reconnect_delay=1.0,
reconnect_delay_max=120.0,
)
def handle_data(df):
if df.empty:
return
monitor.record_message(df.index[-1])
alerts.check_stale_data(monitor.is_stale())
# Process data...
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handle_data) \
.resilience(config) \
.get(start=True)
Best Practices¶
1. Always Use Resilience in Production¶
# ✓ Production - resilience enabled (default)
stream = qd.stream(...).get(start=True)
# ✗ Testing only - resilience disabled
stream = qd.stream(...).get(start=True, resilience=False)
2. Monitor Connection Health¶
# Periodically log health
while True:
if state.state == ConnectionState.CONNECTED:
logger.info(f"Connected for {state.time_in_state:.0f}s")
time.sleep(60)
3. Handle Sequence Gaps¶
# Don't just log - take action
if not tracker.check(seq):
# Option 1: Request snapshot
request_snapshot()
# Option 2: Skip to latest
tracker.reset()
4. Configure Appropriate Timeouts¶
# High-frequency data
config = ResilienceConfig(
heartbeat_interval=10.0, # Check every 10s
reconnect_delay=0.5, # Fast reconnect
)
# Lower-frequency data
config = ResilienceConfig(
heartbeat_interval=60.0, # Check every 60s
reconnect_delay=5.0, # Slower reconnect
)
See Also¶
- Streaming API - Live data streaming
- Monitoring - Data quality monitoring
- Configuration - Global settings