seminars.fb

Embedded Learning Series (for Python!)

Theme: Device Automation

Topic: Using asyncssh and paramiko

Date: Monday, October 12, 2020

from asyncio import run
from asyncssh import connect, SSHClientConnectionOptions
from collections import namedtuple

options = SSHClientConnectionOptions(username='user', password='password')

# ubuntu
#  hostname
#  lsb-release
#  os-release
# debian
#  hostname
#  os-release

async def main(tests):
    """Run every command of every test over SSH and collect the outcomes.

    One ``Result`` is produced per (test, command) pair.  A command whose
    ``conn.run`` raises is recorded as a ``Failure`` carrying the exception.
    An ``OSError`` escaping the connection block is recorded as a ``Failure``
    for each command of that test; other exceptions raised outside the
    per-command handler propagate to the caller.
    """
    outcomes = []
    for test in tests:
        try:
            async with connect(test.machine.host, port=test.machine.port, options=options) as conn:
                for command in test.commands:
                    try:
                        outcome = Success(await conn.run(command))
                    except Exception as exc:
                        outcome = Failure(exc)
                    outcomes.append(Result(test, command, outcome))
        except OSError as exc:
            outcomes.extend(
                Result(test, command, Failure(exc)) for command in test.commands
            )
    return outcomes

# Record types used by the runner above.
# Test: a named list of commands to run on one machine.
Test = namedtuple('Test', ['name', 'machine', 'commands'])
# Machine: where to connect.
Machine = namedtuple('Machine', ['host', 'port'])
# Result: the outcome (Success or Failure) of one command of one test.
Result = namedtuple('Result', ['test', 'command', 'result'])
# Success / Failure: single-field wrappers told apart by their type.
Success = namedtuple('Success', ['payload'])
Failure = namedtuple('Failure', ['payload'])


#         Test ... Test ... Test ... Test ....
# Ubuntu   .         .        .        .
# Debian   .         .        .        . 

# Tests
#  o → o → o

# Sketch: a table keyed by pairs of tests.  The keys and values are
# placeholders; both entries use the same key, so only one survives.
tests = {
    (Test, Test): ...,
    (Test, Test): ...,
}

# Alternative shape for Test: just a description of its steps.
Test = namedtuple('Test', 'steps')
Test('get hostname')

class Machine:
    # Sketch only: the dict literal below is a bare expression statement, so it
    # is evaluated and discarded; the class ends up with no attributes from it.
    { 'get_hostname' : ... }


class Test1:
    # Platform-independent test logic.  get_os_info() is not defined here; it
    # has to come from another base class of the concrete test class.
    def run_test(self):
        self.get_os_info()

class Test2:
    # A second platform-independent test.  Like Test1 it calls
    # get_os_info(), which this class does not define itself.
    def run_other_test(self):
        ...
        self.get_os_info()
        ...

# NOTE(review): Platform is not defined in the visible part of this file.
class Ubuntu(Platform):
    # Placeholder for the platform-specific get_os_info() implementation.
    def get_os_info(self):
        ...
        ...
        ...

# Combines the Test1 logic with the Ubuntu get_os_info() through multiple
# inheritance.
class UbuntuTest1(Test1, Ubuntu):
        ...

# Combines the Test2 logic with the Ubuntu get_os_info() through multiple
# inheritance.
class UbuntuTest2(Test2, Ubuntu):
    ...

TestInstance(Test, Machine) # → Attempt

# Two runs of the same test.  The second host name is deliberately bogus so
# that the connection-failure path is exercised.
tests = [
    Test(
        'OS Information',
        Machine('localhost', 2222),
        ['hostname', 'cat /etc/lsb-release', 'cat /etc/os-release'],
    ),
    Test(
        'OS Information',
        Machine('localasdfasdfhost', 2220),
        ['hostname', 'cat /etc/os-release'],
    ),
]

results = run(main(tests))

from socket import gaierror

# Report only the commands that succeeded.
for r in results:
    if not isinstance(r.result, Success):
        continue
    print(f'{r.test.machine = } {r.command = }')

# NOTE(review): Command is not defined in the visible part of this file.
class UbuntuCommands:
    # On Ubuntu the OS information is built from two files, followed by the
    # parse callable.
    get_os_information = Command('cat /etc/os-release', 'cat /etc/lsb-release', parse)

# NOTE(review): Command is not defined in the visible part of this file.
class DebianCommands:
    # On Debian only /etc/os-release is read, followed by the parse callable.
    get_os_information = Command('cat /etc/os-release', parse)

# Placeholder for the platform-independent test definition.
class OsInformationTest:
    ...

