seminars.fb

Seminar 6 (Fri Dec 3): “Doing More with Asynchronous Python”

   
Title Doing More with Asynchronous Python
Topic Asynchronous Python with async/await
Date Fri Dec 3
Keywords generators, coroutines, await, async def, async for, async with, PEP-492

Audience

These sessions are designed for a broad audience of non-software engineers and software programmers of all backgrounds and skill-levels.

Our expected audience should comprise attendees with a…

During this session, we will endeavour to guide our audience to developing…

Abstract

In a previous seminar, we gave a broad overview of concurrency approaches in Python, comparing them and contrasting them, touching briefly upon asyncio.

In this seminar, we will take a closer look at how asyncio approaches are structured. We’ll review the basics of generators and coroutines, build from there to asynchronous coroutines and event loops, discuss new syntax added to support asynchronous programming (e.g., async def/await.) We’ll also take a look at how this syntax fits into mechanisms added to the Python object model, and wrap up our discussion with a neat demo of a simple asynchronous system.

Agenda:

What’s Next?

Did you enjoy this seminar? Did you learn something new that will help you as you write more complex Python systems that require concurrent approaches?

In a future seminar, we may dive deeper into asynchronous and concurrent design.

We can:

Notes

print("Let's get started!")
print("Let's get started!")
from asyncssh import connect
from asyncio import run
async def main():
    async with connect(host='host0',username='dutc', password='password', port=2222, known_hosts=None) as c:
        print((await c.run('date')).stdout)

run(main())

Motivation

Limitations:

Pitfalls

# `async def` → `def`       ✓
# `async def` → `async def` ✓
# `def`       → `def`       ✓
# `def`       → `async def` ❌

Example: Worker Pool

from asyncio import run, gather
from asyncssh import connect
from itertools import product
from shlex import quote

params = {
    'port':       2222,
    'username':   'dutc',
    'password':   'password',
    'known_hosts': None,
}
commands = [
    ('uptime', []),
    ('free', []),
    ('cat', ['/proc/cpuinfo']),
]
hosts = [
    'host0', 'host1', 'host2', 'host3',
    'host4', 'host5', 'host6', 'host7',
]

async def task(host, commands):
    async with connect(**params, host=host) as c:
        commands = {
            (exec_, tuple(args)): f'{exec_} {" ".join(quote(x) for x in args)}'
            for exec_, args in commands
        }
        tasks = [c.run(cmd) for cmd in commands.values()]
        results = dict(zip(commands, await gather(*tasks)))
        #  print(f'{({k: v.command for k, v in results.items()}) = }')
        print(
            results['free', ()].stdout
        )

async def main():
    tasks = [task(h, commands) for h in hosts]
    await gather(*tasks)

run(main())
from asyncio import run, gather, sleep as aio_sleep
from random import random, randint
from time import sleep as tm_sleep

async def task(name):
    print(f'before {name}')
    async for x in g(randint(0, 5)):
        print(f'{x = } {name = }')
    await aio_sleep(1)
    print(f'after  {name}')

async def g(xs):
    for x in range(xs):
        await aio_sleep(1)
        yield x

async def main():
    tasks = [
        task('task1'),
        task('task2'),
        task('task3'),
    ]
    await gather(*tasks)

run(main())

Example: Producer/Consumer

from asyncssh import connect
from asyncio import run, gather, sleep
from random import choice
from string import ascii_lowercase
from shlex import quote

commands = [
    ('uptime', []),
    ('free',   []),
    ('cat',    ['/proc/cpuinfo']),
]
hosts = [
    'host0', 'host1', 'host2', 'host3',
    'host4', 'host5', 'host6', 'host7',
]
params = {
    'port':        2222,
    'username':   'dutc',
    'password':   'password',
    'known_hosts': None,
}

tasks = set()
async def producer():
    async with connect(host='host0', **params) as c:
        while True:
            res = await c.run('seq 1 7 | shuf | head -n1')
            host = f'host{int(res.stdout)}'
            tasks.add(host)
            await sleep(0)

async def consumer():
    while True:
        if tasks:
            t = tasks.pop()
            async with connect(host=t, **params) as c:
                command_text = {
                    (exec_, tuple(args)): f'{exec_} {" ".join(quote(x) for x in args)}'
                    for exec_, args in commands
                }
                subtasks = [c.run(cmd) for cmd in command_text.values()]
                results = dict(zip(command_text, await gather(*subtasks)))
                print(
                    results['free', ()].stdout
                )
        await sleep(0)

async def main():
    pool = [consumer() for _ in range(3)]
    await gather(producer(), *pool)

run(main())