Django Server-Side Request Forgery (SSRF) via requests

High Risk Server-Side Request Forgery (SSRF)
djangopythonssrfrequestsurl-injectioninternal-access

What it is

The Django application uses the requests library with user-controlled URLs without proper validation, creating Server-Side Request Forgery (SSRF) vulnerabilities. Attackers can manipulate URLs to make the server perform HTTP requests to internal resources, cloud metadata services, or arbitrary external systems, potentially accessing sensitive information, performing internal network reconnaissance, or bypassing security controls.

# Vulnerable: SSRF via requests with user input import requests from django.http import JsonResponse from django.views import View from django.conf import settings # Dangerous: Direct URL from user input class WebFetchView(View): def post(self, request): url = request.POST.get('url', '') try: # CRITICAL: User controls URL for server request response = requests.get(url) return JsonResponse({ 'status_code': response.status_code, 'content': response.text[:1000] # Limit content }) except Exception as e: return JsonResponse({'error': str(e)}) # Another dangerous pattern def fetch_external_data(request): api_url = request.GET.get('api_url', '') endpoint = request.GET.get('endpoint', '') # Dangerous: User controls both URL components full_url = f"{api_url}/{endpoint}" try: response = requests.get(full_url, timeout=10) return JsonResponse({'data': response.json()}) except Exception as e: return JsonResponse({'error': str(e)}) # Webhook processing def process_webhook(request): webhook_url = request.POST.get('webhook_url', '') payload = request.POST.get('payload', '') # Dangerous: User-controlled webhook URL try: response = requests.post(webhook_url, data=payload) return JsonResponse({'webhook_status': response.status_code}) except Exception as e: return JsonResponse({'error': str(e)}) # Image processing from URL def process_image_from_url(request): image_url = request.POST.get('image_url', '') # Dangerous: Fetch image from user URL try: response = requests.get(image_url) if response.status_code == 200: # Process image content return JsonResponse({'status': 'Image processed', 'size': len(response.content)}) else: return JsonResponse({'error': 'Failed to fetch image'}) except Exception as e: return JsonResponse({'error': str(e)}) # API proxy functionality def api_proxy(request): target_url = request.GET.get('target', '') method = request.GET.get('method', 'GET') # Dangerous: User controls target URL and method try: if method.upper() == 'GET': response = requests.get(target_url) elif method.upper() == 'POST': response = requests.post(target_url, data=request.POST) return JsonResponse({ 'status': response.status_code, 'headers': dict(response.headers), 'content': response.text }) except Exception as e: return JsonResponse({'error': str(e)}) # URL validation check def check_url_status(request): check_url = request.GET.get('url', '') # Dangerous: No validation of target URL try: response = requests.head(check_url, timeout=5) return JsonResponse({ 'url': check_url, 'status': response.status_code, 'accessible': True }) except requests.RequestException: return JsonResponse({ 'url': check_url, 'accessible': False }) # RSS feed fetcher def fetch_rss_feed(request): feed_url = request.POST.get('feed_url', '') # Dangerous: User-controlled RSS URL try: response = requests.get(feed_url) if 'xml' in response.headers.get('content-type', ''): return JsonResponse({'feed_content': response.text}) else: return JsonResponse({'error': 'Invalid RSS feed'}) except Exception as e: return JsonResponse({'error': str(e)})
# Secure: Safe URL handling with SSRF protection import requests from django.http import JsonResponse from django.views import View from django.conf import settings from django.core.exceptions import ValidationError from urllib.parse import urlparse import socket import ipaddress import re # Safe: URL validation and SSRF protection class SafeWebFetchView(View): def post(self, request): url = request.POST.get('url', '') try: # Validate URL validated_url = self.validate_url(url) # Fetch content safely content = self.fetch_url_safely(validated_url) return JsonResponse({ 'status': 'success', 'content_length': len(content), 'content_preview': content[:200] }) except ValidationError as e: return JsonResponse({'error': str(e)}, status=400) def validate_url(self, url): # Basic URL validation if not url or len(url) > 500: raise ValidationError('Invalid URL length') # Parse URL try: parsed = urlparse(url) except Exception: raise ValidationError('Invalid URL format') # Validate scheme if parsed.scheme not in ['http', 'https']: raise ValidationError('Only HTTP and HTTPS protocols are allowed') # Validate hostname if not parsed.hostname: raise ValidationError('URL must have a valid hostname') # Check against allowlist if not self.is_allowed_domain(parsed.hostname): raise ValidationError('Domain not allowed') # Prevent SSRF to internal networks self.check_ssrf_protection(parsed.hostname) return url def is_allowed_domain(self, hostname): # Define allowed domains allowed_domains = [ 'api.example.com', 'feeds.example.com', 'cdn.example.com', 'public-api.service.com' ] # Check exact match or subdomain for domain in allowed_domains: if hostname == domain or hostname.endswith('.' + domain): return True return False def check_ssrf_protection(self, hostname): try: # Resolve hostname to IP ip = socket.gethostbyname(hostname) ip_obj = ipaddress.ip_address(ip) # Block private/internal networks if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local: raise ValidationError('Requests to internal networks are not allowed') # Block specific dangerous IPs dangerous_ranges = [ ipaddress.ip_network('169.254.0.0/16'), # AWS metadata ipaddress.ip_network('10.0.0.0/8'), # Private class A ipaddress.ip_network('172.16.0.0/12'), # Private class B ipaddress.ip_network('192.168.0.0/16'), # Private class C ] for network in dangerous_ranges: if ip_obj in network: raise ValidationError('IP address not allowed') except socket.gaierror: raise ValidationError('Unable to resolve hostname') except ValueError: raise ValidationError('Invalid IP address') def fetch_url_safely(self, url): try: # Configure safe request response = requests.get( url, timeout=10, allow_redirects=False, # Prevent redirect SSRF headers={'User-Agent': 'SafeApp/1.0'}, stream=True ) # Check response size content_length = response.headers.get('content-length') if content_length and int(content_length) > 1024 * 1024: # 1MB limit raise ValidationError('Response too large') # Read content with size limit content = response.content[:1024 * 1024] # 1MB limit return content.decode('utf-8', errors='ignore') except requests.RequestException as e: raise ValidationError(f'Request failed: {str(e)}') # Safe: External API integration def safe_fetch_external_data(request): api_name = request.GET.get('api', '') endpoint = request.GET.get('endpoint', '') try: # Validate inputs validated_request = validate_api_request(api_name, endpoint) # Fetch data safely data = fetch_api_data_safely(validated_request) return JsonResponse({'data': data}) except ValidationError as e: return JsonResponse({'error': str(e)}, status=400) def validate_api_request(api_name, endpoint): # Validate API name allowed_apis = { 'weather': 'https://api.weather.com', 'news': 'https://api.news.com', 'finance': 'https://api.finance.com' } if api_name not in allowed_apis: raise ValidationError('API not allowed') # Validate endpoint if not endpoint or len(endpoint) > 100: raise ValidationError('Invalid endpoint') # Only allow safe characters in endpoint if not re.match(r'^[a-zA-Z0-9/_.-]+$', endpoint): raise ValidationError('Endpoint contains invalid characters') # Prevent traversal if '..' in endpoint or endpoint.startswith('/'): raise ValidationError('Invalid endpoint format') return { 'base_url': allowed_apis[api_name], 'endpoint': endpoint } def fetch_api_data_safely(request_data): url = f"{request_data['base_url']}/{request_data['endpoint']}" try: response = requests.get( url, timeout=15, headers={ 'User-Agent': 'SafeApp/1.0', 'Accept': 'application/json' } ) response.raise_for_status() return response.json() except requests.RequestException as e: raise ValidationError(f'API request failed: {str(e)}') except ValueError: raise ValidationError('Invalid JSON response') # Safe: Webhook processing with validation def safe_process_webhook(request): webhook_id = request.POST.get('webhook_id', '') payload = request.POST.get('payload', '') try: # Validate webhook webhook_config = validate_webhook_request(webhook_id, payload) # Send webhook safely send_webhook_safely(webhook_config) return JsonResponse({'status': 'Webhook sent successfully'}) except ValidationError as e: return JsonResponse({'error': str(e)}, status=400) def validate_webhook_request(webhook_id, payload): # Validate webhook ID if not webhook_id.isdigit(): raise ValidationError('Invalid webhook ID') # Get webhook configuration from database from .models import WebhookConfiguration try: webhook = WebhookConfiguration.objects.get(id=int(webhook_id)) except WebhookConfiguration.DoesNotExist: raise ValidationError('Webhook not found') # Validate payload if not payload or len(payload) > 10000: raise ValidationError('Invalid payload size') try: import json json.loads(payload) # Validate JSON except json.JSONDecodeError: raise ValidationError('Payload must be valid JSON') return { 'url': webhook.url, 'payload': payload, 'secret': webhook.secret } def send_webhook_safely(webhook_config): # Validate webhook URL (should be done when webhook is created) parsed_url = urlparse(webhook_config['url']) if parsed_url.scheme not in ['https']: # Only HTTPS for webhooks raise ValidationError('Webhook URL must use HTTPS') # Prepare headers headers = { 'Content-Type': 'application/json', 'User-Agent': 'SafeApp-Webhook/1.0' } # Add signature if secret is provided if webhook_config.get('secret'): import hmac import hashlib signature = hmac.new( webhook_config['secret'].encode(), webhook_config['payload'].encode(), hashlib.sha256 ).hexdigest() headers['X-Signature-SHA256'] = f'sha256={signature}' try: response = requests.post( webhook_config['url'], data=webhook_config['payload'], headers=headers, timeout=30, allow_redirects=False ) if response.status_code not in [200, 201, 202]: raise ValidationError(f'Webhook failed with status {response.status_code}') except requests.RequestException as e: raise ValidationError(f'Webhook delivery failed: {str(e)}') # Safe: URL status checker def safe_check_url_status(request): url_id = request.GET.get('url_id', '') try: # Validate URL ID and get URL from database validated_url = validate_url_check_request(url_id) # Check URL status safely status = check_url_status_safely(validated_url) return JsonResponse(status) except ValidationError as e: return JsonResponse({'error': str(e)}, status=400) def validate_url_check_request(url_id): if not url_id.isdigit(): raise ValidationError('Invalid URL ID') # Get URL from database (pre-validated URLs) from .models import MonitoredURL try: url_obj = MonitoredURL.objects.get(id=int(url_id)) return url_obj.url except MonitoredURL.DoesNotExist: raise ValidationError('URL not found') def check_url_status_safely(url): try: response = requests.head( url, timeout=10, allow_redirects=True, headers={'User-Agent': 'SafeApp-Monitor/1.0'} ) return { 'url': url, 'status_code': response.status_code, 'accessible': True, 'response_time': response.elapsed.total_seconds() } except requests.RequestException: return { 'url': url, 'accessible': False, 'error': 'Request failed' }

