# Vulnerable: Django ORM extra() with user input
from django.http import JsonResponse
from django.views import View
from .models import Product, User, Order
# Dangerous: User input in extra() where clause
class ProductSearchView(View):
def get(self, request):
search_term = request.GET.get('search', '')
sort_field = request.GET.get('sort', 'name')
category = request.GET.get('category', '')
# CRITICAL: SQL injection in extra() where
products = Product.objects.extra(
where=[f"name LIKE '%{search_term}%' AND category = '{category}'"],
order_by=[sort_field]
)
results = [{'id': p.id, 'name': p.name} for p in products]
return JsonResponse({'products': results})
# Another dangerous pattern with select
def user_statistics(request):
user_field = request.GET.get('field', 'username')
calculation = request.GET.get('calc', 'COUNT(*)')
# Dangerous: User input in extra() select
users = User.objects.extra(
select={
'user_field': user_field,
'calculated_value': calculation
}
)
results = []
for user in users:
results.append({
'user': user.user_field,
'value': user.calculated_value
})
return JsonResponse({'statistics': results})
# Complex query with user input
def advanced_report(request):
date_condition = request.POST.get('date_filter', '')
join_table = request.POST.get('join_table', '')
select_fields = request.POST.get('fields', '')
# Dangerous: Multiple user inputs in extra()
orders = Order.objects.extra(
select=select_fields.split(','),
tables=[join_table],
where=[f"DATE(created_at) {date_condition}"]
)
return JsonResponse({'orders': list(orders.values())})
# Aggregation with user input
def custom_aggregation(request):
group_by = request.GET.get('group_by', '')
having_clause = request.GET.get('having', '')
# Dangerous: User-controlled GROUP BY and HAVING
products = Product.objects.extra(
select={'group_field': group_by},
where=[f"1=1 GROUP BY {group_by} HAVING {having_clause}"]
)
return JsonResponse({'results': list(products.values())})
# Dynamic filtering
def dynamic_filter(request):
filter_expr = request.POST.get('filter', '')
order_expr = request.POST.get('order', '')
# Dangerous: Complete SQL expressions from user
queryset = Product.objects.extra(
where=[filter_expr],
order_by=[order_expr]
)
return JsonResponse({'data': list(queryset.values())})
# Report generation with joins
def generate_report(request):
table_name = request.GET.get('table', '')
join_condition = request.GET.get('join_on', '')
select_clause = request.GET.get('select', '')
# Dangerous: User controls all query parts
results = Order.objects.extra(
select={select_clause: select_clause},
tables=[table_name],
where=[f"orders.id = {table_name}.{join_condition}"]
)
return JsonResponse({'report': list(results.values())})
# Secure: Safe Django ORM usage without SQL injection
from django.http import JsonResponse
from django.views import View
from django.db.models import Q, F, Count, Avg
from django.core.exceptions import ValidationError
from .models import Product, User, Order
import re
# Safe: Parameterized queries and validation
class SafeProductSearchView(View):
def get(self, request):
search_term = request.GET.get('search', '')
sort_field = request.GET.get('sort', 'name')
category = request.GET.get('category', '')
try:
# Validate inputs
validated_data = self.validate_search_params(search_term, sort_field, category)
# Safe: Use Django ORM methods
products = self.search_products_safely(validated_data)
results = [{'id': p.id, 'name': p.name, 'category': p.category} for p in products]
return JsonResponse({'products': results})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_search_params(self, search_term, sort_field, category):
# Validate search term
if len(search_term) > 100:
raise ValidationError('Search term too long')
# Remove dangerous characters
clean_search = re.sub(r'[^a-zA-Z0-9\s-]', '', search_term)
# Validate sort field
allowed_sort_fields = ['name', 'price', 'created_at', 'category']
if sort_field not in allowed_sort_fields:
raise ValidationError('Invalid sort field')
# Validate category
allowed_categories = ['electronics', 'clothing', 'books', 'home']
if category and category not in allowed_categories:
raise ValidationError('Invalid category')
return {
'search_term': clean_search,
'sort_field': sort_field,
'category': category
}
def search_products_safely(self, validated_data):
queryset = Product.objects.all()
# Safe: Django ORM filtering
if validated_data['search_term']:
queryset = queryset.filter(
Q(name__icontains=validated_data['search_term']) |
Q(description__icontains=validated_data['search_term'])
)
if validated_data['category']:
queryset = queryset.filter(category=validated_data['category'])
# Safe: Validated sorting
queryset = queryset.order_by(validated_data['sort_field'])
return queryset[:100] # Limit results
# Safe: Statistics without extra()
def safe_user_statistics(request):
statistic_type = request.GET.get('type', '')
try:
# Validate statistic type
validated_type = validate_statistic_type(statistic_type)
# Safe: Predefined statistics
statistics = calculate_safe_statistics(validated_type)
return JsonResponse({'statistics': statistics})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_statistic_type(stat_type):
allowed_types = ['user_count', 'active_users', 'new_users_today', 'order_summary']
if stat_type not in allowed_types:
raise ValidationError('Invalid statistic type')
return stat_type
def calculate_safe_statistics(stat_type):
if stat_type == 'user_count':
return {'total_users': User.objects.count()}
elif stat_type == 'active_users':
return {'active_users': User.objects.filter(is_active=True).count()}
elif stat_type == 'new_users_today':
from datetime import date
return {'new_today': User.objects.filter(date_joined__date=date.today()).count()}
elif stat_type == 'order_summary':
return {
'total_orders': Order.objects.count(),
'avg_order_value': Order.objects.aggregate(avg=Avg('total'))['avg'] or 0
}
# Safe: Advanced reporting with ORM
def safe_advanced_report(request):
report_type = request.POST.get('report_type', '')
date_from = request.POST.get('date_from', '')
date_to = request.POST.get('date_to', '')
try:
# Validate inputs
validated_data = validate_report_params(report_type, date_from, date_to)
# Generate report safely
report_data = generate_safe_report(validated_data)
return JsonResponse({'report': report_data})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_report_params(report_type, date_from, date_to):
# Validate report type
allowed_reports = ['sales', 'inventory', 'user_activity']
if report_type not in allowed_reports:
raise ValidationError('Invalid report type')
# Validate dates
from datetime import datetime
try:
if date_from:
from_date = datetime.strptime(date_from, '%Y-%m-%d').date()
else:
from_date = None
if date_to:
to_date = datetime.strptime(date_to, '%Y-%m-%d').date()
else:
to_date = None
except ValueError:
raise ValidationError('Invalid date format')
return {
'report_type': report_type,
'date_from': from_date,
'date_to': to_date
}
def generate_safe_report(data):
queryset = Order.objects.all()
# Safe: Date filtering
if data['date_from']:
queryset = queryset.filter(created_at__date__gte=data['date_from'])
if data['date_to']:
queryset = queryset.filter(created_at__date__lte=data['date_to'])
if data['report_type'] == 'sales':
return {
'total_orders': queryset.count(),
'total_revenue': queryset.aggregate(total=Sum('total'))['total'] or 0,
'avg_order_value': queryset.aggregate(avg=Avg('total'))['avg'] or 0
}
elif data['report_type'] == 'inventory':
# Safe: Product inventory report
products = Product.objects.annotate(
order_count=Count('order_items')
).values('name', 'stock_quantity', 'order_count')
return {'inventory': list(products[:100])}
elif data['report_type'] == 'user_activity':
# Safe: User activity report
users = User.objects.annotate(
order_count=Count('orders')
).filter(order_count__gt=0).values('username', 'order_count')
return {'user_activity': list(users[:100])}
# Safe: Dynamic filtering with allowlists
def safe_dynamic_filter(request):
filter_field = request.POST.get('field', '')
filter_value = request.POST.get('value', '')
filter_operator = request.POST.get('operator', 'exact')
try:
# Validate filter parameters
validated_filter = validate_filter_params(filter_field, filter_value, filter_operator)
# Apply safe filtering
products = apply_safe_filter(validated_filter)
return JsonResponse({'data': list(products.values('id', 'name', 'price'))})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_filter_params(field, value, operator):
# Validate field
allowed_fields = ['name', 'category', 'price', 'stock_quantity']
if field not in allowed_fields:
raise ValidationError('Invalid filter field')
# Validate operator
allowed_operators = ['exact', 'icontains', 'gte', 'lte', 'in']
if operator not in allowed_operators:
raise ValidationError('Invalid filter operator')
# Validate value based on field type
if field in ['price', 'stock_quantity']:
try:
value = float(value)
except ValueError:
raise ValidationError('Invalid numeric value')
elif field == 'category':
allowed_categories = ['electronics', 'clothing', 'books', 'home']
if value not in allowed_categories:
raise ValidationError('Invalid category')
return {
'field': field,
'value': value,
'operator': operator
}
def apply_safe_filter(filter_data):
field = filter_data['field']
value = filter_data['value']
operator = filter_data['operator']
# Safe: Build filter using Django ORM
filter_kwargs = {f"{field}__{operator}": value}
return Product.objects.filter(**filter_kwargs)[:100]
# Safe: Report generation with joins using ORM
def safe_generate_report(request):
report_type = request.GET.get('type', '')
try:
# Validate report type
if report_type not in ['order_details', 'user_orders', 'product_sales']:
raise ValidationError('Invalid report type')
# Generate report using safe ORM methods
report_data = generate_report_data(report_type)
return JsonResponse({'report': report_data})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def generate_report_data(report_type):
if report_type == 'order_details':
# Safe: Using select_related for joins
orders = Order.objects.select_related('user').prefetch_related('items__product')
data = []
for order in orders[:100]:
data.append({
'order_id': order.id,
'user': order.user.username,
'total': float(order.total),
'item_count': order.items.count()
})
return data
elif report_type == 'user_orders':
# Safe: Aggregation with ORM
users = User.objects.annotate(
order_count=Count('orders'),
total_spent=Sum('orders__total')
).values('username', 'order_count', 'total_spent')
return list(users[:100])
elif report_type == 'product_sales':
# Safe: Complex aggregation
products = Product.objects.annotate(
times_ordered=Count('order_items'),
total_revenue=Sum('order_items__quantity') * F('price')
).values('name', 'times_ordered', 'total_revenue')
return list(products[:100])