Algorithmic trading with asyncio

Vita Smid | PyDays Vienna

May 5, 2017

Hi, I am Vita.

I am a freelancer specializing in difficult, mathy problems.

Quantlane

  • We develop a stock trading platform and strategies.
  • Small team, lean principles.
  • All back-end code is Python 3.5.
  • We also like Docker, ReactJS, Redis, Kafka, PostgreSQL, TensorFlow, …

This talk

  • What is a trading platform?
  • Useful tricks & tools for async Python and performance.
A trading platform is a

low-latency event-driven
multi-agent system

Low-latency

  • Newly arrived data should be processed in less than 10 milliseconds.
  • Decisions should be made in less than 100 milliseconds.

Event-driven

  • A lot of code is just waiting for things to happen:
    • Data from stock exchanges
    • Our computations
    • User input

Multi-agent

  • Many decoupled components.
  • Functionality is spread across different modules …
  • … or even processes

asyncio 101

  • Parallel programs are hard to reason about.
  • Coroutines use await to suspend themselves until they receive input.
  • Everything runs in one thread by default.

class InputDataProcessor:
    async def run(self):
        while True:
            data = await self._reader.read()  # Read from socket
            publish(parse(data))  # Send to internal pubsub

class Model:
    async def run(self):
        while True:
            incremental_update = await consume()  # Read from pubsub
            state_snapshot = self._process(incremental_update)
            publish(state_snapshot)  # Send state to internal pubsub

class UserInterfaceServer:
    async def run(self):
        await asyncio.wait([self._handle_input(), self._render()])

    async def _handle_input(self):
        while True:
            data = await self._websocket.recv()  # Read from WebSocket
            publish(parse(data))  # Send input to internal pubsub

    async def _render(self):
        while True:
            state = get_model_state()
            await self._websocket.send(encode(state))
            await asyncio.sleep(0.25)

asyncio publish-subscribe

  • Many components produce data.
  • Many components consume data.
  • All messages are routed through a central hub.

Hub


hub = pubsub.Hub()

Publisher


publisher1 = pubsub.Publisher(hub, ('some', 'namespace'))
publisher1.publish({'data': 42})

Subscriber


subscriber1 = pubsub.Subscriber(hub, 'subscriber1')
subscriber1.subscribe(('some', '*'))

while True:
    key, payload = await subscriber1.consume()
    print(key)      # ('some', 'namespace')
    print(payload)  # {'data': 42}

github.com/qntln/aiopubsub
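
The Hub/Publisher/Subscriber trio above can be approximated in a few lines. Here is a toy in-process sketch of the same routing idea (my own code, not aiopubsub's implementation): tuple keys, a '*' wildcard per key part, and an asyncio.Queue per subscriber.

```python
import asyncio
import fnmatch


class Hub:
    """Toy pub-sub hub: routes each published message to every matching queue."""

    def __init__(self):
        self._subscriptions = []  # List of (pattern, queue) pairs.

    def subscribe(self, pattern, queue):
        self._subscriptions.append((pattern, queue))

    def publish(self, key, payload):
        for pattern, queue in self._subscriptions:
            # A key matches if it has the same length and each part
            # matches the corresponding pattern part ('*' matches anything).
            if len(key) == len(pattern) and all(
                fnmatch.fnmatch(part, pat) for part, pat in zip(key, pattern)
            ):
                queue.put_nowait((key, payload))


async def demo():
    hub = Hub()
    queue = asyncio.Queue()
    hub.subscribe(('some', '*'), queue)  # Wildcard subscription.
    hub.publish(('some', 'namespace'), {'data': 42})
    hub.publish(('other', 'namespace'), {'data': 0})  # Not matched, dropped.
    return await queue.get()


loop = asyncio.new_event_loop()
key, payload = loop.run_until_complete(demo())
loop.close()
print(key, payload)  # ('some', 'namespace') {'data': 42}
```

The real library adds subscriber callbacks, unsubscription, and logging on top of the same hub-and-queues shape.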

Tracking metrics in StatsD

  • Application code sends timings and counters via UDP.
  • StatsD aggregates the raw data and stores it in a backend.
  • Very simple to track dozens of metrics.

Example from statsd.readthedocs.io:


import statsd
c = statsd.StatsClient('localhost', 8125)

c.incr('foo')  # Increment the 'foo' counter.
c.timing('stats.timed', 320)  # Record a 320ms 'stats.timed'.

github.com/qntln/fastatsd
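
Part of why this is so cheap is that the StatsD wire format is just a short text line per metric, fired off in a UDP datagram. A hand-rolled sketch of that format (the function names here are mine, not the statsd or fastatsd API):

```python
def encode_counter(name, value=1):
    """StatsD wire format for a counter: 'name:value|c'."""
    return '{}:{}|c'.format(name, value)


def encode_timing(name, ms):
    """StatsD wire format for a timing in milliseconds: 'name:value|ms'."""
    return '{}:{}|ms'.format(name, ms)


lines = [encode_counter('foo'), encode_timing('stats.timed', 320)]
print(lines)  # ['foo:1|c', 'stats.timed:320|ms']

# Shipping them is fire-and-forget; StatsD never replies:
#
#   import socket
#   sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#   for line in lines:
#       sock.sendto(line.encode('ascii'), ('127.0.0.1', 8125))
```

Because UDP is lossy and connectionless, the application never blocks on metrics, which is exactly the property you want in a low-latency system.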

aiodebug

  • Small library for monitoring and testing asyncio programs.
  • Always on in production.

Log warnings when callbacks block the event loop for longer than 50ms


aiodebug.log_slow_callbacks.enable(0.05)
Executing <Task pending coro=<foo() running at /home/.../foo.py:37> wait_for=<Future pending cb=[Task._wakeup()]>> took 0.069 seconds

Speed up or slow down time in the event loop


loop = aiodebug.testing.time_dilated_loop.TimeDilatedLoop()
asyncio.set_event_loop(loop)

loop.time_dilation = 3
await asyncio.sleep(1)  # Takes 0.333s of real time

loop.time_dilation = 0.1
await asyncio.sleep(1)  # Takes 10s of real time

Track event loop lags in StatsD


aiodebug.monitor_loop_lag.enable(statsd_client)

github.com/qntln/aiodebug
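
The loop-lag idea is easy to reproduce without StatsD: repeatedly sleep for a fixed interval and record how much later than requested the loop actually wakes you up. A minimal sketch of that principle (my own code, not aiodebug's implementation):

```python
import asyncio
import time


async def monitor_lag(interval, lags, iterations):
    """Record how much later than `interval` each wakeup actually happens."""
    for _ in range(iterations):
        start = time.monotonic()
        await asyncio.sleep(interval)
        lags.append(time.monotonic() - start - interval)


async def blocking_callback():
    time.sleep(0.05)  # A misbehaving coroutine that blocks the whole loop.


async def main():
    lags = []
    monitor = asyncio.ensure_future(monitor_lag(0.01, lags, 10))
    await asyncio.sleep(0.02)  # Let the monitor take a few clean samples...
    await blocking_callback()  # ...then stall the loop for 50 ms.
    await monitor
    return lags


loop = asyncio.new_event_loop()
lags = loop.run_until_complete(main())
loop.close()
print('worst lag: {:.3f}s'.format(max(lags)))
```

Anything that hogs the loop, like the time.sleep call above, shows up immediately as a lag spike; aiodebug just forwards those measurements to StatsD.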

Making logging faster

  • The standard logging module is feature-rich but slow.
  • We replaced it with a bare-bones logging library with almost no features.


import logwood
from logwood.handlers.stderr import ColoredStderrHandler

logwood.basic_config(
    level = logwood.INFO,
    handlers = [ColoredStderrHandler()]
)

logger = logwood.get_logger('LoggerName')
logger.info('Just FYI')
logger.warning('Six times seven is {:d}.', 42)
[1477469095.001102][hostname][script.py][LoggerName][INFO] Just FYI
[1477469095.001147][hostname][script.py][LoggerName][WARNING] Six times seven is 42.

github.com/qntln/logwood

Profiling in production

  • Profiling is essential for optimization.
  • The standard Python profiler cProfile is great but slows down the profiled program.
  • In complex systems, production load is difficult to simulate.

Statistical profiling

  • statprof is a statistical profiler: instead of tracing every function call, it samples the call stack N times per second.
  • Less accurate, but lightweight.


import statprof
statprof.start()

json.dump(some_data, file) # Your program

statprof.stop()
statprof.display()

 %   cumulative      self
time    seconds   seconds  name
65.31      0.06      0.06  .../json/__init__.py:179:dump
 8.16      0.03      0.01  .../json/encoder.py:324:_iterencode_list
 8.16      0.01      0.01  .../json/encoder.py:392:_iterencode_dict
...
Tip: use statprof-smarkets rather than the unmaintained statprof.

Continuous profiling

  • Running statprof ad-hoc is nice, but it's a lot of manual work.
  • Can we automate the collection of profiles?
  • Can we automate the analysis as well?
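
One way to automate collection, sketched here from first principles (this is not Quantlane's actual tooling): run a background thread that periodically samples the target thread's stack via sys._current_frames, which is essentially what statistical profilers do internally, and aggregate the counts so they can be shipped off for analysis on a schedule.

```python
import collections
import sys
import threading
import time


def sample_thread(counts, interval, stop_event, thread_id):
    """Sample the target thread's stack every `interval` seconds."""
    while not stop_event.is_set():
        frame = sys._current_frames().get(thread_id)
        while frame is not None:  # Walk the stack, counting each function.
            counts[frame.f_code.co_name] += 1
            frame = frame.f_back
        time.sleep(interval)


counts = collections.Counter()
stop = threading.Event()
sampler = threading.Thread(
    target=sample_thread,
    args=(counts, 0.001, stop, threading.main_thread().ident),
)
sampler.start()


def busy_work():
    total = 0
    for i in range(100000):
        total += i * i
    return total


# Simulate production load for a while.
deadline = time.monotonic() + 0.2
while time.monotonic() < deadline:
    busy_work()

stop.set()
sampler.join()
print(counts.most_common(3))  # busy_work should dominate the samples
```

From here, automation is a matter of flushing the counter to durable storage every few minutes and diffing the aggregated profiles over time.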

Takeaways – asyncio

  1. Event-driven architecture promotes decoupling.
  2. asyncio makes it easy to run many separate components in one process …
  3. … especially when you add a simple publish-subscribe system.

Takeaways – performance

  1. Tracking application metrics helps with monitoring and performance troubleshooting.
  2. Sometimes it is right to reinvent the wheel (such as logging).
  3. Profiling live systems is crucial, but difficult … for now.

Thank you

quantlane.com