SQL injection from AWS Lambda event data in MySQL query execution

Critical Risk SQL Injection
python · aws-lambda · mysql · sql-injection · serverless

What it is

SQL injection vulnerability in Python AWS Lambda functions where event data is concatenated into MySQL queries without parameterization, allowing attackers to manipulate database operations through malicious Lambda payloads.

import json
import mysql.connector
import os

def get_db_connection():
    """Open a new MySQL connection configured from environment variables.

    Reads DB_HOST, DB_NAME, DB_USER and DB_PASSWORD from ``os.environ``;
    a missing variable raises ``KeyError``.
    """
    settings = {
        'host': os.environ['DB_HOST'],
        'database': os.environ['DB_NAME'],
        'user': os.environ['DB_USER'],
        'password': os.environ['DB_PASSWORD'],
    }
    return mysql.connector.connect(**settings)

def lambda_handler(event, context):
    """Route a Lambda event to the user operation named by its 'action' field."""
    requested = event.get('action')

    if requested == 'get_user':
        return get_user(event)
    if requested == 'search_users':
        return search_users(event)
    if requested == 'update_status':
        return update_user_status(event)

    # Any other (or missing) action is rejected as a client error.
    return {'statusCode': 400, 'body': 'Invalid action'}

def get_user(event):
    """Fetch one user by the 'userId' event field (intentionally vulnerable example).

    Returns a dict with 'statusCode' 200 and the fetched row JSON-encoded
    in 'body'.
    """
    # VULNERABLE: Direct event data in SQL
    user_id = event.get('userId')

    conn = get_db_connection()
    cursor = conn.cursor()

    # The raw event value becomes part of the SQL text itself, so a payload
    # such as "1 OR 1=1 --" rewrites the WHERE clause.
    query = f"SELECT * FROM users WHERE id = {user_id}"
    cursor.execute(query)

    result = cursor.fetchone()
    # No try/finally here: if execute() or fetchone() raises, neither
    # close() call below is reached.
    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }

def search_users(event):
    """Search users by name, department and salary (intentionally vulnerable example).

    Reads 'name', 'department' and 'minSalary' from the event and returns
    the fetched rows JSON-encoded in the 'body' of a 200 response.
    """
    # VULNERABLE: String formatting
    name = event.get('name', '')
    department = event.get('department', '')
    min_salary = event.get('minSalary', 0)

    conn = get_db_connection()
    cursor = conn.cursor()

    # name and department are placed inside quoted SQL literals and
    # min_salary is placed unquoted; none of them is escaped or bound, so
    # any of the three can change the structure of the statement.
    query = ("SELECT id, name, email FROM users WHERE "
             f"name LIKE '%{name}%' AND department = '{department}' "
             f"AND salary >= {min_salary}")

    cursor.execute(query)
    results = cursor.fetchall()

    # Cleanup only happens on the success path (no try/finally).
    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps(results)
    }

def update_user_status(event):
    """Set a user's status and reason by email (intentionally vulnerable example).

    Reads 'userEmail', 'newStatus' and 'reason' from the event, runs an
    UPDATE, commits, and returns a 200 response with a fixed message.
    """
    # VULNERABLE: .format() method
    user_email = event.get('userEmail')
    new_status = event.get('newStatus')
    reason = event.get('reason', '')

    conn = get_db_connection()
    cursor = conn.cursor()

    # All three event values are substituted into quoted SQL literals
    # without escaping, so a single quote in any of them ends the literal
    # and lets the rest of the value be read as SQL.
    query = ("UPDATE users SET status = '{}', status_reason = '{}' "
             "WHERE email = '{}'").format(new_status, reason, user_email)

    cursor.execute(query)
    conn.commit()

    # Cleanup only happens on the success path (no try/finally).
    cursor.close()
    conn.close()

    # The response is the same whether or not any row was updated.
    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Status updated'})
    }

# Malicious Lambda event examples:
# {"action": "get_user", "userId": "1 OR 1=1 --"}
# {"action": "search_users", "name": "'; DROP TABLE users; --"}
# {"action": "update_status", "newStatus": "'; DELETE FROM users WHERE '1'='1'; --"}
import json
import mysql.connector
import os
import logging
from typing import Dict, Any, Optional

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def get_db_connection():
    """Open a new MySQL connection with autocommit disabled.

    Connection settings are read from the DB_HOST, DB_NAME, DB_USER and
    DB_PASSWORD environment variables; a missing variable raises
    ``KeyError``. Because autocommit is off, callers must commit
    explicitly for writes to persist.
    """
    settings = {
        'host': os.environ['DB_HOST'],
        'database': os.environ['DB_NAME'],
        'user': os.environ['DB_USER'],
        'password': os.environ['DB_PASSWORD'],
        'autocommit': False,
    }
    return mysql.connector.connect(**settings)

