Theme: Device Automation
Topic: Automation with asyncssh
and paramiko
Date: Friday, October 9, 2020
import asyncssh
import paramiko
FROM ubuntu:18.04
RUN apt update -y && apt install -y ssh
RUN systemctl enable ssh
COPY user.sh /tmp
RUN sh /tmp/user.sh
CMD service ssh start && sleep 3600
EXPOSE 22
#!/bin/bash
useradd -m -s /bin/bash user
echo user:password | chpasswd
echo 'PasswordAuthentication yes' >> /etc/ssh/sshd_config
$ docker build -t test .
$ docker run --rm -P -p 2222:22 --name test test
$ docker exec -it test /bin/bash
$ ssh -p 2222 -o StrictHostKeyChecking=no user@localhost hostname
from asyncio import run
from asyncssh import connect
async def main():
async with connect('localhost', port=2222) as conn:
pass
run(main())
from asyncio import run
from asyncssh import connect, SSHClientConnectionOptions
async def main():
for host in ...:
options = SSHClientConnectionOptions(username='user', password='password')
async with connect('localhost', port=2222, options=options) as conn:
pass
run(main())
from asyncio import run
from asyncssh import connect, SSHClientConnectionOptions
options = SSHClientConnectionOptions(username='user', password='password')
async def main():
async with connect('localhost', port=2222, options=options) as conn:
result = await conn.run('cat /etc/lsb-release')
print(f'{result.stdout = !r}')
run(main())
from asyncio import run
from asyncssh import connect, SSHClientConnectionOptions, SSHClientSession
class Session(SSHClientSession):
def data_received(self, data, datatype):
print(f'{data = }')
options = SSHClientConnectionOptions(username='user', password='password')
async def main():
async with connect('localhost', port=2222, options=options) as conn:
chan, session = await conn.create_session(Session, 'seq 1 100; seq 1 100)
await chan.wait_closed()
run(main())
from paramiko import SSHClient, AutoAddPolicy
with SSHClient() as client:
client.set_missing_host_key_policy(AutoAddPolicy())
client.connect('localhost', port=2222, username='user', password='password')
_, stdout, _ = client.exec_command('hostname')
print(f'{stdout = }')
print(f'{[*stdout] = }')