Most operational dashboards in middle-market companies are not real-time. They are scheduled exports — nightly SQL queries, morning email reports, or weekly spreadsheet refreshes — dressed up with a modern UI. The data on screen is hours or days old before anyone reads it. For KPIs that drive same-day operational decisions, that lag is consequential.
The standard solution — Kafka plus Spark Streaming plus a time-series database — is powerful but carries significant operational overhead. For companies that do not need sub-second latency or multi-terabyte event volumes, there is a simpler path: watermark-based incremental queries against an existing transactional database, paired with a stateful in-process compute layer that maintains running KPI values between polling cycles.
| Approach | Latency | Infrastructure | Best For |
|---|---|---|---|
| Scheduled export | Hours–days | Cron + SQL | Weekly reporting, board summaries |
| Watermark polling | 30 sec – 5 min | Existing DB + Python | Operational dashboards, same-day alerts |
| Streaming (Kafka/Spark) | Milliseconds | Kafka + Spark + TSDB | Financial trading, fraud detection, IoT |
A watermark is a timestamp that marks the last successfully processed record. On each polling cycle, the data layer queries only records created after the watermark, processes them, and advances the watermark to the end of the batch. This pattern is incremental, idempotent-friendly, and imposes minimal load on the source database: the initial backfill reads the table once, in batches, and every subsequent query touches only new data, provided the created_at column is indexed.
```python
import psycopg2
from datetime import datetime
from typing import Dict, List


class WatermarkDataLayer:
    def __init__(self, conn_string: str, batch_limit: int = 500):
        self.db = psycopg2.connect(conn_string)
        # Autocommit so read-only polling does not hold a transaction open
        self.db.autocommit = True
        self.watermark = datetime(2000, 1, 1)  # initial watermark
        # Tie-breaker for rows that share a timestamp at a batch boundary
        # (assumes transaction_id is unique and orderable)
        self.last_id = None
        self.batch_limit = batch_limit

    def fetch_batch(self) -> List[Dict]:
        with self.db.cursor() as cur:
            cur.execute(
                """SELECT transaction_id, created_at, amount,
                          transaction_type, user_id
                   FROM transactions
                   WHERE created_at > %(watermark)s
                      OR (created_at = %(watermark)s
                          AND transaction_id > %(last_id)s)
                   ORDER BY created_at, transaction_id
                   LIMIT %(batch_limit)s""",
                {'watermark': self.watermark, 'last_id': self.last_id,
                 'batch_limit': self.batch_limit}
            )
            rows = cur.fetchall()
        if rows:
            # Advance watermark to the latest record in this batch
            self.last_id, self.watermark = rows[-1][0], rows[-1][1]
        return [
            {'transaction_id': r[0], 'created_at': r[1],
             'amount': r[2], 'type': r[3], 'user_id': r[4]}
            for r in rows
        ]
```
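To see the batching behaviour without a PostgreSQL instance, the following is a minimal sketch of the same idea against an in-memory SQLite table. The class name `SqliteWatermarkReader`, the three-column schema, and the sample rows are all assumptions made for illustration. It pairs the timestamp watermark with the last seen `transaction_id`, so that two rows sharing a timestamp are not split incorrectly when `LIMIT` cuts a batch between them.

```python
import sqlite3
from typing import List, Tuple

Row = Tuple[int, str, float]  # (transaction_id, created_at, amount)


class SqliteWatermarkReader:
    """Illustrative watermark reader keyed on (created_at, transaction_id)."""

    def __init__(self, conn: sqlite3.Connection, batch_limit: int = 2):
        self.conn = conn
        self.batch_limit = batch_limit
        # ISO-8601 strings sort chronologically, so plain text comparison works
        self.watermark = "2000-01-01T00:00:00"
        self.last_id = -1  # assumption: ids are positive integers

    def fetch_batch(self) -> List[Row]:
        rows = self.conn.execute(
            """SELECT transaction_id, created_at, amount
               FROM transactions
               WHERE created_at > ?
                  OR (created_at = ? AND transaction_id > ?)
               ORDER BY created_at, transaction_id
               LIMIT ?""",
            (self.watermark, self.watermark, self.last_id, self.batch_limit),
        ).fetchall()
        if rows:
            # Advance both halves of the watermark to the last row returned
            self.last_id, self.watermark = rows[-1][0], rows[-1][1]
        return rows


def demo() -> List[List[int]]:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE transactions ("
        "transaction_id INTEGER PRIMARY KEY, created_at TEXT, amount REAL)"
    )
    conn.executemany(
        "INSERT INTO transactions VALUES (?, ?, ?)",
        [
            (1, "2024-01-01T09:00:00", 10.0),
            (2, "2024-01-01T09:00:05", 20.0),
            (3, "2024-01-01T09:00:05", 30.0),  # same timestamp as id 2
            (4, "2024-01-01T09:00:09", 40.0),
        ],
    )
    reader = SqliteWatermarkReader(conn, batch_limit=2)
    batches = []
    while True:
        batch = reader.fetch_batch()
        if not batch:
            break
        batches.append([row[0] for row in batch])
    return batches


print(demo())  # [[1, 2], [3, 4]]
```

With a timestamp-only filter (`created_at > ?`), the second batch in this data set would contain only id 4, and id 3 would never be processed.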
The compute layer maintains running KPI values in memory across polling cycles. Rather than recalculating metrics from scratch on every batch, it applies each new batch as a delta to the existing state. This keeps per-cycle work proportional to the number of new records rather than the size of the table: a business processing 10,000 transactions per day averages about seven new transactions per cycle at a 60-second polling interval (10,000 / 1,440 cycles).
```python
from collections import defaultdict
from typing import Dict, List


class KPIComputeLayer:
    def __init__(self):
        # Running state, updated incrementally by apply_batch()
        self.state = {
            'total_revenue': 0.0,
            'transaction_count': 0,
            'unique_users': set(),
            'revenue_by_type': defaultdict(float),
        }

    def apply_batch(self, records: List[Dict]):
        for rec in records:
            # Treat a missing or NULL amount as zero
            amount = float(rec.get('amount') or 0)
            self.state['total_revenue'] += amount
            self.state['transaction_count'] += 1
            self.state['unique_users'].add(rec['user_id'])
            self.state['revenue_by_type'][rec['type']] += amount

    def snapshot(self) -> Dict:
        # Return a plain, serializable view of the current state
        s = self.state
        return {
            'total_revenue': round(s['total_revenue'], 2),
            'transaction_count': s['transaction_count'],
            'unique_users': len(s['unique_users']),
            'revenue_by_type': dict(s['revenue_by_type']),
            'avg_order_value': round(
                s['total_revenue'] / s['transaction_count'], 2
            ) if s['transaction_count'] > 0 else 0.0,
        }

    def _check_thresholds(self, snapshot: Dict, thresholds: Dict) -> List[str]:
        # Compare the snapshot against configured thresholds
        alerts = []
        if snapshot['avg_order_value'] < thresholds.get('min_aov', 0):
            alerts.append(f"AOV below threshold: {snapshot['avg_order_value']}")
        if snapshot['total_revenue'] > thresholds.get('revenue_alert', float('inf')):
            alerts.append(f"Revenue milestone reached: {snapshot['total_revenue']}")
        return alerts
```
A KPI dashboard that requires a human to notice a problem has failed at its primary job. Threshold monitoring closes that loop: after each batch, the compute layer compares the current snapshot against defined thresholds and emits alerts when a KPI crosses a boundary. This can drive Slack notifications, PagerDuty pages, or email alerts to an operations manager without any additional infrastructure.
The alert logic belongs in the compute layer, not in the dashboard front end. A dashboard can be closed. A compute layer runs continuously and fires alerts regardless of who is watching the screen.
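One practical detail when alerting from a loop that runs every cycle: a check that fires whenever a KPI is below its floor will repeat the same alert on every poll until the value recovers. The sketch below shows one possible way to fire only when the value crosses the boundary. The `ThresholdMonitor` class, the `floors` mapping, and the `notify` callback (standing in for a Slack, PagerDuty, or email sender) are hypothetical names introduced for illustration, not part of the code above.

```python
from typing import Callable, Dict, List


class ThresholdMonitor:
    """Fires once when a KPI drops below its floor, and re-arms on recovery."""

    def __init__(self, floors: Dict[str, float], notify: Callable[[str], None]):
        self.floors = floors
        self.notify = notify  # hypothetical hook: Slack, PagerDuty, email, ...
        self.breached = {name: False for name in floors}

    def check(self, snapshot: Dict[str, float]) -> List[str]:
        fired = []
        for name, floor in self.floors.items():
            value = snapshot.get(name)
            if value is None:
                continue  # KPI not present in this snapshot
            below = value < floor
            # Alert only on the transition from healthy to breached
            if below and not self.breached[name]:
                message = f"{name} below threshold: {value} < {floor}"
                self.notify(message)
                fired.append(message)
            self.breached[name] = below
        return fired


sent: List[str] = []
monitor = ThresholdMonitor({'avg_order_value': 50.0}, notify=sent.append)
monitor.check({'avg_order_value': 62.0})  # healthy, no alert
monitor.check({'avg_order_value': 48.5})  # crosses below: one alert
monitor.check({'avg_order_value': 47.0})  # still below: no repeat
monitor.check({'avg_order_value': 55.0})  # recovers: re-armed
monitor.check({'avg_order_value': 49.0})  # crosses again: second alert
print(len(sent))  # 2
```

Whether to alert once per crossing or on every cycle is a judgment call; some teams prefer repeated alerts with a cooldown so that an unacknowledged problem stays visible.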
The polling loop ties the two layers together. Every 60 seconds (or whatever interval the use case demands), it fetches a new batch from the data layer, applies it to the compute layer, and publishes the snapshot to whatever surface the dashboard reads from — a Redis key, a WebSocket endpoint, or a simple REST API serving the last computed state.
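The loop described above can be sketched as follows. This is a minimal illustration rather than a production scheduler: `run_polling_loop`, its parameters, and the in-memory fakes in the usage section are assumptions, and `publish` stands in for whichever surface the dashboard reads from. The layers are passed in as plain callables so the loop does not depend on any particular database or dashboard library.

```python
import time
from typing import Callable, Dict, List


def run_polling_loop(
    fetch_batch: Callable[[], List[Dict]],
    apply_batch: Callable[[List[Dict]], None],
    snapshot: Callable[[], Dict],
    publish: Callable[[Dict], None],
    interval_seconds: float = 60.0,
    max_cycles: int = 0,  # 0 means run until interrupted
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Fetch, apply, publish, then wait out the rest of the interval."""
    cycles = 0
    while max_cycles == 0 or cycles < max_cycles:
        started = time.monotonic()
        batch = fetch_batch()
        if batch:
            apply_batch(batch)
        publish(snapshot())
        cycles += 1
        # Sleep only for the remainder of the interval so cycles do not drift
        elapsed = time.monotonic() - started
        sleep(max(0.0, interval_seconds - elapsed))
    return cycles


# Usage with in-memory fakes in place of the real data and compute layers
pending = [[{'amount': 10.0}], [], [{'amount': 5.0}, {'amount': 2.5}]]
state = {'total_revenue': 0.0}
published: List[Dict] = []


def fake_apply(records: List[Dict]) -> None:
    state['total_revenue'] += sum(r['amount'] for r in records)


cycles = run_polling_loop(
    fetch_batch=lambda: pending.pop(0) if pending else [],
    apply_batch=fake_apply,
    snapshot=lambda: dict(state),
    publish=published.append,
    interval_seconds=0.0,
    max_cycles=3,
    sleep=lambda seconds: None,  # skip real waiting in the demo
)
print(published[-1])  # {'total_revenue': 17.5}
```

In a real deployment the callables would be the bound methods of the data and compute layers, for example `data_layer.fetch_batch` and `compute_layer.apply_batch`, with `max_cycles` left at 0.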
The key design principle is separation of concerns. The data layer handles only extraction and watermark management. The compute layer handles only KPI math and alerting. The dashboard layer handles only rendering. This separation makes each component testable in isolation and replaceable without touching the others — which matters when the underlying database schema changes or the dashboard framework is swapped out.