# Vulnerable: Host header injection in Flask
from flask import Flask, request, redirect, url_for, jsonify
from urllib.parse import urljoin
# Flask application object that the route decorators below register on.
app = Flask(__name__)
# Dangerous: Using Host header without validation
@app.route('/reset-password')
def reset_password():
    """Send a password-reset email to the ``email`` query argument.

    VULNERABLE (intentional demonstration): the host part of the emailed
    link is copied from the request's ``Host`` header, which the client
    controls, so the link can be made to point at an arbitrary server.
    The token in the link is a hard-coded placeholder.

    Returns:
        A JSON response with a confirmation message.
    """
    recipient = request.args.get('email', '')

    # Injection point: request.host is taken from the client unchecked.
    link = "https://{}/reset?token=abc123".format(request.host)

    send_password_reset_email(recipient, link)

    confirmation = {'message': 'Password reset email sent'}
    return jsonify(confirmation)
# Another dangerous pattern
@app.route('/api/callback')
def generate_callback():
    """Return a callback URL built from the Host header and a ``path`` arg.

    VULNERABLE (intentional demonstration): both the host and the path
    come straight from the request, so the caller fully controls the URL
    that is returned.

    Returns:
        A JSON response of the form ``{'callback_url': ...}``.
    """
    path = request.args.get('path', '/callback')

    # Neither component is validated before being concatenated.
    url = "https://{}{}".format(request.host, path)

    body = {'callback_url': url}
    return jsonify(body)
# OAuth redirect with Host header
@app.route('/oauth/login')
def oauth_login():
    """Redirect the client to an OAuth provider's authorization page.

    VULNERABLE (intentional demonstration): the ``redirect_uri`` handed to
    the provider is built from the client-controlled ``Host`` header, and
    the ``provider`` query argument (default ``'google'``) is interpolated
    into the target hostname without any allow-list.

    Returns:
        A redirect response to the constructed provider URL.
    """
    provider_name = request.args.get('provider', 'google')

    # Host header decides where the provider will send the user back to.
    return_to = "https://{}/oauth/callback".format(request.host)

    target = "https://oauth.{}.com/auth?redirect_uri={}".format(
        provider_name, return_to
    )
    return redirect(target)
# File download with Host header
@app.route('/download')
def download_file():
    """Return a download URL for the ``file_id`` query argument.

    VULNERABLE (intentional demonstration): the URL's host is taken from
    the ``Host`` header and ``file_id`` is inserted without validation.

    Returns:
        A JSON response of the form ``{'download_url': ...}``.
    """
    requested_id = request.args.get('file_id', '')

    # Host header and file id are both used exactly as received.
    url = "https://{}/files/{}".format(request.host, requested_id)

    body = {'download_url': url}
    return jsonify(body)
# API base URL generation
@app.route('/api/info')
def api_info():
    """Describe the API base URL and its endpoint URLs.

    VULNERABLE (intentional demonstration): every URL in the response is
    rooted at the client-controlled ``Host`` header.

    Returns:
        A JSON response with ``api_base`` and a list of ``endpoints``.
    """
    # Host header is trusted as the public API origin.
    root = "https://{}/api".format(request.host)

    endpoint_urls = [root + suffix for suffix in ("/users", "/posts", "/files")]

    description = {
        'api_base': root,
        'endpoints': endpoint_urls,
    }
    return jsonify(description)
# Asset URL generation
@app.route('/assets')
def get_asset_urls():
    """Return the URLs of the site's static assets.

    VULNERABLE (intentional demonstration): the asset URLs are rooted at
    the client-controlled ``Host`` header.

    Returns:
        A JSON response mapping ``css``, ``js`` and ``images`` to URLs.
    """
    # Host header is trusted as the site origin.
    origin = "https://{}".format(request.host)

    asset_paths = (
        ('css', "/static/css/style.css"),
        ('js', "/static/js/app.js"),
        ('images', "/static/images/"),
    )
    urls = {label: origin + path for label, path in asset_paths}
    return jsonify(urls)
