Theme: Device Automation
Topic: Using asyncssh and paramiko
Date: Monday, October 12, 2020
from asyncio import run
from asyncssh import connect, SSHClientConnectionOptions
from collections import namedtuple
options = SSHClientConnectionOptions(username='user', password='password')
# ubuntu
# hostname
# lsb-release
# os-release
# debian
# hostname
# os-release
async def main(tests):
    """Run every command of every test over SSH, one Result per command.

    For each test a connection is opened to ``test.machine`` (host and port)
    using the module-level ``options``. Each command is awaited in order on
    that connection; its outcome is wrapped in ``Success`` (the value returned
    by ``conn.run``) or ``Failure`` (the exception that was raised).

    If an ``OSError`` escapes the connection block (for example because the
    connection cannot be opened), a ``Failure`` carrying that error is
    recorded for every command of the affected test.

    Returns the list of ``Result(test, command, outcome)`` records.
    """
    results = []
    for test in tests:
        try:
            async with connect(
                test.machine.host, port=test.machine.port, options=options
            ) as conn:
                for command in test.commands:
                    # Any per-command error is captured rather than propagated
                    # so the remaining commands still get their turn.
                    try:
                        outcome = Success(await conn.run(command))
                    except Exception as exc:
                        outcome = Failure(exc)
                    results.append(Result(test, command, outcome))
        except OSError as exc:
            results += [
                Result(test, command, Failure(exc)) for command in test.commands
            ]
    return results
# Lightweight records: what to run (Test), where (Machine), and how each
# command turned out (Result wrapping either Success or Failure).
Test = namedtuple('Test', ['name', 'machine', 'commands'])
Machine = namedtuple('Machine', ['host', 'port'])
Result = namedtuple('Result', ['test', 'command', 'result'])
Success = namedtuple('Success', ['payload'])
Failure = namedtuple('Failure', ['payload'])
# Test ... Test ... Test ... Test ....
# Ubuntu . . . .
# Debian . . . .
# Tests
# o → o → o
# Design sketch (not runnable as written): alternative ways to pair tests
# with the platforms they run on.
# Idea 1: a lookup table keyed by (Test, Test) pairs; the `...` values are
# placeholders and the entries are not comma-separated.
tests = {
(Test, Test): ...
(Test, Test): ...
}
# Idea 2: a test described only by its steps. This rebinds the name `Test`.
Test = namedtuple('Test', 'steps')
Test('get hostname')
# Idea 3: a machine exposing named operations. This rebinds the name `Machine`.
class Machine:
{ 'get_hostname' : ... }
# Idea 4: test classes call self.get_os_info() without defining it ...
class Test1:
def run_test(self):
self.get_os_info()
class Test2:
def run_other_test(self):
...
self.get_os_info()
...
# ... a platform class defines get_os_info() (`Platform` is not defined in the
# visible file) ...
class Ubuntu(Platform):
def get_os_info(self):
...
...
...
# ... and concrete tests inherit from both a test class and a platform class.
class UbuntuTest1(Test1, Ubuntu):
...
class UbuntuTest2(Test2, Ubuntu): ...
...
# `TestInstance` is not defined in the visible file.
TestInstance(Test, Machine) # → Attempt
# Two runs of the same command set: one against a local port and one against
# a host name that is not expected to resolve.
tests = [
    Test(
        'OS Information',
        Machine('localhost', 2222),
        ['hostname', 'cat /etc/lsb-release', 'cat /etc/os-release'],
    ),
    Test(
        'OS Information',
        Machine('localasdfasdfhost', 2220),
        ['hostname', 'cat /etc/os-release'],
    ),
]
results = run(main(tests))
from socket import gaierror
# Report only the commands that completed successfully.
for r in results:
    if not isinstance(r.result, Success):
        continue
    print(f'{r.test.machine = } {r.command = }')
