SQL injection from formatted SQL string in database execute call

Critical Risk SQL Injection
pythonsql-injectiondatabasestring-formattingf-strings

What it is

SQL injection vulnerability in Python applications where SQL query strings are built via string formatting or f-strings before database execute() calls, allowing attackers to inject malicious SQL commands.

import sqlite3
import mysql.connector
from typing import List, Optional, Dict, Any

def get_database_connection():
    return sqlite3.connect('example.db')

class UserService:
    def __init__(self, cursor):
        self.cursor = cursor

    def get_user_by_id(self, user_id: str) -> Optional[Dict]:
        # VULNERABLE: String formatting
        query = "SELECT * FROM users WHERE id = %s" % user_id
        self.cursor.execute(query)
        return self.cursor.fetchone()

    def search_users(self, name: str, department: str) -> List[Dict]:
        # VULNERABLE: f-string formatting
        query = f"SELECT * FROM users WHERE name LIKE '%{name}%' AND department = '{department}'"
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def update_user_profile(self, user_id: str, bio: str, website: str) -> None:
        # VULNERABLE: .format() method
        query = "UPDATE users SET bio = '{}', website = '{}' WHERE id = {}".format(
            bio, website, user_id
        )
        self.cursor.execute(query)

    def get_orders_by_status(self, status: str, limit: int) -> List[Dict]:
        # VULNERABLE: Mixed formatting
        query = f"SELECT * FROM orders WHERE status = '{status}' LIMIT " + str(limit)
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def advanced_search(self, filters: Dict[str, Any]) -> List[Dict]:
        # VULNERABLE: Dynamic query building with formatting
        conditions = []
        for field, value in filters.items():
            if isinstance(value, str):
                conditions.append(f"{field} = '{value}'")
            else:
                conditions.append(f"{field} = {value}")

        where_clause = " AND ".join(conditions)
        query = f"SELECT * FROM users WHERE {where_clause}"
        self.cursor.execute(query)
        return self.cursor.fetchall()

# Usage examples that would be vulnerable:
def main():
    conn = get_database_connection()
    cursor = conn.cursor()
    service = UserService(cursor)

    # These calls allow SQL injection:
    user = service.get_user_by_id("1 OR 1=1 --")
    users = service.search_users("'; DROP TABLE users; --", "IT")
    service.update_user_profile("1", "'; DELETE FROM users; --", "http://evil.com")

# Malicious inputs:
# user_id: "1 OR 1=1 --"
# name: "'; DROP TABLE users; --"
# bio: "'; UPDATE users SET password='hacked' WHERE '1'='1'; --"
import sqlite3
import mysql.connector
from typing import List, Optional, Dict, Any
import re

def get_database_connection():
    return sqlite3.connect('example.db')

class UserService:
    def __init__(self, cursor):
        self.cursor = cursor

    def validate_user_id(self, user_id: str) -> int:
        """Validate and convert user ID to integer"""
        try:
            uid = int(user_id)
            if uid <= 0:
                raise ValueError("User ID must be positive")
            return uid
        except ValueError as e:
            raise ValueError(f"Invalid user ID: {e}")

    def get_user_by_id(self, user_id: str) -> Optional[Dict]:
        # Input validation
        validated_id = self.validate_user_id(user_id)

        # SECURE: Parameterized query
        query = "SELECT * FROM users WHERE id = ?"
        self.cursor.execute(query, (validated_id,))

        row = self.cursor.fetchone()
        if row:
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        return None

    def search_users(self, name: str, department: str) -> List[Dict]:
        # Input validation
        if len(name) > 100:
            raise ValueError("Name search term too long")

        valid_departments = {'IT', 'HR', 'Finance', 'Marketing', 'Sales'}
        if department and department not in valid_departments:
            raise ValueError(f"Invalid department: {department}")

        # SECURE: Parameterized query
        query = "SELECT * FROM users WHERE name LIKE ? AND department = ?"
        self.cursor.execute(query, (f'%{name}%', department))

        rows = self.cursor.fetchall()
        columns = [desc[0] for desc in self.cursor.description]
        return [dict(zip(columns, row)) for row in rows]

    def update_user_profile(self, user_id: str, bio: str, website: str) -> None:
        # Input validation
        validated_id = self.validate_user_id(user_id)

        if len(bio) > 1000:
            raise ValueError("Bio too long")

        # Simple URL validation
        if website and not re.match(r'^https?://', website):
            raise ValueError("Invalid website URL")

        # SECURE: Parameterized query
        query = "UPDATE users SET bio = ?, website = ? WHERE id = ?"
        self.cursor.execute(query, (bio, website, validated_id))

    def get_orders_by_status(self, status: str, limit: int) -> List[Dict]:
        # Input validation
        valid_statuses = {'pending', 'processing', 'shipped', 'delivered', 'cancelled'}
        if status not in valid_statuses:
            raise ValueError(f"Invalid status: {status}")

        if not 1 <= limit <= 1000:
            raise ValueError("Limit must be between 1 and 1000")

        # SECURE: Parameterized query
        query = "SELECT * FROM orders WHERE status = ? LIMIT ?"
        self.cursor.execute(query, (status, limit))

        rows = self.cursor.fetchall()
        columns = [desc[0] for desc in self.cursor.description]
        return [dict(zip(columns, row)) for row in rows]

    def advanced_search(self, filters: Dict[str, Any]) -> List[Dict]:
        # Whitelist allowed fields
        allowed_fields = {'name', 'department', 'status', 'age', 'salary'}

        conditions = []
        params = []

        for field, value in filters.items():
            if field not in allowed_fields:
                raise ValueError(f"Invalid search field: {field}")

            # SECURE: Build parameterized conditions
            conditions.append(f"{field} = ?")
            params.append(value)

        if not conditions:
            return []

        where_clause = " AND ".join(conditions)
        query = f"SELECT * FROM users WHERE {where_clause}"

        # Safe because field names are whitelisted and values are parameterized
        self.cursor.execute(query, params)

        rows = self.cursor.fetchall()
        columns = [desc[0] for desc in self.cursor.description]
        return [dict(zip(columns, row)) for row in rows]

