Flask Host Header Injection Vulnerability

Medium Risk Host Header Injection
flaskpythonhost-headerinjectioncache-poisoningssrf

What it is

The Flask application trusts and uses the Host header from HTTP requests without proper validation, creating Host header injection vulnerabilities. Attackers can manipulate the Host header to perform cache poisoning attacks, password reset poisoning, Server-Side Request Forgery (SSRF), or bypass security controls that rely on hostname validation, potentially leading to unauthorized access, phishing attacks, or internal network reconnaissance.

# Vulnerable: Host header injection in Flask from flask import Flask, request, redirect, url_for, jsonify from urllib.parse import urljoin app = Flask(__name__) # Dangerous: Using Host header without validation @app.route('/reset-password') def reset_password(): email = request.args.get('email', '') # CRITICAL: Host header used in password reset link reset_link = f"https://{request.host}/reset?token=abc123" # Send email with reset link (vulnerable to host header injection) send_password_reset_email(email, reset_link) return jsonify({'message': 'Password reset email sent'}) # Another dangerous pattern @app.route('/api/callback') def generate_callback(): callback_path = request.args.get('path', '/callback') # Dangerous: Host header in callback URL callback_url = f"https://{request.host}{callback_path}" return jsonify({'callback_url': callback_url}) # OAuth redirect with Host header @app.route('/oauth/login') def oauth_login(): provider = request.args.get('provider', 'google') # Dangerous: Host header in OAuth redirect URI redirect_uri = f"https://{request.host}/oauth/callback" oauth_url = f"https://oauth.{provider}.com/auth?redirect_uri={redirect_uri}" return redirect(oauth_url) # File download with Host header @app.route('/download') def download_file(): file_id = request.args.get('file_id', '') # Dangerous: Host header in download URL download_url = f"https://{request.host}/files/{file_id}" return jsonify({'download_url': download_url}) # API base URL generation @app.route('/api/info') def api_info(): # Dangerous: Host header in API base URL api_base = f"https://{request.host}/api" return jsonify({ 'api_base': api_base, 'endpoints': [ f"{api_base}/users", f"{api_base}/posts", f"{api_base}/files" ] }) # Asset URL generation @app.route('/assets') def get_asset_urls(): # Dangerous: Host header in asset URLs base_url = f"https://{request.host}" assets = { 'css': f"{base_url}/static/css/style.css", 'js': f"{base_url}/static/js/app.js", 'images': f"{base_url}/static/images/" } return jsonify(assets) # Webhook URL generation @app.route('/webhooks/create') def create_webhook(): webhook_id = request.args.get('id', '') # Dangerous: Host header in webhook URL webhook_url = f"https://{request.host}/webhooks/{webhook_id}" return jsonify({'webhook_url': webhook_url}) # Cache key generation using Host header @app.route('/cached-data') def get_cached_data(): data_type = request.args.get('type', '') # Dangerous: Host header in cache key cache_key = f"{request.host}:{data_type}" # This could lead to cache poisoning cached_data = get_from_cache(cache_key) if not cached_data: cached_data = generate_data(data_type) set_cache(cache_key, cached_data) return jsonify(cached_data) # Email link generation @app.route('/share') def share_content(): content_id = request.args.get('id', '') # Dangerous: Host header in share URL share_url = f"https://{request.host}/content/{content_id}" return jsonify({'share_url': share_url}) # Subdomain detection @app.route('/subdomain-info') def subdomain_info(): # Dangerous: Trusting Host header for subdomain logic host = request.host subdomain = host.split('.')[0] if '.' in host else None return jsonify({ 'host': host, 'subdomain': subdomain, 'is_admin': subdomain == 'admin' # Dangerous security check })
# Secure: Safe Host header handling in Flask from flask import Flask, request, redirect, url_for, jsonify from urllib.parse import urljoin, urlparse import re app = Flask(__name__) # Safe: Configuration with allowed hosts app.config['ALLOWED_HOSTS'] = [ 'myapp.com', 'www.myapp.com', 'api.myapp.com', 'localhost', '127.0.0.1' ] app.config['BASE_URL'] = 'https://myapp.com' app.config['API_BASE_URL'] = 'https://api.myapp.com' # Safe: Host header validation def validate_host_header(): """Validate the Host header against allowed hosts""" host = request.host allowed_hosts = app.config.get('ALLOWED_HOSTS', []) # Check if host is in allowed list if host not in allowed_hosts: # Check for subdomain matches for allowed_host in allowed_hosts: if allowed_host.startswith('.') and host.endswith(allowed_host): return True app.logger.warning(f'Invalid Host header: {host}') return False return True def get_safe_base_url(): """Get base URL from configuration, not Host header""" return app.config.get('BASE_URL', 'https://myapp.com') def get_safe_api_base_url(): """Get API base URL from configuration""" return app.config.get('API_BASE_URL', 'https://api.myapp.com') # Safe: Password reset with validated host @app.route('/reset-password') def safe_reset_password(): email = request.args.get('email', '') try: # Validate email validated_email = validate_email(email) # Safe: Use configured base URL base_url = get_safe_base_url() # Generate secure reset token reset_token = generate_secure_token(validated_email) reset_link = urljoin(base_url, f'/reset?token={reset_token}') # Send email with validated reset link send_password_reset_email(validated_email, reset_link) return jsonify({'message': 'Password reset email sent'}) except ValueError as e: return jsonify({'error': str(e)}), 400 def validate_email(email): if not email or len(email) > 254: raise ValueError('Invalid email length') # Basic email validation email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): raise ValueError('Invalid email format') return email def generate_secure_token(email): import secrets import hashlib import time # Generate secure token token_data = f"{email}:{time.time()}:{secrets.token_hex(16)}" return hashlib.sha256(token_data.encode()).hexdigest() # Safe: Callback URL generation @app.route('/api/callback') def safe_generate_callback(): callback_type = request.args.get('type', '') try: # Validate callback type validated_type = validate_callback_type(callback_type) # Safe: Use configured base URL base_url = get_safe_api_base_url() callback_path = f'/callbacks/{validated_type}' callback_url = urljoin(base_url, callback_path) return jsonify({'callback_url': callback_url}) except ValueError as e: return jsonify({'error': str(e)}), 400 def validate_callback_type(callback_type): allowed_types = ['payment', 'webhook', 'oauth', 'notification'] if callback_type not in allowed_types: raise ValueError('Invalid callback type') return callback_type # Safe: OAuth login with validation @app.route('/oauth/login') def safe_oauth_login(): provider = request.args.get('provider', '') try: # Validate provider validated_provider = validate_oauth_provider(provider) # Safe: Use configured redirect URI base_url = get_safe_base_url() redirect_uri = urljoin(base_url, '/oauth/callback') oauth_url = generate_oauth_url(validated_provider, redirect_uri) return redirect(oauth_url) except ValueError as e: return jsonify({'error': str(e)}), 400 def validate_oauth_provider(provider): allowed_providers = ['google', 'github', 'facebook'] if provider not in allowed_providers: raise ValueError('OAuth provider not supported') return provider def generate_oauth_url(provider, redirect_uri): oauth_endpoints = { 'google': 'https://accounts.google.com/oauth/authorize', 'github': 'https://github.com/login/oauth/authorize', 'facebook': 'https://www.facebook.com/v12.0/dialog/oauth' } from urllib.parse import urlencode params = { 'client_id': app.config.get(f'{provider.upper()}_CLIENT_ID'), 'redirect_uri': redirect_uri, 'response_type': 'code', 'scope': 'read' } query_string = urlencode(params) return f"{oauth_endpoints[provider]}?{query_string}" # Safe: File download with validation @app.route('/download') def safe_download_file(): file_id = request.args.get('file_id', '') try: # Validate file ID validated_file_id = validate_file_id(file_id) # Check file access permissions if not check_file_access(validated_file_id, request.user if hasattr(request, 'user') else None): return jsonify({'error': 'Access denied'}), 403 # Safe: Use configured base URL for download base_url = get_safe_base_url() download_url = urljoin(base_url, f'/files/{validated_file_id}') return jsonify({'download_url': download_url}) except ValueError as e: return jsonify({'error': str(e)}), 400 def validate_file_id(file_id): if not file_id or not file_id.isalnum(): raise ValueError('Invalid file ID format') if len(file_id) > 32: raise ValueError('File ID too long') return file_id def check_file_access(file_id, user): # Implement file access check logic # This would typically check database for file ownership/permissions return True # Placeholder # Safe: API info with configured URLs @app.route('/api/info') def safe_api_info(): # Safe: Use configured API base URL api_base = get_safe_api_base_url() endpoints = { 'users': urljoin(api_base, '/users'), 'posts': urljoin(api_base, '/posts'), 'files': urljoin(api_base, '/files') } return jsonify({ 'api_base': api_base, 'endpoints': endpoints, 'version': '1.0', 'documentation': urljoin(api_base, '/docs') }) # Safe: Asset URLs with CDN configuration @app.route('/assets') def safe_get_asset_urls(): # Safe: Use configured CDN or static URL static_base = app.config.get('CDN_BASE_URL', get_safe_base_url() + '/static') assets = { 'css': urljoin(static_base, '/css/style.css'), 'js': urljoin(static_base, '/js/app.js'), 'images': urljoin(static_base, '/images/'), 'fonts': urljoin(static_base, '/fonts/') } return jsonify(assets) # Safe: Webhook creation with validation @app.route('/webhooks/create') def safe_create_webhook(): webhook_data = request.json try: # Validate webhook data validated_webhook = validate_webhook_data(webhook_data) # Generate webhook ID webhook_id = generate_webhook_id() # Safe: Use configured base URL api_base = get_safe_api_base_url() webhook_url = urljoin(api_base, f'/webhooks/{webhook_id}') # Save webhook configuration save_webhook_config(webhook_id, validated_webhook, webhook_url) return jsonify({ 'webhook_id': webhook_id, 'webhook_url': webhook_url }) except ValueError as e: return jsonify({'error': str(e)}), 400 def validate_webhook_data(data): if not data or not isinstance(data, dict): raise ValueError('Invalid webhook data') # Validate required fields required_fields = ['events', 'secret'] for field in required_fields: if field not in data: raise ValueError(f'Missing required field: {field}') # Validate events allowed_events = ['user.created', 'user.updated', 'order.created', 'payment.completed'] events = data['events'] if not isinstance(events, list) or not events: raise ValueError('Events must be a non-empty list') for event in events: if event not in allowed_events: raise ValueError(f'Invalid event: {event}') # Validate secret secret = data['secret'] if not secret or len(secret) < 16: raise ValueError('Secret must be at least 16 characters') return { 'events': events, 'secret': secret } def generate_webhook_id(): import uuid return str(uuid.uuid4()) def save_webhook_config(webhook_id, webhook_data, webhook_url): # Save webhook configuration to database # This would typically involve database operations pass # Safe: Cache with validated keys @app.route('/cached-data') def safe_get_cached_data(): data_type = request.args.get('type', '') try: # Validate data type validated_type = validate_cache_data_type(data_type) # Safe: Use application name in cache key, not Host header app_name = app.config.get('APP_NAME', 'myapp') cache_key = f"{app_name}:data:{validated_type}" cached_data = get_from_cache(cache_key) if not cached_data: cached_data = generate_cache_data(validated_type) set_cache(cache_key, cached_data) return jsonify(cached_data) except ValueError as e: return jsonify({'error': str(e)}), 400 def validate_cache_data_type(data_type): allowed_types = ['user_stats', 'system_info', 'popular_content'] if data_type not in allowed_types: raise ValueError('Invalid data type') return data_type def generate_cache_data(data_type): # Generate data based on type if data_type == 'user_stats': return {'total_users': 1000, 'active_users': 150} elif data_type == 'system_info': return {'status': 'healthy', 'version': '1.0.0'} elif data_type == 'popular_content': return {'top_posts': [1, 2, 3, 4, 5]} def get_from_cache(key): # Cache implementation return None def set_cache(key, data): # Cache implementation pass # Middleware for Host header validation @app.before_request def validate_host(): # Skip validation for certain endpoints skip_validation = ['/health', '/status'] if request.path in skip_validation: return if not validate_host_header(): return jsonify({'error': 'Invalid Host header'}), 400 if __name__ == '__main__': app.run(debug=False)

