Real-Time Streaming

The SDK provides two real-time WebSocket streaming protocols for different data types:

| Protocol | Data | Transport |
| --- | --- | --- |
| MDDS | Market quotes, L2 depth, Time & Sales, 1-min bars | Proprietary WebSocket (JSON with numeric field IDs) |
| BEPS | Account events: order fills, cancels, position changes, price alerts | AWS AppSync GraphQL WebSocket (graphql-ws) |

The SDK offers two tiers of streaming API:

  • Protocol layer (MDDSClient, BEPSClient) — message building and parsing, no network dependencies, ideal for testing and custom integrations
  • Production layer (MDDSStream, BEPSStream) — full async WebSocket lifecycle with auto-reconnect, reference-counted subscriptions, and callback dispatch

MDDS — Market Data Streaming

MDDS (Market Data Distribution System) delivers real-time quotes, options with Greeks, L2 depth-of-book, and Time & Sales data.

Quick Start

import asyncio
import websockets
from fidelity_trader import FidelityClient
from fidelity_trader.streaming.mdds import MDDSClient, MDDS_URL

async def stream_quotes():
    with FidelityClient() as client:
        # username and password are placeholders for your Fidelity credentials
        client.login(username, password)

        # Build cookie header from authenticated session
        cookie_str = "; ".join(f"{k}={v}" for k, v in client._http.cookies.items())

        mdds = MDDSClient()

        async with websockets.connect(
            MDDS_URL,
            additional_headers={"Cookie": cookie_str},
        ) as ws:
            # 1. Handle server session message (sent automatically on connect)
            session = mdds.handle_connect_message(await ws.recv())
            print(f"Connected: session={session.session_id}")

            # 2. Subscribe to symbols (chunked at 50 per message)
            for msg in mdds.build_chunked_subscribe(["AAPL", "TSLA", "SPY"]):
                await ws.send(msg)

            # 3. Process incoming quotes
            async for raw in ws:
                for quote in mdds.parse_message(raw):
                    print(f"{quote.symbol}: ${quote.last_price} "
                          f"bid={quote.bid} ask={quote.ask} vol={quote.volume}")

asyncio.run(stream_quotes())

Quote Fields

The MDDS field map contains 163 core fields, 10 Greek fields, and 200 L2 depth fields. Key fields on MDDSQuote:

| Property | Description | Field ID |
| --- | --- | --- |
| last_price | Last trade price | 29 |
| bid | Best bid price | 20 |
| ask | Best ask price | 18 |
| open | Opening price | 31 |
| previous_close | Previous session close | 32 |
| close | Close price | 124 |
| volume | Cumulative volume | 33 |
| net_change | Price change | 12 |
| net_change_pct | Percent change | 13 |
| day_high | Session high | 26 |
| day_low | Session low | 27 |
| trade_size | Last trade size (T&S) | 28 |
| trade_time | Last trade timestamp | 316 |
| trade_exchange | Exchange MIC code | 882 |

Option Greeks (when IncludeGreeks=True):

| Property | Field ID |
| --- | --- |
| delta | 187 |
| gamma | 188 |
| vega | 189 |
| theta | 190 |
| iv_mid | 196 |

Access the full parsed data dict via quote.data for all 163+ fields.
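
To illustrate how numeric field IDs map to named properties, here is a minimal sketch that renames a raw update using the IDs from the table above. The FIELD_NAMES dict and decode_fields helper are hypothetical and not part of the SDK, and the payload shape (a flat dict keyed by field-ID strings) is an assumption. In practice, MDDSClient.parse_message does this for you.

```python
# Hypothetical helper, not part of the SDK: rename numeric field IDs
# using the IDs listed in the quote field table.
FIELD_NAMES = {
    "29": "last_price",
    "20": "bid",
    "18": "ask",
    "31": "open",
    "32": "previous_close",
    "124": "close",
    "33": "volume",
    "12": "net_change",
    "13": "net_change_pct",
    "26": "day_high",
    "27": "day_low",
    "28": "trade_size",
    "316": "trade_time",
    "882": "trade_exchange",
}

def decode_fields(raw: dict) -> dict:
    """Return a copy of raw with known field IDs replaced by names.

    Unknown IDs are kept under their numeric key so no data is lost.
    """
    return {FIELD_NAMES.get(str(k), str(k)): v for k, v in raw.items()}

# Assumed payload shape: a flat mapping of field ID -> value.
update = {"29": "189.50", "20": "189.49", "18": "189.51", "999": "x"}
decoded = decode_fields(update)
print(decoded)
```

Keeping unknown IDs under their numeric key mirrors the idea behind quote.data: named accessors cover the common fields, while everything else stays reachable.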

Symbol Chunking

Fidelity limits each subscribe message to 50 symbols. Use build_chunked_subscribe() to split larger lists automatically:

# Subscribes 150 symbols across 3 messages
messages = mdds.build_chunked_subscribe(my_150_symbols, chunk_size=50)
for msg in messages:
    await ws.send(msg)
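
The splitting step itself is simple. The sketch below shows one way to do it in plain Python; chunk_symbols is a hypothetical helper written for illustration and is not how build_chunked_subscribe is necessarily implemented.

```python
from typing import Iterator, List, Sequence

def chunk_symbols(symbols: Sequence[str], chunk_size: int = 50) -> Iterator[List[str]]:
    """Yield consecutive slices of at most chunk_size symbols."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(symbols), chunk_size):
        yield list(symbols[start:start + chunk_size])

symbols = [f"SYM{i}" for i in range(120)]
chunks = list(chunk_symbols(symbols))
print([len(c) for c in chunks])  # [50, 50, 20]
```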

L2 Depth of Book (VirtualBook)

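# Assumed import path (alongside MDDSClient); adjust if your SDK version differs
from fidelity_trader.streaming.mdds import VirtualBook

# Subscribe to L2 depth (single symbol, separate subscription)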
await ws.send(mdds.build_virtualbook_subscribe("SPY"))

# Parse returns VirtualBook objects (not MDDSQuote)
for update in mdds.parse_message(raw):
    if isinstance(update, VirtualBook):
        print(f"Spread: {update.spread}")
        print(f"Best bid: {update.best_bid.price} x {update.best_bid.size}")
        print(f"Best ask: {update.best_ask.price} x {update.best_ask.size}")
        # The book carries 25 levels per side; print the top 5 bids
        for i, level in enumerate(update.bids[:5]):
            print(f"  Bid {i}: {level.price} x {level.size} @ {level.exchange}")
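
For readers who want to see how best bid, best ask, and spread relate, here is a minimal sketch of a depth-of-book structure. The Level and Book classes are hypothetical stand-ins, not the SDK's VirtualBook, and the sample prices and exchange codes are made up for illustration.

```python
from dataclasses import dataclass, field
from decimal import Decimal
from typing import List, Optional

@dataclass
class Level:
    price: Decimal
    size: int
    exchange: str

@dataclass
class Book:
    bids: List[Level] = field(default_factory=list)  # best (highest) first
    asks: List[Level] = field(default_factory=list)  # best (lowest) first

    @property
    def best_bid(self) -> Optional[Level]:
        return self.bids[0] if self.bids else None

    @property
    def best_ask(self) -> Optional[Level]:
        return self.asks[0] if self.asks else None

    @property
    def spread(self) -> Optional[Decimal]:
        # The spread is undefined while either side of the book is empty.
        if self.best_bid is None or self.best_ask is None:
            return None
        return self.best_ask.price - self.best_bid.price

book = Book(
    bids=[Level(Decimal("450.10"), 300, "XNAS")],
    asks=[Level(Decimal("450.12"), 200, "ARCX")],
)
print(book.spread)  # 0.02
```

Decimal is used here so that price differences stay exact; whether the SDK represents prices as Decimal, float, or str is not stated in this document.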

1-Minute Bars

# Subscribe to 1-minute OHLCV bars
await ws.send(mdds.build_bar_subscribe(["AAPL", "TSLA"]))

Bar data uses dedicated field IDs (356=close, 358=high, 359=low, 360=open, 362=volume).
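
As a rough illustration of those bar field IDs, the sketch below builds an OHLCV record from a raw update. The Bar class and bar_from_fields helper are hypothetical, and the flat field-ID payload with string values is an assumption about the wire format.

```python
from dataclasses import dataclass

# Bar field IDs as listed above.
BAR_FIELDS = {"open": "360", "high": "358", "low": "359", "close": "356", "volume": "362"}

@dataclass
class Bar:
    open: float
    high: float
    low: float
    close: float
    volume: int

def bar_from_fields(raw: dict) -> Bar:
    """Build a Bar from a flat field-ID mapping (assumed payload shape)."""
    return Bar(
        open=float(raw[BAR_FIELDS["open"]]),
        high=float(raw[BAR_FIELDS["high"]]),
        low=float(raw[BAR_FIELDS["low"]]),
        close=float(raw[BAR_FIELDS["close"]]),
        volume=int(raw[BAR_FIELDS["volume"]]),
    )

bar = bar_from_fields(
    {"360": "189.0", "358": "189.6", "359": "188.9", "356": "189.4", "362": "12500"}
)
print(bar)
```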

Unsubscribe

# Unsubscribe by request ID (from the original subscribe response)
await ws.send(mdds.build_unsubscribe_message(
    symbols=["AAPL"],
    request_id="0ba9bd91",  # from subscribe response
))

# Unsubscribe L2
await ws.send(mdds.build_virtualbook_unsubscribe("SPY"))

Time & Sales Detection

Time & Sales data arrives as regular delta updates (ResponseType "0") with trade fields present. Check quote.has_trade_data:

for quote in mdds.parse_message(raw):
    if quote.has_trade_data:
        print(f"TRADE: {quote.symbol} {quote.trade_size} @ {quote.last_price} "
              f"on {quote.trade_exchange} at {quote.trade_time}")
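
The detection idea can be sketched as a check for trade fields in a delta update. Which fields has_trade_data actually inspects is not documented here, so the helper below is an assumption: it treats the presence of any of the trade field IDs from the quote field table (28, 316, 882) as sufficient.

```python
# Trade-related field IDs from the quote field table:
# 28 = trade_size, 316 = trade_time, 882 = trade_exchange.
TRADE_FIELD_IDS = {"28", "316", "882"}

def looks_like_trade(fields: dict) -> bool:
    """Return True if the update carries at least one trade field.

    Hypothetical helper: the SDK's own rule may be stricter.
    """
    return any(str(k) in TRADE_FIELD_IDS for k in fields)

quote_only = {"20": "189.49", "18": "189.51"}
trade = {"29": "189.50", "28": "100", "316": "14:30:01", "882": "XNAS"}
print(looks_like_trade(quote_only), looks_like_trade(trade))  # False True
```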

BEPS — Account Event Streaming

BEPS (Business Events Processing Service) delivers real-time account events over an AWS AppSync GraphQL WebSocket subscription.

Event Types

BEPS streams 19 event types across 8 categories:

| Category | Events | Description |
| --- | --- | --- |
| Execution | Order fill created | Trade executed |
| Order Update | Order status changed | Open, partial fill, etc. |
| Cancellation | Order canceled | Cancel confirmed |
| Position | Position updated | Quantity/value changed |
| Balance | Balance updated | Cash/margin changed |
| Cost Basis | Cost basis recalculated | Tax basis updated |
| Transaction | Transaction recorded | Activity history entry |
| Price Alert | 12 trigger types | Price target, MA crossover, 52-week high/low |

Quick Start

import asyncio
import websockets
from fidelity_trader.streaming.beps import BEPSClient

async def stream_events():
    client = BEPSClient()

    # 1. Build authenticated URL (token from auth endpoint)
    url = client.build_connect_url(
        token="your-jwt-token",
        host="appsync-host.fidelity.com",
        stream_platform_url="wss://appsync-host/graphql/realtime?header={0}&payload=e30=",
    )

    # 2. Connect with graphql-ws sub-protocol
    async with websockets.connect(url, subprotocols=["graphql-ws"]) as ws:
        # 3. Initialize connection
        await ws.send(client.build_init_message())
        client.handle_message(await ws.recv())  # connection_ack
        assert client.is_connected

        # 4. Subscribe to events
        await ws.send(client.build_subscribe_message(
            mid="123456",       # Member ID
            app_id="ATN",       # App identifier
            token="your-jwt-token",
            host="appsync-host.fidelity.com",
        ))
        client.handle_message(await ws.recv())  # start_ack
        assert client.is_subscribed

        # 5. Process events
        async for raw in ws:
            if client.is_keepalive(raw):
                continue  # Skip keep-alive pings

            for event in client.handle_message(raw):
                print(f"[{event.category.value}] {event.event_type}")

                if event.is_execution:
                    print(f"  FILLED: {event.symbol} {event.quantity} @ {event.price}")
                elif event.is_price_alert:
                    print(f"  ALERT: {event.message}")
                elif event.is_cancellation:
                    print(f"  CANCELED: order {event.order_number}")

asyncio.run(stream_events())
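
The connect URL in step 1 follows the AWS AppSync real-time convention: the header query parameter is a base64-encoded JSON object carrying the auth details, and payload=e30= is the base64 encoding of an empty JSON object. The sketch below shows that encoding with the standard library. build_appsync_url is a hypothetical helper, and the header keys (host, Authorization) are the ones AppSync documents for token-based auth; whether build_connect_url sends exactly these keys is an assumption.

```python
import base64
import json

def build_appsync_url(template: str, token: str, host: str) -> str:
    """Fill the {0} placeholder with a base64-encoded JSON auth header."""
    header = json.dumps({"host": host, "Authorization": token}).encode("utf-8")
    encoded = base64.b64encode(header).decode("ascii")
    return template.replace("{0}", encoded)

template = "wss://appsync-host/graphql/realtime?header={0}&payload=e30="
url = build_appsync_url(
    template, token="your-jwt-token", host="appsync-host.fidelity.com"
)
print(url)

# The fixed payload parameter is just an empty JSON object.
print(base64.b64decode("e30=").decode("ascii"))  # {}
```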

Event Properties

Every BEPSEvent has:

event.event_id          # Unique event ID
event.event_type        # Wire type (e.g., "accounts.orders.executions.created")
event.category          # BEPSEventCategory enum (EXECUTION, CANCELLATION, etc.)
event.internal_code     # Short code (MFCEX, CXL01, SPPOS, etc.)
event.event_created_time  # ISO timestamp
event.has_error         # Boolean
event.message           # Human-readable message
event.desc              # Full event payload as dict

Common desc field accessors:

event.account_number    # Account number
event.symbol            # Ticker symbol
event.quantity          # Trade quantity
event.price             # Trade price
event.action            # Buy/sell code
event.order_number      # Order confirmation number
event.trade_date        # Trade date

Category Checks

event.is_execution       # Order fill
event.is_cancellation    # Order cancel
event.is_price_alert     # Any of the 12 price alert types
event.is_position_update # Position change
event.is_balance_update  # Balance change

Historical Time & Sales (REST)

For historical (not real-time) trade data, use the REST endpoint on an authenticated FidelityClient (client below):

result = client.time_and_sales.get_history(
    symbol="AAPL",
    max_rows=500,
)
for trade in result.trades:
    print(f"{trade.time} {trade.price} x {trade.size} @ {trade.exchange}")

Production Streaming

For production use, MDDSStream and BEPSStream wrap the protocol layer with connection lifecycle management. These handle reconnection, subscription tracking, and callback dispatch automatically.

Note: requires websockets. Install with pip install websockets or pip install fidelity-trader-api[streaming].

MDDSStream — Production Quote Streaming

from fidelity_trader.streaming import MDDSStream

async def main():
    stream = MDDSStream(
        cookies=session_cookies,             # dict from authenticated session
        reconnect_intervals=[1, 2, 5, 10],   # backoff in seconds (clamped to last)
        unsubscribe_delay=10.0,              # seconds before batch unsubscribe
        chunk_size=50,                       # symbols per subscribe message
        conflation_rate=1000,                # server-side throttle (ms)
    )

    # Set callbacks
    stream.on_quote = lambda q: print(f"{q.symbol}: ${q.last_price}")
    stream.on_virtualbook = lambda vb: print(f"{vb.symbol}: spread={vb.spread}")
    stream.on_state_change = lambda s: print(f"State: {s.value}")
    stream.on_error = lambda e: print(f"Error: {e}")

    await stream.start()

    # Subscribe (ref-counted — only sends to server on first consumer)
    await stream.subscribe(["AAPL", "TSLA", "SPY"], consumer_id="view1")
    await stream.subscribe(["AAPL"], consumer_id="view2")  # no extra network call

    # Unsubscribe (delayed — waits 10s before actually unsubscribing)
    await stream.unsubscribe(["TSLA"], consumer_id="view1")
    # If someone re-subscribes TSLA within 10s, the unsub is canceled

    # ... quotes flow via on_quote callback ...

    await stream.stop()

Features:

| Feature | Description |
| --- | --- |
| Auto-reconnect | Configurable backoff array (e.g., [1s, 2s, 5s, 10s]), clamps to last value, resets on success |
| Ref-counted subscriptions | Multiple consumers can subscribe to the same symbol; only the first triggers a server subscribe |
| Delayed unsubscribe | Symbols at refcount 0 wait 10 seconds before unsubscribing (prevents churn from UI navigation) |
| Re-subscribe on reconnect | Active symbols are automatically re-subscribed after a reconnection |
| State monitoring | on_state_change callback fires on: CONNECTING, CONNECTED, RECONNECTING, DISCONNECTED, FAILED |
| Symbol chunking | Large symbol lists are split into 50-per-message chunks automatically |

BEPSStream — Production Event Streaming

from fidelity_trader.streaming import BEPSStream

async def main():
    stream = BEPSStream(
        token="your-jwt-token",
        host="appsync-host.fidelity.com",
        stream_url="wss://appsync-host/graphql/realtime?header={0}&payload=e30=",
        mid="123456",                        # Member ID
        app_id="ATN",
        reconnect_intervals=[1, 2, 5, 10],
    )

    stream.on_event = handle_event
    stream.on_state_change = lambda s: print(f"BEPS: {s.value}")

    await stream.start()
    # ... events flow via on_event callback ...
    await stream.stop()

def handle_event(event):
    if event.is_execution:
        print(f"ORDER FILLED: {event.symbol} {event.quantity} @ {event.price}")
    elif event.is_cancellation:
        print(f"ORDER CANCELED: {event.order_number}")
    elif event.is_price_alert:
        print(f"PRICE ALERT: {event.message}")

Features:

| Feature | Description |
| --- | --- |
| Full graphql-ws lifecycle | Handles connection_init, connection_ack, start, start_ack, keepalive automatically |
| Auto-reconnect | Same configurable backoff as MDDSStream |
| Keepalive handling | Silently absorbs ka (keep-alive) pings |
| Clean shutdown | Sends stop message before disconnecting |
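
The lifecycle in the table above can be sketched as a small classifier over the type field of each frame. The message type names are those of the AppSync graphql-ws protocol; the classify helper, its return labels, and the "sub-1" subscription ID are hypothetical and only illustrate the dispatch that BEPSStream performs internally.

```python
import json

def classify(raw: str) -> str:
    """Map a raw graphql-ws frame to a coarse lifecycle step."""
    msg_type = json.loads(raw).get("type")
    return {
        "connection_ack": "connected",
        "start_ack": "subscribed",
        "ka": "keepalive",
        "data": "event",
        "error": "error",
        "complete": "closed",
    }.get(msg_type, "unknown")

# Client-to-server frames at the two ends of the lifecycle.
INIT = json.dumps({"type": "connection_init"})
STOP = json.dumps({"type": "stop", "id": "sub-1"})  # illustrative subscription ID

frames = [
    '{"type": "connection_ack"}',
    '{"type": "ka"}',
    '{"type": "data", "id": "sub-1", "payload": {}}',
]
print([classify(f) for f in frames])  # ['connected', 'keepalive', 'event']
```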

ReconnectPolicy

Both stream managers use ReconnectPolicy internally. You can also use it standalone:

from fidelity_trader.streaming import ReconnectPolicy, ConnectionState

policy = ReconnectPolicy(
    retry_intervals=[1.0, 2.0, 5.0, 10.0, 30.0],  # seconds
    max_attempts=0,                               # 0 = unlimited
)

# On connection failure:
delay = policy.next_delay()    # 1.0, then 2.0, then 5.0, ...
                               # Clamps to last value (30.0) after exhausting array

# On successful reconnect:
policy.on_success()            # Resets counter to 0

# State tracking:
policy.state                   # ConnectionState.CONNECTED, .RECONNECTING, .FAILED, etc.
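
To make the clamping behavior concrete, here is a minimal sketch of a retry schedule that walks the interval list and then repeats the last value. BackoffSketch is a hypothetical stand-in, not the SDK's ReconnectPolicy; in particular, returning None once max_attempts is exhausted is an assumption made for this example.

```python
from typing import Optional, Sequence

class BackoffSketch:
    """Illustrative clamped retry schedule (not the SDK class)."""

    def __init__(self, retry_intervals: Sequence[float], max_attempts: int = 0):
        if not retry_intervals:
            raise ValueError("retry_intervals must not be empty")
        self.retry_intervals = list(retry_intervals)
        self.max_attempts = max_attempts  # 0 means unlimited
        self.attempts = 0

    def next_delay(self) -> Optional[float]:
        """Return the next delay, or None once max_attempts is exhausted."""
        if self.max_attempts and self.attempts >= self.max_attempts:
            return None
        # Clamp the index so later attempts reuse the last interval.
        index = min(self.attempts, len(self.retry_intervals) - 1)
        self.attempts += 1
        return self.retry_intervals[index]

    def on_success(self) -> None:
        """Reset the attempt counter after a successful connection."""
        self.attempts = 0

policy = BackoffSketch([1.0, 2.0, 5.0])
delays = [policy.next_delay() for _ in range(5)]
print(delays)  # [1.0, 2.0, 5.0, 5.0, 5.0]
policy.on_success()
print(policy.next_delay())  # 1.0
```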

SubscriptionManager

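SubscriptionManager can also be used standalone for ref-counted subscription tracking. In the example, ws, build_sub, and build_unsub stand in for your own connection and message builders: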

from fidelity_trader.streaming import SubscriptionManager

mgr = SubscriptionManager(
    on_subscribe=lambda syms: ws.send(build_sub(syms)),
    on_unsubscribe=lambda syms: ws.send(build_unsub(syms)),
    unsubscribe_delay=10.0,
)

await mgr.subscribe(["AAPL", "TSLA"], consumer_id="view1")
await mgr.subscribe(["AAPL"], consumer_id="view2")   # no network call (refcount 2)
await mgr.unsubscribe(["AAPL"], consumer_id="view1")  # refcount 1, no action
await mgr.unsubscribe(["AAPL"], consumer_id="view2")  # refcount 0, starts 10s timer
# After 10s: on_unsubscribe(["AAPL"]) fires

mgr.stats  # {"active": 1, "pending_unsub": 1, "consumers": 2, "total_refs": 1}
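
The ref-counting and delayed-unsubscribe behavior can be sketched with asyncio alone. RefCountSketch below is a hypothetical illustration, not the SDK's SubscriptionManager: it assumes async callbacks, tracks consumers per symbol, and cancels a pending unsubscribe if the symbol is re-subscribed before the delay elapses.

```python
import asyncio
from collections import defaultdict

class RefCountSketch:
    """Illustrative ref-counted tracker with delayed unsubscribe."""

    def __init__(self, on_subscribe, on_unsubscribe, unsubscribe_delay=10.0):
        self.on_subscribe = on_subscribe
        self.on_unsubscribe = on_unsubscribe
        self.unsubscribe_delay = unsubscribe_delay
        self.consumers = defaultdict(set)  # symbol -> consumer IDs
        self.pending = {}                  # symbol -> delayed-unsubscribe task

    async def subscribe(self, symbols, consumer_id):
        new = []
        for sym in symbols:
            task = self.pending.pop(sym, None)
            if task is not None:
                task.cancel()              # re-subscribed in time: keep the stream
            elif not self.consumers[sym]:
                new.append(sym)            # first consumer: needs a server subscribe
            self.consumers[sym].add(consumer_id)
        if new:
            await self.on_subscribe(new)

    async def unsubscribe(self, symbols, consumer_id):
        for sym in symbols:
            if sym not in self.consumers:
                continue                   # never subscribed: nothing to do
            self.consumers[sym].discard(consumer_id)
            if not self.consumers[sym] and sym not in self.pending:
                self.pending[sym] = asyncio.create_task(self._expire(sym))

    async def _expire(self, sym):
        await asyncio.sleep(self.unsubscribe_delay)
        self.pending.pop(sym, None)
        del self.consumers[sym]
        await self.on_unsubscribe([sym])

async def demo():
    log = []

    async def sub(syms):
        log.append(("sub", syms))

    async def unsub(syms):
        log.append(("unsub", syms))

    mgr = RefCountSketch(sub, unsub, unsubscribe_delay=0.05)
    await mgr.subscribe(["AAPL", "TSLA"], "view1")
    await mgr.subscribe(["AAPL"], "view2")    # refcount 2, no callback
    await mgr.unsubscribe(["AAPL"], "view1")  # refcount 1, no action
    await mgr.unsubscribe(["AAPL"], "view2")  # refcount 0, timer starts
    await asyncio.sleep(0.2)                  # let the short timer fire
    return log

log = asyncio.run(demo())
print(log)
```

The delay is shortened to 0.05 seconds so the example finishes quickly; with the 10-second default the same sequence applies, only slower.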

Service Layer

Production streaming (MDDSStream, BEPSStream) is not yet integrated into the REST service layer. The service currently uses the protocol-layer MDDSClient via MDDSManager for SSE/WebSocket fan-out. Future work will upgrade the service to use the production streaming classes.