# Vulnerable: Format string injection in Flask API methods
from flask import Flask, request, jsonify
import datetime
import logging
app = Flask(__name__)
# Dangerous: API method with format string injection
@app.route('/api/users//profile', methods=['GET'])
def get_user_profile(user_id):
template = request.args.get('template', 'User: %s')
# CRITICAL: User controls format string
try:
formatted_response = template % user_id
return jsonify({'profile': formatted_response})
except Exception as e:
return jsonify({'error': str(e)})
# Another dangerous pattern in API
@app.route('/api/search', methods=['POST'])
def search_api():
query = request.json.get('query', '')
format_type = request.json.get('format', '%s')
# Dangerous: Format string from user input
try:
search_results = perform_search(query)
formatted_results = []
for result in search_results:
# User controls format for each result
formatted_result = format_type % result
formatted_results.append(formatted_result)
return jsonify({'results': formatted_results})
except Exception as e:
return jsonify({'error': str(e)})
# Log formatting in API
@app.route('/api/log', methods=['POST'])
def api_log_entry():
log_template = request.json.get('template', '')
log_data = request.json.get('data', {})
# Dangerous: User-controlled log template
try:
log_message = log_template % log_data
app.logger.info(log_message)
return jsonify({'status': 'logged', 'message': log_message})
except Exception as e:
return jsonify({'error': str(e)})
# Report generation API
@app.route('/api/reports/generate', methods=['POST'])
def generate_api_report():
report_template = request.json.get('template', '')
report_data = request.json.get('data', {})
# Dangerous: Format string in report generation
try:
report_content = report_template % report_data
return jsonify({
'report_id': 'rpt_123',
'content': report_content,
'generated_at': datetime.datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': str(e)})
# Message formatting API
@app.route('/api/messages/format', methods=['POST'])
def format_message_api():
message_template = request.json.get('template', '')
variables = request.json.get('variables', {})
# Dangerous: User controls message template
try:
formatted_message = message_template % variables
return jsonify({
'formatted_message': formatted_message,
'template_used': message_template
})
except Exception as e:
return jsonify({'error': str(e)})
# Error message API
@app.route('/api/errors/format', methods=['GET'])
def format_error_api():
error_template = request.args.get('template', '')
error_code = request.args.get('code', '')
error_details = request.args.get('details', '')
# Dangerous: Format string in error handling
try:
error_data = {
'code': error_code,
'details': error_details,
'timestamp': datetime.datetime.now()
}
formatted_error = error_template % error_data
return jsonify({'error_message': formatted_error})
except Exception as e:
return jsonify({'error': str(e)})
# Configuration API with formatting
@app.route('/api/config/display', methods=['POST'])
def display_config_api():
config_template = request.json.get('template', '')
config_section = request.json.get('section', 'general')
# Get configuration data
config_data = get_config_data(config_section)
# Dangerous: User-controlled config display template
try:
formatted_config = config_template % config_data
return jsonify({
'section': config_section,
'formatted_config': formatted_config
})
except Exception as e:
return jsonify({'error': str(e)})
# Notification API with format strings
@app.route('/api/notifications/send', methods=['POST'])
def send_notification_api():
notification_template = request.json.get('template', '')
recipient = request.json.get('recipient', '')
notification_data = request.json.get('data', {})
# Dangerous: Format string in notification
try:
# Add recipient to data
notification_data['recipient'] = recipient
formatted_notification = notification_template % notification_data
# Send notification (mock)
send_notification(recipient, formatted_notification)
return jsonify({
'status': 'sent',
'recipient': recipient,
'message': formatted_notification
})
except Exception as e:
return jsonify({'error': str(e)})
def perform_search(query):
# Mock search function
return ['result1', 'result2', 'result3']
def get_config_data(section):
# Mock config function
return {
'setting1': 'value1',
'setting2': 'value2',
'debug': True
}
def send_notification(recipient, message):
# Mock notification function
pass
# Secure: Safe string formatting in Flask API methods
from flask import Flask, request, jsonify
from marshmallow import Schema, fields, ValidationError as MarshmallowValidationError
import datetime
import logging
import re
from markupsafe import escape
app = Flask(__name__)
# Safe: Input validation schemas
class UserProfileSchema(Schema):
template_type = fields.Str(validate=lambda x: x in ['simple', 'detailed', 'minimal'])
class SearchSchema(Schema):
query = fields.Str(required=True, validate=lambda x: len(x) <= 100)
format_type = fields.Str(validate=lambda x: x in ['json', 'text', 'summary'])
class LogEntrySchema(Schema):
level = fields.Str(validate=lambda x: x in ['info', 'warning', 'error'])
message = fields.Str(required=True, validate=lambda x: len(x) <= 500)
context = fields.Dict(missing={})
# Safe: User profile API with validation
@app.route('/api/users//profile', methods=['GET'])
def safe_get_user_profile(user_id):
try:
# Validate user ID
validated_user_id = validate_user_id(user_id)
# Validate query parameters
schema = UserProfileSchema()
args = schema.load(request.args)
# Get profile data
profile_data = get_user_profile_data(validated_user_id)
# Format response safely
formatted_response = format_user_profile_safely(profile_data, args.get('template_type', 'simple'))
return jsonify(formatted_response)
except (ValueError, MarshmallowValidationError) as e:
return jsonify({'error': str(e)}), 400
def validate_user_id(user_id):
if not user_id or not user_id.isdigit():
raise ValueError('Invalid user ID format')
user_id_int = int(user_id)
if user_id_int <= 0 or user_id_int > 999999:
raise ValueError('User ID out of range')
return user_id_int
def get_user_profile_data(user_id):
# Mock function - would query database
return {
'id': user_id,
'username': f'user_{user_id}',
'email': f'user{user_id}@example.com',
'created_at': '2023-01-01T00:00:00Z'
}
def format_user_profile_safely(profile_data, template_type):
# Safe: Predefined response formats
if template_type == 'simple':
return {
'profile': {
'id': profile_data['id'],
'username': profile_data['username']
}
}
elif template_type == 'detailed':
return {
'profile': profile_data
}
elif template_type == 'minimal':
return {
'profile': {
'id': profile_data['id']
}
}
# Safe: Search API with validation
@app.route('/api/search', methods=['POST'])
def safe_search_api():
try:
# Validate input
schema = SearchSchema()
data = schema.load(request.json or {})
# Perform search
search_results = perform_safe_search(data['query'])
# Format results safely
formatted_results = format_search_results_safely(
search_results,
data.get('format_type', 'json')
)
return jsonify({
'results': formatted_results,
'total': len(search_results),
'query': data['query']
})
except MarshmallowValidationError as e:
return jsonify({'error': e.messages}), 400
def perform_safe_search(query):
# Safe: Sanitize query
clean_query = re.sub(r'[^a-zA-Z0-9\s-]', '', query)
# Mock search - would use database/search engine
mock_results = [
{'id': 1, 'title': 'Result 1', 'score': 0.95},
{'id': 2, 'title': 'Result 2', 'score': 0.87},
{'id': 3, 'title': 'Result 3', 'score': 0.76}
]
# Filter based on query (mock)
return mock_results
def format_search_results_safely(results, format_type):
# Safe: Structured formatting
if format_type == 'json':
return results
elif format_type == 'text':
return [f"#{result['id']}: {result['title']}" for result in results]
elif format_type == 'summary':
return {
'count': len(results),
'top_result': results[0] if results else None
}
# Safe: Logging API with validation
@app.route('/api/log', methods=['POST'])
def safe_api_log_entry():
try:
# Validate input
schema = LogEntrySchema()
data = schema.load(request.json or {})
# Create log entry safely
create_safe_log_entry(data)
return jsonify({'status': 'logged successfully'})
except MarshmallowValidationError as e:
return jsonify({'error': e.messages}), 400
def create_safe_log_entry(log_data):
level = log_data.get('level', 'info')
message = log_data['message']
context = log_data.get('context', {})
# Safe: Sanitize message
clean_message = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', message)
# Safe: Structured logging
log_entry = {
'timestamp': datetime.datetime.utcnow().isoformat(),
'level': level,
'message': clean_message,
'context': context,
'source': 'api'
}
# Use appropriate logging level
if level == 'info':
app.logger.info('API Log: %(message)s', log_entry)
elif level == 'warning':
app.logger.warning('API Log: %(message)s', log_entry)
elif level == 'error':
app.logger.error('API Log: %(message)s', log_entry)
# Safe: Report generation API
@app.route('/api/reports/generate', methods=['POST'])
def safe_generate_api_report():
try:
# Validate input
report_data = validate_report_request(request.json or {})
# Generate report safely
report = generate_safe_report(report_data)
return jsonify(report)
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_report_request(data):
report_type = data.get('type', '')
parameters = data.get('parameters', {})
# Validate report type
allowed_types = ['user_activity', 'system_status', 'error_summary']
if report_type not in allowed_types:
raise ValueError('Invalid report type')
# Validate parameters based on type
if report_type == 'user_activity':
required_params = ['start_date', 'end_date']
for param in required_params:
if param not in parameters:
raise ValueError(f'Missing parameter: {param}')
# Validate dates
try:
datetime.datetime.fromisoformat(parameters['start_date'])
datetime.datetime.fromisoformat(parameters['end_date'])
except ValueError:
raise ValueError('Invalid date format')
return {
'type': report_type,
'parameters': parameters
}
def generate_safe_report(report_data):
report_type = report_data['type']
parameters = report_data['parameters']
# Safe: Predefined report templates
if report_type == 'user_activity':
return {
'report_id': f'rpt_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}',
'type': 'user_activity',
'period': {
'start': parameters['start_date'],
'end': parameters['end_date']
},
'data': {
'total_users': 1000,
'active_users': 250,
'new_registrations': 15
},
'generated_at': datetime.datetime.utcnow().isoformat()
}
elif report_type == 'system_status':
return {
'report_id': f'rpt_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}',
'type': 'system_status',
'data': {
'status': 'operational',
'uptime': '99.9%',
'response_time': '150ms'
},
'generated_at': datetime.datetime.utcnow().isoformat()
}
elif report_type == 'error_summary':
return {
'report_id': f'rpt_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}',
'type': 'error_summary',
'data': {
'total_errors': 5,
'critical_errors': 0,
'error_rate': '0.01%'
},
'generated_at': datetime.datetime.utcnow().isoformat()
}
# Safe: Message formatting API
@app.route('/api/messages/format', methods=['POST'])
def safe_format_message_api():
try:
# Validate input
message_data = validate_message_request(request.json or {})
# Format message safely
formatted_message = format_message_safely(message_data)
return jsonify(formatted_message)
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_message_request(data):
template_type = data.get('template_type', '')
variables = data.get('variables', {})
# Validate template type
allowed_templates = ['greeting', 'notification', 'reminder', 'confirmation']
if template_type not in allowed_templates:
raise ValueError('Invalid template type')
# Validate variables
if not isinstance(variables, dict):
raise ValueError('Variables must be a dictionary')
# Sanitize variable values
sanitized_variables = {}
for key, value in variables.items():
if isinstance(value, str) and len(value) <= 100:
sanitized_variables[key] = escape(value)
elif isinstance(value, (int, float, bool)):
sanitized_variables[key] = value
return {
'template_type': template_type,
'variables': sanitized_variables
}
def format_message_safely(message_data):
template_type = message_data['template_type']
variables = message_data['variables']
# Safe: Predefined message templates
templates = {
'greeting': 'Hello {name}, welcome to our service!',
'notification': 'You have {count} new notifications.',
'reminder': 'Don\'t forget about {event} on {date}.',
'confirmation': 'Your {action} has been completed successfully.'
}
template = templates[template_type]
try:
# Safe: Use .format() with validated variables
formatted_message = template.format(**variables)
return {
'template_type': template_type,
'formatted_message': formatted_message,
'variables_used': list(variables.keys())
}
except KeyError as e:
raise ValueError(f'Missing template variable: {e}')
# Safe: Error handling API
@app.route('/api/errors/format', methods=['GET'])
def safe_format_error_api():
try:
error_code = request.args.get('code', '')
# Validate error code
validated_code = validate_error_code(error_code)
# Get error information safely
error_info = get_safe_error_info(validated_code)
return jsonify(error_info)
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_error_code(code):
if not code or not code.isdigit():
raise ValueError('Invalid error code format')
code_int = int(code)
if code_int < 100 or code_int > 999:
raise ValueError('Error code out of range')
return code_int
def get_safe_error_info(error_code):
# Safe: Predefined error messages
error_messages = {
400: {
'code': 400,
'message': 'Bad Request',
'description': 'The request was invalid or malformed'
},
401: {
'code': 401,
'message': 'Unauthorized',
'description': 'Authentication is required'
},
403: {
'code': 403,
'message': 'Forbidden',
'description': 'Access to this resource is denied'
},
404: {
'code': 404,
'message': 'Not Found',
'description': 'The requested resource was not found'
},
500: {
'code': 500,
'message': 'Internal Server Error',
'description': 'An unexpected error occurred'
}
}
return error_messages.get(error_code, {
'code': error_code,
'message': 'Unknown Error',
'description': 'An unknown error occurred'
})
if __name__ == '__main__':
app.run(debug=False)