# Webhook URL generation
@app.route('/webhooks/create')
def create_webhook():
    """Return a webhook URL for the ``id`` query argument.

    VULNERABLE (intentional demonstration): the URL's host is taken from
    the ``Host`` header and the id is used without validation.

    Returns:
        A JSON response of the form ``{'webhook_url': ...}``.
    """
    requested_id = request.args.get('id', '')

    # Host header is trusted as the webhook receiver's origin.
    url = "https://{}/webhooks/{}".format(request.host, requested_id)

    body = {'webhook_url': url}
    return jsonify(body)
# Cache key generation using Host header
@app.route('/cached-data')
def get_cached_data():
    """Serve data for the ``type`` query argument through a cache.

    VULNERABLE (intentional demonstration): the cache key embeds the
    client-controlled ``Host`` header, so a client can choose which cache
    entry is read and written.

    Returns:
        A JSON response containing the cached or freshly generated data.
    """
    requested_type = request.args.get('type', '')

    # Host header becomes part of the key: the cache-poisoning vector.
    key = "{}:{}".format(request.host, requested_type)

    payload = get_from_cache(key)
    if payload:
        return jsonify(payload)

    # Miss (or falsy entry): regenerate and store under the same key.
    payload = generate_data(requested_type)
    set_cache(key, payload)
    return jsonify(payload)
# Share link generation
@app.route('/share')
def share_content():
    """Return a shareable URL for the ``id`` query argument.

    VULNERABLE (intentional demonstration): the URL's host is taken from
    the ``Host`` header and the id is used without validation.

    Returns:
        A JSON response of the form ``{'share_url': ...}``.
    """
    requested_id = request.args.get('id', '')

    # Host header is trusted as the site origin.
    url = "https://{}/content/{}".format(request.host, requested_id)

    body = {'share_url': url}
    return jsonify(body)
# Subdomain detection
@app.route('/subdomain-info')
def subdomain_info():
    """Report the request host, its first label, and an admin flag.

    VULNERABLE (intentional demonstration): ``is_admin`` is derived purely
    from the client-controlled ``Host`` header, so sending
    ``Host: admin.<anything>`` sets it to true.

    Returns:
        A JSON response with ``host``, ``subdomain`` (``None`` when the
        host contains no dot) and ``is_admin``.
    """
    current_host = request.host

    # Everything before the first dot, or None when there is no dot.
    first_label, dot, _rest = current_host.partition('.')
    label = first_label if dot else None

    report = {
        'host': current_host,
        'subdomain': label,
        'is_admin': label == 'admin',  # security decision based on Host
    }
    return jsonify(report)
# Secure: Safe Host header handling in Flask
from flask import Flask, request, redirect, url_for, jsonify
from urllib.parse import urljoin, urlparse
import re
app = Flask(__name__)

# Host allow-list plus the fixed base URLs used to build absolute links.
# URL construction reads these settings rather than the Host header.
app.config.update(
    ALLOWED_HOSTS=[
        'myapp.com',
        'www.myapp.com',
        'api.myapp.com',
        'localhost',
        '127.0.0.1',
    ],
    BASE_URL='https://myapp.com',
    API_BASE_URL='https://api.myapp.com',
)
# Safe: Host header validation
def validate_host_header():
    """Check the request's Host header against ``ALLOWED_HOSTS``.

    The header value is normalized before comparison: it is lower-cased
    and a trailing ``:<port>`` is removed. ``request.host`` includes the
    port whenever it is non-standard, so without this step a request to
    ``localhost:5000`` would be rejected even though ``localhost`` is
    allowed.

    An entry in ``ALLOWED_HOSTS`` that starts with a dot (for example
    ``'.myapp.com'``) matches any host ending with that suffix.

    Returns:
        True when the host is allowed. False otherwise, after logging a
        warning containing the raw header value.
    """
    raw_host = request.host
    candidate = raw_host.lower()

    # Drop a numeric port. rpartition splits on the LAST colon, so a
    # bracketed IPv6 literal such as "[::1]:5000" keeps its address part.
    name, colon, port = candidate.rpartition(':')
    if colon and port.isdigit():
        candidate = name

    if candidate:
        for allowed_host in app.config.get('ALLOWED_HOSTS', []):
            allowed_host = allowed_host.lower()
            if candidate == allowed_host:
                return True
            # Leading-dot entries act as "any subdomain of" wildcards.
            if allowed_host.startswith('.') and candidate.endswith(allowed_host):
                return True

    app.logger.warning('Invalid Host header: %s', raw_host)
    return False
