# Vulnerable: Dangerous system calls in Lambda
import os
import json
def lambda_handler(event, context):
user_command = event.get('command', '')
# Extremely dangerous: Direct command execution
result = os.system(user_command)
return {
'statusCode': 200,
'body': json.dumps({'result': result})
}
# Another vulnerable pattern
def execute_script(event, context):
script_code = event.get('script', '')
variables = event.get('variables', {})
# Dangerous: Dynamic code execution
try:
# User can execute arbitrary Python code
exec(script_code, variables)
return {'status': 'executed'}
except Exception as e:
return {'error': str(e)}
# File operations with user input
def process_files(event, context):
filename = event['filename']
operation = event['operation']
# Dangerous: Command injection via filename
command = f"find /tmp -name '{filename}' -exec {operation} {{}} \;"
# Very dangerous
output = os.popen(command).read()
return {'output': output}
# Mathematical operations with eval
def calculate(event, context):
expression = event.get('expression', '')
try:
# Dangerous: eval can execute arbitrary code
result = eval(expression)
return {'result': result}
except Exception as e:
return {'error': str(e)}
# Secure: Safe alternatives to system calls in Lambda
import json
import re
import math
import operator
import boto3
from decimal import Decimal
def lambda_handler(event, context):
operation = event.get('operation', '')
# Use allowlist of operations
allowed_operations = {
'list_files': list_s3_files,
'get_file_info': get_file_metadata,
'calculate': safe_calculate,
'convert_data': convert_data_format
}
if operation not in allowed_operations:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Operation not allowed'})
}
try:
result = allowed_operations[operation](event)
return {
'statusCode': 200,
'body': json.dumps(result)
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': 'Operation failed'})
}
# Secure file operations using AWS services
def list_s3_files(event):
bucket_name = event.get('bucket_name', '')
prefix = event.get('prefix', '')
# Validate inputs
if not re.match(r'^[a-zA-Z0-9.-]+$', bucket_name):
raise ValueError('Invalid bucket name')
if not re.match(r'^[a-zA-Z0-9/._-]*$', prefix):
raise ValueError('Invalid prefix')
# Use AWS SDK instead of shell commands
s3_client = boto3.client('s3')
try:
response = s3_client.list_objects_v2(
Bucket=bucket_name,
Prefix=prefix,
MaxKeys=100
)
files = []
for obj in response.get('Contents', []):
files.append({
'key': obj['Key'],
'size': obj['Size'],
'last_modified': obj['LastModified'].isoformat()
})
return {'files': files}
except Exception as e:
raise Exception('Failed to list files')
def get_file_metadata(event):
bucket_name = event.get('bucket_name', '')
file_key = event.get('file_key', '')
# Validate inputs
if not re.match(r'^[a-zA-Z0-9.-]+$', bucket_name):
raise ValueError('Invalid bucket name')
if not re.match(r'^[a-zA-Z0-9/._-]+$', file_key):
raise ValueError('Invalid file key')
s3_client = boto3.client('s3')
try:
response = s3_client.head_object(Bucket=bucket_name, Key=file_key)
return {
'size': response['ContentLength'],
'content_type': response.get('ContentType', 'unknown'),
'last_modified': response['LastModified'].isoformat(),
'etag': response['ETag']
}
except Exception as e:
raise Exception('Failed to get file metadata')
# Safe mathematical operations
def safe_calculate(event):
expression = event.get('expression', '')
# Validate expression format
if not re.match(r'^[0-9+\-*/().\s]+$', expression):
raise ValueError('Invalid expression format')
# Use safe evaluation with limited operations
allowed_operators = {
'+': operator.add,
'-': operator.sub,
'*': operator.mul,
'/': operator.truediv,
'**': operator.pow
}
# Simple expression parser (for basic operations)
try:
# Remove spaces
expression = expression.replace(' ', '')
# For simple cases, use a safe parser
# This is a simplified example - use a proper math parser in production
if '+' in expression:
parts = expression.split('+')
result = sum(float(part) for part in parts)
elif '-' in expression and expression.count('-') == 1:
parts = expression.split('-')
result = float(parts[0]) - float(parts[1])
elif '*' in expression:
parts = expression.split('*')
result = 1
for part in parts:
result *= float(part)
else:
result = float(expression)
return {'result': result}
except (ValueError, ZeroDivisionError) as e:
raise ValueError('Invalid calculation')
# Safe data conversion
def convert_data_format(event):
data = event.get('data', {})
target_format = event.get('format', 'json')
allowed_formats = ['json', 'csv', 'xml']
if target_format not in allowed_formats:
raise ValueError('Format not supported')
try:
if target_format == 'json':
return {'converted': json.dumps(data)}
elif target_format == 'csv':
# Convert to CSV format (simplified)
if isinstance(data, list) and all(isinstance(item, dict) for item in data):
if data:
headers = list(data[0].keys())
csv_lines = [','.join(headers)]
for item in data:
row = [str(item.get(header, '')) for header in headers]
csv_lines.append(','.join(row))
return {'converted': '\n'.join(csv_lines)}
raise ValueError('Data not suitable for CSV conversion')
elif target_format == 'xml':
# Simple XML conversion (use proper library in production)
xml_content = ''
for key, value in data.items():
xml_content += f'<{key}>{value}{key}>'
xml_content += ''
return {'converted': xml_content}
except Exception as e:
raise Exception('Conversion failed')
# Configuration-based processing
def process_with_config(event, context):
# Use environment variables instead of dynamic execution
processing_mode = os.environ.get('PROCESSING_MODE', 'safe')
max_items = int(os.environ.get('MAX_ITEMS', '100'))
data = event.get('data', [])
# Validate data length
if len(data) > max_items:
raise ValueError(f'Too many items (max: {max_items})')
# Process based on configuration
if processing_mode == 'safe':
# Safe processing only
processed = []
for item in data:
if isinstance(item, dict) and 'value' in item:
processed.append({
'value': float(item['value']) * 2,
'processed': True
})
return {'processed_data': processed}
else:
raise ValueError('Processing mode not supported')