# SECURE: SSH agent forwarding and certificate-based authentication setup
# ssh_ca_manager.py - SSH Certificate Authority management
import os
import subprocess
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
class SSHCertificateManager:
def __init__(self, ca_key_path, ca_pub_key_path):
self.ca_key_path = Path(ca_key_path)
self.ca_pub_key_path = Path(ca_pub_key_path)
self.validate_ca_keys()
def validate_ca_keys(self):
"""Validate CA key pair exists and has proper permissions"""
if not self.ca_key_path.exists():
raise FileNotFoundError(f"CA private key not found: {self.ca_key_path}")
if not self.ca_pub_key_path.exists():
raise FileNotFoundError(f"CA public key not found: {self.ca_pub_key_path}")
# Check permissions on CA private key
ca_key_stat = os.stat(self.ca_key_path)
if ca_key_stat.st_mode & 0o077: # Should be 600 or 400
raise PermissionError(f"CA private key has insecure permissions: {oct(ca_key_stat.st_mode)}")
def create_user_certificate(self, user_key_path, username, valid_hours=24, principals=None):
"""Create SSH user certificate"""
principals = principals or [username]
# Generate certificate
cert_path = f"{user_key_path}-cert.pub"
valid_from = datetime.now().strftime('%Y%m%d%H%M%S')
valid_to = (datetime.now() + timedelta(hours=valid_hours)).strftime('%Y%m%d%H%M%S')
cmd = [
'ssh-keygen',
'-s', str(self.ca_key_path),
'-I', f"{username}-{valid_from}",
'-n', ','.join(principals),
'-V', f"{valid_from}:{valid_to}",
'-z', str(int(datetime.now().timestamp())),
user_key_path
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
return cert_path
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Certificate creation failed: {e.stderr}")
def create_host_certificate(self, host_key_path, hostname, valid_days=365):
"""Create SSH host certificate"""
cert_path = f"{host_key_path}-cert.pub"
valid_from = datetime.now().strftime('%Y%m%d%H%M%S')
valid_to = (datetime.now() + timedelta(days=valid_days)).strftime('%Y%m%d%H%M%S')
cmd = [
'ssh-keygen',
'-s', str(self.ca_key_path),
'-I', f"host-{hostname}-{valid_from}",
'-h', # Host certificate
'-n', hostname,
'-V', f"{valid_from}:{valid_to}",
host_key_path
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
return cert_path
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Host certificate creation failed: {e.stderr}")
# secure_deployment.py - Deployment using SSH agent forwarding
import os
import subprocess
import logging
from contextlib import contextmanager
logger = logging.getLogger(__name__)
class SecureDeployment:
def __init__(self):
self.validate_ssh_agent()
def validate_ssh_agent(self):
"""Validate SSH agent is running and has keys loaded"""
if not os.getenv('SSH_AUTH_SOCK'):
raise EnvironmentError("SSH agent not running. Start with: ssh-agent bash")
# Check if agent has keys
try:
result = subprocess.run(['ssh-add', '-l'], capture_output=True, text=True)
if result.returncode != 0 or 'no identities' in result.stdout:
raise EnvironmentError("No SSH keys loaded in agent. Load keys with: ssh-add")
except FileNotFoundError:
raise EnvironmentError("ssh-add command not found")
@contextmanager
def ssh_connection(self, host, user='deploy'):
"""Create SSH connection using agent forwarding"""
ssh_cmd_base = [
'ssh',
'-A', # Enable agent forwarding
'-o', 'StrictHostKeyChecking=yes',
'-o', 'ForwardAgent=yes',
'-o', 'ConnectTimeout=10',
f'{user}@{host}'
]
yield ssh_cmd_base
def execute_deployment_command(self, host, command, user='deploy'):
"""Execute deployment command using SSH agent forwarding"""
with self.ssh_connection(host, user) as ssh_base:
ssh_cmd = ssh_base + [command]
try:
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=300,
check=True
)
logger.info(f"Command executed successfully on {host}: {command}")
return result.stdout
except subprocess.TimeoutExpired:
raise TimeoutError(f"Command timed out on {host}: {command}")
except subprocess.CalledProcessError as e:
logger.error(f"Command failed on {host}: {e.stderr}")
raise
def deploy_with_git_pull(self, hosts, repository_path='/opt/app'):
"""Deploy by pulling latest code using agent forwarding"""
deployment_commands = [
f'cd {repository_path}',
'git fetch origin',
'git reset --hard origin/main',
'sudo systemctl restart application'
]
combined_command = ' && '.join(deployment_commands)
for host in hosts:
logger.info(f"Deploying to {host}")
try:
self.execute_deployment_command(host, combined_command)
logger.info(f"Deployment successful on {host}")
except Exception as e:
logger.error(f"Deployment failed on {host}: {e}")
raise
# SSH configuration for certificate-based auth
# ~/.ssh/config - Client configuration
Host *.company.com
# Use certificate authentication
CertificateFile ~/.ssh/id_ed25519-cert.pub
IdentityFile ~/.ssh/id_ed25519
# Enable agent forwarding for deployment
ForwardAgent yes
# Security settings
StrictHostKeyChecking yes
UserKnownHostsFile ~/.ssh/known_hosts
# Certificate authority public key for host verification
TrustedUserCAKeys ~/.ssh/ca-key.pub
# /etc/ssh/sshd_config - Server configuration for certificate auth
# Enable certificate authentication
TrustedUserCAKeys /etc/ssh/ca-key.pub
AuthorizedPrincipalsFile /etc/ssh/auth_principals/%u
# Host certificate for server identity
HostCertificate /etc/ssh/ssh_host_ed25519_key-cert.pub
HostKey /etc/ssh/ssh_host_ed25519_key
# Security settings
PasswordAuthentication no
PermitRootLogin no
X11Forwarding no
AllowTcpForwarding no
PermitTunnel no
# Usage example script
#!/bin/bash
# deploy.sh - Secure deployment using certificates and agent forwarding
# Ensure SSH agent is running
if [ -z "$SSH_AUTH_SOCK" ]; then
echo "Starting SSH agent..."
eval $(ssh-agent)
fi
# Load deployment key into agent
ssh-add ~/.ssh/id_ed25519
# Create short-lived certificate (4 hours)
python3 create_deployment_cert.py --user deploy --hours 4
# Execute deployment
python3 secure_deployment.py --environment production --version v1.2.3
# Clean up certificate
rm ~/.ssh/id_ed25519-cert.pub