import os
from flask import request
@app.route('/system_call')
def system_call():
# Vulnerable: User input in environment affects system call
user_path = request.args.get('path')
os.environ['CUSTOM_PATH'] = user_path # Tainted environment
# Dangerous: System call with tainted environment
result = os.system('echo $CUSTOM_PATH') # Command injection risk
return f'Result: {result}'
@app.route('/execute_with_env')
def execute_with_env():
# Vulnerable: Environment variables from user input
env_vars = request.get_json() or {}
for key, value in env_vars.items():
os.environ[key] = str(value) # Dangerous assignment
# System call inherits tainted environment
os.system('printenv | grep USER_') # Can execute malicious commands
return 'Environment updated'
@app.route('/run_script')
def run_script():
# Vulnerable: Script execution with user-controlled environment
script_env = request.args.get('env')
os.environ['SCRIPT_CONFIG'] = script_env
# Dangerous: Shell command with environment dependency
return os.system('bash -c "echo Processing: $SCRIPT_CONFIG"')
import subprocess
import re
from flask import request
# Safe environment allowlist
SAFE_ENV_PATTERN = re.compile(r'^[a-zA-Z0-9_\-\.]+$')
ALLOWED_ENV_VARS = ['LANG', 'LC_ALL', 'TZ', 'USER']
@app.route('/system_call')
def system_call():
# Secure: Validate and sanitize user input
user_path = request.args.get('path', '')
# Validate path format
if not re.match(r'^[a-zA-Z0-9/_\-\.]+$', user_path):
return 'Invalid path format', 400
# Use safe subprocess instead of os.system
try:
result = subprocess.run(
['echo', user_path], # Safe argument list
capture_output=True,
text=True,
timeout=5,
env={'PATH': '/usr/bin:/bin'} # Fixed safe environment
)
return f'Result: {result.stdout}'
except subprocess.TimeoutExpired:
return 'Command timeout', 408
@app.route('/execute_with_env')
def execute_with_env():
# Secure: Validate environment variables
env_vars = request.get_json() or {}
safe_env = {'PATH': '/usr/bin:/bin'} # Start with safe base
for key, value in env_vars.items():
# Validate environment variable name and value
if (key in ALLOWED_ENV_VARS and
isinstance(value, str) and
len(value) <= 100 and
SAFE_ENV_PATTERN.match(value)):
safe_env[key] = value
else:
return f'Invalid environment variable: {key}', 400
# Safe subprocess with validated environment
try:
result = subprocess.run(
['printenv'], # Safe command
capture_output=True,
text=True,
env=safe_env,
timeout=5
)
return f'Environment: {result.stdout}'
except subprocess.TimeoutExpired:
return 'Command timeout', 408
@app.route('/run_script')
def run_script():
# Secure: Use predefined safe configuration
script_config = request.args.get('config', '')
# Validate configuration value
allowed_configs = ['production', 'staging', 'development']
if script_config not in allowed_configs:
return 'Invalid configuration', 400
# Safe execution with fixed environment
safe_env = {
'PATH': '/usr/bin:/bin',
'SCRIPT_CONFIG': script_config, # Validated value
'HOME': '/tmp'
}
try:
result = subprocess.run(
['/opt/scripts/safe_processor.sh'], # Fixed safe script
capture_output=True,
text=True,
env=safe_env,
timeout=30
)
return f'Processing result: {result.stdout}'
except subprocess.TimeoutExpired:
return 'Script timeout', 408
# Secure helper function for environment validation
def create_safe_system_environment(user_vars=None):
"""Create a safe environment for system calls."""
base_env = {
'PATH': '/usr/bin:/bin',
'HOME': '/tmp',
'SHELL': '/bin/bash',
'LANG': 'en_US.UTF-8'
}
if user_vars:
for key, value in user_vars.items():
if (key in ALLOWED_ENV_VARS and
isinstance(value, str) and
len(value) <= 100 and
SAFE_ENV_PATTERN.match(value)):
base_env[key] = value
return base_env