Python AWS Lambda MySQL SQL Injection Vulnerability

Critical Risk SQL Injection
Tags: python, aws-lambda, mysql, sql-injection, serverless, cloud, database, injection, user-input, parameterized-queries, event-driven

What it is

A critical vulnerability that occurs when AWS Lambda functions using Python and MySQL construct SQL queries by concatenating user input from Lambda events directly into query strings. This serverless environment vulnerability allows attackers to inject malicious SQL code through Lambda event parameters, potentially leading to data theft, database corruption, or unauthorized access to cloud resources.

import json
import mysql.connector
import os

def get_db_connection():
    """Open a new MySQL connection configured from environment variables.

    Reads DB_HOST, DB_NAME, DB_USER and DB_PASSWORD from ``os.environ``;
    a missing variable raises ``KeyError``.
    """
    env = os.environ
    settings = {
        'host': env['DB_HOST'],
        'database': env['DB_NAME'],
        'user': env['DB_USER'],
        'password': env['DB_PASSWORD'],
    }
    return mysql.connector.connect(**settings)

def lambda_handler(event, context):
    """Route a Lambda event to the operation named by its ``action`` key.

    Recognised actions are ``get_user``, ``search_users`` and
    ``update_status``; anything else (including a missing action) yields a
    400 response with a plain-text body.
    """
    requested = event.get('action')

    if requested == 'get_user':
        return get_user(event)
    if requested == 'search_users':
        return search_users(event)
    if requested == 'update_status':
        return update_user_status(event)

    # No branch matched: report the bad request to the caller.
    return {'statusCode': 400, 'body': 'Invalid action'}

def get_user(event):
    """Fetch one user row by ``event['userId']`` (intentionally vulnerable example).

    The ID is taken from the event without validation and interpolated into
    the SQL text, so the caller controls the WHERE clause. Returns a 200
    response whose body is the JSON-encoded row (``null`` when no row matches).
    """
    # VULNERABLE: Direct event data in SQL
    user_id = event.get('userId')

    conn = get_db_connection()
    cursor = conn.cursor()

    # The f-string splices the raw value into the statement; nothing is escaped.
    query = f"SELECT * FROM users WHERE id = {user_id}"
    cursor.execute(query)

    result = cursor.fetchone()
    # No try/finally: if execute() or fetchone() raises, neither close() runs.
    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }

def search_users(event):
    """Search users by name, department and salary (intentionally vulnerable example).

    All three filters come straight from the event and are formatted into the
    SQL text. Returns a 200 response whose body is the JSON-encoded row list.
    """
    # VULNERABLE: String formatting
    name = event.get('name', '')
    department = event.get('department', '')
    min_salary = event.get('minSalary', 0)

    conn = get_db_connection()
    cursor = conn.cursor()

    # name and department sit inside quoted literals and min_salary is unquoted;
    # none of them is escaped or type-checked before being embedded.
    query = ("SELECT id, name, email FROM users WHERE "
             f"name LIKE '%{name}%' AND department = '{department}' "
             f"AND salary >= {min_salary}")

    cursor.execute(query)
    results = cursor.fetchall()

    # Cleanup is skipped if execute() or fetchall() raises (no try/finally).
    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps(results)
    }

def update_user_status(event):
    """Update a user's status by email (intentionally vulnerable example).

    ``newStatus``, ``reason`` and ``userEmail`` are read from the event and
    substituted into the UPDATE statement with ``str.format``. Always returns
    a 200 response, whether or not any row matched.
    """
    # VULNERABLE: .format() method
    user_email = event.get('userEmail')
    new_status = event.get('newStatus')
    reason = event.get('reason', '')

    conn = get_db_connection()
    cursor = conn.cursor()

    # Each '{}' is replaced by the raw event value inside a quoted SQL literal.
    query = ("UPDATE users SET status = '{}', status_reason = '{}' "
             "WHERE email = '{}'").format(new_status, reason, user_email)

    cursor.execute(query)
    conn.commit()

    # Cleanup is skipped if execute() or commit() raises (no try/finally).
    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Status updated'})
    }