💡 Why This Fix Works

See fix suggestions for detailed explanation.

Why it happens

Flask views directly access Host header: host = request.host or request.headers.get('Host'). Attackers manipulate Host header in HTTP requests. Used in URL construction for redirects or emails, enabling cache poisoning, password reset poisoning, SSRF, or web cache deception attacks.

Root causes

Using request.host or request.headers['Host'] Without Validation

Flask views directly access Host header: host = request.host or request.headers.get('Host'). Attackers manipulate Host header in HTTP requests. Used in URL construction for redirects or emails, enabling cache poisoning, password reset poisoning, SSRF, or web cache deception attacks.

Constructing URLs with request.host_url or request.url_root

Views build URLs using Flask's request.host_url or request.url_root: reset_link = request.host_url + 'reset/' + token. These derive from Host header. Attacker-controlled domain in password reset emails sends tokens to malicious sites. Affects URL generation in templates and API responses.

Not Configuring SERVER_NAME in Flask Application

Missing SERVER_NAME configuration allows Flask to trust Host header: app.config['SERVER_NAME'] not set. Without SERVER_NAME, request.host returns client-supplied header value. Application accepts any Host value, enabling host header injection for all URL generation and redirect operations.

Using X-Forwarded-Host Header Without Validation

Behind proxies, applications trust X-Forwarded-Host: host = request.headers.get('X-Forwarded-Host', request.host). If proxy misconfigured or absent, attackers set X-Forwarded-Host directly. ProxyFix middleware without proper num_proxies configuration accepts forged headers, enabling injection through multiple header vectors.