class UbuntuOsInformationTest:
    """Sketch of a test bound to a platform's commands; bodies are placeholders."""

    def __init__(self, test, commands):
        ...

    def __call__(self):
        # The original had no body here, which is a syntax error.
        ...


# Registry mapping a (test, command set) pair to the concrete implementation.
# NOTE(review): UbuntuOsInformationInfo is not defined in the visible part of
# this file; UbuntuOsInformationTest may have been intended - confirm.
tests = {
    (OsInformationTest, UbuntuCommands): UbuntuOsInformationInfo,
}
from networkx import DiGraph
from collections import namedtuple

Step = namedtuple('Step', 'platform name')

class OsInformationTest:
    """Declarative description of the OS-information test as a graph of steps."""

    # Generic steps, indexed by name so the edges below can refer to them.
    steps = {
        s.name: s
        for s in [
            Step(None, 'init'),
            Step(None, 'LSB info'),
            Step(None, 'OS info'),
        ]
    }
    # Ordering constraints: init -> LSB info -> OS info.
    edges = (
        (steps['init'], steps['LSB info']),
        (steps['LSB info'], steps['OS info']),
    )
    # add_edges_from replaces a class-level ``for`` loop, whose loop variables
    # would otherwise be left behind as class attributes.
    graph = DiGraph()
    graph.add_edges_from(edges)

    # Placeholder platform-specific steps.
    platform_steps = [
        Step('ubuntu', '...'),
        Step('ubuntu', '...'),
        Step('ubuntu', '...'),
    ]

# Show each (from, to) pair of the test's step graph.
for edge in OsInformationTest.edges:
    print(*edge)

class Test:
    # Placeholder; the original had no body, which is a syntax error.
    ...


class OsInfoTest:
    # Placeholder entry point for the test.
    def run(self):
        ...

async def get_free_mem(ssh_context):
    """Run ``free -h`` through ``ssh_context`` and wrap the command result.

    ``ssh_context`` is entered with ``async with``, matching how the SSH
    connections elsewhere in this file are used; the original used a plain
    ``with`` inside this coroutine.
    """
    async with ssh_context as ssh:
        result = await ssh.run('free -h')
        # NOTE(review): the Result namedtuple defined earlier in this file
        # takes three fields (test, command, result); a one-argument call
        # would raise TypeError unless a different Result is meant - confirm.
        return Result(result)

class Minipack:
    """Device type listing the measurement coroutines that apply to it."""

    # One-element tuple, written with explicit parentheses.
    measures = (get_free_mem,)

# First idea: a bare set of host names.
devices = {'abc.fbnet', 'def.fbnet', 'xyz.fbnet'}

# Second idea (rebinds the name): pair each host with its device type.
devices = [
    ('abc.fbnet', 'Minipack'),
    ('def.fbnet', ...),
    ('xyz.fbnet', ...),
]


# Attempts
# --------
# hostname time success payload raw_text
# abc.fbnet ...                 'asdf'
# def.fbnet ...                 'asdf'
# xyz.fbnet ...                 'asdf'
# abc.fbnet ...                 'asdf'
# def.fbnet ...                 'xxxx'
# xyz.fbnet ...                 'asdf'
# abc.fbnet ...                 'asdf'
# def.fbnet ...                 'asdf'
# xyz.fbnet ...                 'asdf'
from networkx import DiGraph

# A stress test expressed as a directed graph of actions:
# flush caches -> lots of disk reads -> check in cache.
# NOTE(review): SetupAction and TestAction are not defined in the visible part
# of this file.  The two SetupAction('lots of disk reads') objects only denote
# the same graph node if SetupAction compares and hashes by value - confirm.
StressTest = DiGraph()
StressTest.add_edge(SetupAction('flush caches'), SetupAction('lots of disk reads'))
StressTest.add_edge(SetupAction('lots of disk reads'), TestAction('check in cache'))

#
#          FLUSH CACHES - ...
#                |
#          WARM UP WITH DISK READS - ...
#                |
#          CHECK IN CACHE - ...
#             /     \ 
#            ✓       × (remediation)
#            - ...
import asyncio
# Machine Machine Machine
#    |       |      |
# 
from multiprocessing import Process
# SharedMemory lives in the multiprocessing.shared_memory submodule; it cannot
# be imported from the multiprocessing package itself.
from multiprocessing.shared_memory import SharedMemory
def f():
    # Never called: k and v are intentionally left undefined.  The function
    # exists only so that its bytecode can be disassembled below.
    d = {}
    d[k] = v

from dis import dis
dis(f)

# GIL

from time import sleep

def task(name):
    """Endless generator: print ``name`` three times, then yield, repeatedly."""
    while True:
        for _ in range(3):
            print(f'{name = }')
        yield