# Malicious Lambda event examples:
# {"action": "get_user", "userId": "1 OR 1=1 --"}
# {"action": "search_users", "name": "'; DROP TABLE users; --"}
# {"action": "update_status", "newStatus": "'; DELETE FROM users WHERE '1'='1'; --"}
import json
import mysql.connector
import os
import logging
from typing import Dict, Any, Optional

# Configure the root logger at INFO so the handlers' error messages are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def get_db_connection():
    """Open a new MySQL connection with autocommit disabled.

    Connection settings come from the DB_HOST, DB_NAME, DB_USER and
    DB_PASSWORD environment variables; a missing variable raises ``KeyError``.
    Because autocommit is off, callers must commit writes explicitly.
    """
    env = os.environ
    settings = {
        'host': env['DB_HOST'],
        'database': env['DB_NAME'],
        'user': env['DB_USER'],
        'password': env['DB_PASSWORD'],
        'autocommit': False,
    }
    return mysql.connector.connect(**settings)

def validate_user_id(user_id: Any) -> int:
    """Validate a user ID taken from untrusted input and return it as an int.

    Args:
        user_id: Raw value from the event (typically an int or numeric string).

    Returns:
        The ID as a positive ``int``.

    Raises:
        ValueError: "Invalid user ID: ..." when the value cannot be converted
            to an integer, or "User ID must be positive" when it converts to
            zero or a negative number.
    """
    # bool is a subclass of int, so int(True) == 1 would otherwise pass as ID 1.
    if isinstance(user_id, bool):
        raise ValueError(f"Invalid user ID: {user_id}")

    try:
        uid = int(user_id)
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers int(float('inf')).
        raise ValueError(f"Invalid user ID: {user_id}") from None

    # Checked outside the try so this message is not re-wrapped as a
    # generic "Invalid user ID" error.
    if uid <= 0:
        raise ValueError("User ID must be positive")
    return uid

def validate_email(email: str) -> str:
    """Perform a basic sanity check on an email address and return it trimmed.

    Args:
        email: Raw value from the event.

    Returns:
        The email with surrounding whitespace removed.

    Raises:
        ValueError: If the value is not a string, is empty after trimming,
            lacks an ``@``, or is longer than 100 characters after trimming.
    """
    # Event data is untrusted JSON: reject non-strings as a validation error
    # rather than letting a TypeError/AttributeError escape.
    if not isinstance(email, str):
        raise ValueError("Invalid email format")

    # Trim first so the checks apply to the value that is actually returned.
    email = email.strip()
    if not email or '@' not in email or len(email) > 100:
        raise ValueError("Invalid email format")
    return email

def lambda_handler(event, context):
    """Dispatch a Lambda event by ``action`` and translate failures into responses.

    A ``ValueError`` raised while handling the request becomes a 400 response
    carrying the error message; any other exception is logged and reported
    as a generic 500 so internal details are not returned to the caller.
    """
    try:
        requested = event.get('action')

        if requested == 'get_user':
            return get_user(event)
        if requested == 'search_users':
            return search_users(event)
        if requested == 'update_status':
            return update_user_status(event)

        status, payload = 400, {'error': 'Invalid action'}

    except ValueError as e:
        logger.error(f"Validation error: {e}")
        status, payload = 400, {'error': str(e)}
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        status, payload = 500, {'error': 'Internal server error'}

    return {'statusCode': status, 'body': json.dumps(payload)}

def get_user(event):
    """Look up a single user by the validated ``userId`` from the event.

    Returns a 200 response with the JSON-encoded row, or a 404 response when
    no row matches. ``ValueError`` from ID validation propagates to the caller.
    """
    uid = validate_user_id(event.get('userId'))

    connection = get_db_connection()
    db_cursor = connection.cursor(dictionary=True)

    try:
        # SECURE: the ID is bound as a parameter, never spliced into the SQL text.
        db_cursor.execute(
            "SELECT id, name, email, department, status FROM users WHERE id = %s",
            (uid,),
        )
        row = db_cursor.fetchone()

        if row:
            status, payload = 200, row
        else:
            status, payload = 404, {'error': 'User not found'}

        return {'statusCode': status, 'body': json.dumps(payload)}
    finally:
        # Runs on success and on error alike.
        db_cursor.close()
        connection.close()