# Sketch (not runnable as written): per-platform command tables.
# `Command` is not defined in the visible file.
class UbuntuCommands:
get_os_information = Command('cat /etc/os-release', 'cat /etc/lsb-release', parse)
class DebianCommands:
get_os_information = Command('cat /etc/os-release', parse)
# Platform-independent test, left as a placeholder.
class OsInformationTest:
...
# A callable object built from a test and a command table.
class UbuntuOsInformationTest:
def __init__(self, test, commands):
...
# NOTE(review): __call__ has no body here.
def __call__(self):
# Lookup from (test, command table) to an implementation;
# `UbuntuOsInformationInfo` is not defined in the visible file.
tests = {
(OsInformationTest, UbuntuCommands): UbuntuOsInformationInfo,
}
from networkx import DiGraph
from collections import namedtuple
# A named step of a test, tagged with the platform it applies to.
Step = namedtuple('Step', 'platform name')
# Sketch: a test modelled as a directed graph of steps.
class OsInformationTest:
# Steps with platform None.
steps = [
Step(None, 'init'),
Step(None, 'LSB info'),
Step(None, 'OS info'),
]
# Re-key the steps by name so the edges can be written as steps['...'] lookups.
steps = {s.name: s for s in steps}
# Ordering: init -> LSB info -> OS info.
edges = (
(steps['init'] , steps['LSB info']),
(steps['LSB info'] , steps['OS info'])
)
graph = DiGraph()
for u, v in edges:
graph.add_edge(u, v)
# Placeholder platform-specific steps; nothing in the visible code reads them.
# NOTE(review): indentation was lost, so it is unclear whether this list
# belongs to the class body or to the module.
platform_steps = [
Step('ubuntu', '...'),
Step('ubuntu', '...'),
Step('ubuntu', '...'),
]
# Print each (from, to) pair of the test's edges.
for u, v in OsInformationTest.edges:
print(u, v)
# Sketch (not runnable as written): `class Test:` has no body of its own here.
# NOTE(review): indentation was lost; OsInfoTest may have been meant as nested
# inside Test or as a separate class — cannot tell from this text.
class Test:
class OsInfoTest:
# Placeholder entry point for the test.
def run(self):
...
async def get_free_mem(ssh_context):
    """Run ``free -h`` through *ssh_context* and wrap the outcome in ``Result``.

    *ssh_context* is entered as a (synchronous) context manager; the object it
    yields must provide an awaitable ``run`` method.
    """
    with ssh_context as session:
        free_output = await session.run('free -h')
    return Result(free_output)
# Sketch: a device type and the measurements to collect from it.
class Minipack:
# The trailing comma makes this a one-element tuple of measurement functions.
measures = get_free_mem,
# First form of the inventory: a set of host names.
devices = {
'abc.fbnet',
'def.fbnet',
'xyz.fbnet',
}
# Second form (rebinds `devices`): (host name, device type) pairs; `...` marks
# entries whose device type is left unspecified.
devices = [
('abc.fbnet', 'Minipack',),
('def.fbnet', ...),
('xyz.fbnet', ...),
]
# Attempts
# --------
# hostname time success payload raw_text
# abc.fbnet ... 'asdf'
# def.fbnet ... 'asdf'
# xyz.fbnet ... 'asdf'
# abc.fbnet ... 'asdf'
# def.fbnet ... 'xxxx'
# xyz.fbnet ... 'asdf'
# abc.fbnet ... 'asdf'
# def.fbnet ... 'asdf'
# xyz.fbnet ... 'asdf'
from networkx import DiGraph
# Sketch: a stress test as a directed graph of actions:
# flush caches -> lots of disk reads -> check in cache.
# `SetupAction` and `TestAction` are not defined in the visible file.
# NOTE(review): SetupAction('lots of disk reads') is constructed twice; the two
# edges share that node only if such instances hash and compare equal — TODO confirm.
StressTest = DiGraph()
StressTest.add_edge(SetupAction('flush caches'), SetupAction('lots of disk reads'))
StressTest.add_edge(SetupAction('lots of disk reads'), TestAction('check in cache'))
#
# FLUSH CACHES - ...
# |
# WARM UP WITH DISK READS - ...
# |
# CHECK IN CACHE - ...
# / \
# ✓ × (remediation)
# - ...
import asyncio
# Machine Machine Machine
# | | |
#
from multiprocessing import Process
# SharedMemory lives in the `shared_memory` submodule; importing it straight
# from `multiprocessing` raises ImportError.
from multiprocessing.shared_memory import SharedMemory


def f():
    """Tiny function whose only purpose is to be disassembled below.

    It is never called: ``k`` and ``v`` are deliberately left as unresolved
    globals so the bytecode shows a dict item assignment.
    """
    d = {}
    d[k] = v


from dis import dis
# Print the bytecode of f.
dis(f)
# GIL
from time import sleep
def task(name):
    """Endless generator: print *name* three times, then yield control.

    Each ``next()`` call performs one burst of three prints and pauses at the
    ``yield``; the generator never finishes on its own.
    """
    while True:
        for _ in range(3):
            print(f'{name = }')
        yield
# Round-robin scheduler for generator-based tasks: loops forever, advancing
# each task with next() in the order given.
# NOTE(review): indentation was lost, so it is unclear whether the 0.2 s sleep
# happens after every task step or once per full round.
def scheduler(*tasks):
while True:
for t in tasks:
next(t)
sleep(.2)
# Runs two tasks; this call never returns because both loops are unbounded.
scheduler(task('task 123'), task('task 456'))
from threading import Thread
from time import sleep
def target(name):
    """Thread body: print *name* every 0.2 seconds, forever."""
    while True:
        print(f'{name = }')
        sleep(.2)


