Monitoring & Alerts API¶
Production-grade monitoring for data quality, latency, and connection health.
Overview¶
qldata provides two main monitoring components:
- DataQualityMonitor - Track latency, throughput, and staleness
- AlertManager - Callback-based alerting system
from qldata.monitoring import DataQualityMonitor, AlertManager
# Setup monitoring
monitor = DataQualityMonitor(stale_threshold_seconds=10)
alerts = AlertManager()
# Configure alerts
alerts.on_stale_data(lambda: print("Data is stale!"))
alerts.on_high_latency(lambda ms: print(f"High latency: {ms}ms"))
DataQualityMonitor¶
Real-time monitoring of data quality metrics.
Basic Usage¶
from qldata.monitoring import DataQualityMonitor
from datetime import datetime, timezone
# Create monitor
monitor = DataQualityMonitor(stale_threshold_seconds=10)
# Record incoming messages
def on_data(df):
if not df.empty:
monitor.record_message(df.index[-1])
# Process data...
# Get current metrics
metrics = monitor.get_metrics()
print(f"Throughput: {metrics['throughput']:.1f}/s")
print(f"P95 Latency: {metrics['latency_p95']:.1f}ms")
print(f"Is Stale: {metrics['is_stale']}")
DataQualityMonitor ¶
Monitor streaming data quality in real-time.
Tracks latency, throughput, and staleness of streaming data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stale_threshold_seconds | int | Seconds without data before considered stale | 30 |
record_message ¶
Record receipt of a message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_timestamp | datetime \| None | Timestamp from the message (for latency calc) | None |
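A minimal sketch of recording messages with and without a source timestamp (assumption: a call without a timestamp still counts toward throughput and staleness, but contributes no latency sample):
from datetime import datetime, timezone
from qldata.monitoring import DataQualityMonitor
monitor = DataQualityMonitor(stale_threshold_seconds=10)
# With a message timestamp: feeds both throughput and the latency percentiles
monitor.record_message(datetime.now(timezone.utc))
# Without a timestamp: assumed to update throughput/staleness only
monitor.record_message()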
get_metrics ¶
Get current quality metrics.
Returns:
| Type | Description |
|---|---|
| dict | Dictionary with quality metrics |
is_stale ¶
Check if data is stale (no recent updates).
Returns:
| Type | Description |
|---|---|
| bool | True if no messages received within threshold |
get_health_status ¶
Get overall health status.
Returns:
| Type | Description |
|---|---|
str | "healthy", "degraded", or "unhealthy" |
Metrics Reference¶
The get_metrics() method returns:
| Metric | Type | Description |
|---|---|---|
| total_messages | int | Total messages recorded |
| throughput | float | Messages per second |
| latency_p50 | float | 50th percentile latency (ms) |
| latency_p95 | float | 95th percentile latency (ms) |
| latency_p99 | float | 99th percentile latency (ms) |
| is_stale | bool | True if no recent messages |
| seconds_since_last_message | float | Seconds since the last message |
| window_start | datetime | Start of the measurement window |
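The latency percentiles may be unset before any timestamped messages arrive (the integration examples below treat a falsy value as "no data yet"), so a defensive read looks like:
metrics = monitor.get_metrics()
p95 = metrics["latency_p95"]
# Guard against a missing latency sample before formatting
print(f"P95: {p95:.1f}ms" if p95 else "P95: n/a")
print(f"{metrics['total_messages']} msgs @ {metrics['throughput']:.1f}/s, stale={metrics['is_stale']}")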
Health Status¶
| Status | Condition |
|---|---|
"healthy" | Recent data, low latency |
"degraded" | High latency or low throughput |
"unhealthy" | Stale data or no connection |
AlertManager¶
Callback-based alert system for production monitoring.
Basic Usage¶
from qldata.monitoring import AlertManager
alerts = AlertManager()
# Register callbacks
alerts.on_high_latency(lambda ms: print(f"High latency: {ms}ms"))
alerts.on_stale_data(lambda: print("Data is stale!"))
alerts.on_low_throughput(lambda tps: print(f"Low throughput: {tps}/s"))
alerts.on_connection_lost(lambda: print("Connection lost"))
alerts.on_reconnected(lambda n: print(f"Reconnected after {n} attempts"))
AlertManager ¶
Manage alerts for data quality issues.
Provides callbacks for various alert conditions.
on_high_latency ¶
Register callback for high latency alerts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[float], None] | Function(latency_ms) to call when latency is high | required |
on_stale_data ¶
Register callback for stale data alerts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[], None] | Function to call when data becomes stale | required |
on_low_throughput ¶
Register callback for low throughput alerts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[float], None] | Function(throughput) to call when throughput is low | required |
on_connection_lost ¶
Register callback for connection loss.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[], None] | Function to call when connection is lost | required |
on_reconnected ¶
Register callback for successful reconnection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[int], None] | Function(attempts) to call when reconnected | required |
check_latency ¶
check_latency(
latency_ms: float,
warning_threshold: float = 500,
error_threshold: float = 1000,
) -> Alert | None
Check latency and trigger alerts if needed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| latency_ms | float | Current latency in milliseconds | required |
| warning_threshold | float | Warning threshold in ms | 500 |
| error_threshold | float | Error threshold in ms | 1000 |
Returns:
| Type | Description |
|---|---|
| Alert \| None | Alert if threshold exceeded, None otherwise |
check_stale_data ¶
Check if data is stale and trigger alerts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| is_stale | bool | Whether data is currently stale | required |
Returns:
| Type | Description |
|---|---|
| Alert \| None | Alert if stale, None otherwise |
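A minimal sketch pairing the monitor with the alert manager in a periodic check:
# Triggers any registered on_stale_data callbacks when the feed goes stale
alert = alerts.check_stale_data(monitor.is_stale())
if alert:
    print(f"[{alert.severity}] {alert.message}")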
notify_reconnected ¶
Notify successful reconnection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| attempts | int | Number of attempts it took to reconnect | required |
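For example, a reconnect loop might report how many attempts it needed; this is only a sketch, and connect() is a placeholder for your own connection logic:
import time
attempts = 0
while True:
    attempts += 1
    try:
        connect()  # placeholder for your reconnect logic
        break
    except ConnectionError:
        time.sleep(min(2 ** attempts, 60))  # capped exponential backoff
alerts.notify_reconnected(attempts)  # fires any on_reconnected callbacks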
get_recent_alerts ¶
Get recent alerts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum number of alerts to return | 10 |
Returns:
| Type | Description |
|---|---|
| list[Alert] | List of recent alerts |
Alert Callbacks¶
| Method | Callback Signature | Description |
|---|---|---|
| on_high_latency | (latency_ms: float) -> None | Latency exceeds threshold |
| on_stale_data | () -> None | No data received recently |
| on_low_throughput | (tps: float) -> None | Throughput below threshold |
| on_connection_lost | () -> None | Connection dropped |
| on_reconnected | (attempts: int) -> None | Successfully reconnected |
Checking Thresholds¶
from qldata.monitoring import AlertManager
alerts = AlertManager()
# Check latency with thresholds
alert = alerts.check_latency(
latency_ms=1200,
warning_threshold=500, # Warning above 500ms
error_threshold=1000 # Error above 1000ms
)
if alert:
print(f"Alert: {alert.severity} - {alert.message}")
Alert History¶
# Get recent alerts
recent = alerts.get_recent_alerts(limit=10)
for alert in recent:
print(f"[{alert.severity}] {alert.timestamp}: {alert.message}")
Integration Examples¶
Example 1: Monitor with Streaming¶
import qldata as qd
from qldata.monitoring import DataQualityMonitor, AlertManager
import time
# Setup
monitor = DataQualityMonitor(stale_threshold_seconds=5)
alerts = AlertManager()
def on_high_latency(ms):
print(f"⚠️ High latency: {ms:.1f}ms")
def on_stale():
print("🚨 Data is STALE!")
alerts.on_high_latency(on_high_latency)
alerts.on_stale_data(on_stale)
def handle_data(df):
if df.empty:
return
# Record message
monitor.record_message(df.index[-1])
# Check thresholds
metrics = monitor.get_metrics()
if metrics["latency_p95"]:
alerts.check_latency(metrics["latency_p95"], warning_threshold=200)
alerts.check_stale_data(monitor.is_stale())
# Start stream
stream = qd.stream(["BTCUSDT"], source="binance") \
.resolution("tick") \
.on_data(handle_data) \
.get(start=True)
# Monitor health
while True:
metrics = monitor.get_metrics()
print(f"Health: {monitor.get_health_status()} | TPS: {metrics['throughput']:.1f}")
time.sleep(10)
Example 2: PagerDuty Integration¶
from qldata.monitoring import AlertManager
import requests
PAGERDUTY_KEY = "your-integration-key"
def send_pagerduty(severity: str, message: str):
"""Send alert to PagerDuty."""
requests.post(
"https://events.pagerduty.com/v2/enqueue",
json={
"routing_key": PAGERDUTY_KEY,
"event_action": "trigger",
"payload": {
"summary": message,
"severity": severity,
"source": "qldata-monitor"
}
}
)
alerts = AlertManager()
alerts.on_stale_data(lambda: send_pagerduty("critical", "Market data is stale"))
alerts.on_high_latency(lambda ms: send_pagerduty("warning", f"Latency: {ms}ms"))
Example 3: Slack Integration¶
from qldata.monitoring import AlertManager
import requests
SLACK_WEBHOOK = "https://hooks.slack.com/services/..."
def send_slack(message: str, emoji: str = "⚠️"):
"""Send message to Slack."""
requests.post(
SLACK_WEBHOOK,
json={"text": f"{emoji} {message}"}
)
alerts = AlertManager()
alerts.on_stale_data(lambda: send_slack("Data feed is stale!", "🚨"))
alerts.on_connection_lost(lambda: send_slack("Connection lost", "❌"))
alerts.on_reconnected(lambda n: send_slack(f"Reconnected after {n} attempts", "✅"))
Example 4: Prometheus Metrics¶
from qldata.monitoring import DataQualityMonitor
from prometheus_client import Gauge, start_http_server
# Prometheus metrics
latency_p95 = Gauge("qldata_latency_p95_ms", "P95 latency in milliseconds")
throughput = Gauge("qldata_throughput_tps", "Throughput in messages/second")
is_stale = Gauge("qldata_is_stale", "1 if data is stale, 0 otherwise")
monitor = DataQualityMonitor()
def update_prometheus():
"""Update Prometheus metrics."""
metrics = monitor.get_metrics()
latency_p95.set(metrics["latency_p95"] or 0)
throughput.set(metrics["throughput"])
is_stale.set(1 if metrics["is_stale"] else 0)
# Start Prometheus server
start_http_server(8000)
# Update in background
import threading
import time
def metrics_loop():
while True:
update_prometheus()
time.sleep(5)
threading.Thread(target=metrics_loop, daemon=True).start()
Example 5: Logging Integration¶
import logging
from qldata.monitoring import DataQualityMonitor, AlertManager
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("qldata.monitor")
alerts = AlertManager()
alerts.on_high_latency(lambda ms: logger.warning(f"High latency: {ms:.1f}ms"))
alerts.on_stale_data(lambda: logger.error("Data is stale!"))
alerts.on_connection_lost(lambda: logger.error("Connection lost"))
alerts.on_reconnected(lambda n: logger.info(f"Reconnected after {n} attempts"))
# Periodic health logging
monitor = DataQualityMonitor()
def log_health():
metrics = monitor.get_metrics()
logger.info(
f"Health: {monitor.get_health_status()} | "
f"TPS: {metrics['throughput']:.1f} | "
f"P95: {metrics['latency_p95']:.1f}ms"
)
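log_health is not scheduled above; a sketch that runs it on a fixed interval, mirroring the Prometheus example:
import threading
import time
def health_loop():
    while True:
        log_health()
        time.sleep(30)
threading.Thread(target=health_loop, daemon=True).start()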
Best Practices¶
1. Set Appropriate Thresholds¶
# Crypto markets: tighter thresholds
monitor = DataQualityMonitor(stale_threshold_seconds=5)
alerts.check_latency(ms, warning_threshold=100, error_threshold=500)
# Lower-frequency data: relaxed thresholds
monitor = DataQualityMonitor(stale_threshold_seconds=60)
alerts.check_latency(ms, warning_threshold=1000, error_threshold=5000)
2. Avoid Alert Fatigue¶
# Debounce alerts
from datetime import datetime, timedelta
last_alert = {}
def debounced_alert(alert_type: str, callback):
now = datetime.now()
if alert_type not in last_alert or now - last_alert[alert_type] > timedelta(minutes=5):
last_alert[alert_type] = now
callback()
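Wiring the wrapper into AlertManager might look like this (a sketch assuming an AlertManager named alerts and the send_slack helper from Example 3):
alerts.on_stale_data(lambda: debounced_alert("stale", lambda: send_slack("Data feed is stale!")))
alerts.on_high_latency(lambda ms: debounced_alert("latency", lambda: send_slack(f"High latency: {ms:.0f}ms")))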
3. Include Context in Alerts¶
def on_stale(symbol: str, last_time: datetime):
send_alert(f"Symbol {symbol} stale since {last_time}")
4. Monitor Multiple Data Feeds¶
monitors = {
"BTCUSDT": DataQualityMonitor(),
"ETHUSDT": DataQualityMonitor(),
}
def handle_data(df):
symbol = df["symbol"].iloc[0]
monitors[symbol].record_message(df.index[-1])
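To roll the per-symbol monitors up into a single status, a sketch that reports the worst health seen (status strings as documented above):
# Report the worst status across all feeds
order = {"healthy": 0, "degraded": 1, "unhealthy": 2}
worst = max((m.get_health_status() for m in monitors.values()), key=order.get)
print(f"Overall feed health: {worst}")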
See Also¶
- Streaming API - Integrate with streams
- Resilience - Connection management
- Cookbook - Production examples