def search_users(event):
    """Search users by optional name fragment, department and minimum salary.

    Args:
        event: Lambda event mapping. Optional keys: ``name`` (substring to
            match, at most 100 characters after trimming), ``department``
            (one of the allow-listed names) and ``minSalary`` (a number;
            unparsable, non-finite or negative values disable the filter).

    Returns:
        A response dict with ``statusCode`` 200 and a JSON ``body`` holding
        at most 100 matching rows.

    Raises:
        ValueError: If ``name`` or ``department`` is not a string, the name
            is too long, or the department is not allow-listed.
    """
    import math

    name = event.get('name')
    department = event.get('department')
    min_salary = event.get('minSalary', 0)

    # An explicit JSON null arrives as None; treat it like an omitted filter
    # instead of crashing on None.strip().
    if name is None:
        name = ''
    if department is None:
        department = ''

    # Non-string values are a client error, not an internal failure.
    if not isinstance(name, str):
        raise ValueError("Name search term must be a string")
    if not isinstance(department, str):
        raise ValueError("Department must be a string")

    name = name.strip()
    department = department.strip()

    if len(name) > 100:
        raise ValueError("Name search term too long")

    # Validate department against the allow-list
    valid_departments = {'IT', 'HR', 'Finance', 'Marketing', 'Sales'}
    if department and department not in valid_departments:
        raise ValueError(f"Invalid department: {department}")

    # Validate salary: anything that is not a usable lower bound becomes 0,
    # which means "no salary filter" below.
    try:
        min_salary = float(min_salary)
    except (ValueError, TypeError, OverflowError):
        min_salary = 0
    if not math.isfinite(min_salary) or min_salary < 0:
        min_salary = 0

    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)

    try:
        # SECURE: Parameterized query with dynamic conditions. Only fixed SQL
        # fragments are concatenated; every user value travels in `params`.
        conditions = []
        params = []

        base_query = "SELECT id, name, email, department FROM users WHERE 1=1"

        if name:
            conditions.append(" AND name LIKE %s")
            # NOTE: '%' and '_' inside `name` still act as LIKE wildcards.
            params.append(f"%{name}%")

        if department:
            conditions.append(" AND department = %s")
            params.append(department)

        if min_salary > 0:
            conditions.append(" AND salary >= %s")
            params.append(min_salary)

        query = base_query + "".join(conditions) + " LIMIT 100"
        cursor.execute(query, params)

        results = cursor.fetchall()

        return {
            'statusCode': 200,
            'body': json.dumps(results)
        }

    finally:
        cursor.close()
        conn.close()

def update_user_status(event):
    """Set a user's status and status reason, identified by email.

    Args:
        event: Lambda event mapping with ``userEmail``, ``newStatus`` (one of
            active/inactive/suspended/pending) and optional ``reason``
            (at most 500 characters after trimming).

    Returns:
        A 200 response with the affected row count, or a 404 response when
        the UPDATE affected no row.

    Raises:
        ValueError: If any input fails validation.
        mysql.connector.Error: Re-raised after rollback on a database error.
    """
    # Input validation
    user_email = validate_email(event.get('userEmail', ''))
    new_status = event.get('newStatus')
    reason = event.get('reason')

    # A JSON null reason is treated like an omitted one.
    if reason is None:
        reason = ''

    # Reject non-strings up front so they surface as validation errors
    # rather than AttributeError from .strip().
    if not isinstance(new_status, str):
        raise ValueError(f"Invalid status: {new_status}")
    if not isinstance(reason, str):
        raise ValueError("Reason must be a string")

    new_status = new_status.strip()
    reason = reason.strip()

    # Validate status against the allow-list
    valid_statuses = {'active', 'inactive', 'suspended', 'pending'}
    if new_status not in valid_statuses:
        raise ValueError(f"Invalid status: {new_status}")

    if len(reason) > 500:
        raise ValueError("Reason too long")

    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        # SECURE: Parameterized query
        query = ("UPDATE users SET status = %s, status_reason = %s, "
                "updated_at = NOW() WHERE email = %s")

        cursor.execute(query, (new_status, reason, user_email))

        # NOTE(review): rowcount is used as the "user exists" signal; confirm
        # it counts matched rather than changed rows for this connection.
        if cursor.rowcount == 0:
            return {
                'statusCode': 404,
                'body': json.dumps({'error': 'User not found'})
            }

        conn.commit()

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Status updated successfully',
                'rowsAffected': cursor.rowcount
            })
        }

    except mysql.connector.Error as e:
        conn.rollback()
        logger.error(f"Database error: {e}")
        raise

    finally:
        cursor.close()
        conn.close()

