Django SQL Injection via Raw SQL Queries

Critical Risk | SQL Injection
Tags: django, python, sql-injection, raw-sql, database, user-input

What it is

The Django application uses raw SQL queries with user-controlled input, creating SQL injection vulnerabilities. When user input is directly incorporated into raw SQL statements without proper parameterization, attackers can inject malicious SQL code to manipulate database queries, potentially accessing unauthorized data, modifying database contents, executing arbitrary database commands, or escalating privileges.

# Vulnerable: Raw SQL with user input in Django
from django.http import JsonResponse
from django.views import View
from django.db import connection
import json

# Dangerous: String concatenation in raw SQL
class UserSearchView(View):
    def get(self, request):
        username = request.GET.get('username', '')
        email = request.GET.get('email', '')

        # CRITICAL: SQL injection in raw query
        with connection.cursor() as cursor:
            query = f"SELECT * FROM auth_user WHERE username = '{username}' OR email = '{email}'"
            cursor.execute(query)
            results = cursor.fetchall()

        users = []
        for row in results:
            users.append({
                'id': row[0],
                'username': row[1],
                'email': row[2]
            })

        return JsonResponse({'users': users})

# Another dangerous pattern with formatting
def product_search(request):
    category = request.GET.get('category', '')
    min_price = request.GET.get('min_price', '0')
    max_price = request.GET.get('max_price', '1000')
    order_by = request.GET.get('order', 'name')

    # Dangerous: Multiple user inputs in SQL
    query = """
        SELECT p.id, p.name, p.price, c.name as category
        FROM products p
        JOIN categories c ON p.category_id = c.id
        WHERE c.name = '{}'
        AND p.price BETWEEN {} AND {}
        ORDER BY {}
    """.format(category, min_price, max_price, order_by)

    with connection.cursor() as cursor:
        cursor.execute(query)
        results = cursor.fetchall()

    return JsonResponse({'products': list(results)})

# Complex query with user input
def advanced_search(request):
    search_data = json.loads(request.body)
    table_name = search_data.get('table', '')
    columns = search_data.get('columns', [])
    conditions = search_data.get('conditions', [])

    # Dangerous: User controls table, columns, and conditions
    column_list = ', '.join(columns)
    condition_clauses = []
    for condition in conditions:
        field = condition['field']
        operator = condition['operator']
        value = condition['value']
        condition_clauses.append(f"{field} {operator} '{value}'")

    where_clause = ' AND '.join(condition_clauses)
    query = f"SELECT {column_list} FROM {table_name} WHERE {where_clause}"

    with connection.cursor() as cursor:
        cursor.execute(query)
        results = cursor.fetchall()

    return JsonResponse({'data': list(results)})

# Report generation with user input
def generate_report(request):
    report_type = request.POST.get('report', '')
    date_from = request.POST.get('from', '')
    date_to = request.POST.get('to', '')
    group_by = request.POST.get('group_by', '')

    # Dangerous: Dynamic SQL construction
    if report_type == 'sales':
        query = f"""
            SELECT {group_by}, SUM(amount) as total
            FROM sales
            WHERE date >= '{date_from}' AND date <= '{date_to}'
            GROUP BY {group_by}
        """
    elif report_type == 'users':
        query = f"""
            SELECT {group_by}, COUNT(*) as count
            FROM users
            WHERE created_at >= '{date_from}' AND created_at <= '{date_to}'
            GROUP BY {group_by}
        """

    with connection.cursor() as cursor:
        cursor.execute(query)
        results = cursor.fetchall()

    return JsonResponse({'report': list(results)})

# Authentication bypass
def custom_login(request):
    username = request.POST.get('username', '')
    password = request.POST.get('password', '')

    # Dangerous: Authentication check with raw SQL
    query = f"""
        SELECT id, username, is_staff
        FROM auth_user
        WHERE username = '{username}' AND password = '{password}'
    """

    with connection.cursor() as cursor:
        cursor.execute(query)
        user_data = cursor.fetchone()

    if user_data:
        return JsonResponse({'success': True, 'user_id': user_data[0]})
    else:
        return JsonResponse({'success': False})

# Data export with user-controlled query
def export_data(request):
    export_query = request.POST.get('query', '')
    format_type = request.POST.get('format', 'json')

    # Dangerous: User provides entire SQL query
    with connection.cursor() as cursor:
        try:
            cursor.execute(export_query)
            results = cursor.fetchall()

            if format_type == 'json':
                return JsonResponse({'data': list(results)})
        except Exception as e:
            return JsonResponse({'error': str(e)})
# Secure: Safe raw SQL usage in Django
from django.http import JsonResponse
from django.views import View
from django.db import connection
from django.core.exceptions import ValidationError
import json
import re

# Safe: Parameterized queries
class SafeUserSearchView(View):
    def get(self, request):
        username = request.GET.get('username', '')
        email = request.GET.get('email', '')

        try:
            # Validate inputs
            validated_data = self.validate_search_params(username, email)

            # Safe: Parameterized query
            users = self.search_users_safely(validated_data)

            return JsonResponse({'users': users})

        except ValidationError as e:
            return JsonResponse({'error': str(e)}, status=400)

    def validate_search_params(self, username, email):
        # Validate username
        if username and (len(username) > 150 or not re.match(r'^[a-zA-Z0-9_.-]+$', username)):
            raise ValidationError('Invalid username format')

        # Validate email
        if email and (len(email) > 254 or '@' not in email):
            raise ValidationError('Invalid email format')

        return {'username': username, 'email': email}

    def search_users_safely(self, validated_data):
        with connection.cursor() as cursor:
            # Safe: Parameterized query
            query = """
                SELECT id, username, email, first_name, last_name
                FROM auth_user
                WHERE (%s = '' OR username = %s)
                AND (%s = '' OR email = %s)
                LIMIT 100
            """
            cursor.execute(query, [
                validated_data['username'], validated_data['username'],
                validated_data['email'], validated_data['email']
            ])
            results = cursor.fetchall()

        users = []
        for row in results:
            users.append({
                'id': row[0],
                'username': row[1],
                'email': row[2],
                'full_name': f"{row[3]} {row[4]}".strip()
            })

        return users

# Safe: Product search with validation
def safe_product_search(request):
    category = request.GET.get('category', '')
    min_price = request.GET.get('min_price', '0')
    max_price = request.GET.get('max_price', '1000')
    order_by = request.GET.get('order', 'name')

    try:
        # Validate all inputs
        validated_data = validate_product_search_params(category, min_price, max_price, order_by)

        # Search products safely
        products = search_products_safely(validated_data)

        return JsonResponse({'products': products})

    except ValidationError as e:
        return JsonResponse({'error': str(e)}, status=400)

def validate_product_search_params(category, min_price, max_price, order_by):
    # Validate category
    allowed_categories = ['electronics', 'clothing', 'books', 'home', 'sports']
    if category and category not in allowed_categories:
        raise ValidationError('Invalid category')

    # Validate prices
    try:
        min_price = float(min_price) if min_price else 0
        max_price = float(max_price) if max_price else 10000

        if min_price < 0 or max_price < 0 or min_price > max_price:
            raise ValidationError('Invalid price range')
    except ValueError:
        raise ValidationError('Invalid price format')

    # Validate order field
    allowed_order_fields = ['name', 'price', 'created_at', 'category']
    if order_by not in allowed_order_fields:
        raise ValidationError('Invalid sort field')

    return {
        'category': category,
        'min_price': min_price,
        'max_price': max_price,
        'order_by': order_by
    }

def search_products_safely(validated_data):
    with connection.cursor() as cursor:
        # Safe: Parameterized query with validated order field
        base_query = """
            SELECT p.id, p.name, p.price, c.name as category, p.description
            FROM products p
            JOIN categories c ON p.category_id = c.id
            WHERE (%s = '' OR c.name = %s)
            AND p.price BETWEEN %s AND %s
        """

        # Safe: Validated order field (allowlist checked)
        order_field = validated_data['order_by']
        if order_field == 'category':
            order_field = 'c.name'
        else:
            order_field = f'p.{order_field}'

        query = f"{base_query} ORDER BY {order_field} LIMIT 100"

        cursor.execute(query, [
            validated_data['category'], validated_data['category'],
            validated_data['min_price'], validated_data['max_price']
        ])
        results = cursor.fetchall()

    products = []
    for row in results:
        products.append({
            'id': row[0],
            'name': row[1],
            'price': float(row[2]),
            'category': row[3],
            'description': row[4][:200] if row[4] else ''  # Limit description
        })

    return products

# Safe: Advanced search with structured parameters
def safe_advanced_search(request):
    try:
        search_data = json.loads(request.body)

        # Validate search structure
        validated_search = validate_advanced_search(search_data)

        # Execute safe search
        results = execute_safe_advanced_search(validated_search)

        return JsonResponse({'data': results})

    except (json.JSONDecodeError, ValidationError) as e:
        return JsonResponse({'error': 'Invalid search parameters'}, status=400)

def validate_advanced_search(search_data):
    # Validate table
    allowed_tables = ['products', 'users', 'orders']
    table = search_data.get('table', '')
    if table not in allowed_tables:
        raise ValidationError('Table not allowed')

    # Validate columns
    allowed_columns = {
        'products': ['id', 'name', 'price', 'category', 'description'],
        'users': ['id', 'username', 'email', 'first_name', 'last_name'],
        'orders': ['id', 'user_id', 'total', 'status', 'created_at']
    }

    columns = search_data.get('columns', [])
    table_columns = allowed_columns[table]

    validated_columns = []
    for col in columns:
        if col in table_columns:
            validated_columns.append(col)

    if not validated_columns:
        validated_columns = ['id']  # Default safe column

    # Validate conditions
    conditions = search_data.get('conditions', [])
    validated_conditions = []

    for condition in conditions:
        if not isinstance(condition, dict):
            continue

        field = condition.get('field', '')
        operator = condition.get('operator', '')
        value = condition.get('value', '')

        # Validate field
        if field not in table_columns:
            continue

        # Validate operator
        allowed_operators = ['=', '!=', '>', '<', '>=', '<=', 'LIKE']
        if operator not in allowed_operators:
            continue

        # Validate value
        if isinstance(value, (str, int, float)) and len(str(value)) <= 100:
            validated_conditions.append({
                'field': field,
                'operator': operator,
                'value': value
            })

    return {
        'table': table,
        'columns': validated_columns,
        'conditions': validated_conditions
    }

def execute_safe_advanced_search(search_params):
    table = search_params['table']
    columns = search_params['columns']
    conditions = search_params['conditions']

    # Build safe query
    column_list = ', '.join(columns)

    # Build WHERE clause with parameters
    where_parts = []
    params = []

    for condition in conditions:
        where_parts.append(f"{condition['field']} {condition['operator']} %s")
        params.append(condition['value'])

    where_clause = ' AND '.join(where_parts) if where_parts else '1=1'

    query = f"SELECT {column_list} FROM {table} WHERE {where_clause} LIMIT 100"

    with connection.cursor() as cursor:
        cursor.execute(query, params)
        results = cursor.fetchall()
        columns_names = [desc[0] for desc in cursor.description]

    # Convert to list of dictionaries
    data = []
    for row in results:
        row_dict = dict(zip(columns_names, row))
        data.append(row_dict)

    return data

# Safe: Report generation with predefined queries
def safe_generate_report(request):
    report_type = request.POST.get('report', '')
    date_from = request.POST.get('from', '')
    date_to = request.POST.get('to', '')

    try:
        # Validate inputs
        validated_data = validate_report_params(report_type, date_from, date_to)

        # Generate report safely
        report_data = generate_report_safely(validated_data)

        return JsonResponse({'report': report_data})

    except ValidationError as e:
        return JsonResponse({'error': str(e)}, status=400)

def validate_report_params(report_type, date_from, date_to):
    # Validate report type
    allowed_reports = ['sales_by_category', 'user_registrations', 'order_summary']
    if report_type not in allowed_reports:
        raise ValidationError('Invalid report type')

    # Validate dates
    from datetime import datetime

    try:
        if date_from:
            from_date = datetime.strptime(date_from, '%Y-%m-%d').date()
        else:
            from_date = None

        if date_to:
            to_date = datetime.strptime(date_to, '%Y-%m-%d').date()
        else:
            to_date = None

        if from_date and to_date and from_date > to_date:
            raise ValidationError('Invalid date range')

    except ValueError:
        raise ValidationError('Invalid date format. Use YYYY-MM-DD')

    return {
        'report_type': report_type,
        'date_from': from_date,
        'date_to': to_date
    }

def generate_report_safely(validated_data):
    report_type = validated_data['report_type']
    date_from = validated_data['date_from']
    date_to = validated_data['date_to']

    with connection.cursor() as cursor:
        if report_type == 'sales_by_category':
            query = """
                SELECT c.name as category, COUNT(o.id) as order_count, SUM(o.total) as total_sales
                FROM orders o
                JOIN order_items oi ON o.id = oi.order_id
                JOIN products p ON oi.product_id = p.id
                JOIN categories c ON p.category_id = c.id
                WHERE (%s IS NULL OR DATE(o.created_at) >= %s)
                AND (%s IS NULL OR DATE(o.created_at) <= %s)
                GROUP BY c.id, c.name
                ORDER BY total_sales DESC
                LIMIT 50
            """
            cursor.execute(query, [date_from, date_from, date_to, date_to])

        elif report_type == 'user_registrations':
            query = """
                SELECT DATE(date_joined) as registration_date, COUNT(*) as new_users
                FROM auth_user
                WHERE (%s IS NULL OR DATE(date_joined) >= %s)
                AND (%s IS NULL OR DATE(date_joined) <= %s)
                GROUP BY DATE(date_joined)
                ORDER BY registration_date DESC
                LIMIT 100
            """
            cursor.execute(query, [date_from, date_from, date_to, date_to])

        elif report_type == 'order_summary':
            query = """
                SELECT DATE(created_at) as order_date, COUNT(*) as order_count,
                       AVG(total) as avg_order_value, SUM(total) as total_revenue
                FROM orders
                WHERE (%s IS NULL OR DATE(created_at) >= %s)
                AND (%s IS NULL OR DATE(created_at) <= %s)
                GROUP BY DATE(created_at)
                ORDER BY order_date DESC
                LIMIT 100
            """
            cursor.execute(query, [date_from, date_from, date_to, date_to])

        results = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]

    # Convert to list of dictionaries
    report_data = []
    for row in results:
        row_dict = dict(zip(columns, row))
        # Convert Decimal to float for JSON serialization
        for key, value in row_dict.items():
            if hasattr(value, '__float__'):
                row_dict[key] = float(value)
        report_data.append(row_dict)

    return report_data

# Safe: Authentication with parameterized queries
def safe_custom_login(request):
    username = request.POST.get('username', '')
    password = request.POST.get('password', '')

    try:
        # Validate inputs
        if not username or not password:
            raise ValidationError('Username and password required')

        if len(username) > 150 or len(password) > 128:
            raise ValidationError('Invalid credentials')

        # Safe: Use Django's built-in authentication
        from django.contrib.auth import authenticate

        user = authenticate(username=username, password=password)

        if user:
            return JsonResponse({
                'success': True,
                'user_id': user.id,
                'username': user.username
            })
        else:
            return JsonResponse({'success': False, 'error': 'Invalid credentials'})

    except ValidationError as e:
        return JsonResponse({'success': False, 'error': str(e)})

💡 Why This Fix Works

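The secure versions pass user input to cursor.execute() as bound parameters, restrict identifiers such as table, column, and sort names to allowlists, and use Django's authenticate() for login instead of a hand-written query. The Fixes section explains each technique.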

Why it happens

SQL injection occurs when user input becomes part of the SQL text instead of being passed to the database as a bound parameter. The root causes below are variations on that one mistake.

Root causes

Using String Concatenation or F-Strings in Raw SQL Queries

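The application builds SQL with f-strings or concatenation, for example query = f"SELECT * FROM users WHERE name='{username}'"; cursor.execute(query). Because the input is spliced directly into the SQL text, an attacker can close the quoted string and append their own SQL: extra OR conditions, UNION SELECT clauses, or comment markers that cut off the rest of the query.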
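To make the quote-breaking concrete, here is a minimal sketch that reproduces the f-string pattern against an in-memory database. It uses Python's standard-library sqlite3 rather than Django's connection.cursor() so that it runs on its own; the users table and the find_users_unsafe helper are hypothetical.

```python
import sqlite3

def find_users_unsafe(conn, username):
    # VULNERABLE on purpose: the input becomes part of the SQL text.
    query = f"SELECT id, name FROM users WHERE name = '{username}'"
    return conn.execute(query).fetchall()

# Hypothetical demo table; not part of any real schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("alice",), ("bob",)])

# A normal lookup matches one row.
normal = find_users_unsafe(conn, "alice")

# The payload closes the quote and adds a condition that is always true,
# so the WHERE clause matches every row in the table.
injected = find_users_unsafe(conn, "nobody' OR '1'='1")
```

The second call asks for a user that does not exist, yet it returns the whole table because the database sees the payload as SQL syntax rather than as a name.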

Missing Parameterization in cursor.execute() Calls

Code calls cursor.execute() with an already-interpolated string instead of passing parameters separately: cursor.execute("SELECT * FROM products WHERE id=" + product_id). Without parameterization, even fields that are meant to be numeric are injectable, because a value such as 1 OR 1=1 needs no quotes to change the query.
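The numeric case can be sketched the same way. The example below assumes a hypothetical products table and uses the standard-library sqlite3 module so it is self-contained; note that sqlite3 writes placeholders as ? where Django's cursor expects %s.

```python
import sqlite3

# Hypothetical demo table for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO products (name) VALUES (?)",
    [("lamp",), ("desk",), ("chair",)],
)

def get_product_unsafe(conn, product_id):
    # VULNERABLE on purpose: no quotes are needed to inject into a numeric field.
    sql = "SELECT id, name FROM products WHERE id=" + product_id
    return conn.execute(sql).fetchall()

def get_product_safe(conn, product_id):
    # The value is bound as data, so it can never extend the WHERE clause.
    sql = "SELECT id, name FROM products WHERE id=?"
    return conn.execute(sql, (product_id,)).fetchall()

payload = "1 OR 1=1"
leaked = get_product_unsafe(conn, payload)  # condition is always true
bound = get_product_safe(conn, payload)     # compared as one literal value
```

With concatenation the payload turns a single-row lookup into a full table dump; with a bound parameter the same string is just a value that matches no id.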

Direct Inclusion of User Input in SQL Statements

Views insert request.GET or request.POST values directly into queries: query = "DELETE FROM items WHERE id=" + request.GET['id']; cursor.execute(query). The attacker then controls part of the SQL syntax and can widen the statement to read, modify, or delete data it was never meant to touch.

Unsafe String Interpolation in Raw Queries with format()

Building SQL with .format(): query = "UPDATE users SET role='{}' WHERE id={}".format(role, user_id); cursor.execute(query). .format() performs no SQL escaping, so a crafted value can break out of the quotes and inject arbitrary SQL.
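The following sketch shows what a comment-marker payload does to the UPDATE statement above, and how the same input behaves when bound as a parameter. It runs on the standard-library sqlite3 module (placeholders are ? there, %s in Django); the table contents and helper names are hypothetical.

```python
import sqlite3

# Hypothetical demo data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, role TEXT)")
conn.executemany(
    "INSERT INTO users (name, role) VALUES (?, ?)",
    [("alice", "admin"), ("bob", "viewer"), ("carol", "viewer")],
)

def roles(conn):
    return [role for (role,) in conn.execute("SELECT role FROM users ORDER BY id")]

def set_role_unsafe(conn, role, user_id):
    # VULNERABLE on purpose: .format() does no SQL escaping.
    query = "UPDATE users SET role='{}' WHERE id={}".format(role, user_id)
    conn.execute(query)
    return query

def set_role_safe(conn, role, user_id):
    conn.execute("UPDATE users SET role=? WHERE id=?", (role, user_id))

payload = "admin' --"

# The payload closes the quote and comments out the WHERE clause,
# so the update applies to every row.
executed_sql = set_role_unsafe(conn, payload, 2)
roles_after_unsafe = roles(conn)

# Reset the non-admin rows, then repeat with a bound parameter.
conn.execute("UPDATE users SET role='viewer' WHERE id > 1")
set_role_safe(conn, payload, 2)
roles_after_safe = roles(conn)
```

In the safe variant the payload is stored verbatim as an odd-looking role for one user, which an allowlist of valid roles would then reject; it never alters which rows are updated.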

Bypassing Django's ORM Safety with Raw SQL

Developers use raw SQL to work around ORM limitations but forget to parameterize: with connection.cursor() as c: c.execute("SELECT * FROM " + table_name). Table and column names taken from user input are especially risky because identifiers cannot be bound as parameters.

Fixes

1. Always Use Parameterized Queries with cursor.execute()

Pass parameters as the second argument: cursor.execute("SELECT * FROM users WHERE id=%s", [user_id]). The database driver handles quoting and escaping. Use %s placeholders without surrounding quotes, and never interpolate values into the SQL string yourself. Parameter binding prevents injection through values; identifiers such as table and column names still need an allowlist.
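As a minimal sketch of parameter binding, the helper below takes any DB-API cursor and passes the value separately from the SQL text. It is demonstrated with the standard-library sqlite3 module, which writes placeholders as ?; with Django's connection.cursor() the same statement would use %s. The fetch_user helper and the sample rows are hypothetical.

```python
import sqlite3

def fetch_user(cursor, username):
    # With Django's cursor this would read "... WHERE username = %s".
    cursor.execute(
        "SELECT id, username FROM auth_user WHERE username = ?",
        [username],
    )
    return cursor.fetchone()

# Hypothetical stand-in for the auth_user table.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE auth_user (id INTEGER PRIMARY KEY, username TEXT)")
cur.executemany(
    "INSERT INTO auth_user (username) VALUES (?)",
    [("alice",), ("o'brien",)],
)

# A legitimate value containing a quote works without manual escaping.
found = fetch_user(cur, "o'brien")

# An injection payload is treated as a literal username and matches nothing.
attack = fetch_user(cur, "x' OR '1'='1")
```

Because the driver receives the value out of band, there is no need to strip or escape quotes in application code.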

2. Never Use String Formatting or Concatenation in SQL Queries

Avoid f-strings, .format(), the % operator, and + concatenation when building SQL. NO: cursor.execute(f"SELECT * FROM users WHERE name='{name}'"). YES: cursor.execute("SELECT * FROM users WHERE name=%s", [name]). String operations place the value inside the SQL text before the driver sees it, so parameter binding never gets a chance to protect the query.
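One situation where developers commonly reach for string formatting is an IN (...) clause with a variable number of values. A possible approach, sketched below, is to generate only the placeholder tokens and still pass every value as a parameter. The in_clause_placeholders helper and the items table are hypothetical; the demo runs on sqlite3, so it passes placeholder="?" where Django would use the "%s" default.

```python
import sqlite3

def in_clause_placeholders(values, placeholder="%s"):
    # Returns e.g. "%s, %s, %s". Only placeholder tokens are generated here;
    # no user-supplied value is ever written into the SQL text.
    if not values:
        raise ValueError("IN () needs at least one value")
    return ", ".join([placeholder] * len(values))

# Hypothetical demo table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO items (name) VALUES (?)",
    [("pen",), ("cap",), ("ink",)],
)

# The third element is an injection attempt; it is bound as a plain value.
ids = [1, 3, "2 OR 1=1"]
marks = in_clause_placeholders(ids, placeholder="?")
sql = "SELECT id, name FROM items WHERE id IN (" + marks + ") ORDER BY id"
rows = conn.execute(sql, ids).fetchall()
```

The concatenation here joins fixed fragments written by the developer, which is a different thing from formatting request data into the query.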

3. Validate and Sanitize All User Input Before Database Operations

Validate data types, formats, and lengths before running queries. Use Django forms or serializers where possible; for a simple case, convert explicitly: user_id = int(request.GET['id']). For strings, check against allowlists or expected patterns and reject anything that does not match. Validation complements parameterized queries; it does not replace them.
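The checks described above might look like the following standard-library sketch. The function names and limits are assumptions chosen for illustration (the username pattern mirrors the one in the secure example earlier on this page); in a Django project the same rules would normally live in a form or serializer.

```python
import re
from datetime import datetime

# Hypothetical limits for illustration.
USERNAME_RE = re.compile(r"[A-Za-z0-9_.-]{1,150}")
INT_RE = re.compile(r"[0-9]{1,18}")

def parse_positive_int(raw):
    # Accept only plain ASCII digits, then convert.
    if not isinstance(raw, str) or not INT_RE.fullmatch(raw):
        raise ValueError("expected a positive integer")
    value = int(raw)
    if value <= 0:
        raise ValueError("expected a positive integer")
    return value

def parse_iso_date(raw):
    # strptime raises ValueError for malformed or impossible dates.
    return datetime.strptime(raw, "%Y-%m-%d").date()

def validate_username(raw):
    # fullmatch ensures the whole string fits the pattern, not just a prefix.
    if not isinstance(raw, str) or not USERNAME_RE.fullmatch(raw):
        raise ValueError("invalid username")
    return raw

user_id = parse_positive_int("42")
start = parse_iso_date("2024-02-29")
name = validate_username("alice.smith")
```

Each function either returns a value of the expected type or raises ValueError, so a view can turn any failure into a 400 response before a query is built.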

4. Use Django's ORM Methods Instead of Raw SQL When Possible

Prefer User.objects.filter(username=username) over raw SQL. The ORM parameterizes queries automatically. Use Q objects for complex logic, annotate() for aggregations, and select_related() for joins. Reserve raw SQL for cases the ORM cannot express, and parameterize it when you do use it.

5. Implement Allowlists for Permitted Columns and Table Names

For dynamic columns or tables, validate against an allowlist: ALLOWED_COLS = ['id', 'name']; if col in ALLOWED_COLS: query = f"SELECT {col}...". Placeholders bind values only, not identifiers, so an allowlist is the only safe way to make column or table names dynamic.
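The sketch below illustrates both halves of that point: a placeholder in an identifier position is rejected by the database, and an allowlist lookup supplies the identifier instead. It uses the standard-library sqlite3 module so it runs standalone; ALLOWED_SORT_COLUMNS, build_product_query, and the products table are hypothetical.

```python
import sqlite3

# Hypothetical allowlist: maps a public sort key to a column name we wrote.
ALLOWED_SORT_COLUMNS = {"name": "name", "price": "price"}

def build_product_query(sort_key):
    # The SQL fragment comes from our own dict, never from the request string.
    try:
        column = ALLOWED_SORT_COLUMNS[sort_key]
    except KeyError:
        raise ValueError("sort field not allowed") from None
    return f"SELECT id, name, price FROM products ORDER BY {column}"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL)")
conn.executemany(
    "INSERT INTO products (name, price) VALUES (?, ?)",
    [("desk", 120.0), ("lamp", 35.5), ("chair", 60.0)],
)

# A table name cannot be bound: the statement fails to parse.
try:
    conn.execute("SELECT * FROM ?", ("products",))
    identifier_bound = True
except sqlite3.OperationalError:
    identifier_bound = False

# An allowed key produces a fixed, known-safe query.
rows = conn.execute(build_product_query("price")).fetchall()
names_by_price = [name for (_id, name, _price) in rows]
```

Looking the column up in a dict, rather than checking membership and then reusing the user's string, means the text that reaches the query is always a constant from the source code.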

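6. Use Prepared Statements for Complex Queries

For repeated queries, use prepared statements or stored procedures, which keep the SQL structure separate from the data. The syntax is database-specific. In MySQL, for example, EXECUTE ... USING accepts only user variables, so bind the value to a variable first: cursor.execute("PREPARE stmt FROM 'SELECT * FROM users WHERE id = ?'"); cursor.execute("SET @uid = %s", [user_id]); cursor.execute("EXECUTE stmt USING @uid").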

Detect This Vulnerability in Your Code

Sourcery automatically identifies Django SQL injection via raw SQL queries and many other security issues in your codebase.