Ingesting streaming data with asyncio
Vita Smid | Pražské PyVo #68
November 16, 2016

- Small focused team
- We develop a trading platform and strategies
- All back-end code is Python 3.5
The problem
- A trading platform ingests real-time data from several financial markets at once.
- Various transports and application protocols are used: from binary over TCP to Redis pubsub to XML over long-polled HTTP.
- The platform runs independent strategies (agents) that want to see the data in a homogeneous format and react to it.
Starting a data source
class DataSource:
    def start(self) -> None:
        """Flag the source as running and schedule its main loop.

        Sets the ``_is_running`` event, then wraps ``self._run()`` with
        ``asyncio.ensure_future`` and keeps the result in ``_run_task``.
        Returns immediately; nothing is awaited here.
        """
        self._is_running.set()  # asyncio.Event
        main_loop = self._run()
        self._run_task = asyncio.ensure_future(main_loop)
Main data source loop
class DataSource:
    async def _run(self) -> None:
        """Connect, receive and reconnect for as long as the source runs.

        Every pass of the ``DelayedLoop`` iteration makes one connection
        attempt. A ``ProtocolConnectionError`` is logged and the pass ends;
        a successful connection restores subscriptions, runs the receiving
        loop (elided here) and then disconnects. The loop is left as soon
        as ``_is_running`` is found cleared at the end of a pass.
        """
        reconnect_delays = exponential_generator(...)
        async for _ in DelayedLoop(reconnect_delays):
            try:
                await self._protocol.connect()
            except ingress.protocols.base.ProtocolConnectionError:
                self._logger.exception('Connection error')
            else:
                # Connected: restore pre-existing subscriptions first.
                await self._recover_subscriptions()
                # (receiving loop comes in here...)
                await self._protocol.disconnect()

            # NOTE(review): the original text carried no indentation; this
            # check is assumed to sit at loop level so it also runs after a
            # failed connection attempt -- confirm.
            if not self._is_running.is_set():
                break
Main data source loop: receiving data
# Receiving loop
while True:
try:
self._receive_task = asyncio.ensure_future(
self._protocol.receive())
raw_data = await self._receive_task
except ingress.protocols.base.ProtocolConnectionError:
break # Connection failed
except asyncio.CancelledError:
break # Receive loop terminated
else:
await self._process_raw_message(raw_data)
Stopping a data source
class DataSource:
    async def stop(self) -> None:
        """Stop the source and wait for its main loop to finish.

        Clears the ``_is_running`` event, cancels the task stored in
        ``_receive_task`` and then awaits ``_run_task``.
        """
        # Clear the flag before cancelling so the main loop sees it cleared.
        self._is_running.clear()

        # NOTE(review): assumes _receive_task has been assigned already,
        # i.e. a receive was started at least once -- confirm.
        in_flight_receive = self._receive_task
        in_flight_receive.cancel()

        await self._run_task
Subscriptions
class DataSource:
    async def subscribe(self, key: str) -> None:
        """Subscribe to ``key``, retrying on transient errors.

        Holds the per-key lock from ``_subscribe_locks`` for the whole
        operation. Up to 10 delays are taken from the exponential generator
        and one subscription attempt is made per pass of the ``DelayedLoop``
        iteration. Returns as soon as an attempt succeeds; a
        ``TransientSubscriptionError`` is logged and the next pass retries.
        """
        async with self._subscribe_locks[key]:
            # Omitted: subscription state tracking
            delays = toolz.take(10, exponential_generator(...))
            async for _ in DelayedLoop(delays):
                try:
                    await self._protocol.subscribe(key)
                except ingress.protocols.base.TransientSubscriptionError:
                    # NOTE(review): the '{}' placeholder presumes a logger
                    # that formats str.format-style -- confirm.
                    self._logger.error(
                        'Transient subscription error for key = {}.', key)
                else:
                    return
            else:
                # Subscription failed completely...
                # Reached only when the loop runs out of delays without a
                # successful attempt. The handling is elided; the explicit
                # placeholder keeps the clause syntactically valid.
                ...
Receiving data in protocol
class XYZProtocol:
    async def receive(self) -> bytes:
        """Read one message, requiring it to arrive within the heartbeat period.

        ``self._read()`` is wrapped in ``asyncio.wait_for`` with
        ``self._settings['heartbeat_period']`` as the timeout, and the
        resulting future is kept in ``_read_task``.

        Returns:
            The message, after it has been passed to
            ``_validate_sequence_id``.

        Raises:
            ProtocolConnectionError: awaiting the read was cancelled.
            KeepAliveError: nothing was read before the timeout expired.
        """
        self._read_task = asyncio.ensure_future(asyncio.wait_for(
            self._read(), timeout=self._settings['heartbeat_period']))
        try:
            msg = await self._read_task
        except asyncio.CancelledError as e:
            # Cancellation is reported to the caller as a connection error.
            raise ProtocolConnectionError() from e
        except asyncio.TimeoutError as e:
            raise KeepAliveError() from e  # No heartbeat came
        else:
            self._validate_sequence_id(msg)
            return msg
Questions?