#!/usr/bin/env python3
# SECURE: Application with proper SSRF protection and IMDS security
import requests
from flask import Flask, request, jsonify
import json
import os
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
import ipaddress
import socket
from urllib.parse import urlparse
import logging
from functools import wraps
import time
from collections import defaultdict
# The Flask application object that every route below registers on.
app = Flask(__name__)

# Configure secure logging: INFO and above, with timestamp, logger name and
# level in front of each message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by all handlers in this file.
logger = logging.getLogger(__name__)
# SECURE: Rate limiting
# Maps a client key to the timestamps (time.time()) of its recent requests.
request_counts = defaultdict(list)
RATE_LIMIT_REQUESTS = 10  # max requests per client per window
RATE_LIMIT_WINDOW = 60  # seconds

# SECURE: Blocked networks for SSRF protection
# Destination addresses inside any of these IPv4 networks are refused.
BLOCKED_NETWORKS = [
    # The whole link-local range, not only 169.254.169.254: other metadata
    # and credential endpoints live here too (e.g. 169.254.170.2).
    ipaddress.IPv4Network('169.254.0.0/16'),  # Link-local / cloud metadata
    ipaddress.IPv4Network('10.0.0.0/8'),  # Private
    ipaddress.IPv4Network('172.16.0.0/12'),  # Private
    ipaddress.IPv4Network('192.168.0.0/16'),  # Private
    ipaddress.IPv4Network('127.0.0.0/8'),  # Localhost
    ipaddress.IPv4Network('0.0.0.0/8'),  # Invalid
    ipaddress.IPv4Network('100.64.0.0/10'),  # Shared address space (RFC 6598)
    ipaddress.IPv4Network('192.0.0.0/24'),  # IETF protocol assignments
    ipaddress.IPv4Network('198.18.0.0/15'),  # Benchmarking (RFC 2544)
    ipaddress.IPv4Network('224.0.0.0/4'),  # Multicast
    ipaddress.IPv4Network('240.0.0.0/4'),  # Reserved
]

# SECURE: Allowed domains for external requests
# Exact host names only; no wildcard or suffix matching is applied.
ALLOWED_DOMAINS = [
    'api.github.com',
    'api.twitter.com',
    'jsonplaceholder.typicode.com',
    # Add your trusted domains here
]
# Sweep idle clients once the table holds more than this many keys, so that
# one-off visitors cannot grow ``request_counts`` without bound.
_RATE_LIMIT_MAX_TRACKED_CLIENTS = 10000


def _rate_limit_client_key():
    """Return the validated address used as the rate-limit bucket key.

    ``X-Forwarded-For`` is client-controlled, so it is honoured only when the
    direct peer is a loopback/private address (i.e. a reverse proxy in front
    of the app). Only its right-most hop is used, and only if it parses as an
    IP address. In every other case the socket peer address is the key.
    """
    peer = request.remote_addr or 'unknown'
    forwarded = request.environ.get('HTTP_X_FORWARDED_FOR')
    if not forwarded:
        return peer
    try:
        peer_addr = ipaddress.ip_address(peer)
        if not (peer_addr.is_loopback or peer_addr.is_private):
            # A direct internet client may put anything in the header.
            return peer
        # The right-most entry is the hop appended by the nearest proxy;
        # everything to its left was supplied by the caller.
        last_hop = forwarded.rsplit(',', 1)[-1].strip()
        return str(ipaddress.ip_address(last_hop))
    except ValueError:
        # Unparseable peer or header value: fall back to the socket peer.
        return peer


def _evict_idle_clients(now):
    """Drop clients whose newest request is older than the rate window."""
    idle = [
        key for key, stamps in request_counts.items()
        if not stamps or now - stamps[-1] >= RATE_LIMIT_WINDOW
    ]
    for key in idle:
        del request_counts[key]


