# Vulnerable: Dangerous subprocess usage in Lambda
import subprocess
import os
import json
def lambda_handler(event, context):
    """List a path with ``ls -la`` and return the command's output.

    Args:
        event: Lambda event mapping. The optional ``filename`` key names the
            path to list; when it is missing or empty, ``ls -la`` runs with
            no path argument.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and a JSON-encoded ``body`` holding
        the command's ``output`` (stdout) and ``error`` (stderr).
    """
    filename = str(event.get('filename', ''))

    # Build an argument vector instead of a shell string: the filename is
    # untrusted, and with shell=True a value such as "x; some-command" would
    # run the second command as well.
    argv = ['ls', '-la']
    if filename:
        # "--" ends option parsing, so a name that starts with "-" cannot be
        # interpreted by ls as an option.
        argv += ['--', filename]

    result = subprocess.run(argv, capture_output=True, text=True)
    return {
        'statusCode': 200,
        'body': json.dumps({
            'output': result.stdout,
            'error': result.stderr,
        }),
    }
# Run an allowlisted read-only command against a caller-supplied path
def process_file(event, context):
    """Run one allowlisted command on a file path and return its output.

    Args:
        event: Lambda event mapping. ``file_path`` (required) is the path
            handed to the command; ``command`` (optional, default ``cat``)
            selects which allowlisted program to run.
        context: Lambda context object (unused).

    Returns:
        ``{'result': <stdout>}`` on success, or ``{'error': <message>}`` when
        the command is not allowlisted, cannot be started, or exits with a
        non-zero status.

    Raises:
        KeyError: If ``file_path`` is missing from ``event``.
    """
    file_path = event['file_path']
    command = event.get('command', 'cat')

    # The caller must not be able to choose an arbitrary program; only these
    # read-only tools may be run.
    allowed_commands = ('cat', 'head', 'tail', 'wc')
    if command not in allowed_commands:
        return {'error': 'Command not allowed'}

    # Argument vector with no shell: metacharacters in file_path stay part of
    # a single argument, and "--" keeps a leading "-" from being read as an
    # option.
    argv = [command, '--', str(file_path)]
    try:
        output = subprocess.check_output(argv, text=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers the program being missing or not executable.
        return {'error': str(e)}
    return {'result': output}
# Converting files with user input
def convert_document(event, context):
    """Convert a document with pandoc and report whether it succeeded.

    pandoc's output is not captured here; it goes to this process's own
    stdout/stderr.

    Args:
        event: Lambda event mapping with the required keys ``input_file``
            (path of the document to convert) and ``output_format`` (value
            passed to pandoc's ``-t`` option).
        context: Lambda context object (unused).

    Returns:
        ``{'status': 'converted'}`` when pandoc exits with status 0,
        otherwise ``{'status': 'failed', 'error': <message>}``.

    Raises:
        KeyError: If ``input_file`` or ``output_format`` is missing.
    """
    input_file = str(event['input_file'])
    output_format = str(event['output_format'])

    # Allow only identifier characters plus "+"/"-" (used for pandoc
    # extension modifiers), so the value can only ever be a format name.
    if not re.fullmatch(r'[a-z0-9_]+(?:[+-][a-z0-9_]+)*', output_format):
        return {'status': 'failed', 'error': 'Invalid output format'}

    if not input_file:
        return {'status': 'failed', 'error': 'Invalid input file'}
    if input_file.startswith('-'):
        # Keep a leading "-" from being parsed as a pandoc option.
        input_file = './' + input_file

    try:
        # Argument vector, no shell: input_file is always a single argument.
        completed = subprocess.run(['pandoc', input_file, '-t', output_format])
    except OSError:
        return {'status': 'failed', 'error': 'pandoc could not be started'}

    if completed.returncode != 0:
        return {'status': 'failed', 'error': 'pandoc exited with a non-zero status'}
    return {'status': 'converted'}
# Secure: Safe alternatives to subprocess in Lambda
import subprocess
import json
import re
import boto3
from pathlib import Path
# Use AWS services instead of shell commands
def lambda_handler(event, context):
    """List the S3 object keys that start with the requested filename.

    Only the single ``list_objects_v2`` response is used; no pagination is
    performed, so at most one page of keys is returned.

    Args:
        event: Lambda event mapping; ``filename`` (optional) is used as the
            key prefix after passing ``is_safe_filename``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``:
        400 with an ``error`` for an invalid filename, 200 with ``files``
        (list of keys) on success, 500 with an ``error`` on any failure
        (including a missing ``BUCKET_NAME`` environment variable).
    """
    import logging

    filename = event.get('filename', '')

    if not is_safe_filename(filename):
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid filename'})
        }

    try:
        # Configuration and client creation are inside the try so that a
        # missing BUCKET_NAME yields the 500 response instead of an
        # unhandled exception.
        bucket_name = os.environ['BUCKET_NAME']
        s3_client = boto3.client('s3')

        # S3 API call instead of a shell command: the prefix is passed as
        # data and is never interpreted.
        response = s3_client.list_objects_v2(
            Bucket=bucket_name,
            Prefix=filename
        )
        files = [obj['Key'] for obj in response.get('Contents', [])]
    except Exception:
        # Keep the details in the logs; the caller only gets a generic message.
        logging.getLogger(__name__).exception(
            'Failed to list objects for prefix %r', filename
        )
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Failed to list files'})
        }

    return {
        'statusCode': 200,
        'body': json.dumps({'files': files})
    }