def get_safe_base_url():
    """Return the site's base URL from configuration.

    Reads ``BASE_URL`` from the app config and falls back to
    ``'https://myapp.com'`` when the key is absent. The Host header is
    never consulted.
    """
    fallback = 'https://myapp.com'
    return app.config.get('BASE_URL', fallback)
def get_safe_api_base_url():
    """Return the API's base URL from configuration.

    Reads ``API_BASE_URL`` from the app config and falls back to
    ``'https://api.myapp.com'`` when the key is absent.
    """
    fallback = 'https://api.myapp.com'
    return app.config.get('API_BASE_URL', fallback)
# Safe: Password reset link built from the configured base URL
@app.route('/reset-password')
def safe_reset_password():
    """Email a password-reset link built from the configured base URL.

    The ``email`` query argument is validated, a fresh token is generated
    for it, and the link is rooted at the configured base URL rather than
    the Host header.

    Returns:
        A JSON confirmation on success, or ``({'error': ...}, 400)`` when
        a ``ValueError`` is raised while handling the request.
    """
    raw_address = request.args.get('email', '')
    try:
        address = validate_email(raw_address)

        # The link origin comes from configuration, never from the request.
        origin = get_safe_base_url()
        token = generate_secure_token(address)
        link = urljoin(origin, '/reset?token={}'.format(token))

        send_password_reset_email(address, link)
        return jsonify({'message': 'Password reset email sent'})
    except ValueError as exc:
        failure = {'error': str(exc)}
        return jsonify(failure), 400
def validate_email(email):
    """Validate the basic shape of an email address.

    Args:
        email: Candidate address string.

    Returns:
        The address unchanged when it is acceptable.

    Raises:
        ValueError: If the address is empty, longer than 254 characters,
            or does not match the basic ``local@domain.tld`` pattern.
    """
    if not email or len(email) > 254:
        raise ValueError('Invalid email length')

    # Basic email validation
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

    # fullmatch rather than match: with re.match the trailing "$" also
    # matches just before a final newline, so "user@example.com\n" would be
    # accepted and could end up in an outgoing mail header.
    if not re.fullmatch(email_pattern, email):
        raise ValueError('Invalid email format')
    return email
def generate_secure_token(email):
    """Create a one-off token for ``email``.

    The token is the SHA-256 hex digest of the email, the current time
    and 16 random bytes (hex-encoded), joined with colons.

    Args:
        email: Address the token is issued for.

    Returns:
        A 64-character lowercase hexadecimal string.
    """
    import hashlib
    import secrets
    import time

    # The random part is what makes the token unpredictable.
    nonce = secrets.token_hex(16)
    material = "{}:{}:{}".format(email, time.time(), nonce)

    digest = hashlib.sha256(material.encode())
    return digest.hexdigest()
# Safe: Callback URL generation
@app.route('/api/callback')
def safe_generate_callback():
    """Return the callback URL for an allow-listed callback ``type``.

    The URL is rooted at the configured API base URL, and the path is
    derived from the validated type rather than from a caller-supplied
    path.

    Returns:
        ``{'callback_url': ...}`` as JSON on success, or
        ``({'error': ...}, 400)`` when the type is rejected.
    """
    requested_type = request.args.get('type', '')
    try:
        kind = validate_callback_type(requested_type)

        # Origin comes from configuration, never from the request.
        api_root = get_safe_api_base_url()
        path = '/callbacks/{}'.format(kind)
        url = urljoin(api_root, path)

        return jsonify({'callback_url': url})
    except ValueError as exc:
        failure = {'error': str(exc)}
        return jsonify(failure), 400
def validate_callback_type(callback_type):
    """Return ``callback_type`` if it is one of the supported types.

    Supported types are ``payment``, ``webhook``, ``oauth`` and
    ``notification``.

    Raises:
        ValueError: If the type is not supported.
    """
    supported = ('payment', 'webhook', 'oauth', 'notification')
    if callback_type in supported:
        return callback_type
    raise ValueError('Invalid callback type')