💡 Why This Fix Works

See fix suggestions for detailed explanation.

Why it happens

Views pass request parameters to requests: url = request.GET['url']; requests.get(url). Attackers control destination, enabling requests to internal services, cloud metadata (169.254.169.254), or localhost bypassing firewalls through server-side requests.

Root causes

Using User Input Directly in requests.get() URL Parameter

Views pass request parameters to requests: url = request.GET['url']; requests.get(url). Attackers control destination, enabling requests to internal services, cloud metadata (169.254.169.254), or localhost bypassing firewalls through server-side requests.

Missing URL Validation and Allowlisting Before HTTP Requests

Applications make requests without validating URLs. No scheme, domain, or IP checks allow SSRF to internal networks, file:// schemes, or cloud APIs. Requests.get(user_url) without validation enables arbitrary destinations.

Insufficient URL Parsing and Sanitization

Weak validation like checking startswith('http') fails against redirects, IP encoding (0x7f000001), or DNS rebinding. URL parser bypasses through @, #, or Unicode enable SSRF past simple string checks.

Trusting User-Provided URLs for Webhooks or Callbacks

Webhook endpoints accept callback URLs from users: requests.post(callback_url, data=event). Attackers specify internal endpoints, enabling data exfiltration or triggering actions on internal services through SSRF.