def validate_user_id(user_id: Any) -> int:
    """Validate a user ID taken from event data and return it as a positive int.

    Args:
        user_id: Raw value from the event: an int, a whole-number float, or
            a string containing an integer.

    Returns:
        The user ID as an ``int`` greater than zero.

    Raises:
        ValueError: If the value is not an integer (booleans and fractional
            or non-finite floats are rejected) or is not positive.
    """
    # bool is a subclass of int, so True would otherwise be accepted as ID 1.
    if isinstance(user_id, bool):
        raise ValueError(f"Invalid user ID: {user_id}")

    # int() silently truncates floats (1.9 -> 1) and raises OverflowError on
    # infinity; accept only floats that are exact whole numbers.
    if isinstance(user_id, float) and not user_id.is_integer():
        raise ValueError(f"Invalid user ID: {user_id}")

    try:
        uid = int(user_id)
    except (ValueError, TypeError):
        raise ValueError(f"Invalid user ID: {user_id}") from None

    # Checked outside the try block so this specific message is not
    # replaced by the generic "Invalid user ID" error.
    if uid <= 0:
        raise ValueError("User ID must be positive")
    return uid

def validate_email(email: str) -> str:
    """Run a basic sanity check on an email address and return it trimmed.

    This is a shape check, not full address validation: the trimmed value
    must be a string of at most 100 characters containing an '@' with
    non-empty text on both sides.

    Args:
        email: Raw value from the event.

    Returns:
        The address with surrounding whitespace removed.

    Raises:
        ValueError: If the value is not a string or fails the checks above.
    """
    # Non-strings (None, numbers, lists) are a validation failure, not a
    # TypeError from the membership test.
    if not isinstance(email, str):
        raise ValueError("Invalid email format")

    # Strip first so the checks apply to the value that is actually returned.
    cleaned = email.strip()
    local_part, at_sign, domain = cleaned.partition('@')
    if not at_sign or not local_part or not domain or len(cleaned) > 100:
        raise ValueError("Invalid email format")
    return cleaned

def lambda_handler(event, context):
    """AWS Lambda entry point: dispatch on the event's 'action' field.

    Args:
        event: Lambda event; expected to be a dict with an 'action' key of
            'get_user', 'search_users' or 'update_status'.
        context: Lambda context object (unused).

    Returns:
        The response dict produced by the selected operation, or a JSON
        error response: 400 for an unknown action or a validation error
        (``ValueError``), 500 for any other exception.
    """
    try:
        # A payload that is not a JSON object is a client error, not a crash.
        if not isinstance(event, dict):
            raise ValueError("Event must be a JSON object")

        action = event.get('action')

        if action == 'get_user':
            return get_user(event)
        if action == 'search_users':
            return search_users(event)
        if action == 'update_status':
            return update_user_status(event)

        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid action'})
        }

    except ValueError as e:
        logger.error("Validation error: %s", e)
        return {
            'statusCode': 400,
            'body': json.dumps({'error': str(e)})
        }
    except Exception as e:
        # logger.exception records the traceback; the caller still sees
        # only a generic message so internal details are not disclosed.
        logger.exception("Unexpected error: %s", e)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }

def get_user(event):
    """Fetch one user by ID using a parameterized query.

    Args:
        event: Lambda event dict; 'userId' must be a positive integer.

    Returns:
        A 200 response whose JSON body holds the user's id, name, email,
        department and status, or a 404 response if no row matches.

    Raises:
        ValueError: If 'userId' fails validation.
    """
    # Input validation
    user_id = validate_user_id(event.get('userId'))

    conn = get_db_connection()
    try:
        # Creating the cursor inside the try block ensures the connection is
        # closed even if cursor creation itself fails.
        cursor = conn.cursor(dictionary=True)
        try:
            # SECURE: Parameterized query
            query = "SELECT id, name, email, department, status FROM users WHERE id = %s"
            cursor.execute(query, (user_id,))
            result = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        conn.close()

    if not result:
        return {
            'statusCode': 404,
            'body': json.dumps({'error': 'User not found'})
        }

    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }

def search_users(event):
    """Search users by optional name fragment, department and minimum salary.

    Args:
        event: Lambda event dict. Optional keys: 'name' (substring to match,
            at most 100 characters), 'department' (one of the allowed
            department names) and 'minSalary' (number).

    Returns:
        A 200 response whose JSON body holds at most 100 matching rows
        (id, name, email, department).

    Raises:
        ValueError: If 'name' or 'department' is not a string, the name is
            too long, or the department is not in the allowed set.
    """
    def _text_field(key):
        # A JSON null is treated like a missing key; any other non-string is
        # rejected here instead of failing later on .strip().
        value = event.get(key)
        if value is None:
            return ''
        if not isinstance(value, str):
            raise ValueError(f"{key} must be a string")
        return value.strip()

    # Input validation
    name = _text_field('name')
    department = _text_field('department')
    min_salary = event.get('minSalary', 0)

    if len(name) > 100:
        raise ValueError("Name search term too long")

    # Validate department
    valid_departments = {'IT', 'HR', 'Finance', 'Marketing', 'Sales'}
    if department and department not in valid_departments:
        raise ValueError(f"Invalid department: {department}")

    # Validate salary: an unusable or negative value means "no salary filter".
    try:
        min_salary = float(min_salary)
        if min_salary < 0:
            min_salary = 0
    except (ValueError, TypeError):
        min_salary = 0

    # SECURE: Parameterized query with dynamic conditions. Only fixed SQL
    # fragments are concatenated; every user value travels in params.
    conditions = []
    params = []

    base_query = "SELECT id, name, email, department FROM users WHERE 1=1"

    if name:
        conditions.append(" AND name LIKE %s")
        params.append(f"%{name}%")

    if department:
        conditions.append(" AND department = %s")
        params.append(department)

    if min_salary > 0:
        conditions.append(" AND salary >= %s")
        params.append(min_salary)

    query = base_query + "".join(conditions) + " LIMIT 100"

    conn = get_db_connection()
    try:
        # Creating the cursor inside the try block ensures the connection is
        # closed even if cursor creation itself fails.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(query, params)
            results = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps(results)
    }

def update_user_status(event):
    """Update a user's status and reason, identified by email.

    Args:
        event: Lambda event dict with 'userEmail', 'newStatus' (one of
            'active', 'inactive', 'suspended', 'pending') and an optional
            'reason' of at most 500 characters.

    Returns:
        A 200 response with the affected row count, or a 404 response when
        the UPDATE reports zero affected rows.

    Raises:
        ValueError: If any input fails validation.
        mysql.connector.Error: Re-raised after rollback on a database error.
    """
    # Input validation
    user_email = validate_email(event.get('userEmail', ''))

    # Treat a JSON null like a missing key, and reject other non-strings
    # with a validation error instead of failing on .strip().
    new_status = event.get('newStatus')
    reason = event.get('reason')
    new_status = '' if new_status is None else new_status
    reason = '' if reason is None else reason
    if not isinstance(new_status, str) or not isinstance(reason, str):
        raise ValueError("newStatus and reason must be strings")
    new_status = new_status.strip()
    reason = reason.strip()

    # Validate status
    valid_statuses = {'active', 'inactive', 'suspended', 'pending'}
    if new_status not in valid_statuses:
        raise ValueError(f"Invalid status: {new_status}")

    if len(reason) > 500:
        raise ValueError("Reason too long")

    # SECURE: Parameterized query
    query = ("UPDATE users SET status = %s, status_reason = %s, "
             "updated_at = NOW() WHERE email = %s")

    conn = get_db_connection()
    try:
        # Creating the cursor inside the try block ensures the connection is
        # closed even if cursor creation itself fails.
        cursor = conn.cursor()
        try:
            cursor.execute(query, (new_status, reason, user_email))

            # NOTE(review): MySQL may count only rows whose values actually
            # changed rather than rows matched; confirm this cannot yield a
            # false 404 for an identical repeated update.
            rows_affected = cursor.rowcount
            if rows_affected == 0:
                # Nothing to commit; closing the connection discards the
                # open transaction.
                return {
                    'statusCode': 404,
                    'body': json.dumps({'error': 'User not found'})
                }

            conn.commit()

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Status updated successfully',
                    'rowsAffected': rows_affected
                })
            }

        except mysql.connector.Error as e:
            conn.rollback()
            logger.error("Database error: %s", e)
            raise

        finally:
            cursor.close()
    finally:
        conn.close()

💡 Why This Fix Works

The vulnerable code uses f-strings, string formatting, and the .format() method to insert Lambda event data directly into SQL queries. The fixed version uses parameterized queries with %s placeholders, validates its inputs, handles errors explicitly, and uses transactions for data consistency.