# Safe: OAuth login with validation
@app.route('/oauth/login')
def safe_oauth_login():
    """Redirect to an allow-listed OAuth provider's authorization page.

    The ``provider`` query argument is validated against the supported
    providers, and the redirect URI sent to the provider is rooted at the
    configured base URL.

    Returns:
        A redirect response on success, or ``({'error': ...}, 400)`` when
        the provider is rejected.
    """
    requested_provider = request.args.get('provider', '')
    try:
        provider_name = validate_oauth_provider(requested_provider)

        # The return address is fixed by configuration, not by the request.
        origin = get_safe_base_url()
        return_to = urljoin(origin, '/oauth/callback')

        target = generate_oauth_url(provider_name, return_to)
        return redirect(target)
    except ValueError as exc:
        failure = {'error': str(exc)}
        return jsonify(failure), 400
def validate_oauth_provider(provider):
    """Return ``provider`` if it is a supported OAuth provider.

    Supported providers are ``google``, ``github`` and ``facebook``.

    Raises:
        ValueError: If the provider is not supported.
    """
    supported = ('google', 'github', 'facebook')
    if provider in supported:
        return provider
    raise ValueError('OAuth provider not supported')
def generate_oauth_url(provider, redirect_uri):
    """Build the authorization URL for ``provider``.

    Args:
        provider: One of ``'google'``, ``'github'`` or ``'facebook'``.
        redirect_uri: Absolute URL the provider should send the user
            back to.

    Returns:
        The provider's authorization endpoint followed by a URL-encoded
        query string carrying ``client_id``, ``redirect_uri``,
        ``response_type`` and ``scope``.

    Raises:
        KeyError: If ``provider`` has no known endpoint.
    """
    from urllib.parse import urlencode

    authorize_endpoints = {
        'google': 'https://accounts.google.com/oauth/authorize',
        'github': 'https://github.com/login/oauth/authorize',
        'facebook': 'https://www.facebook.com/v12.0/dialog/oauth'
    }

    # The client id lives in config under "<PROVIDER>_CLIENT_ID".
    # NOTE(review): when that key is missing the value is None and is
    # encoded as the literal text "None".
    client_id_key = '{}_CLIENT_ID'.format(provider.upper())
    query = urlencode({
        'client_id': app.config.get(client_id_key),
        'redirect_uri': redirect_uri,
        'response_type': 'code',
        'scope': 'read'
    })

    return "{}?{}".format(authorize_endpoints[provider], query)
# Safe: File download with validation
@app.route('/download')
def safe_download_file():
    """Return a download URL for a validated, access-checked file id.

    The ``file_id`` query argument is validated, access is checked for
    ``request.user`` (or ``None`` when the request has no such
    attribute), and the URL is rooted at the configured base URL.

    Returns:
        ``{'download_url': ...}`` as JSON on success,
        ``({'error': 'Access denied'}, 403)`` when access is refused, or
        ``({'error': ...}, 400)`` when the id is rejected.
    """
    requested_id = request.args.get('file_id', '')
    try:
        file_key = validate_file_id(requested_id)

        # The request may not carry a user attribute at all.
        current_user = getattr(request, 'user', None)
        if not check_file_access(file_key, current_user):
            return jsonify({'error': 'Access denied'}), 403

        # Origin comes from configuration, never from the request.
        origin = get_safe_base_url()
        url = urljoin(origin, '/files/{}'.format(file_key))
        return jsonify({'download_url': url})
    except ValueError as exc:
        failure = {'error': str(exc)}
        return jsonify(failure), 400
def validate_file_id(file_id):
    """Validate a file identifier before it is placed in a URL.

    Args:
        file_id: Candidate identifier string.

    Returns:
        The identifier unchanged when it is acceptable.

    Raises:
        ValueError: If the identifier is empty, contains anything other
            than ASCII letters and digits, or is longer than 32
            characters.
    """
    # An explicit ASCII class is used instead of str.isalnum(), which also
    # accepts non-ASCII letters and digits (e.g. full-width characters)
    # that have no business in a file id or a URL path.
    if not file_id or not re.fullmatch(r'[A-Za-z0-9]+', file_id):
        raise ValueError('Invalid file ID format')
    if len(file_id) > 32:
        raise ValueError('File ID too long')
    return file_id