def scheduler(*tasks, delay=.2):
    """Round-robin over generator ``tasks``, pausing ``delay`` seconds per step.

    Each round advances every remaining task by one step, in the order given.
    A task that is exhausted is dropped instead of letting StopIteration
    escape, and the function returns once no tasks remain (immediately when
    called without tasks, where the original spun forever).  With tasks that
    never finish the behaviour is unchanged: the call does not return.
    """
    pending = list(tasks)
    while pending:
        still_running = []
        for t in pending:
            try:
                next(t)
            except StopIteration:
                continue
            still_running.append(t)
            sleep(delay)
        pending = still_running

scheduler(task('task 123'), task('task 456'))
from threading import Thread
from time import sleep

def target(name):
    """Thread body: print ``name``, sleep 0.2 seconds, and repeat forever."""
    interval = .2
    while True:
        print(f'{name = }')
        sleep(interval)

# Two threads running the same endless target under different names.
pool = [Thread(target=target, args=(f'thread-{x}',)) for x in range(2)]
# Start every thread before joining any of them, so that they run side by side.
for t in pool:
    t.start()
# target() never returns, so these joins block indefinitely.
for t in pool:
    t.join()
from asyncio import run, gather
from asyncssh import connect, SSHClientConnectionOptions
from collections import namedtuple

options = SSHClientConnectionOptions(username='user', password='password')

async def main():
    """Issue three delayed ``date`` commands over one connection and print them.

    All three ``conn.run`` awaitables are handed to ``gather`` together rather
    than being awaited one after another.
    """
    commands = ['sleep 1 && date', 'sleep 2 && date', 'sleep 3 && date']
    async with connect('localhost', port=2222, options=options) as conn:
        results = await gather(*map(conn.run, commands))
        print(f'{results = }')

from time import perf_counter

# Time the whole run of main() with a wall-clock performance counter and print
# the elapsed seconds.
before = perf_counter()
run(main())
after = perf_counter()
print(f'{after - before = }')
#             A      - 1s
#           / | \
#          B  C  D   - 1s
#          \  |  /
#             E      - 1s

# topological sorting
# a
# Sketch: step a is followed by either b or c ...
if ...:
    ... # b
else:
    ... # c

# ... which corresponds to the edges a -> b and a -> c.
# NOTE(review): no module-level name `graph` is defined in the visible part of
# this file.
graph.add_edge('a', 'b')
graph.add_edge('a', 'c')

def process():
    """Generator sketch of the a -> (b | c) flow, pausing after every node.

    Every resumption yields ``None``; values sent in are bound to ``x`` and
    otherwise unused.
    """
    while True:
        # a
        x = yield
        if ...:
            pass  # b
        else:
            pass  # c
        # Both branches pause at the same point, so they share one yield.
        x = yield
from pandas import Series
from asyncio import run
from asyncssh import connect, SSHClientConnectionOptions
from asyncssh.misc import ChannelOpenError
from time import sleep

options = SSHClientConnectionOptions(username='user', password='password')

async def main():
    """Sample ``free`` up to 100 times over one SSH connection.

    Returns the list of command results collected so far.  Sampling stops
    early the first time ``conn.run`` raises ``ChannelOpenError``.
    """
    # Imported locally under an alias because the module-level name ``sleep``
    # is the blocking time.sleep, which would stall the event loop if called
    # inside this coroutine.
    from asyncio import sleep as async_sleep

    results = []
    async with connect('localhost', port=2222, options=options) as conn:
        for _ in range(100):
            try:
                result = await conn.run('free')
            except ChannelOpenError:
                break
            results.append(result)
            # Pause between samples without blocking the event loop.
            await async_sleep(.1)
    return results

# Collect the samples, then pull one column out of each command's output.
results = run(main())
stdout = Series([r.stdout for r in results])
# Second line of each output ...
second_line = stdout.str.split('\n').str[1]
# ... and its third whitespace-separated token (presumably the "used" column
# of the memory row - confirm against the actual `free` output).
results = second_line.str.split().str[2]
print(f'{results = }')
from collections import namedtuple
class Handle(namedtuple('HandleBase', 'data')):
    """One parsed record, wrapping its lines in the single field ``data``.

    Subclasses register themselves by header text through the ``header``
    class keyword; ``from_text`` then dispatches on the record's second line.
    """

    # Registry mapping header text -> subclass, shared by the whole hierarchy.
    TYPES = {}

    @classmethod
    def from_text(cls, data):
        """Build the most specific registered handle for ``data``.

        ``data`` is a sequence of lines whose second element is the header.
        Unregistered headers, and records too short to have a header, produce
        an instance of ``cls`` holding ``data`` unchanged.
        """
        header = data[1] if len(data) > 1 else None
        subcls = cls.TYPES.get(header)
        # ``subcls is cls`` happens when a registered subclass inherits this
        # method instead of overriding it; dispatching again would recurse
        # forever, so the instance is built here.
        if subcls is None or subcls is cls:
            return cls(data)
        return subcls.from_text(data)

    def __init_subclass__(cls, header=None, **kwargs):
        """Register ``cls`` under ``header``; skipped when no header is given."""
        super().__init_subclass__(**kwargs)
        if header is not None:
            cls.TYPES[header] = cls

