# SECURE - Using SSH agent and key management
import paramiko
import os
from pathlib import Path
class SecureSSHConnection:
    """Thin wrapper around ``paramiko.SSHClient`` with strict host-key checking.

    Host keys are loaded from the system ``known_hosts`` and unknown hosts
    are rejected (``RejectPolicy``). Three ways to authenticate are offered:
    an SSH agent, a private key file, or an agent-authenticated hop through
    a bastion (jump) host.

    The object can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self):
        self.client = paramiko.SSHClient()
        self.client.load_system_host_keys()
        # RejectPolicy is safer than AutoAddPolicy in production
        self.client.set_missing_host_key_policy(paramiko.RejectPolicy())
        # Bastion client backing the current tunnel, if any. It must stay
        # referenced for as long as the tunnelled connection is in use.
        self._bastion = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def close(self):
        """Close the target connection and, if present, the bastion connection."""
        self.client.close()
        # getattr: tolerate instances whose __init__ was bypassed.
        bastion = getattr(self, "_bastion", None)
        self._bastion = None
        if bastion is not None:
            bastion.close()

    def connect_with_agent(self, hostname, username):
        """Connect to *hostname* as *username* using keys held by the SSH agent.

        The agent must already be running with a suitable key loaded; key
        files on disk are deliberately not consulted.
        """
        self.client.connect(
            hostname,
            username=username,
            allow_agent=True,
            look_for_keys=False,  # Important: don't look elsewhere if agent fails
        )

    def connect_with_key_file(self, hostname, username, key_path=None):
        """Connect to *hostname* as *username* using an RSA private key file.

        Args:
            hostname: Host to connect to.
            username: Remote user name.
            key_path: Path to the private key. Defaults to ``~/.ssh/id_rsa``.
                The key is loaded with ``paramiko.RSAKey``, so it must be an
                RSA key.

        The passphrase, if the key has one, is read from the
        ``SSH_KEY_PASSPHRASE`` environment variable.

        Raises:
            FileNotFoundError: The key file does not exist.
            PermissionError: The key file is accessible by group or others.
            ValueError: The key is encrypted and no passphrase was provided.
            ConnectionError: Any other SSH failure (bad key format,
                authentication failure, ...).
        """
        if key_path is None:
            key_path = Path.home() / '.ssh' / 'id_rsa'
        else:
            key_path = Path(key_path).expanduser()

        if not key_path.exists():
            raise FileNotFoundError(f"SSH key not found at {key_path}")

        # Check key file permissions (VERY important): no group/other access
        # bits may be set. This accepts both 0600 and the read-only 0400.
        if key_path.stat().st_mode & 0o077:
            raise PermissionError(f"SSH key file {key_path} has incorrect permissions (should be 600)")

        # Load passphrase from environment or secure source
        passphrase = os.environ.get('SSH_KEY_PASSPHRASE')

        try:
            key = paramiko.RSAKey.from_private_key_file(
                str(key_path),
                password=passphrase,
            )
            self.client.connect(
                hostname,
                username=username,
                pkey=key,
                look_for_keys=False,
                allow_agent=False,
            )
        except paramiko.PasswordRequiredException as exc:
            # Key needs a passphrase but none was provided
            raise ValueError(
                f"SSH key {key_path} requires a passphrase (set SSH_KEY_PASSPHRASE)"
            ) from exc
        except paramiko.SSHException as exc:
            # Other SSH errors (e.g., key format incorrect)
            raise ConnectionError(
                f"Failed to connect using key {key_path}: {exc}"
            ) from exc

    def connect_with_bastion(self, target_host, bastion_host, bastion_user='bastion-user', target_user='app-user'):
        """Connect to *target_host* through a bastion/jump host.

        Both hops authenticate with the SSH agent. A ``direct-tcpip`` channel
        to port 22 of *target_host* is opened on the bastion and used as the
        socket for the target connection.

        The bastion client is kept on the instance so the tunnel stays alive
        for the lifetime of the target connection; it is released by
        :meth:`close`. If any step fails, the bastion connection is closed
        before the exception propagates.
        """
        # First, connect to bastion with the same strict host-key policy
        bastion = paramiko.SSHClient()
        bastion.load_system_host_keys()
        bastion.set_missing_host_key_policy(paramiko.RejectPolicy())

        try:
            # Example assumes bastion uses agent auth; adjust as needed
            bastion.connect(
                bastion_host,
                username=bastion_user,
                allow_agent=True,
                look_for_keys=False,
            )

            # Create tunnel through bastion
            bastion_transport = bastion.get_transport()
            dest_addr = (target_host, 22)
            local_addr = ('127.0.0.1', 0)  # Use ephemeral local port
            bastion_channel = bastion_transport.open_channel(
                "direct-tcpip", dest_addr, local_addr
            )

            # Connect to target through tunnel (again, assumes agent auth)
            self.client.connect(
                target_host,
                username=target_user,
                sock=bastion_channel,
                allow_agent=True,  # Use agent key for target host
                look_for_keys=False,
            )
        except Exception:
            # Don't leak the bastion session when the hop cannot be completed.
            bastion.close()
            raise

        # Keep the new bastion referenced; drop any tunnel from an earlier call.
        previous = getattr(self, "_bastion", None)
        self._bastion = bastion
        if previous is not None:
            previous.close()