Ingesting streaming data with asyncio

Vita Smid | Pražské PyVo #68

November 16, 2016

Quantlane

  • Small focused team
  • We develop a trading platform and strategies
  • All back-end code is Python 3.5

The problem

  • A trading platform ingests real-time data from several financial markets at once.
  • Various transports and application protocols are used: from binary over TCP to Redis pubsub to XML over long-polled HTTP.
  • The platform runs independent strategies (agents) that want to see the data in a homogeneous format and react to it.

Starting a data source


class DataSource:

  def start(self) -> None:
    self._is_running.set() # asyncio.Event
    self._run_task = asyncio.ensure_future(self._run())
					

Main data source loop


class DataSource:

  async def _run(self) -> None:
    async for _ in DelayedLoop(exponential_generator(...)):
      try:
        await self._protocol.connect()
      except ingress.protocols.base.ProtocolConnectionError:
        self._logger.exception('Connection error')
      else:
        # Restore pre-existing subscriptions
        await self._recover_subscriptions()
        # (receiving loop comes in here...)
        await self._protocol.disconnect()
        if not self._is_running.is_set():
          break
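
The slides use `DelayedLoop` and `exponential_generator` without showing them. Below is a minimal sketch of what such helpers might look like, not the actual Quantlane implementation. The `exponential_generator` parameters (`start`, `factor`, `maximum`) and the `DelayedLoop` semantics are assumptions: the first iteration runs immediately, each later one sleeps first, and the loop ends when the delays run out.

```python
import asyncio
from typing import Iterable, Iterator


def exponential_generator(start: float, factor: float, maximum: float) -> Iterator[float]:
    """Yield start, start * factor, ... capped at maximum, forever.

    Hypothetical signature; the slides elide the real arguments.
    """
    delay = start
    while True:
        yield min(delay, maximum)
        delay *= factor


class DelayedLoop:
    """Async iterable yielding attempt numbers with a sleep between iterations.

    Assumed semantics: with N delays the body runs at most N + 1 times,
    because the first iteration does not consume a delay.
    """

    def __init__(self, delays: Iterable[float]) -> None:
        self._delays = iter(delays)
        self._attempt = 0

    def __aiter__(self) -> "DelayedLoop":
        return self

    async def __anext__(self) -> int:
        if self._attempt > 0:
            try:
                delay = next(self._delays)
            except StopIteration:
                # No delays left: end the `async for` loop
                raise StopAsyncIteration from None
            await asyncio.sleep(delay)
        self._attempt += 1
        return self._attempt


async def demo() -> list:
    attempts = []
    async for attempt in DelayedLoop([0.01, 0.02]):
        attempts.append(attempt)
    return attempts
```

Running `asyncio.run(demo())` returns `[1, 2, 3]`. With an infinite generator, as in `_run`, the loop only ends on an explicit `break`.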
					

Main data source loop: receiving data

  # Receiving loop
  while True:
    try:
      self._receive_task = asyncio.ensure_future(
          self._protocol.receive())
      raw_data = await self._receive_task
    except ingress.protocols.base.ProtocolConnectionError:
      break # Connection failed
    except asyncio.CancelledError:
      break # Receive loop terminated
    else:
      await self._process_raw_message(raw_data)

Stopping a data source


class DataSource:

  async def stop(self) -> None:
    self._is_running.clear()
    self._receive_task.cancel()
    await self._run_task
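
The start/receive/stop interplay can be tried out in isolation. The following is a stripped-down sketch under stated assumptions: `FakeProtocol` and `MiniSource` are hypothetical stand-ins, there is no reconnection logic, and `stop` guards against being called before any receive task exists.

```python
import asyncio


class FakeProtocol:
    """Hypothetical stand-in for a real protocol: one message every 10 ms."""

    async def receive(self) -> bytes:
        await asyncio.sleep(0.01)
        return b'tick'


class MiniSource:
    def __init__(self) -> None:
        self._protocol = FakeProtocol()
        self._is_running = asyncio.Event()
        self._receive_task = None
        self._run_task = None
        self.received = 0

    def start(self) -> None:
        self._is_running.set()
        self._run_task = asyncio.ensure_future(self._run())

    async def _run(self) -> None:
        while self._is_running.is_set():
            try:
                # Keep a reference so that stop() can cancel the pending receive
                self._receive_task = asyncio.ensure_future(self._protocol.receive())
                await self._receive_task
            except asyncio.CancelledError:
                break  # Receive was cancelled by stop()
            else:
                self.received += 1

    async def stop(self) -> None:
        self._is_running.clear()
        if self._receive_task is not None:
            self._receive_task.cancel()
        await self._run_task  # Wait until the loop has really finished


async def demo():
    source = MiniSource()
    source.start()
    await asyncio.sleep(0.05)
    await source.stop()
    return source.received, source._run_task.done()
```

Cancelling only the inner receive task, rather than `_run_task` itself, lets the run loop exit on its own terms and clean up before `stop` returns.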
					

Subscriptions


class DataSource:

  async def subscribe(self, key: str) -> None:
    async with self._subscribe_locks[key]:
      # Omitted: subscription state tracking
      delays = toolz.take(10, exponential_generator(...))
      async for attempt in DelayedLoop(delays):
        try:
          await self._protocol.subscribe(key)
          return
        except ingress.protocols.base.TransientSubscriptionError:
          self._logger.error('Transient subscription error for key = {}.', key)
      else:
        # All attempts exhausted: subscription failed completely (handling omitted)
        pass
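
The slide does not show how `self._subscribe_locks` is built. One plausible construction, assumed here, is a `defaultdict` of `asyncio.Lock` objects. The sketch below (with a hypothetical `Subscriber` class and made-up keys) shows the effect: concurrent calls for the same key are serialized, while different keys proceed independently.

```python
import asyncio
import collections


class Subscriber:
    def __init__(self) -> None:
        # One lock per key, created lazily on first access (assumed construction)
        self._subscribe_locks = collections.defaultdict(asyncio.Lock)
        self.active = collections.Counter()
        self.max_active = collections.Counter()

    async def subscribe(self, key: str) -> None:
        async with self._subscribe_locks[key]:
            self.active[key] += 1
            # Record the highest number of concurrent subscribers seen per key
            self.max_active[key] = max(self.max_active[key], self.active[key])
            await asyncio.sleep(0.01)  # Stands in for the protocol round trip
            self.active[key] -= 1


async def demo() -> dict:
    subscriber = Subscriber()
    await asyncio.gather(
        subscriber.subscribe('AAPL'),
        subscriber.subscribe('AAPL'),
        subscriber.subscribe('MSFT'),
    )
    return dict(subscriber.max_active)
```

`asyncio.run(demo())` returns `{'AAPL': 1, 'MSFT': 1}`: the two `'AAPL'` calls never overlap inside the locked section.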
					

Receiving data in protocol


class XYZProtocol:

  async def receive(self) -> bytes:
    self._read_task = asyncio.ensure_future(asyncio.wait_for(
      self._read(), timeout=self._settings['heartbeat_period']))
    try:
      msg = await self._read_task
    except asyncio.CancelledError as e:
      raise ProtocolConnectionError() from e
    except asyncio.TimeoutError as e:
      raise KeepAliveError() from e # No heartbeat came
    else:
      self._validate_sequence_id(msg)
      return msg
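
The heartbeat check relies on `asyncio.wait_for` raising `asyncio.TimeoutError` when the read does not finish in time. A minimal standalone sketch of that mechanism follows; the free function `receive` and the two reader coroutines are hypothetical, and only the `KeepAliveError` name comes from the slide.

```python
import asyncio


class KeepAliveError(Exception):
    """No data (not even a heartbeat) arrived within the heartbeat period."""


async def receive(read, heartbeat_period: float) -> bytes:
    try:
        # wait_for cancels the read and raises TimeoutError when time runs out
        return await asyncio.wait_for(read(), timeout=heartbeat_period)
    except asyncio.TimeoutError as e:
        raise KeepAliveError() from e


async def demo():
    async def fast_read() -> bytes:
        await asyncio.sleep(0.01)
        return b'heartbeat'

    async def silent_read() -> bytes:
        await asyncio.sleep(1)
        return b'too late'

    message = await receive(fast_read, heartbeat_period=0.2)
    try:
        await receive(silent_read, heartbeat_period=0.05)
    except KeepAliveError:
        timed_out = True
    else:
        timed_out = False
    return message, timed_out
```

`asyncio.run(demo())` returns `(b'heartbeat', True)`: the first read beats the deadline, the second is reported as a missed heartbeat.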
					

Wrapping it up

  • Synchronization of connections, subscriptions, and retries “just works” with asyncio.
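  • How do we break up CPU-bound code, like parsers, so that it does not block the event loop? With asyncio.sleep? In the receiving loop, this is the call in question:
    await self._process_raw_message(raw_data)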
  • It is handy to keep task references for later cancellation.
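
For the open question about CPU-bound code, two common options can be sketched. Neither is presented in the talk as the chosen solution. The first yields to the event loop with `asyncio.sleep(0)` between chunks of work; the second hands the work to an executor via `loop.run_in_executor`. The `parse` function is a hypothetical placeholder for a real parser.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def parse(raw: bytes) -> int:
    """Hypothetical CPU-bound parser; here it just sums the bytes."""
    return sum(raw)


async def parse_cooperatively(messages, chunk_size: int = 100) -> list:
    results = []
    for i, raw in enumerate(messages, start=1):
        results.append(parse(raw))
        if i % chunk_size == 0:
            # Give other tasks (e.g. receive loops) a chance to run
            await asyncio.sleep(0)
    return results


async def parse_in_executor(messages, executor) -> list:
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, parse, raw) for raw in messages]
    return await asyncio.gather(*futures)


async def demo():
    messages = [bytes([n % 256]) * 4 for n in range(250)]
    cooperative = await parse_cooperatively(messages)
    # A thread pool keeps the demo simple; pure-Python parsers share the GIL,
    # so a ProcessPoolExecutor would be needed for real parallelism.
    with ThreadPoolExecutor(max_workers=2) as pool:
        offloaded = await parse_in_executor(messages, pool)
    return cooperative == offloaded, cooperative[:3]
```

`asyncio.run(demo())` returns `(True, [0, 4, 8])`. The `asyncio.sleep(0)` variant only helps when the work can be cut into small pieces; a single long parse still blocks the loop for its full duration.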

Questions?

quantlane.com