def check_file_access(file_id, user):
    """Decide whether ``user`` may access the file ``file_id``.

    Placeholder: no check is performed yet and every call returns True,
    so access is granted for any file and any user (including ``None``).
    """
    # Implement file access check logic
    # This would typically check database for file ownership/permissions
    return True  # Placeholder
# Safe: API info with configured URLs
@app.route('/api/info')
def safe_api_info():
    """Describe the API using the configured API base URL.

    Returns:
        A JSON response with ``api_base``, a mapping of ``endpoints``
        (``users``, ``posts``, ``files``), the ``version`` string and a
        ``documentation`` URL.
    """
    # Origin comes from configuration, never from the request.
    api_root = get_safe_api_base_url()

    endpoint_paths = (
        ('users', '/users'),
        ('posts', '/posts'),
        ('files', '/files'),
    )
    endpoint_urls = {
        label: urljoin(api_root, path) for label, path in endpoint_paths
    }

    description = {
        'api_base': api_root,
        'endpoints': endpoint_urls,
        'version': '1.0',
        'documentation': urljoin(api_root, '/docs'),
    }
    return jsonify(description)
# Safe: Asset URLs with CDN configuration
@app.route('/assets')
def safe_get_asset_urls():
    """Return static asset URLs rooted at the configured static base.

    The base is ``CDN_BASE_URL`` from the app config when set, otherwise
    the configured base URL followed by ``/static``.

    Returns:
        A JSON response mapping ``css``, ``js``, ``images`` and ``fonts``
        to absolute URLs underneath the static base.
    """
    # Safe: Use configured CDN or static URL, never the Host header.
    static_base = app.config.get('CDN_BASE_URL', get_safe_base_url() + '/static')

    # urljoin replaces the base's whole path when the second argument
    # starts with "/", and drops the base's last segment when the base has
    # no trailing slash. Joining "/css/style.css" onto ".../static" therefore
    # lost the "/static" prefix. Normalize the base to end with exactly one
    # slash and join relative paths so the prefix is kept.
    static_root = static_base.rstrip('/') + '/'

    assets = {
        'css': urljoin(static_root, 'css/style.css'),
        'js': urljoin(static_root, 'js/app.js'),
        'images': urljoin(static_root, 'images/'),
        'fonts': urljoin(static_root, 'fonts/')
    }
    return jsonify(assets)
# Safe: Webhook creation with validation
@app.route('/webhooks/create', methods=['GET', 'POST'])
def safe_create_webhook():
    """Register a webhook described by the request's JSON body.

    The body must be a JSON object with ``events`` and ``secret`` fields.
    The webhook URL is rooted at the configured API base URL and uses a
    server-generated id.

    POST is accepted because the handler reads a JSON body; GET remains
    accepted so existing callers keep working.

    Returns:
        ``{'webhook_id': ..., 'webhook_url': ...}`` as JSON on success, or
        ``({'error': ...}, 400)`` when the body is missing, is not valid
        JSON, or fails validation.
    """
    # silent=True yields None for a missing, non-JSON or malformed body
    # instead of aborting the request, so every bad input takes the same
    # validation path and gets the same JSON 400 response.
    webhook_data = request.get_json(silent=True)
    try:
        # Validate webhook data
        validated_webhook = validate_webhook_data(webhook_data)

        # Generate webhook ID
        webhook_id = generate_webhook_id()

        # Safe: Use configured base URL, never the Host header.
        api_base = get_safe_api_base_url()
        webhook_url = urljoin(api_base, f'/webhooks/{webhook_id}')

        # Save webhook configuration
        save_webhook_config(webhook_id, validated_webhook, webhook_url)

        return jsonify({
            'webhook_id': webhook_id,
            'webhook_url': webhook_url
        })
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
def validate_webhook_data(data):
    """Validate a webhook registration payload.

    Args:
        data: Decoded request body; expected to be a dict with ``events``
            (a non-empty list of allowed event names) and ``secret`` (a
            string of at least 16 characters).

    Returns:
        A new dict holding only the validated ``events`` and ``secret``.

    Raises:
        ValueError: If the payload is missing or not a dict, a required
            field is absent, ``events`` is not a non-empty list of allowed
            names, or ``secret`` is not a string of sufficient length.
    """
    if not data or not isinstance(data, dict):
        raise ValueError('Invalid webhook data')

    # Validate required fields
    for field in ('events', 'secret'):
        if field not in data:
            raise ValueError(f'Missing required field: {field}')

    # Validate events
    allowed_events = ('user.created', 'user.updated', 'order.created', 'payment.completed')
    events = data['events']
    if not isinstance(events, list) or not events:
        raise ValueError('Events must be a non-empty list')
    for event in events:
        if event not in allowed_events:
            raise ValueError(f'Invalid event: {event}')

    # Validate secret. The type check matters: JSON can deliver a number
    # (len() would raise TypeError) or a 16-item list (which would pass a
    # bare length check), and neither is a usable secret.
    secret = data['secret']
    if not isinstance(secret, str) or len(secret) < 16:
        raise ValueError('Secret must be at least 16 characters')

    return {
        'events': events,
        'secret': secret
    }