def rate_limit(f):
    """Rate limiting decorator.

    Allows at most ``RATE_LIMIT_REQUESTS`` calls per client within a sliding
    ``RATE_LIMIT_WINDOW`` seconds; further calls get a JSON 429 response
    without invoking the wrapped view.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        client_ip = _rate_limit_client_key()
        current_time = time.time()

        if len(request_counts) > _RATE_LIMIT_MAX_TRACKED_CLIENTS:
            _evict_idle_clients(current_time)

        # Keep only the timestamps that are still inside the window.
        recent = [
            req_time for req_time in request_counts.get(client_ip, ())
            if current_time - req_time < RATE_LIMIT_WINDOW
        ]

        if len(recent) >= RATE_LIMIT_REQUESTS:
            request_counts[client_ip] = recent
            logger.warning("Rate limit exceeded for IP: %s", client_ip)
            return jsonify({'error': 'Rate limit exceeded'}), 429

        recent.append(current_time)
        request_counts[client_ip] = recent
        return f(*args, **kwargs)
    return decorated_function
def is_safe_url(url):
    """Comprehensive URL validation to prevent SSRF.

    Returns True only when *url* is an http(s) URL whose host is listed in
    ``ALLOWED_DOMAINS`` and every address that host resolves to (IPv4 and
    IPv6) is publicly routable and outside ``BLOCKED_NETWORKS``. Any other
    input, and any error during validation, yields False.

    NOTE: the HTTP client resolves the host name again when it connects, so
    this check on its own does not rule out a DNS answer that changes between
    validation and use.
    """
    # Empty or non-string input can never be a valid target.
    if not isinstance(url, str) or not url:
        return False

    try:
        # Backslashes and control characters are interpreted differently by
        # different URL parsers; refuse them rather than guess which host the
        # HTTP client would end up contacting.
        if '\\' in url or any(ord(ch) < 0x20 or ord(ch) == 0x7f for ch in url):
            logger.warning("Blocked malformed URL: %r", url)
            return False

        parsed = urlparse(url)

        # Check scheme
        if parsed.scheme not in ('http', 'https'):
            logger.warning("Blocked non-HTTP(S) scheme: %r", parsed.scheme)
            return False

        # Check for suspicious URLs. Patterns are lowercase because they are
        # compared against the lowercased URL.
        suspicious_patterns = (
            '169.254.169.254',  # Metadata service
            'metadata',
            'computemetadata',
            'localhost',
            '127.0.0.1',
            '0.0.0.0',
        )
        lowered_url = url.lower()
        for pattern in suspicious_patterns:
            if pattern in lowered_url:
                # %r keeps CR/LF in user input from forging log lines.
                logger.warning("Blocked suspicious URL pattern: %s in %r", pattern, url)
                return False

        hostname = parsed.hostname
        if not hostname:
            return False

        # Check if hostname is in allowed domains
        if hostname not in ALLOWED_DOMAINS:
            logger.warning("Domain not in allowlist: %r", hostname)
            return False

        try:
            # Resolve every address the client could connect to, not only
            # the first IPv4 record.
            addr_infos = socket.getaddrinfo(hostname, None, type=socket.SOCK_STREAM)
            if not addr_infos:
                return False

            for addr_info in addr_infos:
                ip_addr = ipaddress.ip_address(addr_info[4][0])

                # Judge ::ffff:a.b.c.d by the IPv4 address it wraps.
                if ip_addr.version == 6 and ip_addr.ipv4_mapped is not None:
                    ip_addr = ip_addr.ipv4_mapped

                for blocked_network in BLOCKED_NETWORKS:
                    if ip_addr in blocked_network:
                        logger.warning(
                            "Blocked IP address: %s in network %s",
                            ip_addr, blocked_network,
                        )
                        return False

                # Covers IPv6 and any non-public range the list above omits.
                if not ip_addr.is_global or ip_addr.is_multicast:
                    logger.warning("Blocked non-public IP address: %s", ip_addr)
                    return False

            return True
        except (socket.gaierror, ValueError) as e:
            logger.warning("DNS resolution failed for %r: %s", hostname, e)
            return False
    except Exception as e:
        # Fail closed: a URL that cannot be validated is not safe.
        logger.error("URL validation error: %s", e)
        return False
# SECURE: URL fetching with validation
@app.route('/fetch-url', methods=['GET'])
@rate_limit
def fetch_url():
    """SECURE: Fetch content from validated URLs only.

    Reads the ``url`` query parameter, validates it with ``is_safe_url`` and
    returns JSON with the upstream status code, content type, text length and
    a preview of at most 200 characters.

    Responses: 400 when ``url`` is missing, 403 when validation fails, 413
    when the body exceeds 10000 bytes, 500 when the upstream request fails.
    """
    max_body_bytes = 10000  # 10KB limit

    target_url = request.args.get('url')
    if not target_url:
        return jsonify({'error': 'URL parameter required'}), 400

    # SECURE: Validate URL before making request
    if not is_safe_url(target_url):
        # %r keeps CR/LF in the user-supplied URL from forging log lines.
        logger.warning("Blocked unsafe URL request: %r", target_url)
        return jsonify({
            'error': 'URL not allowed - potential SSRF detected',
            'blocked_url': target_url
        }), 403

    try:
        # SECURE: Restricted HTTP client configuration.
        # stream=True defers the body download so the size limit can be
        # enforced while reading, instead of after everything is in memory.
        with requests.get(
            target_url,
            timeout=5,  # Short timeout
            allow_redirects=False,  # Prevent redirect-based SSRF
            headers={
                'User-Agent': 'SecureApp/1.0',
                'Accept': 'application/json, text/plain'
            },
            stream=True,
        ) as response:
            # SECURE: Limit response size - stop reading once it is exceeded.
            body = bytearray()
            for chunk in response.iter_content(chunk_size=4096):
                body.extend(chunk)
                if len(body) > max_body_bytes:
                    logger.warning("Response too large from %r", target_url)
                    return jsonify({'error': 'Response too large'}), 413

            # The body was consumed manually, so decode it here; use the
            # charset declared by the server and fall back to UTF-8.
            try:
                text = body.decode(response.encoding or 'utf-8', errors='replace')
            except LookupError:
                text = body.decode('utf-8', errors='replace')

            # SECURE: Don't return full response content
            return jsonify({
                'status': 'success',
                'status_code': response.status_code,
                'content_type': response.headers.get('Content-Type'),
                'content_length': len(text),
                'content_preview': (text[:200] + '...') if len(text) > 200 else text
            })
    except requests.RequestException as e:
        logger.error("Request failed for %r: %s", target_url, e)
        return jsonify({'error': 'Request failed'}), 500
# SECURE: Instance identity lookup through the metadata service (IMDSv2)
@app.route('/get-instance-info', methods=['GET'])
@rate_limit
def get_instance_info():
    """SECURE: Return a limited view of the EC2 instance identity document.

    Requests an IMDSv2 session token first and sends it with the document
    request; if no token is issued, the request is sent without one. Only
    instance id, instance type, region and availability zone are returned.

    Responses: 200 with ``instance_info`` (``None`` when the metadata service
    is unreachable), 500 when the service answers with a non-200 status or
    the document cannot be processed.
    """
    imds_base = 'http://169.254.169.254'
    try:
        try:
            token_response = requests.put(
                f'{imds_base}/latest/api/token',
                headers={'X-aws-ec2-metadata-token-ttl-seconds': '60'},
                timeout=2
            )
            imds_headers = {}
            if token_response.status_code == 200:
                imds_headers['X-aws-ec2-metadata-token'] = token_response.text

            response = requests.get(
                f'{imds_base}/latest/dynamic/instance-identity/document',
                headers=imds_headers,
                timeout=2
            )
        except requests.RequestException:
            # Fallback if not running on EC2
            return jsonify({
                'instance_info': None,
                'note': 'Not running on EC2 instance'
            })

        # Every path must return a response; a non-200 answer used to fall
        # through and return None.
        if response.status_code != 200:
            logger.warning(
                "Instance metadata service returned HTTP %s",
                response.status_code
            )
            return jsonify({'error': 'Failed to get instance info'}), 500

        identity = json.loads(response.text)

        # SECURE: Return only safe, non-sensitive information
        safe_info = {
            'instance_id': identity.get('instanceId'),
            'instance_type': identity.get('instanceType'),
            'region': identity.get('region'),
            'availability_zone': identity.get('availabilityZone'),
            # Don't expose account ID, private IPs, etc.
        }
        return jsonify({
            'instance_info': safe_info,
            'note': 'Limited instance information for security'
        })
    except Exception as e:
        logger.error(f"Error getting instance info: {e}")
        return jsonify({'error': 'Failed to get instance info'}), 500
# SECURE: Proper cloud resource access
def _aws_resource_summary():
    """Summarise S3 access: total bucket count plus up to five buckets."""
    try:
        s3 = boto3.client('s3')
        try:
            # This requires proper IAM permissions
            listing = s3.list_buckets()
            all_buckets = listing.get('Buckets', [])
            # SECURE: Return limited, non-sensitive information
            return {
                'bucket_count': len(all_buckets),
                'accessible_buckets': [
                    {
                        'name': entry['Name'],
                        'creation_date': entry['CreationDate'].isoformat()
                    }
                    for entry in all_buckets[:5]  # Limit to 5
                ]
            }
        except ClientError as client_err:
            return {
                'error': f'Access denied: {client_err.response["Error"]["Code"]}'
            }
    except NoCredentialsError:
        return {'error': 'No AWS credentials configured'}
    except Exception as err:
        logger.error(f"AWS error: {err}")
        return {'error': 'AWS access failed'}


def _gcp_resource_summary():
    """Summarise GCS access: count of up to five listed buckets and project."""
    try:
        from google.cloud import storage
        from google.auth.exceptions import DefaultCredentialsError
        try:
            gcs = storage.Client()
            listed = list(gcs.list_buckets(max_results=5))
            return {
                'bucket_count': len(listed),
                'project_id': gcs.project
            }
        except DefaultCredentialsError:
            return {'error': 'No GCP credentials configured'}
    except ImportError:
        return {'error': 'GCP client library not installed'}
    except Exception as err:
        logger.error(f"GCP error: {err}")
        return {'error': 'GCP access failed'}


def _azure_resource_summary():
    """Report whether an Azure default credential object can be created."""
    try:
        from azure.identity import DefaultAzureCredential
        from azure.storage.blob import BlobServiceClient
        from azure.core.exceptions import ClientAuthenticationError
        try:
            # Only constructed; no Azure service call is made with it.
            DefaultAzureCredential()
            # This would require proper configuration
            return {
                'authentication': 'configured',
                'note': 'Azure resources require proper service principal or managed identity'
            }
        except ClientAuthenticationError:
            return {'error': 'Azure authentication failed'}
    except ImportError:
        return {'error': 'Azure client library not installed'}
    except Exception as err:
        logger.error(f"Azure error: {err}")
        return {'error': 'Azure access failed'}


@app.route('/get-cloud-resources', methods=['GET'])
@rate_limit
def get_cloud_resources():
    """SECURE: Report per-provider resource summaries gathered via the SDKs.

    Each provider is probed independently; a failure for one provider is
    reported as an ``error`` entry for that provider only.
    """
    # Evaluated in order: AWS, then GCP, then Azure.
    resources = {
        'aws': _aws_resource_summary(),
        'gcp': _gcp_resource_summary(),
        'azure': _azure_resource_summary(),
    }
    return jsonify({
        'cloud_resources': resources,
        'note': 'Using proper cloud SDKs with authentication and authorization'
    })
# SECURE: Restricted proxy with validation
@app.route('/proxy', methods=['GET'])
@rate_limit
def proxy_request():
    """SECURE: Restricted proxy with comprehensive validation.

    Reads the ``url`` query parameter, validates it with ``is_safe_url`` and
    returns JSON with the upstream status code, a filtered set of response
    headers, a preview of at most 500 characters and the upstream host name.

    Responses: 400 when ``url`` is missing, 403 when validation fails, 500
    when the upstream request fails.
    """
    # Only a 500-character preview is returned, so never buffer more than
    # this many bytes of the upstream body.
    max_preview_bytes = 10000

    target_url = request.args.get('url')
    if not target_url:
        return jsonify({'error': 'URL required'}), 400

    # SECURE: Validate URL
    if not is_safe_url(target_url):
        # %r keeps CR/LF in the user-supplied URL from forging log lines.
        logger.warning("Blocked unsafe proxy request: %r", target_url)
        return jsonify({
            'error': 'URL not allowed for proxying',
            'blocked_url': target_url
        }), 403

    try:
        # SECURE: Restricted proxy headers
        safe_headers = {
            'User-Agent': 'SecureProxy/1.0',
            'Accept': 'application/json, text/plain',
            # Never forward metadata-specific headers
        }

        # stream=True defers the body download so reading can stop early.
        with requests.get(
            target_url,
            headers=safe_headers,
            timeout=5,
            allow_redirects=False,
            stream=True
        ) as response:
            body = bytearray()
            for chunk in response.iter_content(chunk_size=4096):
                body.extend(chunk)
                if len(body) >= max_preview_bytes:
                    break

            # The body was consumed manually, so decode it here; use the
            # charset declared by the server and fall back to UTF-8.
            try:
                text = body.decode(response.encoding or 'utf-8', errors='replace')
            except LookupError:
                text = body.decode('utf-8', errors='replace')

            # SECURE: Filter response headers
            safe_response_headers = {
                key: value for key, value in response.headers.items()
                if key.lower() in ('content-type', 'content-length', 'cache-control')
            }

            return jsonify({
                'status_code': response.status_code,
                'headers': safe_response_headers,
                'content_preview': (text[:500] + '...') if len(text) > 500 else text,
                'proxied_from': urlparse(target_url).hostname  # Only hostname, not full URL
            })
    except Exception as e:
        logger.error(f"Proxy request failed: {e}")
        return jsonify({'error': 'Proxy request failed'}), 500
# SECURE: Configuration processing with validation
@app.route('/process-config', methods=['POST'])
@rate_limit
def process_config():
    """SECURE: Process configuration with proper validation.

    Expects a JSON object containing a ``config`` key and echoes back the
    ``app_name``, ``environment`` and ``features`` values (with defaults).
    No URL or external reference in the payload is fetched.

    Responses: 400 for a non-JSON request, a malformed or non-object JSON
    body, or a body without ``config``; 500 on unexpected errors.
    """
    try:
        if not request.is_json:
            return jsonify({'error': 'JSON content required'}), 400

        # silent=True yields None for a malformed body instead of raising,
        # so a client mistake is answered with 400 rather than the 500 from
        # the broad handler below.
        config_data = request.get_json(silent=True)
        if not isinstance(config_data, dict):
            return jsonify({'error': 'Invalid configuration format'}), 400

        # SECURE: Process only local configuration
        if 'config' in config_data:
            # NOTE(review): the values are read from the top level of the
            # payload, not from config_data['config'] - confirm this is the
            # intended request shape.
            processed_config = {
                'app_name': config_data.get('app_name', 'default'),
                'environment': config_data.get('environment', 'production'),
                'features': config_data.get('features', {}),
                # Don't process URLs or external references
            }
            return jsonify({
                'status': 'success',
                'processed_config': processed_config,
                'note': 'Only embedded configuration processed for security'
            })

        return jsonify({'error': 'No valid configuration provided'}), 400
    except Exception as e:
        logger.error(f"Configuration processing error: {e}")
        return jsonify({'error': 'Configuration processing failed'}), 500
# SECURE: Health check endpoint
@app.route('/health', methods=['GET'])
def health_check():
    """Report service status, version and the advertised security features."""
    advertised_features = [
        'SSRF protection',
        'Rate limiting',
        'Input validation',
        'Secure cloud SDK usage',
        'Metadata service protection',
    ]
    payload = {
        'status': 'healthy',
        'version': '1.0.0',
        'security_features': advertised_features,
    }
    return jsonify(payload)
# SECURE: Error handler
@app.errorhandler(404)
def not_found(error):
    """Answer requests for unknown routes with a JSON 404 body."""
    body = jsonify({'error': 'Endpoint not found'})
    return body, 404
@app.errorhandler(500)
def internal_error(error):
    """Log the error and answer with a generic JSON 500 body."""
    # The detail goes to the log only; the client sees a fixed message.
    logger.error(f"Internal server error: {error}")
    body = jsonify({'error': 'Internal server error'})
    return body, 500
if __name__ == '__main__':
    # Log security configuration first: app.run() blocks until the server
    # stops, so anything placed after it is only emitted at shutdown.
    logger.info("Secure application started with:")
    logger.info("- SSRF protection enabled")
    logger.info(f"- Rate limiting: {RATE_LIMIT_REQUESTS} requests per {RATE_LIMIT_WINDOW}s")
    logger.info(f"- Allowed domains: {len(ALLOWED_DOMAINS)} configured")
    logger.info(f"- Blocked networks: {len(BLOCKED_NETWORKS)} configured")

    # SECURE: Bind to localhost only
    app.run(host='127.0.0.1', port=8080, debug=False)  # Debug disabled
# Example of secure usage:
#
# 1. Fetch from allowed domain:
# curl "http://localhost:8080/fetch-url?url=https://api.github.com/users/octocat"
#
# 2. Blocked SSRF attempt:
# curl "http://localhost:8080/fetch-url?url=http://169.254.169.254/latest/meta-data/"
# # Returns: {"error": "URL not allowed - potential SSRF detected"}
#
# 3. Get instance info securely:
# curl "http://localhost:8080/get-instance-info"
# # Reads the EC2 instance identity document, returns limited non-sensitive info
#
# 4. Access cloud resources properly:
# curl "http://localhost:8080/get-cloud-resources"
# # Uses proper cloud SDKs with authentication
#
# 5. Health check:
# curl "http://localhost:8080/health"
# # Returns security features enabled