Real-Time Streaming
The SDK provides two real-time WebSocket streaming protocols for different data types:
| Protocol | Data | Transport |
|---|---|---|
| MDDS | Market quotes, L2 depth, Time & Sales, 1-min bars | Proprietary WebSocket (JSON with numeric field IDs) |
| BEPS | Account events — order fills, cancels, position changes, price alerts | AWS AppSync GraphQL WebSocket (graphql-ws) |
The SDK offers two tiers of streaming API:
- Protocol layer (MDDSClient, BEPSClient): message building and parsing with no network dependencies; ideal for testing and custom integrations
- Production layer (MDDSStream, BEPSStream): full async WebSocket lifecycle with auto-reconnect, reference-counted subscriptions, and callback dispatch
MDDS: Market Data Streaming
MDDS (Market Data Distribution System) delivers real-time quotes, options with Greeks, L2 depth-of-book, and Time & Sales data.
Quick Start
import asyncio
import websockets
from fidelity_trader import FidelityClient
from fidelity_trader.streaming.mdds import MDDSClient, MDDS_URL
async def stream_quotes():
    with FidelityClient() as client:
        client.login(username, password)  # supply your own credentials
        # Build cookie header from authenticated session
        cookie_str = "; ".join(f"{k}={v}" for k, v in client._http.cookies.items())
        mdds = MDDSClient()
        # additional_headers is the current websockets name; the legacy client used extra_headers
        async with websockets.connect(
            MDDS_URL,
            additional_headers={"Cookie": cookie_str},
        ) as ws:
            # 1. Handle server session message (sent automatically on connect)
            session = mdds.handle_connect_message(await ws.recv())
            print(f"Connected: session={session.session_id}")
            # 2. Subscribe to symbols (chunked at 50 per message)
            for msg in mdds.build_chunked_subscribe(["AAPL", "TSLA", "SPY"]):
                await ws.send(msg)
            # 3. Process incoming quotes
            async for raw in ws:
                for quote in mdds.parse_message(raw):
                    print(f"{quote.symbol}: ${quote.last_price} "
                          f"bid={quote.bid} ask={quote.ask} vol={quote.volume}")
asyncio.run(stream_quotes())
Quote Fields
The MDDS field map contains 163 core fields, 10 Greek fields, and 200 L2 depth fields. Key fields on MDDSQuote:
| Property | Description | Field ID |
|---|---|---|
| last_price | Last trade price | 29 |
| bid | Best bid price | 20 |
| ask | Best ask price | 18 |
| open | Opening price | 31 |
| previous_close | Previous session close | 32 |
| close | Close price | 124 |
| volume | Cumulative volume | 33 |
| net_change | Price change | 12 |
| net_change_pct | Percent change | 13 |
| day_high | Session high | 26 |
| day_low | Session low | 27 |
| trade_size | Last trade size (T&S) | 28 |
| trade_time | Last trade timestamp | 316 |
| trade_exchange | Exchange MIC code | 882 |
Option Greeks (when IncludeGreeks=True):
| Property | Field ID |
|---|---|
| delta | 187 |
| gamma | 188 |
| vega | 189 |
| theta | 190 |
| iv_mid | 196 |
Access the full parsed data dict via quote.data for all 163+ fields.
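To make the relationship between numeric field IDs and named properties concrete, here is a minimal sketch that renames ID-keyed values using a few IDs from the table above. It assumes an update is a flat dict keyed by string field IDs; FIELD_NAMES and decode_fields are hypothetical names for illustration and are not part of the SDK, which performs this mapping for you on MDDSQuote.

```python
# Hypothetical helper, not part of the SDK. IDs are copied from the table above.
FIELD_NAMES = {
    "29": "last_price",
    "20": "bid",
    "18": "ask",
    "31": "open",
    "32": "previous_close",
    "33": "volume",
    "26": "day_high",
    "27": "day_low",
}


def decode_fields(update: dict) -> dict:
    """Rename known numeric field IDs; keep unknown IDs under their raw key."""
    return {FIELD_NAMES.get(str(fid), str(fid)): value for fid, value in update.items()}


# Assumed wire shape: a flat mapping of field ID to value.
sample = {"29": "189.95", "20": "189.94", "18": "189.96", "999": "x"}
decoded = decode_fields(sample)
print(decoded["last_price"], decoded["bid"], decoded["ask"])
```

Unknown IDs are passed through unchanged, which mirrors the idea of keeping the full data dict available alongside the named properties.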
Symbol Chunking
Fidelity limits subscribe messages to 50 symbols per message. Use build_chunked_subscribe() to automatically split large lists:
# Subscribes 150 symbols across 3 messages
messages = mdds.build_chunked_subscribe(my_150_symbols, chunk_size=50)
for msg in messages:
    await ws.send(msg)
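The splitting step itself is simple list slicing. The sketch below shows one way it could be done; chunk_symbols is a hypothetical stand-alone function, not the SDK's build_chunked_subscribe(), and it returns plain symbol lists rather than ready-to-send messages.

```python
from typing import Iterator, List, Sequence


def chunk_symbols(symbols: Sequence[str], chunk_size: int = 50) -> Iterator[List[str]]:
    """Yield consecutive slices of at most chunk_size symbols."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(symbols), chunk_size):
        yield list(symbols[start:start + chunk_size])


symbols = [f"SYM{i}" for i in range(120)]
chunks = list(chunk_symbols(symbols))
print([len(c) for c in chunks])  # [50, 50, 20]
```

The last chunk simply carries the remainder, so no padding or special casing is needed.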
L2 Depth of Book (VirtualBook)
# Subscribe to L2 depth (single symbol, separate subscription)
await ws.send(mdds.build_virtualbook_subscribe("SPY"))
# Parse returns VirtualBook objects (not MDDSQuote)
for update in mdds.parse_message(raw):
    if isinstance(update, VirtualBook):
        print(f"Spread: {update.spread}")
        print(f"Best bid: {update.best_bid.price} x {update.best_bid.size}")
        print(f"Best ask: {update.best_ask.price} x {update.best_ask.size}")
        # 25 levels per side; print the top 5 bids
        for i, level in enumerate(update.bids[:5]):
            print(f"  Bid {i}: {level.price} x {level.size} @ {level.exchange}")
1-Minute Bars
Bar data uses dedicated field IDs (356=close, 358=high, 359=low, 360=open, 362=volume).
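As an illustration of how those bar field IDs could be collected into one record, consider the sketch below. The Bar dataclass, bar_from_fields, and the assumption that values arrive as strings in an ID-keyed dict are all hypothetical; the SDK may expose bars differently.

```python
from dataclasses import dataclass

# Field IDs as listed above; the dict-of-strings input shape is an assumption.
BAR_FIELDS = {"open": "360", "high": "358", "low": "359", "close": "356", "volume": "362"}


@dataclass
class Bar:
    open: float
    high: float
    low: float
    close: float
    volume: int


def bar_from_fields(fields: dict) -> Bar:
    """Build a Bar from an ID-keyed mapping; raises KeyError if a field is missing."""
    return Bar(
        open=float(fields[BAR_FIELDS["open"]]),
        high=float(fields[BAR_FIELDS["high"]]),
        low=float(fields[BAR_FIELDS["low"]]),
        close=float(fields[BAR_FIELDS["close"]]),
        volume=int(float(fields[BAR_FIELDS["volume"]])),
    )


bar = bar_from_fields({"360": "10.0", "358": "10.5", "359": "9.8", "356": "10.2", "362": "1200"})
print(bar)
```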
Unsubscribe
# Unsubscribe by request ID (from the original subscribe response)
await ws.send(mdds.build_unsubscribe_message(
    symbols=["AAPL"],
    request_id="0ba9bd91",  # from subscribe response
))
# Unsubscribe L2
await ws.send(mdds.build_virtualbook_unsubscribe("SPY"))
Time & Sales Detection
Time & Sales data arrives as regular delta updates (ResponseType "0") with trade fields present. Check quote.has_trade_data:
for quote in mdds.parse_message(raw):
    if quote.has_trade_data:
        print(f"TRADE: {quote.symbol} {quote.trade_size} @ {quote.last_price} "
              f"on {quote.trade_exchange} at {quote.trade_time}")
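The detection idea can be sketched as a membership test on the update's field IDs. Which fields has_trade_data actually inspects is not stated here, so the set below (trade_size, trade_time, trade_exchange from the Quote Fields table) is an assumption, and looks_like_trade is a hypothetical helper rather than an SDK function.

```python
# Trade-related field IDs from the Quote Fields table:
# 28 = trade_size, 316 = trade_time, 882 = trade_exchange.
# Assumption: any one of them marks the update as a Time & Sales print.
TRADE_FIELD_IDS = frozenset({"28", "316", "882"})


def looks_like_trade(update: dict) -> bool:
    """Return True when an ID-keyed delta update carries any trade field."""
    return not TRADE_FIELD_IDS.isdisjoint(str(key) for key in update)


print(looks_like_trade({"29": "189.95", "28": "100"}))  # True
print(looks_like_trade({"20": "189.94", "18": "189.96"}))  # False
```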
BEPS: Account Event Streaming
BEPS (Business Events Processing Service) delivers real-time account events over an AWS AppSync GraphQL WebSocket subscription.
Event Types
BEPS streams 19 event types across 8 categories:
| Category | Events | Description |
|---|---|---|
| Execution | Order fill created | Trade executed |
| Order Update | Order status changed | Open, partial fill, etc. |
| Cancellation | Order canceled | Cancel confirmed |
| Position | Position updated | Quantity/value changed |
| Balance | Balance updated | Cash/margin changed |
| Cost Basis | Cost basis recalculated | Tax basis updated |
| Transaction | Transaction recorded | Activity history entry |
| Price Alert | 12 trigger types | Price target, MA crossover, 52-week high/low |
Quick Start
import asyncio
import websockets
from fidelity_trader.streaming.beps import BEPSClient
async def stream_events():
    client = BEPSClient()
    # 1. Build authenticated URL (token from auth endpoint)
    url = client.build_connect_url(
        token="your-jwt-token",
        host="appsync-host.fidelity.com",
        stream_platform_url="wss://appsync-host/graphql/realtime?header={0}&payload=e30=",
    )
    # 2. Connect with graphql-ws sub-protocol
    async with websockets.connect(url, subprotocols=["graphql-ws"]) as ws:
        # 3. Initialize connection
        await ws.send(client.build_init_message())
        client.handle_message(await ws.recv())  # connection_ack
        assert client.is_connected
        # 4. Subscribe to events
        await ws.send(client.build_subscribe_message(
            mid="123456",  # Member ID
            app_id="ATN",  # App identifier
            token="your-jwt-token",
            host="appsync-host.fidelity.com",
        ))
        client.handle_message(await ws.recv())  # start_ack
        assert client.is_subscribed
        # 5. Process events
        async for raw in ws:
            if client.is_keepalive(raw):
                continue  # Skip keep-alive pings
            for event in client.handle_message(raw):
                print(f"[{event.category.value}] {event.event_type}")
                if event.is_execution:
                    print(f"  FILLED: {event.symbol} {event.quantity} @ {event.price}")
                elif event.is_price_alert:
                    print(f"  ALERT: {event.message}")
                elif event.is_cancellation:
                    print(f"  CANCELED: order {event.order_number}")
asyncio.run(stream_events())
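The URL template in step 1 has a {0} placeholder for a header value and a fixed payload=e30=, where e30= is the base64 encoding of an empty JSON object. The sketch below shows how such a URL could be assembled. It is not the implementation of build_connect_url: the function name and the header field names ("host", "Authorization", following the general AWS AppSync real-time convention) are assumptions.

```python
import base64
import json


def build_realtime_url(template: str, host: str, token: str) -> str:
    """Fill the {0} placeholder with a base64-encoded JSON auth header."""
    header = {"host": host, "Authorization": token}  # assumed field names
    encoded = base64.b64encode(json.dumps(header).encode("utf-8")).decode("ascii")
    return template.replace("{0}", encoded)


template = "wss://appsync-host/graphql/realtime?header={0}&payload=e30="
url = build_realtime_url(template, "appsync-host.fidelity.com", "your-jwt-token")

# The fixed payload parameter decodes to an empty JSON object.
print(base64.b64decode("e30=").decode("ascii"))  # {}
```

Using str.replace rather than str.format avoids surprises if the template ever contains other braces.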
Event Properties
Every BEPSEvent has:
event.event_id # Unique event ID
event.event_type # Wire type (e.g., "accounts.orders.executions.created")
event.category # BEPSEventCategory enum (EXECUTION, CANCELLATION, etc.)
event.internal_code # Short code (MFCEX, CXL01, SPPOS, etc.)
event.event_created_time # ISO timestamp
event.has_error # Boolean
event.message # Human-readable message
event.desc # Full event payload as dict
Common desc field accessors:
event.account_number # Account number
event.symbol # Ticker symbol
event.quantity # Trade quantity
event.price # Trade price
event.action # Buy/sell code
event.order_number # Order confirmation number
event.trade_date # Trade date
Category Checks
event.is_execution # Order fill
event.is_cancellation # Order cancel
event.is_price_alert # Any of the 12 price alert types
event.is_position_update # Position change
event.is_balance_update # Balance change
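When many categories need handling, a dispatch table keyed by category can replace a long if/elif chain. The sketch below uses stand-in Category and Event types so it runs on its own; they are hypothetical simplifications of BEPSEventCategory and BEPSEvent, and the enum values shown are made up for illustration.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict


class Category(Enum):  # stand-in for BEPSEventCategory; values are illustrative
    EXECUTION = "execution"
    CANCELLATION = "cancellation"
    PRICE_ALERT = "price_alert"


@dataclass
class Event:  # stand-in for BEPSEvent with only the fields used here
    category: Category
    message: str


# Map each category of interest to a handler; anything else is ignored.
handlers: Dict[Category, Callable[[Event], str]] = {
    Category.EXECUTION: lambda e: f"FILLED: {e.message}",
    Category.CANCELLATION: lambda e: f"CANCELED: {e.message}",
}


def dispatch(event: Event) -> str:
    """Route an event to its handler, or report that it was ignored."""
    handler = handlers.get(event.category)
    return handler(event) if handler else f"ignored {event.category.name}"


print(dispatch(Event(Category.EXECUTION, "AAPL 10 @ 189.95")))
print(dispatch(Event(Category.PRICE_ALERT, "SPY crossed 500")))
```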
Historical Time & Sales (REST)
For historical trade data (not real-time), use the REST endpoint:
# client here is an authenticated FidelityClient, not the BEPSClient used above
result = client.time_and_sales.get_history(
    symbol="AAPL",
    max_rows=500,
)
for trade in result.trades:
    print(f"{trade.time} {trade.price} x {trade.size} @ {trade.exchange}")
Production Streaming
For production use, MDDSStream and BEPSStream wrap the protocol layer with connection lifecycle management. These handle reconnection, subscription tracking, and callback dispatch automatically.
Note: the production layer requires the websockets package. Install it with pip install websockets or pip install fidelity-trader-api[streaming].
MDDSStream: Production Quote Streaming
import asyncio
from fidelity_trader.streaming import MDDSStream
async def main():
    stream = MDDSStream(
        cookies=session_cookies,  # dict from authenticated session
        reconnect_intervals=[1, 2, 5, 10],  # backoff in seconds (clamped to last)
        unsubscribe_delay=10.0,  # seconds before batch unsubscribe
        chunk_size=50,  # symbols per subscribe message
        conflation_rate=1000,  # server-side throttle (ms)
    )
    # Set callbacks
    stream.on_quote = lambda q: print(f"{q.symbol}: ${q.last_price}")
    stream.on_virtualbook = lambda vb: print(f"{vb.symbol}: spread={vb.spread}")
    stream.on_state_change = lambda s: print(f"State: {s.value}")
    stream.on_error = lambda e: print(f"Error: {e}")
    await stream.start()
    # Subscribe (ref-counted: only sends to server on first consumer)
    await stream.subscribe(["AAPL", "TSLA", "SPY"], consumer_id="view1")
    await stream.subscribe(["AAPL"], consumer_id="view2")  # no extra network call
    # Unsubscribe (delayed: waits 10s before actually unsubscribing)
    await stream.unsubscribe(["TSLA"], consumer_id="view1")
    # If someone re-subscribes TSLA within 10s, the unsub is canceled
    # ... quotes flow via on_quote callback ...
    await stream.stop()
asyncio.run(main())
Features:
| Feature | Description |
|---|---|
| Auto-reconnect | Configurable backoff array (e.g., [1s, 2s, 5s, 10s]), clamps to last value, resets on success |
| Ref-counted subscriptions | Multiple consumers can subscribe to the same symbol; only the first triggers a server subscribe |
| Delayed unsubscribe | Symbols at refcount 0 wait unsubscribe_delay seconds (10 in the example above) before unsubscribing, which prevents churn from UI navigation |
| Re-subscribe on reconnect | Active symbols are automatically re-subscribed after a reconnection |
| State monitoring | on_state_change callback fires on: CONNECTING, CONNECTED, RECONNECTING, DISCONNECTED, FAILED |
| Symbol chunking | Large symbol lists are split into 50-per-message chunks automatically |
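The delayed-unsubscribe behavior in the table can be pictured as a cancellable timer per symbol. The following is a minimal asyncio sketch of that idea under stated assumptions: DelayedUnsubscriber is a hypothetical class, not the SDK's implementation, and it records symbols in a list instead of sending anything over a socket.

```python
import asyncio
from typing import Dict, List


class DelayedUnsubscriber:
    """Hypothetical sketch: defer an unsubscribe and cancel it on re-subscribe."""

    def __init__(self, delay: float) -> None:
        self.delay = delay
        self.sent: List[str] = []  # symbols whose unsubscribe actually fired
        self._pending: Dict[str, asyncio.Task] = {}

    def schedule(self, symbol: str) -> None:
        """Start (or restart) the unsubscribe timer for a symbol."""
        self.cancel(symbol)
        self._pending[symbol] = asyncio.create_task(self._fire(symbol))

    def cancel(self, symbol: str) -> None:
        """Abort a pending unsubscribe, e.g. because a consumer came back."""
        task = self._pending.pop(symbol, None)
        if task is not None:
            task.cancel()

    async def _fire(self, symbol: str) -> None:
        await asyncio.sleep(self.delay)
        self._pending.pop(symbol, None)
        self.sent.append(symbol)


async def demo() -> List[str]:
    unsub = DelayedUnsubscriber(delay=0.05)
    unsub.schedule("TSLA")
    unsub.schedule("AAPL")
    unsub.cancel("TSLA")  # re-subscribed within the delay window
    await asyncio.sleep(0.2)
    return unsub.sent


result = asyncio.run(demo())
print(result)  # ['AAPL']
```

Only AAPL is unsubscribed because the TSLA timer was cancelled before it expired, which is the churn-avoidance effect the table describes.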
BEPSStream: Production Event Streaming
import asyncio
from fidelity_trader.streaming import BEPSStream
def handle_event(event):
    if event.is_execution:
        print(f"ORDER FILLED: {event.symbol} {event.quantity} @ {event.price}")
    elif event.is_cancellation:
        print(f"ORDER CANCELED: {event.order_number}")
    elif event.is_price_alert:
        print(f"PRICE ALERT: {event.message}")
async def main():
    stream = BEPSStream(
        token="your-jwt-token",
        host="appsync-host.fidelity.com",
        stream_url="wss://appsync-host/graphql/realtime?header={0}&payload=e30=",
        mid="123456",  # Member ID
        app_id="ATN",
        reconnect_intervals=[1, 2, 5, 10],
    )
    stream.on_event = handle_event
    stream.on_state_change = lambda s: print(f"BEPS: {s.value}")
    await stream.start()
    # ... events flow via on_event callback ...
    await stream.stop()
asyncio.run(main())
Features:
| Feature | Description |
|---|---|
| Full graphql-ws lifecycle | Handles connection_init, connection_ack, start, start_ack, keepalive automatically |
| Auto-reconnect | Same configurable backoff as MDDSStream |
| Keepalive handling | Silently absorbs ka (keep-alive) pings |
| Clean shutdown | Sends stop message before disconnecting |
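The handshake listed in the table can be modeled as a small state tracker that reacts to each frame's type field. This is a hedged sketch rather than the SDK's code: LifecycleTracker is a hypothetical class, and the "data" frame type with a "payload" key is an assumption based on the general graphql-ws convention used by AWS AppSync.

```python
import json
from typing import Any, Optional


class LifecycleTracker:
    """Hypothetical sketch of the graphql-ws handshake states described above."""

    def __init__(self) -> None:
        self.is_connected = False
        self.is_subscribed = False

    def handle(self, raw: str) -> Optional[Any]:
        """Update state from one frame; return the payload of data frames, else None."""
        msg = json.loads(raw)
        kind = msg.get("type")
        if kind == "connection_ack":
            self.is_connected = True
        elif kind == "start_ack":
            self.is_subscribed = True
        elif kind == "ka":
            pass  # keep-alive: nothing to do
        elif kind == "data":  # assumed frame type for subscription events
            return msg.get("payload")
        return None


tracker = LifecycleTracker()
tracker.handle('{"type": "connection_ack"}')
tracker.handle('{"type": "start_ack", "id": "1"}')
ignored = tracker.handle('{"type": "ka"}')
payload = tracker.handle('{"type": "data", "id": "1", "payload": {"ok": true}}')
print(tracker.is_connected, tracker.is_subscribed, ignored, payload)
```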
ReconnectPolicy
Both stream managers use ReconnectPolicy internally. You can also use it standalone:
from fidelity_trader.streaming import ReconnectPolicy, ConnectionState
policy = ReconnectPolicy(
    retry_intervals=[1.0, 2.0, 5.0, 10.0, 30.0],  # seconds
    max_attempts=0,  # 0 = unlimited
)
# On connection failure:
delay = policy.next_delay() # 1.0, then 2.0, then 5.0, ...
# Clamps to last value (30.0) after exhausting array
# On successful reconnect:
policy.on_success() # Resets counter to 0
# State tracking:
policy.state # ConnectionState.CONNECTED, .RECONNECTING, .FAILED, etc.
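The clamping and reset behavior described in the comments above can be reproduced in a few lines. BackoffSchedule below is a hypothetical stand-in written for illustration, not the SDK's ReconnectPolicy; in particular, raising an exception when max_attempts is exceeded is an assumption, since the real class may instead move to a FAILED state.

```python
from typing import Sequence


class BackoffSchedule:
    """Hypothetical stand-in illustrating clamped backoff; not the SDK class."""

    def __init__(self, retry_intervals: Sequence[float], max_attempts: int = 0) -> None:
        if not retry_intervals:
            raise ValueError("retry_intervals must not be empty")
        self.retry_intervals = list(retry_intervals)
        self.max_attempts = max_attempts  # 0 means unlimited
        self.attempts = 0

    def next_delay(self) -> float:
        """Return the delay for the next attempt, clamping to the last interval."""
        if self.max_attempts and self.attempts >= self.max_attempts:
            raise RuntimeError("retry budget exhausted")  # assumed failure mode
        index = min(self.attempts, len(self.retry_intervals) - 1)
        self.attempts += 1
        return self.retry_intervals[index]

    def on_success(self) -> None:
        """Reset the attempt counter after a successful reconnect."""
        self.attempts = 0


schedule = BackoffSchedule([1.0, 2.0, 5.0])
delays = [schedule.next_delay() for _ in range(5)]
print(delays)  # [1.0, 2.0, 5.0, 5.0, 5.0]
schedule.on_success()
after_reset = schedule.next_delay()
print(after_reset)  # 1.0
```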
SubscriptionManager
Standalone ref-counted subscription tracking:
from fidelity_trader.streaming import SubscriptionManager
mgr = SubscriptionManager(
    on_subscribe=lambda syms: ws.send(build_sub(syms)),
    on_unsubscribe=lambda syms: ws.send(build_unsub(syms)),
    unsubscribe_delay=10.0,
)
await mgr.subscribe(["AAPL", "TSLA"], consumer_id="view1")
await mgr.subscribe(["AAPL"], consumer_id="view2") # no network call (refcount 2)
await mgr.unsubscribe(["AAPL"], consumer_id="view1") # refcount 1, no action
await mgr.unsubscribe(["AAPL"], consumer_id="view2") # refcount 0, starts 10s timer
# After 10s: on_unsubscribe(["AAPL"]) fires
mgr.stats # {"active": 1, "pending_unsub": 1, "consumers": 2, "total_refs": 1}
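The counting rule in the comments above (only the first consumer triggers a subscribe, only the last triggers an unsubscribe) can be sketched without timers or I/O. RefCounts is a hypothetical class for illustration, not the SDK's SubscriptionManager; it tracks a set of consumer IDs per symbol, so a repeated subscribe from the same consumer does not inflate the count, which may differ from the real implementation.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set


class RefCounts:
    """Hypothetical sketch of per-symbol consumer tracking (no timers, no I/O)."""

    def __init__(self) -> None:
        self._consumers: Dict[str, Set[str]] = defaultdict(set)

    def subscribe(self, symbols: Iterable[str], consumer_id: str) -> List[str]:
        """Return the symbols that just gained their first consumer."""
        newly_active = []
        for sym in symbols:
            if not self._consumers[sym]:
                newly_active.append(sym)
            self._consumers[sym].add(consumer_id)
        return newly_active

    def unsubscribe(self, symbols: Iterable[str], consumer_id: str) -> List[str]:
        """Return the symbols that just lost their last consumer."""
        released = []
        for sym in symbols:
            holders = self._consumers.get(sym)
            if holders and consumer_id in holders:
                holders.discard(consumer_id)
                if not holders:
                    del self._consumers[sym]
                    released.append(sym)
        return released


refs = RefCounts()
first = refs.subscribe(["AAPL", "TSLA"], "view1")
second = refs.subscribe(["AAPL"], "view2")
third = refs.unsubscribe(["AAPL"], "view1")
fourth = refs.unsubscribe(["AAPL"], "view2")
print(first, second, third, fourth)  # ['AAPL', 'TSLA'] [] [] ['AAPL']
```

The lists returned by subscribe and unsubscribe are exactly the symbols a caller would pass on to the network layer, immediately or after a delay.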
Service Layer
Production streaming (MDDSStream, BEPSStream) is not yet integrated into the REST service layer. The service currently uses the protocol-layer MDDSClient via MDDSManager for SSE/WebSocket fan-out. Future work will upgrade the service to use the production streaming classes.