Monitoring & Alerts API

Production-grade monitoring for data quality, latency, and connection health.

Overview

qldata provides two main monitoring components:

  • DataQualityMonitor - Track latency, throughput, and staleness
  • AlertManager - Callback-based alerting system

from qldata.monitoring import DataQualityMonitor, AlertManager

# Setup monitoring
monitor = DataQualityMonitor(stale_threshold_seconds=10)
alerts = AlertManager()

# Configure alerts
alerts.on_stale_data(lambda: print("Data is stale!"))
alerts.on_high_latency(lambda ms: print(f"High latency: {ms}ms"))

DataQualityMonitor

Real-time monitoring of data quality metrics.

Basic Usage

from qldata.monitoring import DataQualityMonitor
from datetime import datetime, timezone

# Create monitor
monitor = DataQualityMonitor(stale_threshold_seconds=10)

# Record incoming messages
def on_data(df):
    if not df.empty:
        monitor.record_message(df.index[-1])
        # Process data...

# Get current metrics
metrics = monitor.get_metrics()
print(f"Throughput: {metrics['throughput']:.1f}/s")
print(f"P95 Latency: {metrics['latency_p95']:.1f}ms")
print(f"Is Stale: {metrics['is_stale']}")

DataQualityMonitor

DataQualityMonitor(stale_threshold_seconds: int = 30)

Monitor streaming data quality in real-time.

Tracks latency, throughput, and staleness of streaming data.

Parameters:

Name Type Description Default
stale_threshold_seconds int Seconds without data before considered stale 30

record_message

record_message(
    message_timestamp: datetime | None = None,
) -> None

Record receipt of a message.

Parameters:

Name Type Description Default
message_timestamp datetime | None Timestamp from the message (for latency calc) None

get_metrics

get_metrics() -> dict

Get current quality metrics.

Returns:

Type Description
dict Dictionary with quality metrics

is_stale

is_stale() -> bool

Check if data is stale (no recent updates).

Returns:

Type Description
bool True if no messages received within threshold

get_health_status

get_health_status() -> str

Get overall health status.

Returns:

Type Description
str "healthy", "degraded", or "unhealthy"

reset

reset() -> None

Reset all metrics.
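
reset() can be used to start a fresh measurement window, for example at the start of a trading session. A minimal sketch, assuming the same monitor instance as above:

monitor.reset()
print(monitor.get_metrics()["total_messages"])  # counters start again from zero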


Metrics Reference

The get_metrics() method returns:

Metric Type Description
total_messages int Total messages recorded
throughput float Messages per second
latency_p50 float 50th percentile latency (ms)
latency_p95 float 95th percentile latency (ms)
latency_p99 float 99th percentile latency (ms)
is_stale bool True if no recent messages
seconds_since_last_message float Time since last message
window_start datetime Start of measurement window
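
To inspect every metric at once (handy when debugging a feed), you can iterate the returned dictionary directly; a small sketch:

metrics = monitor.get_metrics()
for name, value in metrics.items():
    print(f"{name}: {value}")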

Health Status

status = monitor.get_health_status()
# Returns: "healthy", "degraded", or "unhealthy"
Status Condition
"healthy" Recent data, low latency
"degraded" High latency or low throughput
"unhealthy" Stale data or no connection

AlertManager

Callback-based alert system for production monitoring.

Basic Usage

from qldata.monitoring import AlertManager

alerts = AlertManager()

# Register callbacks
alerts.on_high_latency(lambda ms: print(f"High latency: {ms}ms"))
alerts.on_stale_data(lambda: print("Data is stale!"))
alerts.on_low_throughput(lambda tps: print(f"Low throughput: {tps}/s"))
alerts.on_connection_lost(lambda: print("Connection lost"))
alerts.on_reconnected(lambda n: print(f"Reconnected after {n} attempts"))

AlertManager

AlertManager()

Manage alerts for data quality issues.

Provides callbacks for various alert conditions.

on_high_latency

on_high_latency(callback: Callable[[float], None]) -> None

Register callback for high latency alerts.

Parameters:

Name Type Description Default
callback Callable[[float], None] Function(latency_ms) to call when latency is high required

on_stale_data

on_stale_data(callback: Callable[[], None]) -> None

Register callback for stale data alerts.

Parameters:

Name Type Description Default
callback Callable[[], None] Function to call when data becomes stale required

on_low_throughput

on_low_throughput(callback: Callable[[float], None]) -> None

Register callback for low throughput alerts.

Parameters:

Name Type Description Default
callback Callable[[float], None] Function(throughput) to call when throughput is low required

on_connection_lost

on_connection_lost(callback: Callable[[], None]) -> None

Register callback for connection loss.

Parameters:

Name Type Description Default
callback Callable[[], None] Function to call when connection is lost required

on_reconnected

on_reconnected(callback: Callable[[int], None]) -> None

Register callback for successful reconnection.

Parameters:

Name Type Description Default
callback Callable[[int], None] Function(attempts) to call when reconnected required

check_latency

check_latency(
    latency_ms: float,
    warning_threshold: float = 500,
    error_threshold: float = 1000,
) -> Alert | None

Check latency and trigger alerts if needed.

Parameters:

Name Type Description Default
latency_ms float Current latency in milliseconds required
warning_threshold float Warning threshold in ms 500
error_threshold float Error threshold in ms 1000

Returns:

Type Description
Alert | None Alert if threshold exceeded, None otherwise

check_stale_data

check_stale_data(is_stale: bool) -> Alert | None

Check if data is stale and trigger alerts.

Parameters:

Name Type Description Default
is_stale bool Whether data is currently stale required

Returns:

Type Description
Alert | None Alert if stale, None otherwise

notify_connection_lost

notify_connection_lost() -> None

Notify that connection was lost.

notify_reconnected

notify_reconnected(attempts: int) -> None

Notify successful reconnection.

Parameters:

Name Type Description Default
attempts int Number of attempts it took to reconnect required
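
The notify_* methods are meant to be called from your own connection-handling code so that the registered callbacks fire. A minimal sketch of wiring them into a reconnect loop (connect() and the backoff policy are placeholders, not part of qldata):

import time

def reconnect_loop():
    alerts.notify_connection_lost()
    attempts = 0
    while True:
        attempts += 1
        try:
            connect()  # placeholder: your own connection routine
            alerts.notify_reconnected(attempts)
            return
        except ConnectionError:
            time.sleep(min(2 ** attempts, 30))  # simple exponential backoff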

get_recent_alerts

get_recent_alerts(limit: int = 10) -> list[Alert]

Get recent alerts.

Parameters:

Name Type Description Default
limit int Maximum number of alerts to return 10

Returns:

Type Description
list[Alert] List of recent alerts


Alert Callbacks

Method Callback Signature Description
on_high_latency (latency_ms: float) -> None Latency exceeds threshold
on_stale_data () -> None No data received recently
on_low_throughput (tps: float) -> None Throughput below threshold
on_connection_lost () -> None Connection dropped
on_reconnected (attempts: int) -> None Successfully reconnected

Checking Thresholds

from qldata.monitoring import AlertManager

alerts = AlertManager()

# Check latency with thresholds
alert = alerts.check_latency(
    latency_ms=1200,
    warning_threshold=500,   # Warning above 500ms
    error_threshold=1000     # Error above 1000ms
)

if alert:
    print(f"Alert: {alert.severity} - {alert.message}")

Alert History

# Get recent alerts
recent = alerts.get_recent_alerts(limit=10)

for alert in recent:
    print(f"[{alert.severity}] {alert.timestamp}: {alert.message}")

Integration Examples

Example 1: Monitor with Streaming

import qldata as qd
from qldata.monitoring import DataQualityMonitor, AlertManager
import time

# Setup
monitor = DataQualityMonitor(stale_threshold_seconds=5)
alerts = AlertManager()

def on_high_latency(ms):
    print(f"⚠️ High latency: {ms:.1f}ms")

def on_stale():
    print("🚨 Data is STALE!")

alerts.on_high_latency(on_high_latency)
alerts.on_stale_data(on_stale)

def handle_data(df):
    if df.empty:
        return

    # Record message
    monitor.record_message(df.index[-1])

    # Check thresholds
    metrics = monitor.get_metrics()
    if metrics["latency_p95"]:
        alerts.check_latency(metrics["latency_p95"], warning_threshold=200)
    alerts.check_stale_data(monitor.is_stale())

# Start stream
stream = qd.stream(["BTCUSDT"], source="binance") \
    .resolution("tick") \
    .on_data(handle_data) \
    .get(start=True)

# Monitor health
while True:
    metrics = monitor.get_metrics()
    print(f"Health: {monitor.get_health_status()} | TPS: {metrics['throughput']:.1f}")
    time.sleep(10)

Example 2: PagerDuty Integration

from qldata.monitoring import AlertManager
import requests

PAGERDUTY_KEY = "your-integration-key"

def send_pagerduty(severity: str, message: str):
    """Send alert to PagerDuty."""
    requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json={
            "routing_key": PAGERDUTY_KEY,
            "event_action": "trigger",
            "payload": {
                "summary": message,
                "severity": severity,
                "source": "qldata-monitor"
            }
        }
    )

alerts = AlertManager()
alerts.on_stale_data(lambda: send_pagerduty("critical", "Market data is stale"))
alerts.on_high_latency(lambda ms: send_pagerduty("warning", f"Latency: {ms}ms"))

Example 3: Slack Integration

from qldata.monitoring import AlertManager
import requests

SLACK_WEBHOOK = "https://hooks.slack.com/services/..."

def send_slack(message: str, emoji: str = "⚠️"):
    """Send message to Slack."""
    requests.post(
        SLACK_WEBHOOK,
        json={"text": f"{emoji} {message}"}
    )

alerts = AlertManager()
alerts.on_stale_data(lambda: send_slack("Data feed is stale!", "🚨"))
alerts.on_connection_lost(lambda: send_slack("Connection lost", "❌"))
alerts.on_reconnected(lambda n: send_slack(f"Reconnected after {n} attempts", "✅"))

Example 4: Prometheus Metrics

from qldata.monitoring import DataQualityMonitor
from prometheus_client import Gauge, start_http_server

# Prometheus metrics
latency_p95 = Gauge("qldata_latency_p95_ms", "P95 latency in milliseconds")
throughput = Gauge("qldata_throughput_tps", "Throughput in messages/second")
is_stale = Gauge("qldata_is_stale", "1 if data is stale, 0 otherwise")

monitor = DataQualityMonitor()

def update_prometheus():
    """Update Prometheus metrics."""
    metrics = monitor.get_metrics()
    latency_p95.set(metrics["latency_p95"] or 0)
    throughput.set(metrics["throughput"])
    is_stale.set(1 if metrics["is_stale"] else 0)

# Start Prometheus server
start_http_server(8000)

# Update in background
import threading
import time

def metrics_loop():
    while True:
        update_prometheus()
        time.sleep(5)

threading.Thread(target=metrics_loop, daemon=True).start()

Example 5: Logging Integration

import logging
from qldata.monitoring import DataQualityMonitor, AlertManager

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("qldata.monitor")

alerts = AlertManager()

alerts.on_high_latency(lambda ms: logger.warning(f"High latency: {ms:.1f}ms"))
alerts.on_stale_data(lambda: logger.error("Data is stale!"))
alerts.on_connection_lost(lambda: logger.error("Connection lost"))
alerts.on_reconnected(lambda n: logger.info(f"Reconnected after {n} attempts"))

# Periodic health logging
monitor = DataQualityMonitor()

def log_health():
    metrics = monitor.get_metrics()
    logger.info(
        f"Health: {monitor.get_health_status()} | "
        f"TPS: {metrics['throughput']:.1f} | "
        f"P95: {metrics['latency_p95']:.1f}ms"
    )
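
log_health() above is only defined, not scheduled. To make the logging genuinely periodic, run it on a background thread; a sketch (the 30-second interval is arbitrary):

import threading
import time

def health_logging_loop(interval_seconds: float = 30.0):
    while True:
        log_health()
        time.sleep(interval_seconds)

threading.Thread(target=health_logging_loop, daemon=True).start()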

Best Practices

1. Set Appropriate Thresholds

# Crypto markets: tighter thresholds
# (ms stands for an observed latency value, e.g. metrics["latency_p95"])
monitor = DataQualityMonitor(stale_threshold_seconds=5)
alerts.check_latency(ms, warning_threshold=100, error_threshold=500)

# Lower-frequency data: relaxed thresholds
monitor = DataQualityMonitor(stale_threshold_seconds=60)
alerts.check_latency(ms, warning_threshold=1000, error_threshold=5000)

2. Avoid Alert Fatigue

# Debounce alerts
from datetime import datetime, timedelta

last_alert = {}

def debounced_alert(alert_type: str, callback):
    now = datetime.now()
    if alert_type not in last_alert or now - last_alert[alert_type] > timedelta(minutes=5):
        last_alert[alert_type] = now
        callback()
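
A wrapped callback can then be registered like any other; for example, reusing the send_slack helper from Example 3:

alerts.on_stale_data(
    lambda: debounced_alert("stale", lambda: send_slack("Data feed is stale!", "🚨"))
)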

3. Include Context in Alerts

# on_stale_data callbacks take no arguments, so bind context with a closure
def on_stale(symbol: str):
    send_alert(f"Symbol {symbol} data is stale")  # send_alert: your notification helper
alerts.on_stale_data(lambda: on_stale("BTCUSDT"))

4. Monitor Multiple Data Feeds

monitors = {
    "BTCUSDT": DataQualityMonitor(),
    "ETHUSDT": DataQualityMonitor(),
}

def handle_data(df):
    symbol = df["symbol"].iloc[0]  # assumes each batch carries a "symbol" column
    monitors[symbol].record_message(df.index[-1])
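
To act on per-feed staleness, poll each monitor periodically; a minimal sketch (the 5-second interval and print call are illustrative):

import time

while True:
    for symbol, m in monitors.items():
        if m.is_stale():
            print(f"🚨 {symbol} feed is stale ({m.get_health_status()})")
    time.sleep(5)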

See Also