💡 Why This Fix Works

The vulnerable code uses f-strings, string formatting, and the .format() method to insert Lambda event data directly into SQL queries. The fixed version uses parameterized queries with %s placeholders, implements comprehensive input validation, proper error handling, and uses transactions for data consistency.

Why it happens

Using f-strings to format SQL queries with data from Lambda events creates direct injection vulnerabilities. Lambda event data is completely untrusted and can be manipulated by attackers to inject malicious SQL code.

Root causes

F-String Formatting with Lambda Event Data

Using f-strings to format SQL queries with data from Lambda events creates direct injection vulnerabilities. Lambda event data is completely untrusted and can be manipulated by attackers to inject malicious SQL code.

Preview example – PYTHON
# VULNERABLE: f-string with Lambda event data
def lambda_handler(event, context):
    """Excerpt: build a SELECT by f-string from ``event['userId']`` (vulnerable).

    ``cursor`` is not defined in this excerpt; presumably an open DB cursor.
    """
    user_id = event.get('userId')  # Could be: "1 OR 1=1 --"
    # The value is spliced into the SQL text verbatim.
    query = f"SELECT * FROM users WHERE id = {user_id}"
    cursor.execute(query)
    # Results in: SELECT * FROM users WHERE id = 1 OR 1=1 --

String .format() Method with Event Parameters

Using the .format() method to insert Lambda event parameters into SQL queries provides no protection against injection attacks. Event parameters can contain arbitrary SQL code that gets executed as part of the query.

Preview example – PYTHON
# VULNERABLE: .format() with event data
def update_user_status(event):
    """Excerpt: build an UPDATE with ``str.format`` from event values (vulnerable).

    ``cursor`` is not defined in this excerpt; presumably an open DB cursor.
    """
    user_email = event.get('userEmail')
    new_status = event.get('newStatus')
    # Both values land inside quoted SQL literals without any escaping.
    query = "UPDATE users SET status = '{}' WHERE email = '{}'".format(new_status, user_email)
    cursor.execute(query)
    # Malicious input: newStatus = "'; DELETE FROM users WHERE '1'='1'; --"

Percent (%) Formatting in Serverless Functions

Using % formatting to build SQL queries with Lambda event data creates injection vulnerabilities. The serverless environment amplifies the risk as functions can be triggered by external events with malicious payloads.

Preview example – PYTHON
# VULNERABLE: % formatting with event data
def search_users(event):
    """Excerpt: build a SELECT with ``%`` string formatting from event values (vulnerable).

    ``cursor`` is not defined in this excerpt; presumably an open DB cursor.
    """
    name = event.get('name')  # Could be: "'; DROP TABLE users; --"
    department = event.get('department')
    # '%%' is a literal percent sign for LIKE; each '%s' is replaced by Python's
    # string formatting before the database ever sees the statement.
    query = "SELECT * FROM users WHERE name LIKE '%%%s%%' AND department = '%s'" % (name, department)
    cursor.execute(query)

Lack of Input Validation in Lambda Functions

Failing to validate Lambda event parameters before using them in database queries creates serious security vulnerabilities. Unlike traditional web applications, Lambda functions may receive events from various sources, making input validation crucial.

Preview example – PYTHON
# VULNERABLE: No validation of Lambda event data
def lambda_handler(event, context):
    """Excerpt: build a WHERE clause from an arbitrary ``filters`` mapping (vulnerable).

    ``conditions`` is not defined in this excerpt; presumably an empty list.
    The query is only constructed here, not executed.
    """
    # Direct use of event data without validation
    filters = event.get('filters', {})
    for field, value in filters.items():
        # Both field and value are untrusted
        # The column name itself is caller-controlled here, not just the value.
        conditions.append(f"{field} = '{value}'")
    query = f"SELECT * FROM users WHERE {' AND '.join(conditions)}"