def generate_webhook_id():
    """Return a new random webhook id as a UUID4 string."""
    from uuid import uuid4

    fresh_id = uuid4()
    return str(fresh_id)
def save_webhook_config(webhook_id, webhook_data, webhook_url):
    """Persist a webhook's configuration.

    Placeholder: nothing is stored yet; the function does nothing and
    returns None.
    """
    # Save webhook configuration to database
    # This would typically involve database operations
    pass
# Safe: Cache with validated keys
@app.route('/cached-data')
def safe_get_cached_data():
    """Serve data for an allow-listed ``type`` through a cache.

    The cache key is built from the configured ``APP_NAME`` (default
    ``'myapp'``) and the validated type; the Host header plays no part.

    Returns:
        The cached or freshly generated data as JSON, or
        ``({'error': ...}, 400)`` when the type is rejected.
    """
    requested_type = request.args.get('type', '')
    try:
        kind = validate_cache_data_type(requested_type)

        # Key depends only on server-side values and the validated type.
        namespace = app.config.get('APP_NAME', 'myapp')
        key = "{}:data:{}".format(namespace, kind)

        payload = get_from_cache(key)
        if not payload:
            # Miss (or falsy entry): regenerate and store.
            payload = generate_cache_data(kind)
            set_cache(key, payload)

        return jsonify(payload)
    except ValueError as exc:
        failure = {'error': str(exc)}
        return jsonify(failure), 400
def validate_cache_data_type(data_type):
    """Return ``data_type`` if it names a cacheable data set.

    Accepted values are ``user_stats``, ``system_info`` and
    ``popular_content``.

    Raises:
        ValueError: If the value is not accepted.
    """
    accepted = ('user_stats', 'system_info', 'popular_content')
    if data_type in accepted:
        return data_type
    raise ValueError('Invalid data type')
def generate_cache_data(data_type):
    """Produce the data set named by ``data_type``.

    Returns:
        A fresh dict for ``user_stats``, ``system_info`` or
        ``popular_content``; ``None`` for any other value.
    """
    if data_type == 'user_stats':
        return {'total_users': 1000, 'active_users': 150}

    if data_type == 'system_info':
        return {'status': 'healthy', 'version': '1.0.0'}

    if data_type == 'popular_content':
        return {'top_posts': [1, 2, 3, 4, 5]}

    # Unknown types yield no data.
    return None
def get_from_cache(key):
    """Look up ``key`` in the cache.

    Placeholder: no cache is wired in, so every lookup returns None.
    """
    # Cache implementation
    return None
def set_cache(key, data):
    """Store ``data`` under ``key`` in the cache.

    Placeholder: no cache is wired in, so the call does nothing.
    """
    # Cache implementation
    pass
# Middleware for Host header validation
@app.before_request
def validate_host():
    """Reject requests whose Host header is not allowed.

    Runs before every request. ``/health`` and ``/status`` are exempt
    from the check.

    Returns:
        ``None`` to let the request proceed, or
        ``({'error': 'Invalid Host header'}, 400)`` to stop it.
    """
    exempt_paths = ('/health', '/status')
    if request.path in exempt_paths:
        return None

    if validate_host_header():
        return None

    rejection = {'error': 'Invalid Host header'}
    return jsonify(rejection), 400
# Start the server only when this file is run as a script, with debug
# mode explicitly turned off.
if __name__ == '__main__':
    app.run(debug=False)