String Concatenation in Raw Queries
Developers build SQL queries by concatenating user input directly into query strings, bypassing Django's ORM protections and creating injection opportunities.
SQL injection vulnerabilities in Django occur when user input is directly concatenated or interpolated into raw SQL queries without proper parameterization. This allows attackers to manipulate database queries, potentially leading to unauthorized data access, data modification, privilege escalation, and complete database compromise.
# VULNERABLE: Django views with SQL injection flaws
from django.shortcuts import render
from django.http import JsonResponse, HttpResponse
from django.db import connection
from django.contrib.auth.models import User
from django.views.decorators.csrf import csrf_exempt
import json
# VULNERABLE: Direct string concatenation in raw SQL
def vulnerable_user_search(request):
search_term = request.GET.get('q', '')
# VULNERABLE: Direct concatenation allows SQL injection
query = f"SELECT * FROM auth_user WHERE username LIKE '%{search_term}%'"
with connection.cursor() as cursor:
cursor.execute(query) # VULNERABLE: Unsanitized input
results = cursor.fetchall()
return JsonResponse({'users': results})
# VULNERABLE: String formatting in raw queries
def vulnerable_get_user_posts(request, user_id):
# VULNERABLE: String formatting allows injection
query = "SELECT * FROM blog_post WHERE author_id = %s AND status = '%s'" % (
user_id,
request.GET.get('status', 'published')
)
with connection.cursor() as cursor:
cursor.execute(query)
posts = cursor.fetchall()
return JsonResponse({'posts': posts})
# VULNERABLE: Dynamic ORDER BY clause
def vulnerable_list_products(request):
sort_by = request.GET.get('sort', 'name')
order = request.GET.get('order', 'asc')
# VULNERABLE: Dynamic ORDER BY without validation
query = f"SELECT * FROM products ORDER BY {sort_by} {order}"
with connection.cursor() as cursor:
cursor.execute(query)
products = cursor.fetchall()
return JsonResponse({'products': products})
# VULNERABLE: Complex search with multiple injection points
@csrf_exempt
def vulnerable_advanced_search(request):
if request.method == 'POST':
data = json.loads(request.body)
category = data.get('category', '')
min_price = data.get('min_price', 0)
max_price = data.get('max_price', 1000)
search_text = data.get('search', '')
# VULNERABLE: Multiple injection points
query = f"""
SELECT p.*, c.name as category_name
FROM products p
JOIN categories c ON p.category_id = c.id
WHERE c.name = '{category}'
AND p.price BETWEEN {min_price} AND {max_price}
AND p.description LIKE '%{search_text}%'
ORDER BY p.created_at DESC
"""
with connection.cursor() as cursor:
cursor.execute(query)
results = cursor.fetchall()
return JsonResponse({'results': results})
# VULNERABLE: Admin functionality with elevated privileges
def vulnerable_admin_user_management(request):
if not request.user.is_staff:
return JsonResponse({'error': 'Unauthorized'}, status=403)
action = request.POST.get('action')
user_id = request.POST.get('user_id')
# VULNERABLE: Admin operations with string concatenation
if action == 'delete':
query = f"DELETE FROM auth_user WHERE id = {user_id}"
elif action == 'activate':
query = f"UPDATE auth_user SET is_active = 1 WHERE id = {user_id}"
elif action == 'deactivate':
query = f"UPDATE auth_user SET is_active = 0 WHERE id = {user_id}"
else:
return JsonResponse({'error': 'Invalid action'})
with connection.cursor() as cursor:
cursor.execute(query) # VULNERABLE: Could affect multiple users
return JsonResponse({'success': True})
# VULNERABLE: Report generation with dynamic queries
def vulnerable_generate_report(request):
start_date = request.GET.get('start_date')
end_date = request.GET.get('end_date')
department = request.GET.get('department')
# VULNERABLE: Date injection and department filtering
query = f"""
SELECT u.username, p.title, p.created_at
FROM auth_user u
JOIN blog_post p ON u.id = p.author_id
WHERE p.created_at BETWEEN '{start_date}' AND '{end_date}'
AND u.department = '{department}'
"""
with connection.cursor() as cursor:
cursor.execute(query)
report_data = cursor.fetchall()
return JsonResponse({'report': report_data})
# VULNERABLE: Custom authentication bypass
def vulnerable_custom_login(request):
username = request.POST.get('username')
password = request.POST.get('password')
# VULNERABLE: Classic authentication bypass opportunity
query = f"""
SELECT id, username, is_staff
FROM auth_user
WHERE username = '{username}' AND password = '{password}'
"""
with connection.cursor() as cursor:
cursor.execute(query)
user_data = cursor.fetchone()
if user_data:
# Login successful
return JsonResponse({'success': True, 'user': user_data})
else:
return JsonResponse({'error': 'Invalid credentials'})
# VULNERABLE: Bulk operations
def vulnerable_bulk_update(request):
if request.method == 'POST':
updates = json.loads(request.body).get('updates', [])
for update in updates:
user_id = update.get('id')
field = update.get('field')
value = update.get('value')
# VULNERABLE: Dynamic field updates
query = f"UPDATE auth_user SET {field} = '{value}' WHERE id = {user_id}"
with connection.cursor() as cursor:
cursor.execute(query)
return JsonResponse({'success': True})# SECURE: Django views with proper SQL injection protection
from django.shortcuts import render
from django.http import JsonResponse, HttpResponse
from django.db import connection
from django.contrib.auth.models import User
from django.views.decorators.csrf import csrf_exempt
from django.core.exceptions import ValidationError
from django.db.models import Q, Count, Avg
from django.contrib.auth import authenticate
from django.utils import timezone
from datetime import datetime
import json
import re
# SECURE: Parameterized raw SQL queries
def secure_user_search(request):
search_term = request.GET.get('q', '').strip()
# Input validation
if len(search_term) > 50:
return JsonResponse({'error': 'Search term too long'}, status=400)
# SECURE: Parameterized query with %s placeholder
query = "SELECT id, username, email, first_name, last_name FROM auth_user WHERE username LIKE %s"
search_param = f'%{search_term}%'
with connection.cursor() as cursor:
cursor.execute(query, [search_param]) # SECURE: Parameters separate
columns = [col[0] for col in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return JsonResponse({'users': results})
# SECURE: Better approach using Django ORM
def secure_user_search_orm(request):
search_term = request.GET.get('q', '').strip()
if len(search_term) > 50:
return JsonResponse({'error': 'Search term too long'}, status=400)
# SECURE: Django ORM with built-in protection
users = User.objects.filter(
Q(username__icontains=search_term) |
Q(first_name__icontains=search_term) |
Q(last_name__icontains=search_term)
).values('id', 'username', 'email', 'first_name', 'last_name')[:50]
return JsonResponse({'users': list(users)})
# SECURE: Parameterized queries with validation
def secure_get_user_posts(request, user_id):
# Input validation
try:
user_id = int(user_id)
except (ValueError, TypeError):
return JsonResponse({'error': 'Invalid user ID'}, status=400)
status = request.GET.get('status', 'published')
# Validate status parameter
valid_statuses = ['published', 'draft', 'pending', 'archived']
if status not in valid_statuses:
return JsonResponse({'error': 'Invalid status'}, status=400)
# SECURE: Parameterized query
query = """
SELECT id, title, content, created_at, status
FROM blog_post
WHERE author_id = %s AND status = %s
ORDER BY created_at DESC
"""
with connection.cursor() as cursor:
cursor.execute(query, [user_id, status])
columns = [col[0] for col in cursor.description]
posts = [dict(zip(columns, row)) for row in cursor.fetchall()]
return JsonResponse({'posts': posts})
# SECURE: Safe dynamic sorting
def secure_list_products(request):
sort_by = request.GET.get('sort', 'name')
order = request.GET.get('order', 'asc')
# SECURE: Whitelist allowed sort fields
allowed_sort_fields = {
'name': 'name',
'price': 'price',
'created': 'created_at',
'category': 'category_id'
}
# SECURE: Validate sort field
if sort_by not in allowed_sort_fields:
sort_by = 'name'
# SECURE: Validate order direction
if order.lower() not in ['asc', 'desc']:
order = 'asc'
# SECURE: Build safe ORDER BY clause
order_clause = f"{allowed_sort_fields[sort_by]} {order.upper()}"
# SECURE: Safe query construction
query = f"SELECT id, name, price, category_id, created_at FROM products ORDER BY {order_clause}"
with connection.cursor() as cursor:
cursor.execute(query) # Safe because order_clause is validated
columns = [col[0] for col in cursor.description]
products = [dict(zip(columns, row)) for row in cursor.fetchall()]
return JsonResponse({'products': products})
# SECURE: Better ORM-based sorting
def secure_list_products_orm(request):
sort_by = request.GET.get('sort', 'name')
order = request.GET.get('order', 'asc')
# SECURE: Whitelist and map to model fields
sort_mapping = {
'name': 'name',
'price': 'price',
'created': 'created_at',
'category': 'category__name'
}
sort_field = sort_mapping.get(sort_by, 'name')
if order.lower() == 'desc':
sort_field = f'-{sort_field}'
# SECURE: Django ORM handles escaping
products = Product.objects.select_related('category').order_by(sort_field).values(
'id', 'name', 'price', 'category__name', 'created_at'
)[:100]
return JsonResponse({'products': list(products)})
# SECURE: Complex search with proper parameterization
@csrf_exempt
def secure_advanced_search(request):
if request.method != 'POST':
return JsonResponse({'error': 'Method not allowed'}, status=405)
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
# SECURE: Input validation and sanitization
category = data.get('category', '').strip()
search_text = data.get('search', '').strip()
try:
min_price = float(data.get('min_price', 0))
max_price = float(data.get('max_price', 1000))
except (ValueError, TypeError):
return JsonResponse({'error': 'Invalid price range'}, status=400)
# Validate inputs
if len(category) > 50 or len(search_text) > 100:
return JsonResponse({'error': 'Search parameters too long'}, status=400)
if min_price < 0 or max_price < min_price or max_price > 10000:
return JsonResponse({'error': 'Invalid price range'}, status=400)
# SECURE: Parameterized query with all values passed separately
query = """
SELECT p.id, p.name, p.price, p.description, c.name as category_name
FROM products p
JOIN categories c ON p.category_id = c.id
WHERE c.name = %s
AND p.price BETWEEN %s AND %s
AND p.description LIKE %s
ORDER BY p.created_at DESC
LIMIT 100
"""
search_param = f'%{search_text}%'
with connection.cursor() as cursor:
cursor.execute(query, [category, min_price, max_price, search_param])
columns = [col[0] for col in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return JsonResponse({'results': results})
# SECURE: ORM-based advanced search
def secure_advanced_search_orm(request):
if request.method != 'POST':
return JsonResponse({'error': 'Method not allowed'}, status=405)
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
# Input validation (same as above)
category = data.get('category', '').strip()
search_text = data.get('search', '').strip()
try:
min_price = float(data.get('min_price', 0))
max_price = float(data.get('max_price', 1000))
except (ValueError, TypeError):
return JsonResponse({'error': 'Invalid price range'}, status=400)
# SECURE: Django ORM with Q objects
products = Product.objects.select_related('category').filter(
category__name=category,
price__gte=min_price,
price__lte=max_price,
description__icontains=search_text
).values(
'id', 'name', 'price', 'description', 'category__name'
).order_by('-created_at')[:100]
return JsonResponse({'results': list(products)})
# SECURE: Admin functionality with proper validation
def secure_admin_user_management(request):
if not request.user.is_staff:
return JsonResponse({'error': 'Unauthorized'}, status=403)
if request.method != 'POST':
return JsonResponse({'error': 'Method not allowed'}, status=405)
action = request.POST.get('action')
user_id = request.POST.get('user_id')
# Input validation
try:
user_id = int(user_id)
except (ValueError, TypeError):
return JsonResponse({'error': 'Invalid user ID'}, status=400)
# Validate action
valid_actions = ['delete', 'activate', 'deactivate']
if action not in valid_actions:
return JsonResponse({'error': 'Invalid action'}, status=400)
# Check if user exists
try:
user = User.objects.get(id=user_id)
except User.DoesNotExist:
return JsonResponse({'error': 'User not found'}, status=404)
# Prevent self-modification
if user.id == request.user.id:
return JsonResponse({'error': 'Cannot modify own account'}, status=400)
# SECURE: Use Django ORM for safe operations
if action == 'delete':
user.delete()
elif action == 'activate':
user.is_active = True
user.save()
elif action == 'deactivate':
user.is_active = False
user.save()
return JsonResponse({'success': True, 'message': f'User {action}d successfully'})
# SECURE: Report generation with proper parameterization
def secure_generate_report(request):
start_date = request.GET.get('start_date')
end_date = request.GET.get('end_date')
department = request.GET.get('department')
# SECURE: Date validation
try:
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
except (ValueError, TypeError):
return JsonResponse({'error': 'Invalid date format. Use YYYY-MM-DD'}, status=400)
if start_date > end_date:
return JsonResponse({'error': 'Start date must be before end date'}, status=400)
# Validate department
if not department or len(department) > 50:
return JsonResponse({'error': 'Invalid department'}, status=400)
# SECURE: Parameterized query
query = """
SELECT u.username, p.title, p.created_at
FROM auth_user u
JOIN blog_post p ON u.id = p.author_id
WHERE p.created_at::date BETWEEN %s AND %s
AND u.department = %s
ORDER BY p.created_at DESC
LIMIT 1000
"""
with connection.cursor() as cursor:
cursor.execute(query, [start_date, end_date, department])
columns = [col[0] for col in cursor.description]
report_data = [dict(zip(columns, row)) for row in cursor.fetchall()]
return JsonResponse({'report': report_data})
# SECURE: Proper authentication implementation
def secure_custom_login(request):
if request.method != 'POST':
return JsonResponse({'error': 'Method not allowed'}, status=405)
username = request.POST.get('username', '').strip()
password = request.POST.get('password', '')
# Input validation
if not username or not password:
return JsonResponse({'error': 'Username and password required'}, status=400)
if len(username) > 150 or len(password) > 128:
return JsonResponse({'error': 'Invalid credentials'}, status=400)
# SECURE: Use Django's built-in authentication
user = authenticate(request, username=username, password=password)
if user is not None and user.is_active:
# Login successful
from django.contrib.auth import login
login(request, user)
return JsonResponse({
'success': True,
'user': {
'id': user.id,
'username': user.username,
'is_staff': user.is_staff
}
})
else:
return JsonResponse({'error': 'Invalid credentials'}, status=401)
# SECURE: Safe bulk operations
def secure_bulk_update(request):
if not request.user.is_staff:
return JsonResponse({'error': 'Unauthorized'}, status=403)
if request.method != 'POST':
return JsonResponse({'error': 'Method not allowed'}, status=405)
try:
data = json.loads(request.body)
updates = data.get('updates', [])
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
# Limit number of updates
if len(updates) > 100:
return JsonResponse({'error': 'Too many updates'}, status=400)
# SECURE: Whitelist allowed fields
allowed_fields = {
'first_name': 'first_name',
'last_name': 'last_name',
'email': 'email',
'is_active': 'is_active'
}
successful_updates = 0
errors = []
for i, update in enumerate(updates):
try:
user_id = int(update.get('id'))
field = update.get('field')
value = update.get('value')
# Validate field
if field not in allowed_fields:
errors.append(f'Update {i}: Invalid field {field}')
continue
# Get user
user = User.objects.get(id=user_id)
# SECURE: Use setattr with validated field
setattr(user, allowed_fields[field], value)
user.full_clean() # Validate model
user.save()
successful_updates += 1
except (ValueError, TypeError, User.DoesNotExist, ValidationError) as e:
errors.append(f'Update {i}: {str(e)}')
continue
return JsonResponse({
'success': True,
'updated': successful_updates,
'errors': errors
})
# SECURE: Utility functions for validation
def validate_sql_identifier(identifier):
"""Validate SQL identifiers (table/column names)"""
if not identifier or not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', identifier):
return False
return len(identifier) <= 63 # PostgreSQL limit
def sanitize_search_term(term):
"""Sanitize search terms"""
if not term:
return ''
# Remove SQL special characters but keep alphanumeric and spaces
return re.sub(r'[^\w\s-]', '', term).strip()[:100]
# Example models (for reference)
class Product(models.Model):
name = models.CharField(max_length=100)
price = models.DecimalField(max_digits=10, decimal_places=2)
description = models.TextField()
category = models.ForeignKey('Category', on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
class Category(models.Model):
name = models.CharField(max_length=50)
class BlogPost(models.Model):
title = models.CharField(max_length=200)
content = models.TextField()
author = models.ForeignKey(User, on_delete=models.CASCADE)
status = models.CharField(max_length=20, default='draft')
created_at = models.DateTimeField(auto_now_add=True)The vulnerable examples show various SQL injection scenarios in Django including string concatenation, format string usage, and dynamic query construction. The secure alternatives demonstrate proper parameterization using %s placeholders, input validation, whitelisting for dynamic elements, and leveraging Django's ORM for built-in protection. The secure code also includes comprehensive error handling, input sanitization, and proper authentication mechanisms.
Developers build SQL queries by concatenating user input directly into query strings, bypassing Django's ORM protections and creating injection opportunities.
Sourcery automatically identifies sql injection via raw sql queries in django applications and many other security issues in your codebase.