import datetime
import decimal
import json
import logging
import os
from typing import Dict, Any, List, Optional

import psycopg2
import psycopg2.extras
# getLogger() with no name returns the root logger, so the INFO level set here
# also applies to every logger that does not set a level of its own.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_db_connection():
    """Open a new PostgreSQL connection configured from the environment.

    DB_HOST, DB_NAME, DB_USER and DB_PASSWORD are required (a missing one
    raises KeyError); DB_PORT falls back to 5432. Cursors created from the
    returned connection are RealDictCursor instances, so rows behave like
    dictionaries keyed by column name.

    Returns:
        A new psycopg2 connection. The caller is responsible for closing it.
    """
    env = os.environ
    settings = {
        'host': env['DB_HOST'],
        'database': env['DB_NAME'],
        'user': env['DB_USER'],
        'password': env['DB_PASSWORD'],
        'port': env.get('DB_PORT', 5432),
        'cursor_factory': psycopg2.extras.RealDictCursor,
    }
    return psycopg2.connect(**settings)
def validate_project_id(project_id: Any) -> int:
    """Validate a project ID and return it as a positive int.

    Args:
        project_id: Raw value from the request (typically an int or a
            numeric string).

    Returns:
        The project ID as an int greater than zero.

    Raises:
        ValueError: "Invalid project ID: ..." when the value cannot be
            interpreted as a whole number, or "Project ID must be positive"
            when it is zero or negative.
    """
    # bool is a subclass of int; without this check True would silently
    # become project 1.
    if isinstance(project_id, bool):
        raise ValueError(f"Invalid project ID: {project_id}")
    # int() truncates floats, so 2.5 would quietly address project 2.
    # is_integer() is also False for nan and +/-inf.
    if isinstance(project_id, float) and not project_id.is_integer():
        raise ValueError(f"Invalid project ID: {project_id}")

    try:
        pid = int(project_id)
    except (ValueError, TypeError, OverflowError):
        raise ValueError(f"Invalid project ID: {project_id}") from None

    # Checked outside the try block so this specific message is not
    # swallowed and replaced by the generic "Invalid project ID" error.
    if pid <= 0:
        raise ValueError("Project ID must be positive")
    return pid
def validate_email(email: str) -> str:
    """Perform basic email validation and return the trimmed address.

    The value is stripped first, so the length limit and the structural
    checks apply to the address that is actually returned.

    Args:
        email: Candidate address; surrounding whitespace is ignored.

    Returns:
        The address with leading and trailing whitespace removed.

    Raises:
        ValueError: If the value is not a string, is longer than 100
            characters after trimming, or lacks a non-empty part on either
            side of an '@'.
    """
    if not isinstance(email, str):
        raise ValueError("Invalid email format")

    candidate = email.strip()
    # rpartition yields an empty separator when no '@' is present.
    local_part, separator, domain = candidate.rpartition('@')
    if not (separator and local_part and domain) or len(candidate) > 100:
        raise ValueError("Invalid email format")
    return candidate
def lambda_handler(event, context):
    """AWS Lambda handler for PostgreSQL operations.

    Dispatches on event['action'] to get_project, search_projects or
    update_project_status and returns that function's response.

    Args:
        event: Request payload; read with .get(), so a mapping is expected.
        context: Lambda context object (unused).

    Returns:
        A dict with 'statusCode' and a JSON-encoded 'body'. Unknown or
        missing actions and ValueError from validation give 400; any other
        exception gives 500 with a generic message.
    """
    try:
        action = event.get('action')
        if action == 'get_project':
            return get_project(event)
        if action == 'search_projects':
            return search_projects(event)
        if action == 'update_status':
            return update_project_status(event)
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid action'})
        }
    except ValueError as e:
        logger.error("Validation error: %s", e)
        return {
            'statusCode': 400,
            'body': json.dumps({'error': str(e)})
        }
    except Exception as e:
        # logger.exception records the traceback; the client still only
        # sees a generic message so internals are not leaked.
        logger.exception("Unexpected error: %s", e)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }
