import requests
from urllib.parse import urlparse
import ssl
class SecureAPIClient:
    """HTTPS-only client for a JSON API.

    All requests go through a single ``requests.Session`` that verifies TLS
    certificates and has an adapter mounted on ``http://`` that refuses
    plain-HTTP requests.

    The client can be used as a context manager; leaving the ``with`` block
    closes the underlying session.
    """

    def __init__(self, base_url=None):
        """Validate *base_url* and build the session.

        Args:
            base_url: HTTPS URL that request paths are appended to. A falsy
                value selects the built-in default URL.

        Raises:
            ValueError: If *base_url* is not an HTTPS URL with a host.
        """
        # Validate first so a rejected URL never leaves behind a session
        # that nobody will close.
        self.base_url = self._validate_base_url(base_url)
        self.session = self._create_secure_session()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying session and release its pooled connections."""
        self.session.close()

    def _create_secure_session(self):
        """Create a session with security configurations."""
        session = requests.Session()
        # Enforce SSL/TLS certificate verification for every request.
        session.verify = True
        # Headers sent with every request made through this session.
        session.headers.update({
            'User-Agent': 'SecureClient/1.0',
            'Accept': 'application/json',
            'X-Requested-With': 'SecureAPIClient',
        })
        # Any http:// URL is routed to an adapter that raises instead of
        # sending the request.
        session.mount('http://', HTTPSOnlyAdapter())
        return session

    def _validate_base_url(self, url):
        """Return a usable HTTPS base URL.

        Args:
            url: Candidate base URL; a falsy value selects the default.

        Returns:
            The default URL, or *url* with trailing slashes removed so that
            ``f'{base_url}/path'`` never contains ``//``.

        Raises:
            ValueError: If the scheme is not ``https`` or the host is missing.
        """
        if not url:
            return 'https://api.service.com'  # Default secure URL
        parsed = urlparse(url)
        if parsed.scheme != 'https':
            raise ValueError('Only HTTPS base URLs are allowed')
        # 'https://' and 'https:///path' have the right scheme but no host.
        if not parsed.hostname:
            raise ValueError('Base URL must include a host')
        return url.rstrip('/')

    def fetch_user_data(self, user_id):
        """Fetch the JSON document for one user.

        Args:
            user_id: An ``int`` or ``str`` whose string form is purely
                alphanumeric.

        Returns:
            The decoded JSON body of ``GET {base_url}/users/{user_id}``.

        Raises:
            ValueError: If *user_id* has the wrong type or format.
            RuntimeError: If the request fails or returns an error status;
                the original requests exception is attached as the cause.
        """
        # bool is a subclass of int and str(True) is 'True', which would
        # otherwise pass the alphanumeric check.
        if (
            isinstance(user_id, bool)
            or not isinstance(user_id, (int, str))
            or not str(user_id).isalnum()
        ):
            raise ValueError('Invalid user ID format')
        url = f'{self.base_url}/users/{user_id}'
        try:
            response = self.session.get(url, timeout=10, verify=True)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f'Failed to fetch user data: {e}') from e

    def authenticate(self, username, password):
        """Post credentials to the login endpoint.

        Args:
            username: Non-empty user name; sent as at most 50 characters.
            password: Non-empty password; sent as at most 100 characters.

        Returns:
            The decoded JSON body of ``POST {base_url}/auth/login``.

        Raises:
            ValueError: If either credential is empty.
            RuntimeError: If the request fails or returns an error status;
                the original requests exception is attached as the cause.
        """
        if not username or not password:
            raise ValueError('Username and password required')
        auth_url = f'{self.base_url}/auth/login'
        # NOTE(review): over-long credentials are silently truncated rather
        # than rejected; confirm the server applies the same limits.
        auth_data = {
            'username': str(username)[:50],
            'password': str(password)[:100],
        }
        try:
            response = self.session.post(
                auth_url,
                json=auth_data,
                timeout=15,
                verify=True,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f'Authentication failed: {e}') from e
class HTTPSOnlyAdapter(requests.adapters.HTTPAdapter):
    """Transport adapter that refuses to send plain-HTTP requests.

    Intended to be mounted on the ``http://`` prefix of a session so that
    such requests fail instead of going out unencrypted.
    """

    def send(self, request, **kwargs):
        """Send *request* unless its URL starts with ``http://``.

        Raises:
            requests.exceptions.InvalidURL: If the request URL starts with
                ``http://``.
        """
        insecure = request.url.startswith('http://')
        if not insecure:
            # Anything else is handed to the stock adapter unchanged.
            return super().send(request, **kwargs)
        raise requests.exceptions.InvalidURL(
            'HTTP connections are not allowed. Use HTTPS instead.'
        )
# NOTE(review): the route's URL variable was missing from the source (the
# string literal was split across two lines); '<path:endpoint>' is restored
# because the view takes ``endpoint`` and handles slashes in it -- confirm.
@app.route('/api_proxy/<path:endpoint>')
def api_proxy(endpoint):
    """Proxy a GET request to the internal API for an allowlisted endpoint.

    Args:
        endpoint: Path below the internal API root. Its first segment must
            be one of the allowlisted names.

    Returns:
        The upstream JSON body on success, otherwise an
        ``({'error': message}, status)`` tuple: 400 for a malformed endpoint
        or a ``ValueError``/``RuntimeError`` during the call, 403 for an
        endpoint outside the allowlist, 500 for any other request failure.
    """
    # Reject empty paths, parent-directory traversal and absolute paths.
    if not endpoint or '..' in endpoint or endpoint.startswith('/'):
        return {'error': 'Invalid endpoint'}, 400

    # Only the first path segment is checked against the allowlist.
    allowed_endpoints = ('users', 'products', 'orders', 'status')
    base_endpoint = endpoint.split('/')[0]
    if base_endpoint not in allowed_endpoints:
        return {'error': 'Endpoint not allowed'}, 403

    try:
        client = SecureAPIClient('https://internal-api.secure.local')
        # A fresh session is created per call; the context manager closes it
        # so its connection pool is not leaked on every request.
        with client.session as session:
            response = session.get(
                f'{client.base_url}/{endpoint}',
                timeout=10,
                verify=True,
            )
            response.raise_for_status()
            return response.json()
    # NOTE(review): both handlers echo the exception text to the caller,
    # which may expose internal host names -- confirm this is acceptable.
    except (ValueError, RuntimeError) as e:
        return {'error': str(e)}, 400
    except requests.exceptions.RequestException as e:
        return {'error': f'Request failed: {str(e)}'}, 500
def secure_bulk_api_calls(endpoints, base_url='https://api.example.com'):
    """Fetch several endpoints over HTTPS and collect one result per endpoint.

    Args:
        endpoints: Sized collection of at most 10 endpoint paths, each
            appended to *base_url*.
        base_url: HTTPS URL that the endpoint paths are appended to.

    Returns:
        A list in input order holding, for each endpoint, either the decoded
        JSON body or an ``{'error': message}`` dict when the endpoint is
        invalid or its request fails.

    Raises:
        ValueError: If *base_url* is not HTTPS or more than 10 endpoints are
            requested.
    """
    if urlparse(base_url).scheme != 'https':
        raise ValueError('Only HTTPS base URLs allowed')
    # Cap the batch size to prevent abuse.
    if len(endpoints) > 10:
        raise ValueError('Too many endpoints requested')

    client = SecureAPIClient(base_url)
    results = []
    # The session exists only for this call; the context manager closes it
    # so its pooled connections are released even if a request raises.
    with client.session as session:
        for endpoint in endpoints:
            # A bad entry is reported in place; it does not abort the batch.
            if not isinstance(endpoint, str) or '..' in endpoint:
                results.append({'error': f'Invalid endpoint: {endpoint}'})
                continue
            try:
                response = session.get(
                    f'{base_url}/{endpoint}',
                    timeout=5,
                    verify=True,
                )
                response.raise_for_status()
                results.append(response.json())
            except requests.exceptions.RequestException as e:
                results.append(
                    {'error': f'Request failed for {endpoint}: {str(e)}'}
                )
    return results
# Additional security: Connection pool configuration
def create_enterprise_session():
    """Build a certificate-verifying session with a pooled HTTPS adapter.

    Returns:
        A ``requests.Session`` with verification enabled, a pooled adapter
        mounted on ``https://`` and an ``HTTPSOnlyAdapter`` mounted on
        ``http://``.
    """
    enterprise_session = requests.Session()
    # Always verify server certificates.
    enterprise_session.verify = True

    # Pool sizing and retry count for the HTTPS transport.
    pooled_https = requests.adapters.HTTPAdapter(
        pool_connections=10,
        pool_maxsize=20,
        max_retries=3,
    )
    enterprise_session.mount('https://', pooled_https)

    # Plain HTTP is routed to the adapter that refuses to send.
    http_blocker = HTTPSOnlyAdapter()
    enterprise_session.mount('http://', http_blocker)
    return enterprise_session