# Vulnerable: SSRF via requests with user input
import requests
from django.http import JsonResponse
from django.views import View
from django.conf import settings
# Dangerous: Direct URL from user input
class WebFetchView(View):
def post(self, request):
url = request.POST.get('url', '')
try:
# CRITICAL: User controls URL for server request
response = requests.get(url)
return JsonResponse({
'status_code': response.status_code,
'content': response.text[:1000] # Limit content
})
except Exception as e:
return JsonResponse({'error': str(e)})
# Another dangerous pattern
def fetch_external_data(request):
api_url = request.GET.get('api_url', '')
endpoint = request.GET.get('endpoint', '')
# Dangerous: User controls both URL components
full_url = f"{api_url}/{endpoint}"
try:
response = requests.get(full_url, timeout=10)
return JsonResponse({'data': response.json()})
except Exception as e:
return JsonResponse({'error': str(e)})
# Webhook processing
def process_webhook(request):
webhook_url = request.POST.get('webhook_url', '')
payload = request.POST.get('payload', '')
# Dangerous: User-controlled webhook URL
try:
response = requests.post(webhook_url, data=payload)
return JsonResponse({'webhook_status': response.status_code})
except Exception as e:
return JsonResponse({'error': str(e)})
# Image processing from URL
def process_image_from_url(request):
image_url = request.POST.get('image_url', '')
# Dangerous: Fetch image from user URL
try:
response = requests.get(image_url)
if response.status_code == 200:
# Process image content
return JsonResponse({'status': 'Image processed', 'size': len(response.content)})
else:
return JsonResponse({'error': 'Failed to fetch image'})
except Exception as e:
return JsonResponse({'error': str(e)})
# API proxy functionality
def api_proxy(request):
target_url = request.GET.get('target', '')
method = request.GET.get('method', 'GET')
# Dangerous: User controls target URL and method
try:
if method.upper() == 'GET':
response = requests.get(target_url)
elif method.upper() == 'POST':
response = requests.post(target_url, data=request.POST)
return JsonResponse({
'status': response.status_code,
'headers': dict(response.headers),
'content': response.text
})
except Exception as e:
return JsonResponse({'error': str(e)})
# URL validation check
def check_url_status(request):
check_url = request.GET.get('url', '')
# Dangerous: No validation of target URL
try:
response = requests.head(check_url, timeout=5)
return JsonResponse({
'url': check_url,
'status': response.status_code,
'accessible': True
})
except requests.RequestException:
return JsonResponse({
'url': check_url,
'accessible': False
})
# RSS feed fetcher
def fetch_rss_feed(request):
feed_url = request.POST.get('feed_url', '')
# Dangerous: User-controlled RSS URL
try:
response = requests.get(feed_url)
if 'xml' in response.headers.get('content-type', ''):
return JsonResponse({'feed_content': response.text})
else:
return JsonResponse({'error': 'Invalid RSS feed'})
except Exception as e:
return JsonResponse({'error': str(e)})
# Secure: Safe URL handling with SSRF protection
import requests
from django.http import JsonResponse
from django.views import View
from django.conf import settings
from django.core.exceptions import ValidationError
from urllib.parse import urlparse
import socket
import ipaddress
import re
# Safe: URL validation and SSRF protection
class SafeWebFetchView(View):
def post(self, request):
url = request.POST.get('url', '')
try:
# Validate URL
validated_url = self.validate_url(url)
# Fetch content safely
content = self.fetch_url_safely(validated_url)
return JsonResponse({
'status': 'success',
'content_length': len(content),
'content_preview': content[:200]
})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_url(self, url):
# Basic URL validation
if not url or len(url) > 500:
raise ValidationError('Invalid URL length')
# Parse URL
try:
parsed = urlparse(url)
except Exception:
raise ValidationError('Invalid URL format')
# Validate scheme
if parsed.scheme not in ['http', 'https']:
raise ValidationError('Only HTTP and HTTPS protocols are allowed')
# Validate hostname
if not parsed.hostname:
raise ValidationError('URL must have a valid hostname')
# Check against allowlist
if not self.is_allowed_domain(parsed.hostname):
raise ValidationError('Domain not allowed')
# Prevent SSRF to internal networks
self.check_ssrf_protection(parsed.hostname)
return url
def is_allowed_domain(self, hostname):
# Define allowed domains
allowed_domains = [
'api.example.com',
'feeds.example.com',
'cdn.example.com',
'public-api.service.com'
]
# Check exact match or subdomain
for domain in allowed_domains:
if hostname == domain or hostname.endswith('.' + domain):
return True
return False
def check_ssrf_protection(self, hostname):
try:
# Resolve hostname to IP
ip = socket.gethostbyname(hostname)
ip_obj = ipaddress.ip_address(ip)
# Block private/internal networks
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local:
raise ValidationError('Requests to internal networks are not allowed')
# Block specific dangerous IPs
dangerous_ranges = [
ipaddress.ip_network('169.254.0.0/16'), # AWS metadata
ipaddress.ip_network('10.0.0.0/8'), # Private class A
ipaddress.ip_network('172.16.0.0/12'), # Private class B
ipaddress.ip_network('192.168.0.0/16'), # Private class C
]
for network in dangerous_ranges:
if ip_obj in network:
raise ValidationError('IP address not allowed')
except socket.gaierror:
raise ValidationError('Unable to resolve hostname')
except ValueError:
raise ValidationError('Invalid IP address')
def fetch_url_safely(self, url):
try:
# Configure safe request
response = requests.get(
url,
timeout=10,
allow_redirects=False, # Prevent redirect SSRF
headers={'User-Agent': 'SafeApp/1.0'},
stream=True
)
# Check response size
content_length = response.headers.get('content-length')
if content_length and int(content_length) > 1024 * 1024: # 1MB limit
raise ValidationError('Response too large')
# Read content with size limit
content = response.content[:1024 * 1024] # 1MB limit
return content.decode('utf-8', errors='ignore')
except requests.RequestException as e:
raise ValidationError(f'Request failed: {str(e)}')
# Safe: External API integration
def safe_fetch_external_data(request):
api_name = request.GET.get('api', '')
endpoint = request.GET.get('endpoint', '')
try:
# Validate inputs
validated_request = validate_api_request(api_name, endpoint)
# Fetch data safely
data = fetch_api_data_safely(validated_request)
return JsonResponse({'data': data})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_api_request(api_name, endpoint):
# Validate API name
allowed_apis = {
'weather': 'https://api.weather.com',
'news': 'https://api.news.com',
'finance': 'https://api.finance.com'
}
if api_name not in allowed_apis:
raise ValidationError('API not allowed')
# Validate endpoint
if not endpoint or len(endpoint) > 100:
raise ValidationError('Invalid endpoint')
# Only allow safe characters in endpoint
if not re.match(r'^[a-zA-Z0-9/_.-]+$', endpoint):
raise ValidationError('Endpoint contains invalid characters')
# Prevent traversal
if '..' in endpoint or endpoint.startswith('/'):
raise ValidationError('Invalid endpoint format')
return {
'base_url': allowed_apis[api_name],
'endpoint': endpoint
}
def fetch_api_data_safely(request_data):
url = f"{request_data['base_url']}/{request_data['endpoint']}"
try:
response = requests.get(
url,
timeout=15,
headers={
'User-Agent': 'SafeApp/1.0',
'Accept': 'application/json'
}
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
raise ValidationError(f'API request failed: {str(e)}')
except ValueError:
raise ValidationError('Invalid JSON response')
# Safe: Webhook processing with validation
def safe_process_webhook(request):
webhook_id = request.POST.get('webhook_id', '')
payload = request.POST.get('payload', '')
try:
# Validate webhook
webhook_config = validate_webhook_request(webhook_id, payload)
# Send webhook safely
send_webhook_safely(webhook_config)
return JsonResponse({'status': 'Webhook sent successfully'})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_webhook_request(webhook_id, payload):
# Validate webhook ID
if not webhook_id.isdigit():
raise ValidationError('Invalid webhook ID')
# Get webhook configuration from database
from .models import WebhookConfiguration
try:
webhook = WebhookConfiguration.objects.get(id=int(webhook_id))
except WebhookConfiguration.DoesNotExist:
raise ValidationError('Webhook not found')
# Validate payload
if not payload or len(payload) > 10000:
raise ValidationError('Invalid payload size')
try:
import json
json.loads(payload) # Validate JSON
except json.JSONDecodeError:
raise ValidationError('Payload must be valid JSON')
return {
'url': webhook.url,
'payload': payload,
'secret': webhook.secret
}
def send_webhook_safely(webhook_config):
# Validate webhook URL (should be done when webhook is created)
parsed_url = urlparse(webhook_config['url'])
if parsed_url.scheme not in ['https']: # Only HTTPS for webhooks
raise ValidationError('Webhook URL must use HTTPS')
# Prepare headers
headers = {
'Content-Type': 'application/json',
'User-Agent': 'SafeApp-Webhook/1.0'
}
# Add signature if secret is provided
if webhook_config.get('secret'):
import hmac
import hashlib
signature = hmac.new(
webhook_config['secret'].encode(),
webhook_config['payload'].encode(),
hashlib.sha256
).hexdigest()
headers['X-Signature-SHA256'] = f'sha256={signature}'
try:
response = requests.post(
webhook_config['url'],
data=webhook_config['payload'],
headers=headers,
timeout=30,
allow_redirects=False
)
if response.status_code not in [200, 201, 202]:
raise ValidationError(f'Webhook failed with status {response.status_code}')
except requests.RequestException as e:
raise ValidationError(f'Webhook delivery failed: {str(e)}')
# Safe: URL status checker
def safe_check_url_status(request):
url_id = request.GET.get('url_id', '')
try:
# Validate URL ID and get URL from database
validated_url = validate_url_check_request(url_id)
# Check URL status safely
status = check_url_status_safely(validated_url)
return JsonResponse(status)
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_url_check_request(url_id):
if not url_id.isdigit():
raise ValidationError('Invalid URL ID')
# Get URL from database (pre-validated URLs)
from .models import MonitoredURL
try:
url_obj = MonitoredURL.objects.get(id=int(url_id))
return url_obj.url
except MonitoredURL.DoesNotExist:
raise ValidationError('URL not found')
def check_url_status_safely(url):
try:
response = requests.head(
url,
timeout=10,
allow_redirects=True,
headers={'User-Agent': 'SafeApp-Monitor/1.0'}
)
return {
'url': url,
'status_code': response.status_code,
'accessible': True,
'response_time': response.elapsed.total_seconds()
}
except requests.RequestException:
return {
'url': url,
'accessible': False,
'error': 'Request failed'
}