import sqlite3
import mysql.connector
from typing import List, Optional, Dict, Any
import re
def get_database_connection():
return sqlite3.connect('example.db')
class UserService:
def __init__(self, cursor):
self.cursor = cursor
def validate_user_id(self, user_id: str) -> int:
"""Validate and convert user ID to integer"""
try:
uid = int(user_id)
if uid <= 0:
raise ValueError("User ID must be positive")
return uid
except ValueError as e:
raise ValueError(f"Invalid user ID: {e}")
def get_user_by_id(self, user_id: str) -> Optional[Dict]:
# Input validation
validated_id = self.validate_user_id(user_id)
# SECURE: Parameterized query
query = "SELECT * FROM users WHERE id = ?"
self.cursor.execute(query, (validated_id,))
row = self.cursor.fetchone()
if row:
columns = [desc[0] for desc in self.cursor.description]
return dict(zip(columns, row))
return None
def search_users(self, name: str, department: str) -> List[Dict]:
# Input validation
if len(name) > 100:
raise ValueError("Name search term too long")
valid_departments = {'IT', 'HR', 'Finance', 'Marketing', 'Sales'}
if department and department not in valid_departments:
raise ValueError(f"Invalid department: {department}")
# SECURE: Parameterized query
query = "SELECT * FROM users WHERE name LIKE ? AND department = ?"
self.cursor.execute(query, (f'%{name}%', department))
rows = self.cursor.fetchall()
columns = [desc[0] for desc in self.cursor.description]
return [dict(zip(columns, row)) for row in rows]
def update_user_profile(self, user_id: str, bio: str, website: str) -> None:
# Input validation
validated_id = self.validate_user_id(user_id)
if len(bio) > 1000:
raise ValueError("Bio too long")
# Simple URL validation
if website and not re.match(r'^https?://', website):
raise ValueError("Invalid website URL")
# SECURE: Parameterized query
query = "UPDATE users SET bio = ?, website = ? WHERE id = ?"
self.cursor.execute(query, (bio, website, validated_id))
def get_orders_by_status(self, status: str, limit: int) -> List[Dict]:
# Input validation
valid_statuses = {'pending', 'processing', 'shipped', 'delivered', 'cancelled'}
if status not in valid_statuses:
raise ValueError(f"Invalid status: {status}")
if not 1 <= limit <= 1000:
raise ValueError("Limit must be between 1 and 1000")
# SECURE: Parameterized query
query = "SELECT * FROM orders WHERE status = ? LIMIT ?"
self.cursor.execute(query, (status, limit))
rows = self.cursor.fetchall()
columns = [desc[0] for desc in self.cursor.description]
return [dict(zip(columns, row)) for row in rows]
def advanced_search(self, filters: Dict[str, Any]) -> List[Dict]:
# Whitelist allowed fields
allowed_fields = {'name', 'department', 'status', 'age', 'salary'}
conditions = []
params = []
for field, value in filters.items():
if field not in allowed_fields:
raise ValueError(f"Invalid search field: {field}")
# SECURE: Build parameterized conditions
conditions.append(f"{field} = ?")
params.append(value)
if not conditions:
return []
where_clause = " AND ".join(conditions)
query = f"SELECT * FROM users WHERE {where_clause}"
# Safe because field names are whitelisted and values are parameterized
self.cursor.execute(query, params)
rows = self.cursor.fetchall()
columns = [desc[0] for desc in self.cursor.description]
return [dict(zip(columns, row)) for row in rows]
# Safe usage examples:
def main():
conn = get_database_connection()
cursor = conn.cursor()
service = UserService(cursor)
try:
# These are now safe:
user = service.get_user_by_id("123")
users = service.search_users("John", "IT")
service.update_user_profile("1", "Software Engineer", "https://example.com")
orders = service.get_orders_by_status("pending", 10)
# Advanced search with validation
results = service.advanced_search({
'department': 'IT',
'status': 'active'
})
except ValueError as e:
print(f"Validation error: {e}")
except Exception as e:
print(f"Database error: {e}")
finally:
conn.close()