def get_project(event):
    """Fetch a single project by ID.

    Args:
        event: Request payload; 'projectId' is validated with
            validate_project_id.

    Returns:
        A response dict: 404 with an error body when no row matches,
        otherwise 200 with the row (id, name, owner, status, budget,
        created_at) as JSON.

    Raises:
        ValueError: If 'projectId' is missing or invalid.
    """
    def _json_default(value):
        # json.dumps cannot encode these natively. created_at is presumably
        # a timestamp column and budget may be NUMERIC (returned as Decimal)
        # -- confirm against the schema.
        if isinstance(value, (datetime.datetime, datetime.date)):
            return value.isoformat()
        if isinstance(value, decimal.Decimal):
            # Emitted as a JSON number; budgets are already handled as
            # floats by the search filter.
            return float(value)
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    project_id = validate_project_id(event.get('projectId'))

    conn = get_db_connection()
    try:
        # The cursor is created inside the try so the connection is closed
        # even if cursor creation fails.
        with conn.cursor() as cursor:
            # Parameterized query: the ID is never interpolated into SQL.
            query = "SELECT id, name, owner, status, budget, created_at FROM projects WHERE id = %s"
            cursor.execute(query, (project_id,))
            row = cursor.fetchone()
    finally:
        conn.close()

    if not row:
        return {
            'statusCode': 404,
            'body': json.dumps({'error': 'Project not found'})
        }
    return {
        'statusCode': 200,
        'body': json.dumps(dict(row), default=_json_default)
    }
def search_projects(event):
    """Search projects by name substring, owner email and minimum budget.

    Args:
        event: Request payload. Optional keys:
            'name'      - case-insensitive substring match (max 100 chars);
                          '%' and '_' are matched literally.
            'owner'     - exact owner email, checked with validate_email.
            'minBudget' - lower bound on budget; values that are not
                          numeric or not positive are ignored.

    Returns:
        A 200 response whose body is a JSON list of up to 100 projects
        (id, name, owner, status, budget), newest first by created_at.

    Raises:
        ValueError: If 'name' or 'owner' is not a string, the name is too
            long, or the owner email is invalid.
    """
    def _json_default(value):
        # budget may come back as Decimal if the column is NUMERIC --
        # confirm against the schema. json.dumps cannot encode Decimal.
        if isinstance(value, decimal.Decimal):
            return float(value)
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    def _text(key):
        # An explicit null is treated like a missing key; any other
        # non-string is a client error rather than an AttributeError.
        value = event.get(key)
        if value is None:
            return ''
        if not isinstance(value, str):
            raise ValueError(f"{key} must be a string")
        return value.strip()

    name = _text('name')
    owner = _text('owner')
    if len(name) > 100:
        raise ValueError("Name search term too long")
    if owner:
        owner = validate_email(owner)

    # Best-effort budget filter: anything unparsable disables the filter.
    try:
        min_budget = float(event.get('minBudget', 0))
    except (ValueError, TypeError, OverflowError):
        min_budget = 0

    # Parameterized query with dynamic conditions; only fixed SQL fragments
    # are concatenated, values always travel as parameters.
    conditions = []
    params = []
    if name:
        # Escape LIKE metacharacters (backslash is PostgreSQL's default
        # escape character) so user input is matched literally.
        escaped = (
            name.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
        )
        conditions.append(" AND name ILIKE %s")
        params.append(f"%{escaped}%")
    if owner:
        conditions.append(" AND owner = %s")
        params.append(owner)
    if min_budget > 0:
        conditions.append(" AND budget >= %s")
        params.append(min_budget)

    query = (
        "SELECT id, name, owner, status, budget FROM projects WHERE 1=1"
        + "".join(conditions)
        + " ORDER BY created_at DESC LIMIT 100"
    )

    conn = get_db_connection()
    try:
        # The cursor is created inside the try so the connection is closed
        # even if cursor creation fails.
        with conn.cursor() as cursor:
            cursor.execute(query, params)
            rows = cursor.fetchall()
    finally:
        conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps([dict(row) for row in rows], default=_json_default)
    }
