Title | Doing More with Asynchronous Python |
Topic | Asynchronous Python with async/await |
Date | Fri Dec 3 |
Keywords | generators, coroutines, await, async def, async for, async with, PEP-492 |
These sessions are designed for a broad audience of non-software engineers and software programmers of all backgrounds and skill levels.
Our expected audience should comprise attendees with a…
During this session, we will endeavour to guide our audience to developing…
… asyncio fits into the choices for concurrency mechanisms, including the advantages and limitations of each approach.
… asyncio library and new syntax added to Python 3.6.

In a previous seminar, we gave a broad overview of concurrency approaches in Python, comparing and contrasting them, and touching briefly upon asyncio.
In this seminar, we will take a closer look at how asyncio approaches are structured. We’ll review the basics of generators and coroutines, build from there to asynchronous coroutines and event loops, and discuss new syntax added to support asynchronous programming (e.g., async def/await). We’ll also take a look at how this syntax fits into mechanisms added to the Python object model, and wrap up our discussion with a neat demo of a simple asynchronous system.
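
The progression from generators to coroutines to event loops can be sketched in a few lines. This is a minimal illustration, not how asyncio is implemented: `countdown` and `run_all` are hypothetical names, and the "event loop" is only a round-robin scheduler over plain generators.

```python
from collections import deque

def countdown(name, n, log):
    # A plain generator used as a coroutine: each bare `yield` hands
    # control back to whoever is driving it (here, the toy loop below).
    while n > 0:
        log.append((name, n))
        yield
        n -= 1
    return f'{name} done'

def run_all(*coros):
    # A toy round-robin "event loop": resume each coroutine in turn
    # until every one of them has finished.
    ready = deque(coros)
    results = []
    while ready:
        coro = ready.popleft()
        try:
            next(coro)                 # run until the next `yield`
        except StopIteration as e:
            results.append(e.value)    # the generator's return value
        else:
            ready.append(coro)         # not finished: schedule it again
    return results

log = []
results = run_all(countdown('a', 2, log), countdown('b', 1, log))
print(log)      # the two coroutines take turns
print(results)  # return values, in order of completion
```

Each `yield` is a point where the coroutine voluntarily gives up control, which is the same cooperative model that `await` expresses in `async def` functions.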
Agenda:
Did you enjoy this seminar? Did you learn something new that will help you as you write more complex Python systems that require concurrent approaches?
In a future seminar, we may dive deeper into asynchronous and concurrent design.
We can:
… asyncio combine with traditional concurrent approaches like threading and multiprocessing.
… multiprocessing, including the use of multiprocessing.shared_memory, limitations of multiprocessing, and sophisticated use of entities such as subprocess.Popen.

print("Let's get started!")
from asyncssh import connect
from asyncio import run

async def main():
    # known_hosts=None disables host key verification (acceptable for a demo only)
    async with connect(host='host0', username='dutc', password='password', port=2222, known_hosts=None) as c:
        print((await c.run('date')).stdout)

run(main())
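
To see how `async with` fits into the Python object model, here is a rough sketch of the protocol it relies on: `__aenter__` and `__aexit__`. The `Connection` class is a hypothetical stand-in that needs no network; it is not asyncssh's API.

```python
from asyncio import run, sleep

class Connection:
    # Hypothetical stand-in for a network connection; not asyncssh's API.
    def __init__(self, host):
        self.host = host
        self.events = []

    async def __aenter__(self):
        await sleep(0)                  # pretend to open the connection
        self.events.append('open')
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await sleep(0)                  # pretend to close the connection
        self.events.append('close')
        return False                    # do not swallow exceptions

    async def run(self, cmd):
        await sleep(0)                  # pretend to wait on the remote end
        self.events.append(f'run {cmd}')
        return f'{self.host}: {cmd}'

async def main():
    conn = Connection('host0')
    async with conn as c:               # awaits __aenter__, later __aexit__
        out = await c.run('date')
    return out, conn.events

out, events = run(main())
print(out)
print(events)
```

Because entry and exit are themselves awaited, other tasks may run while a connection is being opened or closed.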
Limitations:
threading: preëmptive
multiprocessing: separated runtimes

# `async def` → `def` ✓
# `async def` → `async def` ✓
# `def` → `def` ✓
# `def` → `async def` ❌
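
The calling rules above can be illustrated with a small sketch. The function names are hypothetical; the point is that a plain `def` can create a coroutine object but cannot `await` it, so the only bridge from synchronous code is to start an event loop.

```python
from asyncio import run, iscoroutine

async def fetch():
    return 42

async def from_async():
    # `async def` → `async def`: await the coroutine directly.
    return await fetch()

def from_sync_wrong():
    # `def` → `async def`: calling only creates a coroutine object;
    # nothing runs, and a plain `def` cannot `await` it.
    coro = fetch()
    is_coro = iscoroutine(coro)
    coro.close()                # avoid the "never awaited" warning
    return is_coro

def from_sync_right():
    # The bridge from synchronous code: start an event loop.
    return run(fetch())

print(run(from_async()))
print(from_sync_wrong())
print(from_sync_right())
```

Note that `asyncio.run` raises `RuntimeError` if an event loop is already running in the current thread, so `from_sync_right` only works at the outermost, synchronous level of a program.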
from asyncio import run, gather
from asyncssh import connect
from itertools import product
from shlex import quote

params = {
    'port': 2222,
    'username': 'dutc',
    'password': 'password',
    'known_hosts': None,
}

commands = [
    ('uptime', []),
    ('free', []),
    ('cat', ['/proc/cpuinfo']),
]

hosts = [
    'host0', 'host1', 'host2', 'host3',
    'host4', 'host5', 'host6', 'host7',
]

async def task(host, commands):
    async with connect(**params, host=host) as c:
        # Map (executable, args) to the shell-quoted command line.
        commands = {
            (exec_, tuple(args)): f'{exec_} {" ".join(quote(x) for x in args)}'
            for exec_, args in commands
        }
        # Run every command on this host concurrently.
        tasks = [c.run(cmd) for cmd in commands.values()]
        results = dict(zip(commands, await gather(*tasks)))
        # print(f'{({k: v.command for k, v in results.items()}) = }')
        print(
            results['free', ()].stdout
        )

async def main():
    # One task per host, all running concurrently.
    tasks = [task(h, commands) for h in hosts]
    await gather(*tasks)

run(main())
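
For readers without eight SSH hosts at hand, the same fan-out shape can be tried locally. This is a sketch under stated assumptions: `fake_run` is a hypothetical stand-in for `c.run(cmd)` that merely sleeps, and the host and command lists are shortened.

```python
from asyncio import run, gather, sleep
from time import perf_counter

async def fake_run(host, cmd):
    # Hypothetical stand-in for `c.run(cmd)`: waits without blocking.
    await sleep(0.1)
    return f'{host}$ {cmd}'

async def task(host, commands):
    # All commands for one host are in flight at the same time.
    return await gather(*(fake_run(host, cmd) for cmd in commands))

async def main():
    hosts = ['host0', 'host1', 'host2']
    commands = ['uptime', 'free']
    start = perf_counter()
    # gather returns results in the order the awaitables were passed in.
    results = await gather(*(task(h, commands) for h in hosts))
    return dict(zip(hosts, results)), perf_counter() - start

results, elapsed = run(main())
print(results['host1'])
print(f'{elapsed:.2f}s')
```

Six sequential calls would take about 0.6 seconds; because every call is waiting at the same time, the total should be close to a single 0.1 second wait.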
from asyncio import run, gather, sleep as aio_sleep
from random import random, randint
from time import sleep as tm_sleep

async def task(name):
    print(f'before {name}')
    # `async for` awaits each item, letting other tasks run in between.
    async for x in g(randint(0, 5)):
        print(f'{x = } {name = }')
        await aio_sleep(1)
    print(f'after {name}')

async def g(xs):
    # An asynchronous generator: it may `await` between `yield`s.
    for x in range(xs):
        await aio_sleep(1)
        yield x

async def main():
    tasks = [
        task('task1'),
        task('task2'),
        task('task3'),
    ]
    await gather(*tasks)

run(main())
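
The `async for` statement relies on two methods in the object model, `__aiter__` and `__anext__`. The sketch below spells out by hand roughly what an asynchronous generator such as `g` provides; `Countup`, `with_async_for`, and `by_hand` are hypothetical names, and the hand-written loop is an approximation of what the statement does.

```python
from asyncio import run, sleep

class Countup:
    # Roughly what an async generator provides, written out by hand.
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        await sleep(0)              # a point where other tasks may run
        value, self.i = self.i, self.i + 1
        return value

async def with_async_for(n):
    return [x async for x in Countup(n)]

async def by_hand(n):
    # Approximately what `async for` does: await __anext__() repeatedly
    # until StopAsyncIteration is raised.
    it = Countup(n).__aiter__()
    out = []
    while True:
        try:
            out.append(await it.__anext__())
        except StopAsyncIteration:
            break
    return out

print(run(with_async_for(3)))
print(run(by_hand(3)))
```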
from asyncssh import connect
from asyncio import run, gather, sleep
from random import choice
from string import ascii_lowercase
from shlex import quote

commands = [
    ('uptime', []),
    ('free', []),
    ('cat', ['/proc/cpuinfo']),
]

hosts = [
    'host0', 'host1', 'host2', 'host3',
    'host4', 'host5', 'host6', 'host7',
]

params = {
    'port': 2222,
    'username': 'dutc',
    'password': 'password',
    'known_hosts': None,
}

# Shared work set: the producer adds host names, the consumers remove them.
tasks = set()

async def producer():
    async with connect(host='host0', **params) as c:
        while True:
            # Ask host0 for a random number from 1 to 7 and queue that host.
            res = await c.run('seq 1 7 | shuf | head -n1')
            host = f'host{int(res.stdout)}'
            tasks.add(host)
            await sleep(0)  # yield to the event loop so consumers can run

async def consumer():
    while True:
        if tasks:
            t = tasks.pop()
            async with connect(host=t, **params) as c:
                command_text = {
                    (exec_, tuple(args)): f'{exec_} {" ".join(quote(x) for x in args)}'
                    for exec_, args in commands
                }
                subtasks = [c.run(cmd) for cmd in command_text.values()]
                results = dict(zip(command_text, await gather(*subtasks)))
                print(
                    results['free', ()].stdout
                )
        # Yield even when there is no work; otherwise this loop would never
        # give control back to the event loop.
        await sleep(0)

async def main():
    pool = [consumer() for _ in range(3)]
    await gather(producer(), *pool)

run(main())
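
The shared set with `sleep(0)` polling keeps the demo simple. An alternative, which is not the approach used above, is `asyncio.Queue`, where consumers suspend until work arrives instead of polling. The following is a minimal sketch with the SSH work replaced by a short sleep; unlike the original it terminates, and unlike a set a queue does not deduplicate host names.

```python
from asyncio import run, gather, create_task, Queue, sleep

async def producer(queue, hosts):
    for host in hosts:
        await queue.put(host)

async def consumer(name, queue, done):
    while True:
        host = await queue.get()       # suspends until work is available
        try:
            await sleep(0.01)          # hypothetical stand-in for SSH work
            done.append((name, host))
        finally:
            queue.task_done()          # lets queue.join() track progress

async def main():
    queue, done = Queue(), []
    workers = [create_task(consumer(f'c{i}', queue, done)) for i in range(3)]
    await producer(queue, [f'host{i}' for i in range(1, 8)])
    await queue.join()                 # wait until every item is processed
    for w in workers:
        w.cancel()                     # consumers loop forever; stop them
    await gather(*workers, return_exceptions=True)
    return done

done = run(main())
print(sorted(host for _, host in done))
```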