Python SQL Injection via String Formatting

Critical Risk SQL Injection
Tags: python · sql-injection · string-formatting · database · injection · sqlite · mysql · user-input · parameterized-queries

What it is

A critical vulnerability that occurs when Python applications use string formatting methods (%, f-strings, .format()) to construct SQL queries with user input. This allows attackers to inject malicious SQL code by manipulating the formatting parameters, potentially leading to data theft, database corruption, or unauthorized access.

import sqlite3
import mysql.connector
from typing import List, Optional, Dict, Any

def get_database_connection():
    """Open and return a sqlite3 connection to the ``example.db`` file."""
    database_path = 'example.db'
    return sqlite3.connect(database_path)

class UserService:
    """Deliberately INSECURE data-access example.

    Every method builds its SQL text by formatting caller-supplied values
    straight into the statement and then executes that text with no bound
    parameters. It is the "before" half of the demonstration; do not copy it.
    """

    def __init__(self, cursor):
        """Store the cursor that every query method executes against."""
        self.cursor = cursor

    def get_user_by_id(self, user_id: str) -> Optional[Dict]:
        """Return ``cursor.fetchone()`` for the user row matching ``user_id``.

        ``user_id`` is pasted into the SQL text unquoted, so any SQL fragment
        it contains becomes part of the statement.
        """
        # VULNERABLE: String formatting
        # "%s" here is consumed by Python's % operator, not by the database
        # driver: the statement is already complete text when execute() runs.
        query = "SELECT * FROM users WHERE id = %s" % user_id
        self.cursor.execute(query)
        return self.cursor.fetchone()

    def search_users(self, name: str, department: str) -> List[Dict]:
        """Return ``cursor.fetchall()`` for users matching a name substring
        and an exact department.

        Both arguments are placed between single quotes by the f-string; a
        quote character inside either value ends the SQL literal early.
        """
        # VULNERABLE: f-string formatting
        query = f"SELECT * FROM users WHERE name LIKE '%{name}%' AND department = '{department}'"
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def update_user_profile(self, user_id: str, bio: str, website: str) -> None:
        """Execute an UPDATE of ``bio`` and ``website`` for ``user_id``.

        ``bio`` and ``website`` are wrapped in quotes and ``user_id`` is
        inserted bare; all three are attacker-controllable SQL text.
        """
        # VULNERABLE: .format() method
        query = "UPDATE users SET bio = '{}', website = '{}' WHERE id = {}".format(
            bio, website, user_id
        )
        self.cursor.execute(query)

    def get_orders_by_status(self, status: str, limit: int) -> List[Dict]:
        """Return ``cursor.fetchall()`` for orders with the given status.

        ``status`` is interpolated by the f-string and ``limit`` is appended
        through ``str(limit)``; neither is validated or bound.
        """
        # VULNERABLE: Mixed formatting
        query = f"SELECT * FROM orders WHERE status = '{status}' LIMIT " + str(limit)
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def advanced_search(self, filters: Dict[str, Any]) -> List[Dict]:
        """Return ``cursor.fetchall()`` for users matching every filter.

        Each ``field``/``value`` pair from ``filters`` becomes a
        ``field = value`` condition joined with AND. Both the column name and
        the value are interpolated, so either one can carry SQL.
        """
        # VULNERABLE: Dynamic query building with formatting
        conditions = []
        for field, value in filters.items():
            # String values are quoted; everything else is inserted as-is.
            if isinstance(value, str):
                conditions.append(f"{field} = '{value}'")
            else:
                conditions.append(f"{field} = {value}")

        # An empty ``filters`` leaves nothing after WHERE in the final text.
        where_clause = " AND ".join(conditions)
        query = f"SELECT * FROM users WHERE {where_clause}"
        self.cursor.execute(query)
        return self.cursor.fetchall()

# Usage examples that would be vulnerable:
def main():
    """Call the string-formatting UserService with hostile arguments.

    Each call passes a payload that the service pastes into its SQL text.
    """
    # NOTE: nothing in this function commits or closes the connection.
    conn = get_database_connection()
    cursor = conn.cursor()
    service = UserService(cursor)

    # These calls allow SQL injection:
    # Builds: SELECT * FROM users WHERE id = 1 OR 1=1 --
    user = service.get_user_by_id("1 OR 1=1 --")
    # The leading quote closes the LIKE literal; the rest is read as SQL.
    # NOTE(review): stacked payloads ("...; DROP TABLE ...") only run where the
    # driver/API executes several statements per call; sqlite3's
    # cursor.execute() is documented as single-statement - confirm per driver.
    users = service.search_users("'; DROP TABLE users; --", "IT")
    service.update_user_profile("1", "'; DELETE FROM users; --", "http://evil.com")

# Malicious inputs:
# user_id: "1 OR 1=1 --"
# name: "'; DROP TABLE users; --"
# bio: "'; UPDATE users SET password='hacked' WHERE '1'='1'; --"
import sqlite3
import mysql.connector
from typing import List, Optional, Dict, Any
import re

def get_database_connection():
    """Return a new SQLite connection backed by ``example.db``."""
    connection = sqlite3.connect('example.db')
    return connection

class UserService:
    """Data-access helper that validates input and binds every value.

    Caller-supplied values only ever reach the database through ``?``
    placeholders. The single piece of text interpolated into a statement is a
    column name that has first been checked against a fixed allow-list.
    """

    # Closed sets of accepted values, built once rather than on every call.
    VALID_DEPARTMENTS = frozenset({'IT', 'HR', 'Finance', 'Marketing', 'Sales'})
    VALID_ORDER_STATUSES = frozenset(
        {'pending', 'processing', 'shipped', 'delivered', 'cancelled'}
    )
    SEARCHABLE_FIELDS = frozenset({'name', 'department', 'status', 'age', 'salary'})

    def __init__(self, cursor):
        """Store the DB-API cursor used by every query method."""
        self.cursor = cursor

    def _rows_as_dicts(self, rows) -> List[Dict]:
        """Map result rows to dicts keyed by the cursor's column names."""
        columns = [desc[0] for desc in self.cursor.description]
        return [dict(zip(columns, row)) for row in rows]

    @staticmethod
    def _escape_like(term: str) -> str:
        """Escape LIKE wildcards so ``term`` is matched literally.

        The backslash is escaped first so the escapes added for ``%`` and
        ``_`` are not themselves doubled.
        """
        return term.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')

    def validate_user_id(self, user_id: str) -> int:
        """Validate and convert user ID to integer.

        Args:
            user_id: Textual user id as received from the caller.

        Returns:
            The id as a positive ``int``.

        Raises:
            ValueError: If the value cannot be converted to an integer
                (including ``None``) or is not strictly positive.
        """
        try:
            uid = int(user_id)
        except (TypeError, ValueError) as exc:
            # TypeError covers None and other non-numeric objects, which would
            # otherwise escape as a non-validation error.
            raise ValueError(f"Invalid user ID: {exc}") from exc
        if uid <= 0:
            raise ValueError("Invalid user ID: User ID must be positive")
        return uid

    def get_user_by_id(self, user_id: str) -> Optional[Dict]:
        """Fetch a single user by id.

        Returns:
            The row as a ``{column: value}`` dict, or ``None`` if no row matches.

        Raises:
            ValueError: If ``user_id`` fails validation.
        """
        # Input validation
        validated_id = self.validate_user_id(user_id)

        # SECURE: Parameterized query
        query = "SELECT * FROM users WHERE id = ?"
        self.cursor.execute(query, (validated_id,))

        row = self.cursor.fetchone()
        if not row:
            return None
        return self._rows_as_dicts([row])[0]

    def search_users(self, name: str, department: str) -> List[Dict]:
        """Find users whose name contains ``name`` within ``department``.

        ``%`` and ``_`` inside ``name`` are matched literally rather than as
        LIKE wildcards, so a search term cannot widen the match on its own.

        Raises:
            ValueError: If ``name`` exceeds 100 characters or ``department``
                is non-empty and not a known department.
        """
        # Input validation
        if len(name) > 100:
            raise ValueError("Name search term too long")

        if department and department not in self.VALID_DEPARTMENTS:
            raise ValueError(f"Invalid department: {department}")

        # SECURE: Parameterized query; ESCAPE declares the character used by
        # _escape_like so user-supplied wildcards lose their special meaning.
        query = "SELECT * FROM users WHERE name LIKE ? ESCAPE '\\' AND department = ?"
        pattern = f'%{self._escape_like(name)}%'
        self.cursor.execute(query, (pattern, department))

        return self._rows_as_dicts(self.cursor.fetchall())

    def update_user_profile(self, user_id: str, bio: str, website: str) -> None:
        """Update the bio and website of one user.

        The statement is executed on the shared cursor; committing is left to
        the owner of the connection.

        Raises:
            ValueError: If ``user_id`` is invalid, ``bio`` exceeds 1000
                characters, or ``website`` is non-empty and does not start
                with ``http://`` or ``https://``.
        """
        # Input validation
        validated_id = self.validate_user_id(user_id)

        if len(bio) > 1000:
            raise ValueError("Bio too long")

        # Simple URL validation
        if website and not re.match(r'^https?://', website):
            raise ValueError("Invalid website URL")

        # SECURE: Parameterized query
        query = "UPDATE users SET bio = ?, website = ? WHERE id = ?"
        self.cursor.execute(query, (bio, website, validated_id))

    def get_orders_by_status(self, status: str, limit: int) -> List[Dict]:
        """Return up to ``limit`` orders that have the given status.

        Raises:
            ValueError: If ``status`` is not a known order status, or
                ``limit`` is not an integer between 1 and 1000.
        """
        # Input validation
        if status not in self.VALID_ORDER_STATUSES:
            raise ValueError(f"Invalid status: {status}")

        # bool is a subclass of int and floats satisfy the range comparison,
        # so the type is checked explicitly before the range.
        if isinstance(limit, bool) or not isinstance(limit, int):
            raise ValueError("Limit must be an integer")
        if not 1 <= limit <= 1000:
            raise ValueError("Limit must be between 1 and 1000")

        # SECURE: Parameterized query
        query = "SELECT * FROM orders WHERE status = ? LIMIT ?"
        self.cursor.execute(query, (status, limit))

        return self._rows_as_dicts(self.cursor.fetchall())

    def advanced_search(self, filters: Dict[str, Any]) -> List[Dict]:
        """Search users by equality on any combination of allowed fields.

        Args:
            filters: Mapping of field name to the value it must equal.

        Returns:
            Matching rows as dicts; an empty list when ``filters`` is empty.

        Raises:
            ValueError: If a field name is not in the allow-list.
        """
        conditions = []
        params = []

        for field, value in filters.items():
            # Whitelist allowed fields
            if field not in self.SEARCHABLE_FIELDS:
                raise ValueError(f"Invalid search field: {field}")

            # SECURE: Build parameterized conditions
            conditions.append(f"{field} = ?")
            params.append(value)

        if not conditions:
            return []

        where_clause = " AND ".join(conditions)
        query = f"SELECT * FROM users WHERE {where_clause}"

        # Safe because field names are whitelisted and values are parameterized
        self.cursor.execute(query, params)

        return self._rows_as_dicts(self.cursor.fetchall())

# Safe usage examples:
def main():
    """Run the validated, parameterized service calls end to end.

    The profile update is committed only after every call has succeeded.
    Validation and database failures are reported on stdout, and the
    connection is closed on every path.
    """
    conn = get_database_connection()

    try:
        cursor = conn.cursor()
        service = UserService(cursor)

        # These are now safe:
        user = service.get_user_by_id("123")
        users = service.search_users("John", "IT")
        service.update_user_profile("1", "Software Engineer", "https://example.com")
        orders = service.get_orders_by_status("pending", 10)

        # Advanced search with validation
        results = service.advanced_search({
            'department': 'IT',
            'status': 'active'
        })

        # sqlite3 does not commit on close(); without this the UPDATE issued
        # by update_user_profile() would be lost when the connection closes.
        conn.commit()

    except ValueError as e:
        print(f"Validation error: {e}")
    except Exception as e:
        # Top-level boundary of the example: report and fall through to close.
        print(f"Database error: {e}")
    finally:
        conn.close()

💡 Why This Fix Works

The vulnerable code uses string formatting (%, f-strings, .format()) to build SQL queries, allowing injection attacks. The fixed version uses parameterized queries with ? placeholders, implements comprehensive input validation, and uses whitelisting for dynamic field names.

Why it happens

String formatting (the % operator, f-strings, .format()) and string concatenation merge the SQL text and the user-supplied data into a single string before the database driver ever sees it. Python performs plain text substitution without any escaping or validation, so the database cannot tell data apart from SQL, making it trivial for attackers to inject malicious SQL code.

Root causes

Percent (%) String Formatting in SQL Queries

Using Python's % operator to format SQL queries with user input creates direct injection vulnerabilities. The % operator performs simple string substitution without any escaping or validation, making it trivial for attackers to inject malicious SQL code.

Preview example – PYTHON
# VULNERABLE: % formatting
# The % operator pastes user_id into the SQL text before the driver sees it.
user_id = request.get('user_id')  # Could be: "1 OR 1=1 --"
query = "SELECT * FROM users WHERE id = %s" % user_id
cursor.execute(query)
# Results in: SELECT * FROM users WHERE id = 1 OR 1=1 --
# "OR 1=1" makes the WHERE clause true for every row; "--" starts an SQL comment.

F-String Formatting with User Input

F-strings (formatted string literals) in Python provide a convenient way to embed expressions in strings, but when used with user input in SQL queries, they create serious injection vulnerabilities. F-strings perform direct string interpolation without any security controls.

Preview example – PYTHON
# VULNERABLE: f-string formatting
# The quote at the start of the payload closes the name='...' literal early.
name = request.get('name')  # Could be: "'; DROP TABLE users; --"
department = request.get('dept')
query = f"SELECT * FROM users WHERE name='{name}' AND dept='{department}'"
cursor.execute(query)
# Results in: SELECT * FROM users WHERE name=''; DROP TABLE users; --' AND dept='...'

String .format() Method in Database Queries

The .format() method provides more control than % formatting but is equally vulnerable when used to construct SQL queries. Attackers can exploit the positional and named formatting to inject malicious SQL code into any placeholder position.

Preview example – PYTHON
# VULNERABLE: .format() method
# All three request values are substituted into the SQL text positionally.
bio = request.get('bio')  # Could be: "'; DELETE FROM users; --"
website = request.get('website')
user_id = request.get('user_id')
query = "UPDATE users SET bio='{}', website='{}' WHERE id={}".format(bio, website, user_id)
cursor.execute(query)
# Results in: UPDATE users SET bio=''; DELETE FROM users; --', website='...' WHERE id=...

Dynamic Query Building with String Concatenation

Building SQL queries dynamically by concatenating strings with user input creates multiple injection points. This approach often combines different formatting methods and string operations, making the vulnerability harder to detect but equally dangerous.

Preview example – PYTHON
# VULNERABLE: Mixed string operations
# Both the keys (column names) and the values of the request body end up in the SQL text.
filters = request.get_json()  # {"role": "'; DROP TABLE users; --", "status": "active"}
conditions = []
for field, value in filters.items():
    conditions.append(f"{field} = '{value}'")
where_clause = " AND ".join(conditions)
query = f"SELECT * FROM users WHERE {where_clause}"
# Results in: SELECT * FROM users WHERE role = ''; DROP TABLE users; --' AND status = 'active'

Fixes

1

Use Parameterized Queries with Placeholders

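Replace all string formatting with parameterized queries using ? placeholders (SQLite) or %s placeholders (MySQL/PostgreSQL), passing the values as a separate argument to execute(). A %s placeholder is filled in by the database driver; never apply Python's % operator to the query yourself. This separates SQL logic from data and ensures user input is treated as data rather than executable code.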

View implementation – PYTHON
# SECURE: Parameterized query with ? placeholders
def get_user_by_id(cursor, user_id):
    """Fetch one user row by id using a bound parameter.

    Args:
        cursor: DB-API cursor that accepts ``?`` placeholders.
        user_id: The id as received from the caller (typically a string).

    Returns:
        Whatever ``cursor.fetchone()`` yields: the matching row, or ``None``.

    Raises:
        ValueError: "Invalid user ID format" if ``user_id`` is not an integer,
            "User ID must be positive" if it is zero or negative.
    """
    # Input validation
    try:
        uid = int(user_id)
    except (TypeError, ValueError) as exc:
        raise ValueError("Invalid user ID format") from exc

    # Checked outside the try block so this specific message is not caught
    # and replaced by the generic format error.
    if uid <= 0:
        raise ValueError("User ID must be positive")

    # Safe parameterized query
    query = "SELECT * FROM users WHERE id = ?"
    cursor.execute(query, (uid,))
    return cursor.fetchone()
2

Implement Comprehensive Input Validation

Validate all user input before using it in database operations. Use type checking, format validation, length restrictions, and whitelist validation. Never trust user input and always validate on the server side.

View implementation – PYTHON
# Input validation functions
def validate_user_id(user_id_str, max_id=999999):
    """Convert a textual user id to an ``int`` within ``1..max_id``.

    Args:
        user_id_str: The id as received from the caller.
        max_id: Largest id accepted (inclusive).

    Returns:
        The validated id as an ``int``.

    Raises:
        ValueError: "Invalid user ID format" if the value is not an integer,
            "User ID out of valid range" if it lies outside ``1..max_id``.
    """
    try:
        uid = int(user_id_str)
    except (ValueError, TypeError) as exc:
        raise ValueError("Invalid user ID format") from exc

    # Range check lives outside the try block; inside it, the range error was
    # caught by the handler above and reported as a format error.
    if not 0 < uid <= max_id:
        raise ValueError("User ID out of valid range")
    return uid

def validate_department(department):
    """Return ``department`` unchanged if it is a known department.

    Raises:
        ValueError: If ``department`` is not one of the accepted names.
    """
    known = {'IT', 'HR', 'Finance', 'Marketing', 'Sales'}
    if department in known:
        return department
    raise ValueError(f"Invalid department: {department}")
3

Use Whitelisting for Dynamic Field Names

When building dynamic queries, whitelist allowed field names and operations. Never directly interpolate user input as field names or SQL keywords. Use a mapping of allowed fields to actual column names.

View implementation – PYTHON
# SECURE: Whitelisted field mapping
def advanced_search(cursor, filters):
    """Search ``users`` by equality on whitelisted fields.

    Args:
        cursor: DB-API cursor that accepts ``?`` placeholders.
        filters: Mapping of public field name to the value it must equal.

    Returns:
        ``cursor.fetchall()`` for the query, or an empty list when
        ``filters`` is empty.

    Raises:
        ValueError: If a field name is not in the allow-list.
    """
    # Public field name -> actual column name; only these columns can ever
    # appear in the generated SQL.
    allowed_fields = {
        'name': 'user_name',
        'dept': 'department',
        'status': 'user_status'
    }

    conditions = []
    params = []

    for field, value in filters.items():
        if field not in allowed_fields:
            raise ValueError(f"Invalid search field: {field}")

        # Safe because field is whitelisted
        conditions.append(f"{allowed_fields[field]} = ?")
        params.append(value)

    # Without conditions the statement would end in a dangling "WHERE".
    if not conditions:
        return []

    where_clause = " AND ".join(conditions)
    query = f"SELECT * FROM users WHERE {where_clause}"
    cursor.execute(query, params)
    return cursor.fetchall()
4

Use ORM Query Builders Safely

Modern ORMs like SQLAlchemy, Django ORM, or Peewee provide safe query building methods. Use their parameterized query methods rather than raw SQL construction. Always use ORM-provided escaping mechanisms.

View implementation – PYTHON
# SECURE: SQLAlchemy ORM usage
from sqlalchemy.orm import sessionmaker
from sqlalchemy import and_

def search_users_orm(session, name_filter, department):
    """Return up to 100 users in ``department`` whose name contains ``name_filter``.

    Args:
        session: SQLAlchemy session used to run the query.
        name_filter: Substring to look for in the user's name.
        department: Department the user must belong to.

    Raises:
        ValueError: If ``name_filter`` exceeds 100 characters or
            ``department`` is not an accepted department.
    """
    # Input validation
    if len(name_filter) > 100:
        raise ValueError("Search term too long")

    valid_departments = {'IT', 'HR', 'Finance'}
    if department not in valid_departments:
        raise ValueError("Invalid department")

    # Safe ORM query. contains(..., autoescape=True) renders a bound
    # LIKE '%...%' comparison and escapes "%" and "_" in the value, so the
    # search term cannot act as a wildcard.
    return session.query(User).filter(
        and_(
            User.name.contains(name_filter, autoescape=True),
            User.department == department
        )
    ).limit(100).all()
5

Implement Proper Error Handling and Logging

Add comprehensive error handling to catch and log potential injection attempts. Don't expose database errors to users, and implement monitoring to detect unusual query patterns that might indicate injection attempts.

View implementation – PYTHON
# SECURE: Proper error handling
import logging

logger = logging.getLogger(__name__)

def safe_database_operation(cursor, user_input):
    """Validate ``user_input``, run a parameterized lookup, and sanitize errors.

    Args:
        cursor: DB-API cursor that accepts ``?`` placeholders.
        user_input: Raw value passed to ``validate_input`` before use.

    Returns:
        ``cursor.fetchall()`` for the lookup.

    Raises:
        ValueError: "Invalid input provided" when validation rejects the input.
        RuntimeError: "Database operation failed" for any other failure.
    """
    try:
        # Validate input
        validated_input = validate_input(user_input)

        # Execute safe query
        query = "SELECT * FROM users WHERE field = ?"
        cursor.execute(query, (validated_input,))
        return cursor.fetchall()

    except ValueError as e:
        logger.warning("Input validation failed: %s", e)
        # "from None": the caller gets the generic message only; the detail
        # stays in the log.
        raise ValueError("Invalid input provided") from None
    except Exception as e:
        # logger.exception records the traceback, which the sanitized error
        # raised below deliberately does not carry.
        logger.exception("Database error: %s", e)
        raise RuntimeError("Database operation failed") from None

Detect This Vulnerability in Your Code

Sourcery automatically identifies Python SQL injection via string formatting and many other security issues in your codebase.