def update_project_status(event):
    """Change a project's status and record the change in the status log.

    The project row is locked, updated and logged inside one transaction.

    Args:
        event: Request payload with 'projectId', 'newStatus' (one of
            planning, active, completed, cancelled, on_hold) and optional
            'notes' (max 1000 characters).

    Returns:
        A response dict: 404 when the project does not exist, otherwise 200
        with a confirmation message, the project ID and the new status.

    Raises:
        ValueError: If the project ID, status or notes are invalid.
        psycopg2.Error: Re-raised after rollback when a statement fails.
    """
    project_id = validate_project_id(event.get('projectId'))

    # Explicit nulls and non-strings are rejected as validation errors
    # instead of failing on .strip() with an AttributeError.
    new_status = event.get('newStatus')
    if not isinstance(new_status, str):
        raise ValueError(f"Invalid status: {new_status}")
    new_status = new_status.strip()

    notes = event.get('notes')
    if notes is None:
        notes = ''
    if not isinstance(notes, str):
        raise ValueError("Notes must be a string")
    notes = notes.strip()

    valid_statuses = {'planning', 'active', 'completed', 'cancelled', 'on_hold'}
    if new_status not in valid_statuses:
        raise ValueError(f"Invalid status: {new_status}")
    if len(notes) > 1000:
        raise ValueError("Notes too long")

    conn = get_db_connection()
    try:
        conn.autocommit = False
        # cursor_factory is passed explicitly so the row can be read by
        # column name regardless of how the connection was configured.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
            # Read the current status BEFORE updating. A sub-select issued
            # after the UPDATE in the same transaction would see the new
            # value, so the log would record old_status == new_status.
            # FOR UPDATE locks the row so a concurrent change cannot slip in
            # between this read and the UPDATE.
            cursor.execute(
                "SELECT status FROM projects WHERE id = %s FOR UPDATE",
                (project_id,),
            )
            current = cursor.fetchone()
            if current is None:
                # Nothing was modified; closing the connection in the
                # finally block discards the open transaction.
                return {
                    'statusCode': 404,
                    'body': json.dumps({'error': 'Project not found'})
                }
            old_status = current['status']

            update_query = """
                UPDATE projects
                SET status = %s, notes = %s, updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """
            cursor.execute(update_query, (new_status, notes, project_id))

            log_query = """
                INSERT INTO project_status_log (project_id, old_status, new_status, notes, changed_at)
                VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)
            """
            cursor.execute(log_query, (project_id, old_status, new_status, notes))

        conn.commit()
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Status updated successfully',
                'projectId': project_id,
                'newStatus': new_status
            })
        }
    except psycopg2.Error as e:
        conn.rollback()
        logger.error("Database error: %s", e)
        raise
    finally:
        conn.close()
# Example: connection pooling for high-frequency operations.
import psycopg2.pool
# Module-level pool shared by get_pooled_connection() and
# return_pooled_connection(); stays None until the first pooled connection
# is requested.
connection_pool = None
def get_pooled_connection():
    """Return a connection from the module-level pool, creating it on demand.

    On first use a SimpleConnectionPool holding between 1 and 10
    connections is built from DB_HOST, DB_NAME, DB_USER, DB_PASSWORD
    (required) and DB_PORT (default 5432) and stored in the global
    connection_pool. No cursor_factory is passed to the pool.

    Returns:
        A connection checked out of the pool; hand it back with
        return_pooled_connection().
    """
    global connection_pool
    if connection_pool is not None:
        return connection_pool.getconn()

    env = os.environ
    pool_settings = {
        'host': env['DB_HOST'],
        'database': env['DB_NAME'],
        'user': env['DB_USER'],
        'password': env['DB_PASSWORD'],
        'port': env.get('DB_PORT', 5432),
    }
    # Positional arguments are the minimum and maximum pool size.
    connection_pool = psycopg2.pool.SimpleConnectionPool(1, 10, **pool_settings)
    return connection_pool.getconn()
def return_pooled_connection(conn):
    """Hand a connection back to the module-level pool.

    Does nothing when the global connection_pool is unset (or falsy); the
    connection is left untouched in that case.

    Args:
        conn: A connection previously obtained from the pool.
    """
    pool = connection_pool
    if not pool:
        return
    pool.putconn(conn)