Fixes

1

Use Parameterized Queries with %s Placeholders

Always use parameterized queries with %s placeholders (for MySQL) instead of string formatting when building SQL queries in Lambda functions. This ensures Lambda event data is treated as data rather than executable SQL code.

View implementation – PYTHON
# SECURE: Parameterized query in Lambda
import mysql.connector

def lambda_handler(event, context):
    """Return one user, looked up by validated ``userId`` with a bound parameter.

    Returns:
        A 200 response whose body is the JSON-encoded row (``null`` when no
        row matches), or a 400 response when a ``ValueError`` is raised,
        e.g. by ID validation.
    """
    try:
        # Input validation
        user_id = validate_user_id(event.get('userId'))

        conn = get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                # Safe parameterized query
                query = "SELECT id, name, email FROM users WHERE id = %s"
                cursor.execute(query, (user_id,))
                result = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            # Always release the connection, including when the query fails.
            conn.close()

        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
    except ValueError as e:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': str(e)})
        }
2

Implement Comprehensive Lambda Event Validation

Validate all Lambda event parameters before using them in database operations. Implement type checking, format validation, length restrictions, and whitelist validation specifically for serverless environments.

View implementation – PYTHON
# Lambda event validation functions
def validate_user_id(user_id):
    """Validate a user ID from a Lambda event and return it as an int.

    Args:
        user_id: Raw value from the event.

    Returns:
        The ID as an ``int`` in the range 1..999999.

    Raises:
        ValueError: "User ID is required" for ``None``; "Invalid user ID
            format: ..." when the value cannot be converted to an integer;
            "User ID out of valid range" when it is outside 1..999999.
    """
    if user_id is None:
        raise ValueError("User ID is required")

    # bool is a subclass of int, so int(True) == 1 would otherwise pass as ID 1.
    if isinstance(user_id, bool):
        raise ValueError(f"Invalid user ID format: {user_id}")

    try:
        uid = int(user_id)
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers int(float('inf')).
        raise ValueError(f"Invalid user ID format: {user_id}") from None

    # Checked outside the try so the range message is not re-wrapped as a
    # format error.
    if uid <= 0 or uid > 999999:
        raise ValueError("User ID out of valid range")
    return uid

def validate_email(email):
    """Perform a basic sanity check on an email address and return it trimmed.

    Args:
        email: Raw value from the event.

    Returns:
        The email with surrounding whitespace removed.

    Raises:
        ValueError: If the value is not a string, is empty after trimming,
            lacks an ``@``, or is longer than 100 characters after trimming.
    """
    # Event data is untrusted JSON: reject non-strings as a validation error
    # rather than letting a TypeError/AttributeError escape.
    if not isinstance(email, str):
        raise ValueError("Invalid email format")

    # Trim first so the checks apply to the value that is actually returned.
    email = email.strip()
    if not email or '@' not in email or len(email) > 100:
        raise ValueError("Invalid email format")
    return email

def validate_department(department):
    """Check a department name against the allow-list and return it unchanged.

    Args:
        department: Raw value from the event.

    Returns:
        The department name, exactly as given.

    Raises:
        ValueError: If the value is not a string or is not one of the
            allow-listed department names (the match is case-sensitive).
    """
    valid_departments = {'IT', 'HR', 'Finance', 'Marketing', 'Sales'}
    # The isinstance check comes first: an unhashable value such as a list
    # would make the set membership test raise TypeError.
    if not isinstance(department, str) or department not in valid_departments:
        raise ValueError(f"Invalid department: {department}")
    return department
3

Use Transaction Management in Lambda Functions

Implement proper transaction management in Lambda functions to ensure data consistency and provide rollback capabilities in case of errors. This is especially important when handling multiple database operations.

