seminars.fb

Embedded Learning Series (for Python!)

Theme: Device Automation

Topic: Automation with asyncssh and paramiko

Date: Friday, October 9, 2020

import asyncssh
import paramiko
# Image for an SSH test target, built on Ubuntu 18.04.
FROM ubuntu:18.04

# Install the ssh package and enable the ssh service.
RUN apt update -y && apt install -y ssh
RUN systemctl enable ssh

# Copy the user-provisioning script into the image and run it at build time.
COPY user.sh /tmp
RUN sh /tmp/user.sh

# Start the ssh service, then sleep for 3600 seconds so the container
# stays up for an hour.
CMD service ssh start && sleep 3600

# Port 22 is the port the container is expected to serve SSH on.
EXPOSE 22
#!/bin/bash
# Provision a password-login account for the SSH test container.

# Create the account "user" with a home directory (-m) and /bin/bash as its shell (-s).
useradd -m -s /bin/bash user
# Set the account's password to "password" (chpasswd reads name:password from stdin).
echo user:password | chpasswd
# Append a directive to the sshd configuration that permits password authentication.
echo 'PasswordAuthentication yes' >> /etc/ssh/sshd_config
# Build the image from the current directory and tag it "test".
$ docker build -t test .
# Run a container named "test" (removed on exit), mapping host port 2222 to container port 22.
$ docker run --rm -P -p 2222:22 --name test test
# Open an interactive bash shell inside the running container.
$ docker exec -it test /bin/bash
# Check the setup: log in as "user" through the mapped port and run `hostname`.
$ ssh -p 2222 -o StrictHostKeyChecking=no user@localhost hostname
from asyncio import run
from asyncssh import connect

async def main():
    """Open an SSH connection to localhost on port 2222, then close it.

    No username or password is supplied to ``connect`` here.
    """
    host, port = 'localhost', 2222
    async with connect(host, port=port) as connection:
        # Nothing is done with the connection; leaving the block ends it.
        pass


run(main())
from asyncio import run
from asyncssh import connect, SSHClientConnectionOptions


async def main():
    """Connect once per host, authenticating with a username and password.

    The connection options are identical for every iteration, so they are
    built a single time before the loop instead of once per host.
    """
    options = SSHClientConnectionOptions(username='user', password='password')
    # `...` is a placeholder for the real iterable of hosts.
    for host in ...:
        # NOTE(review): `host` is not used below; every iteration connects to
        # 'localhost' -- confirm whether `host` was meant to be passed instead.
        async with connect('localhost', port=2222, options=options) as conn:
            pass

run(main())
from asyncio import run
from asyncssh import connect, SSHClientConnectionOptions

# Username/password options used by main() when connecting.
options = SSHClientConnectionOptions(username='user', password='password')


async def main():
    """Run ``cat /etc/lsb-release`` over SSH and print the captured stdout."""
    command = 'cat /etc/lsb-release'
    async with connect('localhost', port=2222, options=options) as connection:
        # The name `result` must stay: the f-string's `=` specifier prints
        # the expression text along with its value.
        result = await connection.run(command)
        print(f'{result.stdout = !r}')


run(main())
from asyncio import run
from asyncssh import connect, SSHClientConnectionOptions, SSHClientSession

class Session(SSHClientSession):
    """Client session that prints each chunk of data it receives."""

    def data_received(self, data, datatype):
        """Print the received *data*; *datatype* is not used."""
        print(f'{data = }')


# Username/password options used by main() when connecting.
options = SSHClientConnectionOptions(username='user', password='password')


async def main():
    """Run ``seq 1 100`` twice on the remote host through a ``Session``.

    Output is printed by ``Session.data_received`` as it arrives; this
    coroutine returns once the channel has closed.
    """
    async with connect('localhost', port=2222, options=options) as conn:
        # The command string was missing its closing quote (a syntax error).
        # The session object returned alongside the channel is not needed.
        chan, _ = await conn.create_session(Session, 'seq 1 100; seq 1 100')
        await chan.wait_closed()


run(main())
from paramiko import SSHClient, AutoAddPolicy

with SSHClient() as client:
    # Use AutoAddPolicy for hosts whose key is missing.
    client.set_missing_host_key_policy(AutoAddPolicy())
    client.connect(
        'localhost',
        port=2222,
        username='user',
        password='password',
    )
    # exec_command returns three values; only the middle one is used here.
    # The name `stdout` must stay: the `=` specifier prints the expression text.
    streams = client.exec_command('hostname')
    stdout = streams[1]
    print(f'{stdout = }')
    print(f'{[*stdout] = }')