Building High-Frequency Algorithmic Trading Data Pipelines in Python

Four-Layer Pipeline Architecture

A production trading data pipeline is not a single process — it is a layered system where each layer has a defined responsibility and a clean handoff to the next. Collapsing these layers into a monolith makes the system brittle: a normalization bug corrupts the signal processor, a persistence failure blocks ingestion, and debugging requires understanding the entire system simultaneously.

The four-layer model separates concerns cleanly: ingestion handles WebSocket connection management and raw tick buffering; normalization cleans and standardizes the raw data; signal processing applies the trading logic; persistence writes the output. Each layer can be tested, monitored, and replaced independently.

Layer             | Responsibility                                    | Key Design Concern
Ingestion         | WebSocket connection, raw tick buffering          | Reconnection resilience, backpressure management
Normalization     | Schema validation, unit conversion, deduplication | Malformed tick handling without pipeline stall
Signal Processing | Anomaly detection, feature computation, alerts    | Rolling state, low-latency execution
Persistence       | Write to time-series DB, async batching           | Write durability, batch sizing, backpressure
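
The handoff between layers can be sketched with one bounded asyncio.Queue per boundary. This is a minimal illustration, not the article's implementation: the function names, the list standing in for the WebSocket feed, the list standing in for the database, and the placeholder "delta" feature are all assumptions made for the example.

```python
import asyncio

async def ingest(raw_ticks, out_q: asyncio.Queue) -> None:
    """Ingestion: push raw ticks downstream (a list stands in for the feed)."""
    for tick in raw_ticks:
        await out_q.put(tick)
    await out_q.put(None)  # sentinel: end of stream

async def normalize(in_q: asyncio.Queue, out_q: asyncio.Queue) -> None:
    """Normalization: skip malformed ticks instead of stalling the pipeline."""
    while (tick := await in_q.get()) is not None:
        try:
            clean = {"symbol": str(tick["symbol"]), "price": float(tick["price"])}
        except (KeyError, TypeError, ValueError):
            continue  # malformed tick: drop it and keep going
        await out_q.put(clean)
    await out_q.put(None)

async def process(in_q: asyncio.Queue, out_q: asyncio.Queue) -> None:
    """Signal processing: placeholder feature (change versus the previous price)."""
    last = {}
    while (tick := await in_q.get()) is not None:
        prev = last.get(tick["symbol"])
        last[tick["symbol"]] = tick["price"]
        if prev is not None:
            await out_q.put({**tick, "delta": tick["price"] - prev})
    await out_q.put(None)

async def persist(in_q: asyncio.Queue, sink: list) -> None:
    """Persistence: a list stands in for the time-series database."""
    while (row := await in_q.get()) is not None:
        sink.append(row)

async def run_pipeline(raw_ticks) -> list:
    # Bounded queues: a full queue makes the upstream put() wait.
    q1, q2, q3 = (asyncio.Queue(maxsize=1000) for _ in range(3))
    sink = []
    await asyncio.gather(
        ingest(raw_ticks, q1), normalize(q1, q2), process(q2, q3), persist(q3, sink)
    )
    return sink
```

Because each stage only sees its input and output queue, any one of them can be swapped out or tested with a hand-built list of ticks. The maxsize argument is what provides backpressure here: a slow stage causes its upstream neighbor to wait rather than letting memory grow.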

WebSocket Ingestion with asyncio

The ingestion layer maintains a persistent WebSocket connection to the data feed, buffers incoming ticks in fixed-size ring buffers (one per symbol), and reconnects automatically on connection failure using exponential backoff. Each ring buffer is a collections.deque with maxlen=10_000, which bounds memory use: when the downstream processing layers fall behind, the oldest ticks are silently dropped rather than allowing the buffer to grow unboundedly and exhaust memory. Strictly speaking this is load shedding rather than backpressure, because the feed is never slowed down, so the number of dropped ticks is worth monitoring.
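
The eviction behavior of a bounded deque, and one way to make the drops visible, can be sketched as follows. CountingBuffer is a hypothetical helper introduced for illustration; it is not part of the ingestion listing.

```python
from collections import deque

class CountingBuffer:
    """Fixed-size tick buffer that counts how many ticks were evicted."""

    def __init__(self, maxlen: int):
        self._buf = deque(maxlen=maxlen)
        self.dropped = 0

    def append(self, tick) -> None:
        if len(self._buf) == self._buf.maxlen:
            self.dropped += 1  # buffer is full: this append evicts the oldest tick
        self._buf.append(tick)

    def recent(self, n: int) -> list:
        # Guard n <= 0 explicitly: a [-0:] slice would return everything.
        return list(self._buf)[-n:] if n > 0 else []
```

A steadily rising dropped counter is a signal that downstream layers cannot keep up and that the buffer size or the processing rate needs attention.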

Python ingestion.py
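import asyncio
import json
import logging
from collections import deque
from typing import Dict, List

import websockets

logger = logging.getLogger(__name__)

class TradingDataIngestion:
    def __init__(self, feed_url: str, symbols: List[str]):
        self.feed_url = feed_url
        self.symbols = symbols
        # One bounded ring buffer per symbol; the oldest ticks are evicted when full.
        self.tick_buffer: Dict[str, deque] = {
            sym: deque(maxlen=10_000) for sym in symbols
        }
        self._backoff = 1        # seconds to wait before the next reconnect attempt
        self._max_backoff = 60   # upper bound on the reconnect delay

    async def connect(self):
        while True:
            try:
                async with websockets.connect(self.feed_url) as ws:
                    await ws.send(json.dumps({
                        'action': 'subscribe', 'symbols': self.symbols
                    }))
                    self._backoff = 1  # reset on successful connection
                    async for message in ws:
                        try:
                            tick = json.loads(message)
                        except ValueError:  # includes json.JSONDecodeError
                            logger.warning("Dropping malformed message")
                            continue
                        # Ignore non-object payloads and unsubscribed symbols.
                        sym = tick.get('symbol') if isinstance(tick, dict) else None
                        if isinstance(sym, str) and sym in self.tick_buffer:
                            self.tick_buffer[sym].append(tick)
            except (websockets.ConnectionClosed, OSError) as e:
                logger.warning("Feed disconnected (%s); retrying in %ss", e, self._backoff)
                await asyncio.sleep(self._backoff)
                self._backoff = min(self._backoff * 2, self._max_backoff)

    def get_recent_ticks(self, symbol: str, n: int = 100) -> list:
        if n <= 0:
            return []  # list(buf)[-0:] would return the whole buffer
        buf = self.tick_buffer.get(symbol, deque())
        return list(buf)[-n:]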
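
The reconnect loop doubles its delay up to a 60-second cap. A common refinement, which is an addition here and not something the listing does, is to randomize each delay so that many clients disconnected at the same moment do not all reconnect together. A minimal sketch, with backoff_delay as a hypothetical helper name:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay in seconds before reconnect attempt `attempt` (0-based), with full jitter.

    The ceiling follows the same doubling schedule as the ingestion listing
    (1, 2, 4, ... capped at 60); the actual delay is drawn uniformly below it.
    """
    exponent = min(attempt, 32)  # keep 2 ** attempt from growing without bound
    ceiling = min(cap, base * (2 ** exponent))
    return random.uniform(0, ceiling)
```

Calling backoff_delay(attempt) in place of the fixed self._backoff value would require tracking an attempt counter that resets on a successful connection, in the same place the listing resets self._backoff.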

Anomaly Detection and Tick Validation

Raw market data contains bad ticks: erroneous prints from exchange matching engine glitches, crossed markets where the bid exceeds the ask, and price spikes that fall several standard deviations outside the recent distribution. Passing these ticks downstream corrupts signal calculations and can trigger false trading signals. The anomaly detector catches them before they reach the signal processor.

Rolling Z-score anomaly detection compares each incoming price to the mean and standard deviation of the recent window. A Z-score at or above WARN_THRESHOLD (2.5) flags the tick for logging; at or above CRITICAL_THRESHOLD (4.0), the tick is rejected entirely. Separately, validate_tick() catches crossed and locked markets (bid greater than or equal to ask), which the pipeline treats as erroneous regardless of price level.

Python anomaly_detector.py
import numpy as np
from collections import deque
from typing import Dict, Optional

class AnomalyDetector:
    WARN_THRESHOLD     = 2.5   # flag the tick for logging
    CRITICAL_THRESHOLD = 4.0   # reject the tick
    MIN_HISTORY        = 30    # prices required before Z-scores are computed

    def __init__(self, window: int = 200):
        self.window  = window
        self.history: Dict[str, deque] = {}

    def validate_tick(self, tick: Dict) -> Optional[str]:
        """Return an error string for a crossed or locked quote, else None."""
        bid, ask = tick.get('bid'), tick.get('ask')
        if bid is not None and ask is not None and bid >= ask:
            return f"CROSSED_MARKET: bid={bid} >= ask={ask}"
        return None

    def check_price(self, symbol: str, price: float) -> str:
        hist = self.history.setdefault(symbol, deque(maxlen=self.window))

        if len(hist) < self.MIN_HISTORY:
            hist.append(price)
            return "OK"  # insufficient history for Z-score

        # Score against the prior window only, so the incoming price does not
        # dilute its own Z-score. The epsilon avoids division by zero on a
        # perfectly flat window.
        arr     = np.array(hist)
        z_score = abs(price - arr.mean()) / (arr.std() + 1e-8)

        # Record the price after scoring so the window keeps tracking the
        # market even through a genuine level shift.
        hist.append(price)

        if z_score >= self.CRITICAL_THRESHOLD:
            return f"CRITICAL: z={z_score:.2f}, tick rejected"
        if z_score >= self.WARN_THRESHOLD:
            return f"WARN: z={z_score:.2f}, review recommended"
        return "OK"
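
One design choice in a rolling Z-score is whether the incoming price is part of the window it is scored against. Including it pulls the mean toward the outlier and inflates the standard deviation, which lowers the score. The sketch below compares the two options on made-up prices; z_score is a hypothetical standalone helper, not a method of the detector.

```python
import numpy as np

def z_score(price: float, window: list, include_current: bool) -> float:
    """Absolute Z-score of `price` against `window`.

    With include_current=True the price is added to the sample before the
    mean and standard deviation are computed.
    """
    sample = np.array(window + [price]) if include_current else np.array(window)
    return float(abs(price - sample.mean()) / (sample.std() + 1e-8))

# Illustrative data: 30 prices alternating between 100.0 and 100.2
# (mean about 100.1, standard deviation about 0.1), then a print at 100.6.
window = [100.0 if i % 2 == 0 else 100.2 for i in range(30)]
z_prior = z_score(100.6, window, include_current=False)  # about 5.0
z_incl  = z_score(100.6, window, include_current=True)   # about 3.7
```

With thresholds of 2.5 and 4.0, the same print would be rejected under the first convention and only flagged under the second, so the choice should be made deliberately and the thresholds tuned to match.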

The Circuit Breaker Pattern

A circuit breaker prevents a failing downstream system from dragging down the entire pipeline. When the persistence layer keeps returning errors (a database running out of connections, a network partition, a write timeout) and the number of consecutive failures reaches a configured threshold, the circuit breaker trips to the Open state and short-circuits all downstream calls for a cooldown period. After the cooldown, it enters Half-Open, allowing one test call through. A successful test call resets the breaker to Closed; another failure reopens it and restarts the cooldown.

Python circuit_breaker.py
import time
from enum import Enum
from typing import Optional

class State(Enum):
    CLOSED    = "closed"     # normal operation
    OPEN      = "open"       # failing; short-circuit all calls
    HALF_OPEN = "half_open"  # testing recovery

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, cooldown: float = 30):
        self.failure_threshold = failure_threshold
        self.cooldown          = cooldown   # seconds to stay open before probing
        self.state             = State.CLOSED
        self.failure_count     = 0          # consecutive failures since the last success
        self.opened_at: Optional[float] = None

    def call(self, fn, *args, **kwargs):
        if self.state == State.OPEN:
            # time.monotonic() is unaffected by wall-clock adjustments.
            if time.monotonic() - self.opened_at >= self.cooldown:
                self.state = State.HALF_OPEN
            else:
                raise RuntimeError("Circuit open: downstream unavailable")

        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        self.failure_count = 0
        self.state         = State.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        # A failed probe in HALF_OPEN reopens immediately and restarts the cooldown.
        if self.state == State.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state     = State.OPEN
            self.opened_at = time.monotonic()
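
The listing above wraps synchronous callables. If the persistence call is a coroutine function, calling it without await returns a coroutine object immediately and never raises, so every call would be counted as a success. A minimal async variant might look like the sketch below; AsyncCircuitBreaker is a hypothetical name, and the sketch does not limit Half-Open to a single concurrent probe, which a production version would likely need.

```python
import asyncio
import time

class AsyncCircuitBreaker:
    """Circuit breaker for coroutine functions (illustrative sketch)."""

    def __init__(self, failure_threshold: int = 5, cooldown: float = 30.0):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    async def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown:
                raise RuntimeError("Circuit open: downstream unavailable")
            # Cooldown elapsed: fall through and let this call act as the probe.
        try:
            result = await fn(*args, **kwargs)  # await so failures surface here
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.opened_at = time.monotonic()  # trip, or restart the cooldown
            raise
        self.failure_count = 0
        self.opened_at = None
        return result
```

A caller would use it as `await breaker.call(write_rows, batch)`, where write_rows stands for whatever coroutine performs the database write.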

Signal Processing and Persistence

The signal processing layer consumes validated, normalized ticks from the normalization layer, applies feature computation (rolling VWAP, bid-ask spread tracking, volume-weighted momentum), and emits signals to the persistence layer. The circuit breaker wraps every persistence call so that a write failure does not stall the signal processor or back up the ingestion buffer.
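
As an example of the rolling state this layer keeps, a windowed VWAP can be maintained incrementally with two running sums. This is a sketch under stated assumptions: RollingVWAP is a hypothetical class, the window is counted in trades rather than time, and each trade is assumed to carry a price and a size.

```python
from collections import deque
from typing import Optional

class RollingVWAP:
    """VWAP over the last `window` trades: sum(price * size) / sum(size)."""

    def __init__(self, window: int = 100):
        self.trades = deque(maxlen=window)
        self.pv_sum = 0.0   # running sum of price * size
        self.vol_sum = 0.0  # running sum of size

    def update(self, price: float, size: float) -> Optional[float]:
        if len(self.trades) == self.trades.maxlen:
            # Subtract the trade that the append below is about to evict.
            old_price, old_size = self.trades[0]
            self.pv_sum -= old_price * old_size
            self.vol_sum -= old_size
        self.trades.append((price, size))
        self.pv_sum += price * size
        self.vol_sum += size
        return self.pv_sum / self.vol_sum if self.vol_sum > 0 else None
```

Each update is constant time, which suits a low-latency path. Running sums can accumulate floating-point drift over very long sessions, so recomputing them from the deque periodically is a reasonable safeguard.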

For persistence, a time-series database (TimescaleDB on PostgreSQL, or InfluxDB for simpler deployments) handles the write volume and enables efficient range queries for backtesting. Writes should be batched, assembling 50–200 rows before committing, to reduce transaction overhead without introducing meaningful latency. At typical intraday tick volumes (the figures here correspond to roughly 200 to 500 rows per second), a 100-row batch commits every 200–500 milliseconds, which is more than adequate for any non-HFT trading strategy.
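
Batching by row count alone can hold rows indefinitely when the market goes quiet, so a batcher usually flushes on whichever comes first: a full batch or a maximum age. The sketch below assumes a hypothetical write_batch callable that commits a list of rows in one transaction; it is not tied to any particular database driver.

```python
import time
from typing import Callable, List

class BatchWriter:
    """Buffer rows and flush when the batch is full or too old."""

    def __init__(self, write_batch: Callable[[List[dict]], None],
                 max_rows: int = 100, max_wait: float = 0.5):
        self.write_batch = write_batch  # hypothetical: commits one batch
        self.max_rows = max_rows
        self.max_wait = max_wait        # seconds a row may wait before a flush
        self.rows: List[dict] = []
        self.first_row_at = 0.0

    def add(self, row: dict) -> None:
        if not self.rows:
            self.first_row_at = time.monotonic()
        self.rows.append(row)
        if self.should_flush():
            self.flush()

    def should_flush(self) -> bool:
        if not self.rows:
            return False
        age = time.monotonic() - self.first_row_at
        return len(self.rows) >= self.max_rows or age >= self.max_wait

    def flush(self) -> None:
        if not self.rows:
            return
        self.write_batch(self.rows)  # if this raises, rows stay buffered for a retry
        self.rows = []
```

The age check only runs when add() is called, so a periodic task should also call flush() during quiet periods. Because failed batches stay in memory, pairing this with the circuit breaker and a cap on buffered rows keeps a long outage from exhausting memory.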
