import urllib.request
import urllib2 # Python 2.x
from flask import request
@app.route('/fetch_url')
def fetch_url():
# Vulnerable: HTTP URL with urlopen
url = 'http://api.example.com/data'
response = urllib.request.urlopen(url)
return response.read().decode()
@app.route('/proxy')
def proxy_request():
# Vulnerable: User-controlled URL without validation
target_url = request.args.get('url')
try:
# Dangerous: No scheme validation
response = urllib.request.urlopen(target_url)
return response.read()
except:
return 'Error fetching URL', 500
# Vulnerable: Python 2.x style
def legacy_fetch(url):
# Insecure: urllib2 with HTTP
response = urllib2.urlopen(url) # No HTTPS enforcement
return response.read()
import urllib.request
import urllib.parse
import ssl
from flask import request
def validate_https_url(url):
"""Validate that URL uses HTTPS scheme."""
parsed = urllib.parse.urlparse(url)
if parsed.scheme != 'https':
raise ValueError(f'Only HTTPS URLs allowed, got: {parsed.scheme}')
return parsed
def create_secure_context():
"""Create secure SSL context."""
context = ssl.create_default_context()
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
return context
@app.route('/fetch_url')
def fetch_url():
"""Securely fetch URL with HTTPS."""
try:
# Secure: HTTPS URL
url = 'https://api.example.com/data'
validate_https_url(url)
# Secure context with certificate verification
context = create_secure_context()
response = urllib.request.urlopen(
url,
timeout=10,
context=context
)
return response.read().decode('utf-8')
except (ValueError, urllib.error.URLError) as e:
return f'Error: {str(e)}', 500
@app.route('/proxy')
def proxy_request():
"""Secure proxy with URL validation."""
target_url = request.args.get('url', '')
if not target_url:
return 'URL parameter required', 400
try:
# Validate HTTPS scheme
parsed_url = validate_https_url(target_url)
# Allowlist of permitted domains
allowed_domains = [
'api.trusted.com',
'secure.partner.com',
'data.example.com'
]
if parsed_url.netloc not in allowed_domains:
return 'Domain not allowed', 403
# Secure request with SSL verification
context = create_secure_context()
response = urllib.request.urlopen(
target_url,
timeout=15,
context=context
)
# Limit response size
content = response.read(1024 * 1024) # 1MB limit
return content.decode('utf-8', errors='ignore')
except ValueError as e:
return f'Invalid URL: {str(e)}', 400
except urllib.error.URLError as e:
return f'Request failed: {str(e)}', 500
# Secure helper function for HTTP requests
def secure_urlopen(url, timeout=10, max_size=1024*1024):
"""Secure wrapper for urllib.request.urlopen."""
# Validate URL
validate_https_url(url)
# Create secure SSL context
context = create_secure_context()
try:
response = urllib.request.urlopen(
url,
timeout=timeout,
context=context
)
# Read with size limit
content = response.read(max_size)
return content
except urllib.error.URLError as e:
raise RuntimeError(f'Secure request failed: {str(e)}')
@app.route('/secure_fetch')
def secure_fetch():
"""Example using secure urlopen wrapper."""
try:
content = secure_urlopen('https://api.example.com/secure-data')
return content.decode('utf-8')
except (ValueError, RuntimeError) as e:
return f'Error: {str(e)}', 500
# Additional security: Custom URL opener
def create_secure_opener():
"""Create URL opener with security configurations."""
# Create HTTPS handler with SSL context
context = create_secure_context()
https_handler = urllib.request.HTTPSHandler(context=context)
# Block HTTP handler entirely
opener = urllib.request.build_opener(https_handler)
# Set secure headers
opener.addheaders = [
('User-Agent', 'SecureApp/1.0'),
('Accept', 'application/json, text/plain')
]
return opener
@app.route('/opener_example')
def opener_example():
"""Example using secure opener."""
try:
opener = create_secure_opener()
response = opener.open('https://api.example.com/data', timeout=10)
return response.read().decode('utf-8')
except urllib.error.URLError as e:
return f'Request failed: {str(e)}', 500