def is_safe_filename(filename):
    """Return True if *filename* is a plain name that is safe to use as a key.

    A name is accepted only when it is a non-empty string of at most 255
    characters, consists solely of ASCII letters, digits, ``.``, ``_`` and
    ``-``, and does not contain ``..``.

    Args:
        filename: The candidate name; any non-string value is rejected.

    Returns:
        True when the name passes every check, False otherwise.
    """
    if not isinstance(filename, str):
        return False

    if not filename or len(filename) > 255:
        return False

    # fullmatch rather than match with "$": "$" also matches just before a
    # trailing newline, which would let "name\n" through. The character
    # class excludes "/", so absolute paths are rejected here as well.
    if not re.fullmatch(r'[a-zA-Z0-9._-]+', filename):
        return False

    # Prevent directory traversal
    if '..' in filename:
        return False

    return True
# Secure file processing with validation
def process_file_secure(event, context):
    """Perform one allowlisted read-only S3 operation on a validated key.

    Args:
        event: Lambda event mapping; ``file_key`` is the object key (must
            pass ``is_safe_filename``) and ``operation`` is one of
            ``read``, ``metadata`` or ``exists``.
        context: Lambda context object (unused).

    Returns:
        * ``read``: ``{'content': <first 1000 characters, decoded as UTF-8>}``
        * ``metadata``: ``{'size', 'last_modified', 'content_type'}``
        * ``exists``: ``{'exists': True}`` or ``{'exists': False}``
        * ``{'error': <message>}`` for invalid input or any failure.
    """
    import logging

    file_key = event.get('file_key', '')
    operation = event.get('operation', '')

    # Validate inputs
    if not is_safe_filename(file_key):
        return {'error': 'Invalid file key'}

    allowed_operations = ['read', 'metadata', 'exists']
    if operation not in allowed_operations:
        return {'error': 'Operation not allowed'}

    try:
        # Inside the try so a missing BUCKET_NAME is reported as an error
        # dict rather than an unhandled exception.
        bucket_name = os.environ['BUCKET_NAME']
        s3_client = boto3.client('s3')

        if operation == 'read':
            response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
            content = response['Body'].read().decode('utf-8')
            return {'content': content[:1000]}  # Limit output

        elif operation == 'metadata':
            response = s3_client.head_object(Bucket=bucket_name, Key=file_key)
            return {
                'size': response['ContentLength'],
                'last_modified': response['LastModified'].isoformat(),
                'content_type': response.get('ContentType', 'unknown')
            }

        elif operation == 'exists':
            try:
                s3_client.head_object(Bucket=bucket_name, Key=file_key)
                return {'exists': True}
            except s3_client.exceptions.ClientError as e:
                # head_object reports a missing key as a generic ClientError
                # with code "404" rather than NoSuchKey, so inspect the
                # error code instead of relying on the exception class.
                code = e.response.get('Error', {}).get('Code', '')
                if code in ('404', 'NoSuchKey', 'NotFound'):
                    return {'exists': False}
                raise

    except Exception:
        # Keep the details in the logs; the caller only gets a generic message.
        logging.getLogger(__name__).exception(
            'S3 operation %r failed for key %r', operation, file_key
        )
        return {'error': 'Operation failed'}
# When subprocess is absolutely necessary (rare cases)
def secure_subprocess_example(event, context):
    """Run one of a fixed set of argument-free commands and return its output.

    The function refuses to run anything unless the ``ALLOW_SUBPROCESS``
    environment variable is set to a non-empty value.

    Args:
        event: Lambda event mapping; ``command_type`` selects the command
            (``date``, ``whoami`` or ``pwd``).
        context: Lambda context object (unused).

    Returns:
        ``{'output': <stripped stdout, at most 500 characters>}`` on success,
        otherwise ``{'error': <message>}``.
    """
    subprocess_enabled = os.environ.get('ALLOW_SUBPROCESS')
    if not subprocess_enabled:
        return {'error': 'Subprocess not allowed in this environment'}

    # Allowlist: request name -> fixed argument vector. Nothing from the
    # event ever becomes part of the command line.
    command_table = {
        'date': ['/bin/date'],
        'whoami': ['/usr/bin/whoami'],
        'pwd': ['/bin/pwd'],
    }
    requested = event.get('command_type', '')
    if requested not in command_table:
        return {'error': 'Command not allowed'}
    argv = command_table[requested]

    try:
        completed = subprocess.run(
            argv,
            shell=False,  # no shell interpretation of the arguments
            timeout=5,  # do not let a stuck command hang the invocation
            text=True,
            capture_output=True,
        )
        if completed.returncode == 0:
            trimmed = completed.stdout.strip()
            return {'output': trimmed[:500]}  # cap the response size
        return {'error': 'Command failed'}
    except subprocess.TimeoutExpired:
        return {'error': 'Command timeout'}
    except Exception:
        return {'error': 'Command execution failed'}
# Alternative: Use Lambda layers for tools
def process_with_lambda_layer(event, context):
    """
    Encode and fingerprint the event's text using only Python libraries.

    Demonstrates doing the work in-process instead of shelling out: the
    ``content`` string from the event (default empty) is Base64-encoded and
    SHA-256 hashed.

    Returns a dict with ``encoded`` (Base64 text), ``hash`` (hex digest) and
    ``length`` (number of characters in the content).
    """
    from base64 import b64encode
    from hashlib import sha256

    text = event.get('content', '')
    # Encode once and reuse the bytes for both the Base64 and the digest.
    raw = text.encode()

    return {
        'encoded': b64encode(raw).decode(),
        'hash': sha256(raw).hexdigest(),
        'length': len(text),
    }