Building Redirects with Unvalidated Host Headers

Redirect logic uses Host header: return redirect(f'http://{request.host}/dashboard'). Attackers control destination domain through Host manipulation. Open redirect vulnerability combined with host injection enables phishing. After_request handlers constructing Location headers inherit same vulnerability through request.host usage.

Fixes

1

Configure SERVER_NAME in Flask Application Settings

Set explicit SERVER_NAME: app.config['SERVER_NAME'] = 'example.com:443'. Flask validates Host header against SERVER_NAME, rejecting mismatches. Use environment-specific configuration: os.environ.get('SERVER_NAME', 'localhost:5000'). Makes request.host return trusted configured value instead of header value.

2

Validate Host Header Against Allowlist of Permitted Domains

Implement before_request validation: ALLOWED_HOSTS = ['example.com', 'www.example.com']; if request.host not in ALLOWED_HOSTS: abort(400). Check both request.host and X-Forwarded-Host if behind proxy. Reject requests with invalid hosts early in request lifecycle before processing.

3

Use url_for() with _external=True Instead of request.host

Generate URLs with url_for('endpoint', _external=True) which uses SERVER_NAME: reset_link = url_for('reset_password', token=token, _external=True). Flask constructs URL from configuration, not headers. Ensures consistent trusted domain in generated URLs, emails, and redirects.

4

Configure ProxyFix with Correct num_proxies Parameter

If behind proxy, use ProxyFix properly: from werkzeug.middleware.proxy_fix import ProxyFix; app.wsgi_app = ProxyFix(app.wsgi_app, x_host=1, x_for=1). Set num_proxies or x_host to number of trusted proxies. Prevents accepting forged X-Forwarded headers from clients.

5

Use Relative URLs Instead of Absolute URLs Where Possible

For redirects and links, prefer relative paths: return redirect('/dashboard') instead of building absolute URLs with host. Browser handles domain resolution. For external URLs, use hardcoded trusted domain: SITE_URL = 'https://example.com'; return redirect(f'{SITE_URL}/dashboard').

6

Implement Middleware to Normalize and Validate Host Header

Create middleware validating Host header: @app.before_request validate_host(). Check format, charset, and domain. Set request.environ['HTTP_HOST'] to trusted value. Log suspicious Host headers for monitoring. Combine with rate limiting on invalid hosts to detect scanning attempts.

Detect This Vulnerability in Your Code

Sourcery automatically identifies flask host header injection vulnerability and many other security issues in your codebase.