Flask Command Injection via subprocess Module

Critical Risk Command Injection
Tags: flask, python, command-injection, subprocess, remote-code-execution, shell-injection

What it is

The Flask application uses the subprocess module with user-controlled input in an unsafe manner, leading to command injection vulnerabilities. When user input is passed to subprocess functions like subprocess.call(), subprocess.run(), or subprocess.Popen() with shell=True or without proper argument sanitization, attackers can execute arbitrary system commands on the server, potentially gaining full control of the system.

# Vulnerable: subprocess with user input in Flask
import subprocess
from flask import Flask, request, jsonify

app = Flask(__name__)


# Extremely dangerous: subprocess with shell=True
@app.route('/execute', methods=['POST'])
def execute_command():
    command = request.json.get('command', '')

    # CRITICAL: Command injection via shell=True
    try:
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        return jsonify({
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode
        })
    except Exception as e:
        return jsonify({'error': str(e)})


# Another dangerous pattern
@app.route('/file-ops', methods=['POST'])
def file_operations():
    operation = request.json.get('operation', '')
    filename = request.json.get('filename', '')

    # Dangerous: User controls command arguments
    command = [operation, filename]

    try:
        # Still dangerous if operation is user-controlled
        result = subprocess.run(command, capture_output=True, text=True)
        return jsonify({
            'output': result.stdout,
            'status': 'executed'
        })
    except Exception as e:
        return jsonify({'error': str(e)})


# Git operations with subprocess
@app.route('/git', methods=['POST'])
def git_operations():
    git_command = request.json.get('git_command', '')
    repository = request.json.get('repository', '')

    # Dangerous: Git command injection
    command = f"git {git_command} {repository}"

    try:
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        return jsonify({
            'git_output': result.stdout,
            'git_error': result.stderr
        })
    except Exception as e:
        return jsonify({'error': str(e)})


# Image processing with ImageMagick
@app.route('/convert-image', methods=['POST'])
def convert_image():
    input_file = request.json.get('input_file', '')
    output_file = request.json.get('output_file', '')
    options = request.json.get('options', '')

    # Dangerous: ImageMagick command injection
    command = f"convert {options} {input_file} {output_file}"

    try:
        subprocess.run(command, shell=True, check=True)
        return jsonify({'status': 'image_converted'})
    except subprocess.CalledProcessError as e:
        return jsonify({'error': str(e)})


# System monitoring
@app.route('/system', methods=['GET'])
def system_info():
    info_type = request.args.get('type', '')
    options = request.args.get('options', '')

    # Dangerous: System command with user options
    commands = {
        'ps': f'ps {options}',
        'netstat': f'netstat {options}',
        'top': f'top {options}',
        'df': f'df {options}'
    }

    if info_type in commands:
        try:
            result = subprocess.run(
                commands[info_type],
                shell=True,
                capture_output=True,
                text=True
            )
            return jsonify({'output': result.stdout})
        except Exception as e:
            return jsonify({'error': str(e)})

    return jsonify({'error': 'Invalid system info type'})


# Archive operations
@app.route('/archive', methods=['POST'])
def archive_files():
    archive_type = request.json.get('type', 'tar')
    source_path = request.json.get('source', '')
    archive_name = request.json.get('archive_name', '')
    compression = request.json.get('compression', '')

    # Dangerous: Archive command injection
    if archive_type == 'tar':
        command = f"tar {compression} -cf {archive_name} {source_path}"
    elif archive_type == 'zip':
        command = f"zip {compression} {archive_name} {source_path}"

    try:
        subprocess.run(command, shell=True, check=True)
        return jsonify({'status': 'archive_created'})
    except subprocess.CalledProcessError as e:
        return jsonify({'error': str(e)})


# Network operations
@app.route('/network', methods=['POST'])
def network_operations():
    operation = request.json.get('operation', '')
    target = request.json.get('target', '')
    options = request.json.get('options', '')

    # Dangerous: Network command injection
    network_commands = {
        'ping': f'ping {options} {target}',
        'traceroute': f'traceroute {options} {target}',
        'nslookup': f'nslookup {options} {target}',
        'curl': f'curl {options} {target}'
    }

    if operation in network_commands:
        try:
            result = subprocess.run(
                network_commands[operation],
                shell=True,
                capture_output=True,
                text=True,
                timeout=30
            )
            return jsonify({
                'output': result.stdout,
                'error': result.stderr
            })
        except Exception as e:
            return jsonify({'error': str(e)})

    return jsonify({'error': 'Invalid network operation'})


# Database operations
@app.route('/database', methods=['POST'])
def database_operations():
    db_command = request.json.get('command', '')
    database = request.json.get('database', '')
    query = request.json.get('query', '')

    # Dangerous: Database command injection
    command = f"{db_command} -D {database} -e '{query}'"

    try:
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        return jsonify({
            'db_output': result.stdout,
            'db_error': result.stderr
        })
    except Exception as e:
        return jsonify({'error': str(e)})


# Package management
@app.route('/packages', methods=['POST'])
def package_operations():
    package_manager = request.json.get('manager', 'apt')
    action = request.json.get('action', 'install')
    package_name = request.json.get('package', '')
    options = request.json.get('options', '')

    # Dangerous: Package manager command injection
    command = f"{package_manager} {action} {options} {package_name}"

    try:
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        return jsonify({
            'package_output': result.stdout,
            'package_error': result.stderr
        })
    except Exception as e:
        return jsonify({'error': str(e)})
# Secure: Safe subprocess usage in Flask
import subprocess
import shutil
from flask import Flask, request, jsonify
from pathlib import Path
import re
from marshmallow import Schema, fields, validate, ValidationError as MarshmallowValidationError
from werkzeug.utils import secure_filename
import os

app = Flask(__name__)

# Safe: Configuration with allowed operations
app.config['ALLOWED_OPERATIONS'] = {
    'file_info': {'command': ['stat'], 'max_args': 1},
    'directory_list': {'command': ['ls', '-la'], 'max_args': 1},
    'file_count': {'command': ['wc', '-l'], 'max_args': 1}
}


# Input validation schemas
# validate.OneOf raises ValidationError itself, so it does not depend on
# how a marshmallow version treats a validator that returns False.
class FileOperationSchema(Schema):
    operation = fields.Str(
        required=True,
        validate=validate.OneOf(['file_info', 'directory_list', 'file_count'])
    )
    target = fields.Str(required=True)


class SystemInfoSchema(Schema):
    info_type = fields.Str(
        required=True,
        validate=validate.OneOf(['disk_usage', 'memory_info', 'process_count'])
    )


# Safe: No direct command execution
@app.route('/execute', methods=['POST'])
def safe_execute_operation():
    try:
        # Validate input
        schema = FileOperationSchema()
        data = schema.load(request.json or {})

        # Execute safe operation
        result = execute_safe_operation(data)
        return jsonify(result)
    except (ValueError, MarshmallowValidationError) as e:
        return jsonify({'error': str(e)}), 400


def execute_safe_operation(data):
    operation = data['operation']
    target = data['target']

    # Validate target path
    validated_target = validate_target_path(target)

    # Get operation configuration
    op_config = app.config['ALLOWED_OPERATIONS'][operation]

    # Build safe command
    command = op_config['command'] + [validated_target]

    try:
        # Safe: No shell=True, use argument list
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=10,
            check=False  # Don't raise on non-zero exit
        )
        return {
            'operation': operation,
            'output': result.stdout[:1000],  # Limit output
            'error': result.stderr[:1000] if result.stderr else None,
            'return_code': result.returncode
        }
    except subprocess.TimeoutExpired:
        raise ValueError('Operation timed out')
    except Exception as e:
        raise ValueError(f'Operation failed: {str(e)}')


def validate_target_path(target):
    """Validate and sanitize target path"""
    if not target:
        raise ValueError('Target path is required')

    # Reject dangerous characters
    if re.search(r'[;&|`$(){}\[\]\*\?<>\"\\]', target):
        raise ValueError('Target contains invalid characters')

    # Prevent path traversal
    if '..' in target or target.startswith('/'):
        raise ValueError('Invalid target path')

    # Limit to safe directory
    safe_base = '/app/data'
    target_path = Path(safe_base) / target

    try:
        # Ensure target is within safe directory
        target_path.resolve().relative_to(Path(safe_base).resolve())
        return str(target_path)
    except ValueError:
        raise ValueError('Target path outside allowed directory')


# Safe: File operations using Python libraries
@app.route('/file-ops', methods=['POST'])
def safe_file_operations():
    try:
        operation = request.json.get('operation', '') if request.json else ''
        filename = request.json.get('filename', '') if request.json else ''

        # Validate inputs
        validated_data = validate_file_operation(operation, filename)

        # Execute safe file operation
        result = execute_safe_file_operation(validated_data)
        return jsonify(result)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400


def validate_file_operation(operation, filename):
    # Validate operation
    allowed_operations = ['copy', 'move', 'delete', 'info']
    if operation not in allowed_operations:
        raise ValueError('Operation not allowed')

    # Validate filename
    if not filename or len(filename) > 100:
        raise ValueError('Invalid filename')

    if not re.match(r'^[a-zA-Z0-9._-]+$', filename):
        raise ValueError('Filename contains invalid characters')

    return {'operation': operation, 'filename': filename}


def execute_safe_file_operation(data):
    operation = data['operation']
    filename = data['filename']

    # Define safe base directory
    safe_dir = Path('/app/data/user_files')
    file_path = safe_dir / filename

    try:
        # Ensure file is within safe directory
        file_path.resolve().relative_to(safe_dir.resolve())
    except ValueError:
        raise ValueError('File path outside allowed directory')

    if operation == 'copy':
        if not file_path.exists():
            raise ValueError('Source file not found')
        backup_dir = safe_dir / 'backups'
        backup_dir.mkdir(exist_ok=True)
        backup_path = backup_dir / filename
        shutil.copy2(file_path, backup_path)
        return {'status': 'File copied successfully'}

    elif operation == 'move':
        if not file_path.exists():
            raise ValueError('Source file not found')
        archive_dir = safe_dir / 'archive'
        archive_dir.mkdir(exist_ok=True)
        archive_path = archive_dir / filename
        shutil.move(file_path, archive_path)
        return {'status': 'File moved successfully'}

    elif operation == 'delete':
        if file_path.exists():
            file_path.unlink()
            return {'status': 'File deleted successfully'}
        else:
            return {'status': 'File not found'}

    elif operation == 'info':
        if file_path.exists():
            stat = file_path.stat()
            return {
                'exists': True,
                'size': stat.st_size,
                'modified': stat.st_mtime,
                'is_file': file_path.is_file(),
                'is_dir': file_path.is_dir()
            }
        else:
            return {'exists': False}


# Safe: System information using Python libraries
@app.route('/system', methods=['GET'])
def safe_system_info():
    try:
        # Validate input
        schema = SystemInfoSchema()
        data = schema.load(request.args)

        # Get system information safely
        result = get_safe_system_info(data['info_type'])
        return jsonify(result)
    except (ValueError, MarshmallowValidationError) as e:
        return jsonify({'error': str(e)}), 400


def get_safe_system_info(info_type):
    """Get system information using Python libraries"""
    import psutil

    if info_type == 'disk_usage':
        disk = psutil.disk_usage('/')
        return {
            'total': disk.total,
            'used': disk.used,
            'free': disk.free,
            'percent': (disk.used / disk.total) * 100
        }

    elif info_type == 'memory_info':
        memory = psutil.virtual_memory()
        return {
            'total': memory.total,
            'available': memory.available,
            'used': memory.used,
            'percent': memory.percent
        }

    elif info_type == 'process_count':
        return {
            'total_processes': len(psutil.pids()),
            'cpu_count': psutil.cpu_count(),
            'cpu_percent': psutil.cpu_percent()
        }


# Safe: Image processing using Python libraries
@app.route('/convert-image', methods=['POST'])
def safe_convert_image():
    try:
        # Get uploaded file
        if 'file' not in request.files:
            raise ValueError('No file provided')

        uploaded_file = request.files['file']
        output_format = request.form.get('format', 'PNG')

        # Validate inputs
        validated_data = validate_image_conversion(uploaded_file, output_format)

        # Convert image safely
        result = convert_image_safely(validated_data)
        return jsonify(result)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400


def validate_image_conversion(uploaded_file, output_format):
    # Validate file type
    allowed_types = ['image/jpeg', 'image/png', 'image/gif']
    if uploaded_file.content_type not in allowed_types:
        raise ValueError('File type not allowed')

    # Validate file size
    max_size = 10 * 1024 * 1024  # 10MB
    uploaded_file.seek(0, 2)  # Seek to end
    file_size = uploaded_file.tell()
    uploaded_file.seek(0)  # Reset

    if file_size > max_size:
        raise ValueError('File too large')

    # Validate output format
    allowed_formats = ['PNG', 'JPEG', 'GIF', 'WEBP']
    if output_format.upper() not in allowed_formats:
        raise ValueError('Output format not allowed')

    return {
        'file': uploaded_file,
        'output_format': output_format.upper()
    }


def convert_image_safely(data):
    """Convert image using Pillow"""
    from PIL import Image
    import io

    uploaded_file = data['file']
    output_format = data['output_format']

    try:
        # Open and validate image
        image = Image.open(uploaded_file)
        image.verify()  # Verify it's a valid image

        # Reopen for processing (the image object is unusable after verify())
        uploaded_file.seek(0)
        image = Image.open(uploaded_file)

        # Record the format now: convert() returns an image whose format is None
        original_format = image.format

        # Convert format
        output_io = io.BytesIO()

        # Convert to RGB if necessary (for JPEG)
        if output_format == 'JPEG' and image.mode in ('RGBA', 'LA', 'P'):
            image = image.convert('RGB')

        image.save(output_io, format=output_format)

        # Save converted file. The client-supplied filename is sanitized so it
        # cannot contain path separators or '..' components.
        safe_name = secure_filename(uploaded_file.filename or '') or 'upload'
        output_filename = f'converted_{safe_name}.{output_format.lower()}'
        output_path = Path('/app/data/converted') / output_filename
        output_path.parent.mkdir(exist_ok=True)

        with open(output_path, 'wb') as f:
            f.write(output_io.getvalue())

        return {
            'status': 'Image converted successfully',
            'output_file': output_filename,
            'original_format': original_format,
            'new_format': output_format
        }
    except Exception as e:
        raise ValueError(f'Image conversion failed: {str(e)}')


# Safe: Archive operations using Python libraries
@app.route('/archive', methods=['POST'])
def safe_archive_files():
    try:
        archive_type = request.json.get('type', 'tar') if request.json else ''
        source_name = request.json.get('source', '') if request.json else ''
        archive_name = request.json.get('archive_name', '') if request.json else ''

        # Validate inputs
        validated_data = validate_archive_operation(archive_type, source_name, archive_name)

        # Create archive safely
        result = create_archive_safely(validated_data)
        return jsonify(result)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400


def validate_archive_operation(archive_type, source_name, archive_name):
    # Validate archive type
    allowed_types = ['tar', 'zip']
    if archive_type not in allowed_types:
        raise ValueError('Archive type not allowed')

    # Validate names
    for name in [source_name, archive_name]:
        if not name or not re.match(r'^[a-zA-Z0-9._-]+$', name):
            raise ValueError('Invalid file/archive name')

    return {
        'type': archive_type,
        'source': source_name,
        'archive_name': archive_name
    }


def create_archive_safely(data):
    """Create archive using Python libraries"""
    import tarfile
    import zipfile

    archive_type = data['type']
    source_name = data['source']
    archive_name = data['archive_name']

    # Define safe directories
    source_dir = Path('/app/data/user_files')
    archive_dir = Path('/app/data/archives')
    archive_dir.mkdir(exist_ok=True)

    source_path = source_dir / source_name
    if not source_path.exists():
        raise ValueError('Source file/directory not found')

    # Ensure source is within safe directory
    try:
        source_path.resolve().relative_to(source_dir.resolve())
    except ValueError:
        raise ValueError('Source path outside allowed directory')

    if archive_type == 'tar':
        archive_path = archive_dir / f'{archive_name}.tar.gz'
        with tarfile.open(archive_path, 'w:gz') as tar:
            tar.add(source_path, arcname=source_name)

    elif archive_type == 'zip':
        archive_path = archive_dir / f'{archive_name}.zip'
        with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            if source_path.is_file():
                zipf.write(source_path, source_name)
            else:
                for file_path in source_path.rglob('*'):
                    if file_path.is_file():
                        relative_path = file_path.relative_to(source_path.parent)
                        zipf.write(file_path, str(relative_path))

    return {
        'status': 'Archive created successfully',
        'archive_file': archive_path.name,
        'archive_type': archive_type
    }


# Safe: Network operations using Python libraries
@app.route('/network', methods=['POST'])
def safe_network_operations():
    try:
        operation = request.json.get('operation', '') if request.json else ''
        target = request.json.get('target', '') if request.json else ''

        # Validate inputs
        validated_data = validate_network_operation(operation, target)

        # Execute safe network operation
        result = execute_safe_network_operation(validated_data)
        return jsonify(result)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400


def validate_network_operation(operation, target):
    # Validate operation
    allowed_operations = ['ping', 'resolve_hostname']
    if operation not in allowed_operations:
        raise ValueError('Network operation not allowed')

    # Validate target
    if not target:
        raise ValueError('Target is required')

    # Basic hostname/IP validation
    if not re.match(r'^[a-zA-Z0-9.-]+$', target):
        raise ValueError('Invalid target format')

    # A leading '-' would be read by ping as an option (argument injection)
    if target.startswith('-'):
        raise ValueError('Invalid target format')

    return {'operation': operation, 'target': target}


def execute_safe_network_operation(data):
    """Execute network operations using Python libraries"""
    import socket
    import platform

    operation = data['operation']
    target = data['target']

    if operation == 'ping':
        # Safe: Use platform-specific ping command with controlled arguments
        if platform.system().lower() == 'windows':
            command = ['ping', '-n', '4', target]
        else:
            command = ['ping', '-c', '4', target]

        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=10
            )
            return {
                'operation': 'ping',
                'target': target,
                'output': result.stdout[:500],  # Limit output
                'success': result.returncode == 0
            }
        except subprocess.TimeoutExpired:
            return {
                'operation': 'ping',
                'target': target,
                'error': 'Ping timed out',
                'success': False
            }

    elif operation == 'resolve_hostname':
        try:
            ip_address = socket.gethostbyname(target)
            return {
                'operation': 'resolve_hostname',
                'hostname': target,
                'ip_address': ip_address,
                'success': True
            }
        except socket.gaierror as e:
            return {
                'operation': 'resolve_hostname',
                'hostname': target,
                'error': str(e),
                'success': False
            }


if __name__ == '__main__':
    app.run(debug=False)

💡 Why This Fix Works

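The secure version never hands request data to a shell. Where a Python library can do the job (shutil, psutil, Pillow, tarfile, zipfile, socket), no external process is started at all. Where a process is still needed (stat, ls, wc, ping), the program and its flags are fixed by the application, the single user-supplied argument is validated first, and the command is passed as an argument list without shell=True. See the Fixes section for details.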

Why it happens

Command injection arises whenever data from the request (query parameters, form fields, JSON bodies) reaches a shell, or decides which program and options a subprocess call runs. The root causes below describe the most common ways this happens in Flask views.

Root causes

Passing User Input to subprocess with shell=True

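Flask views build a command from request data and run it through a shell: subprocess.run(f'ping {request.args["host"]}', shell=True). With shell=True the whole string is parsed by the shell (/bin/sh on POSIX), which treats characters such as ;, &&, | and $(...) as syntax rather than data. A request with host=example.com; rm -rf / therefore runs ping and then the attacker's second command. Any request data that reaches a shell=True call is an injection point.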
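A harmless way to observe this behaviour is sketched below. It assumes a POSIX system where /bin/sh and echo are available; echo stands in for ping, and the function name and payloads are illustrative only.

```python
import subprocess


def run_through_shell(host: str) -> str:
    """Vulnerable pattern: the formatted string is parsed by /bin/sh."""
    result = subprocess.run(
        f"echo pinging {host}",  # echo stands in for ping
        shell=True,
        capture_output=True,
        text=True,
    )
    return result.stdout


# ';' ends the first command and starts a second one.
chained = run_through_shell("example.com; echo INJECTED")

# '$(...)' is command substitution: the inner command runs first.
substituted = run_through_shell("$(echo INJECTED).example.com")

print(chained)
print(substituted)
```

In both cases the text after the metacharacter is executed as a command instead of being treated as part of the host name.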

Using subprocess Without List Syntax for Command Arguments

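Code passes the command as a single string instead of a list: subprocess.run('ls ' + directory). Without shell=True this is not directly injectable through shell syntax, but it is not correct either: on POSIX the entire string is taken as the program name, so the call fails, and on Windows the string becomes the new process's command line, so user input can add extra arguments. The usual "fix" is to add shell=True, which reintroduces the injection. Use list syntax, subprocess.run(['ls', directory]), so that each argument is passed separately. A value that begins with - can still be read as an option, so validate it as well.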
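The difference between the string and list forms can be checked with a small sketch. It assumes a POSIX system with ls on the PATH; the function names are made up for illustration.

```python
import os
import subprocess


def run_string_without_shell(directory: str) -> str:
    """Pass one concatenated string with the default shell=False."""
    try:
        subprocess.run("ls " + directory, capture_output=True, text=True)
        return "started"
    except FileNotFoundError:
        # POSIX: the whole string is treated as the program name,
        # and no executable is literally called "ls /".
        return "program not found"


def run_list_without_shell(directory: str) -> int:
    """Pass the program and its argument as separate list elements."""
    result = subprocess.run(["ls", directory], capture_output=True, text=True)
    return result.returncode


if os.name == "posix":
    print(run_string_without_shell("/"))
    print(run_list_without_shell("/"))
```

The failure of the string form is what tempts developers to add shell=True; switching to the list form is the safer correction.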

Constructing Commands Through String Formatting or Concatenation

Command strings are built with f-strings, str.format() or concatenation: cmd = f'convert {filename} output.png'; subprocess.run(cmd, shell=True). String formatting inserts the value verbatim; it does not escape shell metacharacters. A filename of $(whoami).png makes the shell perform command substitution and run whoami before convert starts. Formatting is not escaping, so pass the arguments as a list instead of building a string.

Using subprocess.call, subprocess.Popen, or os Functions with User Input

The older helpers are vulnerable in the same way: subprocess.call(command, shell=True) and subprocess.Popen(cmd, shell=True) hand the string to a shell exactly as subprocess.run() does. os.system() and os.popen() always run their argument through a shell and offer no way to turn that off. In every case, user input inside the command string is interpreted as shell syntax.
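As a rough aid for locating these calls, the sketch below walks a module's syntax tree with the standard ast module. It is a deliberately simple, hypothetical checker: it only recognises calls written as os.system(...), os.popen(...) or subprocess.<name>(..., shell=True) with a literal True, and it will miss aliased imports or computed arguments.

```python
import ast

ALWAYS_SHELL = {("os", "system"), ("os", "popen")}
SUBPROCESS_FUNCS = {"run", "call", "check_call", "check_output", "Popen"}


def find_shell_calls(source: str) -> list:
    """Return sorted (line, description) pairs for shell-executing calls."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Only handle the plain `module.function(...)` spelling.
        if not (isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name)):
            continue
        module, name = func.value.id, func.attr
        if (module, name) in ALWAYS_SHELL:
            findings.append((node.lineno, f"{module}.{name}"))
        elif module == "subprocess" and name in SUBPROCESS_FUNCS:
            for keyword in node.keywords:
                if (
                    keyword.arg == "shell"
                    and isinstance(keyword.value, ast.Constant)
                    and keyword.value.value is True
                ):
                    findings.append((node.lineno, f"subprocess.{name}(shell=True)"))
    return sorted(findings)


SAMPLE = '''
import os, subprocess
os.system("ls " + d)
subprocess.run(["ls", d])
subprocess.Popen(cmd, shell=True)
'''

for line, description in find_shell_calls(SAMPLE):
    print(line, description)
```

A dedicated static analysis tool covers far more cases; this only illustrates which call shapes deserve review.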

Insufficient Validation Before Subprocess Execution

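Validation is attempted but is too weak: if ';' not in command: subprocess.run(command, shell=True). Blocklists miss other metacharacters such as &&, ||, backticks, $() and newlines. Character allowlists stop shell syntax but not argument injection: a value that starts with - is read by the target program as an option, for example -o ProxyCommand=... passed to ssh. Validation reduces risk but does not make shell=True safe; it has to be combined with list-based argument passing.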
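The gaps in both kinds of check can be shown without running any command. In this sketch the two validator functions and the payload list are illustrative; the character set in the allowlist is an assumption, not one taken from a real application.

```python
import re


def naive_blocklist_ok(value: str) -> bool:
    """The weak check from the text: only ';' is rejected."""
    return ";" not in value


def charset_allowlist_ok(value: str) -> bool:
    """Character allowlist: blocks shell syntax, but not leading dashes."""
    return re.fullmatch(r"[A-Za-z0-9._=-]+", value) is not None


SHELL_PAYLOADS = [
    "example.com && id",
    "example.com || id",
    "example.com | id",
    "`id`",
    "$(id)",
    "example.com\nid",
]

# Every payload passes the blocklist because none of them contains ';'.
blocklist_bypasses = [p for p in SHELL_PAYLOADS if naive_blocklist_ok(p)]

# The character allowlist rejects all of them...
allowlist_bypasses = [p for p in SHELL_PAYLOADS if charset_allowlist_ok(p)]

# ...but still accepts a value that a program would parse as an option.
option_like = "-oProxyCommand=id"

print(len(blocklist_bypasses), len(allowlist_bypasses), charset_allowlist_ok(option_like))
```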

Fixes

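1. Always Use subprocess.run() with List Arguments, Never shell=True

Pass the command as a list: subprocess.run(['ping', '-c', '4', host], check=True, timeout=10). Each list element reaches the program as one argument, and no shell parses the string. shell=False is the default; stating it explicitly makes the intent obvious in review. check=True raises CalledProcessError on a non-zero exit status, timeout bounds the run time, and capture_output=True collects stdout and stderr. This removes shell injection, but the program still interprets its own arguments, so a host value beginning with - must be rejected by input validation.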
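To see that a list element arrives intact, the following sketch runs a tiny child program that reports its own arguments. The Python interpreter is used as the child only so the example runs anywhere; run_with_argument and ECHO_ARGV are hypothetical names.

```python
import json
import subprocess
import sys

# Child program that prints exactly the arguments it received, as JSON.
ECHO_ARGV = "import sys, json; print(json.dumps(sys.argv[1:]))"


def run_with_argument(value: str) -> list:
    """Run a fixed program with one untrusted argument and no shell."""
    try:
        result = subprocess.run(
            [sys.executable, "-c", ECHO_ARGV, value],
            capture_output=True,
            text=True,
            check=True,   # non-zero exit raises CalledProcessError
            timeout=10,   # long-running child raises TimeoutExpired
        )
    except subprocess.TimeoutExpired:
        raise ValueError("command timed out") from None
    except subprocess.CalledProcessError as exc:
        raise ValueError(f"command failed with exit code {exc.returncode}") from None
    return json.loads(result.stdout)


received = run_with_argument("example.com; echo INJECTED")
print(received)
```

The metacharacters reach the child as ordinary text inside a single argument, because nothing between the parent and the child parses them.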

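2. Validate Input Against Strict Allowlists

When only a few values are valid, use an allowlist: ALLOWED_HOSTS = {'example.com', 'test.com'}; if host not in ALLOWED_HOSTS: abort(400). When the set is open-ended, validate the format instead, for example re.fullmatch(r'[a-z0-9][a-z0-9.-]*', host). fullmatch avoids the trailing-newline match that $ permits, and the leading character class rejects values that start with -. Reject unexpected input rather than trying to clean it. Combine allowlist validation with subprocess list syntax for defense in depth.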
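One possible shape for such a validator is sketched below. ALLOWED_HOSTS reuses the values from the text; validate_host and HOSTNAME_RE are hypothetical, and the hostname pattern is a simplified assumption rather than a complete implementation of DNS naming rules.

```python
import ipaddress
import re

ALLOWED_HOSTS = {"example.com", "test.com"}  # values from the text

# Labels of letters, digits and inner hyphens, separated by single dots.
HOSTNAME_RE = re.compile(
    r"(?=.{1,253}\Z)"
    r"[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?"
    r"(?:\.[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?)*"
)


def validate_host(host, allowlist=None) -> str:
    """Return a normalised host or raise ValueError; never 'repair' input."""
    if not isinstance(host, str) or not host:
        raise ValueError("host is required")
    candidate = host.lower()

    # Closed set of values: membership is the only test that matters.
    if allowlist is not None:
        if candidate not in allowlist:
            raise ValueError("host is not on the allowlist")
        return candidate

    # Open set: accept a literal IP address or a well-formed hostname.
    try:
        return str(ipaddress.ip_address(candidate))
    except ValueError:
        pass
    if HOSTNAME_RE.fullmatch(candidate) is None:
        raise ValueError("host has an invalid format")
    return candidate
```

In a Flask view, the ValueError would typically be turned into a 400 response before any subprocess call is reached.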

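3. Use Python Libraries Instead of Subprocess Commands

Replace external commands with libraries: requests instead of curl, zipfile instead of zip and unzip, Pillow instead of ImageMagick, and a library such as dulwich or pygit2 instead of the git command line. GitPython is a wrapper that still invokes the git executable, so arguments passed through it need the same validation as any other subprocess call. Library APIs take structured arguments and involve no shell, so avoiding subprocess for common operations removes the command injection attack surface for those operations.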
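Two standard-library substitutions, sketched for illustration: shutil.disk_usage in place of df and zipfile in place of unzip -l. The function names are hypothetical, and the default path "/" assumes a POSIX-style filesystem root.

```python
import io
import shutil
import zipfile


def disk_usage_report(path: str = "/") -> dict:
    """Replacement for `df`: no external process is started."""
    usage = shutil.disk_usage(path)
    return {"total": usage.total, "used": usage.used, "free": usage.free}


def list_zip_members(data: bytes) -> list:
    """Replacement for `unzip -l`: read member names from in-memory bytes."""
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        return archive.namelist()


# Build a small archive in memory so the example needs no files on disk.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("notes.txt", "hello")

print(disk_usage_report())
print(list_zip_members(buffer.getvalue()))
```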

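4. Implement Indirect References for Command Selection

Map user input to predefined commands: COMMANDS = {'backup': ['tar', '-czf', 'backup.tar.gz', '/data'], 'restore': ['tar', '-xzf']}; cmd = COMMANDS.get(request.args['action']). The request supplies only a key, and the application owns every element of the command. COMMANDS.get() returns None for an unknown key, so reject the request in that case instead of passing None to subprocess. Any remaining argument, such as the archive to restore, should also come from application-controlled values rather than from the request.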
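A minimal sketch of the lookup with explicit handling of unknown keys follows. The backup entry comes from the text; the disk entry and the resolve_command helper are hypothetical additions for illustration. Nothing is executed here.

```python
# Tuples keep the predefined commands immutable.
COMMANDS = {
    "backup": ("tar", "-czf", "backup.tar.gz", "/data"),  # from the text
    "disk": ("df", "-h"),                                  # hypothetical entry
}


def resolve_command(action) -> list:
    """Translate a user-supplied key into an application-owned argv list."""
    if not isinstance(action, str) or action not in COMMANDS:
        raise ValueError(f"unknown action: {action!r}")
    # Return a fresh list so callers cannot alter the shared definition.
    return list(COMMANDS[action])


print(resolve_command("backup"))

# In a view, the result would be passed on without a shell, for example:
#   subprocess.run(resolve_command(action), check=True, timeout=60)
```

Because the user value is only ever compared against dictionary keys, it never becomes part of the command itself.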

5. Use shlex.quote() Only as Defense-in-Depth, Not Primary Protection

If a shell is unavoidable, escape every interpolated value: from shlex import quote; subprocess.run(f'ping {quote(host)}', shell=True). quote() wraps the value so that a POSIX shell treats it as one literal word; it is not designed for Windows cmd.exe. It does not stop argument injection, and a single unquoted value reopens the hole. Treat shlex.quote() as a secondary measure and use subprocess list syntax as the primary protection wherever possible.
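The effect of quoting, and its limit, can be seen in a short sketch. It assumes a POSIX shell; echo stands in for ping and echo_via_shell is a hypothetical helper.

```python
import shlex
import subprocess

payload = "example.com; echo INJECTED"

# Values with shell metacharacters are wrapped in single quotes.
quoted = shlex.quote(payload)

# Values made only of "safe" characters are returned unchanged,
# so a leading dash still reaches the program as an option.
option_like = shlex.quote("-n")


def echo_via_shell(value: str) -> str:
    """A shell is still used, but the value is quoted into a single word."""
    command = f"echo {shlex.quote(value)}"
    return subprocess.run(command, shell=True, capture_output=True, text=True).stdout


print(quoted)
print(option_like)
print(echo_via_shell(payload))
```

The quoted payload is printed literally instead of being executed, while "-n" passes through untouched, which is why quoting cannot replace validation.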

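6. Run Subprocesses with Least Privilege and Resource Limits

Run commands with reduced privileges and constraints: subprocess.run(cmd, user='nobody', timeout=10, env={'PATH': '/usr/bin'}). The user parameter requires Python 3.9 or later on POSIX and a parent process that is permitted to switch users. Pass a minimal environment so that secrets held by the Flask process are not inherited, set resource limits (for example with resource.setrlimit), and run the service in a container or sandbox with unneeded capabilities dropped. These measures do not prevent injection; they limit what an attacker can do if it happens.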
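The environment and resource-limit parts can be sketched as follows. This assumes a POSIX system where /usr/bin/env exists; CPU_SECONDS, APP_SECRET and the function names are hypothetical. preexec_fn is used here for brevity, and the Python documentation cautions against it in multi-threaded programs.

```python
import os
import resource  # POSIX only
import subprocess

CPU_SECONDS = 2  # hypothetical soft limit on CPU time


def limit_cpu_time():
    """Runs in the child before exec: lower the soft CPU-time limit."""
    soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
    finite = [v for v in (soft, hard) if v != resource.RLIM_INFINITY]
    resource.setrlimit(resource.RLIMIT_CPU, (min([CPU_SECONDS] + finite), hard))


def run_restricted(argv: list) -> subprocess.CompletedProcess:
    """Run argv with a minimal environment, a CPU limit and a timeout."""
    return subprocess.run(
        argv,
        env={"PATH": "/usr/bin:/bin"},  # nothing else is inherited
        preexec_fn=limit_cpu_time,
        capture_output=True,
        text=True,
        timeout=10,
        # user="nobody",  # Python 3.9+, needs permission to switch users
    )


os.environ["APP_SECRET"] = "do-not-leak"  # stands in for a real secret
child_env = run_restricted(["/usr/bin/env"]).stdout
print(child_env)
```

The child sees only the variables passed explicitly, so a secret in the parent's environment is not exposed even if the child is compromised.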