MemoryAddress = namedtuple('MemoryArray', 'starting ending')
class MemoryArrayMappedHandle(Handle, header='Memory Array Mapped Address'):
    """Handle for records whose header is 'Memory Array Mapped Address'."""

    @classmethod
    def from_text(cls, data):
        """Parse the start and end addresses from the third and fourth lines.

        The last whitespace-separated token of each line is read as a
        hexadecimal integer.
        """
        start_line, end_line = data[2], data[3]

        def last_token_as_hex(line):
            return int(line.split()[-1], 16)

        address = MemoryAddress(
            last_token_as_hex(start_line),
            last_token_as_hex(end_line),
        )
        return cls(address)

# use the below to eliminate some "manual loop management" in `parse`
from itertools import islice, tee
nwise = lambda g,n=2: zip(*(islice(g, i, None) for i, g in enumerate(tee(g, n))))

def parse(lines):
    """Group an iterable of text lines into one list of lines per handle.

    Blank lines are dropped and trailing whitespace is stripped.  A line
    starting with 'Handle' opens a new group; lines seen before the first
    such line are ignored.  Each group is yielded as a list whose first
    element is its 'Handle' line, including the final group.
    """
    handle = None
    # Iterate the ``lines`` argument; the original read the global ``f``.
    for line in (raw.rstrip() for raw in lines if raw.strip()):
        if line.startswith('Handle'):
            if handle is not None:
                yield handle
            handle = []
        if handle is not None:
            handle.append(line)
    # Emit the group still being collected when the input ends.
    if handle is not None:
        yield handle

# Read the saved dump and turn every parsed group of lines into a Handle.
with open('dmidecode.out') as f:
    handles = list(map(Handle.from_text, parse(f)))

# Show only the memory-array-mapped-address handles.
for h in handles:
    if not isinstance(h, MemoryArrayMappedHandle):
        continue
    print(h)

from collections import namedtuple
from json import dumps

class Point(namedtuple('PointBase', 'x y z')):
    """Immutable 3-D point with an optional ``z`` (default 0).

    The field order matches the order in which ``__new__`` forwards its
    arguments; the original declared the fields as 'y x z', which stored
    ``x`` in the ``y`` field and vice versa.
    """

    def __new__(cls, x, y, z=0):
        return super().__new__(cls, x, y, z)

    @property
    def w(self):
        """Difference ``x - y``."""
        return self.x - self.y

    def __call__(self, a, b, c):
        """Return the sum of ``a``, ``b``, ``c`` and the three coordinates."""
        return sum([a, b, c, *self])

# Exercise the namedtuple-based Point: the computed property, the three
# fields, calling the instance, and building a modified copy via _replace.
p = Point(x=10, y=-10)
print(f'{p.w = }')
print(f'{p.x = }')
print(f'{p.y = }')
print(f'{p.z = }')
print(f'{p(10, 20, 30) = }')
print(f'{p._replace(z=10000) = }')

from dataclasses import dataclass
@dataclass
class Point:
    """Dataclass version of the point: fields ``x``, ``y`` and ``z`` (default 100)."""

    x : int
    y : int
    z : int = 100

    @property
    def w(self):
        """Difference ``x - y``."""
        difference = self.x - self.y
        return difference

    def __call__(self, a, b, c):
        """Return the sum of ``a``, ``b``, ``c`` and the three fields."""
        own = (self.x, self.y, self.z)
        return sum((a, b, c) + own)

# The field annotations are not enforced at runtime: a float and a str are
# passed here for fields annotated as int.
p = Point(x=10, y=-10.0, z='asdfasdfasdf')
print(f'{p = }')
#  p.x = 1000000
#  print(f'{p.w = }')
#  print(f'{p.x = }')
#  print(f'{p.y = }')
#  print(f'{p.z = }')
#  print(f'{p(10, 20, 30) = }')

# a
# | \
# b   c

from enum import Enum              # metaclass
from collections import namedtuple # code generation via `exec`
from dataclasses import dataclass  # class decorator
                                   # __init_subclass__