import requests
from flask import request
@app.route('/fetch_data')
def fetch_data():
# Vulnerable: HTTP request exposes data
response = requests.get('http://api.example.com/data')
return response.json()
@app.route('/submit_payment')
def submit_payment():
# Very dangerous: Payment data over HTTP
payment_data = {
'card_number': request.form.get('card'),
'amount': request.form.get('amount')
}
# Extremely vulnerable: Financial data unencrypted
response = requests.post(
'http://payment.service.com/process',
json=payment_data
)
return response.json()
@app.route('/webhook')
def process_webhook():
# Vulnerable: HTTP webhook notification
webhook_data = request.get_json()
# Insecure: Webhook callback over HTTP
callback_url = webhook_data.get('callback_url')
response = requests.post(
callback_url, # Could be HTTP
json={'status': 'processed'}
)
return 'OK'
# Vulnerable: Configuration and health checks
def check_service_health():
services = [
'http://api.internal.com/health',
'http://db.internal.com/status',
'http://cache.internal.com/ping'
]
results = {}
for service in services:
try:
# Insecure: Internal communications over HTTP
response = requests.get(service, timeout=5)
results[service] = response.status_code == 200
except:
results[service] = False
return results
import requests
from flask import request
from urllib.parse import urlparse
import os
# Secure configuration
API_BASE_URL = os.getenv('API_BASE_URL', 'https://api.example.com')
PAYMENT_SERVICE_URL = os.getenv('PAYMENT_SERVICE_URL', 'https://payment.secure.com')
def validate_https_url(url):
"""Validate that URL uses HTTPS scheme."""
if not url:
raise ValueError('URL cannot be empty')
parsed = urlparse(url)
if parsed.scheme != 'https':
raise ValueError(f'Only HTTPS URLs are allowed, got: {parsed.scheme}')
return url
def make_secure_request(method, url, **kwargs):
"""Make a secure HTTP request with proper validation."""
# Validate URL scheme
validate_https_url(url)
# Set secure defaults
kwargs.setdefault('timeout', 10)
kwargs.setdefault('verify', True) # SSL verification
kwargs.setdefault('headers', {}).update({
'User-Agent': 'SecureApp/1.0'
})
try:
response = requests.request(method, url, **kwargs)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
raise RuntimeError(f'Secure request failed: {str(e)}')
@app.route('/fetch_data')
def fetch_data():
"""Securely fetch data from API."""
try:
api_url = f'{API_BASE_URL}/data'
response = make_secure_request('GET', api_url)
return response.json()
except (ValueError, RuntimeError) as e:
return {'error': str(e)}, 500
@app.route('/submit_payment')
def submit_payment():
"""Securely process payment data."""
# Validate input
card_number = request.form.get('card', '')
amount = request.form.get('amount', '')
if not card_number or not amount:
return {'error': 'Missing payment information'}, 400
# Basic validation
if not amount.replace('.', '').isdigit():
return {'error': 'Invalid amount'}, 400
payment_data = {
'card_number': card_number[:20], # Limit length
'amount': float(amount)
}
try:
# Secure: HTTPS payment processing
payment_url = f'{PAYMENT_SERVICE_URL}/process'
response = make_secure_request(
'POST',
payment_url,
json=payment_data,
timeout=30 # Longer timeout for payment processing
)
return response.json()
except (ValueError, RuntimeError) as e:
return {'error': f'Payment processing failed: {str(e)}'}, 500
@app.route('/webhook')
def process_webhook():
"""Securely process webhook with URL validation."""
webhook_data = request.get_json()
if not webhook_data:
return {'error': 'No webhook data provided'}, 400
callback_url = webhook_data.get('callback_url', '')
# Validate callback URL
try:
validate_https_url(callback_url)
except ValueError as e:
return {'error': f'Invalid callback URL: {str(e)}'}, 400
# Allowlist of permitted callback domains
allowed_domains = [
'webhook.trusted.com',
'callbacks.partner.com',
'api.secure-client.com'
]
parsed_url = urlparse(callback_url)
if parsed_url.netloc not in allowed_domains:
return {'error': 'Callback domain not allowed'}, 403
try:
# Secure webhook callback
response = make_secure_request(
'POST',
callback_url,
json={'status': 'processed', 'timestamp': '2024-01-01T00:00:00Z'},
timeout=15
)
return {'status': 'callback_sent', 'response_code': response.status_code}
except (ValueError, RuntimeError) as e:
return {'error': f'Callback failed: {str(e)}'}, 500
# Secure service health checks
def check_service_health():
"""Perform secure health checks on internal services."""
# Use HTTPS for all internal communications
services = {
'api': 'https://api.internal.secure/health',
'database': 'https://db.internal.secure/status',
'cache': 'https://cache.internal.secure/ping'
}
results = {}
for service_name, service_url in services.items():
try:
# Secure internal communication
response = make_secure_request(
'GET',
service_url,
timeout=5,
verify=True # Verify internal certificates
)
results[service_name] = {
'healthy': response.status_code == 200,
'response_time': response.elapsed.total_seconds()
}
except (ValueError, RuntimeError) as e:
results[service_name] = {
'healthy': False,
'error': str(e)
}
return results
@app.route('/health')
def health_check():
"""Endpoint for secure health checking."""
try:
health_results = check_service_health()
overall_health = all(result['healthy'] for result in health_results.values())
return {
'status': 'healthy' if overall_health else 'unhealthy',
'services': health_results
}
except Exception as e:
return {'status': 'error', 'message': str(e)}, 500
# Additional security: Custom requests session
class SecureSession(requests.Session):
"""Custom session that enforces HTTPS."""
def request(self, method, url, **kwargs):
# Validate HTTPS before making request
validate_https_url(url)
# Set secure defaults
kwargs.setdefault('verify', True)
kwargs.setdefault('timeout', 10)
return super().request(method, url, **kwargs)
# Usage example
secure_session = SecureSession()
@app.route('/secure_api_call')
def secure_api_call():
"""Example using secure session."""
try:
response = secure_session.get('https://api.example.com/secure-endpoint')
return response.json()
except (ValueError, requests.exceptions.RequestException) as e:
return {'error': str(e)}, 500