# Two worker threads named thread-0 and thread-1.
pool = [
    Thread(target=target, args=(f'thread-{x}',))
    for x in range(2)
]

# Start every thread first, then wait on each; since `target` never returns,
# the joins block indefinitely.
for t in pool:
    t.start()
for t in pool:
    t.join()
from asyncio import run, gather
from asyncssh import connect, SSHClientConnectionOptions
from collections import namedtuple
options = SSHClientConnectionOptions(username='user', password='password')
async def main():
    """Run three ``sleep N && date`` commands concurrently on one connection.

    All commands are submitted on the same SSH connection and awaited together
    with ``gather``; the gathered results are then printed.
    """
    commands = ['sleep 1 && date', 'sleep 2 && date', 'sleep 3 && date']
    async with connect('localhost', port=2222, options=options) as conn:
        # gather() preserves the order of `commands` in its result list.
        results = await gather(*(conn.run(c) for c in commands))
    print(f'{results = }')


from time import perf_counter

# Time the whole run to see how long the three commands take together.
before = perf_counter()
run(main())
after = perf_counter()
print(f'{after - before = }')
# A - 1s
# / | \
# B C D - 1s
# \ | /
# E - 1s
# topological sorting
# a
# Sketch: after step "a" the flow takes either branch "b" or branch "c".
if ...:
... # b
else:
... # c
# The same branching recorded as graph edges a -> b and a -> c.
# NOTE(review): no module-level `graph` is defined in the visible file.
graph.add_edge('a', 'b')
graph.add_edge('a', 'c')
# Sketch: the a -> (b | c) branching written as a generator that pauses at
# every `yield`; whatever the caller sends in is bound to x.
def process():
while True:
# a
x = yield
if ...:
... # b
x = yield
else:
... # c
x = yield
from pandas import Series
from asyncio import run
from asyncssh import connect, SSHClientConnectionOptions
from asyncssh.misc import ChannelOpenError
from time import sleep
options = SSHClientConnectionOptions(username='user', password='password')
async def main():
    """Sample ``free`` up to 100 times over one SSH connection.

    Runs ``free`` on the remote host roughly every 0.1 seconds and collects
    the result of each run. Sampling stops early, keeping what was gathered
    so far, as soon as a ``ChannelOpenError`` is raised.

    Returns the list of collected results.
    """
    # Local import: the module-level name `sleep` is time.sleep, which would
    # block the event loop (and with it the SSH connection's own I/O).
    from asyncio import sleep as async_sleep

    results = []
    async with connect('localhost', port=2222, options=options) as conn:
        for _ in range(100):
            try:
                result = await conn.run('free')
            except ChannelOpenError:
                break
            results.append(result)
            # Yield to the event loop between samples instead of blocking it.
            await async_sleep(.1)
    return results
# Collect the samples, then pull one column out of each `free` output:
# the second line of stdout, split on whitespace, third field.
results = run(main())
stdout_per_sample = Series([sample.stdout for sample in results])
second_lines = stdout_per_sample.str.split('\n').str[1]
results = second_lines.str.split().str[2]
print(f'{results = }')
from collections import namedtuple
class Handle(namedtuple('HandleBase', 'data')):
    """One parsed record, with a registry of specialised subclasses.

    Subclasses register themselves by passing ``header=...`` in their class
    statement. ``Handle.from_text`` then dispatches a record to the subclass
    whose header equals the record's second line.
    """

    # Maps a record's header line to the subclass that handles it.
    TYPES = {}

    @classmethod
    def from_text(cls, data):
        """Build the most specific handle for *data* (a sequence of lines).

        The second line of *data* is the header used for lookup. Records with
        an unregistered header, or with fewer than two lines, become plain
        instances of *cls* holding *data* unchanged.
        """
        # A one-line record has no header; fall through to the generic case.
        header = data[1] if len(data) > 1 else None
        subcls = cls.TYPES.get(header)
        # `subcls is cls` happens when a registered subclass inherits this
        # method; dispatching again would recurse without end.
        if subcls is None or subcls is cls:
            return cls(data)
        return subcls.from_text(data)

    def __init_subclass__(cls, header, **kwargs):
        """Register *cls* as the handler for records whose header is *header*."""
        super().__init_subclass__(**kwargs)
        cls.TYPES[header] = cls
