import requests
from flask import Flask, request
@app.route('/api_request')
def api_request():
# Vulnerable: API key sent over HTTP
api_key = 'sk-1234567890abcdef'
url = 'http://api.service.com/data' # HTTP, not HTTPS
headers = {
'Authorization': f'Bearer {api_key}' # Credentials over HTTP
}
response = requests.get(url, headers=headers)
return response.json()
@app.route('/login', methods=['POST'])
def login():
# Vulnerable: Basic auth over HTTP
username = request.form.get('username')
password = request.form.get('password')
auth_url = 'http://auth.service.com/login' # Insecure
# Credentials transmitted in plaintext
response = requests.post(
auth_url,
auth=(username, password), # Basic auth over HTTP
data={'action': 'login'}
)
return response.json()
@app.route('/oauth_callback')
def oauth_callback():
# Vulnerable: OAuth token over HTTP
access_token = request.args.get('access_token')
# Token sent to HTTP endpoint
profile_url = 'http://api.social.com/me'
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(profile_url, headers=headers)
return response.json()
import requests
from urllib.parse import urlparse
from flask import Flask, request
import os
def validate_https_url(url):
"""Ensure URL uses HTTPS scheme."""
parsed = urlparse(url)
if parsed.scheme != 'https':
raise ValueError(f'Only HTTPS URLs allowed for authenticated requests, got: {parsed.scheme}')
return url
def create_secure_auth_headers(token, token_type='Bearer'):
"""Create secure authentication headers."""
if not token:
raise ValueError('Token cannot be empty')
return {
'Authorization': f'{token_type} {token}',
'User-Agent': 'SecureApp/1.0',
'Accept': 'application/json'
}
@app.route('/api_request')
def api_request():
"""Secure API request with HTTPS validation."""
try:
# Secure: API key from environment
api_key = os.getenv('API_KEY')
if not api_key:
return {'error': 'API key not configured'}, 500
# Secure: HTTPS URL
url = 'https://api.service.com/data'
validate_https_url(url)
# Secure headers
headers = create_secure_auth_headers(api_key)
response = requests.get(
url,
headers=headers,
timeout=10,
verify=True # Ensure certificate validation
)
response.raise_for_status()
return response.json()
except ValueError as e:
return {'error': str(e)}, 400
except requests.exceptions.RequestException as e:
return {'error': f'API request failed: {str(e)}'}, 500
@app.route('/login', methods=['POST'])
def login():
"""Secure login with HTTPS enforcement."""
try:
username = request.form.get('username', '')
password = request.form.get('password', '')
if not username or not password:
return {'error': 'Username and password required'}, 400
# Secure: HTTPS authentication endpoint
auth_url = 'https://auth.service.com/login'
validate_https_url(auth_url)
# Secure: Basic auth over HTTPS only
response = requests.post(
auth_url,
auth=(username, password),
data={'action': 'login'},
timeout=15,
verify=True,
headers={
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': 'SecureApp/1.0'
}
)
response.raise_for_status()
return response.json()
except ValueError as e:
return {'error': str(e)}, 400
except requests.exceptions.RequestException as e:
return {'error': f'Authentication failed: {str(e)}'}, 500
@app.route('/oauth_callback')
def oauth_callback():
"""Secure OAuth callback with token validation."""
try:
access_token = request.args.get('access_token', '')
if not access_token:
return {'error': 'Access token required'}, 400
# Validate token format (basic check)
if len(access_token) < 20 or not access_token.replace('-', '').replace('_', '').isalnum():
return {'error': 'Invalid token format'}, 400
# Secure: HTTPS profile endpoint
profile_url = 'https://api.social.com/me'
validate_https_url(profile_url)
headers = create_secure_auth_headers(access_token)
response = requests.get(
profile_url,
headers=headers,
timeout=10,
verify=True
)
response.raise_for_status()
return response.json()
except ValueError as e:
return {'error': str(e)}, 400
except requests.exceptions.RequestException as e:
return {'error': f'Profile request failed: {str(e)}'}, 500
# Secure session configuration
class SecureAuthSession(requests.Session):
"""Session that enforces HTTPS for authenticated requests."""
def request(self, method, url, **kwargs):
# Check if request includes authentication
has_auth = (
'Authorization' in kwargs.get('headers', {}) or
'auth' in kwargs or
'cookies' in kwargs
)
if has_auth:
validate_https_url(url)
# Ensure certificate validation
kwargs.setdefault('verify', True)
kwargs.setdefault('timeout', 10)
return super().request(method, url, **kwargs)
@app.route('/secure_session_example')
def secure_session_example():
"""Example using secure session."""
try:
session = SecureAuthSession()
# This will enforce HTTPS due to Authorization header
response = session.get(
'https://api.example.com/protected',
headers={'Authorization': 'Bearer token123'}
)
return {'status': response.status_code}
except ValueError as e:
return {'error': str(e)}, 400
except requests.exceptions.RequestException as e:
return {'error': str(e)}, 500
# Environment-based configuration
@app.route('/config_example')
def config_example():
"""Example with environment-based secure configuration."""
# Load configuration from environment
api_base_url = os.getenv('API_BASE_URL', 'https://api.example.com')
api_key = os.getenv('API_KEY')
if not api_key:
return {'error': 'API not configured'}, 500
try:
validate_https_url(api_base_url)
url = f'{api_base_url}/secure-endpoint'
headers = create_secure_auth_headers(api_key)
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
return response.json()
except ValueError as e:
return {'error': str(e)}, 400
except requests.exceptions.RequestException as e:
return {'error': str(e)}, 500