seminars.fb

Automation → “Concurrency Approaches & threading, multiprocessing, and asyncio

Seminar (Wed, Nov 11, 2020; 9 AM PST)

Theme: Automation

Topic: Concurrency Approaches & threading, multiprocessing, and asyncio

Keywords: Python, automation, concurrency, parallelism, threading, multiprocessing, asyncio

Presenter James Powell james@dutc.io
Date Wednesday, November 11, 2020
Time 9:00 AM PST
print("Let's get started!")
print("Let's get started!")
print("Let's get started!")
print("Let's get started!")
from time import sleep
from os import getpid, getppid
from threading import get_ident

data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
def target(idx, name):
    while True:
        print(f'#{idx} {name} @ {getppid()}{getpid()}/{get_ident()} {data.pop() if data else None}')
        sleep(.5)

target(0, 'aaa')
from time import sleep
from os import getpid, getppid
from threading import Thread, get_ident
from string import ascii_lowercase
from random import choice

randname = lambda: ''.join(choice(ascii_lowercase) * 2 for _ in range(2))

def target(idx, name):
    while True:
        print(f'#{idx} {name} @ {getppid()}{getpid()}/{get_ident()}')
        sleep(.75)

pool = [Thread(target=target, args=(idx, randname())) for idx in range(4)]
for x in pool:
    x.start()
for x in pool:
    x.join()
from time import sleep
from os import getpid, getppid
from threading import get_ident
from multiprocessing import Process
from string import ascii_lowercase
from random import choice

randname = lambda: ''.join(choice(ascii_lowercase) * 2 for _ in range(2))

data = [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
def target(idx, name):
    while True:
        print(f'#{idx} {name} @ {getppid()}{getpid()}/{get_ident()} {data.pop() if data else None}')
        sleep(.75)

pool = [Process(target=target, args=(idx, randname())) for idx in range(4)]
for x in pool:
    x.start()
for x in pool:
    x.join()
from time import sleep
from os import getpid, getppid
from threading import get_ident
from multiprocessing import Process, Queue
from string import ascii_lowercase
from random import choice, randrange

from collections import namedtuple
Success = namedtuple('Success', 'job result')

randname = lambda: ''.join(choice(ascii_lowercase) * 2 for _ in range(2))

def target(idx, name, jobs, results):
    while not jobs.empty():
        jb = jobs.get()
        rv = jb ** 2
        s = Success(jb, rv)
        results.put(s)
        print(f'#{idx} {name} @ {getppid()}{getpid()}/{get_ident()} ({s})')
        sleep(.75)

jobs, results = Queue(), Queue()
pool = [Process(target=target, args=(idx, randname(), jobs, results)) for idx in range(4)]

for _ in range(20):
    jobs.put(randrange(-100, 100))

for x in pool:
    x.start()
for x in pool:
    x.join()

all_results = []
while not results.empty():
    all_results.append(results.get())

print(f'{all_results = }')
def f():
    print(f'Step #1')
    x = 1
    print(f'Step #2')
    y = 20
    print(f'Step #3')
    z = 300
    return x, y, z

print('Before.'.center(20, '-'))
print(f'{f() = }')
print('After.'.center(20, '-'))
from itertools import count
def g():
    print(f'Step #1')
    yield 1
    print(f'Step #2')
    yield 20
    print(f'Step #3')
    yield 300

print('Before.'.center(20, '-'))
gi = g()
print(f'{next(gi) = }')
print('Interleave.'.center(20, '-'))
print(f'{next(gi) = }')
print('Interleave.'.center(20, '-'))
print(f'{next(gi) = }')
print('After.'.center(20, '-'))
from time import sleep
from random import choice
from string import ascii_lowercase

randname = lambda: ''.join(choice(ascii_lowercase) * 2 for _ in range(2))

def task(idx, name):
    while True:
        print(f'{idx} {name}')
        yield
        sleep(.75)

def scheduler(tasks):
    while True:
        for t in tasks:
            next(t)

tasks = [task(idx, randname()) for idx in range(4)]
scheduler(tasks)
from random import choice, random
from string import ascii_lowercase
from asyncio import sleep, run, gather

randname = lambda: ''.join(choice(ascii_lowercase) * 2 for _ in range(2))

data = [0, 1, 2, 3, 4, 5, 6, 7, 8]
async def task(idx, name):
    while True:
        print(f'{idx} {name}')
        await sleep(random())

async def main(tasks):
    await gather(*tasks)

tasks = [task(idx, randname()) for idx in range(4)]
run(main(tasks))
from os import getpid, getppid
from threading import get_ident, Thread
from multiprocessing import Process
from random import choice
from string import ascii_lowercase
from asyncio import sleep, run, gather

randname = lambda: ''.join(choice(ascii_lowercase) * 2 for _ in range(2))

async def task(idx, name):
    while True:
        print(f'#{idx} {name} @ {getppid()}{getpid()}/{get_ident()}')
        await sleep(.75)

async def main(tasks):
    await gather(*tasks)

def thread():
    tasks = [task(idx, randname()) for idx in range(4)]
    run(main(tasks))

def process():
    pool = [Thread(target=thread) for _ in range(4)]
    for x in pool:
        x.start()
    for x in pool:
        x.join()

pool = [Process(target=process) for _ in range(4)]
for x in pool:
    x.start()
for x in pool:
    x.join()