# Address range of a mapped memory array.
# NOTE(review): the tuple's type name is 'MemoryArray' although it is bound to
# `MemoryAddress`; the repr therefore shows 'MemoryArray(...)'.
MemoryAddress = namedtuple('MemoryArray', ['starting', 'ending'])


class MemoryArrayMappedHandle(Handle, header='Memory Array Mapped Address'):
    """Handle for records whose header is 'Memory Array Mapped Address'."""

    @classmethod
    def from_text(cls, data):
        """Parse the address range from the third and fourth lines of *data*.

        The last whitespace-separated token of each of those lines is read as
        a hexadecimal integer; the pair is stored as a ``MemoryAddress``.
        """
        address_lines = (data[2], data[3])
        starting, ending = (
            int(line.split()[-1], 16) for line in address_lines
        )
        return cls(MemoryAddress(starting, ending))
# use the below to eliminate some "manual loop management" in `parse`
from itertools import islice, tee
def nwise(g, n=2):
    """Return an iterator of overlapping n-tuples drawn from the iterable *g*.

    With the default ``n=2`` this yields consecutive pairs:
    ``nwise('abcd')`` -> ``('a', 'b'), ('b', 'c'), ('c', 'd')``.
    """
    copies = tee(g, n)
    # The i-th copy is advanced by i items so that zipping lines them up as
    # a sliding window.
    shifted = [islice(copy, offset, None) for offset, copy in enumerate(copies)]
    return zip(*shifted)
def parse(lines):
    """Group *lines* into records, one list of lines per ``Handle`` block.

    Blank lines are skipped and trailing whitespace is removed from the rest.
    A line that starts with ``'Handle'`` opens a new record; every following
    non-blank line belongs to that record until the next ``'Handle'`` line.
    Anything before the first ``'Handle'`` line is ignored.

    Yields each record as a list of strings, the ``'Handle'`` line first.
    """
    handle = None
    # Iterate the argument itself, not whatever file object happens to be
    # bound to a global name.
    for line in (raw.rstrip() for raw in lines if raw.strip()):
        if line.startswith('Handle'):
            if handle is not None:
                yield handle
            handle = []
        if handle is not None:
            handle.append(line)
    # The last record has no following 'Handle' line to trigger its yield.
    if handle is not None:
        yield handle
# Parse every record of the saved dmidecode output while the file is open.
with open('dmidecode.out') as f:
    handles = list(map(Handle.from_text, parse(f)))

# Show only the memory-array-mapped-address records.
for h in handles:
    if not isinstance(h, MemoryArrayMappedHandle):
        continue
    print(h)
from collections import namedtuple
from json import dumps
class Point(namedtuple('PointBase', 'y x z')):
    """A point stored as the tuple ``(y, x, z)`` but constructed as ``(x, y, z)``.

    ``z`` defaults to 0. Instances are callable: calling one adds three extra
    numbers to the sum of its own coordinates.
    """

    def __new__(cls, x, y, z=0):
        # Forward by keyword: the base tuple's field order is (y, x, z), so
        # passing (x, y, z) positionally would store x under y and vice versa.
        return super().__new__(cls, x=x, y=y, z=z)

    @property
    def w(self):
        """Difference ``x - y``."""
        return self.x - self.y

    def __call__(self, a, b, c):
        """Return ``a + b + c`` plus the sum of this point's coordinates."""
        return sum([a, b, c, *self])


# Demo: attribute access, calling the instance, and _replace.
p = Point(x=10, y=-10)
print(f'{p.w = }')
print(f'{p.x = }')
print(f'{p.y = }')
print(f'{p.z = }')
print(f'{p(10, 20, 30) = }')
print(f'{p._replace(z=10000) = }')
from dataclasses import dataclass
@dataclass
class Point:
    """Dataclass variant of the point: fields x, y and z (z defaults to 100).

    The annotations are not enforced at runtime, which the demo below relies
    on by passing a float and a string.
    """

    x: int
    y: int
    z: int = 100

    @property
    def w(self):
        """Difference ``x - y``."""
        return self.x - self.y

    def __call__(self, a, b, c):
        """Return the sum of a, b, c and the three coordinates."""
        return sum((a, b, c, self.x, self.y, self.z))


# Demo: values that do not match the annotations are accepted as-is.
p = Point(x=10, y=-10.0, z='asdfasdfasdf')
print(f'{p = }')
# p.x = 1000000
# print(f'{p.w = }')
# print(f'{p.x = }')
# print(f'{p.y = }')
# print(f'{p.z = }')
# print(f'{p(10, 20, 30) = }')
# a
# | \
# b c
from enum import Enum # metaclass
from collections import namedtuple # code generation via `exec`
from dataclasses import dataclass # class decorator
# __init_subclass__