class SomeComponent:
def start(self):
self._is_running.set() # asyncio.Event
self._run_task = asyncio.ensure_future(self._run())
async def _run(self):
while self._is_running.is_set():
message = await some_queue.get() # asyncio.Queue
self._process(message)
async def stop(self):
self._is_running.clear()
self._run_task.cancel()
with contextlib.suppress(asyncio.TimeoutError):
await self._run_task
hub = aiopubsub.Hub()
subscriber = aiopubsub.Subscriber(hub, 'subscriber_id')
subscriber.subscribe(('some', 'namespace'))
publisher = aiopubsub.Publisher(hub, prefix = ('some',))
publisher.publish(('namespace',), 'Hello subscriber')
key, message = await subscriber.consume()
assert key == ('some', 'namespace')
assert message == 'Hello subscriber'
# Usage of RedisBackedReadWritableList. Per the notes on each line, writes and
# deletes go to Redis while reads are served from memory.
# NOTE(review): the `...` presumably stands in for further constructor
# arguments not shown here - confirm against the class definition.
lst = RedisBackedReadWritableList('redis-key', ...)
lst.append('hi') # writes 'hi' to Redis
lst[0] # reads 'hi' from memory, not from Redis
del lst[0] # deletes 'hi' from Redis
# Store one price/quantity record at index 3 and another at index 4,
# each through its own item assignment.
trades[3] = {
    'price': '15.10',
    'quantity': 500,
}
trades[4] = {
    'price': '15.11',
    'quantity': 150,
}
# Schema identifier plus a payload with 'start'/'stop' values and two
# price/quantity items.
schema_id = 123
payload = {
    'start': 3,
    'stop': 5,
    'items': [
        {'price': '15.10', 'quantity': 500},
        {'price': '15.11', 'quantity': 150},
    ],
}
007b0014060a040a31352e3130e8070a31352e3131ac0200