Following Redirects Without Validation

Using requests with allow_redirects=True (default) follows attacker-controlled redirects. Initial safe URL redirects to internal service. No validation of redirect target enables SSRF through redirect chain bypass.

Fixes

1

Validate and Allowlist URLs Before Making Requests

Parse URLs and check against allowlists: parsed = urlparse(url); if parsed.netloc not in ALLOWED_HOSTS: raise Error. Validate scheme in ['http', 'https'], reject localhost/private IPs before requests.get().

2

Block Private IP Ranges and Cloud Metadata Endpoints

Check IP after DNS resolution: ip = socket.gethostbyname(hostname); if ipaddress.ip_address(ip).is_private: raise Error. Block 127.0.0.0/8, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, 169.254.169.254 (AWS metadata).

3

Disable Redirects or Validate Redirect Targets

Set allow_redirects=False in requests: requests.get(url, allow_redirects=False). If redirects needed, validate each Location header before following. Check redirect target against same allowlist as original URL.

4

Use Indirect References Instead of Direct URLs

Map IDs to URLs: WEBHOOK_URLS = {1: 'https://api.example.com'}; url = WEBHOOK_URLS.get(id). Users provide IDs, server controls destinations. Store validated URLs in database, never accept raw URLs.

5

Implement Request Timeouts and Network Restrictions

Set short timeouts: requests.get(url, timeout=5). Use network policies blocking egress to internal networks. Configure firewall rules preventing application server from accessing internal services or cloud metadata.

6

Validate URL Schemes and Reject Dangerous Protocols

Only allow http/https: if scheme not in ['http', 'https']: raise Error. Block file://, ftp://, gopher://, dict:// schemes. Check before DNS resolution to prevent protocol-based SSRF attacks.

Detect This Vulnerability in Your Code

Sourcery automatically identifies django server-side request forgery (ssrf) via requests and many other security issues in your codebase.