Manually configuring network devices doesn’t scale. Whether you’re managing 5 servers or 500 switches, repetitive tasks — updating firewall rules, deploying configurations, checking interface status — are error-prone and time-consuming when done by hand.
Network automation uses scripts and tools to configure, monitor, and manage network devices programmatically. Python is the dominant language in this space, with mature libraries for SSH, SNMP, REST APIs, and configuration management.
Paramiko is a Python implementation of the SSHv2 protocol. It lets you connect to remote hosts, execute commands, and transfer files — all from Python code.
import paramiko
def ssh_command(host: str, username: str, password: str, command: str) -> str:
"""Execute a command on a remote host via SSH."""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(host, username=username, password=password, timeout=10)
stdin, stdout, stderr = client.exec_command(command)
output = stdout.read().decode()
errors = stderr.read().decode()
if errors:
print(f"STDERR: {errors}")
return output
finally:
client.close()
# Usage
result = ssh_command("192.168.1.10", "admin", "secret", "ip addr show")
print(result)
import paramiko
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Using a private key file
private_key = paramiko.RSAKey.from_private_key_file("/home/user/.ssh/id_rsa")
client.connect("192.168.1.10", username="admin", pkey=private_key)
stdin, stdout, stderr = client.exec_command("uname -a")
print(stdout.read().decode())
client.close()
import paramiko
def upload_file(host: str, username: str, key_path: str,
local_path: str, remote_path: str) -> None:
"""Upload a file to a remote host via SFTP."""
key = paramiko.RSAKey.from_private_key_file(key_path)
transport = paramiko.Transport((host, 22))
transport.connect(username=username, pkey=key)
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.put(local_path, remote_path)
print(f"Uploaded {local_path} → {remote_path}")
sftp.close()
transport.close()
Full example: code/ssh_automation.py
Netmiko builds on Paramiko and adds support for the quirks of network devices (Cisco, Juniper, Arista, etc.) — handling prompts, enable mode, configuration mode, and output pagination.
from netmiko import ConnectHandler
device = {
"device_type": "cisco_ios",
"host": "192.168.1.1",
"username": "admin",
"password": "secret",
"secret": "enable_secret", # Enable mode password
}
with ConnectHandler(**device) as conn:
conn.enable() # Enter privileged EXEC mode
# Show commands
output = conn.send_command("show ip interface brief")
print(output)
# Configuration commands
config_commands = [
"interface GigabitEthernet0/1",
"description Uplink to Core",
"ip address 10.0.0.1 255.255.255.252",
"no shutdown",
]
conn.send_config_set(config_commands)
conn.save_config()
Netmiko supports 50+ device types:
| Device Type | Vendor |
|---|---|
cisco_ios |
Cisco IOS |
cisco_nxos |
Cisco Nexus |
arista_eos |
Arista EOS |
juniper_junos |
Juniper JunOS |
hp_procurve |
HPE ProCurve |
linux |
Linux servers |
mikrotik_routeros |
MikroTik |
paloalto_panos |
Palo Alto |
from netmiko import ConnectHandler
from concurrent.futures import ThreadPoolExecutor
devices = [
{"device_type": "cisco_ios", "host": "192.168.1.1", "username": "admin", "password": "secret"},
{"device_type": "cisco_ios", "host": "192.168.1.2", "username": "admin", "password": "secret"},
{"device_type": "cisco_ios", "host": "192.168.1.3", "username": "admin", "password": "secret"},
]
def get_version(device: dict) -> str:
with ConnectHandler(**device) as conn:
return f"{device['host']}: {conn.send_command('show version | include Version')}"
with ThreadPoolExecutor(max_workers=10) as pool:
results = pool.map(get_version, devices)
for result in results:
print(result)
Simple Network Management Protocol (SNMP) is the standard for monitoring network devices. Devices expose a tree of OIDs (Object Identifiers) that contain status information — interface counters, CPU usage, temperature, etc.
from pysnmp.hlapi import (
getCmd, SnmpEngine, CommunityData, UdpTransportTarget,
ContextData, ObjectType, ObjectIdentity
)
def snmp_get(host: str, community: str, oid: str) -> str:
"""Perform an SNMP GET request."""
iterator = getCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((host, 161), timeout=5),
ContextData(),
ObjectType(ObjectIdentity(oid))
)
error_indication, error_status, error_index, var_binds = next(iterator)
if error_indication:
return f"Error: {error_indication}"
if error_status:
return f"Error: {error_status.prettyPrint()}"
for name, val in var_binds:
return f"{name.prettyPrint()} = {val.prettyPrint()}"
# Common OIDs
print(snmp_get("192.168.1.1", "public", "1.3.6.1.2.1.1.1.0")) # sysDescr
print(snmp_get("192.168.1.1", "public", "1.3.6.1.2.1.1.3.0")) # sysUpTime
print(snmp_get("192.168.1.1", "public", "1.3.6.1.2.1.1.5.0")) # sysName
| OID | Name | Description |
|---|---|---|
.1.3.6.1.2.1.1.1.0 |
sysDescr | System description |
.1.3.6.1.2.1.1.3.0 |
sysUpTime | Uptime in timeticks |
.1.3.6.1.2.1.1.5.0 |
sysName | Hostname |
.1.3.6.1.2.1.2.2.1.10.X |
ifInOctets | Bytes received on interface X |
.1.3.6.1.2.1.2.2.1.16.X |
ifOutOctets | Bytes sent on interface X |
.1.3.6.1.2.1.2.2.1.8.X |
ifOperStatus | Interface operational status |
from pysnmp.hlapi import (
nextCmd, SnmpEngine, CommunityData, UdpTransportTarget,
ContextData, ObjectType, ObjectIdentity
)
def snmp_walk(host: str, community: str, oid: str):
"""Walk an SNMP OID subtree."""
for error_indication, error_status, error_index, var_binds in nextCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((host, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)),
lexicographicMode=False
):
if error_indication or error_status:
break
for name, val in var_binds:
print(f"{name.prettyPrint()} = {val.prettyPrint()}")
# Walk all interfaces
snmp_walk("192.168.1.1", "public", "1.3.6.1.2.1.2.2.1")
Modern network devices increasingly expose REST APIs alongside or instead of CLI/SNMP:
import requests
class NetworkDeviceAPI:
def __init__(self, host: str, token: str):
self.base_url = f"https://{host}/api/v1"
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
})
self.session.verify = False # Self-signed certs in lab
def get_interfaces(self) -> list:
r = self.session.get(f"{self.base_url}/interfaces")
r.raise_for_status()
return r.json()["interfaces"]
def configure_interface(self, name: str, config: dict) -> dict:
r = self.session.put(f"{self.base_url}/interfaces/{name}", json=config)
r.raise_for_status()
return r.json()
# Usage
api = NetworkDeviceAPI("switch01.lab.local", "my-api-token")
interfaces = api.get_interfaces()
for iface in interfaces:
print(f"{iface['name']}: {iface['status']}")
import os
from datetime import datetime
from netmiko import ConnectHandler
def backup_config(device: dict, backup_dir: str) -> str:
"""Backup a device's running configuration."""
with ConnectHandler(**device) as conn:
config = conn.send_command("show running-config")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{device['host']}_{timestamp}.cfg"
filepath = os.path.join(backup_dir, filename)
os.makedirs(backup_dir, exist_ok=True)
with open(filepath, "w") as f:
f.write(config)
print(f"Backed up {device['host']} → {filepath}")
return filepath
import difflib
def diff_configs(file1: str, file2: str) -> str:
"""Compare two configuration files."""
with open(file1) as f1, open(file2) as f2:
lines1 = f1.readlines()
lines2 = f2.readlines()
diff = difflib.unified_diff(lines1, lines2,
fromfile=file1, tofile=file2)
return "".join(diff)
For larger environments, structure your automation with an inventory and task runner:
import json
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
@dataclass
class Device:
hostname: str
host: str
device_type: str
username: str
password: str
groups: list[str]
def load_inventory(path: str) -> list[Device]:
with open(path) as f:
data = json.load(f)
return [Device(**d) for d in data["devices"]]
def run_task(devices: list[Device], task_fn, max_workers: int = 10):
"""Run a task function across multiple devices in parallel."""
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(task_fn, d): d for d in devices}
for future in futures:
device = futures[future]
try:
result = future.result()
print(f"✓ {device.hostname}: {result}")
except Exception as e:
print(f"✗ {device.hostname}: {e}")
Full example: code/network_automation_framework.py
| ← Previous: Network Monitoring and Packet Analysis | Table of Contents | Next: Troubleshooting and Best Practices → |