Theme: Programming Fundamentals
Topic: Generators
Keywords: generators, laziness, coroutines, comprehension syntax
Presenter | James Powell james@dutc.io |
Date | Friday, December 18, 2020 |
Time | 9:00 AM PST |
print('Good morning!')
print('Good morning!')
print('Good morning!')
print('Good morning!')
xs = [*'abcd']
ys = [*'wxyz']
for x, y in zip(xs, ys):
print(f'{x, y = }')
print(f'{zip(xs, ys) = }')
data = range(10)
print(f'{data = }')
# for x in data:
# print(f'{x = }')
data = range(10000000000)
for idx, x in enumerate(data):
print x
if idx > 10:
break
print data
data = xrange(10)
print data
xs = [-2, -1, 0, 1, 2, 3]
ys = []
for x in xs:
ys.append(x**2)
print(f'{ys = }')
ys = [x**2 for x in xs]
print(f'{ys = }')
data = range(10)
lc = [x**2 for x in data]
sc = {x**2 for x in data}
dc = {x: x**2 for x in data}
ge = (x**2 for x in data)
tc = *(x**2 for x in data),
print(f'{type(lc) = !r:<15} {lc = }')
print(f'{type(sc) = !r:<15} {sc = }')
print(f'{type(dc) = !r:<15} {dc = }')
print(f'{type(ge) = !r:<15} {ge = }')
xs = [1, 2, 3]
print(f'{next(xs) = }')
xs = [1, 2, 3]
xi = iter(xs)
print(f'{next(xi) = }')
# iterable
xs = [1, 2, 3]
for x in xs:
print(f'{x = }')
f = open('/etc/group')
for line in f:
print(f'{line = }')
xs = [1, 2, 3]
print(f'{next(xs) = }') # 1
print(f'{next(xs) = }') # 2
print(f'{next(xs) = }') # 3
for x in xs:
print(f'{x = }')
# iterator
# - just the iterable
# - some additional storage/state/“where I left off”
xs = [1, 2, 3, 4]
xi1 = iter(xs)
xi2 = iter(xs)
print(f'{next(xi1) = }')
print(f'{next(xi2) = }')
print(f'{next(xi1) = }')
print(f'{next(xi1) = }')
print(f'{next(xi2) = }')
s = 'a,b,c' # i. str
xs = ['a', 'b', 'c'] # ii. list
# structure data
# - in-band: add structuring elements/notation to the data
# - quoting
# - escaping
# - using unique, unambiguous tokens
# - out-of-band: we use some mechanism separate from the data
def f(w):
x = w * 2
y = w + 1
z = w ** 3
return x, y, z
print(f'{f(10) = }')
from dis import dis
dis(f)
def f(w):
return w * 2
from dis import dis
dis(f)
def g(w):
x = w * 2
yield
y = w + 1
yield
z = w ** 3
yield
return x, y, z
print(f'{g(10) = }')
def g(w):
x = w * 2
yield
y = w + 1
yield
z = w ** 3
yield
return x, y, z
gi = g(10)
print(f'{next(gi) = }')
print(f'{next(gi) = }')
print(f'{next(gi) = }')
def g(w):
x = w * 2
yield x
y = w + 1
yield y
z = w ** 3
yield z
gi = g(10)
print(f'{next(gi) = }')
print(f'{next(gi) = }')
xs = [*'abc']
ys = [*'xyz']
print(f'{xs = }')
print(f'{ys = }')
print(f'{zip(xs, ys) = }')
xs = list('abc')
ys = list('xyz')
print xs
print ys
print zip(xs, ys)
for a, b in zip(range(10_000_000_000), range(10_000_000, 20_000_000_000)):
print(f'{a, b = }')
data = range(10)
xs = (x**2 for x in data)
def process(data):
for x in data:
yield x**2
xs = process(data)
print(f'{xs = }')
print(f'{next(xs) = }')
print(f'{next(xs) = }')
print(f'{next(xs) = }')
def f(w):
x = w * 2
y = w + 1
z = w ** 3
return x, y, z
for x in f.__code__.co_code:
print(f'{x = }')
from dis import dis
dis(f)
def g(w):
x = w * 2
yield x
y = w + 1
yield y
z = w ** 3
yield z
from dis import dis
dis(g)
from inspect import currentframe, getouterframes
def f(x):
y = x * 2
return g(y)
def g(y):
z = y ** 2
return h(z)
def h(z):
w = z + 2
return getouterframes(currentframe())
for f in f(10)[:-1]:
print(f'{f.frame.f_locals = }')
def f():
return 1, 2, 3
print(f'{f() = }')
def g():
yield 1
yield 2
yield 3
gi = g()
print(f'{next(gi) = }')
print(f'{gi.gi_frame.f_lasti = }')
print(f'{next(gi) = }')
print(f'{gi.gi_frame.f_lasti = }')
print(f'{next(gi) = }')
print(f'{gi.gi_frame.f_lasti = }')
from dis import dis
dis(g)
# iterable
s = '1,2,3,4'
xs = [1, 2, 3, 4] # out-of-band
# iterable
def f():
x = w * 2
y = w + 2
z = y ** 3
return x, y, z
def g(w=1):
x = w * 2
yield x
y = w + 2
yield y
z = y ** 3
yield z
for x in xs:
print(f'{x = }')
for x in g():
print(f'{x = }')
xs = [1, 2, 3, 4]
xi = iter(xs)
def g():
yield 1
yield 2
yield 3
gi = g()
data = range(10)
process = lambda: (x**2 for x in data)
xs = process(data)
def process(data):
for x in data:
yield x**2
xs = process(data)
def db():
print('connect()')
print('config()')
print('query()')
print('disconnect()')
db()
class db:
def connect(self):
print('connect()')
def config(self):
print('config()')
def query(self):
print('query()')
def disconnect(self):
print('disconnect()')
x = db()
x.connect()
...
x.config()
...
x.disconnect()
...
x.query()
def db():
print('connect()')
yield
print('config()')
yield
print('query()')
yield
print('disconnect()')
x = db()
next(x)
next(x)
next(x)
next(x, None)
from time import sleep
from random import random
from time import perf_counter
start1 = perf_counter()
sleep(random())
start2 = perf_counter()
sleep(random() * 2)
stop2 = perf_counter()
print(f'Elapsed \N{greek capital letter delta}t: {stop2 - start2:.2f}s')
sleep(random() / 10)
stop1 = perf_counter()
print(f'Elapsed \N{greek capital letter delta}t: {stop1 - start1:.2f}s')
from fake_xdb_api import connect
with connect() as db:
with db.transaction():
...
with db.transaction():
...
with opem('/etc/group') as f:
...
from time import sleep
from random import random
from time import perf_counter
from contextlib import contextmanager
class timed:
def __init__(self, msg):
self.msg = msg
def __enter__(self):
self.start = perf_counter()
def __exit__(self, *_):
self.stop = perf_counter()
print(f'Elapsed [{self.msg:^15}] \N{greek capital letter delta}t: {self.stop - self.start:.2f}s')
@contextmanager
def timed(msg):
start = perf_counter()
try:
yield
finally:
stop = perf_counter()
print(f'Elapsed [{msg:^15}] \N{greek capital letter delta}t: {stop - start:.2f}s')
with timed('overall'):
sleep(random())
with timed('section'):
sleep(random() * 2)
sleep(random() / 10)
def db():
print('connect')
config = yield
print(f'config: {config}')
query = yield
while query:
print(f'query: {query}')
query = yield
print('disconnect')
x = db()
next(x)
x.send('some configuration')
x.send('some query')
x.send('some other query')
x.send('some yet another other query')
next(x, None)
def fib(n):
rv = [1, 1]
while True:
if len(rv) == n:
break
rv.append(rv[-1] + rv[-2])
return rv
print(f'{fib(15) = }')
def fib(m):
rv = [1, 1]
while True:
if rv[-1] + rv[-2] >= m:
break
rv.append(rv[-1] + rv[-2])
return rv
print(f'{fib(90) = }')
from time import perf_counter
def fib(t):
rv = [1, 1]
start = perf_counter()
while True:
if perf_counter() - start >= t:
break
rv.append(rv[-1] + rv[-2])
return rv
print(f'{fib(1e-5) = }')
from math import isclose, sqrt
from time import perf_counter
phi = 1/2 + sqrt(5)/2
def fib(r):
rv = [1, 1]
while True:
if isclose(rv[-1] / rv[-2], phi, abs_tol=r):
break
rv.append(rv[-1] + rv[-2])
return rv
print(f'{phi = }')
print(f'{fib(.00001) = }')
def fib(a=1, b=1):
while True:
yield a
a, b = b, a + b
# --------- #
for idx, x in enumerate(fib()):
if idx > 10:
break
print(f'{x = }')
for x in fib():
if x > 90:
break
print(f'{x = }')
from time import perf_counter
start = perf_counter()
for x in fib():
if perf_counter() - start > 1e-4:
break
print(f'{x = }')
def fib(a=1, b=1):
while True:
yield a
a, b = b, a + b
from itertools import islice, takewhile
print(f'{[*islice(fib(), 10)] = }')
print(f'{[*takewhile(lambda x: x < 90, fib())] = }')
from time import perf_counter
def timed(g, t):
start = perf_counter()
for x in g:
if perf_counter() - start >= t:
break
yield x
print(f'{[*timed(fib(), 1e-5)] = }')
from itertools import islice, tee
def nwise(g, n=2):
return zip(*(islice(g, idx, None) for idx, g in enumerate(tee(g, 2))))
from math import isclose, sqrt
phi = 1/2 + sqrt(5)/2
print(f'{[*takewhile(lambda a_b: not isclose(a_b[-1] / a_b[0], phi, abs_tol=1e-5), nwise(fib()))] = }')