# Safe usage examples:
def main():
    conn = get_database_connection()
    cursor = conn.cursor()
    service = UserService(cursor)

    try:
        # These are now safe:
        user = service.get_user_by_id("123")
        users = service.search_users("John", "IT")
        service.update_user_profile("1", "Software Engineer", "https://example.com")
        orders = service.get_orders_by_status("pending", 10)

        # Advanced search with validation
        results = service.advanced_search({
            'department': 'IT',
            'status': 'active'
        })

    except ValueError as e:
        print(f"Validation error: {e}")
    except Exception as e:
        print(f"Database error: {e}")
    finally:
        conn.close()

💡 Why This Fix Works

The vulnerable code uses string formatting (%, f-strings, .format()) to build SQL queries, allowing injection attacks. The fixed version uses parameterized queries with ? placeholders, implements comprehensive input validation, and uses whitelisting for dynamic field names.

Why it happens

The query string is built via string formatting or f-strings before execute(), letting untrusted input alter SQL structure.

Root causes

String Formatting in SQL Construction

The query string is built via string formatting or f-strings before execute(), letting untrusted input alter SQL structure.

Preview example – PYTHON
# VULNERABLE: String formatting
def get_user_by_email(cursor, email):
    query = "SELECT * FROM users WHERE email = '%s'" % email
    cursor.execute(query)
    return cursor.fetchone()

# VULNERABLE: f-string formatting
def search_products(cursor, category, min_price):
    query = f"SELECT * FROM products WHERE category = '{category}' AND price >= {min_price}"
    cursor.execute(query)
    return cursor.fetchall()

format() Method in SQL Queries

Using the str.format() method to insert user data into SQL queries creates injection vulnerabilities when values are not properly escaped.

Preview example – PYTHON
# VULNERABLE: .format() method
def update_user_status(cursor, user_id, status):
    query = "UPDATE users SET status = '{}' WHERE id = {}".format(status, user_id)
    cursor.execute(query)
    # Allows injection: status = "'; DROP TABLE users; --"

Fixes

1

Use Parameterized Queries with DB Driver

Use parameterized queries with placeholders from your DB driver (e.g., psycopg2, sqlite3, MySQLdb). Pass values as parameters to execute() instead of formatting them into the SQL string.

View implementation – PYTHON
# SECURE: Parameterized query
def get_user_by_email(cursor, email):
    query = "SELECT * FROM users WHERE email = %s"
    cursor.execute(query, (email,))
    return cursor.fetchone()
2

Use Database-Specific Parameter Styles

Different database drivers use different parameter styles. Learn and use the correct placeholder format for your database.

View implementation – PYTHON
# SECURE: Different parameter styles

# SQLite - ? placeholders
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))

# PostgreSQL (psycopg2) - %s placeholders
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))

# MySQL - %s placeholders
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))

Detect This Vulnerability in Your Code

Sourcery automatically identifies sql injection from formatted sql string in database execute call and many other security issues in your codebase.