Insecure Cloud Instance Metadata Service (IMDS) Access Leading to Credential Theft

Critical Risk Infrastructure Security
aws-imds, gcp-metadata, azure-imds, ssrf, credential-theft, instance-metadata, iam-roles, privilege-escalation, cloud-security

What it is

A critical security vulnerability where cloud instance metadata services (AWS EC2 IMDS, GCP metadata server, Azure IMDS) are accessible without proper restrictions, allowing attackers to steal temporary credentials, IAM roles, and sensitive instance information. This commonly occurs through Server-Side Request Forgery (SSRF) attacks, remote code execution, or when applications make unvalidated HTTP requests that can be redirected to metadata endpoints.

#!/usr/bin/env python3
# VULNERABLE: Application with SSRF vulnerability allowing IMDS access

import requests
from flask import Flask, request, jsonify
import json
import os
from urllib.parse import urlparse

# Flask application object for the vulnerable sample; the routes below register on it.
app = Flask(__name__)

# VULNERABLE: No URL validation or filtering
@app.route('/fetch-url', methods=['GET'])
def fetch_url():
    """VULNERABLE: Fetch a caller-supplied URL server-side and echo the reply.

    Reads the ``url`` query parameter, requests it with no validation, and
    returns the upstream status code, headers and complete body as JSON.
    Responds 400 when ``url`` is missing and 500 when the request fails.
    """
    target_url = request.args.get('url')
    if not target_url:
        return jsonify({'error': 'URL parameter required'}), 400

    try:
        # VULNERABLE: No restrictions on target URL
        # Attacker can use: http://169.254.169.254/latest/meta-data/iam/security-credentials/
        upstream = requests.get(target_url, timeout=10)
    except requests.RequestException as exc:
        return jsonify({'error': str(exc)}), 500

    payload = {
        'status': 'success',
        'status_code': upstream.status_code,
        'headers': dict(upstream.headers),
        'content': upstream.text,  # VULNERABLE: Returns full content
    }
    return jsonify(payload)

# VULNERABLE: Direct metadata service access
@app.route('/get-instance-info', methods=['GET'])
def get_instance_info():
    """VULNERABLE: Read instance metadata and role credentials, then return them.

    Queries a fixed list of metadata endpoints, follows the first listed IAM
    role to its credential document, and returns the collected values together
    with the full process environment. Per-endpoint failures are ignored.
    """
    try:
        # VULNERABLE: Direct IMDS access without validation
        imds_root = 'http://169.254.169.254/latest/meta-data'
        metadata_endpoints = [
            f'{imds_root}/instance-id',
            f'{imds_root}/local-ipv4',
            f'{imds_root}/iam/security-credentials/',
        ]

        instance_info = {}

        for endpoint in metadata_endpoints:
            try:
                reply = requests.get(endpoint, timeout=2)
                body = reply.text

                # Use the last path segment as the key; a trailing slash leaves
                # it empty, in which case the segment before it is used.
                segments = endpoint.split('/')
                instance_info[segments[-1] or segments[-2]] = body

                # VULNERABLE: If IAM role found, get credentials
                if body and 'security-credentials' in endpoint:
                    first_role = body.strip().split('\n')[0]
                    creds_reply = requests.get(f"{endpoint}{first_role}", timeout=2)
                    instance_info['credentials'] = json.loads(creds_reply.text)

            except Exception:
                # Best-effort: skip endpoints that cannot be read
                continue

        # VULNERABLE: Exposing sensitive metadata
        exposed = {
            'instance_info': instance_info,
            'environment': dict(os.environ),  # VULNERABLE: Exposing env vars
        }
        return jsonify(exposed)

    except Exception as exc:
        return jsonify({'error': str(exc)}), 500

# VULNERABLE: Multi-cloud metadata access
@app.route('/get-cloud-metadata', methods=['GET'])
def get_cloud_metadata():
    """VULNERABLE: Probe AWS, GCP and Azure metadata services and return the results.

    Each provider is probed best-effort: the first failing request abandons the
    remaining endpoints of that provider and the next provider is tried. Only
    endpoints answering 200 are recorded. The response includes any tokens or
    credentials that were retrieved.

    Returns:
        A JSON response with ``cloud_metadata`` (keyed by ``aws``/``gcp``/``azure``
        for providers that answered) and a ``note`` string.
    """

    metadata = {}

    # VULNERABLE: AWS IMDS access
    try:
        aws_endpoints = [
            'http://169.254.169.254/latest/meta-data/instance-id',
            'http://169.254.169.254/latest/meta-data/iam/security-credentials/',
        ]

        for endpoint in aws_endpoints:
            response = requests.get(endpoint, timeout=1)
            if response.status_code == 200:
                # A trailing slash leaves the last segment empty; file it under 'root'
                key = endpoint.split('/')[-1] or 'root'
                metadata.setdefault('aws', {})[key] = response.text
    except Exception:
        # Best-effort probe: not being on AWS is expected. Catch Exception rather
        # than everything so KeyboardInterrupt/SystemExit still propagate.
        pass

    # VULNERABLE: GCP metadata access
    try:
        gcp_headers = {'Metadata-Flavor': 'Google'}
        gcp_endpoints = [
            'http://169.254.169.254/computeMetadata/v1/project/project-id',
            'http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token',
        ]

        for endpoint in gcp_endpoints:
            response = requests.get(endpoint, headers=gcp_headers, timeout=1)
            if response.status_code == 200:
                gcp_section = metadata.setdefault('gcp', {})
                key = endpoint.split('/')[-1]
                if key == 'token':
                    gcp_section['service_account_token'] = json.loads(response.text)
                else:
                    gcp_section[key] = response.text
    except Exception:
        # Best-effort probe; see the AWS section for why this is not a bare except
        pass

    # VULNERABLE: Azure IMDS access
    try:
        azure_headers = {'Metadata': 'true'}
        azure_endpoints = [
            'http://169.254.169.254/metadata/instance?api-version=2021-02-01',
            'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://management.azure.com/',
        ]

        for endpoint in azure_endpoints:
            response = requests.get(endpoint, headers=azure_headers, timeout=1)
            if response.status_code == 200:
                azure_section = metadata.setdefault('azure', {})
                if 'identity' in endpoint:
                    azure_section['managed_identity_token'] = json.loads(response.text)
                else:
                    azure_section['instance_metadata'] = json.loads(response.text)
    except Exception:
        # Best-effort probe; see the AWS section for why this is not a bare except
        pass

    # VULNERABLE: Returning all metadata including credentials
    return jsonify({
        'cloud_metadata': metadata,
        'note': 'This endpoint exposes sensitive cloud credentials!'
    })

# VULNERABLE: Proxy endpoint that can be abused
@app.route('/proxy', methods=['GET', 'POST'])
def proxy_request():
    """VULNERABLE: General purpose proxy without restrictions.

    The target is taken from the ``url`` query parameter, falling back to the
    ``url`` field of a JSON body. The request is replayed with the caller's
    method (GET or POST) and the upstream status, headers and full body are
    returned as JSON. Responds 400 when no URL is supplied and 500 on any
    failure while proxying.
    """

    # Query string first, JSON body only as a fallback. Written as separate
    # steps so that a plain GET ?url=... works without a JSON body.
    target_url = request.args.get('url')
    if not target_url and request.is_json:
        json_body = request.get_json(silent=True)
        if isinstance(json_body, dict):
            target_url = json_body.get('url')
    method = request.method.lower()

    if not target_url:
        return jsonify({'error': 'URL required'}), 400

    try:
        # VULNERABLE: Proxy any request without validation
        # Attacker can access internal services and metadata endpoints

        proxy_headers = {
            'User-Agent': request.headers.get('User-Agent', 'ProxyApp/1.0'),
            'Accept': request.headers.get('Accept', '*/*')
        }

        # Add cloud-specific headers if present in original request
        if 'Metadata-Flavor' in request.headers:
            proxy_headers['Metadata-Flavor'] = request.headers['Metadata-Flavor']
        if 'Metadata' in request.headers:
            proxy_headers['Metadata'] = request.headers['Metadata']

        if method == 'get':
            response = requests.get(target_url, headers=proxy_headers, timeout=10)
        elif method == 'post':
            response = requests.post(
                target_url,
                headers=proxy_headers,
                data=request.get_data(),
                timeout=10
            )

        # VULNERABLE: Return full response including sensitive data
        return jsonify({
            'status_code': response.status_code,
            'headers': dict(response.headers),
            'content': response.text,
            'proxied_url': target_url
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500

# VULNERABLE: File processing that could be exploited
@app.route('/process-config', methods=['POST'])
def process_config():
    """VULNERABLE: Process configuration files with external references"""
    
    try:
        config_data = request.get_json()
        
        if 'remote_config_url' in config_data:
            # VULNERABLE: Fetch remote configuration without validation
            config_url = config_data['remote_config_url']
            
            # Attacker can set this to metadata endpoints
            config_response = requests.get(config_url, timeout=5)
            
            if config_response.status_code == 200:
                try:
                    remote_config = json.loads(config_response.text)
                    
                    # VULNERABLE: Process and return sensitive configuration
                    return jsonify({
                        'status': 'success',
                        'config': remote_config,
                        'source': config_url
                    })
                except json.JSONDecodeError:
                    # Return raw text (could be credentials)
                    return jsonify({
                        'status': 'success',
                        'raw_config': config_response.text,
                        'source': config_url
                    })
        
        return jsonify({'error': 'No remote config URL provided'}), 400
    
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    # VULNERABLE: Running on all interfaces; debug mode exposes more info
    run_options = {'host': '0.0.0.0', 'port': 8080, 'debug': True}
    app.run(**run_options)

# Example of how this vulnerable app could be exploited:
# 
# 1. SSRF to AWS IMDS:
# curl "http://vulnerable-app:8080/fetch-url?url=http://169.254.169.254/latest/meta-data/iam/security-credentials/"
# 
# 2. Get IAM credentials:
# curl "http://vulnerable-app:8080/fetch-url?url=http://169.254.169.254/latest/meta-data/iam/security-credentials/MyInstanceRole"
# 
# 3. GCP metadata access:
# curl -H "Metadata-Flavor: Google" "http://vulnerable-app:8080/proxy?url=http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token"
# 
# 4. Azure managed identity token:
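# (the inner URL is URL-encoded so that "&resource=..." stays part of the url parameter)
# curl -G -H "Metadata: true" "http://vulnerable-app:8080/proxy" --data-urlencode "url=http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://management.azure.com/"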
# 
# 5. Process malicious config:
# curl -X POST http://vulnerable-app:8080/process-config -H "Content-Type: application/json" -d '{"remote_config_url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/MyRole"}'
#!/usr/bin/env python3
# SECURE: Application with proper SSRF protection and IMDS security

import requests
from flask import Flask, request, jsonify
import json
import os
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
import ipaddress
import socket
from urllib.parse import urlparse
import logging
from functools import wraps
import time
from collections import defaultdict

app = Flask(__name__)

# Configure secure logging: INFO and above, each record stamped with time,
# logger name and level
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# SECURE: Rate limiting
# Maps a client key to the timestamps of its requests inside the current window.
request_counts = defaultdict(list)
RATE_LIMIT_REQUESTS = 10
RATE_LIMIT_WINDOW = 60  # seconds

# SECURE: Blocked networks for SSRF protection
BLOCKED_NETWORKS = [
    # Whole link-local range, not just 169.254.169.254: other instance-local
    # credential endpoints (e.g. 169.254.170.2) live here too.
    ipaddress.IPv4Network('169.254.0.0/16'),     # Link-local / cloud metadata
    ipaddress.IPv4Network('10.0.0.0/8'),         # Private
    ipaddress.IPv4Network('172.16.0.0/12'),      # Private
    ipaddress.IPv4Network('192.168.0.0/16'),     # Private
    ipaddress.IPv4Network('100.64.0.0/10'),      # Shared address space (RFC 6598)
    ipaddress.IPv4Network('127.0.0.0/8'),        # Localhost
    ipaddress.IPv4Network('0.0.0.0/8'),          # Invalid
    ipaddress.IPv4Network('224.0.0.0/4'),        # Multicast
    ipaddress.IPv4Network('240.0.0.0/4'),        # Reserved
]

# SECURE: Allowed domains for external requests
ALLOWED_DOMAINS = [
    'api.github.com',
    'api.twitter.com',
    'jsonplaceholder.typicode.com',
    # Add your trusted domains here
]

def rate_limit(f):
    """Decorator enforcing a per-client sliding-window request limit.

    At most ``RATE_LIMIT_REQUESTS`` calls are allowed per client within any
    ``RATE_LIMIT_WINDOW`` seconds; further calls receive a 429 JSON response
    and the wrapped view is not invoked.

    Clients are keyed by ``request.remote_addr``. The ``X-Forwarded-For``
    header is deliberately not read here: it is supplied by the client, so
    keying on it would let a caller pick a fresh bucket for every request.
    When deployed behind a trusted reverse proxy, configure the WSGI layer
    (e.g. Werkzeug's ``ProxyFix``) so that ``remote_addr`` reflects the real
    client address.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view function.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        client_ip = request.remote_addr
        current_time = time.time()

        # Keep only the timestamps that still fall inside the window
        recent_requests = [
            req_time for req_time in request_counts[client_ip]
            if current_time - req_time < RATE_LIMIT_WINDOW
        ]

        # Check rate limit
        if len(recent_requests) >= RATE_LIMIT_REQUESTS:
            request_counts[client_ip] = recent_requests
            logger.warning(f"Rate limit exceeded for IP: {client_ip}")
            return jsonify({'error': 'Rate limit exceeded'}), 429

        recent_requests.append(current_time)
        request_counts[client_ip] = recent_requests
        return f(*args, **kwargs)

    return decorated_function

def is_safe_url(url):
    """Decide whether ``url`` may be fetched server-side without SSRF risk.

    A URL is accepted only when all of the following hold:

    * the scheme is ``http`` or ``https``;
    * the URL contains none of the known metadata/loopback substrings;
    * the hostname is listed in ``ALLOWED_DOMAINS``;
    * every address the hostname resolves to (IPv4 and IPv6) is outside
      ``BLOCKED_NETWORKS`` and is not private, loopback, link-local,
      multicast, reserved or unspecified.

    Note: the HTTP client resolves the hostname again when it connects, so
    this check narrows but cannot by itself rule out DNS rebinding.

    Args:
        url: The candidate URL string.

    Returns:
        True if the URL passes every check, False otherwise (including on
        any parsing or resolution error).
    """
    try:
        parsed = urlparse(url)

        # Check scheme
        if parsed.scheme not in ('http', 'https'):
            logger.warning(f"Blocked non-HTTP(S) scheme: {parsed.scheme}")
            return False

        # Check for suspicious URLs. Patterns are lower-case because they are
        # matched against the lower-cased URL.
        suspicious_patterns = (
            '169.254.169.254',  # Metadata service
            'metadata',
            'computemetadata',
            'localhost',
            '127.0.0.1',
            '0.0.0.0',
        )
        lowered_url = url.lower()

        for pattern in suspicious_patterns:
            if pattern in lowered_url:
                logger.warning(f"Blocked suspicious URL pattern: {pattern} in {url}")
                return False

        hostname = parsed.hostname
        if not hostname:
            return False

        # Check if hostname is in allowed domains
        if hostname not in ALLOWED_DOMAINS:
            logger.warning(f"Domain not in allowlist: {hostname}")
            return False

        # Resolve every address record, not just the first IPv4 one, so a
        # hostname with one public and one internal record is rejected.
        try:
            address_records = socket.getaddrinfo(hostname, None)
        except (socket.gaierror, UnicodeError) as e:
            logger.warning(f"DNS resolution failed for {hostname}: {e}")
            return False

        resolved_addresses = {record[4][0] for record in address_records}
        if not resolved_addresses:
            logger.warning(f"DNS resolution failed for {hostname}: no addresses")
            return False

        for raw_address in resolved_addresses:
            try:
                # Drop an IPv6 zone suffix such as "%eth0" before parsing
                ip_addr = ipaddress.ip_address(raw_address.split('%', 1)[0])
            except ValueError as e:
                logger.warning(f"DNS resolution failed for {hostname}: {e}")
                return False

            # Judge IPv4-mapped IPv6 addresses by the IPv4 address they carry
            mapped_v4 = getattr(ip_addr, 'ipv4_mapped', None)
            if mapped_v4 is not None:
                ip_addr = mapped_v4

            for blocked_network in BLOCKED_NETWORKS:
                if ip_addr in blocked_network:
                    logger.warning(f"Blocked IP address: {raw_address} in network {blocked_network}")
                    return False

            # Generic classification also covers IPv6 and any internal range
            # that is missing from BLOCKED_NETWORKS.
            if (ip_addr.is_private or ip_addr.is_loopback or ip_addr.is_link_local
                    or ip_addr.is_multicast or ip_addr.is_reserved
                    or ip_addr.is_unspecified):
                logger.warning(f"Blocked IP address: {raw_address} (non-public address)")
                return False

        return True

    except Exception as e:
        logger.error(f"URL validation error: {e}")
        return False

# SECURE: URL fetching with validation
@app.route('/fetch-url', methods=['GET'])
@rate_limit
def fetch_url():
    """SECURE: Fetch content from validated URLs only.

    Reads the ``url`` query parameter, validates it with ``is_safe_url`` and,
    if allowed, fetches it without following redirects. The body is read
    incrementally and the transfer is abandoned as soon as it exceeds the
    size cap, so an oversized response is never held in memory in full.

    Returns:
        200 with status code, content type, length and a 200-character
        preview; 400 if ``url`` is missing; 403 if the URL is rejected;
        413 if the body exceeds the cap; 500 if the request fails.
    """
    target_url = request.args.get('url')

    if not target_url:
        return jsonify({'error': 'URL parameter required'}), 400

    # SECURE: Validate URL before making request
    if not is_safe_url(target_url):
        logger.warning(f"Blocked unsafe URL request: {target_url}")
        return jsonify({
            'error': 'URL not allowed - potential SSRF detected',
            'blocked_url': target_url
        }), 403

    max_body_bytes = 10000  # 10KB limit
    response = None

    try:
        # SECURE: Restricted HTTP client configuration
        response = requests.get(
            target_url,
            timeout=5,  # Short timeout
            allow_redirects=False,  # Prevent redirect-based SSRF
            headers={
                'User-Agent': 'SecureApp/1.0',
                'Accept': 'application/json, text/plain'
            },
            stream=True  # Read the body ourselves so the size cap is enforced
        )

        # SECURE: Limit response size while reading, not after the download
        body = bytearray()
        for chunk in response.iter_content(chunk_size=4096):
            body.extend(chunk)
            if len(body) > max_body_bytes:
                logger.warning(f"Response too large from {target_url}")
                return jsonify({'error': 'Response too large'}), 413

        # Decode with the declared charset; fall back to UTF-8 when the
        # charset is absent or names an unknown codec.
        try:
            text = bytes(body).decode(response.encoding or 'utf-8', errors='replace')
        except LookupError:
            text = bytes(body).decode('utf-8', errors='replace')

        # SECURE: Don't return full response content
        return jsonify({
            'status': 'success',
            'status_code': response.status_code,
            'content_type': response.headers.get('Content-Type'),
            'content_length': len(text),
            'content_preview': text[:200] + '...' if len(text) > 200 else text
        })

    except requests.RequestException as e:
        logger.error(f"Request failed for {target_url}: {e}")
        return jsonify({'error': 'Request failed'}), 500

    finally:
        # A streamed response holds its connection until it is closed
        if response is not None:
            response.close()

# SECURE: Proper AWS resource access using SDK
@app.route('/get-instance-info', methods=['GET'])
@rate_limit
def get_instance_info():
    """SECURE: Return a limited view of the EC2 instance identity document.

    Uses the IMDSv2 flow: a session token is requested with PUT and then
    presented on the GET for the identity document. Only instance id, type,
    region and availability zone are returned.

    Returns:
        200 with ``instance_info`` populated on success; 200 with
        ``instance_info`` set to None when the metadata service cannot be
        reached (not running on EC2); 500 when the metadata service answers
        with an error status or an unreadable document.
    """
    imds_base = 'http://169.254.169.254/latest'

    try:
        try:
            # IMDSv2 step 1: obtain a short-lived session token
            token_response = requests.put(
                f'{imds_base}/api/token',
                headers={'X-aws-ec2-metadata-token-ttl-seconds': '60'},
                timeout=2
            )

            if token_response.status_code != 200:
                logger.warning(
                    f"IMDS token request returned status {token_response.status_code}"
                )
                return jsonify({'error': 'Failed to get instance info'}), 500

            # IMDSv2 step 2: present the token when reading metadata
            response = requests.get(
                f'{imds_base}/dynamic/instance-identity/document',
                headers={'X-aws-ec2-metadata-token': token_response.text},
                timeout=2
            )

        except requests.RequestException:
            # Fallback if not running on EC2
            return jsonify({
                'instance_info': None,
                'note': 'Not running on EC2 instance'
            })

        # Every path must return a response; a non-200 reply is an error
        if response.status_code != 200:
            logger.warning(
                f"IMDS identity document request returned status {response.status_code}"
            )
            return jsonify({'error': 'Failed to get instance info'}), 500

        identity = json.loads(response.text)

        # SECURE: Return only safe, non-sensitive information
        safe_info = {
            'instance_id': identity.get('instanceId'),
            'instance_type': identity.get('instanceType'),
            'region': identity.get('region'),
            'availability_zone': identity.get('availabilityZone'),
            # Don't expose account ID, private IPs, etc.
        }

        return jsonify({
            'instance_info': safe_info,
            'note': 'Limited instance information for security'
        })

    except Exception as e:
        logger.error(f"Error getting instance info: {e}")
        return jsonify({'error': 'Failed to get instance info'}), 500

# SECURE: Proper cloud resource access
@app.route('/get-cloud-resources', methods=['GET'])
@rate_limit
def get_cloud_resources():
    """SECURE: Summarise cloud resources reached through the provider SDKs.

    Each provider is handled independently and contributes one entry to the
    result: a small summary on success, or an ``error`` string when the SDK
    is missing, credentials are absent, or the call fails.
    """

    resources = {}

    # SECURE: AWS resources using SDK
    try:
        s3 = boto3.client('s3')

        # SECURE: List only specific buckets with proper permissions
        try:
            # This requires proper IAM permissions
            listing = s3.list_buckets()
            all_buckets = listing.get('Buckets', [])

            # SECURE: Return limited, non-sensitive information
            bucket_summaries = [
                {
                    'name': entry['Name'],
                    'creation_date': entry['CreationDate'].isoformat()
                }
                for entry in all_buckets[:5]  # Limit to 5
            ]
            resources['aws'] = {
                'bucket_count': len(all_buckets),
                'accessible_buckets': bucket_summaries
            }

        except ClientError as e:
            resources['aws'] = {
                'error': f'Access denied: {e.response["Error"]["Code"]}'
            }

    except NoCredentialsError:
        resources['aws'] = {'error': 'No AWS credentials configured'}
    except Exception as aws_exc:
        logger.error(f"AWS error: {aws_exc}")
        resources['aws'] = {'error': 'AWS access failed'}

    # SECURE: GCP resources using proper authentication
    try:
        from google.cloud import storage
        from google.auth.exceptions import DefaultCredentialsError

        try:
            gcs = storage.Client()
            first_buckets = list(gcs.list_buckets(max_results=5))

            resources['gcp'] = {
                'bucket_count': len(first_buckets),
                'project_id': gcs.project
            }

        except DefaultCredentialsError:
            resources['gcp'] = {'error': 'No GCP credentials configured'}

    except ImportError:
        resources['gcp'] = {'error': 'GCP client library not installed'}
    except Exception as gcp_exc:
        logger.error(f"GCP error: {gcp_exc}")
        resources['gcp'] = {'error': 'GCP access failed'}

    # SECURE: Azure resources using proper authentication
    try:
        from azure.identity import DefaultAzureCredential
        from azure.storage.blob import BlobServiceClient
        from azure.core.exceptions import ClientAuthenticationError

        try:
            # Only constructed; no Azure resource is queried with it here
            credential = DefaultAzureCredential()
            # This would require proper configuration
            resources['azure'] = {
                'authentication': 'configured',
                'note': 'Azure resources require proper service principal or managed identity'
            }

        except ClientAuthenticationError:
            resources['azure'] = {'error': 'Azure authentication failed'}

    except ImportError:
        resources['azure'] = {'error': 'Azure client library not installed'}
    except Exception as azure_exc:
        logger.error(f"Azure error: {azure_exc}")
        resources['azure'] = {'error': 'Azure access failed'}

    summary = {
        'cloud_resources': resources,
        'note': 'Using proper cloud SDKs with authentication and authorization'
    }
    return jsonify(summary)

# SECURE: Restricted proxy with validation
@app.route('/proxy', methods=['GET'])
@rate_limit
def proxy_request():
    """SECURE: GET-only proxy for URLs that pass ``is_safe_url``.

    Sends a fixed set of request headers (nothing from the caller is
    forwarded), does not follow redirects, and returns the upstream status, a
    filtered set of response headers, a 500-character preview of the body and
    the target hostname. Responds 400 without a URL, 403 for a rejected URL
    and 500 on any failure while proxying.
    """

    target_url = request.args.get('url')
    if not target_url:
        return jsonify({'error': 'URL required'}), 400

    # SECURE: Validate URL
    if not is_safe_url(target_url):
        logger.warning(f"Blocked unsafe proxy request: {target_url}")
        rejection = {
            'error': 'URL not allowed for proxying',
            'blocked_url': target_url
        }
        return jsonify(rejection), 403

    try:
        # SECURE: Restricted proxy headers
        # Never forward metadata-specific headers
        outbound_headers = {
            'User-Agent': 'SecureProxy/1.0',
            'Accept': 'application/json, text/plain',
        }

        upstream = requests.get(
            target_url,
            headers=outbound_headers,
            timeout=5,
            allow_redirects=False
        )

        # SECURE: Filter response headers
        exposed_names = ('content-type', 'content-length', 'cache-control')
        safe_response_headers = {}
        for header_name, header_value in upstream.headers.items():
            if header_name.lower() in exposed_names:
                safe_response_headers[header_name] = header_value

        body_text = upstream.text
        if len(body_text) > 500:
            preview = body_text[:500] + '...'
        else:
            preview = body_text

        return jsonify({
            'status_code': upstream.status_code,
            'headers': safe_response_headers,
            'content_preview': preview,
            'proxied_from': urlparse(target_url).hostname  # Only hostname, not full URL
        })

    except Exception as exc:
        logger.error(f"Proxy request failed: {exc}")
        return jsonify({'error': 'Proxy request failed'}), 500

# SECURE: Configuration processing with validation
@app.route('/process-config', methods=['POST'])
@rate_limit
def process_config():
    """SECURE: Process configuration with proper validation.

    Accepts a JSON object and, when it contains a ``config`` key, returns a
    processed configuration built from the top-level ``app_name``,
    ``environment`` and ``features`` fields. URLs and other external
    references are never fetched.

    Returns:
        200 with ``processed_config`` on success; 400 when the body is not
        JSON, is malformed, is not an object, or lacks ``config``; 500 on an
        unexpected error.
    """

    try:
        if not request.is_json:
            return jsonify({'error': 'JSON content required'}), 400

        # silent=True yields None for a malformed body instead of raising, so
        # bad client input is reported as 400 below rather than as a 500.
        config_data = request.get_json(silent=True)

        if not isinstance(config_data, dict):
            return jsonify({'error': 'Invalid configuration format'}), 400

        # SECURE: Process only local configuration
        if 'config' in config_data:
            # Process embedded configuration only
            processed_config = {
                'app_name': config_data.get('app_name', 'default'),
                'environment': config_data.get('environment', 'production'),
                'features': config_data.get('features', {}),
                # Don't process URLs or external references
            }

            return jsonify({
                'status': 'success',
                'processed_config': processed_config,
                'note': 'Only embedded configuration processed for security'
            })

        return jsonify({'error': 'No valid configuration provided'}), 400

    except Exception as e:
        logger.error(f"Configuration processing error: {e}")
        return jsonify({'error': 'Configuration processing failed'}), 500

# SECURE: Health check endpoint
@app.route('/health', methods=['GET'])
def health_check():
    """Report service status, version and the advertised security features."""
    security_features = [
        'SSRF protection',
        'Rate limiting',
        'Input validation',
        'Secure cloud SDK usage',
        'Metadata service protection',
    ]
    report = {
        'status': 'healthy',
        'version': '1.0.0',
        'security_features': security_features,
    }
    return jsonify(report)

# SECURE: Error handler
@app.errorhandler(404)
def not_found(error):
    """Answer unknown routes with a JSON 404 body."""
    body = {'error': 'Endpoint not found'}
    return jsonify(body), 404

@app.errorhandler(500)
def internal_error(error):
    """Log the failure and answer with a generic JSON 500 body."""
    log_line = f"Internal server error: {error}"
    logger.error(log_line)
    body = {'error': 'Internal server error'}
    return jsonify(body), 500

if __name__ == '__main__':
    # Log security configuration first: app.run() blocks until the server
    # stops, so anything placed after it would only be logged at shutdown.
    logger.info("Secure application started with:")
    logger.info("- SSRF protection enabled")
    logger.info(f"- Rate limiting: {RATE_LIMIT_REQUESTS} requests per {RATE_LIMIT_WINDOW}s")
    logger.info(f"- Allowed domains: {len(ALLOWED_DOMAINS)} configured")
    logger.info(f"- Blocked networks: {len(BLOCKED_NETWORKS)} configured")

    # SECURE: Bind to localhost only
    app.run(host='127.0.0.1', port=8080, debug=False)  # Debug disabled

# Example of secure usage:
# 
# 1. Fetch from allowed domain:
# curl "http://localhost:8080/fetch-url?url=https://api.github.com/users/octocat"
# 
# 2. Blocked SSRF attempt:
# curl "http://localhost:8080/fetch-url?url=http://169.254.169.254/latest/meta-data/"
# # Returns: {"error": "URL not allowed - potential SSRF detected"}
# 
# 3. Get instance info securely:
# curl "http://localhost:8080/get-instance-info"
# # Returns only limited, non-sensitive fields from the instance identity document
# 
# 4. Access cloud resources properly:
# curl "http://localhost:8080/get-cloud-resources"
# # Uses proper cloud SDKs with authentication
# 
# 5. Health check:
# curl "http://localhost:8080/health"
# # Returns security features enabled

💡 Why This Fix Works

The vulnerable application allows unrestricted access to cloud metadata services through SSRF vulnerabilities, direct metadata calls, and proxy functionality. The secure version implements comprehensive SSRF protection, URL validation, proper cloud SDK usage, rate limiting, and blocks access to metadata endpoints while providing necessary functionality through secure authenticated channels.

Why it happens

AWS EC2 instances running with IMDSv1 enabled allow simple HTTP GET requests to retrieve sensitive information including IAM role credentials, user data, and instance metadata. IMDSv1 lacks authentication and can be exploited through SSRF vulnerabilities, compromised applications, or direct access from within the instance.

Root causes

AWS EC2 Instance Metadata Service v1 (IMDSv1) Without Restrictions

AWS EC2 instances running with IMDSv1 enabled allow simple HTTP GET requests to retrieve sensitive information including IAM role credentials, user data, and instance metadata. IMDSv1 lacks authentication and can be exploited through SSRF vulnerabilities, compromised applications, or direct access from within the instance.

Preview example – HCL
# VULNERABLE: EC2 instance with IMDSv1 enabled and a permissive hop limit (2)
resource "aws_instance" "vulnerable_server" {
  # Demo instance: permissive metadata options, an attached instance profile,
  # and a user_data script that writes and starts a URL-fetching Flask app.
  ami           = "ami-0abcdef1234567890"
  instance_type = "t3.medium"
  
  # VULNERABLE: IMDSv1 enabled (default)
  metadata_options {
    http_endpoint = "enabled"
    http_tokens   = "optional"  # Allows IMDSv1
    http_put_response_hop_limit = 2  # Allows forwarding
  }
  
  # VULNERABLE: Overprivileged IAM role attached
  # NOTE(review): aws_iam_instance_profile.admin_profile is not declared in
  # this snippet; presumably defined elsewhere - confirm.
  iam_instance_profile = aws_iam_instance_profile.admin_profile.name
  
  # The heredoc is passed to the instance as-is: it installs Python, writes
  # vulnerable_app.py and launches it in the background on port 8080.
  # NOTE(review): the script never pip-installs flask/requests; confirm the
  # AMI provides them.
  user_data = <<-EOF
    #!/bin/bash
    # VULNERABLE: Application that can be exploited via SSRF
    yum update -y
    yum install -y python3 python3-pip
    
    cat > /home/ec2-user/vulnerable_app.py << 'APP'
import requests
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/fetch')
def fetch_url():
    # VULNERABLE: No URL validation - allows SSRF to IMDS
    url = request.args.get('url')
    if url:
        try:
            # Attacker can set url=http://169.254.169.254/latest/meta-data/iam/security-credentials/
            response = requests.get(url, timeout=5)
            return jsonify({
                'status': 'success',
                'data': response.text,
                'headers': dict(response.headers)
            })
        except Exception as e:
            return jsonify({'error': str(e)})
    return jsonify({'error': 'No URL provided'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
APP
    
    python3 /home/ec2-user/vulnerable_app.py &
  EOF
}

# VULNERABLE: IAM role with excessive permissions
resource "aws_iam_role" "vulnerable_instance_role" {
  # Role whose permissions come from the inline policy attached to it below.
  name = "VulnerableInstanceRole"
  
  # Trust policy: only the EC2 service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "ec2.amazonaws.com"
      }
    }]
  })
}

# VULNERABLE: Overly broad IAM policy
resource "aws_iam_role_policy" "vulnerable_policy" {
  # Inline policy attached to vulnerable_instance_role.
  name = "VulnerableInstancePolicy"
  role = aws_iam_role.vulnerable_instance_role.id
  
  # Single Allow statement: every action of five services on every resource.
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Action = [
        "s3:*",
        "ec2:*",
        "iam:*",  # VULNERABLE: Full IAM access
        "secretsmanager:*",
        "ssm:*"
      ]
      Resource = "*"
    }]
  })
}

# Commands that would work from within the vulnerable instance:
# curl http://169.254.169.254/latest/meta-data/
# curl http://169.254.169.254/latest/meta-data/iam/security-credentials/
# curl http://169.254.169.254/latest/meta-data/iam/security-credentials/VulnerableInstanceRole
# curl http://169.254.169.254/latest/user-data/
# 
# External SSRF attack:
# curl "http://vulnerable-server:8080/fetch?url=http://169.254.169.254/latest/meta-data/iam/security-credentials/VulnerableInstanceRole"

Google Cloud Platform Metadata Server Exposure

GCP Compute Engine instances expose sensitive metadata through the metadata server at 169.254.169.254. Without proper access controls or when exploited through SSRF, attackers can retrieve service account tokens, project information, SSH keys, and custom metadata that may contain sensitive configuration data.

Preview example – HCL
# VULNERABLE: GCP Compute instance with default service account
resource "google_compute_instance" "vulnerable_vm" {
  name         = "vulnerable-webapp-vm"
  machine_type = "e2-medium"
  zone         = "us-central1-a"
  
  boot_disk {
    initialize_params {
      image = "debian-cloud/debian-11"
    }
  }
  
  # VULNERABLE: Default service account with project editor role
  service_account {
    # Uses default compute service account
    scopes = [
      "https://www.googleapis.com/auth/cloud-platform"  # Full access
    ]
  }
  
  # VULNERABLE: Custom metadata with sensitive information
  metadata = {
    "database-password" = "super-secret-password"
    "api-keys"         = "key1:secret1,key2:secret2"
    "ssh-keys"         = "user:ssh-rsa AAAAB3NzaC1yc2EAAAA..."
  }
  
  network_interface {
    network = "default"
    access_config {
      # Public IP assigned
    }
  }
  
  # VULNERABLE: Startup script that exposes metadata
  metadata_startup_script = <<-EOF
    #!/bin/bash
    apt-get update
    apt-get install -y python3 python3-pip
    
    # VULNERABLE: Web application that can be exploited
    cat > /home/app.py << 'APP'
import requests
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/proxy')
def proxy_request():
    """VULNERABLE: relay a caller-chosen path to the GCP metadata server.

    Reads the `target` query parameter, appends it to the metadata server
    address and returns the upstream body and status code as JSON.
    """
    # VULNERABLE: No validation on target URL
    path = request.args.get('target')
    if not path:
        return jsonify({'error': 'No target specified'})

    # Attacker can access:
    # /computeMetadata/v1/instance/service-accounts/default/token
    # /computeMetadata/v1/project/project-id
    # /computeMetadata/v1/instance/attributes/
    metadata_headers = {'Metadata-Flavor': 'Google'}  # Required for GCP metadata
    metadata_url = f"http://169.254.169.254{path}"

    try:
        upstream = requests.get(metadata_url, headers=metadata_headers, timeout=5)
        payload = {
            'data': upstream.text,
            'status_code': upstream.status_code
        }
        return jsonify(payload)
    except Exception as e:
        return jsonify({'error': str(e)})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
APP
    
    python3 /home/app.py &
  EOF
}

# VULNERABLE: Project-level IAM binding with excessive permissions
resource "google_project_iam_member" "compute_admin" {
  project = var.project_id
  role    = "roles/editor"  # VULNERABLE: Broad permissions
  member  = "serviceAccount:${google_compute_instance.vulnerable_vm.service_account[0].email}"
}

# VULNERABLE: Cloud Function with metadata access
resource "google_cloudfunctions_function" "vulnerable_function" {
  name        = "vulnerable-processor"
  runtime     = "python39"
  entry_point = "process_request"
  
  # VULNERABLE: Uses default service account
  # Inherits project-level permissions
  
  source_archive_bucket = google_storage_bucket.functions_source.name
  source_archive_object = google_storage_bucket_object.function_source.name
  
  trigger {
    https_trigger {}
  }
  
  environment_variables = {
    "SECRET_KEY" = "hardcoded-secret-key"  # VULNERABLE: Exposed in metadata
  }
}

# Commands that would work from the vulnerable VM:
# curl -H "Metadata-Flavor: Google" http://169.254.169.254/computeMetadata/v1/
# curl -H "Metadata-Flavor: Google" http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
# curl -H "Metadata-Flavor: Google" http://169.254.169.254/computeMetadata/v1/project/project-id
# curl -H "Metadata-Flavor: Google" http://169.254.169.254/computeMetadata/v1/instance/attributes/database-password
# 
# External SSRF attack:
# curl "http://vulnerable-vm:8080/proxy?target=/computeMetadata/v1/instance/service-accounts/default/token"

Azure Instance Metadata Service (IMDS) Exploitation

Azure virtual machines expose sensitive information through the Instance Metadata Service, including access tokens, subscription details, and resource information. When applications are vulnerable to SSRF or when containers have unrestricted network access, attackers can retrieve managed identity tokens and escalate privileges within the Azure environment.

Preview example – HCL
# VULNERABLE: Azure VM with system-assigned managed identity
resource "azurerm_linux_virtual_machine" "vulnerable_vm" {
  name                = "vulnerable-webapp-vm"
  resource_group_name = azurerm_resource_group.main.name
  location           = azurerm_resource_group.main.location
  size               = "Standard_B2s"
  
  disable_password_authentication = false
  admin_username                  = "adminuser"
  admin_password                  = "P@ssw0rd123!"  # VULNERABLE: Weak password
  
  # VULNERABLE: System-assigned managed identity with broad permissions
  identity {
    type = "SystemAssigned"
  }
  
  network_interface_ids = [
    azurerm_network_interface.vulnerable_vm.id,
  ]
  
  os_disk {
    caching              = "ReadWrite"
    storage_account_type = "Standard_LRS"
  }
  
  source_image_reference {
    publisher = "Canonical"
    offer     = "0001-com-ubuntu-server-jammy"
    sku       = "22_04-lts"
    version   = "latest"
  }
  
  # VULNERABLE: Custom data with sensitive information
  custom_data = base64encode(<<-EOF
    #!/bin/bash
    apt-get update
    apt-get install -y python3 python3-pip
    
    # VULNERABLE: Application with SSRF vulnerability
    cat > /home/adminuser/vulnerable_app.py << 'APP'
import requests
from flask import Flask, request, jsonify
import json

app = Flask(__name__)

@app.route('/fetch-resource')
def fetch_resource():
    """VULNERABLE: fetch whatever URL the caller supplies and echo the body.

    Reads the `url` query parameter, requests it with the Azure IMDS header
    attached, and returns the upstream body and status code as JSON.
    """
    # VULNERABLE: No URL validation
    target_url = request.args.get('url')
    if not target_url:
        return jsonify({'error': 'No resource URL provided'})

    # Attacker can target:
    # http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://management.azure.com/
    # http://169.254.169.254/metadata/instance?api-version=2021-02-01
    imds_headers = {'Metadata': 'true'}  # Required for Azure IMDS

    try:
        upstream = requests.get(target_url, headers=imds_headers, timeout=5)
        body = {
            'status': 'success',
            'data': upstream.text,
            'status_code': upstream.status_code
        }
        return jsonify(body)
    except Exception as e:
        return jsonify({'error': str(e)})

@app.route('/process-data')
def process_data():
    """VULNERABLE: fetch a managed identity token from IMDS and use it.

    Requests an ARM-scoped token from the Azure Instance Metadata Service,
    then lists subscriptions with it. Returns the subscription listing plus
    token expiry/resource info as JSON; on a non-200 token response returns
    an error payload with HTTP 502; on any exception returns the error text.
    """
    # VULNERABLE: Directly calling IMDS without validation
    try:
        # Get managed identity token
        token_url = "http://169.254.169.254/metadata/identity/oauth2/token"
        token_params = {
            'api-version': '2018-02-01',
            'resource': 'https://management.azure.com/'
        }
        token_response = requests.get(token_url,
                                      params=token_params,
                                      headers={'Metadata': 'true'},
                                      timeout=5)

        # Without this branch the view fell through and returned None,
        # which Flask rejects with a TypeError instead of a usable response.
        if token_response.status_code != 200:
            return jsonify({
                'error': 'Failed to obtain managed identity token',
                'status_code': token_response.status_code
            }), 502

        token_data = token_response.json()
        access_token = token_data['access_token']

        # Use token to access Azure resources
        mgmt_headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }

        # VULNERABLE: Using retrieved token for Azure API calls
        subscription_url = "https://management.azure.com/subscriptions?api-version=2020-01-01"
        # Same timeout as the token call so a stalled ARM endpoint cannot hang the worker
        sub_response = requests.get(subscription_url, headers=mgmt_headers, timeout=5)

        return jsonify({
            'subscriptions': sub_response.json(),
            'token_info': {
                'expires_in': token_data.get('expires_in'),
                'resource': token_data.get('resource')
            }
        })

    except Exception as e:
        return jsonify({'error': str(e)})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
APP
    
    python3 /home/adminuser/vulnerable_app.py &
  EOF
  )
}

# VULNERABLE: Network interface with public IP
resource "azurerm_network_interface" "vulnerable_vm" {
  name                = "vulnerable-vm-nic"
  location           = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
  
  ip_configuration {
    name                          = "internal"
    subnet_id                     = azurerm_subnet.vulnerable_subnet.id
    private_ip_address_allocation = "Dynamic"
    public_ip_address_id         = azurerm_public_ip.vulnerable_vm.id
  }
}

# VULNERABLE: Overprivileged role assignment for managed identity
resource "azurerm_role_assignment" "vm_contributor" {
  scope                = data.azurerm_subscription.current.id
  role_definition_name = "Contributor"  # VULNERABLE: Broad permissions
  principal_id         = azurerm_linux_virtual_machine.vulnerable_vm.identity[0].principal_id
}

# VULNERABLE: Additional role assignments
resource "azurerm_role_assignment" "vm_reader" {
  scope                = data.azurerm_subscription.current.id
  role_definition_name = "Reader"
  principal_id         = azurerm_linux_virtual_machine.vulnerable_vm.identity[0].principal_id
}

# VULNERABLE: Docker container with unrestricted network access
resource "azurerm_container_group" "vulnerable_container" {
  name                = "vulnerable-web-app"
  location           = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
  os_type            = "Linux"
  
  # VULNERABLE: System-assigned identity with broad permissions
  identity {
    type = "SystemAssigned"
  }
  
  container {
    name   = "web-app"
    image  = "nginx:latest"
    cpu    = "0.5"
    memory = "1.0"
    
    ports {
      port     = 80
      protocol = "TCP"
    }
    
    # VULNERABLE: Environment variables exposed in container
    environment_variables = {
      "AZURE_CLIENT_ID"     = "system-assigned"  # Managed identity
      "IMDS_ENDPOINT"       = "http://169.254.169.254"
      "DATABASE_PASSWORD"   = "super-secret-db-password"
    }
  }
  
  # VULNERABLE: Public IP exposure
  ip_address_type = "public"
  
  # VULNERABLE: No network restrictions
}

# Commands that would work from within the vulnerable VM:
# curl -H "Metadata:true" "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://management.azure.com/"
# curl -H "Metadata:true" "http://169.254.169.254/metadata/instance?api-version=2021-02-01"
# curl -H "Metadata:true" "http://169.254.169.254/metadata/instance/compute?api-version=2021-02-01"
# 
# External SSRF attack:
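# (the nested URL must be URL-encoded, otherwise "&resource=..." is parsed as a parameter of the outer request)
# curl -G "http://vulnerable-vm:8080/fetch-resource" --data-urlencode "url=http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://management.azure.com/"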

Container and Kubernetes IMDS Access Through Host Network

Containers running with host networking or privileged access can directly access the host's metadata service. This is particularly dangerous in Kubernetes environments where pods can access cloud metadata services and retrieve node-level credentials, potentially escalating privileges beyond the intended pod security boundaries.

Preview example – YAML
# VULNERABLE: Kubernetes pods with host network access to IMDS
apiVersion: apps/v1
kind: Deployment
metadata:
  name: vulnerable-web-app
spec:
  replicas: 2
  selector:
    matchLabels:
      app: vulnerable-web-app
  template:
    metadata:
      labels:
        app: vulnerable-web-app
    spec:
      # VULNERABLE: Host network allows access to host IMDS
      hostNetwork: true
      hostPID: true
      
      containers:
      - name: web-app
        image: nginx:latest
        ports:
        - containerPort: 80
        
        # VULNERABLE: Environment variables with IMDS endpoints
        env:
        - name: AWS_METADATA_ENDPOINT
          value: "http://169.254.169.254"
        - name: AZURE_IMDS_ENDPOINT  
          value: "http://169.254.169.254"
        - name: GCP_METADATA_ENDPOINT
          value: "http://169.254.169.254"
        
        # VULNERABLE: Container entrypoint that demonstrates IMDS access
        command: ["/bin/bash"]
        args:
        - -c
        - |
          # Can access host metadata service
          curl -s http://169.254.169.254/latest/meta-data/ || true
          curl -s -H "Metadata-Flavor: Google" http://169.254.169.254/computeMetadata/v1/ || true
          curl -s -H "Metadata: true" http://169.254.169.254/metadata/instance || true
          
          # Start nginx
          nginx -g "daemon off;"
---
# VULNERABLE: DaemonSet with privileged access
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: vulnerable-monitoring
spec:
  selector:
    matchLabels:
      app: vulnerable-monitoring
  template:
    metadata:
      labels:
        app: vulnerable-monitoring
    spec:
      # VULNERABLE: Privileged container can access host resources
      securityContext:
        runAsUser: 0
        runAsGroup: 0
      
      containers:
      - name: monitor
        image: busybox:latest
        securityContext:
          privileged: true  # VULNERABLE: Full host access
        
        command: ["sh"]
        args:
        - -c
        - |
          while true; do
            echo "Collecting system metrics..."
            
            # VULNERABLE: Can access any host resource including IMDS
            # Access AWS metadata
            if wget -q --timeout=2 -O - http://169.254.169.254/latest/meta-data/ 2>/dev/null; then
              echo "AWS IMDS accessible"
              # Retrieve IAM role credentials
              ROLE=$(wget -q --timeout=2 -O - http://169.254.169.254/latest/meta-data/iam/security-credentials/ 2>/dev/null | head -1)
              if [ -n "$ROLE" ]; then
                echo "Found IAM role: $ROLE"
                wget -q --timeout=2 -O - http://169.254.169.254/latest/meta-data/iam/security-credentials/$ROLE 2>/dev/null
              fi
            fi
            
            # Access GCP metadata
            if wget -q --timeout=2 --header="Metadata-Flavor: Google" -O - http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token 2>/dev/null; then
              echo "GCP metadata accessible"
            fi
            
            # Access Azure IMDS
            if wget -q --timeout=2 --header="Metadata: true" -O - "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://management.azure.com/" 2>/dev/null; then
              echo "Azure IMDS accessible"
            fi
            
            sleep 60
          done
        
        volumeMounts:
        - name: host-root
          mountPath: /host
          readOnly: true
        - name: docker-socket
          mountPath: /var/run/docker.sock
      
      volumes:
      - name: host-root
        hostPath:
          path: /
      - name: docker-socket
        hostPath:
          path: /var/run/docker.sock
---
# VULNERABLE: Job that can access IMDS via curl
apiVersion: batch/v1
kind: Job
metadata:
  name: vulnerable-data-processor
spec:
  template:
    spec:
      restartPolicy: OnFailure
      # VULNERABLE: Uses default service account (may have IMDS access)
      containers:
      - name: processor
        image: curlimages/curl:latest
        command: ["sh"]
        args:
        - -c
        - |
          echo "Starting data processing..."
          
          # VULNERABLE: Script attempts to access various IMDS endpoints
          
          # Try AWS IMDS v1
          echo "Checking AWS IMDSv1..."
          curl -s --connect-timeout 3 http://169.254.169.254/latest/meta-data/ || echo "AWS IMDSv1 not accessible"
          
          # Try AWS IMDSv2 (request a session token via PUT)
          echo "Checking AWS IMDSv2..."
          curl -s --connect-timeout 3 -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600" || echo "AWS IMDSv2 token request failed"
          
          # Try GCP metadata
          echo "Checking GCP metadata..."
          curl -s --connect-timeout 3 -H "Metadata-Flavor: Google" http://169.254.169.254/computeMetadata/v1/ || echo "GCP metadata not accessible"
          
          # Try Azure IMDS
          echo "Checking Azure IMDS..."
          curl -s --connect-timeout 3 -H "Metadata: true" http://169.254.169.254/metadata/instance?api-version=2021-02-01 || echo "Azure IMDS not accessible"
          
          echo "Data processing complete"
        
        # VULNERABLE: Resource limits that could be bypassed
        resources:
          limits:
            memory: "128Mi"
            cpu: "100m"
          requests:
            memory: "64Mi"
            cpu: "50m"

Fixes

1

Enable IMDSv2 and Restrict Metadata Access

Configure AWS EC2 instances to use IMDSv2 only, which requires session tokens and prevents simple HTTP GET requests. Set hop limits to 1 to prevent forwarding, disable IMDS where not needed, and implement network-level restrictions to block access to metadata endpoints from applications.

View implementation – BASH
# SECURE: EC2 instance with IMDSv2 enforcement and restricted access
resource "aws_instance" "secure_server" {
  ami           = "ami-0abcdef1234567890"
  instance_type = "t3.medium"

  # SECURE: IMDSv2 required, limited hop count
  metadata_options {
    http_endpoint               = "enabled"
    http_tokens                 = "required"  # IMDSv2 only
    http_put_response_hop_limit = 1           # No forwarding
    instance_metadata_tags      = "enabled"   # Enable instance tags in metadata
  }

  # SECURE: Minimal IAM role with specific permissions
  iam_instance_profile = aws_iam_instance_profile.secure_profile.name

  # SECURE: Security group with restricted access
  vpc_security_group_ids = [aws_security_group.secure_instance.id]

  # SECURE: Encrypted storage
  root_block_device {
    encrypted  = true
    kms_key_id = aws_kms_key.instance_key.arn
  }

  # templatefile() resolves only the names supplied in this map; it cannot
  # see resources directly. secure_startup.sh interpolates
  # aws_s3_bucket.app_data.id and aws_cloudwatch_log_group.instance_logs.name,
  # so matching nested objects are passed in; without them rendering fails.
  user_data = base64encode(templatefile("${path.module}/secure_startup.sh", {
    instance_role = aws_iam_role.secure_instance_role.name

    aws_s3_bucket = {
      app_data = { id = aws_s3_bucket.app_data.id }
    }
    aws_cloudwatch_log_group = {
      instance_logs = { name = aws_cloudwatch_log_group.instance_logs.name }
    }
  }))

  tags = {
    Name            = "SecureWebServer"
    Environment     = "production"
    SecurityProfile = "hardened"
  }
}

# SECURE: Minimal IAM role with least privilege
resource "aws_iam_role" "secure_instance_role" {
  name = "SecureInstanceRole"
  
  # Trust policy: only the EC2 service principal may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "ec2.amazonaws.com"
      }
      # NOTE(review): conditions on a service-principal trust policy are
      # evaluated against the STS call made by EC2; confirm that
      # aws:RequestedRegion is populated as expected there, otherwise the
      # instance will never receive credentials.
      Condition = {
        StringEquals = {
          "aws:RequestedRegion" = data.aws_region.current.name
        }
        # NOTE(review): this date is in the past, so the comparison is always
        # true; presumably a placeholder - confirm intent.
        DateGreaterThan = {
          "aws:CurrentTime" = "2023-01-01T00:00:00Z"
        }
      }
    }]
  })
}

# SECURE: Restrictive IAM policy with specific resource access
resource "aws_iam_role_policy" "secure_policy" {
  name = "SecureInstancePolicy"
  role = aws_iam_role.secure_instance_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # Reads: GetObject requests carry no server-side-encryption header,
        # so an SSE StringEquals condition here would never match and the
        # statement would never grant access.
        Effect = "Allow"
        Action = [
          "s3:GetObject"
        ]
        Resource = [
          "${aws_s3_bucket.app_data.arn}/uploads/*",
          "${aws_s3_bucket.app_data.arn}/processed/*"
        ]
      },
      {
        # Writes: only allowed when the upload asks for SSE-S3 encryption.
        Effect = "Allow"
        Action = [
          "s3:PutObject"
        ]
        Resource = [
          "${aws_s3_bucket.app_data.arn}/uploads/*",
          "${aws_s3_bucket.app_data.arn}/processed/*"
        ]
        Condition = {
          StringEquals = {
            "s3:x-amz-server-side-encryption" = "AES256"
          }
        }
      },
      {
        # Listing is a bucket-level action; the application lists the
        # uploads/ prefix, so scope it to that prefix only.
        Effect = "Allow"
        Action = [
          "s3:ListBucket"
        ]
        Resource = aws_s3_bucket.app_data.arn
        Condition = {
          StringLike = {
            "s3:prefix" = ["uploads/*"]
          }
        }
      },
      {
        Effect = "Allow"
        Action = [
          "secretsmanager:GetSecretValue"
        ]
        Resource = aws_secretsmanager_secret.app_secrets.arn
        # NOTE(review): this key is only present when the caller passes
        # VersionStage explicitly; confirm clients always do.
        Condition = {
          StringEquals = {
            "secretsmanager:VersionStage" = "AWSCURRENT"
          }
        }
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "${aws_cloudwatch_log_group.instance_logs.arn}:*"
      }
    ]
  })
}

# SECURE: Security group blocking metadata access from applications
# SECURE: Security group with tightly scoped egress/ingress
resource "aws_security_group" "secure_instance" {
  name_prefix = "secure-instance-"
  vpc_id      = aws_vpc.main.id

  # Allow HTTPS outbound
  egress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  # Allow HTTP to specific endpoints only
  egress {
    from_port   = 80
    to_port     = 80
    protocol    = "tcp"
    cidr_blocks = ["203.0.113.0/24"]  # Specific allowed destinations
  }

  # No rule for 169.254.169.254 on purpose: security group rules can only
  # ALLOW traffic, so an egress rule naming the IMDS address would permit it
  # rather than block it. Security groups also do not filter traffic to the
  # instance metadata service; restrict IMDS on the host (IMDSv2 with hop
  # limit 1, iptables owner rules) instead.

  # Allow SSH from bastion only
  ingress {
    from_port       = 22
    to_port         = 22
    protocol        = "tcp"
    security_groups = [aws_security_group.bastion.id]
  }

  tags = {
    Name = "secure-instance-sg"
  }
}

# SECURE: Startup script with IMDS restrictions
# secure_startup.sh
#!/bin/bash
# Instance bootstrap: logs its own output, installs packages, then restricts
# access to the instance metadata service with iptables owner rules.
# Abort on the first failing command, unset variable, or failed pipeline stage.
set -euo pipefail

# Enable logging
# stdout is piped through tee into /var/log/startup.log; stderr follows stdout.
exec > >(tee /var/log/startup.log)
exec 2>&1

echo "Starting secure instance configuration..."

# Update system
yum update -y
yum install -y python3 python3-pip iptables-services

# SECURE: Block application access to IMDS using iptables
# Both rules are appended (-A), so the root ACCEPT is evaluated before the
# catch-all REJECT that follows it.
# Allow root access to IMDS (for system services)
iptables -A OUTPUT -m owner --uid-owner 0 -d 169.254.169.254 -j ACCEPT

# Block all other users from accessing IMDS
iptables -A OUTPUT -d 169.254.169.254 -j REJECT --reject-with icmp-port-unreachable

# Save iptables rules
# Persist the rules and enable the iptables service at boot.
service iptables save
systemctl enable iptables
# Install and configure application with IMDS protection
cat > /home/ec2-user/secure_app.py << 'APP'
import requests
import boto3
from flask import Flask, request, jsonify
from botocore.exceptions import ClientError
import ipaddress
from urllib.parse import urlparse

app = Flask(__name__)

# SECURE: Blocked IP ranges for SSRF protection
# Covers the whole link-local block (not just the single IMDS address), the
# unspecified network, carrier-grade NAT space and the IPv6 equivalents.
BLOCKED_RANGES = [
    ipaddress.ip_network('0.0.0.0/8'),       # "This network" / unspecified
    ipaddress.ip_network('10.0.0.0/8'),      # Private
    ipaddress.ip_network('100.64.0.0/10'),   # Carrier-grade NAT
    ipaddress.ip_network('127.0.0.0/8'),     # Localhost
    ipaddress.ip_network('169.254.0.0/16'),  # Link-local: IMDS, ECS task metadata
    ipaddress.ip_network('172.16.0.0/12'),   # Private
    ipaddress.ip_network('192.168.0.0/16'),  # Private
    ipaddress.ip_network('::/128'),          # IPv6 unspecified
    ipaddress.ip_network('::1/128'),         # IPv6 localhost
    ipaddress.ip_network('fc00::/7'),        # IPv6 unique-local (incl. fd00:ec2::254 IMDS)
    ipaddress.ip_network('fe80::/10'),       # IPv6 link-local
]


def _candidate_addresses(hostname):
    """Return every IP address that *hostname* may connect to."""
    try:
        # IP literals need no resolver round-trip
        return [ipaddress.ip_address(hostname)]
    except ValueError:
        pass

    import socket
    resolved = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
    # Drop any IPv6 zone suffix (e.g. "fe80::1%eth0") before parsing
    return [ipaddress.ip_address(entry[4][0].split('%')[0]) for entry in resolved]


def _is_blocked(address):
    """Return True when *address* falls inside any BLOCKED_RANGES network."""
    # ::ffff:a.b.c.d reaches the IPv4 host a.b.c.d, so judge the inner address
    mapped = getattr(address, 'ipv4_mapped', None)
    if mapped is not None:
        address = mapped
    return any(address in network for network in BLOCKED_RANGES)


def is_safe_url(url):
    """Validate URL to prevent SSRF attacks.

    Returns True only when the scheme is http/https, the URL has a host, and
    every address the host resolves to (IPv4 and IPv6) lies outside
    BLOCKED_RANGES. Any parsing or resolution failure yields False.

    This is a pre-flight check only: the HTTP client resolves the name again,
    so a hostile DNS server can still answer differently the second time.
    Keep host-level IMDS restrictions in place as well.
    """
    try:
        parsed = urlparse(url)
        if parsed.scheme not in ('http', 'https'):
            return False

        hostname = parsed.hostname
        if not hostname:
            return False

        addresses = _candidate_addresses(hostname)
        if not addresses:
            return False

        # One blocked record is enough to refuse the URL
        return not any(_is_blocked(address) for address in addresses)
    except Exception:
        return False

@app.route('/fetch')
def fetch_url():
    """Fetch a caller-supplied URL after SSRF validation.

    Responds 400 when no `url` parameter is given, 403 when validation
    rejects it, 500 when the outbound request fails, and otherwise a JSON
    summary (status code, headers, body length) of the upstream response.
    """
    requested = request.args.get('url')

    if not requested:
        return jsonify({'error': 'No URL provided'}), 400

    # SECURE: Validate URL before making request
    allowed = is_safe_url(requested)
    if not allowed:
        rejection = {
            'error': 'URL not allowed - potential SSRF detected',
            'blocked_url': requested
        }
        return jsonify(rejection), 403

    # SECURE: Use requests with timeout and restrict redirects
    request_options = {
        'timeout': 5,
        'allow_redirects': False,  # Prevent redirect-based SSRF
        'headers': {'User-Agent': 'SecureApp/1.0'},
    }

    try:
        upstream = requests.get(requested, **request_options)

        # Don't return full response content for security
        summary = {
            'status': 'success',
            'status_code': upstream.status_code,
            'headers': dict(upstream.headers),
            'content_length': len(upstream.text)
        }
        return jsonify(summary)

    except requests.exceptions.RequestException as e:
        return jsonify({'error': f'Request failed: {str(e)}'}), 500

@app.route('/aws-resource')
def get_aws_resource():
    """List up to ten objects under uploads/ in the application bucket.

    Returns the bucket name and object summaries as JSON. An AWS client
    error yields HTTP 500 with the AWS error code; any other exception
    yields HTTP 500 with the exception text.
    """
    try:
        # SECURE: Use boto3 with proper error handling
        # This automatically uses IMDSv2 and instance profile
        s3 = boto3.client('s3')

        # Example: List specific bucket contents
        bucket_name = '${aws_s3_bucket.app_data.id}'
        listing = s3.list_objects_v2(
            Bucket=bucket_name,
            Prefix='uploads/',
            MaxKeys=10
        )

        summaries = []
        for entry in listing.get('Contents', []):
            summaries.append({
                'key': entry['Key'],
                'size': entry['Size'],
                'last_modified': entry['LastModified'].isoformat()
            })

        return jsonify({
            'bucket': bucket_name,
            'objects': summaries
        })

    except ClientError as e:
        aws_error_code = e.response['Error']['Code']
        return jsonify({
            'error': 'AWS operation failed',
            'code': aws_error_code
        }), 500
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(host='127.0.0.1', port=8080)  # Bind to localhost only
APP

# Create non-root user for application
useradd -m -s /bin/bash appuser

# Home directories are typically mode 0700, so appuser cannot open a file
# that lives under /home/ec2-user even when it owns that file. Place the
# application in appuser's own home instead (read/execute for the owner only).
install -o appuser -g appuser -m 0500 /home/ec2-user/secure_app.py /home/appuser/secure_app.py
rm -f /home/ec2-user/secure_app.py

# Install Python dependencies (-H points HOME at appuser so --user installs there)
sudo -H -u appuser pip3 install --user flask boto3 requests

# NOTE(review): the iptables rules earlier in this script reject IMDS traffic
# from every non-root user, so boto3 running as appuser cannot fetch
# instance-profile credentials - confirm how the app is meant to get them.
# Start application as non-root user
sudo -H -u appuser python3 /home/appuser/secure_app.py &

echo "Secure instance configuration completed"

# SECURE: Create CloudWatch agent configuration
# The agent is not part of the packages installed earlier in this script;
# without it the config directory is missing and, under `set -e`, the
# redirect below would abort the whole bootstrap.
yum install -y amazon-cloudwatch-agent
mkdir -p /opt/aws/amazon-cloudwatch-agent/etc

# NOTE(review): nothing in this script writes /var/log/iptables.log; confirm
# that an iptables LOG rule and matching syslog routing exist elsewhere.
cat > /opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwatch-agent.json << 'CWCONFIG'
{
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/startup.log",
            "log_group_name": "${aws_cloudwatch_log_group.instance_logs.name}",
            "log_stream_name": "{instance_id}/startup"
          },
          {
            "file_path": "/var/log/iptables.log",
            "log_group_name": "${aws_cloudwatch_log_group.instance_logs.name}",
            "log_stream_name": "{instance_id}/iptables"
          }
        ]
      }
    }
  },
  "metrics": {
    "metrics_collected": {
      "cpu": {
        "measurement": ["cpu_usage_idle", "cpu_usage_iowait"],
        "metrics_collection_interval": 60
      },
      "disk": {
        "measurement": ["used_percent"],
        "metrics_collection_interval": 60,
        "resources": ["*"]
      },
      "mem": {
        "measurement": ["mem_used_percent"],
        "metrics_collection_interval": 60
      }
    }
  }
}
CWCONFIG

# Start CloudWatch agent
/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl \
    -a fetch-config \
    -m ec2 \
    -c file:/opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwatch-agent.json \
    -s

# Configure log rotation
echo "/var/log/startup.log {
    daily
    missingok
    rotate 7
    compress
    delaycompress
    notifempty
}" > /etc/logrotate.d/startup
2

Implement Network-Level IMDS Blocking and Monitoring

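Deploy network-level controls using security groups, NACLs, iptables rules, and container network policies to block unauthorized access to metadata endpoints. Note that on AWS, security groups and network ACLs do not filter traffic to the instance metadata service itself, so host-level controls (IMDSv2 with a hop limit of 1, iptables owner rules) and container network policies are what actually enforce the block; security groups and NACLs limit where stolen credentials or SSRF traffic can go next. Implement monitoring and alerting for metadata service access attempts and unusual authentication patterns.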

View implementation – HCL
# SECURE: VPC with IMDS protection using NACLs
# NACL for the application subnet.
# NACL rules are evaluated in ascending rule_no order and the first match
# wins, so every deny must carry a lower number than the allow it overrides.
# NACLs are also stateless: return traffic needs its own explicit rules.
resource "aws_network_acl" "imds_protection" {
  vpc_id     = aws_vpc.main.id
  subnet_ids = [aws_subnet.app_subnet.id]

  # Deny the metadata address first. As rule 200 this was shadowed by the
  # port-80 allow (rule 110) and never took effect.
  # NOTE: AWS does not pass instance-metadata traffic through NACLs, so this
  # is documentation of intent only; enforce IMDS restrictions on the host
  # (IMDSv2, hop limit 1, iptables).
  egress {
    rule_no    = 90
    protocol   = "tcp"
    action     = "deny"
    cidr_block = "169.254.169.254/32"
    from_port  = 80
    to_port    = 80
  }

  # Allow normal outbound traffic
  egress {
    rule_no    = 100
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 443
    to_port    = 443
  }

  egress {
    rule_no    = 110
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 80
    to_port    = 80
  }

  # Responses to the ALB leave on the ALB's ephemeral ports
  egress {
    rule_no    = 120
    protocol   = "tcp"
    action     = "allow"
    cidr_block = aws_subnet.alb_subnet.cidr_block
    from_port  = 1024
    to_port    = 65535
  }

  # Allow inbound traffic from ALB
  ingress {
    rule_no    = 100
    protocol   = "tcp"
    action     = "allow"
    cidr_block = aws_subnet.alb_subnet.cidr_block
    from_port  = 8080
    to_port    = 8080
  }

  # Replies to the outbound 80/443 connections arrive on ephemeral ports;
  # security groups still gate which of these reach the instance.
  ingress {
    rule_no    = 110
    protocol   = "tcp"
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 1024
    to_port    = 65535
  }

  tags = {
    Name = "IMDS-Protection-NACL"
  }
}

# SECURE: CloudWatch monitoring for IMDS access
# Counts log events whose fifth field contains the metadata service address.
resource "aws_cloudwatch_log_metric_filter" "imds_access_attempts" {
  name           = "IMDSAccessAttempts"
  log_group_name = aws_cloudwatch_log_group.instance_logs.name
  # Space-delimited pattern: matches when the fifth field (uri) contains
  # 169.254.169.254; "..." accepts any number of trailing fields.
  # NOTE(review): assumes the shipped logs really use this field order - confirm.
  pattern        = "[timestamp, request_id, client_ip, method, uri=*169.254.169.254*, ...]"
  
  # Emit 1 per matching event into Security/IMDS, and 0 when nothing matches.
  metric_transformation {
    name      = "IMDSAccessAttempts"
    namespace = "Security/IMDS"
    value     = "1"
    
    default_value = "0"
  }
}

# SECURE: CloudWatch alarm for IMDS access
# Raises an alert as soon as any IMDS access attempt is counted.
resource "aws_cloudwatch_metric_alarm" "imds_access_alert" {
  alarm_name        = "IMDS-Unauthorized-Access"
  alarm_description = "Alert on IMDS access attempts"
  alarm_actions     = [aws_sns_topic.security_alerts.arn]

  # Metric to watch
  namespace   = "Security/IMDS"
  metric_name = "IMDSAccessAttempts"
  statistic   = "Sum"

  # Fire when the 5-minute sum exceeds zero in a single period
  period              = 300
  evaluation_periods  = 1
  comparison_operator = "GreaterThanThreshold"
  threshold           = 0

  # Periods with no data points are not treated as a breach
  treat_missing_data = "notBreaching"
}

# SECURE: AWS Config rule to check IMDSv2 enforcement
# Registers the AWS-owned EC2_IMDSV2_CHECK rule with AWS Config.
resource "aws_config_config_rule" "imdsv2_enforcement" {
  name = "ec2-imdsv2-check"
  
  # owner = "AWS" selects a rule published by AWS rather than a custom one.
  source {
    owner             = "AWS"
    source_identifier = "EC2_IMDSV2_CHECK"
  }
  
  # Create the rule only after the configuration recorder resource exists.
  depends_on = [aws_config_configuration_recorder.security_recorder]
}

# SECURE: Lambda function for automated IMDS remediation
# SECURE: Lambda function for automated IMDS remediation
resource "aws_lambda_function" "imds_remediation" {
  function_name = "imds-security-remediation"
  role          = aws_iam_role.lambda_remediation_role.arn
  handler       = "lambda_function.lambda_handler"

  # Take both the path and the hash from the archive data source so the
  # uploaded file and its hash always come from the same build, rather than
  # relying on a literal file name matching output_path.
  filename         = data.archive_file.imds_remediation.output_path
  source_code_hash = data.archive_file.imds_remediation.output_base64sha256

  # Python 3.9 is end-of-life upstream; use a currently supported runtime.
  runtime = "python3.12"
  timeout = 60

  environment {
    variables = {
      SNS_TOPIC_ARN = aws_sns_topic.security_alerts.arn
    }
  }
}

# SECURE: Lambda function code
data "archive_file" "imds_remediation" {
  type        = "zip"
  output_path = "imds_remediation.zip"
  
  source {
    content = <<EOF
import json
import boto3
import os
from botocore.exceptions import ClientError

ec2_client = boto3.client('ec2')
sns_client = boto3.client('sns')

def lambda_handler(event, context):
    """
    Automatically remediate IMDS configuration issues.

    Walks every running or stopped EC2 instance returned by
    DescribeInstances (all pages), collects those whose metadata options do
    not require IMDSv2 tokens or allow a PUT response hop limit above 1, and
    calls ModifyInstanceMetadataOptions on each to enforce both settings.
    A summary is published to SNS_TOPIC_ARN when at least one instance was
    changed.

    Args:
        event: Invocation payload; not inspected.
        context: Lambda context; only aws_request_id is read.

    Returns:
        dict: statusCode 200 with a JSON body holding the
        'non_compliant_found' and 'remediated' counts, or statusCode 500
        with a JSON body holding the error text.
    """
    # Local import: the module header of this file does not import datetime.
    from datetime import datetime, timezone

    try:
        non_compliant_instances = []

        # DescribeInstances is paginated; a single call silently drops
        # every instance past the first page.
        paginator = ec2_client.get_paginator('describe_instances')
        pages = paginator.paginate(
            Filters=[
                {
                    'Name': 'instance-state-name',
                    'Values': ['running', 'stopped']
                }
            ]
        )

        for page in pages:
            for reservation in page['Reservations']:
                for instance in reservation['Instances']:
                    instance_id = instance['InstanceId']

                    # Missing values are treated as the insecure settings.
                    metadata_options = instance.get('MetadataOptions', {})
                    http_tokens = metadata_options.get('HttpTokens', 'optional')
                    hop_limit = metadata_options.get('HttpPutResponseHopLimit', 2)

                    if http_tokens != 'required' or hop_limit > 1:
                        non_compliant_instances.append({
                            'instance_id': instance_id,
                            'current_tokens': http_tokens,
                            'current_hop_limit': hop_limit
                        })

        # Remediate non-compliant instances; one failure does not stop the rest
        remediated_instances = []
        for instance_info in non_compliant_instances:
            instance_id = instance_info['instance_id']

            try:
                ec2_client.modify_instance_metadata_options(
                    InstanceId=instance_id,
                    HttpTokens='required',  # Enforce IMDSv2
                    HttpPutResponseHopLimit=1  # Prevent forwarding
                )
                remediated_instances.append(instance_id)
            except ClientError as e:
                print(f"Failed to remediate instance {instance_id}: {e}")

        # Send notification if instances were remediated
        if remediated_instances:
            message = {
                'remediated_instances': remediated_instances,
                'total_remediated': len(remediated_instances),
                # A real UTC timestamp; the request id gets its own field.
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'request_id': context.aws_request_id
            }

            sns_client.publish(
                TopicArn=os.environ['SNS_TOPIC_ARN'],
                Subject='IMDS Configuration Remediated',
                Message=json.dumps(message, indent=2)
            )

        return {
            'statusCode': 200,
            'body': json.dumps({
                'non_compliant_found': len(non_compliant_instances),
                'remediated': len(remediated_instances)
            })
        }

    except Exception as e:
        error_msg = f"Error in IMDS remediation: {str(e)}"
        print(error_msg)

        # The notification itself can fail (e.g. the original error was an
        # SNS problem); keep that from masking the 500 response.
        try:
            sns_client.publish(
                TopicArn=os.environ['SNS_TOPIC_ARN'],
                Subject='IMDS Remediation Error',
                Message=error_msg
            )
        except Exception as notify_error:
            print(f"Failed to publish remediation error: {notify_error}")

        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
EOF
    filename = "lambda_function.py"
  }
}

# SECURE: EventBridge rule for scheduled IMDS checks
# Schedule-only rule (no event pattern) that fires once every 24 hours.
resource "aws_cloudwatch_event_rule" "imds_compliance_check" {
  name                = "imds-compliance-check"
  description         = "Check IMDS compliance daily"
  schedule_expression = "rate(24 hours)"
}

# Routes the imds_compliance_check schedule to the imds_remediation Lambda.
# NOTE(review): EventBridge also needs an aws_lambda_permission (principal
# events.amazonaws.com) to invoke the function; none is visible in this
# section — confirm one is defined elsewhere.
resource "aws_cloudwatch_event_target" "imds_remediation_target" {
  rule      = aws_cloudwatch_event_rule.imds_compliance_check.name
  target_id = "IMDSRemediationTarget"
  arn       = aws_lambda_function.imds_remediation.arn
}

# SECURE: GCP firewall rule blocking metadata access
# Egress deny rule for TCP 80/443 to 169.254.169.254/32 on the vpc network.
# NOTE(review): Google Cloud documents traffic to the metadata server
# (169.254.169.254) as always allowed regardless of VPC firewall rules, so
# this rule may have no effect — verify before relying on it.
resource "google_compute_firewall" "block_metadata_access" {
  name    = "block-metadata-access"
  network = google_compute_network.vpc.name
  
  # Block outbound traffic to metadata server
  deny {
    protocol = "tcp"
    ports    = ["80", "443"]
  }
  
  destination_ranges = ["169.254.169.254/32"]
  direction          = "EGRESS"
  priority           = 1000
  
  # Applies only to instances that carry the "no-metadata-access" network
  # tag; instances without the tag are not covered by this rule.
  target_tags = ["no-metadata-access"]
}

# SECURE: Azure NSG rule blocking IMDS
# Outbound deny rule on app_nsg for TCP port 80 from any source to
# 169.254.169.254/32. Only port 80 and only TCP are covered.
# NOTE(review): confirm an address-prefix rule actually blocks IMDS on
# Azure; the AzurePlatformIMDS service tag may be the supported way to do
# this.
resource "azurerm_network_security_rule" "block_imds_outbound" {
  name                        = "Block-IMDS-Outbound"
  priority                    = 200
  direction                   = "Outbound"
  access                      = "Deny"
  protocol                    = "Tcp"
  source_port_range          = "*"
  destination_port_range     = "80"
  source_address_prefix      = "*"
  destination_address_prefix = "169.254.169.254/32"
  resource_group_name        = azurerm_resource_group.main.name
  network_security_group_name = azurerm_network_security_group.app_nsg.name
}

# SECURE: Kubernetes NetworkPolicy blocking IMDS access
# NetworkPolicy rules are allow-only: there is no "deny" action, and a rule
# with an empty `to` allows the listed ports to EVERY destination. The
# metadata endpoint is therefore excluded by carving it out of the allowed
# CIDR with ipBlock.except, not by adding another rule.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: block-metadata-access
  namespace: production
spec:
  # Empty selector: applies to every pod in the namespace.
  podSelector: {}
  policyTypes:
  - Egress
  egress:
  # Allow DNS
  - to: []
    ports:
    - protocol: UDP
      port: 53
  # Allow HTTP/HTTPS to any IPv4 destination except the cloud metadata
  # endpoint
  - to:
    - ipBlock:
        cidr: 0.0.0.0/0
        except:
        - 169.254.169.254/32
    ports:
    - protocol: TCP
      port: 443
    - protocol: TCP
      port: 80
  # Allow specific internal services
  - to:
    - podSelector:
        matchLabels:
          app: database
    ports:
    - protocol: TCP
      port: 5432

---
# SECURE: Calico GlobalNetworkPolicy for IMDS blocking
# Applies to every endpoint that does not carry the allow-metadata-access
# label. Rules are evaluated in order: the Deny must stay above the Allow.
apiVersion: projectcalico.org/v3
kind: GlobalNetworkPolicy
metadata:
  name: block-cloud-metadata
spec:
  selector: "!has(allow-metadata-access)"
  types:
  - Egress
  egress:
  # Deny access to cloud metadata services
  - action: Deny
    protocol: TCP
    # nets and ports live under ONE destination key; a second `destination`
    # key would replace the first and turn this into "deny TCP 80/443 to
    # everywhere".
    destination:
      nets:
      - 169.254.169.254/32  # AWS/GCP/Azure IMDS
      ports:
      - 80
      - 443
  # Allow other egress traffic
  - action: Allow
---
# SECURE: Cilium CiliumNetworkPolicy for metadata blocking
# `egress` entries are allow rules; an explicit block must be expressed
# with `egressDeny`, which takes precedence over any allow rule.
apiVersion: cilium.io/v2
kind: CiliumNetworkPolicy
metadata:
  name: block-metadata-service
spec:
  # Empty selector: applies to every endpoint in the policy's namespace.
  endpointSelector: {}
  # Deny access to metadata service
  egressDeny:
  - toCIDR:
    - 169.254.169.254/32
    toPorts:
    - ports:
      - port: "80"
        protocol: TCP
      - port: "443"
        protocol: TCP
  egress:
  # Allow other egress
  - toEndpoints:
    - matchLabels:
        k8s:io.kubernetes.pod.namespace: kube-system
  # NOTE(review): toFQDNs rules rely on Cilium observing DNS lookups; confirm
  # a DNS visibility rule (rules.dns on port 53) exists for these endpoints.
  - toFQDNs:
    - matchPattern: "*.amazonaws.com"
    - matchPattern: "*.googleapis.com"
    - matchPattern: "*.azure.com"
    toPorts:
    - ports:
      - port: "443"
        protocol: TCP
3

Use Workload Identity and Secure Service-to-Service Authentication

Implement workload identity federation, service mesh authentication, and proper credential management to avoid relying on metadata services for authentication. Use short-lived tokens, credential rotation, and secure secret management services.

View implementation – YAML
# SECURE: AWS ECS with task roles instead of instance metadata
# Cluster with Container Insights enabled and ECS Exec session logging
# directed to a dedicated CloudWatch log group.
resource "aws_ecs_cluster" "secure_cluster" {
  name = "secure-application-cluster"
  
  setting {
    name  = "containerInsights"
    value = "enabled"
  }
  
  configuration {
    execute_command_configuration {
      # OVERRIDE: use the log_configuration below for ECS Exec logging.
      logging = "OVERRIDE"
      log_configuration {
        cloud_watch_log_group_name = aws_cloudwatch_log_group.ecs_exec_logs.name
      }
    }
  }
}

# SECURE: ECS task definition with specific task role
# Single-container Fargate task (awsvpc networking, 256 CPU units / 512 MiB)
# whose application credentials come from task_role_arn.
resource "aws_ecs_task_definition" "secure_app" {
  family                   = "secure-web-application"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = 256
  memory                   = 512
  
  # SECURE: Specific task role instead of instance metadata
  # task_role_arn is what the application code runs as; execution_role_arn
  # is a separate role passed alongside it.
  task_role_arn      = aws_iam_role.ecs_task_role.arn
  execution_role_arn = aws_iam_role.ecs_execution_role.arn
  
  container_definitions = jsonencode([
    {
      name  = "web-app"
      image = "secure-web-app:latest"
      
      # SECURE: No privileged mode or host network
      privileged = false
      
      portMappings = [
        {
          containerPort = 8080
          protocol      = "tcp"
        }
      ]
      
      # SECURE: Use Secrets Manager instead of environment variables
      # NOTE(review): ECS resolves `secrets` at container start using the
      # execution role; confirm ecs_execution_role is allowed to read both
      # secrets.
      secrets = [
        {
          name      = "DATABASE_PASSWORD"
          valueFrom = aws_secretsmanager_secret.db_credentials.arn
        },
        {
          name      = "API_KEY"
          valueFrom = aws_secretsmanager_secret.api_credentials.arn
        }
      ]
      
      # SECURE: Non-sensitive configuration passed as plain environment
      # variables (literal values defined here, not fetched from a store)
      environment = [
        {
          name  = "APP_ENV"
          value = "production"
        },
        {
          name  = "LOG_LEVEL"
          value = "INFO"
        }
      ]
      
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.app_logs.name
          "awslogs-region"        = data.aws_region.current.name
          "awslogs-stream-prefix" = "web-app"
        }
      }
      
      # SECURE: Health check configuration
      # Runs inside the container, so the image must ship curl.
      healthCheck = {
        command = ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"]
        interval    = 30
        timeout     = 5
        retries     = 3
        startPeriod = 60
      }
    }
  ])
}

# SECURE: Specific ECS task role with minimal permissions
# Trust policy: only the ecs-tasks.amazonaws.com service principal may
# assume this role, subject to the region condition below.
resource "aws_iam_role" "ecs_task_role" {
  name = "secure-ecs-task-role"
  
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "ecs-tasks.amazonaws.com"
        }
        # NOTE(review): aws:RequestedRegion in a service trust policy is
        # unusual; verify tasks can still assume the role, and consider the
        # aws:SourceAccount / aws:SourceArn conditions commonly used for
        # confused-deputy protection.
        Condition = {
          StringEquals = {
            "aws:RequestedRegion" = data.aws_region.current.name
          }
        }
      }
    ]
  })
}

# SECURE: Task role policy with least privilege
# Inline policy on ecs_task_role with two grants: read the current version
# of two named secrets, and get/put objects under uploads/ in one bucket.
resource "aws_iam_role_policy" "ecs_task_policy" {
  name = "secure-ecs-task-policy"
  role = aws_iam_role.ecs_task_role.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "secretsmanager:GetSecretValue"
        ]
        Resource = [
          aws_secretsmanager_secret.db_credentials.arn,
          aws_secretsmanager_secret.api_credentials.arn
        ]
        # Only the AWSCURRENT staging label may be read.
        Condition = {
          StringEquals = {
            "secretsmanager:VersionStage" = "AWSCURRENT"
          }
        }
      },
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject"
        ]
        # Scoped to the uploads/ prefix, not the whole bucket.
        Resource = [
          "${aws_s3_bucket.app_data.arn}/uploads/*"
        ]
      }
    ]
  })
}

# SECURE: GKE with Workload Identity
# Private-node GKE cluster with Workload Identity, network policy
# enforcement, and client-certificate authentication turned off.
resource "google_container_cluster" "secure_cluster" {
  name               = "secure-gke-cluster"
  location           = "us-central1"
  initial_node_count = 1
  
  # SECURE: Workload Identity enabled
  workload_identity_config {
    workload_pool = "${var.project_id}.svc.id.goog"
  }
  
  # SECURE: Network policy enabled
  network_policy {
    enabled = true
  }
  
  # SECURE: Private cluster configuration
  # Nodes get private IPs only; the control-plane endpoint stays public
  # (enable_private_endpoint = false).
  private_cluster_config {
    enable_private_nodes    = true
    enable_private_endpoint = false
    master_ipv4_cidr_block  = "172.16.0.0/28"
  }
  
  ip_allocation_policy {}
  
  master_auth {
    client_certificate_config {
      issue_client_certificate = false
    }
  }
  
  # SECURE: Restrict node scopes and serve pods from the GKE metadata server
  # (this does not disable metadata access; see workload_metadata_config)
  node_config {
    oauth_scopes = [
      "https://www.googleapis.com/auth/logging.write",
      "https://www.googleapis.com/auth/monitoring"
    ]
    
    # SECURE: Disable legacy metadata endpoints
    metadata = {
      disable-legacy-endpoints = "true"
    }
    
    # SECURE: Use custom service account
    service_account = google_service_account.gke_node_sa.email
    
    # GKE_METADATA: pods talk to the GKE metadata server used by Workload
    # Identity instead of the node's Compute Engine metadata server.
    workload_metadata_config {
      mode = "GKE_METADATA"
    }
  }
}

# SECURE: Kubernetes service account with Workload Identity
# The iam.gke.io/gcp-service-account annotation names the Google service
# account this Kubernetes service account is meant to act as. PROJECT_ID is
# a placeholder to substitute before applying.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: secure-app-ksa
  namespace: production
  annotations:
    iam.gke.io/gcp-service-account: secure-app-sa@PROJECT_ID.iam.gserviceaccount.com
# Pods do not get the default API token mounted automatically.
automountServiceAccountToken: false
---
# SECURE: Deployment using Workload Identity
# Runs three replicas as the secure-app-ksa service account with a
# non-root, read-only-rootfs container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: secure-web-app
  namespace: production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: secure-web-app
  template:
    metadata:
      labels:
        app: secure-web-app
    spec:
      serviceAccountName: secure-app-ksa
      
      # SECURE: Security context
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        runAsGroup: 1000
        fsGroup: 1000
      
      containers:
      - name: web-app
        image: gcr.io/PROJECT_ID/secure-web-app:latest
        
        securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          runAsNonRoot: true
          runAsUser: 1000
          capabilities:
            drop: ["ALL"]
        
        ports:
        - containerPort: 8080
          name: http
          protocol: TCP
        
        # SECURE: Use projected volumes for secrets
        volumeMounts:
        # Writable scratch space, needed because the root filesystem is
        # read-only.
        - name: tmp
          mountPath: /tmp
        - name: secret-volume
          mountPath: /etc/secrets
          readOnly: true
        
        # GOOGLE_APPLICATION_CREDENTIALS is intentionally NOT set. It
        # previously pointed at a path that is never mounted, and it must
        # reference a credentials JSON file rather than a raw token. With
        # Workload Identity, client libraries obtain credentials through
        # Application Default Credentials without it.
        env:
        - name: APP_ENV
          value: "production"
        
        # SECURE: Resource limits
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
          requests:
            memory: "128Mi"
            cpu: "100m"
        
        # SECURE: Probes
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
      
      volumes:
      - name: tmp
        emptyDir: {}
      # Mounted at /etc/secrets: db-password, api-key, service-account-token
      - name: secret-volume
        projected:
          sources:
          - secret:
              name: app-secrets
              items:
              - key: database-password
                path: db-password
              - key: api-key
                path: api-key
          # Short-lived (1 hour) service account token for audience "gcp"
          - serviceAccountToken:
              path: service-account-token
              expirationSeconds: 3600
              audience: gcp
---
# SECURE: Azure AKS with Azure AD Workload Identity
# The azure.workload.identity/* annotations carry the client and tenant ids
# for this service account. CLIENT_ID and TENANT_ID are placeholders to
# substitute before applying.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: secure-app-sa
  namespace: production
  annotations:
    azure.workload.identity/client-id: CLIENT_ID
    azure.workload.identity/tenant-id: TENANT_ID
# Pods do not get the default API token mounted automatically; the
# Deployment mounts a projected token explicitly.
automountServiceAccountToken: false
---
# Two-replica Deployment that runs as secure-app-sa and authenticates with
# a projected service account token instead of the instance metadata
# service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: secure-azure-app
  namespace: production
spec:
  replicas: 2
  selector:
    matchLabels:
      app: secure-azure-app
  template:
    metadata:
      labels:
        app: secure-azure-app
        # Opt-in label for Azure AD Workload Identity.
        azure.workload.identity/use: "true"
    spec:
      serviceAccountName: secure-app-sa
      
      securityContext:
        runAsNonRoot: true
        runAsUser: 1001
        runAsGroup: 1001
        fsGroup: 1001
      
      containers:
      - name: app
        image: myregistry.azurecr.io/secure-app:latest
        
        securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          runAsNonRoot: true
          runAsUser: 1001
          capabilities:
            drop: ["ALL"]
        
        # CLIENT_ID / TENANT_ID are placeholders to substitute.
        # AZURE_FEDERATED_TOKEN_FILE is the mount path below joined with the
        # projected token's `path`.
        env:
        - name: AZURE_CLIENT_ID
          value: CLIENT_ID
        - name: AZURE_TENANT_ID
          value: TENANT_ID
        - name: AZURE_FEDERATED_TOKEN_FILE
          value: "/var/run/secrets/azure/tokens/azure-identity-token"
        
        volumeMounts:
        # Writable scratch space, needed because the root filesystem is
        # read-only.
        - name: tmp
          mountPath: /tmp
        - name: azure-identity-token
          mountPath: /var/run/secrets/azure/tokens
          readOnly: true
        
        resources:
          limits:
            memory: "512Mi"
            cpu: "300m"
          requests:
            memory: "256Mi"
            cpu: "150m"
      
      volumes:
      - name: tmp
        emptyDir: {}
      # Short-lived (1 hour) token issued for the Azure AD token-exchange
      # audience.
      - name: azure-identity-token
        projected:
          sources:
          - serviceAccountToken:
              path: azure-identity-token
              expirationSeconds: 3600
              audience: api://AzureADTokenExchange

---
# SECURE: Service mesh with mTLS (Istio)
# Namespace-wide policy for "production": workloads accept only mutual-TLS
# traffic (STRICT), so plaintext connections are rejected.
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: production
spec:
  mtls:
    mode: STRICT
---
# Allow only the ingress gateway's identity to call secure-web-app, and only
# with GET/POST on the listed paths.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: secure-app-authz
  namespace: production
spec:
  selector:
    matchLabels:
      app: secure-web-app
  action: ALLOW
  rules:
  # `from` and `to` must sit in the SAME rule to be AND-ed. As separate list
  # items they are OR-ed, which would let any source use GET/POST on these
  # paths and let the gateway reach every path.
  - from:
    - source:
        principals: ["cluster.local/ns/ingress/sa/istio-ingressgateway-service-account"]
    to:
    - operation:
        methods: ["GET", "POST"]
        paths: ["/api/*", "/health", "/ready"]
4

Deploy Runtime Security Monitoring and Incident Response

Implement comprehensive monitoring, logging, and incident response for metadata service access attempts. Use SIEM integration, automated response playbooks, and forensic capabilities to detect and respond to IMDS exploitation attempts.

View implementation – HCL
# SECURE: Comprehensive IMDS security monitoring stack
# Log group for IMDS monitoring output: entries are kept for 365 days and
# encrypted with the logs_key KMS key.
resource "aws_cloudwatch_log_group" "imds_security_logs" {
  name              = "/aws/security/imds-monitoring"
  retention_in_days = 365
  kms_key_id       = aws_kms_key.logs_key.arn
}

# SECURE: CloudTrail for API monitoring
# Multi-region trail that records management events plus S3 object-level
# data events, delivers to S3 and CloudWatch Logs, and is KMS-encrypted with
# log file validation enabled.
resource "aws_cloudtrail" "security_trail" {
  name           = "security-monitoring-trail"
  s3_bucket_name = aws_s3_bucket.cloudtrail_logs.id
  
  # Monitor all regions
  include_global_service_events = true
  is_multi_region_trail        = true
  enable_logging               = true
  
  # Log data events
  event_selector {
    read_write_type                 = "All"
    include_management_events       = true
    
    data_resource {
      type   = "AWS::S3::Object"
      # Basic event selectors match on ARN prefix and do not accept
      # wildcards; "arn:aws:s3" is the documented value for "all objects in
      # all buckets".
      # NOTE(review): this also covers the trail's own log bucket, which can
      # amplify log volume — consider scoping to specific buckets.
      values = ["arn:aws:s3"]
    }
  }
  
  # Send to CloudWatch Logs
  cloud_watch_logs_group_arn = "${aws_cloudwatch_log_group.cloudtrail_logs.arn}:*"
  cloud_watch_logs_role_arn  = aws_iam_role.cloudtrail_logs_role.arn
  
  # Enable log file validation
  enable_log_file_validation = true
  
  # KMS encryption
  kms_key_id = aws_kms_key.cloudtrail_key.arn
}

# SECURE: EventBridge custom bus for security events
# Creates a custom bus named "security-events", separate from the default
# bus.
resource "aws_cloudwatch_event_bus" "security_bus" {
  name = "security-events"
  
  tags = {
    Purpose = "Security event aggregation"
  }
}

# SECURE: Lambda function for IMDS security analysis
# Deploys imds_analyzer.zip as "imds-security-analyzer" with a 300-second
# timeout, attached to the private subnets.
resource "aws_lambda_function" "imds_security_analyzer" {
  # Same path as the output_path of data.archive_file.imds_analyzer.
  filename         = "imds_analyzer.zip"
  function_name    = "imds-security-analyzer"
  role            = aws_iam_role.lambda_analyzer_role.arn
  handler         = "lambda_function.lambda_handler"
  source_code_hash = data.archive_file.imds_analyzer.output_base64sha256
  # NOTE(review): confirm python3.9 is still a supported Lambda runtime
  # before deploying; newer runtimes may be required.
  runtime         = "python3.9"
  timeout         = 300
  
  environment {
    variables = {
      # All three are read by the handler code through os.environ.
      SECURITY_TABLE_NAME = aws_dynamodb_table.security_events.name
      SNS_TOPIC_ARN      = aws_sns_topic.security_alerts.arn
      SLACK_WEBHOOK_URL  = var.slack_webhook_url
    }
  }
  
  # VPC configuration for database access
  # NOTE(review): the handler calls DynamoDB, SNS, EC2, S3 and an external
  # webhook; from private subnets that needs a NAT gateway or VPC endpoints —
  # confirm they exist.
  vpc_config {
    subnet_ids         = aws_subnet.private_subnets[*].id
    security_group_ids = [aws_security_group.lambda_sg.id]
  }
}

# SECURE: Lambda analyzer code
data "archive_file" "imds_analyzer" {
  type        = "zip"
  output_path = "imds_analyzer.zip"
  
  source {
    content = <<EOF
import json
import boto3
import os
from datetime import datetime, timedelta
from botocore.exceptions import ClientError

# `requests` is a third-party package and this deployment archive contains
# only lambda_function.py, so an unconditional import would fail at cold
# start and take the whole function down. Degrade to None instead.
try:
    import requests
except ImportError:
    requests = None

# Clients are created once per execution environment and reused across
# invocations.
dynamodb = boto3.resource('dynamodb')
sns = boto3.client('sns')
ec2 = boto3.client('ec2')
security_table = dynamodb.Table(os.environ['SECURITY_TABLE_NAME'])

def _decode_awslogs_payload(data):
    """Return a CloudWatch Logs subscription payload as a dict.

    Subscription deliveries are base64-encoded gzip; a plain JSON string is
    accepted as a fallback.
    """
    import base64
    import gzip
    import zlib

    try:
        return json.loads(gzip.decompress(base64.b64decode(data)))
    except (ValueError, OSError, EOFError, zlib.error):
        return json.loads(data)


def lambda_handler(event, context):
    """
    Analyze IMDS-related security events and generate alerts.

    Accepts three event shapes:
      * a CloudWatch Logs subscription delivery (top-level 'awslogs' key),
      * 'Records' entries carrying an 'awslogs' payload,
      * 'Records' entries from S3 notifications (eventSource 'aws:s3'),
        treated as CloudTrail log files.
    Records of any other shape are ignored. The instance posture analysis
    runs on every invocation regardless of the event.

    Args:
        event: Invocation payload (dict).
        context: Lambda context; not used.

    Returns:
        dict: statusCode 200 on success, or statusCode 500 with the error
        text after an alert has been sent.
    """

    try:
        # CloudWatch Logs subscription filters deliver the payload at the
        # top level, not inside 'Records'.
        if 'awslogs' in event:
            log_data = _decode_awslogs_payload(event['awslogs']['data'])
            analyze_log_events(log_data.get('logEvents', []))

        for record in event.get('Records', []):
            # .get: records from other sources spell this key differently
            # or omit it; they must not abort the whole batch.
            event_source = record.get('eventSource')

            if event_source == 'aws:logs' and 'awslogs' in record:
                log_data = _decode_awslogs_payload(record['awslogs']['data'])
                analyze_log_events(log_data.get('logEvents', []))

            elif event_source == 'aws:s3':
                # CloudTrail log file
                bucket = record['s3']['bucket']['name']
                key = record['s3']['object']['key']
                analyze_cloudtrail_logs(bucket, key)

        # Perform periodic security analysis
        analyze_imds_security_posture()

        return {
            'statusCode': 200,
            'body': json.dumps('Analysis completed successfully')
        }

    except Exception as e:
        print(f"Error in IMDS security analysis: {str(e)}")
        send_alert(f"IMDS Security Analyzer Error: {str(e)}", "HIGH")

        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }

def analyze_log_events(log_events):
    """Analyze CloudWatch log events for IMDS access patterns.

    For each log event whose message contains one of the known metadata
    endpoint markers (case-insensitive), stores one IMDS_ACCESS_ATTEMPT
    security event and, if the recent history looks like an attack, triggers
    the incident response.

    Args:
        log_events: iterable of dicts with optional 'message' (str) and
            'timestamp' (epoch milliseconds) keys.
    """
    import hashlib

    suspicious_patterns = [
        '169.254.169.254',
        'latest/meta-data',
        'computeMetadata/v1',
        'metadata/identity/oauth2/token',
        'iam/security-credentials',
        'service-accounts/default/token'
    ]
    # The message is lowercased for matching, so the patterns must be too;
    # otherwise mixed-case entries such as 'computeMetadata/v1' never match.
    lowered_patterns = [(p, p.lower()) for p in suspicious_patterns]

    for event in log_events:
        message = event.get('message', '')
        timestamp = event.get('timestamp', 0)
        haystack = message.lower()

        # One security event per log line: report the first pattern that
        # matches instead of overwriting the same event_id once per pattern.
        matched = next(
            (original for original, lowered in lowered_patterns
             if lowered in haystack),
            None
        )
        if matched is None:
            continue

        # hash() of a str is salted per process, so it cannot give a stable
        # id for the same log line across invocations; use a content digest.
        digest = hashlib.sha256(message.encode('utf-8')).hexdigest()[:16]

        security_event = {
            'event_id': f"imds-{timestamp}-{digest}",
            'timestamp': datetime.fromtimestamp(timestamp/1000).isoformat(),
            'event_type': 'IMDS_ACCESS_ATTEMPT',
            'severity': 'HIGH',
            'source': 'CloudWatch Logs',
            'details': {
                'pattern_matched': matched,
                'log_message': message,
                'log_timestamp': timestamp
            }
        }

        store_security_event(security_event)

        # Check if this is part of a larger pattern
        if is_attack_pattern(security_event):
            trigger_incident_response(security_event)

def analyze_cloudtrail_logs(bucket, key):
    """Analyze CloudTrail logs for IMDS-related API calls.

    Downloads one object from S3, parses it as a CloudTrail log file and
    passes every entry of its 'Records' list to analyze_api_call. Failures
    are printed and swallowed (best-effort).

    Args:
        bucket: S3 bucket name.
        key: S3 object key of the log file.
    """
    import gzip

    s3 = boto3.client('s3')

    try:
        # Download and parse CloudTrail log file
        response = s3.get_object(Bucket=bucket, Key=key)
        log_content = response['Body'].read()

        # CloudTrail delivers gzip-compressed files (.json.gz). Detect the
        # gzip magic bytes so uncompressed JSON keeps working too.
        if log_content[:2] == bytes([0x1f, 0x8b]):
            log_content = gzip.decompress(log_content)

        # Parse JSON log records
        log_data = json.loads(log_content)

        for record in log_data.get('Records', []):
            analyze_api_call(record)

    except Exception as e:
        print(f"Error analyzing CloudTrail logs: {e}")

def analyze_api_call(record):
    """Analyze individual API call for security concerns.

    Stores a SUSPICIOUS_API_CALL security event when the record's eventName
    is on the watch list; other records are ignored.

    Args:
        record: one CloudTrail record (dict).
    """

    event_name = record.get('eventName', '')

    # Suspicious API calls
    suspicious_apis = {
        'ModifyInstanceMetadataOptions',
        'DescribeInstanceAttribute',
        'GetRole',
        'AssumeRole',
        'GetSessionToken'
    }

    if event_name not in suspicious_apis:
        return

    # CloudTrail identifies each record with 'eventID' (and the request with
    # 'requestID'); there is no 'awsRequestId' field, so using it made every
    # id end in "None" and collide for calls in the same second.
    unique_id = record.get('eventID') or record.get('requestID')

    security_event = {
        'event_id': f"api-{record.get('eventTime')}-{unique_id}",
        'timestamp': record.get('eventTime'),
        'event_type': 'SUSPICIOUS_API_CALL',
        'severity': 'MEDIUM',
        'source': 'CloudTrail',
        'details': {
            'api_call': event_name,
            'source_ip': record.get('sourceIPAddress', ''),
            'user_identity': record.get('userIdentity', {}),
            'request_parameters': record.get('requestParameters', {})
        }
    }

    store_security_event(security_event)

def analyze_imds_security_posture():
    """Analyze overall IMDS security posture.

    Lists EC2 instances (all pages, excluding terminated ones) and flags
    those that do not require IMDSv2 tokens or allow a PUT response hop
    limit above 1. When any are found, stores one
    SECURITY_POSTURE_VIOLATION event and sends a HIGH alert. Failures are
    printed and swallowed (best-effort).
    """

    try:
        vulnerable_instances = []

        # DescribeInstances is paginated; a single call only sees the first
        # page. Terminated instances cannot be fixed and would only be noise.
        paginator = ec2.get_paginator('describe_instances')
        pages = paginator.paginate(
            Filters=[
                {
                    'Name': 'instance-state-name',
                    'Values': ['pending', 'running', 'stopping', 'stopped']
                }
            ]
        )

        for page in pages:
            for reservation in page['Reservations']:
                for instance in reservation['Instances']:
                    metadata_options = instance.get('MetadataOptions', {})

                    # Missing values are treated as the insecure settings
                    tokens_optional = metadata_options.get('HttpTokens') != 'required'
                    hop_limit_high = metadata_options.get('HttpPutResponseHopLimit', 2) > 1

                    if tokens_optional or hop_limit_high:
                        vulnerable_instances.append({
                            'instance_id': instance['InstanceId'],
                            'state': instance['State']['Name'],
                            'metadata_options': metadata_options
                        })

        # Generate security report
        if vulnerable_instances:
            # One timestamp so the id and the timestamp field agree
            now_iso = datetime.now().isoformat()
            security_event = {
                'event_id': f"posture-{now_iso}",
                'timestamp': now_iso,
                'event_type': 'SECURITY_POSTURE_VIOLATION',
                'severity': 'HIGH',
                'source': 'Security Posture Analysis',
                'details': {
                    'vulnerable_instance_count': len(vulnerable_instances),
                    'vulnerable_instances': vulnerable_instances
                }
            }

            store_security_event(security_event)
            send_alert(
                f"Found {len(vulnerable_instances)} instances with vulnerable IMDS configuration",
                "HIGH"
            )

    except Exception as e:
        print(f"Error in security posture analysis: {e}")

def is_attack_pattern(security_event):
    """Determine if event is part of a larger attack pattern.

    Counts IMDS_ACCESS_ATTEMPT events stored with a timestamp inside the
    last hour and reports an attack when there are more than 5.

    Args:
        security_event: the event that prompted the check; not inspected.

    Returns:
        bool: True when more than 5 matching events exist; False otherwise,
        including when the lookup fails.
    """
    from boto3.dynamodb.conditions import Attr

    try:
        now = datetime.now()
        window_start = (now - timedelta(hours=1)).isoformat()

        # A Query key condition only supports equality on a partition key,
        # so a `between` on the TimestampIndex hash key is rejected and the
        # check always fell through to False. A filtered Scan works with
        # the table's actual key schema.
        scan_kwargs = {
            'FilterExpression': (
                Attr('event_type').eq('IMDS_ACCESS_ATTEMPT')
                & Attr('timestamp').between(window_start, now.isoformat())
            ),
            'Select': 'COUNT'
        }

        recent_attempts = 0
        while True:
            response = security_table.scan(**scan_kwargs)
            recent_attempts += response.get('Count', 0)

            # If more than 5 IMDS access attempts in the last hour, consider
            # it an attack; stop scanning as soon as that is known.
            if recent_attempts > 5:
                return True

            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                return False
            scan_kwargs['ExclusiveStartKey'] = last_key

    except Exception as e:
        print(f"Error checking attack pattern: {e}")
        return False

def store_security_event(security_event):
    """Write one security event dict to the security events table.

    Best-effort: a failed write is printed and swallowed, and nothing is
    returned either way.
    """
    try:
        security_table.put_item(Item=security_event)
    except Exception as write_error:
        print(f"Error storing security event: {write_error}")

def trigger_incident_response(security_event):
    """Raise a CRITICAL alert describing a suspected IMDS attack.

    Builds an incident summary from the given security event and hands it
    to send_alert. No remediation is performed here.
    """
    event_dump = json.dumps(security_event, indent=2)

    # Incident ticket payload (key order is preserved in the JSON output)
    incident_data = dict(
        title=f"Potential IMDS Attack Detected - {security_event['event_id']}",
        severity='HIGH',
        description=f"Multiple IMDS access attempts detected: {event_dump}",
        timestamp=security_event['timestamp'],
    )

    # Hand off to the alerting channels
    alert_text = (
        "🚨 INCIDENT: Potential IMDS attack pattern detected\n"
        + json.dumps(incident_data, indent=2)
    )
    send_alert(alert_text, "CRITICAL")

    # Auto-remediation actions could go here
    # - Block suspicious IP addresses
    # - Modify instance metadata options
    # - Isolate affected instances

def send_alert(message, severity):
    """Send alert via SNS and Slack.

    The two channels are attempted independently, so a failure on one does
    not prevent delivery on the other. Failures are printed, never raised.

    Args:
        message: Alert text; used as the SNS message body and as the value of
            the Slack attachment field.
        severity: Severity label. It is appended to the SNS subject, and
            'HIGH' / 'CRITICAL' select the red ('danger') Slack colour; any
            other value uses 'warning'.

    Environment:
        SNS_TOPIC_ARN: Required. Topic the alert is published to.
        SLACK_WEBHOOK_URL: Optional. When unset or empty, Slack is skipped.
    """
    # Send SNS notification
    try:
        sns.publish(
            TopicArn=os.environ['SNS_TOPIC_ARN'],
            Subject=f"IMDS Security Alert - {severity}",
            Message=message
        )
    except Exception as e:
        print(f"Error sending alert: {e}")

    # Send Slack notification if webhook URL is provided
    slack_webhook = os.environ.get('SLACK_WEBHOOK_URL')
    if not slack_webhook:
        return

    slack_data = {
        'text': "IMDS Security Alert",
        'attachments': [{
            'color': 'danger' if severity in ['HIGH', 'CRITICAL'] else 'warning',
            'fields': [{
                'title': f'Severity: {severity}',
                'value': message,
                'short': False
            }]
        }]
    }

    try:
        # Without a timeout an unresponsive webhook would block this call
        # indefinitely and hold the function open until it is killed.
        response = requests.post(slack_webhook, json=slack_data, timeout=5)
        # Surface rejected webhooks (4xx/5xx) instead of silently ignoring them.
        response.raise_for_status()
    except Exception as e:
        print(f"Error sending alert: {e}")
EOF
    filename = "lambda_function.py"
  }
}

# SECURE: DynamoDB table for security events
# Items are keyed by event_id; capacity is billed per request.
resource "aws_dynamodb_table" "security_events" {
  name         = "imds-security-events"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "event_id"

  # Only attributes used as table or index keys are declared here.
  attribute {
    name = "event_id"
    type = "S"
  }

  attribute {
    name = "timestamp"
    type = "S"
  }

  # NOTE(review): with "timestamp" as the index hash key, this index can only
  # be queried for an exact timestamp value; time-range queries would need
  # "timestamp" as a range key under a different hash key.
  global_secondary_index {
    name     = "TimestampIndex"
    hash_key = "timestamp"

    # projection_type is a required argument of global_secondary_index.
    # "ALL" keeps every item attribute (e.g. event_type) readable via the index.
    projection_type = "ALL"
  }

  # Enable point-in-time recovery
  point_in_time_recovery {
    enabled = true
  }

  # Enable encryption with the customer-managed KMS key
  server_side_encryption {
    enabled     = true
    kms_key_arn = aws_kms_key.dynamodb_key.arn
  }

  tags = {
    Purpose = "IMDS security event storage"
  }
}

# SECURE: CloudWatch dashboard for IMDS security monitoring
# Layout: a time-series overview, a log query of recent metadata-endpoint
# hits, and two single-value tiles.
resource "aws_cloudwatch_dashboard" "imds_security_dashboard" {
  dashboard_name = "IMDS-Security-Monitoring"

  dashboard_body = jsonencode({
    widgets = [
      # Time series of the three custom metrics in the Security/IMDS namespace.
      {
        type   = "metric"
        x      = 0
        y      = 0
        width  = 12
        height = 6

        properties = {
          metrics = [
            ["Security/IMDS", "IMDSAccessAttempts"],
            # "." repeats the namespace of the previous metric entry.
            [".", "SuspiciousAPIRequests"],
            [".", "VulnerableInstances"]
          ]
          view    = "timeSeries"
          stacked = false
          region  = data.aws_region.current.name
          title   = "IMDS Security Metrics"
          period  = 300
        }
      },
      # Logs Insights query for log lines mentioning the metadata endpoint.
      {
        type   = "log"
        x      = 0
        y      = 6
        width  = 24
        height = 6

        properties = {
          # Dots are escaped so the pattern matches the literal address only;
          # an unescaped "." would match any character.
          query  = "SOURCE '${aws_cloudwatch_log_group.imds_security_logs.name}' | fields @timestamp, @message | filter @message like /169\\.254\\.169\\.254/ | sort @timestamp desc | limit 100"
          region = data.aws_region.current.name
          title  = "Recent IMDS Access Attempts"
        }
      },
      # Single-value tiles are "metric" widgets with view = "singleValue";
      # "number" is not a widget type CloudWatch accepts.
      {
        type   = "metric"
        x      = 0
        y      = 12
        width  = 6
        height = 6

        properties = {
          metrics = [
            ["Security/IMDS", "VulnerableInstances"]
          ]
          view   = "singleValue"
          region = data.aws_region.current.name
          title  = "Vulnerable Instances"
        }
      },
      {
        type   = "metric"
        x      = 6
        y      = 12
        width  = 6
        height = 6

        properties = {
          metrics = [
            ["Security/IMDS", "IMDSAccessAttempts", { stat = "Sum" }]
          ]
          view   = "singleValue"
          region = data.aws_region.current.name
          title  = "Total Access Attempts (24h)"
          # One-day period so the summed value covers the 24h the title promises.
          period = 86400
        }
      }
    ]
  })
}

# SECURE: Automated incident response playbook
# Automation runbook: validate the instance, record its IMDS token setting,
# enforce IMDSv2, isolate the instance when severity is CRITICAL, then notify.
resource "aws_ssm_document" "imds_incident_response" {
  name            = "IMDS-Incident-Response-Playbook"
  document_type   = "Automation"
  document_format = "YAML"

  content = <<DOC
schemaVersion: '0.3'
description: 'Automated response to IMDS security incidents'
assumeRole: '${aws_iam_role.incident_response_role.arn}'
parameters:
  InstanceId:
    type: String
    description: 'EC2 Instance ID to remediate'
  Severity:
    type: String
    description: 'Incident severity level'
    allowedValues: ['LOW', 'MEDIUM', 'HIGH', 'CRITICAL']
mainSteps:
# Fails early if the instance ID does not resolve.
- name: 'ValidateInstance'
  action: 'aws:executeAwsApi'
  inputs:
    Service: 'ec2'
    Api: 'DescribeInstances'
    InstanceIds:
    - '{{ InstanceId }}'
  outputs:
  - Name: 'InstanceState'
    Selector: '$.Reservations[0].Instances[0].State.Name'
    Type: 'String'

# Records the token requirement in force before remediation. The metadata
# options are returned by DescribeInstances, not by the shutdown-behavior
# instance attribute.
- name: 'CheckIMDSConfiguration'
  action: 'aws:executeAwsApi'
  inputs:
    Service: 'ec2'
    Api: 'DescribeInstances'
    InstanceIds:
    - '{{ InstanceId }}'
  outputs:
  - Name: 'HttpTokens'
    Selector: '$.Reservations[0].Instances[0].MetadataOptions.HttpTokens'
    Type: 'String'

# Require session tokens (IMDSv2) and keep responses from crossing more than one hop.
- name: 'RemediateIMDSv2'
  action: 'aws:executeAwsApi'
  inputs:
    Service: 'ec2'
    Api: 'ModifyInstanceMetadataOptions'
    InstanceId: '{{ InstanceId }}'
    HttpTokens: 'required'
    HttpPutResponseHopLimit: 1

# Automation steps have no "when" property; conditional execution is
# expressed with an aws:branch step.
- name: 'CheckSeverity'
  action: 'aws:branch'
  inputs:
    Choices:
    - NextStep: 'IsolateInstance'
      Variable: '{{ Severity }}'
      StringEquals: 'CRITICAL'
    Default: 'NotifySecurityTeam'

# Reached only for CRITICAL incidents: replaces the instance's security
# groups with the isolation group.
- name: 'IsolateInstance'
  action: 'aws:executeAwsApi'
  inputs:
    Service: 'ec2'
    Api: 'ModifyInstanceAttribute'
    InstanceId: '{{ InstanceId }}'
    Groups:
    - '${aws_security_group.isolation_sg.id}'
  nextStep: 'NotifySecurityTeam'

# The message is plain text; template conditionals are not rendered here,
# so the isolation line states its own condition.
- name: 'NotifySecurityTeam'
  action: 'aws:executeAwsApi'
  inputs:
    Service: 'sns'
    Api: 'Publish'
    TopicArn: '${aws_sns_topic.security_alerts.arn}'
    Subject: 'IMDS Incident Response Completed'
    Message: |
      Automated remediation completed for instance {{ InstanceId }}
      Severity: {{ Severity }}
      Actions taken:
      - Enforced IMDSv2
      - Limited hop count to 1
      - Isolated instance with restricted security group (CRITICAL severity only)
DOC
}

Detect This Vulnerability in Your Code

Sourcery automatically identifies insecure cloud instance metadata service (IMDS) access leading to credential theft, along with many other security issues in your codebase.