# Vulnerable: Tainted code execution in Lambda
import json
def lambda_handler(event, context):
user_code = event.get('code', '')
# Extremely dangerous: Direct execution of user code
try:
result = eval(user_code)
return {
'statusCode': 200,
'body': json.dumps({'result': str(result)})
}
except Exception as e:
return {
'statusCode': 400,
'body': json.dumps({'error': str(e)})
}
# Another vulnerable pattern
def execute_script(event, context):
script = event.get('script', '')
globals_dict = event.get('globals', {})
# Dangerous: exec with user-controlled globals
try:
exec(script, globals_dict)
return {'status': 'executed', 'globals': globals_dict}
except Exception as e:
return {'error': str(e)}
# Template processing vulnerability
def process_template(event, context):
template = event.get('template', '')
variables = event.get('variables', {})
# Dangerous: Code injection through template
try:
# User can inject code like: ${__import__('os').system('rm -rf /')}
result = eval(f"f'{template}'", variables)
return {'rendered': result}
except Exception as e:
return {'error': str(e)}
# Dynamic function creation
def create_function(event, context):
func_code = event.get('function_code', '')
func_name = event.get('function_name', 'user_func')
# Dangerous: Compiling and executing user code
try:
compiled_code = compile(func_code, '', 'exec')
exec(compiled_code)
# Try to call the user-defined function
if func_name in locals():
result = locals()[func_name]()
return {'result': result}
except Exception as e:
return {'error': str(e)}
# Secure: Safe alternatives to code execution in Lambda
import json
import ast
import re
import operator
from typing import Any, Dict, List
def lambda_handler(event, context):
operation = event.get('operation', '')
data = event.get('data', {})
# Use predefined operations instead of code execution
allowed_operations = {
'calculate': safe_calculate,
'transform': safe_transform,
'filter': safe_filter,
'aggregate': safe_aggregate
}
if operation not in allowed_operations:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Operation not allowed'})
}
try:
result = allowed_operations[operation](data)
return {
'statusCode': 200,
'body': json.dumps({'result': result})
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': 'Operation failed'})
}
# Safe mathematical operations
def safe_calculate(data: Dict[str, Any]) -> Any:
expression = data.get('expression', '')
# Validate expression contains only safe characters
if not re.match(r'^[0-9+\-*/().\s]+$', expression):
raise ValueError('Invalid expression format')
try:
# Use ast.literal_eval for safe evaluation of literals
# Note: ast.literal_eval only works for literals, not expressions
# For expressions, implement a safe parser
return safe_math_eval(expression)
except (ValueError, SyntaxError) as e:
raise ValueError('Invalid mathematical expression')
def safe_math_eval(expression: str) -> float:
"""Safe evaluation of mathematical expressions"""
# Remove whitespace
expression = expression.replace(' ', '')
# Define allowed operators
operators = {
'+': operator.add,
'-': operator.sub,
'*': operator.mul,
'/': operator.truediv
}
# Simple expression evaluation (extend as needed)
# This is a basic implementation - use a proper math parser in production
try:
# Handle simple binary operations
for op_symbol, op_func in operators.items():
if op_symbol in expression:
parts = expression.split(op_symbol, 1)
if len(parts) == 2:
left = float(parts[0])
right = float(parts[1])
return op_func(left, right)
# Single number
return float(expression)
except (ValueError, ZeroDivisionError):
raise ValueError('Invalid calculation')
# Safe data transformation
def safe_transform(data: Dict[str, Any]) -> List[Dict[str, Any]]:
items = data.get('items', [])
transform_type = data.get('transform', '')
allowed_transforms = {
'uppercase': lambda x: x.upper() if isinstance(x, str) else x,
'lowercase': lambda x: x.lower() if isinstance(x, str) else x,
'multiply_by_2': lambda x: x * 2 if isinstance(x, (int, float)) else x,
'add_prefix': lambda x: f"item_{x}" if isinstance(x, str) else x
}
if transform_type not in allowed_transforms:
raise ValueError('Transform not allowed')
transform_func = allowed_transforms[transform_type]
transformed = []
for item in items:
if isinstance(item, dict):
new_item = {}
for key, value in item.items():
new_item[key] = transform_func(value)
transformed.append(new_item)
else:
transformed.append(transform_func(item))
return transformed
# Safe filtering
def safe_filter(data: Dict[str, Any]) -> List[Any]:
items = data.get('items', [])
filter_criteria = data.get('filter', {})
# Only allow simple comparison filters
allowed_operators = ['eq', 'ne', 'gt', 'lt', 'gte', 'lte', 'contains']
filtered = []
for item in items:
should_include = True
for field, criteria in filter_criteria.items():
if not isinstance(criteria, dict):
continue
for operator, expected_value in criteria.items():
if operator not in allowed_operators:
continue
if field not in item:
should_include = False
break
actual_value = item[field]
if operator == 'eq' and actual_value != expected_value:
should_include = False
elif operator == 'ne' and actual_value == expected_value:
should_include = False
elif operator == 'gt' and actual_value <= expected_value:
should_include = False
elif operator == 'lt' and actual_value >= expected_value:
should_include = False
elif operator == 'gte' and actual_value < expected_value:
should_include = False
elif operator == 'lte' and actual_value > expected_value:
should_include = False
elif operator == 'contains' and str(expected_value) not in str(actual_value):
should_include = False
if not should_include:
break
if not should_include:
break
if should_include:
filtered.append(item)
return filtered
# Safe aggregation
def safe_aggregate(data: Dict[str, Any]) -> Dict[str, Any]:
items = data.get('items', [])
group_by = data.get('group_by', '')
aggregation = data.get('aggregation', 'count')
allowed_aggregations = ['count', 'sum', 'avg', 'min', 'max']
if aggregation not in allowed_aggregations:
raise ValueError('Aggregation not allowed')
if not group_by:
# Simple aggregation without grouping
if aggregation == 'count':
return {'result': len(items)}
numeric_values = [item for item in items if isinstance(item, (int, float))]
if not numeric_values:
return {'result': 0}
if aggregation == 'sum':
return {'result': sum(numeric_values)}
elif aggregation == 'avg':
return {'result': sum(numeric_values) / len(numeric_values)}
elif aggregation == 'min':
return {'result': min(numeric_values)}
elif aggregation == 'max':
return {'result': max(numeric_values)}
# Grouping aggregation
groups = {}
for item in items:
if isinstance(item, dict) and group_by in item:
group_key = str(item[group_by])
if group_key not in groups:
groups[group_key] = []
groups[group_key].append(item)
result = {}
for group_key, group_items in groups.items():
if aggregation == 'count':
result[group_key] = len(group_items)
# Add other aggregations as needed
return {'grouped_result': result}
# Safe template processing
def safe_template_process(event, context):
template = event.get('template', '')
variables = event.get('variables', {})
# Use simple string replacement instead of code execution
if not isinstance(variables, dict):
raise ValueError('Variables must be a dictionary')
# Validate template format
if not re.match(r'^[a-zA-Z0-9\s{}._-]+$', template):
raise ValueError('Invalid template format')
# Safe variable substitution
result = template
for key, value in variables.items():
# Sanitize key and value
safe_key = re.sub(r'[^a-zA-Z0-9_]', '', str(key))
safe_value = str(value)[:100] # Limit length
# Simple replacement
result = result.replace(f'{{{safe_key}}}', safe_value)
return {'rendered': result}