View implementation – PYTHON
# SECURE: Transaction management in Lambda
def update_user_status(event):
    """Update a user's status inside an explicit transaction.

    Verifies that the user exists, applies the update with bound parameters
    and commits. Any exception triggers a rollback and is re-raised; the
    cursor and connection are closed in every case.

    NOTE(review): ``validate_status`` is not defined in this snippet;
    presumably an allow-list check analogous to the other validators.
    """
    email = validate_email(event.get('userEmail'))
    status = validate_status(event.get('newStatus'))
    # Trim, then cap the free-text reason at 500 characters.
    status_reason = event.get('reason', '').strip()[:500]

    connection = get_db_connection()
    db_cursor = connection.cursor()

    try:
        connection.autocommit = False

        # Existence check first, so a missing user is reported explicitly.
        db_cursor.execute("SELECT id FROM users WHERE email = %s", (email,))
        existing = db_cursor.fetchone()
        if not existing:
            raise ValueError("User not found")

        # Parameterized update: values are bound, never formatted into the SQL.
        db_cursor.execute(
            "UPDATE users SET status = %s, status_reason = %s, updated_at = NOW() WHERE email = %s",
            (status, status_reason, email),
        )
        connection.commit()

        body = json.dumps({'message': 'Status updated successfully'})
        return {'statusCode': 200, 'body': body}

    except Exception:
        # Undo any partial work before handing the error to the caller.
        connection.rollback()
        raise
    finally:
        db_cursor.close()
        connection.close()
4

Implement Proper Error Handling and Logging

Add comprehensive error handling and logging specific to Lambda functions. Don't expose database errors to Lambda responses, and implement monitoring to detect unusual patterns that might indicate injection attempts.

View implementation – PYTHON
# SECURE: Error handling and logging
import logging
import json

# Configure the root logger at INFO so the handler's info/warning/error messages are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Handle a Lambda event with logging and non-leaky error responses.

    Validation failures are logged as warnings and answered with a generic
    400 message; any other exception is logged as an error and answered with
    a generic 500, so exception details are never returned to the caller.
    """
    try:
        # Only the action name is logged; avoid logging sensitive event data.
        logger.info(f"Processing request for action: {event.get('action')}")

        if event.get('action') == 'get_user':
            return get_user_safely(event)

        status, payload = 400, {'error': 'Invalid action'}

    except ValueError as e:
        logger.warning(f"Input validation failed: {e}")
        status, payload = 400, {'error': 'Invalid input provided'}
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        status, payload = 500, {'error': 'Internal server error'}

    return {'statusCode': status, 'body': json.dumps(payload)}
5

Use Connection Pooling for Lambda Functions

Implement connection pooling or reuse database connections across Lambda invocations to improve performance and security. Avoid creating new connections for each request when possible.

View implementation – PYTHON
# SECURE: Connection pooling for Lambda
import mysql.connector.pooling

# Module-level pool handle, created lazily and intended to be reused across
# Lambda invocations.
connection_pool = None

def get_pooled_connection():
    """Return a connection from the shared pool, creating the pool on first use.

    Pool settings come from the DB_HOST, DB_NAME, DB_USER and DB_PASSWORD
    environment variables; a missing variable raises ``KeyError``.
    """
    global connection_pool
    if connection_pool is not None:
        return connection_pool.get_connection()

    env = os.environ
    # NOTE(review): pool_size=5 may hold more connections than one Lambda
    # execution environment needs - confirm against expected concurrency.
    pool_options = {
        'pool_name': "lambda_pool",
        'pool_size': 5,
        'pool_reset_session': True,
        'host': env['DB_HOST'],
        'database': env['DB_NAME'],
        'user': env['DB_USER'],
        'password': env['DB_PASSWORD'],
    }
    connection_pool = mysql.connector.pooling.MySQLConnectionPool(**pool_options)
    return connection_pool.get_connection()

def lambda_handler(event, context):
    """Skeleton handler: borrow a pooled connection and always give it back."""
    pooled_conn = get_pooled_connection()
    try:
        # Placeholder: run parameterized queries with pooled_conn here.
        pass
    finally:
        # Closing a pooled connection returns it to the pool.
        pooled_conn.close()

Detect This Vulnerability in Your Code

Sourcery automatically identifies Python AWS Lambda MySQL SQL injection vulnerabilities and many other security issues in your codebase.