Why it happens

User-controlled event fields are concatenated into SQL strings passed to cursor.execute without parameterization or proper input binding.

Root causes

Lambda Event Data Concatenation

User-controlled event fields are concatenated into SQL strings passed to cursor.execute without parameterization or proper input binding.

Preview example – PYTHON
# VULNERABLE: Event data concatenation
import mysql.connector

def lambda_handler(event, context):
    """Query users by ID and status (intentionally vulnerable preview snippet)."""
    # Both values come straight from the caller-supplied event.
    user_id = event.get('userId')
    status = event.get('status')

    # Direct concatenation of event data
    query = f"SELECT * FROM users WHERE id = {user_id} AND status = '{status}'"

    # NOTE(review): `cursor` is not defined in this snippet; presumably an
    # open MySQL cursor created elsewhere.
    cursor.execute(query)
    return cursor.fetchall()

String Formatting with Lambda Events

Using string formatting methods to insert Lambda event fields directly into SQL queries enables SQL injection through crafted event payloads.

Preview example – PYTHON
# VULNERABLE: String formatting
def update_user_status(event, context):
    """Build an UPDATE statement from event fields (intentionally vulnerable preview snippet).

    The snippet stops after building the query string; it does not execute
    it or return a value.
    """
    # Direct key access: a missing key raises KeyError.
    user_email = event['userEmail']
    new_status = event['newStatus']

    # Both values are substituted into quoted SQL literals without escaping.
    query = "UPDATE users SET status = '{}' WHERE email = '{}'".format(
        new_status, user_email
    )
    # Allows injection: newStatus = "'; DROP TABLE users; --"

Fixes

1

Use Parameterized Queries with MySQL Connector

Use parameterized queries with MySQL Connector/Python. Replace string concatenation with placeholders and bound parameters. Example: cursor.execute('SELECT * FROM projects WHERE status = %s', ('active',)).

View implementation – PYTHON
# SECURE: Parameterized query
import mysql.connector

def lambda_handler(event, context):
    """Query users by ID and status with bound parameters (preview snippet).

    NOTE(review): `cursor` is not defined in this snippet; presumably an
    open MySQL cursor created elsewhere.
    """
    # Event values are passed as parameters and never spliced into the SQL text.
    bound_values = (event.get('userId'), event.get('status'))
    cursor.execute(
        "SELECT * FROM users WHERE id = %s AND status = %s",
        bound_values,
    )
    return cursor.fetchall()
2

Validate and Cast Event Fields

Validate and cast event fields before binding. Implement proper input validation for all Lambda event data before using in database operations.

View implementation – PYTHON
# SECURE: Input validation and parameterized queries
def validate_user_id(user_id):
    """Validate a user ID taken from event data and return it as a positive int.

    Accepts an int, a whole-number float, or a string containing an integer.

    Raises:
        ValueError: If the value is not an integer (booleans and fractional
            or non-finite floats are rejected) or is not positive.
    """
    # bool is a subclass of int, so True would otherwise be accepted as ID 1.
    if isinstance(user_id, bool):
        raise ValueError("Invalid user ID format")

    # int() silently truncates floats and raises OverflowError on infinity;
    # accept only floats that are exact whole numbers.
    if isinstance(user_id, float) and not user_id.is_integer():
        raise ValueError("Invalid user ID format")

    try:
        uid = int(user_id)
    except (ValueError, TypeError):
        raise ValueError("Invalid user ID format") from None

    # Checked outside the try block so this specific message is not
    # replaced by the generic format error.
    if uid <= 0:
        raise ValueError("User ID must be positive")
    return uid

def lambda_handler(event, context):
    """Validate the event's user ID and status, then run a bound query (preview snippet).

    Raises ``ValueError`` if the user ID is invalid or the status is not in
    the allowed set.

    NOTE(review): `cursor` is not defined in this snippet; presumably an
    open MySQL cursor created elsewhere.
    """
    # Validate inputs: the ID first, then the status against an allow-list.
    uid = validate_user_id(event.get('userId'))
    allowed = {'active', 'inactive', 'pending'}
    status = event.get('status')
    if status not in allowed:
        raise ValueError(f"Invalid status: {status}")

    # Validated values are still bound as parameters, not interpolated.
    cursor.execute(
        "SELECT * FROM users WHERE id = %s AND status = %s",
        (uid, status),
    )
    return cursor.fetchall()

Detect This Vulnerability in Your Code

Sourcery automatically identifies SQL injection from AWS Lambda event data in MySQL query execution, along with many other security issues in your codebase.